Skip to content

Instantly share code, notes, and snippets.

@bangarharshit
Created October 28, 2017 03:03
Show Gist options
  • Save bangarharshit/b3b3b55e339d6e6380def63d955289cf to your computer and use it in GitHub Desktop.
Save bangarharshit/b3b3b55e339d6e6380def63d955289cf to your computer and use it in GitHub Desktop.
public static void main(String[] args) throws InterruptedException {
Observable<String> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
System.out.println("emit" + Thread.currentThread().getName());
e.onNext(12);
e.onNext(3232);
}
})
.compose(integerStringObservableTransformer)
.compose(ioComputationTransformer());
observable.subscribe(new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
}
@Override public void onNext(String s) {
System.out.println("next" + Thread.currentThread().getName());
System.out.println("s = [" + s + "]");
}
@Override public void onError(Throwable e) {
System.out.println("e = [" + e + "]");
}
@Override public void onComplete() {
System.out.println("RxJavaTest.onComplete");
}
});
Single.fromCallable(new Callable<String>() {
@Override public String call() throws Exception {
Thread.sleep(10000);
return "sddsd";
}
})
.takeUntil(new Completable() {
@Override protected void subscribeActual(CompletableObserver s) {
s.onComplete();
}
})
.subscribe(new Consumer<String>() {
@Override public void accept(String s) throws Exception {
System.out.println(s);
}
});
while (true) {
Thread.sleep(500);
}
}
private static final ObservableTransformer<Integer, String> integerStringObservableTransformer = new ObservableTransformer<Integer, String>() {
@Override public ObservableSource<String> apply(Observable<Integer> upstream) {
return upstream.map(new Function<Integer, String>() {
@Override public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
});
}
};
private static final <T> ObservableTransformer<T, T> ioComputationTransformer() {
return new ObservableTransformer<T, T>() {
@Override public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io()).replay(1).refCount();
}
};
}
private static final <T> ObservableTransformer<T, T> disposableTranformer(Observable<Event> eventObservable, Event outerEvent) {
return new ObservableTransformer<T, T>() {
@Override public ObservableSource<T> apply(Observable<T> upstream) {
return upstream.takeUntil(new ObservableSource<Object>() {
@Override public void subscribe(Observer<? super Object> observer) {
eventObservable.filter(new Predicate<Event>() {
@Override public boolean test(Event event) throws Exception {
return (event == outerEvent);
}
}).subscribe(new Consumer<Event>() {
@Override public void accept(Event event) throws Exception {
observer.onNext("sdsd");
}
});
}
});
}
};
}
private static BehaviorSubject<Event> observable = BehaviorSubject.create();
public void onCreate() {
observable.onNext(Event.CREATE);
}
public void ondestroy() {
observable.onNext(Event.DESTROY);
}
public enum Event {
CREATE,
DESTROY
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment