Operator Wiki : Link
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
1:1 형 변환을 담당하며, Observable를 Observable 형태로 반환
Operator Wiki : Link
- Observe의 중첩된 Observe의 결과 취득을 위해서는 중첩된 Observe 구독처리가 필요
- 단일 스트림으로 평탄화(flattening) 처리로 해결
Stream으로 오는 데이터를 단순 Iterable 형태를 띄는 경우, 간단한 평탄화 처리로
flatMapIterable
함수를 사용 가능
- 내부적으로 merge 사용
// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
- flatMap(Func1) 으로 동시성 사용시 문제되는 이유
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func)
↓
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func, boolean delayErrors)
↓
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func, boolean delayErrors, int maxConcurrent)
동시에 구독할 수 있는 ObservableSource 의 최대수가 Integer.MAX_VALUE 값이 할당됨 (parameter : maxConcurrent)
단순 해결책은 적절한 구독수를 지정해서 사용
Operator Wiki : Link
Operator Wiki : Link
- 각 Observable 의 순서 보장
- public Function
// ObservableConcatMap.java
Operator Wiki : Link
- concat(Iterable source)
- concat -> fromIterable -> concatMapDelayError -> ObservableConcatMap
- concat(ObservableSource source1, ObservableSource source2, ObservableSource source3)
- concat -> concatArray
- concatArray(ObservableSource... sources)
- concatArray -> fromArray -> ObservableConcatMap
Operator Wiki : Link
- 각 Stream에서 방출되는 데이터가 방출 순서대로 흘러감
Operator Wiki : Link
- N 개의 Observable을 1개의 Observable로 변환
- zip하는 Observable이 모두 종료시까지 대기함, 모두 종료 후 Zip Function이 호출 됨
Operator Wiki : Link
- 각 스트림에서 방출되는 데이터의 마지막 값을 조합
Operator Wiki : Link
- 스트림 주체가 변경시에만 값을 처리할 경우에 사용
- 주체가 아닌 다른 Observable의 데이터가 유실 가능성이 있음
Operator Wiki : Link
- Observable 들 중 하나가 첫 이벤트를 방출하면 나머지 Observable는 버리고, 첫 이벤트를 방출한 Observable 만 전달된다
Operator Wiki : Link
- 계산값이 다음 onNext에 전달된다
RxJava 1
RxJava 2
Operator Wiki : Link
- RxJava 1에서는 Observable#scan 의 마지막 값을 취득하는 형태
- RxJava 2에서는 별도 관리
- scan : ObservableScan
- reduce : ObservableReduceSeedSingle / ObservableReduceMaybe / ObservableReduceWithSingle
- onComplete까지 값을 기다린다
RxJava 1
RxJava 2
Operator Wiki : Link
- Stream 에서 흘러오는 데이터가 동일한 값 제외처리
- Stream 시작부터 유일한 값 처리 용도로 적합
Operator Wiki : Link
Observable.fromArray(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 4, 3, 2, 1)
.distinct()
.forEach { print("$it ") }
// 1 2 3 4 5
- distinct() 사용시 사용한 타입과 동일한 Observable 이 반환
Observable<Status> empty = Observable.empty();
empty.distinct(new Function<Status, String>() {
@Override
public String apply(Status status) throws Exception {
return status.getUser().getId();
}
}).map(new Function<Status, Object>() {
@Override
public Object apply(Status status) throws Exception {
return null;
}
});
public final Observable<T> distinct() {
return distinct(Functions.identity(), Functions.createHashSet());
}
↓
public final <K> Observable<T> distinct(Function<? super T, K> keySelector) {
return distinct(keySelector, Functions.createHashSet());
}
Operator Wiki : Link
- 데이터가 실제로 변화가 있을때에만 이벤트 받는 용도로 사용 (equals 사용)
- distinct() 와는 다르게 중복이 발생한다.
Observable.fromArray(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 4, 3, 2, 1)
.distinctUntilChanged()
.forEach { print("$it ") }
// 1 2 3 4 5 4 3 2 1
메모리 사용량 : distinctUntilChanged < distinct
- take(n) : 최초 n 개의 값만 방출
- skip(n) : 최초 n 개의 값을 버림, n+1 부터 방출
- takeLast(n) : 완료 이전의 n 개의 값만 방출
- skipLast(n) : 완료 이전의 n 개의 값을 버림
takeLast / skipLast는 delay onComplete 를 판단시 이벤트가 흘러감
Observable.fromArray(1, 2, 3, 4, 5)
.doOnNext { log("onNext = $it") }
.delay(1.toLong(), TimeUnit.SECONDS)
.skipLast(3)
.doOnNext { log("onNext1 = $it") }
.delay(1.toLong(), TimeUnit.SECONDS)
.doOnNext { log("onNext2 = $it") }
.subscribe(
{ v -> log(v.toString()) },
{ t -> log(t.localizedMessage) },
{ log("Complete") }
)
// 결과
// 2017-11-26 19:55:49.380 => onNext = 1
// 2017-11-26 19:55:49.402 => onNext = 2
// 2017-11-26 19:55:49.403 => onNext = 3
// 2017-11-26 19:55:49.403 => onNext = 4
// 2017-11-26 19:55:49.403 => onNext = 5
// 2017-11-26 19:55:50.410 => onNext1 = 1
// 2017-11-26 19:55:50.410 => onNext1 = 2
// 2017-11-26 19:55:51.417 => onNext2 = 1
// 2017-11-26 19:55:51.417 => 1
// 2017-11-26 19:55:51.417 => onNext2 = 2
// 2017-11-26 19:55:51.417 => 2
// 2017-11-26 19:55:51.417 => Complete
- 하나의 값이나 예외(NoSuchElementException)를 방출
- first() == take(1).single()
- last() == takeLast(1).single()
- 하나의 값을 방출
- 원하는 값이 없는 경우 예외를 발생하지 않음
- RxJava 2 에서 사라짐
filter(predicate).take(1) = takeFirst(predicate)
- predicate와 일치하는 첫번째 항목을 발출 후 완료
Observable.range(1, 5).takeUntil { it > 3 } // [1, 2, 3, 4]
- predicate를 만족하는 한 방출
Observable.range(1, 5).takeWhile { it < 4 } // [1, 2, 3]
- 특정 위치의 항목을 추출
- n < 0 : IndexOutOfBoundsException
- 기본값 처리
- RxJava 1 : ...OrDefault()
- RxJava 2 : 기본 함수에 DefaultValue 처리
- Match 여부의 평가 연산자
Operator Wiki : Link
switchOnNext는 ObservableSources를 내보내는 ObservableSource에 가입합니다. 이러한 방출 된 ObservableSources 중 하나를 관찰 할 때마다 switchOnNext에 의해 반환 된 ObservableSource가 ObservableSource가 방출하는 항목을 방출하기 시작합니다. 새로운 ObservableSource가 생성되면 switchOnNext는 이전에 방출된 ObservableSource에서 항목을 내보내는 것을 중지하고 새 항목에서 항목을 방출하기 시작합니다.
Operator Wiki : Link
Observable.range(1, 20)
.groupBy(i -> i % 3)
.subscribe(grouped -> grouped.toList().subscribe(list ->
System.out.println(String.format("key=%d,value=%s", grouped.getKey(), list)))
);
// 결과
// key=0,value=[3, 6, 9, 12, 15, 18]
// key=1,value=[1, 4, 7, 10, 13, 16, 19]
// key=2,value=[2, 5, 8, 11, 14, 17, 20]
Operator Wiki : Link
Observable.range(1, 10)
.window(3)
.subscribe(window -> window.toList().subscribe(list ->
System.out.println(String.format("value=%s", list))
));
// window = Observable<Integer>
// 결과
// value=[1, 2, 3]
// value=[4, 5, 6]
// value=[7, 8, 9]
// value=[10]
Operator Wiki : Link
Observable.range(1, 10)
.buffer(3)
.subscribe(list ->
System.out.println(String.format("value=%s", list))
);
// list = List<Integer>
// 결과
// value=[1, 2, 3]
// value=[4, 5, 6]
// value=[7, 8, 9]
// value=[10]
https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0
RxJava 1.x 대비 10배정도 어려워짐
ObservableTransformer
을 사용하여 ObservableSource를 변환
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
// Int to String
class MyCompose : ObservableTransformer<Int, String> {
override fun apply(upstream: Observable<Int>): ObservableSource<String> =
upstream.map { "Result : $it" }
}
Observable.range(1, 3)
.compose(MyCompose())
.forEach { println(it) }
Observable.range(1, 3)
.compose { upstream -> upstream.map { "Result : $it" } }
.forEach { println(it) }
// 결과
// Result : 1
// Result : 2
// Result : 3
현재 ObservableSource로 옮기고 구독할 때 Operator 함수를 통해 현재 ObservableSource의 값을 전달하는 새 ObservableSource를 반환
ObservableSource.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()
class MyLift : ObservableOperator<String, Int> {
override fun apply(observer: Observer<in String>): Observer<in Int> {
return object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
observer.onSubscribe(d)
}
override fun onNext(t: Int) {
observer.onNext("Result : $t")
}
override fun onError(e: Throwable) {
observer.onError(e)
}
override fun onComplete() {
observer.onComplete()
}
}
}
}
Observable.range(1, 3)
.lift(MyLift())
.forEach { println(it) }
Observable.range(1, 3)
.lift<String> { observer: Observer<in String> ->
object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
observer.onSubscribe(d)
}
override fun onError(e: Throwable) {
observer.onError(e)
}
override fun onComplete() {
observer.onComplete()
}
override fun onNext(t: Int) {
observer.onNext("Result : $t")
}
}
}
.forEach { println(it) }
// 결과
// Result : 1
// Result : 2
// Result : 3
- Operator가 Observable로부터 방출된 각 요소에 작용하려면
lift
를 사용 - Operator가 Observable 자체에 작용하는 경우에는
compose
를 사용
Operator | 직접 Observable 처리 | Observable 의 값에 접근 가능 |
---|---|---|
compose | 가능 | 불가능 |
lift | 불가능 | 가능 |
Operator | 호출 시점 | 반환값 | 직접 Observable 처리 | Observable 의 값에 접근 가능 |
---|---|---|---|---|
compose | subscribe 된 타이밍 (한번) | Observable | 가능 | 불가능 |
flatmap | onNext 호출시 (복수) | Observable | 불가능 | 가능 |
// Custom Method
fun <T> Observable<T>.filterNotEmpty(): Observable<T> = filter {
when (it) {
is String -> it.isNotEmpty()
is Int -> it > 0
is List<*> -> it.count() > 0
else -> it != null
}
}
Observable.just("A", "", 2, -1)
.filterNotEmpty()
.forEach { println(it) }
// 결과
// A
// 2