Skip to content

Instantly share code, notes, and snippets.

@QuadFlask
Last active November 14, 2018 00:40
Show Gist options
  • Select an option

  • Save QuadFlask/145e80b4ac54d1541e2d38d9ce762a57 to your computer and use it in GitHub Desktop.

Select an option

Save QuadFlask/145e80b4ac54d1541e2d38d9ce762a57 to your computer and use it in GitHub Desktop.
RxJava Study

RxJava Study github repo

1์ผ์ฐจ

cold / hot observable

~ cold: ๋ฐ์ดํ„ฐ ๋“ค์–ด์˜ค๋ฉด ๋ฐ”๋กœ ์ฒ˜๋ฆฌ ~ ~ hot: ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด์˜ค๋ฉด ๋ฐ์ดํ„ฐ๋ฅผ ํ™€๋“œํ•œ ์ƒํƒœ๋กœ ๋ญ”๊ฐ€ ๋‹ค๋ฅธ ์ฒ˜๋ฆฌ๋ฅผ ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•ด์คŒ ~

cold: ์ผ๋ฐ˜์ ์ธ observable. ๊ตฌ๋…์„ ํ• ๋•Œ๋งˆ๋‹ค ์ƒˆ๋กœ์šด ์ŠคํŠธ๋ฆผ ์ƒ์„ฑ. ๊ทธ๋ž˜์„œ ๋ถ™์–ด์žˆ๋Š” ๋ชจ๋“  ์˜คํผ๋ ˆ์ด์…˜์ด ๊ฐ ์ŠคํŠธ๋ฆผ๋งˆ๋‹ค ์‹คํ–‰์ด ๋จ-> ํผํฌ๋จผ์Šค ์ €ํ•˜

hot: ConnectableObservable. publish() ๋กœ ๋งŒ๋“ค ์ˆ˜ ์žˆ์Œ. ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ. subscriber๊ฐ€ ์žˆ๋“  ์—†๋“  ์•„์ดํ…œ emit. ๋‹จ connect()๊ฐ€ ํ˜ธ์ถœ๋˜์–ด์•ผ ํ•จ. ๋งค๋ฒˆ ์ปค๋„ฅํŠธ๊ฐ€ ๊ท€์ฐฎ์„๋• refCount()-> subscriber ๋ ˆํผ๋Ÿฐ์Šค ๊ฐฏ์ˆ˜๋ฅผ ๋ด์„œ ์ž๋™์œผ๋กœ ํ•ด์คŒ -> ํ•˜์ง€๋งŒ ์ด๋•Œ๋Š” Observable์ด ์•„๋‹Œ PublishSuject ๋กœ ํ•ด์•ผ ์›ํ•˜๋Š” ๊ฒฐ๊ณผ(multicast)๋ฅผ ์–ป์„ ์ˆ˜ ์žˆ์Œ

์ฐธ๊ณ : http://moka-a.github.io/android/rxAndroid_study/

subject ์—์„œ onError ๋Š” onComplete ๋ž‘ ๊ฐ™์•„์„œ item์ด emit ๋˜์ง€ ์•Š์Œ -> ์ด๋• RxRelay์˜ PublishRelay? ๋ฅผ ์“ฐ๋ฉด ๋œ๋‹คํ•จ http://futurice.com/blog/top-7-tips-for-rxjava-on-android

unsubscribe onComplete unsubscribe๋Š” ์„œ๋ธŒ์ ํŠธ๊ฐ€ ๋”์ด์ƒ ์•„์ดํ…œ์„ emit ํ•˜์ง€ ์•Š์Œ....????

CompositeSubscription ์„œ๋ธŒ์Šคํฌ๋ฆฝ์…˜ ๊ทธ๋ฃน ๊ด€๋ฆฌ -> ํ•œ๋ฒˆ์— unsubscribe ํ•  ์ˆ˜ ์žˆ์Œ!! ์•ˆ๋“œ์—์„œ ๋ผ์ดํ”„์‚ฌ์ดํด์— ๊ด€๋ จํ•ด์„œ ๋ฉ”๋ชจ๋ฆฌ๋ฆญ๋“ฑ ๋ฌธ์ œ๋ฅผ ํ”ผํ•˜๊ธฐ ์œ„ํ•ด์„œ ๋งŽ์ด ์‚ฌ์šฉ๋˜๋Š”๋“ฏ!

realignist๋‹˜ ์ •๋ฆฌ ์ž๋ฃŒ

d2 ์ž๋ฃŒ


2์ผ์ฐจ

compose : ๋ญ”๊ฐ€ ๊ณตํ†ต์ ์ธ ์˜คํผ๋ ˆ์ด์…˜์„ ๋ฌถ์–ด์ค„ ์ˆ˜ ์žˆ์Œ

observable
				.compose(getTransformer())
				.subscribe(str -> {
					adapter2.addItem(str);
				});
	}

	@NonNull
	private <T> Observable.Transformer<T, T> getTransformer() {
		return observable -> observable
				.subscribeOn(Schedulers.io())
				.observeOn(AndroidSchedulers.mainThread());
	}

hot observable : ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ์„ ๊ณต์œ 

cold : ์„œ๋ธŒ์Šคํฌ๋ฆฝ์…˜ํ• ๋•Œ๋งˆ๋‹ค ์ƒˆ ์ŠคํŠธ๋ฆผ ์ƒ์„ฑ, ๊ทธ์ „์— emit๋œ ์•„์ดํ…œ๋“ค ๋‹ค์‹œ ๋ฐ›์Œ -> ๋ชจ๋“  ์˜คํผ๋ ˆ์ด์…˜์ด ๋‹ค์‹œ ์‹คํ–‰๋จ

  • publish : Hot observable ์ƒ์„ฑ, connectํ•ด์•ผ ํ•จ
  • refCount : subscribe๋ฅผ ํ•˜๋ฉด ์ž๋™์œผ๋กœ connectํ•ด์คŒ
  • replay : emit๋œ ์•„์ดํ…œ ๋‹ค์‹œ emit(์ด๋• ์ด๋ฏธ ์˜คํผ๋ ˆ์ด์…˜์„ ํ†ต๊ณผํ•œ ๋…€์„๋“ค์„ ๋ฐ›์Œ)
  • rxReplayshare : connect๋˜๊ธฐ ์ „์— emit๋œ ์•„์ดํ…œ emitํ•ด์คŒ
final Count count = new Count();

		ConnectableObservable<String> observable = Observable
			.range(0, 10)
			.timestamp()
			.map(timestamped -> String.format("[%d] %d", timestamped.getValue(), timestamped.getTimestampMillis()))
			.doOnNext(value -> count.increase())
			.publish();

		observable.subscribe(value -> {
			System.out.println("subscriber1 : " + value);
		});

		observable.subscribe(value -> {
			System.out.println("subscriber2 : " + value);
		});

		observable.connect();
		System.out.println("์—ฐ์‚ฐํšŸ์ˆ˜ : " + count.count());

hot ์€ ์ปค๋„ฅํ„ฐ๋ธ” ์˜ต์ €๋ฒ„๋ธ”์„ ์‚ฌ์šฉํ•จ, ์ด๋•Œ ์ปค๋„ฅํŠธ๋ฅผ ํ•˜์ง€ ์•Š์œผ๋ฉด ์•„์ดํ…œ์ด emit ๋˜์ง€ ์•Š์Œ

์˜ต์ €๋ฒ„๋ธ” ์„ ํƒํ•˜๊ธฐ ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ์— ํ•˜๋‚˜์˜ ์„œ๋ธŒ์Šคํฌ๋ผ์ด๋ฒ„ -> ๊ทธ๋ƒฅ Observable | ์ด์ „์— emit๋œ ์•„์ดํ…œ์ด ํ•„์š” ์—†์„ ๊ฒฝ์šฐ -> Observable.publish().refCount() | ๋ชจ๋“  emit๋œ ์•„์ดํ…œ์ด ํ•„์š”ํ•  ๊ฒฝ์šฐ -> Observable.replay().refCount() | ๊ฐ€์žฅ ์ตœ๊ทผ์— emit๋œ ์•„์ดํ…œ์ด ํ•„์š”ํ•  ๊ฒฝ์šฐ -> Observable.compose(ReplayingShare.instance())

RxJava Test

ํ…Œ์ŠคํŠธ์šฉ ์„œ๋ธŒ์Šคํฌ๋ผ์ด๋ฒ„๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋จ TestSubscriber.create(); ๋กœ ์˜ต์ €๋ฒ„๋ธ”์„ ์„œ๋ธŒ์Šคํฌ๋ผ์ด๋ธŒ ํ•˜๊ณ  getOnNextEvents()๋ฉ”์†Œ๋“œ๋กœ ์•„์ดํ…œ์„ ๊ฐ€์ ธ๋‹ค๊ฐ€ assert ํ•˜๋ฉด ๋˜๋Š”๋“ฏ.

๋ธ”๋กœํ‚น์œผ๋กœ ๊ฐ€์ ธ์™€์„œ ํ• ์ˆ˜๋„ ์žˆ์ง€๋งŒ ๋ณ„๋กœ ์ข‹์ง„ ์•Š์Œ? -> ์“ฐ๋ ˆ๋“œ ํ›„์ปค? ํ”„๋ก์‹œ๋ฅผ ์จ์„œ ํ•˜๋ฉด ํ•  ์ˆ˜ ์žˆ์Œ

public class TestSchedulerProxy {

    private static final TestScheduler SCHEDULER = new TestScheduler();
    private static final TestSchedulerProxy INSTANCE = new TestSchedulerProxy();

    static {
        try {
            RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() {
                @Override
                public Scheduler getIOScheduler() {
                    return SCHEDULER;
                }

                @Override
                public Scheduler getComputationScheduler() {
                    return SCHEDULER;
                }

                @Override
                public Scheduler getNewThreadScheduler() {
                    return SCHEDULER;
                }
            });

            RxAndroidPlugins.getInstance().registerSchedulersHook(new RxAndroidSchedulersHook() {
                @Override
                public Scheduler getMainThreadScheduler() {
                    return SCHEDULER;
                }
            });
        } catch (IllegalStateException e) {
            throw new IllegalStateException("Schedulers class already initialized. " +
                    "Ensure you always use the TestSchedulerProxy in unit tests.");
        }
    }

    public static TestSchedulerProxy get() {
        return INSTANCE;
    }

    public void advanceBy(long delayTime, TimeUnit unit) {
        SCHEDULER.advanceTimeBy(delayTime, unit);
    }
}

์ด๋Ÿฐ์‹์œผ๋กœ ํ›„์ปค๋ฅผ ๋งŒ๋“ค๊ณ , ํ…Œ์ŠคํŠธ ์ผ€์ด์Šค์•ˆ์—์„œ ์Šค์ผ€์ค„๋Ÿฌ์— ๋Œ€ํ•ด ์•„๋ž˜ ๋ฉ”์†Œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ์‹œ๊ฐ„์„ ์•ž๋‹น์ธ๋‹ค????๊ณ  ํ•ด์•ผํ•˜๋‚˜...?????

proxy.advanceBy(1, TimeUnit.SECONDS);

3์ผ์ฐจ

creating observable

๋˜๋„๋ก Observable.create ๋Š” ์“ฐ์ง€ ๋ง๋„๋ก-> OnSubscribe imple ์•ˆ์—์„œ onNext, onError, onComplete ๋ฅผ ๋งค๋‰ด์–ผํ•˜๊ฒŒ ํ˜ธ์ถœํ•ด์ค˜์•ผ ํ•˜๋Š”๋ฐ ์‹ค์ˆ˜ํ•˜๋ฉด ๋ฉ”๋ชจ๋ฆฌ๋ฆญ ๊ฐ€๋Šฅ์„ฑ

๊ทธ๋ž˜์„œ from ์„ ์“ฐ๊ฑฐ๋‚˜ defer ๊ฐ™์€๊ฑธ ์“ฐ๋Š”๊ฒŒ ์ข‹์„๋“ฏ?

Observable.empty() ์ž์ฃผ ์‚ฌ์šฉํ•˜์‹œ๋Š”๋“ฏ Observable.error(new NullPointerException("error")); ๋„ ์ž์ฃผ ์‚ฌ์šฉ -> validation ์ฒดํฌํ•  ๋•Œ Observable.timer(100, TimeUnit.MILLISECONDS, testScheduler); ๋„ ์ž์ฃผ ์‚ฌ์šฉ -> unsubscribe ์ž˜ ํ•ด์ค˜์•ผํ•จ

ํ…Œ์ŠคํŠธํ•  ๋–„ subscriber = new TestSubscriber(); ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์ข‹์Œ subscriber.assertValue("name"); ์ด๋ ‡๊ฒŒ assert ํ•  ์ˆ˜ ๋„ ์žˆ์Œ


subscribeOn ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ์— ๋Œ€ํ•ด์„œ ํ•œ๋ฒˆ๋งŒ ํ˜ธ์ถœ ๊ฐ€๋Šฅ(์—ฌ๋Ÿฌ๋ฒˆ ํ˜ธ์ถœํ•ด๋„ ์ตœ์ดˆ์— ํ˜ธ์ถœํ•œ ์Šค์ผ€์ฅด๋Ÿฌ๋งŒ ์ ์šฉ๋จ). ์•„์ดํ…œ์ด emit๋˜๋Š” ์“ฐ๋ ˆ๋“œ๋ฅผ ์„ค์ •.

observeOn ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ์— ๋Œ€ํ•ด ์—ฌ๋Ÿฌ๋ฒˆ ํ˜ธ์ถœ ๊ฐ€๋Šฅ. ํ˜ธ์ถœํ•œ ๋’ค๋กœ ์ฒด์ด๋‹๋œ ์˜คํผ๋ ˆ์ด์…˜์˜ ์“ฐ๋ ˆ๋“œ๋ฅผ ์„ค์ •(์ฐธ๊ณ ๋กœ ์˜คํผ๋ ˆ์ด์…˜์ด ์Šค์ผ€์ฅด๋Ÿฌ๋ฅผ ๋ฐ”๊พธ๊ธฐ๋„ํ•จ).

์™œ subscribeOn์€ subscribeํ• ๋•Œ ํ˜ธ์ถœ ํ•˜๋Š”๊ฐ€? -> subscribeOn์€ ์•„์ดํ…œ์˜ emit๊ณผ ๊ด€๋ จ๋œ ์Šค์ผ€์ค„๋Ÿฌ ์„ค์ •์ธ๋ฐ ๊ทธ๋ ‡๋‹ค๋ฉด ์ŠคํŠธ๋ฆผ ์ƒ์„ฑ ํ›„ ๋ฐ”๋กœ ํ˜ธ์ถœํ•ด์ค˜์•ผ ๋งž๋Š”๊ฑฐ ๊ฐ™์Œ -> ํ•˜์ง€๋งŒ ์ด๋ ‡๊ฒŒ ๋˜๋ฉด subscriber๊ฐ€ ์ด๋ฏธ ์„ค์ •๋œ ์Šค์ผ€์ค„๋Ÿฌ๋ฅผ ๋ฐ”๊ฟ€ ์ˆ˜ ์—†์Œ. ๊ทธ๋ž˜์„œ ์ตœ๋Œ€ํ•œ ๋’ค๋กœ ๋ฏธ๋ค„์„œ ํ•˜๋Š”๋“ฏ???

์ผ๋ฐ˜์ ์ธ observable(cold) ์ผ ๊ฒฝ์šฐ subscribeOn, subscribe ํ• ๋•Œ๋งˆ๋‹ค ์ƒˆ๋กœ์šด ์ŠคํŠธ๋ฆผ์ด ์ƒ์„ฑ๋˜๊ธฐ ๋•Œ๋ฌธ์— ๊ฐ๊ธฐ ๋‹ค๋ฅธ ์Šค์ผ€์ฅด๋Ÿฌ๋ฅผ ์ค„ ์ˆ˜ ์žˆ์Œ


flatMap ํ•˜๋‚˜์˜ ์•„์ดํ…œ์„ ๋ฐ›์•„์„œ ์—ฌ๋Ÿฌ๊ฐœ์˜ ์•„์ดํ…œ์œผ๋กœ ์ฃผ๊ณ  ์‹ถ์„ ๋•Œ. ํ•˜์ง€๋งŒ ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ๋˜์ง€ ์•Š์Œ.

concatMap flatMap์ด๋ž‘ ๊ฐ™์€๋ฐ ์ˆœ์„œ๊ฐ€ ๋ณด์žฅ๋จ.

4์ผ์ฐจ

Subject in RxJava

4๊ฐ€์ง€ ์„œ๋ธŒ์ ํŠธ ํƒ€์ž…

  • AsyncSubject
  • BehaviorSubject
  • PublishSubject
  • ReplaySubject

A Subject is a sort of bridge or proxy that acts both as an Subscriber and as an Observable. Because it is a Subscriber, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

http://reactivex.io/documentation/subject.html Subject = Observable + Subscriber Observable, Subscriber ์—ฐ๊ฒฐํ•˜๋Š” ๋…€์„??

  1. AsyncSubject

onComplete ๋ฐ”๋กœ ์ด์ „์˜(๋งˆ์ง€๋ง‰) item emit

  1. BehaviorSubject

it begins by emitting the item most recently emitted by the source Observable (or a seed/default value if none has yet been emitted) and then continues to emit any other items emitted later by the source Observable(s).

subscribe ์‹œ์  ์ดํ›„ ๊ฐ€์žฅ ์ตœ๊ทผ์˜ item + ๋‚˜๋จธ์ง€ item ๊ธฐ๋ณธ๊ฐ’์„ ์ „๋‹ฌํ•ด์ฃผ๋Š” ์šฉ๋„๋„ ๊ฐ€๋Šฅ

  1. PublishSubject

๊ธฐ๋ณธ์ ์ธ subject subscribe ์ดํ›„์— emit ๋˜๋Š” ๋ชจ๋“  item ๋ฐ›์Œ

  1. ReplaySubject

emit ๋œ ๋ชจ๋“  item ์„ ๋ฒ„ํผ์— ์ €์žฅํ•˜๊ณ  subscribe ํ•˜๋Š” ๋ชจ๋“  Observer ์—๊ฒŒ ์ „๋‹ฌ (์ˆœ์„œ ๋ณด์žฅ)

ReplaySubject.create(int capacity) : unbound. ๋ฏธ๋ฆฌ ๋ฒ„ํผ(๋ฐฐ์—ด) ํ• ๋‹น ํ•ด์ฃผ๋Š” ์šฉ(๋ฐฐ์—ด ์นดํ”ผ ์˜ค๋ฒ„ํ—ค๋“œ ๋ฐฉ์ง€) ๊ธฐ๋ณธ๊ฐ’ 32k

ReplaySubject.createWithSize(int size) : bound๋จ. ์ตœ๋Œ€ ๋ฒ„ํผ ํฌ๊ธฐ๋ฅผ ์ง€์ •

Why subject?

Observable ์€ ์ƒ์„ฑ ์‹œ์ ์— ๋ฐ์ดํ„ฐ๋ฅผ ๋„ฃ์–ด์ค˜์•ผ ํ•จ. ๊ทผ๋ฐ Subject๋Š” ์ƒ์„ฑ ํ›„์— ๋„ฃ์–ด์คŒ

Serializing

When you use a Subject as a Subscriber, take care not to call its onNext(โ€ฏ) method (or its other on methods) from multiple threads, as this could lead to non-serialized calls, which violates the Observable contract and creates an ambiguity in the resulting Subject.

To protect a Subject from this danger, you can convert it into a SerializedSubject with code like the following:

mySafeSubject = new SerializedSubject( myUnsafeSubject );

5์ผ์ฐจ

CompositeSubscription

์„œ๋ธŒ์Šคํฌ๋ฆฝ์…˜์„ ํ•œ๊ณณ์— ๋ชจ์•„์„œ ๊ด€๋ฆฌํ• ๋•Œ ์‚ฌ์šฉ -> ๋ณดํ†ต ์•ˆ๋“œ ๋ผ์ดํ”„์‹ธ์ดํด์ด๋ž‘ ์—ฎ์–ด์„œ ์”€ -> RxLifeCycle์„ ์ด์šฉํ•ด์„œ compose๋ฅผ ํ•˜๋ฉด๋จ

RxUserBus.sub().compose(bindToLifecycle()).subscribe((String s) -> {
    Toast.makeText(this, s, Toast.LENGTH_SHORT).show();
});

์ž๋™์œผ๋กœ onPause, onResume ๋ ๋•Œ ์ฒ˜๋ฆฌํ•ด์คŒ

WeakReference๋ž‘ ์—ฎ์–ด์„œ ์‚ฌ์šฉํ•ด๋ณด๋Š”๊ฑด ๋ณ„๋กœ -> null๋กœ ๋‚˜์™€์„œ??

๋””๋ฒ„๊น…

1. rxjava-debug

static {
	String TAG = "Rx-Debug Example";
	RxJavaPlugins.getInstance().registerObservableExecutionHook(new DebugHook(new DebugNotificationListener() {
	    public Object onNext(DebugNotification n) {
	        Log.v(TAG, "onNext on " + n);
	        return super.onNext(n);
	    }
	
	
	    public Object start(DebugNotification n) {
	        Log.v(TAG, "start on " + n);
	        return super.start(n);
	    }
	
	
	    public void complete(Object context) {
	        Log.v(TAG, "complete on " + context);
	    }
	
	    public void error(Object context, Throwable e) {
	        Log.e(TAG, "error on " + context);
	    }
	}));
}

registerObservableExecutionHook ๋กœ ํ›…์„ ๊ฑธ์–ด์ค„ ์ˆ˜ ์žˆ์Œ.

์ด๊ฒƒ๋ณด๋‹ค frodo๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์ข€ ๋” ์‰ฝ๊ฒŒ annotation processor ๋ฅผ ์ด์šฉํ•ด์„œ ์‰ฝ๊ฒŒ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•จ

public interface GitHubService {
    @RxLogObservable(RxLogObservable.Scope.EVERYTHING)
    @GET("/search/users?")
    Observable<SearchResult> searchUsers(@Query("q") String query);
}

์ €๋ ‡๊ฒŒ ์–ด๋…ธํ…Œ์ด์…˜ ๋ถ™์ด๋ฉด ๋กœ๊น…์„ ํ•  ์ˆ˜ ์žˆ์Œ....

3. IntelliJ ๊ธฐ๋ณธ ๊ธฐ๋Šฅ ํ™œ์šฉ; Break point๋ฅผ ์‚ฌ์šฉํ•ด์„œ ๋กœ๊ทธ ์ฐ๊ธฐ

attach debugger to android process

Break point๋ฅผ ์‚ฌ์šฉํ•ด์„œ ๋กœ๊ทธ ์ฐ๊ธฐ

cmd + F8: break point ์ฐ๊ธฐ

cmd + shift + F8: break point ์˜ต์…˜ ๋‹ค์ด์–ผ๋กœ๊ทธ ๋„์šฐ๊ธฐ

suspend uncheck ํ•˜๊ณ 

log evaluated expression ์— ์ฐ๊ณ ์ž ํ•˜๋Š” ํ‘œํ˜„์‹ ์ž‘์„ฑ


MVCC: multiversion concurrency control

Realm ์—์„œ๋Š” ๋ชจ๋“  ์ธ์Šคํ„ด์Šค๊ฐ€ ThreadLocal

copyFromRealm: ๋ ˜์˜ managed ๊ฐ์ฒด๋ฅผ stand alone?(unmanaged) ๊ฐ์ฒด๋กœ ๋งŒ๋“ค์–ด ์คŒ

retrofit์˜ RxJavaCallAdapterFactory

realm์˜ RealmObservableFactory

Observable.create() ํ†ตํ•ด์„œ ์ƒ์„ฑํ• ๋• ๋ฐ˜๋“œ์‹œ isUnsubscribed() ์ฒดํฌ๋ฅผ ํ•ด์•ผํ•˜๊ณ , onNext, onError, onComplete๋ฅผ ์ž˜ ์จ์•ผํ•จ

ex) ์ด๋ฏธ unsubscribe ๋ฌ๋Š”๋ฐ onNext๋กœ ์•„์ดํ…œ์„ emit ํ•˜๋Š” ๊ฒฝ์šฐ ์ฒดํฌํ•ด์•ผํ•˜๊ณ , onErrorํ˜ธ์ถœ์‹œ onComplete๋Š” ํ˜ธ์ถœ๋˜์ง€ ์•Š์•„์•ผ ํ•จ, ๋“ฑ๋“ฑ....

@Override
public Observable<DynamicRealm> from(DynamicRealm realm) {
	final RealmConfiguration realmConfig = realm.getConfiguration();
	return Observable.create(new Observable.OnSubscribe<DynamicRealm>() {
	    @Override
	    public void call(final Subscriber<? super DynamicRealm> subscriber) {
	        // Get instance to make sure that the Realm is open for as long as the
	        // Observable is subscribed to it.
	        final DynamicRealm observableRealm = DynamicRealm.getInstance(realmConfig);
	        final RealmChangeListener<DynamicRealm> listener = new RealmChangeListener<DynamicRealm>() {
	            @Override
	            public void onChange(DynamicRealm realm) {
	                if (!subscriber.isUnsubscribed()) {
	                    subscriber.onNext(observableRealm);
	                }
	            }
	        };
	        observableRealm.addChangeListener(listener);
	        subscriber.add(Subscriptions.create(new Action0() {
	            @Override
	            public void call() {
	                observableRealm.removeChangeListener(listener);
	                observableRealm.close();
	            }
	        }));
	
	            // Immediately call onNext with the current value, as due to Realm's auto-update, it will be the latest
	        // value.
	        subscriber.onNext(observableRealm);
	    }
	});
}

subscriber.add(Subscriptions.create(new Action0() ์ด ๋ถ€๋ถ„์ด ๊ผญ ํ•„์š”ํ•จ??? -> ??

6์ผ์ฐจ

https://youtu.be/hHnTIMjd1Y8?t=15m14s

RxJava in Action

RxAndroid, RxBinding, ReRelay

RxBinding: ui ์œ„์ ฏ์˜ ๋ฒ„ํŠผ ํด๋ฆญ, ์Šคํฌ๋กค ์ด๋ฒคํŠธ๋ฅผ ๋ชจ๋‘ Observable๋กœ ์ œ๊ณต

RxView.Click(view)

๊ฑฐ์˜ ๋ชจ๋“ ๊ณณ์—์„œ ์‚ฌ์šฉ ใ„ทใ„ท

Operators

interval()

withLatestFrom(observable): ์ด๋ฒคํŠธ ๋ฐœ์ƒ์‹œ ๋‹ค๋ฅธ Observable์˜ ๋งˆ์ง€๋ง‰ ์•„์ดํ…œ๊ณผ ํ•ฉ์ณ์„œ ์ฒ˜๋ฆฌ?

distinctUntilChanged(): ์ด๋ฒคํŠธ๊ฐ€ ๋‹ค๋ฅผ ๊ฒฝ์šฐ์—๋งŒ ์ฒ˜๋ฆฌ

startWith(sth): ๊ตฌ๋…์‹œ ์ฒซ ์•„์ดํ…œ์œผ๋กœ ์‹œ์ž‘ ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•ด์คŒ

onBackpressureDrop(): ๋ณดํ†ต์€ UI ์“ฐ๋ ˆ๋“œ์—์„œ ๋ฒ„ํผ๋ฅผ ์“ฐ๋Š”๋ฐ, ๋“œ๋ž์€ ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ๊ฐ€ ์ง€์—ฐ๋ ๋•Œ ์œ ์šฉ, timer๋กœ ๋ญ”๊ฐ€ ์ฒ˜๋ฆฌํ• ๋•Œ ์ฒ˜๋ฆฌ๊ฐ€ ์ง€์—ฐ๋˜๋ฉด ์•ˆ๋  ๋•Œ, ๊ตณ์ด ๋ชจ๋“  ์•„์ดํ…œ์„ ๋ฐ›์„ ํ•„์š”๊ฐ€ ์—†์„ ๋•Œ

๋กค๋ง ๋ฐฐ๋„ˆ๋ฅผ ๋งŒ๋“ ๋‹ค. 1์ดˆ๋งˆ๋‹ค ํŽ˜์ด์ง• ๊ทธ๋Ÿฐ๋ฐ ์†์œผ๋กœ ๋“œ๋ž˜๊ทธํ• ๋• ๋ฌด์‹œํ•ด์•ผํ•จ.

val timer = Observable.interval(1000, TimeUnit.MILLISECONDS)

val dragging = RxViewPager.pageScrollStateChanges(vp_activity_view_pager)
        .map { ViewPager.SCROLL_STATE_DRAGGING == it }
        .distinctUntilChanged()
        .startWith(false)
        
subscription.add(timer.withLatestFrom(dragging, { timer, dragging -> dragging })
        .filter { !it } // ๋“œ๋ž˜๊ทธ ์ค‘์ด ์•„๋‹๋•Œ๋งŒ 
        .retry()
        .onBackpressureDrop()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            with(vp_activity_view_pager) {
                val currentIdx = currentItem
                currentItem = if (currentIdx == adapter.count - 1) 0 else currentItem + 1
            }
        })

RxRelay - ์•ˆ์ „ํ•œ ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆผ?

onError, onComplete ๋ฐœ์ƒ์‹œ์—๋„ ์ŠคํŠธ๋ฆผ์ด ์ฃฝ์ง€ ์•Š์Œ..

PublisRelay.create() ๋กœ ์ƒ์„ฑ

์–ด๋–ค ์กฐ๊ฑด์ด ๋˜์—ˆ์„ ๋•Œ ๋ทฐ๋ฅผ ์ž ๊น ๋ณด์—ฌ์ฃผ๋Š” ์šฉ๋„์—์„œ...์ด๋–„ ๋ฐ˜๋“œ์‹œ backpressure์จ์ค˜์•ผ

private val visibilityPublishRelay by lazy { PublishRelay.create<Int>() }

with(subscription) {
            add(visibilityPublishRelay
                    .retry()
                    .distinctUntilChanged()
                    .onBackpressureBuffer()
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe {
                        changeVisibility(it)
                    })

            add(RxView.clicks(btn_activity_visibility_event)
                    .concatMap {
                        Observable.concat(
                                Observable.just(View.VISIBLE),
                                Observable.just(View.GONE).delay(3, TimeUnit.SECONDS))
                    }
                    .subscribe(visibilityPublishRelay))
        }

private fun changeVisibility(toVisibility: Int) {
        with(tv_activity_visibility_event) {
            if (toVisibility == visibility) {
                return
            }
            visibility = toVisibility
        }
    }

Kotlin

private val btnViewPager by lazy {
    findViewById(R.id.btn_activity_main_view_pager) as Button
}

private val btnViewPager by lazy {
    (findViewById(R.id.btn_activity_main_view_pager) as Button).apply {
    	textSize = 15f
    }
}

android extentions ์„ ์“ฐ๋ฉด,

private val btnViewPager by lazy {
    btn_activity_main_view_pager // ๋ฐ”๋กœ xml ์ด๋ฆ„์„
}

btnActivityMainViewPager.text = โ€œโ€
btnActivityMainViewPager.setOnClockListener {
	~~~
}
class OperatorMapSubscriptionItemListing : Observable.Operator<Listing, SubscriptionItem> {

    override fun call(o: Subscriber<in Listing>): Subscriber<in SubscriptionItem> {
        return object: Subscriber<SubscriptionItem>() {

            override fun onError(e: Throwable?) {
                o.onError(e)
            }

            override fun onCompleted() {
                o.onCompleted()
            }

            override fun onNext(t: SubscriptionItem?) {
                t?.let {
                    o.onNext(Listing(it.title, it.channelName,
                            "gdg://korea.android/listing/${it.itemId}?channelName=${it.channelName}"))
                }
            }
        }
    }
}

observable.lift(OperatorMapSubscriptionItemListing)

Sequence Operators

The following example shows how you can use the lift(โ€ฏ) operator to chain your custom operator (in this example: myOperator) alongside standard RxJava operators like ofType and map:

fooObservable = barObservable.ofType(Integer).map({it*2}).lift(new 
myOperator<T>()).map({"transformed by myOperator: " + it});

The following section shows how you form the scaffolding of your operator so that it will work correctly with lift(โ€ฏ).

Subject<T, R>

Observable์ด์ž Observer

toSerialized๋กœ ์—ฌ๋Ÿฌ ์“ฐ๋ ˆ๋“œ์„œ ์ ‘๊ทผํ•ด๋„ ๊ดœ์ฐฎ

RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);: onObservableStart๋ฅผ ํ˜ธ์ถœํ•˜๊ณ  OnSubscribe๊ฐ์ฒด ๋ฆฌํ„ด

return RxJavahooks.onObservableReturn(subscriber): Subscription ๊ฐ์ฒด ๋ฆฌํ„ด

If using Observable.just(), consider Completable.create() or Completable.defer()

java

Completable.create(new Completable.OnSubscribe() {
	@Override
		public void call(CompletableSubscriber completableSubscriber) {

	}
}).observeOn(AndroidSchedulers.mainThread()).subscribe();

GDG Korea Slack

half-but [5:16 PM] ์•ˆ๋…•ํ•˜์„ธ์š” ํ˜น์‹œ observable์˜ defer์—ฐ์‚ฐ์ž์— ๋Œ€ํ•œ ์„ค๋ช…ํ•ด์ฃผ์‹ค์ˆ˜ ์žˆ์œผ์‹ ๊ฐ€์š”? ์ž˜ ์ดํ•ด๊ฐ€ ๊ฐ€์ง€ ์•Š์•„์„œ์š”โ€ฆ ๋„์›€์ฃผ์„ธ์š”

seongug.jung (์ •์Šน์šฑ) [6:51 PM] subscribe ๋˜๊ธฐ์ „๊นŒ์ง€ ๋Œ€๊ธฐํ•˜๋Š” ํ•จ์ˆ˜์—์š” ์ด๋ฅผํ…Œ๋ฉด

api.getData1()
  .concatWith(api.getdata2())
  ...

์œ„์ฒ˜๋Ÿผ ๊ตฌ์„ฑ์„ ํ•˜๋ฉด getData1 ๊ณผ getData2 ๊ฐ€ ๊ฑฐ์˜ ๋™์‹œ์— ํ˜ธ์ถœ์ด ๋˜์š” ๊ทธ ๊ณผ์ •์—์„œ getData1 ์—์„œ ์—๋Ÿฌ๊ฐ€ ๋‚˜๋ฉด getData2 ๋Š” undelivered ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฒŒ ๋˜์š” ์„ค๋ž‘ํƒ•์„ ์‚ฌ์™”๋Š”๋ฐ ์™œ ๋จน์ง€๋ฅผ ๋ชปํ•˜๋‹ˆ (edited) ์ด๋Ÿฐ ๊ฒฝ์šฐ๋ฅผ ๋ฐฉ์ง€ํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์œผ๋กœ defer ์ธ๋ฐ concatWith ๋‚˜ mergeWith ๊ฐ™์€ ๊ฒฝ์šฐ์— ์“ฐ๋ฉด

api.getData1()
  .concatWith(Single.defer { api.getData2() })
  ...

์ด์ฒ˜๋Ÿผ ์“ฐ๋ฉด getData1 ์ด ์ฒ˜๋ฆฌ๊ฐ€ ๋๋‚˜๊ณ  ์‹ค์ œ concatWith ๊ฐ€ subscribe ๊ฐ€ ๋˜๋Š” ์‹œ์ ์— ๋™์ž‘์„ ์‹œ์ž‘ํ•ด์š” ์ด๋Ÿฐ ๊ฒฝ์šฐ์—๋„ ์“ธ ์ˆ˜ ์žˆ์–ด์š”.

getLocal().concatWith(getApi())

์ด๋ ‡๊ฒŒ ์“ฐ๋ฉด ๋กœ์ปฌ ๋ณด๋‚ด๊ณ  ์ตœ์‹  ์ •๋ณด๋ฉด api ๋ฅผ ์•ˆ ์ด๋„ ๋˜๋Š” ์ƒํ™ฉ์—์„œ ๋ฌธ์ œ๊ฐ€ ๋  ์ˆ˜ ์žˆ์ฃ . ๊ทธ๋Ÿด๋•Œ defer ๋ฅผ ์“ฐ๋ฉด ๋กœ์ปฌ ๋ณด๋‚ด๊ณ  ๊ฐฑ์‹  ์•ˆ๋˜๋„ ๋จ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋ฉด api ๋ฅผ ์•ˆ ์˜๋Š” ์ฒ˜๋ฆฌ๋ฅผ ํ•  ์ˆ˜ ์žˆ์–ด์š” ๋Š” ๋„ˆ๋ฌด ๋Šฆ๊ฒŒ ๋Œ€๋‹ตํ•œ ๋А๋‚Œ์ด๋„ค์š”

yjh5424 [7:53 PM] joined #reactivex along with geuntaek.

gaemi [11:19 AM] ์ตœ๊ทผ์— .defer() ๋ฅผ Hystrix ์ ์šฉํ•˜๋ฉด์„œ ์œ ์šฉํ•˜๊ฒŒ ์‚ฌ์šฉํ–ˆ์–ด์š”. ใ…Žใ…Ž ์ œ ์ƒ๊ฐ์—” .compose() ์™€ ์กฐํ•ฉํ•  ์ˆ˜ ์žˆ๋Š” ์ตœ๊ณ ์˜ ํ•จ์ˆ˜๊ฐ€ ์•„๋‹๊นŒ ์ƒ๊ฐ๋˜๋„ค์š”.

gaemi [11:23 AM] ์˜ˆ๋ฅผ๋“ ๋‹ค๋ฉด ์บ์‹œ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•˜์—ฌ Single.Transformer ๋ฅผ ์ด๋Ÿฐํ˜•ํƒœ๋กœ ๊ตฌ์„ฑํ–ˆ์–ด์š”.

   public Single<T> call(Single<T> source) {
       return Single.defer(() -> {
           T cachedObj = findCache();
           if (cachedObj != null) {
               return Single.just(cachedObj);
           }

           return source.doOnSuccess(v -> {
               Completable.fromAction(() -> putCache(v))
                       .subscribeOn(cacheScheduler)
                       .subscribe(Actions.empty(), Actions.empty());
           });
       });
   }
   
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment