Created
October 28, 2017 03:03
-
-
Save bangarharshit/b3b3b55e339d6e6380def63d955289cf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws InterruptedException { | |
Observable<String> observable = Observable.create(new ObservableOnSubscribe<Integer>() { | |
@Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { | |
System.out.println("emit" + Thread.currentThread().getName()); | |
e.onNext(12); | |
e.onNext(3232); | |
} | |
}) | |
.compose(integerStringObservableTransformer) | |
.compose(ioComputationTransformer()); | |
observable.subscribe(new Observer<String>() { | |
@Override public void onSubscribe(Disposable d) { | |
} | |
@Override public void onNext(String s) { | |
System.out.println("next" + Thread.currentThread().getName()); | |
System.out.println("s = [" + s + "]"); | |
} | |
@Override public void onError(Throwable e) { | |
System.out.println("e = [" + e + "]"); | |
} | |
@Override public void onComplete() { | |
System.out.println("RxJavaTest.onComplete"); | |
} | |
}); | |
Single.fromCallable(new Callable<String>() { | |
@Override public String call() throws Exception { | |
Thread.sleep(10000); | |
return "sddsd"; | |
} | |
}) | |
.takeUntil(new Completable() { | |
@Override protected void subscribeActual(CompletableObserver s) { | |
s.onComplete(); | |
} | |
}) | |
.subscribe(new Consumer<String>() { | |
@Override public void accept(String s) throws Exception { | |
System.out.println(s); | |
} | |
}); | |
while (true) { | |
Thread.sleep(500); | |
} | |
} | |
private static final ObservableTransformer<Integer, String> integerStringObservableTransformer = new ObservableTransformer<Integer, String>() { | |
@Override public ObservableSource<String> apply(Observable<Integer> upstream) { | |
return upstream.map(new Function<Integer, String>() { | |
@Override public String apply(Integer integer) throws Exception { | |
return String.valueOf(integer); | |
} | |
}); | |
} | |
}; | |
private static final <T> ObservableTransformer<T, T> ioComputationTransformer() { | |
return new ObservableTransformer<T, T>() { | |
@Override public ObservableSource<T> apply(Observable<T> upstream) { | |
return upstream.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io()).replay(1).refCount(); | |
} | |
}; | |
} | |
private static final <T> ObservableTransformer<T, T> disposableTranformer(Observable<Event> eventObservable, Event outerEvent) { | |
return new ObservableTransformer<T, T>() { | |
@Override public ObservableSource<T> apply(Observable<T> upstream) { | |
return upstream.takeUntil(new ObservableSource<Object>() { | |
@Override public void subscribe(Observer<? super Object> observer) { | |
eventObservable.filter(new Predicate<Event>() { | |
@Override public boolean test(Event event) throws Exception { | |
return (event == outerEvent); | |
} | |
}).subscribe(new Consumer<Event>() { | |
@Override public void accept(Event event) throws Exception { | |
observer.onNext("sdsd"); | |
} | |
}); | |
} | |
}); | |
} | |
}; | |
} | |
private static BehaviorSubject<Event> observable = BehaviorSubject.create(); | |
public void onCreate() { | |
observable.onNext(Event.CREATE); | |
} | |
public void ondestroy() { | |
observable.onNext(Event.DESTROY); | |
} | |
/** Events published on the {@code observable} subject. */
public enum Event {
    /** Sent by {@code onCreate()}. */
    CREATE,

    /** Sent by {@code ondestroy()}. */
    DESTROY
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment