Skip to content

Instantly share code, notes, and snippets.

View smaldini's full-sized avatar

Stephane Maldini smaldini

View GitHub Profile
package reactor.tcp.netty;
import org.junit.Test;
import reactor.core.Environment;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpServer;
import java.util.concurrent.TimeUnit;
import static junit.framework.Assert.assertEquals;
// A perfectly valid Reactive Streams Processor with Reactor (to attach to a Stream or connect with Actions/Promises):
// This Processor is an identity Producer/Subscriber - every request is propagated upstream and every value is propagated downstream.
// identityProcessor will not affect its input value and will eventually broadcast it to its subscribers.
deferred = Streams.defer(env);
deferred
.parallel(env.getDispatcher(dispatcher))
.map(stream -> stream
.map(i -> i)
.reduce((Tuple2<Integer, Integer> tup) -> {
def 'GroupBy will re-route N elements to a nested stream based on the mapped key'() {
given:
'a source and a collected window stream'
def source = Streams.<SimplePojo> defer()
def result = [:]
source.groupBy { pojo ->
pojo.id
}.consume { stream ->
stream.consume { pojo ->
{
"id": "Stream",
"to": [
{
"bound-to": [
{
"id": "1",
"to": [
{
"id": "Callback",
/*
* Copyright (c) 2011-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
2014-10-01 17:53:24,370 [48270792] WARN - nal.AbstractExternalSystemTask - Cause: ratpack/gradle/RatpackGroovyPlugin : Unsupported major.minor version 52.0
com.intellij.openapi.externalSystem.model.LocationAwareExternalSystemException: Cause: ratpack/gradle/RatpackGroovyPlugin : Unsupported major.minor version 52.0
at org.jetbrains.plugins.gradle.service.project.AbstractProjectImportErrorHandler.createUserFriendlyError(AbstractProjectImportErrorHandler.java:103)
at org.jetbrains.plugins.gradle.service.project.BaseProjectImportErrorHandler.getUserFriendlyError(BaseProjectImportErrorHandler.java:158)
at org.jetbrains.plugins.gradle.service.project.BaseGradleProjectResolverExtension.getUserFriendlyError(BaseGradleProjectResolverExtension.java:428)
at org.jetbrains.plugins.gradle.service.project.AbstractProjectResolverExtension.getUserFriendlyError(AbstractProjectResolverExtension.java:164)
at com.android.tools.idea.gradle.project.AndroidGradleProjectResolver.getUserFriendlyError(AndroidGradleProjectReso
import reactor.core.Environment
import reactor.rx.Streams
import reactor.rx.amqp.LapinStreams
import reactor.rx.amqp.signal.ExchangeSignal
import reactor.rx.amqp.spec.Queue
import spock.lang.Shared
import spock.lang.Specification
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
@smaldini
smaldini / gist:d6858b08773e028f5383
Last active August 29, 2015 14:08
A sendAndReceive RabbitMQ flow with Reactor AMQP support
try {
def lapin = Lapin.from(someConnectionFactory)
//Unique channel/publisher, auto close on Subscription#cancel().
def publisher = LapinStreams.toLapinAndExchange(lapin, someExchangeName)
publisher.dispatchOn(env)
publisher.start()
def replyToStream = publisher
.replyTo(props.replyTo)
package reactor.rx;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.AbstractReactorTest;
import reactor.fn.tuple.Tuple;
import reactor.io.IOStreams;
import reactor.rx.stream.MapStream;
public class SensorProcessor implements ReactorProcessor<SensorData, SensorSummary> {
@Override
public Stream<SensorSummary> process(Stream<SensorData> inputStream) {
return inputStream
.buffer(5, 20, TimeUnit.SECONDS)
//would be better to convert to stream of double 'values' and then have generic avg for type safety.