This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package reactor.tcp.netty; | |
import org.junit.Test; | |
import reactor.core.Environment; | |
import reactor.tcp.TcpClient; | |
import reactor.tcp.TcpServer; | |
import java.util.concurrent.TimeUnit; | |
import static junit.framework.Assert.assertEquals; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//A perfectly valid Reactive Streams Processor with Reactor (to attach to a Stream or connect with Actions/Promises) : | |
//This Processor is an Identity Producer/Subscriber - every request is propagated upside and every value propagated downside | |
// identityProcessor will not affect its input value and will eventually broadcast it to its subscribers. | |
deferred = Streams.defer(env); | |
deferred | |
.parallel(env.getDispatcher(dispatcher)) | |
.map(stream -> stream | |
.map(i -> i) | |
.reduce((Tuple2<Integer, Integer> tup) -> { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def 'GroupBy will re-route N elements to a nested stream based on the mapped key'() { | |
given: | |
'a source and a collected window stream' | |
def source = Streams.<SimplePojo> defer() | |
def result = [:] | |
source.groupBy { pojo -> | |
pojo.id | |
}.consume { stream -> | |
stream.consume { pojo -> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"id": "Stream", | |
"to": [ | |
{ | |
"bound-to": [ | |
{ | |
"id": "1", | |
"to": [ | |
{ | |
"id": "Callback", |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2011-2013 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2014-10-01 17:53:24,370 [48270792] WARN - nal.AbstractExternalSystemTask - Cause: ratpack/gradle/RatpackGroovyPlugin : Unsupported major.minor version 52.0 | |
com.intellij.openapi.externalSystem.model.LocationAwareExternalSystemException: Cause: ratpack/gradle/RatpackGroovyPlugin : Unsupported major.minor version 52.0 | |
at org.jetbrains.plugins.gradle.service.project.AbstractProjectImportErrorHandler.createUserFriendlyError(AbstractProjectImportErrorHandler.java:103) | |
at org.jetbrains.plugins.gradle.service.project.BaseProjectImportErrorHandler.getUserFriendlyError(BaseProjectImportErrorHandler.java:158) | |
at org.jetbrains.plugins.gradle.service.project.BaseGradleProjectResolverExtension.getUserFriendlyError(BaseGradleProjectResolverExtension.java:428) | |
at org.jetbrains.plugins.gradle.service.project.AbstractProjectResolverExtension.getUserFriendlyError(AbstractProjectResolverExtension.java:164) | |
at com.android.tools.idea.gradle.project.AndroidGradleProjectResolver.getUserFriendlyError(AndroidGradleProjectReso |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import reactor.core.Environment | |
import reactor.rx.Streams | |
import reactor.rx.amqp.LapinStreams | |
import reactor.rx.amqp.signal.ExchangeSignal | |
import reactor.rx.amqp.spec.Queue | |
import spock.lang.Shared | |
import spock.lang.Specification | |
import java.util.concurrent.CountDownLatch | |
import java.util.concurrent.TimeUnit |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try { | |
def lapin = Lapin.from(someConnectionFactory) | |
//Unique channel/publisher, auto close on Subscription#cancel(). | |
def publisher = LapinStreams.toLapinAndExchange(lapin, someExchangeName) | |
publisher.dispatchOn(env) | |
publisher.start() | |
def replyToStream = publisher | |
.replyTo(props.replyTo) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package reactor.rx; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import reactor.AbstractReactorTest; | |
import reactor.fn.tuple.Tuple; | |
import reactor.io.IOStreams; | |
import reactor.rx.stream.MapStream; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class SensorProcessor implements ReactorProcessor<SensorData, SensorSummary> { | |
@Override | |
public Stream<SensorSummary> process(Stream<SensorData> inputStream) { | |
return inputStream | |
.buffer(5, 20, TimeUnit.SECONDS) | |
//would be better to convert to stream of double 'values' and then have generic avg for type safety. |