I hereby claim:
- I am smaldini on github.
- I am smaldini (https://keybase.io/smaldini) on keybase.
- I have a public key whose fingerprint is 2696 137E 7ECA D45A CF63 4F82 D68B DFBA 314B 6F16
To claim this, I am signing this object:
| return PublisherFactory.create( | |
| (n, s) -> { | |
| long i = 0l; | |
| while (i < n && s.context().get() < elements) { | |
| if(s.isCancelled()) return; | |
| s.onNext(s.context().getAndIncrement()); | |
| i++; | |
| } | |
| if(s.context().get() == elements){ |
| HttpServer<Buffer, Buffer> server = NetStreams.httpServer("0.0.0.0", 80); | |
| server.get("/search/{search}", requestIn -> | |
| NetStreams.httpClient() | |
| .get("http://google.co.uk/?q=" + requestIn.param("search")) | |
| .flatMap(repliesOut -> requestIn.writeWith(repliesOut)) | |
| ); | |
| server.start().await(); |
| /* | |
| * Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. | |
| * | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software |
| @Test | |
| public void test5() throws Exception { | |
| //Hot stream of data, could be injected from anywhere | |
| Broadcaster<String> broadcaster = Broadcaster.<String>create(Environment.sharedDispatcher()); | |
| //Will go up to 16 parallel threads to proceed clients | |
| final int MAX_PARALLEL = 16; | |
| //Get a reference to the tail of the operation pipeline (microbatching + partitioning) |
I hereby claim:
To claim this, I am signing this object:
| def "http responds to requests from clients"() { | |
| given: "a simple HttpServer" | |
| //Listen on localhost using default impl (Netty) and assign a global codec to receive/reply String data | |
| def server = NetStreams.httpServer { | |
| it.codec(StandardCodecs.STRING_CODEC).listen(port) | |
| } | |
| //Prepare a client using default impl (Netty) to connect on http://localhost:port/ and assign global codec to send/receive String data | |
| def client = NetStreams.httpClient { |
| def "step-read and flush every 5 elems with manual decoding"() { | |
| given: "a TcpServer and a TcpClient" | |
| def latch = new CountDownLatch(10) | |
| def server = NetStreams.tcpServer(port) | |
| def client = NetStreams.tcpClient("localhost", port) | |
| def codec = new JsonCodec<Pojo, Pojo>(Pojo) | |
| when: "the client/server are prepared" | |
| server.pipeline { input -> |
| class ChronicleStreamSpec extends Specification { | |
| def "ChronicleStream persists objects and notifies subscribers"() { | |
| given: | |
| "2 slaves and 1 master" | |
| def putPromise = IOStreams.<Integer, String> persistentMapReader('journal') | |
| .onPut() | |
| .log('put') | |
| .next() |
| public class SensorProcessor implements ReactorProcessor<SensorData, SensorSummary> { | |
| @Override | |
| public Stream<SensorSummary> process(Stream<SensorData> inputStream) { | |
| return inputStream | |
| .buffer(5, 20, TimeUnit.SECONDS) | |
| //would be better to convert to stream of double 'values' and then have generic avg for type safety. |
| package reactor.rx; | |
| import org.junit.Test; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import reactor.AbstractReactorTest; | |
| import reactor.fn.tuple.Tuple; | |
| import reactor.io.IOStreams; | |
| import reactor.rx.stream.MapStream; |