Last active
January 5, 2016 20:11
-
-
Save gjesse/72a478e5b8248861dd2c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.boundary.sargon.rx.operators; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.functions.Func1; | |
import rx.internal.operators.BufferUntilSubscriber; | |
import rx.subscriptions.Subscriptions; | |
import java.util.Objects; | |
/** | |
* Windows an observable, with boundaries determined when the | |
* result of the keySelector changes. | |
* | |
* windows are *eagerly* dispatched, that is | |
* the result of the keySelector changes, the current item is included in the current | |
* window, which is then immediately closed! | |
* | |
* A new window is then created when the next item is received | |
* | |
* heavily borrowed from rx.internal.operators.OperatorWindowWithSize | |
* | |
* @param <T> | |
* @param <U> | |
*/ | |
public class OperatorWindowUntilChanged<T, U> implements Observable.Operator<Observable<T>, T> { | |
private final Func1<T, U> keySelector; | |
public OperatorWindowUntilChanged(Func1<T, U> keySelector) { | |
this.keySelector = keySelector; | |
} | |
@Override | |
public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) { | |
EagerSubscriber e = new EagerSubscriber(child); | |
e.init(); | |
return e; | |
} | |
/** | |
* creates exact/non overlapping window bounds | |
* that are eagerly dispatched | |
* */ | |
final class EagerSubscriber extends Subscriber<T> { | |
private final Subscriber<? super Observable<T>> child; | |
private BufferUntilSubscriber<T> window; | |
private boolean noWindow = true; | |
private U previousKey = null; | |
private boolean hasPrevious; | |
public EagerSubscriber(Subscriber<? super Observable<T>> child) { | |
/** | |
* See https://github.com/ReactiveX/RxJava/issues/1546 | |
* We cannot compose through a Subscription because unsubscribing | |
* applies to the outer, not the inner. | |
*/ | |
this.child = child; | |
/* | |
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) | |
*/ | |
} | |
void init() { | |
child.add(Subscriptions.create(() -> { | |
// if no window we unsubscribe up otherwise wait until window ends | |
if (noWindow) { | |
unsubscribe(); | |
} | |
})); | |
} | |
@Override | |
public void onNext(T t) { | |
final U currentKey = previousKey; | |
final U key = keySelector.call(t); | |
previousKey = key; | |
if (window == null) { | |
createWindow(); | |
} | |
window.onNext(t); | |
if (hasPrevious && !Objects.equals(key, currentKey)) { | |
window.onCompleted(); | |
window = null; | |
} | |
hasPrevious = true; | |
} | |
private void createWindow() { | |
noWindow = false; | |
window = BufferUntilSubscriber.create(); | |
child.onNext(window); | |
} | |
@Override | |
public void onError(Throwable e) { | |
if (window != null) { | |
window.onError(e); | |
} | |
child.onError(e); | |
} | |
@Override | |
public void onCompleted() { | |
if (window != null) { | |
window.onCompleted(); | |
} | |
child.onCompleted(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment