Last active
September 18, 2020 19:53
-
-
Save derekm/62c256b55833b297fc656fdc96e643c2 to your computer and use it in GitHub Desktop.
Pravega Client API 101 code snippets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| StreamConfiguration streamConfig = StreamConfiguration.builder() | |
| .scalingPolicy(ScalingPolicy.fixed(1)) | |
| .build(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| URI controllerURI = URI.create("tcp://localhost:9090"); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
| streamManager.createScope("tutorial"); | |
| streamManager.createStream("tutorial", "numbers", streamConfig); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ClientConfig clientConfig = ClientConfig.builder() | |
| .controllerURI(controllerURI).build(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| EventWriterConfig writerConfig = EventWriterConfig.builder().build(); | |
| EventStreamClientFactory factory = EventStreamClientFactory | |
| .withScope("tutorial", clientConfig); | |
| EventStreamWriter<Integer> writer = factory | |
| .createEventWriter("numbers", new JavaSerializer<Integer>(), writerConfig); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| writer.writeEvent(1); | |
| writer.writeEvent(2); | |
| writer.writeEvent(3); | |
| writer.flush(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ReaderGroupManager readerGroupManager = ReaderGroupManager | |
| .withScope("tutorial", clientConfig); | |
| ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() | |
| .stream("tutorial/numbers").build(); | |
| readerGroupManager.createReaderGroup("numReader", readerGroupConfig); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| EventStreamReader<Integer> reader = factory | |
| .createReader("myId", "numReader", | |
| new JavaSerializer<Integer>(), ReaderConfig.builder().build()); | |
| Integer intEvent; | |
| while ((intEvent = reader.readNextEvent(1000).getEvent()) != null) { | |
| System.out.println(intEvent); | |
| } | |
| reader.close(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| try (ReaderGroup readerGroup = readerGroupManager.getReaderGroup("numReader")) { | |
| readerGroup.readerOffline("myId", null); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| readerGroupManager.deleteReaderGroup("numReader"); | |
| // or | |
| try (ReaderGroup readerGroup = readerGroupManager.getReaderGroup("numReader")) { | |
| readerGroup.resetReaderGroup(readerGroupConfig); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| StreamInfo streamInfo; | |
| StreamCut tail; // current end of stream | |
| try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
| streamInfo = streamManager.getStreamInfo("tutorial", "numbers"); | |
| tail = streamInfo.getTailStreamCut(); | |
| } | |
| ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() | |
| .stream("tutorial/numbers", tail).build(); | |
| readerGroupManager.createReaderGroup("tailNumReader", readerGroupConfig); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| writer.writeEvent(4); | |
| writer.writeEvent(5); | |
| writer.writeEvent(6); | |
| writer.flush(); | |
| try (ReaderGroup readerGroup = readerGroupManager.getReaderGroup("tailNumReader"); | |
| EventStreamReader<Integer> tailReader = factory | |
| .createReader("tailId", "tailNumReader", | |
| new JavaSerializer<Integer>(), ReaderConfig.builder().build())) { | |
| Integer intEvent; | |
| while ((intEvent = tailReader.readNextEvent(2000).getEvent()) != null | |
| || readerGroup.getMetrics().unreadBytes() != 0) { | |
| System.out.println(intEvent); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| StreamConfiguration parallelConfig = StreamConfiguration.builder() | |
| .scalingPolicy(ScalingPolicy.fixed(5)) | |
| .build(); | |
| try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
| streamManager.createStream("tutorial", "parallel-numbers", parallelConfig); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| EventStreamWriter<Integer> paraWriter = factory | |
| .createEventWriter("parallel-numbers", | |
| new JavaSerializer<Integer>(), writerConfig); | |
| paraWriter.writeEvent(1); | |
| paraWriter.writeEvent(2); | |
| paraWriter.writeEvent(3); | |
| paraWriter.writeEvent(4); | |
| paraWriter.writeEvent(5); | |
| paraWriter.writeEvent(6); | |
| paraWriter.close(); | |
| ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() | |
| .stream("tutorial/parallel-numbers").build(); | |
| readerGroupManager.createReaderGroup("paraNumReader", readerGroupConfig); | |
| try (EventStreamReader<Integer> reader = factory | |
| .createReader("paraId", "paraNumReader", | |
| new JavaSerializer<Integer>(), ReaderConfig.builder().build())) { | |
| Integer intEvent; | |
| while ((intEvent = reader.readNextEvent(1000).getEvent()) != null) { | |
| System.out.println(intEvent); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
| streamManager.createStream("tutorial", "parallel-decades", parallelConfig); | |
| } | |
| EventStreamWriter<Integer> decWriter = factory | |
| .createEventWriter("parallel-decades", | |
| new JavaSerializer<Integer>(), writerConfig); | |
| Map<String, Integer> decades = new LinkedHashMap<>(); | |
| decades.put("ones", 0); | |
| decades.put("tens", 10); | |
| decades.put("twenties", 20); | |
| decades.entrySet().stream().forEach(decade -> { | |
| IntStream.range(decade.getValue(), decade.getValue() + 10) | |
| .forEachOrdered(n -> { | |
| try { | |
| decWriter.writeEvent(decade.getKey(), n); | |
| } catch (InterruptedException | ExecutionException e) { | |
| throw new RuntimeException(e); | |
| } | |
| }); | |
| }); | |
| decWriter.close(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() | |
| .stream("tutorial/parallel-decades").build(); | |
| readerGroupManager.createReaderGroup("paraDecReader", readerGroupConfig); | |
| EventStreamReader<Integer> reader = factory | |
| .createReader("decId", "paraDecReader", | |
| new JavaSerializer<Integer>(), ReaderConfig.builder().build()); | |
| Integer intEvent; | |
| while ((intEvent = reader.readNextEvent(1000).getEvent()) != null) { | |
| System.out.println(intEvent); | |
| } | |
| reader.close(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
| streamManager.createScope("tutorial"); | |
| streamManager.createStream("tutorial", "scaled-stream", streamConfig); | |
| } | |
| EventStreamWriter<Integer> scaledWriter = factory | |
| .createEventWriter("scaled-stream", new JavaSerializer<Integer>(), writerConfig); | |
| scaledWriter.writeEvent(1); | |
| scaledWriter.writeEvent(2); | |
| scaledWriter.writeEvent(3); | |
| scaledWriter.flush(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // get segment ID and scale stream | |
| ScheduledExecutorService executor = ExecutorServiceHelpers | |
| .newScheduledThreadPool(4, "executor"); | |
| Controller controller = new ControllerImpl(ControllerImplConfig.builder() | |
| .clientConfig(clientConfig).build(), executor); | |
| StreamSegments streamSegments = controller.getCurrentSegments("tutorial", "scaled-stream") | |
| .get(); | |
| System.out.println("Number of segments: " + streamSegments.getNumberOfSegments()); | |
| long segmentId = streamSegments.getSegments().iterator().next().getSegmentId(); | |
| System.out.println("Segment ID to scale: " + segmentId); | |
| Map<Double, Double> newKeyRanges = new HashMap<>(); | |
| newKeyRanges.put(0.0, 0.5); | |
| newKeyRanges.put(0.5, 1.0); | |
| CompletableFuture<Boolean> scaleStream = controller | |
| .scaleStream(Stream.of("tutorial/scaled-stream"), | |
| Collections.singletonList(segmentId), | |
| newKeyRanges, executor).getFuture(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // write 4 thru 9 to scaled stream | |
| if (scaleStream.join()) { | |
| List<CompletableFuture<Void>> writes = IntStream.range(4, 10).parallel() | |
| .mapToObj(scaledWriter::writeEvent).collect(Collectors.toList()); | |
| Futures.allOf(writes).join(); | |
| } else { | |
| throw new RuntimeException("Oops, something went wrong!"); | |
| } | |
| controller.close(); | |
| scaledWriter.close(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| private <T> java.util.stream.Stream<T> iteratorToStream(Iterator<T> itor) { | |
| return StreamSupport.stream(((Iterable<T>) () -> itor).spliterator(), false); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // get segments and their events with batch client | |
| BatchClientFactory batchClient = BatchClientFactory.withScope("tutorial", clientConfig); | |
| StreamSegmentsIterator segments = batchClient | |
| .getSegments(Stream.of("tutorial/scaled-stream"), null, null); | |
| Set<SegmentIterator<Integer>> segmentIterators = new HashSet<>(); | |
| iteratorToStream(segments.getIterator()) | |
| .collect(Collectors.toSet()) | |
| .parallelStream() | |
| .flatMap(segmentRange -> { | |
| System.out.println("Segment ID: " + segmentRange.getSegmentId()); | |
| SegmentIterator<Integer> segmentIterator = batchClient | |
| .readSegment(segmentRange, new JavaSerializer<Integer>()); | |
| segmentIterators.add(segmentIterator); | |
| return iteratorToStream(segmentIterator); | |
| }).forEach(System.out::println); | |
| segmentIterators.stream().forEach(SegmentIterator::close); | |
| batchClient.close(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
| streamManager.createScope("tutorial"); | |
| streamManager.createStream("tutorial", "int-bytestream", streamConfig); | |
| } | |
| try (ByteStreamClientFactory byteFactory = ByteStreamClientFactory | |
| .withScope("tutorial", clientConfig)) { | |
| try (ByteStreamWriter byteWriter = byteFactory | |
| .createByteStreamWriter("int-bytestream"); | |
| DataOutputStream outStream = new DataOutputStream(byteWriter)) { | |
| outStream.writeInt(1); | |
| outStream.writeInt(2); | |
| outStream.writeInt(3); | |
| } | |
| try (ByteStreamReader byteReader = byteFactory | |
| .createByteStreamReader("int-bytestream"); | |
| DataInputStream inStream = new DataInputStream(byteReader)) { | |
| System.out.println(inStream.readInt()); | |
| System.out.println(inStream.readInt()); | |
| System.out.println(inStream.readInt()); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| factory.close(); | |
| readerGroupManager.deleteReaderGroup("numReader"); | |
| readerGroupManager.deleteReaderGroup("tailNumReader"); | |
| readerGroupManager.deleteReaderGroup("paraNumReader"); | |
| readerGroupManager.deleteReaderGroup("paraDecReader"); | |
| readerGroupManager.close(); | |
| try (StreamManager streamManager = StreamManager.create(controllerURI)) { | |
| streamManager.sealStream("tutorial", "numbers"); | |
| streamManager.deleteStream("tutorial", "numbers"); | |
| streamManager.sealStream("tutorial", "parallel-numbers"); | |
| streamManager.deleteStream("tutorial", "parallel-numbers"); | |
| streamManager.sealStream("tutorial", "parallel-decades"); | |
| streamManager.deleteStream("tutorial", "parallel-decades"); | |
| streamManager.sealStream("tutorial", "scaled-stream"); | |
| streamManager.deleteStream("tutorial", "scaled-stream"); | |
| streamManager.sealStream("tutorial", "int-bytestream"); | |
| streamManager.deleteStream("tutorial", "int-bytestream"); | |
| streamManager.deleteScope("tutorial"); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment