Skip to content

Instantly share code, notes, and snippets.

@defHLT
Created March 23, 2016 15:15
Show Gist options
  • Save defHLT/942fb6108b65fe0b3a88 to your computer and use it in GitHub Desktop.
Save defHLT/942fb6108b65fe0b3a88 to your computer and use it in GitHub Desktop.
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Func2;
/*
 * Created by Artem Kholodnyi on 3/23/16.
 */
/**
 * Operator that drops an item when the comparator reports it equal to the most recently
 * <em>emitted</em> item.
 *
 * <p>The first item is always forwarded. Every later item is compared against the last item that
 * was actually passed downstream (not the last item received), so the reference value only changes
 * when an item is emitted. With a non-transitive comparator (for example a tolerance check) this
 * differs from comparing against the immediately preceding source item.
 *
 * @param <T> the value type
 */
public final class OperatorDistinctUntilNotEqual<T> implements Operator<T, T> {

    /** Null-safe {@code equals} comparison shared by every instance built without a comparator. */
    private static final Func2<Object, Object, Boolean> DEFAULT_COMPARATOR =
            new Func2<Object, Object, Boolean>() {
                @Override
                public Boolean call(Object previous, Object current) {
                    return (previous == null) ? (current == null) : previous.equals(current);
                }
            };

    /** Returns {@code true} when the two items are to be treated as equal (second one is dropped). */
    private final Func2<? super T, ? super T, Boolean> comparator;

    /**
     * Creates an operator that compares items with a null-safe {@link Object#equals(Object)}.
     */
    public OperatorDistinctUntilNotEqual() {
        this.comparator = DEFAULT_COMPARATOR;
    }

    /**
     * Creates an operator that uses the given function to decide whether two items are equal.
     *
     * @param customComparator called as {@code call(lastEmitted, current)}; a {@code true} result
     *                         suppresses {@code current}
     * @throws NullPointerException if {@code customComparator} is {@code null}
     */
    public OperatorDistinctUntilNotEqual(Func2<? super T, ? super T, Boolean> customComparator) {
        if (customComparator == null) {
            // Fail fast here rather than surfacing an NPE as onError on the second item.
            throw new NullPointerException("customComparator");
        }
        this.comparator = customComparator;
    }

    /**
     * Wraps the downstream subscriber with one that filters out items equal to the last emitted one.
     *
     * @param child the downstream subscriber
     * @return the subscriber to be subscribed to the upstream source
     */
    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        return new Subscriber<T>(child) {
            /** Last item forwarded to {@code child}; meaningful only once {@code hasPrevious} is set. */
            private T previousValue;
            /** Distinguishes "nothing emitted yet" from a legitimately {@code null} previous item. */
            private boolean hasPrevious;

            @Override
            public void onNext(T t) {
                if (hasPrevious) {
                    final boolean equal;
                    try {
                        equal = comparator.call(previousValue, t);
                    } catch (Throwable e) {
                        // Hand the failure (including a null Boolean unboxing NPE) to the child.
                        Exceptions.throwOrReport(e, child, t);
                        return;
                    }
                    if (equal) {
                        // The item is swallowed, so ask upstream for one more to replace it.
                        request(1);
                        return;
                    }
                } else {
                    // Mark before emitting so a re-entrant onNext already sees a previous value.
                    hasPrevious = true;
                }
                previousValue = t;
                child.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onCompleted() {
                child.onCompleted();
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment