Skip to content

Instantly share code, notes, and snippets.

@kevroletin
Last active August 29, 2015 14:10
Show Gist options
  • Save kevroletin/d000229feb312134576a to your computer and use it in GitHub Desktop.
Save kevroletin/d000229feb312134576a to your computer and use it in GitHub Desktop.
package rxtest;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import rx.Observable;
import rx.subjects.PublishSubject;
/**
 * Small RxJava demo comparing the built-in {@code map} operator with
 * {@link MapOnce}. Both runs attach two subscribers to an Observable whose
 * mapping function has a visible side effect (it prints and bumps a counter),
 * so the console shows how many times the function ran per source item.
 */
public class RxTest {

    /**
     * MapOnce - same as the built-in Observable's map method, but the mapping
     * function is computed only once per source item; every subscriber of the
     * returned Observable receives that single result.
     *
     * <p>The source is subscribed to immediately when {@link #apply} is called
     * and results are relayed through a {@link PublishSubject}. Subscribers
     * that attach to the returned Observable later only see what is relayed
     * after they attach.
     */
    private static class MapOnce<T, R> {
        /** Relay that fans the mapped values out to all downstream subscribers. */
        private final PublishSubject<R> subject = PublishSubject.create();

        private MapOnce(Observable<T> source, Function<T, R> func) {
            // Forward terminal events as well as values: without the onError and
            // onCompleted handlers, downstream subscribers would never learn that
            // the source finished or failed.
            source.subscribe(
                    data -> subject.onNext(func.apply(data)),
                    subject::onError,
                    subject::onCompleted);
        }

        /**
         * Subscribes to {@code source} and returns an Observable that emits
         * {@code func} applied to each source item.
         *
         * @param source the Observable to consume
         * @param func   the mapping function, invoked once per source item
         * @return a read-only view of the relay carrying the mapped items
         */
        static public <T, R> Observable<R> apply(Observable<T> source, Function<T, R> func) {
            return new MapOnce<T, R>(source, func).subject.asObservable();
        }
    }

    /** Number of times the mapping function of {@link #mapTest()} has run. */
    static Long counter = 0L;

    /**
     * Runs the demo with the built-in {@code map}: two subscribers are attached
     * to the same mapped Observable and the side effect is printed each time
     * the mapping function executes.
     *
     * @throws InterruptedException if the final sleep is interrupted
     */
    public static void mapTest() throws InterruptedException {
        Observable<Long> source = newSource();
        Observable<Long> withSideEffect = source.map(i -> {
            System.out.println("Side effect - " + counter.toString());
            counter = counter + 1;
            return counter;
        });
        subscribeTwiceAndWait(source, withSideEffect);
    }

    /** Number of times the mapping function of {@link #mapOnceTest()} has run. */
    static Long counter2 = 0L;

    /**
     * Runs the same demo as {@link #mapTest()}, but maps through
     * {@link MapOnce#apply} instead of the built-in {@code map}.
     *
     * @throws InterruptedException if the final sleep is interrupted
     */
    public static void mapOnceTest() throws InterruptedException {
        Observable<Long> source = newSource();
        Observable<Long> withSideEffect = MapOnce.apply(source, i -> {
            System.out.println("Side effect - " + counter2.toString());
            counter2 = counter2 + 1;
            return counter2;
        });
        subscribeTwiceAndWait(source, withSideEffect);
    }

    /** Builds the demo source: a 500 ms interval, delayed by one second, limited to 10 items. */
    private static Observable<Long> newSource() {
        return Observable.interval(500, TimeUnit.MILLISECONDS)
                .delay(1, TimeUnit.SECONDS)
                .take(10);
    }

    /**
     * Attaches two printing subscribers ("a" and "b") to {@code withSideEffect},
     * then blocks until {@code source} has delivered its last item.
     */
    private static void subscribeTwiceAndWait(Observable<Long> source, Observable<Long> withSideEffect)
            throws InterruptedException {
        Observable<Long> a = withSideEffect.map(i -> i);
        Observable<Long> b = withSideEffect.map(i -> i);
        a.subscribe(i -> System.out.println("a - " + i.toString()));
        b.subscribe(i -> System.out.println("b - " + i.toString()));
        // Blocks on a subscription of its own to source; the subscribers above
        // keep printing on whatever threads the source emits on.
        source.toBlocking().last();
        // Extra grace period after the blocking wait returns - presumably so the
        // "a"/"b" subscribers can print their final items before moving on.
        Thread.sleep(1000);
    }

    /** Runs both demos back to back, separated by a {@code ===} line. */
    public static void main(String[] args) throws InterruptedException {
        mapTest();
        System.out.println("\n===");
        mapOnceTest();
    }
}
@kevroletin
Copy link
Author

The console output is shown below: the first run uses the built-in map, the second uses MapOnce.

Side effect - 0
a - 1
Side effect - 1
b - 2
Side effect - 2
a - 3
Side effect - 3
b - 4
Side effect - 4
a - 5
Side effect - 5
b - 6
Side effect - 6
a - 7
Side effect - 7
b - 8
Side effect - 8
a - 9
Side effect - 9
b - 10
Side effect - 10
a - 11
Side effect - 11
b - 12
Side effect - 12
a - 13
Side effect - 13
b - 14
Side effect - 14
a - 15
Side effect - 15
b - 16
Side effect - 16
a - 17
Side effect - 17
b - 18
Side effect - 18
a - 19
Side effect - 19
b - 20

===
Side effect - 0
a - 1
b - 1
Side effect - 1
a - 2
b - 2
Side effect - 2
a - 3
b - 3
Side effect - 3
a - 4
b - 4
Side effect - 4
a - 5
b - 5
Side effect - 5
a - 6
b - 6
Side effect - 6
a - 7
b - 7
Side effect - 7
a - 8
b - 8
Side effect - 8
a - 9
b - 9
Side effect - 9
a - 10
b - 10

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment