Last active
September 5, 2016 14:51
-
-
Save cedricvidal/410b2c0131bae742959f to your computer and use it in GitHub Desktop.
RxJava Observables Cartesian Product
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.functions.Func1; | |
import rx.functions.Func2; | |
import static java.util.Arrays.asList; | |
import static java.util.Collections.singleton; | |
/** | |
* Computes the cartesian product of Observables. | |
* | |
* @author <a href="mailto:[email protected]">Cedric Vidal, Quicksign</a> | |
*/ | |
public class ObservablesCartesianProduct { | |
/** | |
* Computes the cartesian product of a variable number of observables | |
* | |
* @param observables | |
* @return | |
*/ | |
public static <T> Observable<Observable<T>> cartesianProduct(Observable<Observable<T>> observables) { | |
Observable<Observable<T>> head = observables | |
.take(1) // take first stream of T, this is a Observable<Observable<T>> | |
.flatMap(new Func1<Observable<T>, Observable<Observable<T>>>() { // wrap each T in a singleton stream | |
@Override | |
public Observable<Observable<T>> call(Observable<T> o) { | |
return o.map(QsObservables.<T>singletonF()); | |
} | |
}); | |
Observable<Observable<T>> tail = observables.skip(1); // take tail | |
return Observable.merge(tail.reduce(head, new Func2<Observable<Observable<T>>, Observable<T>, Observable<Observable<T>>>() { | |
@Override | |
public Observable<Observable<T>> call(Observable<Observable<T>> i1, Observable<T> i2) { | |
return doCartesianProduct(i1, i2); | |
} | |
})); | |
} | |
private static <T> Observable<Observable<T>> doCartesianProduct(Observable<Observable<T>> i1, final Observable<T> i2) { | |
return i1.flatMap(new Func1<Observable<T>, Observable<Observable<T>>>() { | |
@Override | |
public Observable<Observable<T>> call(final Observable<T> s1) { | |
return i2.map(new Func1<T, Observable<T>>() { | |
@Override | |
public Observable<T> call(T s2) { | |
return Observable.merge(s1, Observable.from(singleton(s2))); | |
} | |
}); | |
} | |
}); | |
} | |
public static <T> Observable<Observable<T>> cartesianProduct(Observable<T> i1, final Observable<T> i2) { | |
return i1.flatMap(new Func1<T, Observable<Observable<T>>>() { | |
@Override | |
public Observable<Observable<T>> call(final T s1) { | |
return i2.map(new Func1<T, Observable<T>>() { | |
@Override | |
public Observable<T> call(T s2) { | |
return Observable.from(asList(s1, s2)); | |
} | |
}); | |
} | |
}); | |
} | |
private static <T> Func1<T, Observable<T>> singletonF() { | |
return new Func1<T, Observable<T>>() { | |
@Override | |
public Observable<T> call(T s) { | |
return Observable.from(singleton(s)); | |
} | |
}; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.quicksign.pdfmetareader.drools.functions.util; | |
import com.google.common.collect.Iterables; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.functions.Func1; | |
import static java.util.Arrays.asList; | |
import static org.junit.Assert.assertEquals; | |
public class ObservablesCartesianProductTest { | |
@Test | |
public void testCartesianProductRxJava() { | |
Observable<String> i1 = Observable.from(asList("a1", "b1")); | |
final Observable<String> i2 = Observable.from(asList("a2", "b2")); | |
Observable<Observable<String>> cartesian = ObservablesCartesianProduct.cartesianProduct(i1, i2); | |
Iterable<Iterable<String>> cartesianIt = toIterable(cartesian); | |
assertEquals(4, Iterables.size(cartesianIt)); | |
} | |
@Test | |
public void testCartesianProductRxJavaN3() { | |
Observable<String> i1 = Observable.from(asList("a1", "b1")); | |
Observable<String> i2 = Observable.from(asList("a2", "b2")); | |
Observable<String> i3 = Observable.from(asList("a3", "b3")); | |
Observable<Observable<String>> cartesian = ObservablesCartesianProduct.cartesianProduct(Observable.from(asList(i1, i2, i3))); | |
Iterable<Iterable<String>> cartesianIt = toIterable(cartesian); | |
assertEquals(8, Iterables.size(cartesianIt)); | |
} | |
public static Iterable<Iterable<String>> toIterable(Observable<Observable<String>> cartesian) { | |
return cartesian.map(new Func1<Observable<String>, Iterable<String>>() { | |
@Override | |
public Iterable<String> call(Observable<String> o) { | |
return o.toBlocking().toIterable(); | |
} | |
}).toBlocking().toIterable(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I was playing with this and translated it to Kotlin: