Skip to content

Instantly share code, notes, and snippets.

@rmetzger
Created April 15, 2021 19:22
Show Gist options
  • Save rmetzger/7d5dbdaa118c63f5875c8c9520cc311d to your computer and use it in GitHub Desktop.
Save rmetzger/7d5dbdaa118c63f5875c8c9520cc311d to your computer and use it in GitHub Desktop.
Write to key
package de.robertmetzger;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
 * Demo Flink streaming job that writes each record into a bucket named after the record's key.
 *
 * <p>A synthetic source emits {@code (key, "hello")} tuples, where {@code key} is a random digit
 * {@code "0"}..{@code "9"}, roughly every 500 ms. The stream is keyed by that digit and written
 * with a row-format {@link StreamingFileSink} under {@code file:///tmp/flink/out}, using the key
 * as the bucket id.
 */
public class MLService2 {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): checkpointing is never enabled on this environment. The Flink docs state
        // that StreamingFileSink only finalizes part files on successful checkpoints -- confirm
        // whether leaving it disabled is intended for this demo.

        DataStream<Tuple2<String, String>> stream =
                env.addSource(
                        new SourceFunction<Tuple2<String, String>>() {
                            // volatile: cancel() is invoked from a different thread than run(),
                            // so the write must be visible to the emitting loop.
                            private volatile boolean running = true;

                            /** Emits one random-keyed tuple every 500 ms until cancelled. */
                            @Override
                            public void run(SourceContext<Tuple2<String, String>> ctx)
                                    throws Exception {
                                Random rng = new Random();
                                while (running) {
                                    Thread.sleep(500);
                                    ctx.collect(
                                            Tuple2.of(Integer.toString(rng.nextInt(10)), "hello"));
                                }
                            }

                            /** Signals the emitting loop in {@code run} to stop. */
                            @Override
                            public void cancel() {
                                running = false;
                            }
                        });

        final StreamingFileSink<Tuple2<String, String>> sink =
                StreamingFileSink.forRowFormat(
                                new Path("file:///tmp/flink/out"), new TupleEncoder())
                        .withBucketAssigner(new KeyBucketAssigner())
                        .withRollingPolicy(
                                DefaultRollingPolicy.builder()
                                        // 15 min rollover, 5 min inactivity, 1 GiB max part size.
                                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                        .withMaxPartSize(1024 * 1024 * 1024)
                                        .build())
                        .build();

        // Key by the tuple's first field with a type-safe selector instead of the deprecated
        // positional keyBy(int) overload.
        stream.keyBy(record -> record.f0).addSink(sink);

        env.execute("yolo");
    }

    /**
     * Row encoder that writes a tuple's {@code toString()} form as one UTF-8 line per record.
     */
    private static class TupleEncoder implements Encoder<Tuple2<String, String>> {

        /**
         * Writes {@code stringStringTuple2.toString()} followed by a newline to the stream.
         *
         * @param stringStringTuple2 the record to encode
         * @param outputStream the part-file stream to write to
         * @throws IOException if writing to {@code outputStream} fails
         */
        @Override
        public void encode(Tuple2<String, String> stringStringTuple2, OutputStream outputStream)
                throws IOException {
            // Explicit charset: the no-arg getBytes() depends on the JVM's platform default.
            outputStream.write(stringStringTuple2.toString().getBytes(StandardCharsets.UTF_8));
            // Record delimiter, so consecutive records do not run together in the part file.
            outputStream.write('\n');
        }
    }

    /** Bucket assigner that uses the tuple's first field ({@code f0}) as the bucket id. */
    private static class KeyBucketAssigner
            implements BucketAssigner<Tuple2<String, String>, String> {

        /**
         * Returns the bucket id for a record.
         *
         * @param stringStringTuple2 the record being written
         * @param context sink-provided context (unused)
         * @return the record's {@code f0} field
         */
        @Override
        public String getBucketId(Tuple2<String, String> stringStringTuple2, Context context) {
            return stringStringTuple2.f0;
        }

        /**
         * Returns the serializer used for the {@code String} bucket ids.
         *
         * @return {@link SimpleVersionedStringSerializer#INSTANCE}
         */
        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment