Stephane Maldini smaldini

/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
Flux<Integer> intFlux = Flux.range(1, 1000)
//2
//old: test=baseSubscriber
//next: test=baseSubscriber_range
.contextualize((old, next) -> next.put("test", old.get("test") + "_range"))
.flatMap(d -> Flux.just(d)
//4 (x1000)
//old: test=baseSubscriber_range
//next: test=baseSubscriber_range_innerFlatmap
.contextualize((old, next) -> next.put("test", old.get("test") + "_innerFlatmap")))
smaldini / Spring.java
Last active November 11, 2015 00:52
Spring 5 "Reactive Features" in action, powered by and demonstrated with Reactor Streams
@Controller
public class TestController {
    // Some reactive database repository, injected through the constructor
    private final ProfileRepository profileRepository;
    @Autowired
    public TestController(ProfileRepository profileRepository) {
        this.profileRepository = profileRepository;
    }
smaldini / ReactorNet.java
Last active November 11, 2015 00:27
TBC
//Create an HTTP Server on 8080
ReactiveNet.httpServer(8080)
//on HTTP Post matching /hello URI
.post("/hello", httpChannel ->
//echo input to output and close on flush
httpChannel.writeWith(httpChannel.input())
)
//Make sure server is ready before starting client
.startAndAwait();
smaldini / Multicast.java
Last active November 11, 2015 00:28
TBD
// AeronProcessor as a server
AeronProcessor server = AeronProcessor.builder()
.senderChannel("udp://localhost:12000")
.receiverChannel("udp://localhost:12001")
.senderOnly(true)
.create();
// Read file bytes and publish them through the server
IO.readFile("~/test.txt")
.subscribe(server);
smaldini / Aeron.java
Last active November 11, 2015 00:01 — forked from kadyana/gist:d4c73101a54ffe5afb99
Reactive Client Server with Aeron transport
// AeronProcessor as a server
AeronProcessor server = AeronProcessor.builder()
.senderChannel("udp://localhost:12000")
.receiverChannel("udp://localhost:12001")
.senderOnly(true)
.create();
// Read file bytes on server command
IO.readFile("~/test.txt")
.subscribe(server);
smaldini / sample.java
Last active June 10, 2016 16:44 — forked from jbrisbin/BroadcastStream.java
Scatter gather Async Flux example using #ProjectReactor
// Create an async message-passing Processor exposing a Flux API
TopicProcessor<String> sink = TopicProcessor.create();
// Scatter Gather the input sequence
sink
.map(String::toUpperCase)
.flatMap(s ->
Mono.fromCallable(() -> someRepository.findOneByCategory(s))
.timeout(Duration.ofSeconds(3), someRepository::fallback)
.subscribeOn(Schedulers.parallel())
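The preview above is cut off, so here is a rough sketch of the same scatter-gather idea using only `java.util.concurrent` (Java 9+), not the Reactor API. `findOneByCategory` and `fallback` are hypothetical stand-ins for the `someRepository` calls, and the thread pool size is an arbitrary assumption.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ScatterGatherSketch {

    // Hypothetical stand-in for someRepository.findOneByCategory(s).
    static String findOneByCategory(String category) {
        return "item-for-" + category;
    }

    // Hypothetical stand-in for someRepository::fallback.
    static String fallback(String category) {
        return "fallback-for-" + category;
    }

    // Upper-cases each input, runs one lookup per element in parallel,
    // substitutes the fallback if a lookup exceeds the timeout, then
    // collects the results.
    static List<String> scatterGather(List<String> input,
                                      Function<String, String> lookup,
                                      long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // size is an assumption
        try {
            // Scatter: start every lookup before waiting on any of them.
            List<CompletableFuture<String>> pending = input.stream()
                    .map(String::toUpperCase)
                    .map(s -> CompletableFuture
                            .supplyAsync(() -> lookup.apply(s), pool)
                            .completeOnTimeout(fallback(s), timeoutMillis, TimeUnit.MILLISECONDS))
                    .collect(Collectors.toList());
            // Gather: wait for each result in input order.
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(
                scatterGather(List.of("books", "music"), ScatterGatherSketch::findOneByCategory, 3000));
    }
}
```

One difference worth noting: this sketch gathers results in input order, whereas a `flatMap` over asynchronous inner publishers may interleave them in completion order.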
smaldini / gist:cd92a8d6c271a12d3ace
Last active August 29, 2015 14:23
Gitter to Slack relay
HttpClient<Buffer, Buffer> client = NetStreams.httpClient();
final String postUri = "https://hooks.slack.com/services/xxxxx";
final NioEventLoopGroup group = new NioEventLoopGroup(12,
new NamedDaemonThreadFactory("reactor-tcp-io"));
client.get("https://stream.gitter.im/v1/rooms/xxxxxx/chatMessages", ch -> {
ch
final Broadcaster<Integer> keyboardStream = Broadcaster.create();
Promise<List<Boolean>> konamis = keyboardStream
.skipWhile(key -> KeyEvent.VK_UP != key)
.buffer(10, 1)
.map(keys -> keys.size() == 10 &&
keys.get(0) == KeyEvent.VK_UP &&
keys.get(1) == KeyEvent.VK_UP &&
keys.get(2) == KeyEvent.VK_DOWN &&
keys.get(3) == KeyEvent.VK_DOWN &&
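The snippet above stops partway through the comparison. As a plain-Java illustration of the `skipWhile` plus `buffer(10, 1)` idea (a sliding window of ten keys advancing one key at a time), the sketch below assumes the rest of the check follows the conventional Konami sequence (up, up, down, down, left, right, left, right, B, A). It also assumes trailing windows may be shorter than ten, which would explain the `keys.size() == 10` guard. It is not the Reactor API.

```java
import java.awt.event.KeyEvent;
import java.util.ArrayList;
import java.util.List;

final class KonamiWindowSketch {

    // Conventional Konami sequence; the original preview shows only the first four keys.
    static final List<Integer> KONAMI = List.of(
            KeyEvent.VK_UP, KeyEvent.VK_UP, KeyEvent.VK_DOWN, KeyEvent.VK_DOWN,
            KeyEvent.VK_LEFT, KeyEvent.VK_RIGHT, KeyEvent.VK_LEFT, KeyEvent.VK_RIGHT,
            KeyEvent.VK_B, KeyEvent.VK_A);

    // Returns one boolean per sliding window: true where the window equals KONAMI.
    static List<Boolean> matches(List<Integer> keys) {
        // skipWhile: drop leading keys until the first VK_UP.
        int start = 0;
        while (start < keys.size() && keys.get(start) != KeyEvent.VK_UP) {
            start++;
        }
        List<Integer> rest = keys.subList(start, keys.size());

        // buffer(10, 1): open a window at every position, at most 10 keys long.
        List<Boolean> result = new ArrayList<>();
        for (int i = 0; i < rest.size(); i++) {
            List<Integer> window = rest.subList(i, Math.min(i + 10, rest.size()));
            // A short trailing window can never equal the 10-key sequence.
            result.add(window.equals(KONAMI));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> typed = new ArrayList<>();
        typed.add(KeyEvent.VK_A);   // noise before the sequence, skipped
        typed.addAll(KONAMI);
        System.out.println(matches(typed).contains(true));
    }
}
```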
final Broadcaster<String> closeCircuit = Broadcaster.create();
final Stream<String> openCircuit = Streams.just("Alternative Message");
final Action<Publisher<? extends String>, String> circuitSwitcher = Streams.switchOnNext();
final AtomicInteger successes = new AtomicInteger();
final AtomicInteger failures = new AtomicInteger();
final int maxErrors = 3;
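The preview ends before the switching logic, so the following is only a guess at the pattern these declarations set up: count failures, and once `maxErrors` is reached, stop calling the primary source and serve "Alternative Message" instead. It is a stdlib sketch, not the Reactor `switchOnNext` implementation. The class name and the policy that the circuit never resets and that a failed call also yields the alternative are assumptions.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class CircuitBreakerSketch {

    static final String ALTERNATIVE = "Alternative Message";

    private final AtomicInteger successes = new AtomicInteger();
    private final AtomicInteger failures = new AtomicInteger();
    private final int maxErrors;

    CircuitBreakerSketch(int maxErrors) {
        this.maxErrors = maxErrors;
    }

    // The circuit is open once the failure count reaches maxErrors (assumed policy).
    boolean isOpen() {
        return failures.get() >= maxErrors;
    }

    // Calls the primary supplier while the circuit is closed;
    // returns the alternative message on failure or when the circuit is open.
    String call(Supplier<String> primary) {
        if (isOpen()) {
            return ALTERNATIVE;
        }
        try {
            String value = primary.get();
            successes.incrementAndGet();
            return value;
        } catch (RuntimeException e) {
            failures.incrementAndGet();
            return ALTERNATIVE;
        }
    }

    int successCount() {
        return successes.get();
    }

    int failureCount() {
        return failures.get();
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(3);
        System.out.println(breaker.call(() -> "hello"));
        for (int i = 0; i < 3; i++) {
            breaker.call(() -> { throw new IllegalStateException("boom"); });
        }
        // The circuit is now open, so the primary supplier is no longer invoked.
        System.out.println(breaker.call(() -> "hello"));
    }
}
```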