Created
April 13, 2018 08:22
-
-
Save muzzah/09e142e1bd2e36b640588efc78f8625d to your computer and use it in GitHub Desktop.
Rx data loss example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.Completable; | |
import io.reactivex.schedulers.Schedulers; | |
import io.reactivex.subjects.PublishSubject; | |
public class Example { | |
static class MessageLoop { | |
PublishSubject subject = PublishSubject.create(); | |
void start() { | |
Completable.fromAction(() -> { | |
int count = 0; | |
while(true) { | |
System.out.println("Firing events"); | |
subject.onNext(count); | |
count++; | |
subject.onNext(count); | |
count++; | |
try { | |
Thread.sleep(500); | |
} catch(Exception e ) { | |
} | |
} | |
}).subscribeOn(Schedulers.newThread()) | |
.subscribe(); | |
} | |
} | |
static class MessageLoopListener { | |
PublishSubject secondSubject = PublishSubject.create(); | |
MessageLoop loop = new MessageLoop(); | |
void listen() { | |
loop.subject | |
.observeOn(Schedulers.computation()) | |
.subscribe(object -> { | |
System.out.println("MLL " + object); | |
secondSubject.onNext(object); | |
}); | |
} | |
public void start() { | |
loop.start(); | |
} | |
} | |
static class Server { | |
PublishSubject serversubject = PublishSubject.create(); | |
MessageLoopListener loop = new MessageLoopListener(); | |
void listen() { | |
loop.listen(); | |
loop.secondSubject | |
.observeOn(Schedulers.computation()) | |
.subscribe(object -> { | |
System.out.println("Server Loop " + object); | |
serversubject.onNext(object); | |
}); | |
} | |
void start() { | |
loop.start(); | |
} | |
} | |
static class FinalObserver { | |
Server server = new Server(); | |
void observe() { | |
server.listen(); | |
server.serversubject.doOnSubscribe(disposable -> { | |
//Do some setup here | |
System.out.println("doOnSubscribe"); | |
}).filter(object -> { | |
//DO some filtering | |
System.out.println("filter " + object); | |
return true; | |
}).map(object -> { | |
//map object | |
System.out.println("map " + object); | |
return object; | |
}).observeOn(Schedulers.single()) | |
.subscribe(object -> { | |
//update UI | |
System.out.println("Received object " + object); | |
}); | |
} | |
} | |
public static void main(String[] args) { | |
FinalObserver finalObserver = new FinalObserver(); | |
finalObserver.observe(); | |
finalObserver.server.start(); | |
try { | |
Thread.currentThread().join(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment