cold / hot observable
~ cold: ๋ฐ์ดํฐ ๋ค์ด์ค๋ฉด ๋ฐ๋ก ์ฒ๋ฆฌ ~ ~ hot: ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ฉด ๋ฐ์ดํฐ๋ฅผ ํ๋ํ ์ํ๋ก ๋ญ๊ฐ ๋ค๋ฅธ ์ฒ๋ฆฌ๋ฅผ ํ ์ ์๋๋ก ํด์ค ~
cold: ์ผ๋ฐ์ ์ธ observable. ๊ตฌ๋ ์ ํ ๋๋ง๋ค ์๋ก์ด ์คํธ๋ฆผ ์์ฑ. ๊ทธ๋์ ๋ถ์ด์๋ ๋ชจ๋ ์คํผ๋ ์ด์ ์ด ๊ฐ ์คํธ๋ฆผ๋ง๋ค ์คํ์ด ๋จ-> ํผํฌ๋จผ์ค ์ ํ
hot: ConnectableObservable. publish() ๋ก ๋ง๋ค ์ ์์. ํ๋์ ์คํธ๋ฆผ. subscriber๊ฐ ์๋ ์๋ ์์ดํ
emit. ๋จ connect()๊ฐ ํธ์ถ๋์ด์ผ ํจ. ๋งค๋ฒ ์ปค๋ฅํธ๊ฐ ๊ท์ฐฎ์๋ refCount()-> subscriber ๋ ํผ๋ฐ์ค ๊ฐฏ์๋ฅผ ๋ด์ ์๋์ผ๋ก ํด์ค -> ํ์ง๋ง ์ด๋๋ Observable์ด ์๋ PublishSuject ๋ก ํด์ผ ์ํ๋ ๊ฒฐ๊ณผ(multicast)๋ฅผ ์ป์ ์ ์์
subject ์์ onError ๋ onComplete ๋ ๊ฐ์์ item์ด emit ๋์ง ์์ -> ์ด๋ RxRelay์ PublishRelay? ๋ฅผ ์ฐ๋ฉด ๋๋คํจ http://futurice.com/blog/top-7-tips-for-rxjava-on-android
unsubscribe onComplete unsubscribe๋ ์๋ธ์ ํธ๊ฐ ๋์ด์ ์์ดํ ์ emit ํ์ง ์์....????
CompositeSubscription ์๋ธ์คํฌ๋ฆฝ์ ๊ทธ๋ฃน ๊ด๋ฆฌ -> ํ๋ฒ์ unsubscribe ํ ์ ์์!! ์๋์์ ๋ผ์ดํ์ฌ์ดํด์ ๊ด๋ จํด์ ๋ฉ๋ชจ๋ฆฌ๋ฆญ๋ฑ ๋ฌธ์ ๋ฅผ ํผํ๊ธฐ ์ํด์ ๋ง์ด ์ฌ์ฉ๋๋๋ฏ!
compose : ๋ญ๊ฐ ๊ณตํต์ ์ธ ์คํผ๋ ์ด์
์ ๋ฌถ์ด์ค ์ ์์
observable
.compose(getTransformer())
.subscribe(str -> {
adapter2.addItem(str);
});
}
@NonNull
private <T> Observable.Transformer<T, T> getTransformer() {
return observable -> observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}hot observable : ํ๋์ ์คํธ๋ฆผ์ ๊ณต์
cold : ์๋ธ์คํฌ๋ฆฝ์ ํ ๋๋ง๋ค ์ ์คํธ๋ฆผ ์์ฑ, ๊ทธ์ ์ emit๋ ์์ดํ ๋ค ๋ค์ ๋ฐ์ -> ๋ชจ๋ ์คํผ๋ ์ด์ ์ด ๋ค์ ์คํ๋จ
publish: Hot observable ์์ฑ, connectํด์ผ ํจrefCount: subscribe๋ฅผ ํ๋ฉด ์๋์ผ๋ก connectํด์คreplay: emit๋ ์์ดํ ๋ค์ emit(์ด๋ ์ด๋ฏธ ์คํผ๋ ์ด์ ์ ํต๊ณผํ ๋ ์๋ค์ ๋ฐ์)rxReplayshare:connect๋๊ธฐ ์ ์ emit๋ ์์ดํ emitํด์ค
final Count count = new Count();
ConnectableObservable<String> observable = Observable
.range(0, 10)
.timestamp()
.map(timestamped -> String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis()))
.doOnNext(value -> count.increase())
.publish();
observable.subscribe(value -> {
System.out.println("subscriber1 : " + value);
});
observable.subscribe(value -> {
System.out.println("subscriber2 : " + value);
});
observable.connect();
System.out.println("์ฐ์ฐํ์ : " + count.count());hot ์ ์ปค๋ฅํฐ๋ธ ์ต์ ๋ฒ๋ธ์ ์ฌ์ฉํจ, ์ด๋ ์ปค๋ฅํธ๋ฅผ ํ์ง ์์ผ๋ฉด ์์ดํ ์ด emit ๋์ง ์์
์ต์ ๋ฒ๋ธ ์ ํํ๊ธฐ ํ๋์ ์คํธ๋ฆผ์ ํ๋์ ์๋ธ์คํฌ๋ผ์ด๋ฒ -> ๊ทธ๋ฅ Observable | ์ด์ ์ emit๋ ์์ดํ ์ด ํ์ ์์ ๊ฒฝ์ฐ -> Observable.publish().refCount() | ๋ชจ๋ emit๋ ์์ดํ ์ด ํ์ํ ๊ฒฝ์ฐ -> Observable.replay().refCount() | ๊ฐ์ฅ ์ต๊ทผ์ emit๋ ์์ดํ ์ด ํ์ํ ๊ฒฝ์ฐ -> Observable.compose(ReplayingShare.instance())
RxJava Test
ํ
์คํธ์ฉ ์๋ธ์คํฌ๋ผ์ด๋ฒ๋ฅผ ์ฌ์ฉํ๋ฉด ๋จ
TestSubscriber.create();
๋ก ์ต์ ๋ฒ๋ธ์ ์๋ธ์คํฌ๋ผ์ด๋ธ ํ๊ณ getOnNextEvents()๋ฉ์๋๋ก ์์ดํ
์ ๊ฐ์ ธ๋ค๊ฐ assert ํ๋ฉด ๋๋๋ฏ.
๋ธ๋กํน์ผ๋ก ๊ฐ์ ธ์์ ํ ์๋ ์์ง๋ง ๋ณ๋ก ์ข์ง ์์? -> ์ฐ๋ ๋ ํ์ปค? ํ๋ก์๋ฅผ ์จ์ ํ๋ฉด ํ ์ ์์
public class TestSchedulerProxy {
private static final TestScheduler SCHEDULER = new TestScheduler();
private static final TestSchedulerProxy INSTANCE = new TestSchedulerProxy();
static {
try {
RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() {
@Override
public Scheduler getIOScheduler() {
return SCHEDULER;
}
@Override
public Scheduler getComputationScheduler() {
return SCHEDULER;
}
@Override
public Scheduler getNewThreadScheduler() {
return SCHEDULER;
}
});
RxAndroidPlugins.getInstance().registerSchedulersHook(new RxAndroidSchedulersHook() {
@Override
public Scheduler getMainThreadScheduler() {
return SCHEDULER;
}
});
} catch (IllegalStateException e) {
throw new IllegalStateException("Schedulers class already initialized. " +
"Ensure you always use the TestSchedulerProxy in unit tests.");
}
}
public static TestSchedulerProxy get() {
return INSTANCE;
}
public void advanceBy(long delayTime, TimeUnit unit) {
SCHEDULER.advanceTimeBy(delayTime, unit);
}
}์ด๋ฐ์์ผ๋ก ํ์ปค๋ฅผ ๋ง๋ค๊ณ , ํ ์คํธ ์ผ์ด์ค์์์ ์ค์ผ์ค๋ฌ์ ๋ํด ์๋ ๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉด ์๊ฐ์ ์๋น์ธ๋ค????๊ณ ํด์ผํ๋...?????
proxy.advanceBy(1, TimeUnit.SECONDS);
creating observable
๋๋๋ก Observable.create ๋ ์ฐ์ง ๋ง๋๋ก-> OnSubscribe imple ์์์ onNext, onError, onComplete ๋ฅผ ๋งค๋ด์ผํ๊ฒ ํธ์ถํด์ค์ผ ํ๋๋ฐ ์ค์ํ๋ฉด ๋ฉ๋ชจ๋ฆฌ๋ฆญ ๊ฐ๋ฅ์ฑ
๊ทธ๋์ from ์ ์ฐ๊ฑฐ๋ defer ๊ฐ์๊ฑธ ์ฐ๋๊ฒ ์ข์๋ฏ?
Observable.empty() ์์ฃผ ์ฌ์ฉํ์๋๋ฏ
Observable.error(new NullPointerException("error")); ๋ ์์ฃผ ์ฌ์ฉ -> validation ์ฒดํฌํ ๋
Observable.timer(100, TimeUnit.MILLISECONDS, testScheduler); ๋ ์์ฃผ ์ฌ์ฉ -> unsubscribe ์ ํด์ค์ผํจ
ํ
์คํธํ ๋ subscriber = new TestSubscriber(); ๋ฅผ ์ฌ์ฉํ๋ฉด ์ข์
subscriber.assertValue("name"); ์ด๋ ๊ฒ assert ํ ์ ๋ ์์
subscribeOn
ํ๋์ ์คํธ๋ฆผ์ ๋ํด์ ํ๋ฒ๋ง ํธ์ถ ๊ฐ๋ฅ(์ฌ๋ฌ๋ฒ ํธ์ถํด๋ ์ต์ด์ ํธ์ถํ ์ค์ผ์ฅด๋ฌ๋ง ์ ์ฉ๋จ).
์์ดํ
์ด emit๋๋ ์ฐ๋ ๋๋ฅผ ์ค์ .
observeOn
ํ๋์ ์คํธ๋ฆผ์ ๋ํด ์ฌ๋ฌ๋ฒ ํธ์ถ ๊ฐ๋ฅ. ํธ์ถํ ๋ค๋ก ์ฒด์ด๋๋ ์คํผ๋ ์ด์
์ ์ฐ๋ ๋๋ฅผ ์ค์ (์ฐธ๊ณ ๋ก ์คํผ๋ ์ด์
์ด ์ค์ผ์ฅด๋ฌ๋ฅผ ๋ฐ๊พธ๊ธฐ๋ํจ).
์ subscribeOn์ subscribeํ ๋ ํธ์ถ ํ๋๊ฐ? -> subscribeOn์ ์์ดํ
์ emit๊ณผ ๊ด๋ จ๋ ์ค์ผ์ค๋ฌ ์ค์ ์ธ๋ฐ ๊ทธ๋ ๋ค๋ฉด ์คํธ๋ฆผ ์์ฑ ํ ๋ฐ๋ก ํธ์ถํด์ค์ผ ๋ง๋๊ฑฐ ๊ฐ์ -> ํ์ง๋ง ์ด๋ ๊ฒ ๋๋ฉด subscriber๊ฐ ์ด๋ฏธ ์ค์ ๋ ์ค์ผ์ค๋ฌ๋ฅผ ๋ฐ๊ฟ ์ ์์. ๊ทธ๋์ ์ต๋ํ ๋ค๋ก ๋ฏธ๋ค์ ํ๋๋ฏ???
์ผ๋ฐ์ ์ธ observable(cold) ์ผ ๊ฒฝ์ฐ subscribeOn, subscribe ํ ๋๋ง๋ค ์๋ก์ด ์คํธ๋ฆผ์ด ์์ฑ๋๊ธฐ ๋๋ฌธ์ ๊ฐ๊ธฐ ๋ค๋ฅธ ์ค์ผ์ฅด๋ฌ๋ฅผ ์ค ์ ์์
flatMap
ํ๋์ ์์ดํ
์ ๋ฐ์์ ์ฌ๋ฌ๊ฐ์ ์์ดํ
์ผ๋ก ์ฃผ๊ณ ์ถ์ ๋. ํ์ง๋ง ์์๊ฐ ๋ณด์ฅ๋์ง ์์.
concatMap
flatMap์ด๋ ๊ฐ์๋ฐ ์์๊ฐ ๋ณด์ฅ๋จ.
operators : http://rxmarbles.com/
4๊ฐ์ง ์๋ธ์ ํธ ํ์
- AsyncSubject
- BehaviorSubject
- PublishSubject
- ReplaySubject
A Subject is a sort of bridge or proxy that acts both as an Subscriber and as an Observable. Because it is a Subscriber, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.
http://reactivex.io/documentation/subject.html Subject = Observable + Subscriber Observable, Subscriber ์ฐ๊ฒฐํ๋ ๋ ์??
- AsyncSubject
onComplete ๋ฐ๋ก ์ด์ ์(๋ง์ง๋ง) item emit
- BehaviorSubject
it begins by emitting the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable(s).
subscribe ์์ ์ดํ ๊ฐ์ฅ ์ต๊ทผ์ item + ๋๋จธ์ง item
๊ธฐ๋ณธ๊ฐ์ ์ ๋ฌํด์ฃผ๋ ์ฉ๋๋ ๊ฐ๋ฅ
- PublishSubject
๊ธฐ๋ณธ์ ์ธ subject
subscribe ์ดํ์ emit ๋๋ ๋ชจ๋ item ๋ฐ์
- ReplaySubject
emit ๋ ๋ชจ๋ item ์ ๋ฒํผ์ ์ ์ฅํ๊ณ subscribe ํ๋ ๋ชจ๋ Observer ์๊ฒ ์ ๋ฌ (์์ ๋ณด์ฅ)
ReplaySubject.create(int capacity) : unbound. ๋ฏธ๋ฆฌ ๋ฒํผ(๋ฐฐ์ด) ํ ๋น ํด์ฃผ๋ ์ฉ(๋ฐฐ์ด ์นดํผ ์ค๋ฒํค๋ ๋ฐฉ์ง) ๊ธฐ๋ณธ๊ฐ 32k
ReplaySubject.createWithSize(int size) : bound๋จ. ์ต๋ ๋ฒํผ ํฌ๊ธฐ๋ฅผ ์ง์
Observable ์ ์์ฑ ์์ ์ ๋ฐ์ดํฐ๋ฅผ ๋ฃ์ด์ค์ผ ํจ. ๊ทผ๋ฐ Subject๋ ์์ฑ ํ์ ๋ฃ์ด์ค
When you use a Subject as a Subscriber, take care not to call its onNext(โฏ) method (or its other on methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resulting Subject.
To protect a Subject from this danger, you can convert it into a SerializedSubject with code like the following:
mySafeSubject = new SerializedSubject( myUnsafeSubject );CompositeSubscription
์๋ธ์คํฌ๋ฆฝ์
์ ํ๊ณณ์ ๋ชจ์์ ๊ด๋ฆฌํ ๋ ์ฌ์ฉ -> ๋ณดํต ์๋ ๋ผ์ดํ์ธ์ดํด์ด๋ ์ฎ์ด์ ์
-> RxLifeCycle์ ์ด์ฉํด์ compose๋ฅผ ํ๋ฉด๋จ
RxUserBus.sub().compose(bindToLifecycle()).subscribe((String s) -> {
Toast.makeText(this, s, Toast.LENGTH_SHORT).show();
});์๋์ผ๋ก onPause, onResume ๋ ๋ ์ฒ๋ฆฌํด์ค
WeakReference๋ ์ฎ์ด์ ์ฌ์ฉํด๋ณด๋๊ฑด ๋ณ๋ก -> null๋ก ๋์์??
static {
String TAG = "Rx-Debug Example";
RxJavaPlugins.getInstance().registerObservableExecutionHook(new DebugHook(new DebugNotificationListener() {
public Object onNext(DebugNotification n) {
Log.v(TAG, "onNext on " + n);
return super.onNext(n);
}
public Object start(DebugNotification n) {
Log.v(TAG, "start on " + n);
return super.start(n);
}
public void complete(Object context) {
Log.v(TAG, "complete on " + context);
}
public void error(Object context, Throwable e) {
Log.e(TAG, "error on " + context);
}
}));
}registerObservableExecutionHook ๋ก ํ
์ ๊ฑธ์ด์ค ์ ์์.
์ด๊ฒ๋ณด๋ค frodo๋ฅผ ์ฌ์ฉํ๋ฉด ์ข ๋ ์ฝ๊ฒ annotation processor ๋ฅผ ์ด์ฉํด์ ์ฝ๊ฒ ์ฌ์ฉ ๊ฐ๋ฅํจ
public interface GitHubService {
@RxLogObservable(RxLogObservable.Scope.EVERYTHING)
@GET("/search/users?")
Observable<SearchResult> searchUsers(@Query("q") String query);
}์ ๋ ๊ฒ ์ด๋ ธํ ์ด์ ๋ถ์ด๋ฉด ๋ก๊น ์ ํ ์ ์์....
attach debugger to android process
Break point๋ฅผ ์ฌ์ฉํด์ ๋ก๊ทธ ์ฐ๊ธฐ
cmd + F8: break point ์ฐ๊ธฐ
cmd + shift + F8: break point ์ต์
๋ค์ด์ผ๋ก๊ทธ ๋์ฐ๊ธฐ
suspend uncheck ํ๊ณ
log evaluated expression ์ ์ฐ๊ณ ์ ํ๋ ํํ์ ์์ฑ
MVCC: multiversion concurrency control
Realm ์์๋ ๋ชจ๋ ์ธ์คํด์ค๊ฐ ThreadLocal
copyFromRealm: ๋ ์ managed ๊ฐ์ฒด๋ฅผ stand alone?(unmanaged) ๊ฐ์ฒด๋ก ๋ง๋ค์ด ์ค
retrofit์ RxJavaCallAdapterFactory
realm์ RealmObservableFactory
Observable.create() ํตํด์ ์์ฑํ ๋ ๋ฐ๋์ isUnsubscribed() ์ฒดํฌ๋ฅผ ํด์ผํ๊ณ , onNext, onError, onComplete๋ฅผ ์ ์จ์ผํจ
ex) ์ด๋ฏธ
unsubscribe๋ฌ๋๋ฐonNext๋ก ์์ดํ ์ emit ํ๋ ๊ฒฝ์ฐ ์ฒดํฌํด์ผํ๊ณ ,onErrorํธ์ถ์onComplete๋ ํธ์ถ๋์ง ์์์ผ ํจ, ๋ฑ๋ฑ....
@Override
public Observable<DynamicRealm> from(DynamicRealm realm) {
final RealmConfiguration realmConfig = realm.getConfiguration();
return Observable.create(new Observable.OnSubscribe<DynamicRealm>() {
@Override
public void call(final Subscriber<? super DynamicRealm> subscriber) {
// Get instance to make sure that the Realm is open for as long as the
// Observable is subscribed to it.
final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
final RealmChangeListener<DynamicRealm> listener = new RealmChangeListener<DynamicRealm>() {
@Override
public void onChange(DynamicRealm realm) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(observableRealm);
}
}
};
observableRealm.addChangeListener(listener);
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
observableRealm.removeChangeListener(listener);
observableRealm.close();
}
}));
// Immediately call onNext with the current value, as due to Realm's auto-update, it will be the latest
// value.
subscriber.onNext(observableRealm);
}
});
}subscriber.add(Subscriptions.create(new Action0() ์ด ๋ถ๋ถ์ด ๊ผญ ํ์ํจ??? -> ??
https://youtu.be/hHnTIMjd1Y8?t=15m14s
RxJava in Action
RxAndroid, RxBinding, ReRelay
RxBinding: ui ์์ ฏ์ ๋ฒํผ ํด๋ฆญ, ์คํฌ๋กค ์ด๋ฒคํธ๋ฅผ ๋ชจ๋ Observable๋ก ์ ๊ณต
RxView.Click(view)
๊ฑฐ์ ๋ชจ๋ ๊ณณ์์ ์ฌ์ฉ ใทใท
interval()
withLatestFrom(observable): ์ด๋ฒคํธ ๋ฐ์์ ๋ค๋ฅธ Observable์ ๋ง์ง๋ง ์์ดํ
๊ณผ ํฉ์ณ์ ์ฒ๋ฆฌ?
distinctUntilChanged(): ์ด๋ฒคํธ๊ฐ ๋ค๋ฅผ ๊ฒฝ์ฐ์๋ง ์ฒ๋ฆฌ
startWith(sth): ๊ตฌ๋
์ ์ฒซ ์์ดํ
์ผ๋ก ์์ ํ ์ ์๋๋ก ํด์ค
onBackpressureDrop(): ๋ณดํต์ UI ์ฐ๋ ๋์์ ๋ฒํผ๋ฅผ ์ฐ๋๋ฐ, ๋๋์ ์ด๋ฒคํธ ์ฒ๋ฆฌ๊ฐ ์ง์ฐ๋ ๋ ์ ์ฉ, timer๋ก ๋ญ๊ฐ ์ฒ๋ฆฌํ ๋ ์ฒ๋ฆฌ๊ฐ ์ง์ฐ๋๋ฉด ์๋ ๋, ๊ตณ์ด ๋ชจ๋ ์์ดํ
์ ๋ฐ์ ํ์๊ฐ ์์ ๋
๋กค๋ง ๋ฐฐ๋๋ฅผ ๋ง๋ ๋ค. 1์ด๋ง๋ค ํ์ด์ง ๊ทธ๋ฐ๋ฐ ์์ผ๋ก ๋๋๊ทธํ ๋ ๋ฌด์ํด์ผํจ.
val timer = Observable.interval(1000, TimeUnit.MILLISECONDS)
val dragging = RxViewPager.pageScrollStateChanges(vp_activity_view_pager)
.map { ViewPager.SCROLL_STATE_DRAGGING == it }
.distinctUntilChanged()
.startWith(false)
subscription.add(timer.withLatestFrom(dragging, { timer, dragging -> dragging })
.filter { !it } // ๋๋๊ทธ ์ค์ด ์๋๋๋ง
.retry()
.onBackpressureDrop()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
with(vp_activity_view_pager) {
val currentIdx = currentItem
currentItem = if (currentIdx == adapter.count - 1) 0 else currentItem + 1
}
})onError, onComplete ๋ฐ์์์๋ ์คํธ๋ฆผ์ด ์ฃฝ์ง ์์..
PublisRelay.create() ๋ก ์์ฑ
์ด๋ค ์กฐ๊ฑด์ด ๋์์ ๋ ๋ทฐ๋ฅผ ์ ๊น ๋ณด์ฌ์ฃผ๋ ์ฉ๋์์...์ด๋ ๋ฐ๋์ backpressure์จ์ค์ผ
private val visibilityPublishRelay by lazy { PublishRelay.create<Int>() }
with(subscription) {
add(visibilityPublishRelay
.retry()
.distinctUntilChanged()
.onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
changeVisibility(it)
})
add(RxView.clicks(btn_activity_visibility_event)
.concatMap {
Observable.concat(
Observable.just(View.VISIBLE),
Observable.just(View.GONE).delay(3, TimeUnit.SECONDS))
}
.subscribe(visibilityPublishRelay))
}
private fun changeVisibility(toVisibility: Int) {
with(tv_activity_visibility_event) {
if (toVisibility == visibility) {
return
}
visibility = toVisibility
}
}private val btnViewPager by lazy {
findViewById(R.id.btn_activity_main_view_pager) as Button
}
private val btnViewPager by lazy {
(findViewById(R.id.btn_activity_main_view_pager) as Button).apply {
textSize = 15f
}
}android extentions ์ ์ฐ๋ฉด,
private val btnViewPager by lazy {
btn_activity_main_view_pager // ๋ฐ๋ก xml ์ด๋ฆ์
}
btnActivityMainViewPager.text = โโ
btnActivityMainViewPager.setOnClockListener {
~~~
}class OperatorMapSubscriptionItemListing : Observable.Operator<Listing, SubscriptionItem> {
override fun call(o: Subscriber<in Listing>): Subscriber<in SubscriptionItem> {
return object: Subscriber<SubscriptionItem>() {
override fun onError(e: Throwable?) {
o.onError(e)
}
override fun onCompleted() {
o.onCompleted()
}
override fun onNext(t: SubscriptionItem?) {
t?.let {
o.onNext(Listing(it.title, it.channelName,
"gdg://korea.android/listing/${it.itemId}?channelName=${it.channelName}"))
}
}
}
}
}
observable.lift(OperatorMapSubscriptionItemListing)The following example shows how you can use the lift(โฏ) operator to chain your custom operator (in this example: myOperator) alongside standard RxJava operators like ofType and map:
fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new
myOperator<T>()).map({"transformed by myOperator: " + it});The following section shows how you form the scaffolding of your operator so that it will work correctly with lift(โฏ).
Subject<T, R>
Observable์ด์ Observer
toSerialized๋ก ์ฌ๋ฌ ์ฐ๋ ๋์ ์ ๊ทผํด๋ ๊ด์ฐฎ
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);: onObservableStart๋ฅผ ํธ์ถํ๊ณ OnSubscribe๊ฐ์ฒด ๋ฆฌํด
return RxJavahooks.onObservableReturn(subscriber): Subscription ๊ฐ์ฒด ๋ฆฌํด
java
Completable.create(new Completable.OnSubscribe() {
@Override
public void call(CompletableSubscriber completableSubscriber) {
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe();
GDG Korea Slack
half-but [5:16 PM] ์๋ ํ์ธ์ ํน์ observable์ defer์ฐ์ฐ์์ ๋ํ ์ค๋ช ํด์ฃผ์ค์ ์์ผ์ ๊ฐ์? ์ ์ดํด๊ฐ ๊ฐ์ง ์์์์โฆ ๋์์ฃผ์ธ์
seongug.jung (์ ์น์ฑ) [6:51 PM] subscribe ๋๊ธฐ์ ๊น์ง ๋๊ธฐํ๋ ํจ์์์ ์ด๋ฅผํ ๋ฉด
api.getData1()
.concatWith(api.getdata2())
...
์์ฒ๋ผ ๊ตฌ์ฑ์ ํ๋ฉด
getData1 ๊ณผ getData2 ๊ฐ ๊ฑฐ์ ๋์์ ํธ์ถ์ด ๋์
๊ทธ ๊ณผ์ ์์ getData1 ์์ ์๋ฌ๊ฐ ๋๋ฉด
getData2 ๋ undelivered ์๋ฌ๊ฐ ๋ฐ์ํ๊ฒ ๋์
(edited)
์ด๋ฐ ๊ฒฝ์ฐ๋ฅผ ๋ฐฉ์งํ ์ ์๋ ๋ฐฉ๋ฒ์ผ๋ก defer ์ธ๋ฐ
concatWith ๋ mergeWith ๊ฐ์ ๊ฒฝ์ฐ์ ์ฐ๋ฉด์ค๋ํ์ ์ฌ์๋๋ฐ ์ ๋จน์ง๋ฅผ ๋ชปํ๋
api.getData1()
.concatWith(Single.defer { api.getData2() })
...
์ด์ฒ๋ผ ์ฐ๋ฉด getData1 ์ด ์ฒ๋ฆฌ๊ฐ ๋๋๊ณ ์ค์ concatWith ๊ฐ subscribe ๊ฐ ๋๋ ์์ ์ ๋์์ ์์ํด์ ์ด๋ฐ ๊ฒฝ์ฐ์๋ ์ธ ์ ์์ด์.
getLocal().concatWith(getApi())
์ด๋ ๊ฒ ์ฐ๋ฉด ๋ก์ปฌ ๋ณด๋ด๊ณ ์ต์ ์ ๋ณด๋ฉด api ๋ฅผ ์ ์ด๋ ๋๋ ์ํฉ์์ ๋ฌธ์ ๊ฐ ๋ ์ ์์ฃ . ๊ทธ๋ด๋ defer ๋ฅผ ์ฐ๋ฉด ๋ก์ปฌ ๋ณด๋ด๊ณ ๊ฐฑ์ ์๋๋ ๋จ์ผ๋ก ์ฒ๋ฆฌํ๋ฉด api ๋ฅผ ์ ์๋ ์ฒ๋ฆฌ๋ฅผ ํ ์ ์์ด์ ๋ ๋๋ฌด ๋ฆ๊ฒ ๋๋ตํ ๋๋์ด๋ค์
yjh5424 [7:53 PM] joined #reactivex along with geuntaek.
gaemi [11:19 AM]
์ต๊ทผ์ .defer() ๋ฅผ Hystrix ์ ์ฉํ๋ฉด์ ์ ์ฉํ๊ฒ ์ฌ์ฉํ์ด์. ใ
ใ
์ ์๊ฐ์ .compose() ์ ์กฐํฉํ ์ ์๋ ์ต๊ณ ์ ํจ์๊ฐ ์๋๊น ์๊ฐ๋๋ค์.
gaemi [11:23 AM] ์๋ฅผ๋ ๋ค๋ฉด ์บ์๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํ์ฌ Single.Transformer ๋ฅผ ์ด๋ฐํํ๋ก ๊ตฌ์ฑํ์ด์.
public Single<T> call(Single<T> source) {
return Single.defer(() -> {
T cachedObj = findCache();
if (cachedObj != null) {
return Single.just(cachedObj);
}
return source.doOnSuccess(v -> {
Completable.fromAction(() -> putCache(v))
.subscribeOn(cacheScheduler)
.subscribe(Actions.empty(), Actions.empty());
});
});
}