Skip to content

Instantly share code, notes, and snippets.

@Pluu
Last active November 24, 2022 04:12
Show Gist options
  • Save Pluu/0e32cff72eed282dc170c4e13887c8b1 to your computer and use it in GitHub Desktop.
Save Pluu/0e32cff72eed282dc170c4e13887c8b1 to your computer and use it in GitHub Desktop.
RxJava 3장 ~ 연산자와 변환

RxJava 3장 ~ 연산자와 변환

핵심 연산자: 매핑과 필터링

map

Operator Wiki : Link

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)

1:1 형 변환을 담당하며, Observable를 Observable 형태로 반환

flatMap

Operator Wiki : Link

  • Observe의 중첩된 Observe의 결과 취득을 위해서는 중첩된 Observe 구독처리가 필요
  • 단일 스트림으로 평탄화(flattening) 처리로 해결

Stream으로 오는 데이터를 단순 Iterable 형태를 띄는 경우, 간단한 평탄화 처리로 flatMapIterable 함수를 사용 가능

  • 내부적으로 merge 사용
// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
   if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
      return;
   }

   source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
  • flatMap(Func1) 으로 동시성 사용시 문제되는 이유
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func)
↓
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func, boolean delayErrors)
↓
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func, boolean delayErrors, int maxConcurrent)

동시에 구독할 수 있는 ObservableSource 의 최대수가 Integer.MAX_VALUE 값이 할당됨 (parameter : maxConcurrent)

단순 해결책은 적절한 구독수를 지정해서 사용

delay

Operator Wiki : Link

concatMap

Operator Wiki : Link

  • 각 Observable 의 순서 보장
  • public Function
// ObservableConcatMap.java

concat

Operator Wiki : Link

  • concat(Iterable source)
    • concat -> fromIterable -> concatMapDelayError -> ObservableConcatMap
  • concat(ObservableSource source1, ObservableSource source2, ObservableSource source3)
    • concat -> concatArray
  • concatArray(ObservableSource... sources)
    • concatArray -> fromArray -> ObservableConcatMap

여러개의 Observable

merge/mergeWith()

Operator Wiki : Link

  • 각 Stream에서 방출되는 데이터가 방출 순서대로 흘러감

zip/zipWith()

Operator Wiki : Link

  • N 개의 Observable을 1개의 Observable로 변환
  • zip하는 Observable이 모두 종료시까지 대기함, 모두 종료 후 Zip Function이 호출 됨

combineLatest()

Operator Wiki : Link

  • 각 스트림에서 방출되는 데이터의 마지막 값을 조합

withLatestFrom

Operator Wiki : Link

  • 스트림 주체가 변경시에만 값을 처리할 경우에 사용
  • 주체가 아닌 다른 Observable의 데이터가 유실 가능성이 있음

amb()

Operator Wiki : Link

  • Observable 들 중 하나가 첫 이벤트를 방출하면 나머지 Observable는 버리고, 첫 이벤트를 방출한 Observable 만 전달된다

고수준 연산자 : collect(), reduce(), scan(), distinct(), groupBy()

scan()

Operator Wiki : Link

  • 계산값이 다음 onNext에 전달된다

reduce()

RxJava 1

RxJava 2

Operator Wiki : Link

  • RxJava 1에서는 Observable#scan 의 마지막 값을 취득하는 형태
  • RxJava 2에서는 별도 관리
    • scan : ObservableScan
    • reduce : ObservableReduceSeedSingle / ObservableReduceMaybe / ObservableReduceWithSingle
  • onComplete까지 값을 기다린다

collect()

single()

RxJava 1

RxJava 2

Operator Wiki : Link

distinct()

  • Stream 에서 흘러오는 데이터가 동일한 값 제외처리
  • Stream 시작부터 유일한 값 처리 용도로 적합

Operator Wiki : Link

Observable.fromArray(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 4, 3, 2, 1)
   .distinct()
   .forEach { print("$it ") }
// 1 2 3 4 5 
  • distinct() 사용시 사용한 타입과 동일한 Observable 이 반환
Observable<Status> empty = Observable.empty();
empty.distinct(new Function<Status, String>() {
   @Override
   public String apply(Status status) throws Exception {
      return status.getUser().getId();
   }
}).map(new Function<Status, Object>() {
   @Override
   public Object apply(Status status) throws Exception {
      return null;
   }
});
public final Observable<T> distinct() {
   return distinct(Functions.identity(), Functions.createHashSet());
}
↓
public final <K> Observable<T> distinct(Function<? super T, K> keySelector) {
   return distinct(keySelector, Functions.createHashSet());
}

distinctUntilChanged()

Operator Wiki : Link

  • 데이터가 실제로 변화가 있을때에만 이벤트 받는 용도로 사용 (equals 사용)
  • distinct() 와는 다르게 중복이 발생한다.
Observable.fromArray(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 4, 3, 2, 1)
   .distinctUntilChanged()
   .forEach { print("$it ") }
// 1 2 3 4 5 4 3 2 1 

메모리 사용량 : distinctUntilChanged < distinct

skip(), takeWhile() 등을 사용하여 잘개 쪼개거나 잘라내기

  • take(n) : 최초 n 개의 값만 방출

  • skip(n) : 최초 n 개의 값을 버림, n+1 부터 방출

  • takeLast(n) : 완료 이전의 n 개의 값만 방출

  • skipLast(n) : 완료 이전의 n 개의 값을 버림

takeLast / skipLast는 delay onComplete 를 판단시 이벤트가 흘러감

Observable.fromArray(1, 2, 3, 4, 5)
            .doOnNext { log("onNext = $it") }
            .delay(1.toLong(), TimeUnit.SECONDS)
            .skipLast(3)
            .doOnNext { log("onNext1 = $it") }
            .delay(1.toLong(), TimeUnit.SECONDS)
            .doOnNext { log("onNext2 = $it") }
            .subscribe(
                  { v -> log(v.toString()) },
                  { t -> log(t.localizedMessage) },
                  { log("Complete") }
            )

// 결과           
// 2017-11-26 19:55:49.380 => onNext = 1
// 2017-11-26 19:55:49.402 => onNext = 2
// 2017-11-26 19:55:49.403 => onNext = 3
// 2017-11-26 19:55:49.403 => onNext = 4
// 2017-11-26 19:55:49.403 => onNext = 5
// 2017-11-26 19:55:50.410 => onNext1 = 1
// 2017-11-26 19:55:50.410 => onNext1 = 2
// 2017-11-26 19:55:51.417 => onNext2 = 1
// 2017-11-26 19:55:51.417 => 1
// 2017-11-26 19:55:51.417 => onNext2 = 2
// 2017-11-26 19:55:51.417 => 2
// 2017-11-26 19:55:51.417 => Complete

first() / last()

  • 하나의 값이나 예외(NoSuchElementException)를 방출
  • first() == take(1).single()
  • last() == takeLast(1).single()

takeFirst(predicate)

  • 하나의 값을 방출
  • 원하는 값이 없는 경우 예외를 발생하지 않음
  • RxJava 2 에서 사라짐
filter(predicate).take(1) = takeFirst(predicate)

takeUntil(predicate)

  • predicate와 일치하는 첫번째 항목을 발출 후 완료
Observable.range(1, 5).takeUntil { it > 3 }   // [1, 2, 3, 4]

takeWhile(predicate)

  • predicate를 만족하는 한 방출
Observable.range(1, 5).takeWhile { it < 4 }   // [1, 2, 3]

elementAt(n)

  • 특정 위치의 항목을 추출
  • n < 0 : IndexOutOfBoundsException

...OrDefault()

  • 기본값 처리
  • RxJava 1 : ...OrDefault()
  • RxJava 2 : 기본 함수에 DefaultValue 처리

count()

all(predicate) / exists(predicate) / contains(value)

  • Match 여부의 평가 연산자

switchOnNext()

Operator Wiki : Link

switchOnNext는 ObservableSources를 내보내는 ObservableSource에 가입합니다. 이러한 방출 된 ObservableSources 중 하나를 관찰 할 때마다 switchOnNext에 의해 반환 된 ObservableSource가 ObservableSource가 방출하는 항목을 방출하기 시작합니다. 새로운 ObservableSource가 생성되면 switchOnNext는 이전에 방출된 ObservableSource에서 항목을 내보내는 것을 중지하고 새 항목에서 항목을 방출하기 시작합니다.

groupBy

Operator Wiki : Link

Observable.range(1, 20)
        .groupBy(i -> i % 3)
        .subscribe(grouped -> grouped.toList().subscribe(list ->
                System.out.println(String.format("key=%d,value=%s", grouped.getKey(), list)))
        );

// 결과
// key=0,value=[3, 6, 9, 12, 15, 18]
// key=1,value=[1, 4, 7, 10, 13, 16, 19]
// key=2,value=[2, 5, 8, 11, 14, 17, 20]

window

Operator Wiki : Link

 Observable.range(1, 10)
        .window(3)
        .subscribe(window ->  window.toList().subscribe(list ->
                System.out.println(String.format("value=%s", list))
        ));
        
// window = Observable<Integer>

// 결과
// value=[1, 2, 3]
// value=[4, 5, 6]
// value=[7, 8, 9]
// value=[10]

buffer

Operator Wiki : Link

Observable.range(1, 10)
        .buffer(3)
        .subscribe(list -> 
                System.out.println(String.format("value=%s", list))
        );
                
// list = List<Integer>

// 결과
// value=[1, 2, 3]
// value=[4, 5, 6]
// value=[7, 8, 9]
// value=[10]

사용자 정의 연산자 만들기

Writing operators for 2.0

https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0

RxJava 1.x 대비 10배정도 어려워짐

compose

ObservableTransformer 을 사용하여 ObservableSource를 변환

public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
     return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
// Int to String
class MyCompose : ObservableTransformer<Int, String> {
    override fun apply(upstream: Observable<Int>): ObservableSource<String> =
            upstream.map { "Result : $it" }
}
Observable.range(1, 3)
        .compose(MyCompose())
        .forEach { println(it) }

Observable.range(1, 3)
        .compose { upstream -> upstream.map { "Result : $it" } }
        .forEach { println(it) }
        
// 결과
// Result : 1
// Result : 2
// Result : 3

lift

현재 ObservableSource로 옮기고 구독할 때 Operator 함수를 통해 현재 ObservableSource의 값을 전달하는 새 ObservableSource를 반환

ObservableSource.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()

class MyLift : ObservableOperator<String, Int> {
    override fun apply(observer: Observer<in String>): Observer<in Int> {
        return object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
                observer.onSubscribe(d)
            }
            override fun onNext(t: Int) {
                observer.onNext("Result : $t")
            }
            override fun onError(e: Throwable) {
                observer.onError(e)
            }
            override fun onComplete() {
                observer.onComplete()
            }
        }
    }
}
Observable.range(1, 3)
        .lift(MyLift())
        .forEach { println(it) }

Observable.range(1, 3)
        .lift<String> { observer: Observer<in String> ->
            object : Observer<Int> {
                override fun onSubscribe(d: Disposable) {
                    observer.onSubscribe(d)
                }

                override fun onError(e: Throwable) {
                    observer.onError(e)
                }

                override fun onComplete() {
                    observer.onComplete()
                }

                override fun onNext(t: Int) {
                    observer.onNext("Result : $t")
                }
            }
        }
        .forEach { println(it) }

// 결과
// Result : 1
// Result : 2
// Result : 3

compose vs lift

  • Operator가 Observable로부터 방출된 각 요소에 작용하려면 lift를 사용
  • Operator가 Observable 자체에 작용하는 경우에는 compose를 사용
Operator 직접 Observable 처리 Observable 의 값에 접근 가능
compose 가능 불가능
lift 불가능 가능

compose vs flatmap

Operator 호출 시점 반환값 직접 Observable 처리 Observable 의 값에 접근 가능
compose subscribe 된 타이밍 (한번) Observable 가능 불가능
flatmap onNext 호출시 (복수) Observable 불가능 가능

Rxjavaのcompose、あるいはRxLifeCycleについて

Kotlin Custom Method

// Custom Method
fun <T> Observable<T>.filterNotEmpty(): Observable<T> = filter {
    when (it) {
        is String -> it.isNotEmpty()
        is Int -> it > 0
        is List<*> -> it.count() > 0
        else -> it != null
    }
}
Observable.just("A", "", 2, -1)
    .filterNotEmpty()
    .forEach { println(it) }

// 결과
// A
// 2

요약

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment