I hereby claim:
- I am smaldini on github.
- I am smaldini (https://keybase.io/smaldini) on keybase.
- I have a public key whose fingerprint is 2696 137E 7ECA D45A CF63 4F82 D68B DFBA 314B 6F16
To claim this, I am signing this object:
class ChronicleStreamSpec extends Specification { | |
def "ChronicleStream persists objects and notifies subscribers"() { | |
given: | |
"2 slaves and 1 master" | |
def putPromise = IOStreams.<Integer, String> persistentMapReader('journal') | |
.onPut() | |
.log('put') | |
.next() |
def "step-read and flush every 5 elems with manual decoding"() { | |
given: "a TcpServer and a TcpClient" | |
def latch = new CountDownLatch(10) | |
def server = NetStreams.tcpServer(port) | |
def client = NetStreams.tcpClient("localhost", port) | |
def codec = new JsonCodec<Pojo, Pojo>(Pojo) | |
when: "the client/server are prepared" | |
server.pipeline { input -> |
def "http responds to requests from clients"() { | |
given: "a simple HttpServer" | |
//Listen on localhost using default impl (Netty) and assign a global codec to receive/reply String data | |
def server = NetStreams.httpServer { | |
it.codec(StandardCodecs.STRING_CODEC).listen(port) | |
} | |
//Prepare a client using default impl (Netty) to connect on http://localhost:port/ and assign global codec to send/receive String data | |
def client = NetStreams.httpClient { |
I hereby claim:
To claim this, I am signing this object:
@Test | |
public void test5() throws Exception { | |
//Hot stream of data, could be injected from anywhere | |
Broadcaster<String> broadcaster = Broadcaster.<String>create(Environment.sharedDispatcher()); | |
//Will go up to 16 parallel threads to proceed clients | |
final int MAX_PARALLEL = 16; | |
//Get a reference to the tail of the operation pipeline (microbatching + partitioning) |
/* | |
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software |
HttpServer<Buffer, Buffer> server = NetStreams.httpServer("0.0.0.0", 80); | |
server.get("/search/{search}", requestIn -> | |
NetStreams.httpClient() | |
.get("http://google.co.uk/?q=" + requestIn.param("search")) | |
.flatMap(repliesOut -> requestIn.writeWith(repliesOut)) | |
); | |
server.start().await(); |
return PublisherFactory.create( | |
(n, s) -> { | |
long i = 0l; | |
while (i < n && s.context().get() < elements) { | |
if(s.isCancelled()) return; | |
s.onNext(s.context().getAndIncrement()); | |
i++; | |
} | |
if(s.context().get() == elements){ |
final Broadcaster<String> closeCircuit = Broadcaster.create(); | |
final Stream<String> openCircuit = Streams.just("Alternative Message"); | |
final Action<Publisher<? extends String>, String> circuitSwitcher = Streams.switchOnNext(); | |
final AtomicInteger successes = new AtomicInteger(); | |
final AtomicInteger failures = new AtomicInteger(); | |
final int maxErrors = 3; |
final Broadcaster<Integer> keyboardStream = Broadcaster.create(); | |
Promise<List<Boolean>> konamis = keyboardStream | |
.skipWhile(key -> KeyEvent.VK_UP != key) | |
.buffer(10, 1) | |
.map(keys -> keys.size() == 10 && | |
keys.get(0) == KeyEvent.VK_UP && | |
keys.get(1) == KeyEvent.VK_UP && | |
keys.get(2) == KeyEvent.VK_DOWN && | |
keys.get(3) == KeyEvent.VK_DOWN && |