Skip to content

Instantly share code, notes, and snippets.

@timyates
Created December 28, 2014 22:36
Show Gist options
  • Save timyates/c4224fb7753b460bd1fb to your computer and use it in GitHub Desktop.
Save timyates/c4224fb7753b460bd1fb to your computer and use it in GitHub Desktop.
RXJava Split based on Type
/**
* Not sure if this is the best way to do this... Comments more than welcome
*
* Given a stream of Events, how best to split them to be processed separately
*/
import rx.Observable;
import rx.subjects.BehaviorSubject;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
public class RxSplitByType {
Random rnd = new Random();
Function<Integer,Observable.OnSubscribe<Event>> subscribeFunction = n -> subscriber -> {
try {
for (int i = 0; i < n; i++) {
if (!subscriber.isUnsubscribed()) {
Event e = rnd.nextInt(100) < 50 ? new EventTypeA("Tim") : new EventTypeB("Yates");
subscriber.onNext(e);
}
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} catch(Throwable t) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
}
};
Observable<Event> events() {
// Create an Observable of 20 random events
return Observable.create(subscribeFunction.apply(20));
}
private void run() {
Map<Class,BehaviorSubject<? super Event>> subjects = new HashMap<>();
// This will manipulate events of type A
BehaviorSubject<? super Event> typea = BehaviorSubject.create();
typea.subscribe(e -> System.out.println("A SUBJECT " + e));
subjects.put(EventTypeA.class, typea);
// This will manipulate events of type B
BehaviorSubject<? super Event> typeb = BehaviorSubject.create();
typeb.subscribe(e -> System.out.println("B SUBJECT " + e));
subjects.put(EventTypeB.class, typeb);
// Group by the class of the event
events().groupBy(Object::getClass)
.subscribe(group -> {
// Then, based on the key of the group, send to the relevant subject
group.subscribe((Event evt) ->
subjects.computeIfAbsent(group.getKey(), c -> {throw new RuntimeException("Unknown Event " + c);})
.onNext(evt));
});
}
public static void main(String[] args) {
new RxSplitByType().run();
}
}
abstract class Event {
private static long ID = 1;
private long id;
private String name;
public Event(String name) {
this.name = name;
this.id = ID++;
}
@Override
public String toString() {
return this.getClass().getSimpleName() + " " + id + " " + name;
}
}
class EventTypeA extends Event {
public EventTypeA(String name) {
super(name);
}
}
class EventTypeB extends Event {
public EventTypeB(String name) {
super(name);
}
}
@timyates
Copy link
Author

As suggested by Russell Hart, it's probably better to have multiple subscribers to the events() observable, and have a filter on each of them...

@timyates
Copy link
Author

Something like this:

        // Get our events
        ConnectableObservable<Event> events = events().publish();

        // Connect our EventTypeA handler
        Observable<Event> a = events.filter(e -> e instanceof EventTypeA);
        a.subscribe(e -> System.out.println("A SUBJECT " + e));

        // Connect our EventTypeB handler
        Observable<Event> b = events.filter(e -> e instanceof EventTypeB);
        b.subscribe(e -> System.out.println("B SUBJECT " + e));

        // Go!
        events.connect();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment