Skip to content

Instantly share code, notes, and snippets.

@muzzah
Created April 13, 2018 08:22
Show Gist options
  • Save muzzah/09e142e1bd2e36b640588efc78f8625d to your computer and use it in GitHub Desktop.
Save muzzah/09e142e1bd2e36b640588efc78f8625d to your computer and use it in GitHub Desktop.
Rx data loss example
import io.reactivex.Completable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
/**
 * Small RxJava demo that pushes integers through a chain of {@link PublishSubject}s.
 *
 * <p>Pipeline: {@code MessageLoop.subject} -> {@code MessageLoopListener.secondSubject}
 * -> {@code Server.serversubject} -> the final subscriber in {@code FinalObserver}.
 * Every hop re-emits the value it received on another scheduler and logs it to
 * {@code System.out} so the order of delivery can be followed on the console.
 *
 * <p>All subscriptions are set up by {@link FinalObserver#observe()} before
 * {@link Server#start()} kicks off the producer; a {@code PublishSubject} only
 * delivers items to observers that are already subscribed when {@code onNext} is called.
 */
public class Example {

    /**
     * Producer: emits two consecutive integers on {@link #subject}, sleeps, and repeats
     * on a dedicated thread until that thread is interrupted.
     */
    static class MessageLoop {

        /** Pause between two bursts of events, in milliseconds. */
        private static final long EMIT_INTERVAL_MILLIS = 500;

        /** Hot source of the generated counter values. */
        final PublishSubject<Integer> subject = PublishSubject.create();

        /**
         * Starts the emitting loop on a new thread.
         *
         * <p>Each call starts an additional, independent loop that writes to the same
         * subject, so it is meant to be called once.
         */
        void start() {
            Completable.fromAction(() -> {
                int count = 0;
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println("Firing events");
                    subject.onNext(count);
                    count++;
                    subject.onNext(count);
                    count++;
                    try {
                        Thread.sleep(EMIT_INTERVAL_MILLIS);
                    } catch (InterruptedException e) {
                        // Interruption is the only way to stop this loop: keep the flag
                        // set for the owner of the thread and leave instead of spinning on.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }).subscribeOn(Schedulers.newThread())
                    .subscribe(
                            () -> {
                                // Loop ended because the thread was interrupted; nothing to clean up.
                            },
                            error -> System.err.println("Message loop failed: " + error));
        }
    }

    /**
     * First relay: forwards everything from {@link MessageLoop#subject} to
     * {@link #secondSubject} on the computation scheduler.
     */
    static class MessageLoopListener {

        /** Re-emits every value received from the message loop. */
        final PublishSubject<Integer> secondSubject = PublishSubject.create();

        /** The producer this listener relays. */
        final MessageLoop loop = new MessageLoop();

        /** Subscribes to the message loop; call before {@link #start()} so no item is missed. */
        void listen() {
            loop.subject
                    .observeOn(Schedulers.computation())
                    .subscribe(
                            object -> {
                                System.out.println("MLL " + object);
                                secondSubject.onNext(object);
                            },
                            error -> System.err.println("MessageLoopListener failed: " + error));
        }

        /** Starts the underlying message loop. */
        public void start() {
            loop.start();
        }
    }

    /**
     * Second relay: forwards everything from {@link MessageLoopListener#secondSubject}
     * to {@link #serversubject} on the computation scheduler.
     */
    static class Server {

        /** Re-emits every value received from the listener. */
        final PublishSubject<Integer> serversubject = PublishSubject.create();

        /** The upstream relay, which owns the message loop. */
        final MessageLoopListener loop = new MessageLoopListener();

        /** Wires the upstream listener and then subscribes to its output. */
        void listen() {
            loop.listen();
            loop.secondSubject
                    .observeOn(Schedulers.computation())
                    .subscribe(
                            object -> {
                                System.out.println("Server Loop " + object);
                                serversubject.onNext(object);
                            },
                            error -> System.err.println("Server failed: " + error));
        }

        /** Starts the producer at the head of the chain. */
        void start() {
            loop.start();
        }
    }

    /**
     * End of the chain: logs each value in {@code filter} and {@code map} stages and
     * receives the result on the single scheduler.
     */
    static class FinalObserver {

        /** The server whose output is observed. */
        final Server server = new Server();

        /** Sets up the whole subscription chain; the producer is not started here. */
        void observe() {
            server.listen();
            server.serversubject
                    .doOnSubscribe(disposable -> {
                        // Do some setup here
                        System.out.println("doOnSubscribe");
                    })
                    .filter(object -> {
                        // Do some filtering; this demo lets every value through.
                        System.out.println("filter " + object);
                        return true;
                    })
                    .map(object -> {
                        // Map object; this demo passes the value on unchanged.
                        System.out.println("map " + object);
                        return object;
                    })
                    .observeOn(Schedulers.single())
                    .subscribe(
                            object -> {
                                // Update UI
                                System.out.println("Received object " + object);
                            },
                            error -> System.err.println("Final observer failed: " + error));
        }
    }

    /**
     * Builds the chain, starts the producer and then blocks the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        FinalObserver finalObserver = new FinalObserver();
        finalObserver.observe();
        finalObserver.server.start();
        try {
            // Joining the current thread only returns via interruption; it keeps main
            // (and therefore the JVM) alive while the scheduler threads do the work.
            // NOTE(review): presumably needed because the scheduler threads are daemon threads.
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment