Skip to content

Instantly share code, notes, and snippets.

@smaldini
Last active August 29, 2015 14:07
Show Gist options
  • Save smaldini/d60ea05a85f1513e30c9 to your computer and use it in GitHub Desktop.
Save smaldini/d60ea05a85f1513e30c9 to your computer and use it in GitHub Desktop.
import reactor.core.Environment
import reactor.rx.Streams
import reactor.rx.amqp.LapinStreams
import reactor.rx.amqp.signal.ExchangeSignal
import reactor.rx.amqp.spec.Queue
import spock.lang.Shared
import spock.lang.Specification
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
/**
* @author Stephane Maldini
*/
class LapinSpec extends Specification {
@Shared
Environment environment
void setup() {
environment = new Environment()
}
def cleanup() {
environment.shutdown()
}
def 'A simple stream from rabbit queue'() {
when:
'we declare and listen to localhost queue'
def latch = new CountDownLatch(1)
def value = ''
def value2 = ''
LapinStreams.fromQueue(
Queue.create('test').durable(true).bind('test')
)
.qosTolerance(15.0f) //will adapt BasicQos everytime we go beyond 15% requested volume difference with the last request
.bindAckToRequest(true) //will batch BasicACK each time a subscriber is demanding, considering previous requested volume ACK
.dispatchOn(environment)
.consume {
value = it.toString()
Streams.just(it.replyTo('hey Bernard'))
.connect(LapinStreams.toDefaultExchange())
.drain()
}.finallyDo {
println 'complete'
value = 'test'
}
and:
'a message is published'
Streams.just('hello Bob')
.map { ExchangeSignal.from(it) }
.connect(LapinStreams.toExchange('test'))
.replyTo()
.consume {
value2 = it.toString()
latch.countDown()
}
then:
'received and reply values are available'
latch.await(45, TimeUnit.SECONDS)
value == 'hello Bob'
value2 == 'hey Bernard'
}
}
@jgriff
Copy link

jgriff commented Oct 9, 2014

@smaldini, @jbrisbin this is some exciting stuff. Where are you working it? Is there a reactor-amqp repo I'm just not seeing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment