Skip to content

Instantly share code, notes, and snippets.

@pintowar
Created March 1, 2016 15:28
Show Gist options
  • Save pintowar/64e0fb0e45efe73900b7 to your computer and use it in GitHub Desktop.
Save pintowar/64e0fb0e45efe73900b7 to your computer and use it in GitHub Desktop.
Using RxJava to sequentially unsubscribe an Observable (counter) as a new instance is generated!
@Grab(group='io.reactivex', module='rxjava', version='1.1.1')
@Grab(group='com.hazelcast', module='hazelcast', version='3.6')
import rx.Observable
import rx.Observer
import rx.Subscription
import rx.schedulers.Schedulers
import rx.subscriptions.Subscriptions
import com.hazelcast.core.Hazelcast
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
// Custom Observable to capture keyboard input (after press <Enter>)
Observable<Character> keyBoard(final Object main) {
Observable.create { final Observer<Character> observer ->
Scanner scan = new Scanner(System.in)
AtomicBoolean cond = new AtomicBoolean(true)
Thread.start {
while (cond.get()) {
String msg = scan.nextLine()
if (msg.trim() == 'quit') {
observer.onCompleted()
break
}
try {
observer.onNext(msg.charAt(0))
} catch (StringIndexOutOfBoundsException e) {
continue
}
}
scan.close()
synchronized (main) { main.notify() }
}
Subscriptions.create { cond.set(false) }
}
}
// Unsubscribe an old Observable as a new instance arrives
Observable<Integer> unsubStream(Observable<Character> instances, Observable<Integer> counter) {
Observable.create { Observer<Integer> observer ->
instances.map { Character action ->
counter.subscribe({ it -> observer.onNext(it) }, { err -> observer.onError(err) })
}.buffer(2, 1).subscribe { List<Subscription> subs ->
subs.first().unsubscribe()
}
Subscriptions.empty()
}
}
def hazelcast = Hazelcast.newHazelcastInstance()
def topic = hazelcast.getTopic('unsub-stream')
println 'Enter any character sequence (quit to exit)'
Observable<Character> instances = keyBoard(this)
Observable<Integer> counter = Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
unsubStream(instances, counter).subscribe { topic.publish(it) }
synchronized (this) { this.wait() }
@Grab(group='com.hazelcast', module='hazelcast', version='3.6')
import com.hazelcast.core.Hazelcast
import com.hazelcast.core.MessageListener
def hazelcast = Hazelcast.newHazelcastInstance()
def topic = hazelcast.getTopic('unsub-stream')
topic.addMessageListener({ println "output: ${it.messageObject}" } as MessageListener)
synchronized (this) { this.wait() }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment