Skip to content

Instantly share code, notes, and snippets.

@sjwiesman
Created January 24, 2017 16:30
Show Gist options
  • Save sjwiesman/fc99c64f44a93cfc9c7aa62c070a9358 to your computer and use it in GitHub Desktop.
Save sjwiesman/fc99c64f44a93cfc9c7aa62c070a9358 to your computer and use it in GitHub Desktop.
Flink RichFsSinkFunction
/**
 * Base class for file-system sinks that can optionally report finished paths.
 *
 * <p>A {@code Collector<Path>} may be attached through {@link #setCollector}. When one
 * is attached, every path handed to {@link #collect} is passed on to it; when none is
 * attached, {@link #collect} does nothing.
 *
 * @param <IN> type of the records consumed by the sink
 */
public abstract class RichFsSinkFunction<IN> extends RichSinkFunction<IN> {

    /** Receiver of final paths; stays {@code null} until {@link #setCollector} is called. */
    private Collector<Path> collector;

    /**
     * Attaches (or replaces) the collector that receives final paths.
     *
     * @param collector the receiver of final paths; may be {@code null} to detach
     */
    public void setCollector(Collector<Path> collector) {
        this.collector = collector;
    }

    /**
     * Passes a final path to the attached collector, if there is one.
     *
     * @param finalPath the path to forward
     */
    public void collect(Path finalPath) {
        // Or something to this regard that can forward on paths.
        if (collector == null) {
            return;
        }
        collector.collect(finalPath);
    }
}
// Sketch only: the remainder of the class declaration and body is elided (". . .").
public class BucketingSink<T> extends RichFsSinkFunction<T> . . . {
// Whenever a pending path is renamed to its final path, that final path is also
// handed to collect(), inherited from RichFsSinkFunction.
fs.rename(pendingPath, finalPath);
collect(finalPath)
}
// Usage sketch (pseudo-code; "???" stands in for any existing stream).
DataStream<Int> stream = ???;
// Case 1: plain addSink works the same as normal. No collector is ever set,
// and the sink is the end of the pipeline.
stream.addSink(new BucketingSink<Int>);
// Case 2: this call writes out using the bucketing sink as normal, but the operator
// now adds a collector which will forward on final paths. These paths go to a
// final operator of parallelism 1 where the only operation is to add a sink,
// or some other operator with similar semantics (Path => Unit).
stream
.addFsSink(new BucketingSink<Int>)
.addSink(new SinkFunction<Path> {
// Logs each forwarded path.
public void invoke(Path value) throws Exception {
LOG.info("File {} is complete", value);
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment