Created
December 28, 2014 22:36
-
-
Save timyates/c4224fb7753b460bd1fb to your computer and use it in GitHub Desktop.
RxJava: split a stream based on event type
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Not sure if this is the best way to do this... Comments more than welcome.
 *
 * Given a stream of Events, how best to split them to be processed separately?
 */
import rx.Observable; | |
import rx.subjects.BehaviorSubject; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Random; | |
import java.util.function.Function; | |
public class RxSplitByType { | |
Random rnd = new Random(); | |
Function<Integer,Observable.OnSubscribe<Event>> subscribeFunction = n -> subscriber -> { | |
try { | |
for (int i = 0; i < n; i++) { | |
if (!subscriber.isUnsubscribed()) { | |
Event e = rnd.nextInt(100) < 50 ? new EventTypeA("Tim") : new EventTypeB("Yates"); | |
subscriber.onNext(e); | |
} | |
} | |
if (!subscriber.isUnsubscribed()) { | |
subscriber.onCompleted(); | |
} | |
} catch(Throwable t) { | |
if (!subscriber.isUnsubscribed()) { | |
subscriber.onError(t); | |
} | |
} | |
}; | |
Observable<Event> events() { | |
// Create an Observable of 20 random events | |
return Observable.create(subscribeFunction.apply(20)); | |
} | |
private void run() { | |
Map<Class,BehaviorSubject<? super Event>> subjects = new HashMap<>(); | |
// This will manipulate events of type A | |
BehaviorSubject<? super Event> typea = BehaviorSubject.create(); | |
typea.subscribe(e -> System.out.println("A SUBJECT " + e)); | |
subjects.put(EventTypeA.class, typea); | |
// This will manipulate events of type B | |
BehaviorSubject<? super Event> typeb = BehaviorSubject.create(); | |
typeb.subscribe(e -> System.out.println("B SUBJECT " + e)); | |
subjects.put(EventTypeB.class, typeb); | |
// Group by the class of the event | |
events().groupBy(Object::getClass) | |
.subscribe(group -> { | |
// Then, based on the key of the group, send to the relevant subject | |
group.subscribe((Event evt) -> | |
subjects.computeIfAbsent(group.getKey(), c -> {throw new RuntimeException("Unknown Event " + c);}) | |
.onNext(evt)); | |
}); | |
} | |
public static void main(String[] args) { | |
new RxSplitByType().run(); | |
} | |
} | |
/**
 * Base class for the demo's events. Each instance receives the next value of a
 * class-wide counter (starting at 1) as its id, and carries a name.
 */
abstract class Event {

    /** Id that will be handed to the next event constructed. */
    private static long nextId = 1;

    private final long id;
    private final String name;

    /**
     * @param name label stored on the event and included in {@link #toString()}
     */
    public Event(String name) {
        id = nextId;
        nextId += 1;
        this.name = name;
    }

    /**
     * @return the simple name of the runtime class, the id and the name,
     *         separated by single spaces
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder(getClass().getSimpleName());
        text.append(' ').append(id);
        text.append(' ').append(name);
        return text.toString();
    }
}
class EventTypeA extends Event { | |
public EventTypeA(String name) { | |
super(name); | |
} | |
} | |
class EventTypeB extends Event { | |
public EventTypeB(String name) { | |
super(name); | |
} | |
} |
Something like this:
// Get our events
ConnectableObservable<Event> events = events().publish();
// Connect our EventTypeA handler
Observable<Event> a = events.filter(e -> e instanceof EventTypeA);
a.subscribe(e -> System.out.println("A SUBJECT " + e));
// Connect our EventTypeB handler
Observable<Event> b = events.filter(e -> e instanceof EventTypeB);
b.subscribe(e -> System.out.println("B SUBJECT " + e));
// Go!
events.connect();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
As suggested by Russell Hart, it's probably better to have multiple subscribers to the `events()` observable, with a filter on each of them.