Skip to content

Instantly share code, notes, and snippets.

@derekm
Last active September 18, 2020 19:53
Show Gist options
  • Save derekm/62c256b55833b297fc656fdc96e643c2 to your computer and use it in GitHub Desktop.
Save derekm/62c256b55833b297fc656fdc96e643c2 to your computer and use it in GitHub Desktop.
Pravega Client API 101 code snippets
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
URI controllerURI = URI.create("tcp://localhost:9090");
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamManager.createScope("tutorial");
streamManager.createStream("tutorial", "numbers", streamConfig);
}
ClientConfig clientConfig = ClientConfig.builder()
.controllerURI(controllerURI).build();
EventWriterConfig writerConfig = EventWriterConfig.builder().build();
EventStreamClientFactory factory = EventStreamClientFactory
.withScope("tutorial", clientConfig);
EventStreamWriter<Integer> writer = factory
.createEventWriter("numbers", new JavaSerializer<Integer>(), writerConfig);
writer.writeEvent(1);
writer.writeEvent(2);
writer.writeEvent(3);
writer.flush();
ReaderGroupManager readerGroupManager = ReaderGroupManager
.withScope("tutorial", clientConfig);
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream("tutorial/numbers").build();
readerGroupManager.createReaderGroup("numReader", readerGroupConfig);
EventStreamReader<Integer> reader = factory
.createReader("myId", "numReader",
new JavaSerializer<Integer>(), ReaderConfig.builder().build());
Integer intEvent;
while ((intEvent = reader.readNextEvent(1000).getEvent()) != null) {
System.out.println(intEvent);
}
reader.close();
try (ReaderGroup readerGroup = readerGroupManager.getReaderGroup("numReader")) {
readerGroup.readerOffline("myId", null);
}
readerGroupManager.deleteReaderGroup("numReader");
// or
try (ReaderGroup readerGroup = readerGroupManager.getReaderGroup("numReader")) {
readerGroup.resetReaderGroup(readerGroupConfig);
}
StreamInfo streamInfo;
StreamCut tail; // current end of stream
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamInfo = streamManager.getStreamInfo("tutorial", "numbers");
tail = streamInfo.getTailStreamCut();
}
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream("tutorial/numbers", tail).build();
readerGroupManager.createReaderGroup("tailNumReader", readerGroupConfig);
writer.writeEvent(4);
writer.writeEvent(5);
writer.writeEvent(6);
writer.flush();
try (ReaderGroup readerGroup = readerGroupManager.getReaderGroup("tailNumReader");
EventStreamReader<Integer> tailReader = factory
.createReader("tailId", "tailNumReader",
new JavaSerializer<Integer>(), ReaderConfig.builder().build())) {
Integer intEvent;
while ((intEvent = tailReader.readNextEvent(2000).getEvent()) != null
|| readerGroup.getMetrics().unreadBytes() != 0) {
System.out.println(intEvent);
}
}
StreamConfiguration parallelConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(5))
.build();
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamManager.createStream("tutorial", "parallel-numbers", parallelConfig);
}
EventStreamWriter<Integer> paraWriter = factory
.createEventWriter("parallel-numbers",
new JavaSerializer<Integer>(), writerConfig);
paraWriter.writeEvent(1);
paraWriter.writeEvent(2);
paraWriter.writeEvent(3);
paraWriter.writeEvent(4);
paraWriter.writeEvent(5);
paraWriter.writeEvent(6);
paraWriter.close();
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream("tutorial/parallel-numbers").build();
readerGroupManager.createReaderGroup("paraNumReader", readerGroupConfig);
try (EventStreamReader<Integer> reader = factory
.createReader("paraId", "paraNumReader",
new JavaSerializer<Integer>(), ReaderConfig.builder().build())) {
Integer intEvent;
while ((intEvent = reader.readNextEvent(1000).getEvent()) != null) {
System.out.println(intEvent);
}
}
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamManager.createStream("tutorial", "parallel-decades", parallelConfig);
}
EventStreamWriter<Integer> decWriter = factory
.createEventWriter("parallel-decades",
new JavaSerializer<Integer>(), writerConfig);
Map<String, Integer> decades = new LinkedHashMap<>();
decades.put("ones", 0);
decades.put("tens", 10);
decades.put("twenties", 20);
decades.entrySet().stream().forEach(decade -> {
IntStream.range(decade.getValue(), decade.getValue() + 10)
.forEachOrdered(n -> {
try {
decWriter.writeEvent(decade.getKey(), n);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
});
decWriter.close();
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream("tutorial/parallel-decades").build();
readerGroupManager.createReaderGroup("paraDecReader", readerGroupConfig);
EventStreamReader<Integer> reader = factory
.createReader("decId", "paraDecReader",
new JavaSerializer<Integer>(), ReaderConfig.builder().build());
Integer intEvent;
while ((intEvent = reader.readNextEvent(1000).getEvent()) != null) {
System.out.println(intEvent);
}
reader.close();
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamManager.createScope("tutorial");
streamManager.createStream("tutorial", "scaled-stream", streamConfig);
}
EventStreamWriter<Integer> scaledWriter = factory
.createEventWriter("scaled-stream", new JavaSerializer<Integer>(), writerConfig);
scaledWriter.writeEvent(1);
scaledWriter.writeEvent(2);
scaledWriter.writeEvent(3);
scaledWriter.flush();
// get segment ID and scale stream
ScheduledExecutorService executor = ExecutorServiceHelpers
.newScheduledThreadPool(4, "executor");
Controller controller = new ControllerImpl(ControllerImplConfig.builder()
.clientConfig(clientConfig).build(), executor);
StreamSegments streamSegments = controller.getCurrentSegments("tutorial", "scaled-stream")
.get();
System.out.println("Number of segments: " + streamSegments.getNumberOfSegments());
long segmentId = streamSegments.getSegments().iterator().next().getSegmentId();
System.out.println("Segment ID to scale: " + segmentId);
Map<Double, Double> newKeyRanges = new HashMap<>();
newKeyRanges.put(0.0, 0.5);
newKeyRanges.put(0.5, 1.0);
CompletableFuture<Boolean> scaleStream = controller
.scaleStream(Stream.of("tutorial/scaled-stream"),
Collections.singletonList(segmentId),
newKeyRanges, executor).getFuture();
// write 4 thru 9 to scaled stream
if (scaleStream.join()) {
List<CompletableFuture<Void>> writes = IntStream.range(4, 10).parallel()
.mapToObj(scaledWriter::writeEvent).collect(Collectors.toList());
Futures.allOf(writes).join();
} else {
throw new RuntimeException("Oops, something went wrong!");
}
controller.close();
scaledWriter.close();
private <T> java.util.stream.Stream<T> iteratorToStream(Iterator<T> itor) {
return StreamSupport.stream(((Iterable<T>) () -> itor).spliterator(), false);
}
// get segments and their events with batch client
BatchClientFactory batchClient = BatchClientFactory.withScope("tutorial", clientConfig);
StreamSegmentsIterator segments = batchClient
.getSegments(Stream.of("tutorial/scaled-stream"), null, null);
Set<SegmentIterator<Integer>> segmentIterators = new HashSet<>();
iteratorToStream(segments.getIterator())
.collect(Collectors.toSet())
.parallelStream()
.flatMap(segmentRange -> {
System.out.println("Segment ID: " + segmentRange.getSegmentId());
SegmentIterator<Integer> segmentIterator = batchClient
.readSegment(segmentRange, new JavaSerializer<Integer>());
segmentIterators.add(segmentIterator);
return iteratorToStream(segmentIterator);
}).forEach(System.out::println);
segmentIterators.stream().forEach(SegmentIterator::close);
batchClient.close();
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamManager.createScope("tutorial");
streamManager.createStream("tutorial", "int-bytestream", streamConfig);
}
try (ByteStreamClientFactory byteFactory = ByteStreamClientFactory
.withScope("tutorial", clientConfig)) {
try (ByteStreamWriter byteWriter = byteFactory
.createByteStreamWriter("int-bytestream");
DataOutputStream outStream = new DataOutputStream(byteWriter)) {
outStream.writeInt(1);
outStream.writeInt(2);
outStream.writeInt(3);
}
try (ByteStreamReader byteReader = byteFactory
.createByteStreamReader("int-bytestream");
DataInputStream inStream = new DataInputStream(byteReader)) {
System.out.println(inStream.readInt());
System.out.println(inStream.readInt());
System.out.println(inStream.readInt());
}
}
factory.close();
readerGroupManager.deleteReaderGroup("numReader");
readerGroupManager.deleteReaderGroup("tailNumReader");
readerGroupManager.deleteReaderGroup("paraNumReader");
readerGroupManager.deleteReaderGroup("paraDecReader");
readerGroupManager.close();
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
streamManager.sealStream("tutorial", "numbers");
streamManager.deleteStream("tutorial", "numbers");
streamManager.sealStream("tutorial", "parallel-numbers");
streamManager.deleteStream("tutorial", "parallel-numbers");
streamManager.sealStream("tutorial", "parallel-decades");
streamManager.deleteStream("tutorial", "parallel-decades");
streamManager.sealStream("tutorial", "scaled-stream");
streamManager.deleteStream("tutorial", "scaled-stream");
streamManager.sealStream("tutorial", "int-bytestream");
streamManager.deleteStream("tutorial", "int-bytestream");
streamManager.deleteScope("tutorial");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment