Created
January 24, 2017 16:30
-
-
Save sjwiesman/fc99c64f44a93cfc9c7aa62c070a9358 to your computer and use it in GitHub Desktop.
Flink RichFsSinkFunction
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public abstract class RichFsSinkFunction<IN> extends RichSinkFunction<IN> { | |
private Collector<Path> collector; | |
public void setCollector(Collector<Path> collector) { | |
this.collector = collector; | |
} | |
public void collect(Path finalPath) { | |
/// Or something to this regard that can forward on paths | |
if (collector != null) { | |
collector.collect(finalPath); | |
} | |
} | |
} | |
// Sketch of the change inside BucketingSink; everything not shown is elided.
public class BucketingSink<T> extends RichFsSinkFunction<T> /* . . . */ {
    // Now, whenever a pendingPath is renamed to a finalPath, that finalPath is
    // also collected (collect is a no-op when no collector has been set).
    fs.rename(pendingPath, finalPath);
    collect(finalPath);
}
// Now when using,
DataStream<Integer> stream = ???; // placeholder: any upstream source

// This now works the same as normal: no collector is ever set and this is the
// end of the pipeline.
stream.addSink(new BucketingSink<Integer>());

// This call writes out using the bucketing sink as normal, but the operator
// now adds a collector which will forward on final paths. These paths go to a
// final operator of parallelism 1 where the only operation is to add a sink
// or some other operator with similar semantics (Path => Unit).
stream
    .addFsSink(new BucketingSink<Integer>())
    .addSink(new SinkFunction<Path>() {
        @Override
        public void invoke(Path value) throws Exception {
            LOG.info("File {} is complete", value);
        }
    });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment