This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Flux<Integer> intFlux = Flux.range(1, 1000) | |
//2 | |
//old: test=baseSubscriber | |
//next: test=baseSubscriber_range | |
.contextualize((old, next) -> next.put("test", old.get("test") + "_range")) | |
.flatMap(d -> Flux.just(d) | |
//4 (x1000) | |
//old: test=baseSubscriber_range | |
//next: test=baseSubscriber_range_innerFlatmap | |
.contextualize((old, next) -> next.put("test", old.get("test") + "_innerFlatmap"))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Controller | |
public class TestController { | |
@Autowired | |
//Some reactive database repository | |
private final ProfileRepository profileRepository; | |
public TestController(ProfileRepository profileRepository){ | |
this.profileRepository = profileRepository; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//Create an HTTP Server on 8080 | |
ReactiveNet.httpServer(8080) | |
//on HTTP Post matching /hello URI | |
.post("/hello", httpChannel -> | |
//echo input to output and close on flush | |
httpChannel.writeWith(httpChannel.input()) | |
) | |
//Make sure server is ready before starting client | |
.startAndAwait(); | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// AeronProcessor as a server | |
AeronProcessor server = AeronProcessor.builder() | |
.senderChannel("udp://localhost:12000") | |
.receiverChannel("udp://localhost:12001") | |
.senderOnly(true) | |
.create(); | |
// | |
IO.readFile("~/test.txt") | |
.subscribe(server); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// AeronProcessor as a server | |
AeronProcessor server = AeronProcessor.builder() | |
.senderChannel("udp://localhost:12000") | |
.receiverChannel("udp://localhost:12001") | |
.senderOnly(true) | |
.create(); | |
// Read file bytes on server command | |
IO.readFile("~/test.txt") | |
.subscribe(server); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create an async message-passing Processor exposing a Flux API | |
TopicProcessor<String> sink = TopicProcessor.create(); | |
// Scatter Gather the input sequence | |
sink | |
.map(String::toUpperCase) | |
.flatMap(s -> | |
Mono.fromCallable(() -> someRepository.findOneByCategory(s)) | |
.timeout(Duration.ofSeconds(3), someRepository::fallback) | |
.subscribeOn(Schedulers.parallel()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
HttpClient<Buffer, Buffer> client = NetStreams.httpClient(); | |
final String postUri = "https://hooks.slack.com/services/xxxxx"; | |
final NioEventLoopGroup group = new NioEventLoopGroup(12, | |
new NamedDaemonThreadFactory("reactor-tcp-io")); | |
client.get("https://stream.gitter.im/v1/rooms/xxxxxx/chatMessages", ch -> { | |
ch |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final Broadcaster<Integer> keyboardStream = Broadcaster.create(); | |
Promise<List<Boolean>> konamis = keyboardStream | |
.skipWhile(key -> KeyEvent.VK_UP != key) | |
.buffer(10, 1) | |
.map(keys -> keys.size() == 10 && | |
keys.get(0) == KeyEvent.VK_UP && | |
keys.get(1) == KeyEvent.VK_UP && | |
keys.get(2) == KeyEvent.VK_DOWN && | |
keys.get(3) == KeyEvent.VK_DOWN && |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final Broadcaster<String> closeCircuit = Broadcaster.create(); | |
final Stream<String> openCircuit = Streams.just("Alternative Message"); | |
final Action<Publisher<? extends String>, String> circuitSwitcher = Streams.switchOnNext(); | |
final AtomicInteger successes = new AtomicInteger(); | |
final AtomicInteger failures = new AtomicInteger(); | |
final int maxErrors = 3; |
NewerOlder