Last active
September 18, 2020 19:53
-
-
Save derekm/62c256b55833b297fc656fdc96e643c2 to your computer and use it in GitHub Desktop.
Pravega Client API 101 code snippets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
StreamConfiguration streamConfig = StreamConfiguration.builder() | |
.scalingPolicy(ScalingPolicy.fixed(1)) | |
.build(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
URI controllerURI = URI.create("tcp://localhost:9090"); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
streamManager.createScope("tutorial"); | |
streamManager.createStream("tutorial", "numbers", streamConfig); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ClientConfig clientConfig = ClientConfig.builder() | |
.controllerURI(controllerURI).build(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
EventWriterConfig writerConfig = EventWriterConfig.builder().build(); | |
EventStreamClientFactory factory = EventStreamClientFactory | |
.withScope("tutorial", clientConfig); | |
EventStreamWriter<Integer> writer = factory | |
.createEventWriter("numbers", new JavaSerializer<Integer>(), writerConfig); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Write the events 1, 2 and 3 (no routing key), then flush the writer.
for (int n = 1; n <= 3; n++) {
    writer.writeEvent(n);
}
writer.flush();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Reader group "numReader" over the stream "tutorial/numbers".
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
        .stream("tutorial/numbers")
        .build();
ReaderGroupManager readerGroupManager =
        ReaderGroupManager.withScope("tutorial", clientConfig);
readerGroupManager.createReaderGroup("numReader", readerGroupConfig);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Read the "numReader" group as reader "myId" and print every event received.
// The reader is a try-with-resources resource so it is closed even when
// createReader succeeds but a later readNextEvent call throws.
try (EventStreamReader<Integer> reader = factory
        .createReader("myId", "numReader",
                new JavaSerializer<Integer>(), ReaderConfig.builder().build())) {
    Integer intEvent;
    // The loop ends at the first read that yields no event
    // (presumably: nothing arrived within the 1000 timeout -- confirm units in the API docs).
    while ((intEvent = reader.readNextEvent(1000).getEvent()) != null) {
        System.out.println(intEvent);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try (ReaderGroup readerGroup = readerGroupManager.getReaderGroup("numReader")) { | |
readerGroup.readerOffline("myId", null); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Alternative 1: delete the "numReader" group outright.
readerGroupManager.deleteReaderGroup("numReader");
// or
// Alternative 2: keep the group and reset it to the original reader group config.
try (ReaderGroup group = readerGroupManager.getReaderGroup("numReader")) {
    group.resetReaderGroup(readerGroupConfig);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Look up the tail stream cut of "tutorial/numbers" and create the reader group
// "tailNumReader" whose configuration starts at that cut.
StreamInfo streamInfo;
StreamCut tail; // current end of stream
try (StreamManager manager = StreamManager.create(controllerURI)) {
    streamInfo = manager.getStreamInfo("tutorial", "numbers");
    tail = streamInfo.getTailStreamCut();
}
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig
        .builder()
        .stream("tutorial/numbers", tail)
        .build();
readerGroupManager.createReaderGroup("tailNumReader", readerGroupConfig);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
writer.writeEvent(4); | |
writer.writeEvent(5); | |
writer.writeEvent(6); | |
writer.flush(); | |
try (ReaderGroup readerGroup = readerGroupManager.getReaderGroup("tailNumReader"); | |
EventStreamReader<Integer> tailReader = factory | |
.createReader("tailId", "tailNumReader", | |
new JavaSerializer<Integer>(), ReaderConfig.builder().build())) { | |
Integer intEvent; | |
while ((intEvent = tailReader.readNextEvent(2000).getEvent()) != null | |
|| readerGroup.getMetrics().unreadBytes() != 0) { | |
System.out.println(intEvent); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Configuration with a fixed scaling policy of 5, used to create "parallel-numbers".
ScalingPolicy fiveSegmentPolicy = ScalingPolicy.fixed(5);
StreamConfiguration parallelConfig = StreamConfiguration.builder()
        .scalingPolicy(fiveSegmentPolicy)
        .build();
try (StreamManager manager = StreamManager.create(controllerURI)) {
    manager.createStream("tutorial", "parallel-numbers", parallelConfig);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Write 1..6 to "parallel-numbers" without routing keys and close the writer.
EventStreamWriter<Integer> paraWriter = factory.createEventWriter(
        "parallel-numbers", new JavaSerializer<Integer>(), writerConfig);
for (int n = 1; n <= 6; n++) {
    paraWriter.writeEvent(n);
}
paraWriter.close();

// Read the stream back through the "paraNumReader" group and print each event
// in the order the reader returns it.
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
        .stream("tutorial/parallel-numbers")
        .build();
readerGroupManager.createReaderGroup("paraNumReader", readerGroupConfig);
try (EventStreamReader<Integer> reader = factory.createReader(
        "paraId", "paraNumReader",
        new JavaSerializer<Integer>(), ReaderConfig.builder().build())) {
    // Stop at the first read that yields no event.
    for (Integer intEvent = reader.readNextEvent(1000).getEvent();
            intEvent != null;
            intEvent = reader.readNextEvent(1000).getEvent()) {
        System.out.println(intEvent);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
streamManager.createStream("tutorial", "parallel-decades", parallelConfig); | |
} | |
EventStreamWriter<Integer> decWriter = factory | |
.createEventWriter("parallel-decades", | |
new JavaSerializer<Integer>(), writerConfig); | |
Map<String, Integer> decades = new LinkedHashMap<>(); | |
decades.put("ones", 0); | |
decades.put("tens", 10); | |
decades.put("twenties", 20); | |
decades.entrySet().stream().forEach(decade -> { | |
IntStream.range(decade.getValue(), decade.getValue() + 10) | |
.forEachOrdered(n -> { | |
try { | |
decWriter.writeEvent(decade.getKey(), n); | |
} catch (InterruptedException | ExecutionException e) { | |
throw new RuntimeException(e); | |
} | |
}); | |
}); | |
decWriter.close(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Read "parallel-decades" through the "paraDecReader" group and print each event.
ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
        .stream("tutorial/parallel-decades").build();
readerGroupManager.createReaderGroup("paraDecReader", readerGroupConfig);
// try-with-resources closes the reader even if a read throws part-way through.
try (EventStreamReader<Integer> reader = factory
        .createReader("decId", "paraDecReader",
                new JavaSerializer<Integer>(), ReaderConfig.builder().build())) {
    Integer intEvent;
    // The loop ends at the first read that yields no event.
    while ((intEvent = reader.readNextEvent(1000).getEvent()) != null) {
        System.out.println(intEvent);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create "scaled-stream" with the original stream configuration, then write 1..3 and flush.
try (StreamManager manager = StreamManager.create(controllerURI)) {
    manager.createScope("tutorial");
    manager.createStream("tutorial", "scaled-stream", streamConfig);
}
EventStreamWriter<Integer> scaledWriter = factory.createEventWriter(
        "scaled-stream", new JavaSerializer<Integer>(), writerConfig);
for (int n = 1; n <= 3; n++) {
    scaledWriter.writeEvent(n);
}
scaledWriter.flush();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// get segment ID and scale stream
// Build a controller client backed by a 4-thread scheduled pool named "executor".
ScheduledExecutorService executor =
        ExecutorServiceHelpers.newScheduledThreadPool(4, "executor");
ControllerImplConfig controllerConfig = ControllerImplConfig.builder()
        .clientConfig(clientConfig)
        .build();
Controller controller = new ControllerImpl(controllerConfig, executor);

// Fetch the current segments and take the ID of the first one returned.
StreamSegments streamSegments = controller
        .getCurrentSegments("tutorial", "scaled-stream")
        .get();
System.out.println("Number of segments: " + streamSegments.getNumberOfSegments());
long segmentId = streamSegments.getSegments().iterator().next().getSegmentId();
System.out.println("Segment ID to scale: " + segmentId);

// Request a scale of that one segment with two new key ranges: 0.0-0.5 and 0.5-1.0.
Map<Double, Double> newKeyRanges = new HashMap<>();
newKeyRanges.put(0.0, 0.5);
newKeyRanges.put(0.5, 1.0);
List<Long> segmentsToScale = Collections.singletonList(segmentId);
CompletableFuture<Boolean> scaleStream = controller
        .scaleStream(Stream.of("tutorial/scaled-stream"),
                segmentsToScale, newKeyRanges, executor)
        .getFuture();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// write 4 thru 9 to scaled stream
// The finally block releases the controller, the writer and the executor on every
// path, including a failed scale request or a failed write.
try {
    // join() waits for the scale request and yields its Boolean outcome.
    if (!scaleStream.join()) {
        throw new RuntimeException("Oops, something went wrong!");
    }
    List<CompletableFuture<Void>> writes = IntStream.range(4, 10).parallel()
            .mapToObj(scaledWriter::writeEvent).collect(Collectors.toList());
    // Wait until every one of the six writes has completed.
    Futures.allOf(writes).join();
} finally {
    controller.close();
    scaledWriter.close();
    // The pool was created only for the controller client; stop its threads as well.
    executor.shutdown();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private <T> java.util.stream.Stream<T> iteratorToStream(Iterator<T> itor) { | |
return StreamSupport.stream(((Iterable<T>) () -> itor).spliterator(), false); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// get segments and their events with batch client
BatchClientFactory batchClient = BatchClientFactory.withScope("tutorial", clientConfig);
// The flatMap lambda below runs inside a parallel stream, so several threads may
// register iterators at once; a plain HashSet is not safe for that.
Set<SegmentIterator<Integer>> segmentIterators =
        Collections.synchronizedSet(new HashSet<SegmentIterator<Integer>>());
try {
    StreamSegmentsIterator segments = batchClient
            .getSegments(Stream.of("tutorial/scaled-stream"), null, null);
    iteratorToStream(segments.getIterator())
            .collect(Collectors.toSet())
            .parallelStream()
            .flatMap(segmentRange -> {
                System.out.println("Segment ID: " + segmentRange.getSegmentId());
                SegmentIterator<Integer> segmentIterator = batchClient
                        .readSegment(segmentRange, new JavaSerializer<Integer>());
                // Remember the iterator so it can be closed once reading is done.
                segmentIterators.add(segmentIterator);
                return iteratorToStream(segmentIterator);
            })
            .forEach(System.out::println);
} finally {
    // Close every iterator that was opened, then the client, even if reading failed.
    segmentIterators.forEach(SegmentIterator::close);
    batchClient.close();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
streamManager.createScope("tutorial"); | |
streamManager.createStream("tutorial", "int-bytestream", streamConfig); | |
} | |
try (ByteStreamClientFactory byteFactory = ByteStreamClientFactory | |
.withScope("tutorial", clientConfig)) { | |
try (ByteStreamWriter byteWriter = byteFactory | |
.createByteStreamWriter("int-bytestream"); | |
DataOutputStream outStream = new DataOutputStream(byteWriter)) { | |
outStream.writeInt(1); | |
outStream.writeInt(2); | |
outStream.writeInt(3); | |
} | |
try (ByteStreamReader byteReader = byteFactory | |
.createByteStreamReader("int-bytestream"); | |
DataInputStream inStream = new DataInputStream(byteReader)) { | |
System.out.println(inStream.readInt()); | |
System.out.println(inStream.readInt()); | |
System.out.println(inStream.readInt()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Tear down what the tutorial created: clients, reader groups, streams, and the scope.
// The "numbers" writer was only ever flushed, never closed; close it before the
// factory that created it.
writer.close();
factory.close();
for (String groupName : new String[] {
        "numReader", "tailNumReader", "paraNumReader", "paraDecReader"}) {
    readerGroupManager.deleteReaderGroup(groupName);
}
readerGroupManager.close();
try (StreamManager streamManager = StreamManager.create(controllerURI)) {
    for (String streamName : new String[] {
            "numbers", "parallel-numbers", "parallel-decades", "scaled-stream", "int-bytestream"}) {
        // Each stream is sealed first and deleted second.
        streamManager.sealStream("tutorial", streamName);
        streamManager.deleteStream("tutorial", streamName);
    }
    // The scope is deleted last, after the streams above have been deleted.
    streamManager.deleteScope("tutorial");
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment