Last active
August 29, 2015 14:20
-
-
Save ulyssesdotcodes/14f913a0144814f5d42b to your computer and use it in GitHub Desktop.
BufferUntil
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void testBuffer() { | |
PublishSubject<String> stream = PublishSubject.create(); | |
PublishSubject<Boolean> gate = PublishSubject.create(); | |
ReplaySubject<String> test = ReplaySubject.create(); | |
stream | |
.lift(TRx.bufferUntil(gate)) | |
.subscribe(test); | |
List<String> results = new ArrayList<>(); | |
test.subscribe(results::add); | |
assertEquals(0, results.size()); | |
stream.onNext("one"); | |
assertEquals(0, results.size()); | |
gate.onNext(true); | |
assertEquals(1, results.size()); | |
stream.onNext("two"); | |
assertEquals(2, results.size()); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public <T, U> OperatorBufferUntil extends Observable.Operator<T, T>() { | |
private final Observable<? extends U> other; | |
public OperatorBufferUntil(final Observable<? extends U> other) { | |
this.other = other; | |
} | |
ReplaySubject<T> mReplaySubject = ReplaySubject.create(); | |
@Override | |
public Subscriber<? super T> call(Subscriber<? super T> child) { | |
final Subscriber<T> serial = new SerializedSubscriber<T>(child, false); | |
final Subscriber<T> buffer = new Subscriber<T>(child, false) { | |
@Override public void onCompleted() { | |
try { | |
serial.onCompleted(); | |
} | |
finally { | |
serial.unsubscribe(); | |
} | |
} | |
@Override public void onError(Throwable e) { | |
try { | |
serial.onError(e); | |
} | |
finally { | |
serial.unsubscribe(); | |
} | |
} | |
@Override public void onNext(T t) { | |
mReplaySubject.onNext(t); | |
} | |
}; | |
Subscriber<U> u = new Subscriber<U>() { | |
@Override public void onCompleted() { | |
unsubscribe(); | |
} | |
@Override public void onError(Throwable e) { | |
serial.onError(e); | |
unsubscribe(); | |
} | |
@Override public void onNext(U u) { | |
mReplaySubject.subscribe(serial); | |
unsubscribe(); | |
} | |
}; | |
serial.add(u); | |
serial.add(buffer); | |
child.add(serial); | |
other.unsafeSubscribe(u); | |
return buffer; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public <T, U> TransformBufferUntil extends Observable.Operator<T, T>() { | |
private final Observable<? extends U> other; | |
ReplaySubject<T> mReplaySubject = ReplaySubject.create(); | |
public TransformBufferUntil(Observable<? extends U> other) { | |
this.other = other; | |
} | |
@Override public Observable<T> call(Observable<T> tObservable) { | |
tObservable.subscribe(mReplaySubject); | |
return other.take(1).flatMap(__ -> mReplaySubject); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment