Created
October 12, 2017 14:30
-
-
Save kakai248/771d084b0eb1b914b6c86d7e09a8efad to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit; | |
import io.reactivex.ObservableOperator; | |
import io.reactivex.Observer; | |
import io.reactivex.disposables.Disposable; | |
public class FilterWithTimeout<T> implements ObservableOperator<T, T> { | |
private final long intervalDuration; | |
private final TimeUnit unit; | |
private long lastTime = 0; | |
public FilterWithTimeout(long intervalDuration, TimeUnit unit) { | |
this.intervalDuration = intervalDuration; | |
this.unit = unit; | |
this.lastTime = 0; | |
} | |
@Override | |
public Observer<? super T> apply(Observer<? super T> observer) throws Exception { | |
return new Observer<T>() { | |
@Override | |
public void onSubscribe(Disposable d) { | |
observer.onSubscribe(d); | |
} | |
@Override | |
public void onNext(T t) { | |
// Only emit if we didn't emit anything in the last defined time. | |
long now = System.nanoTime(); | |
if (now - lastTime >= unit.toNanos(intervalDuration)) { | |
observer.onNext(t); | |
lastTime = now; | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
observer.onError(e); | |
} | |
@Override | |
public void onComplete() { | |
observer.onComplete(); | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment