Last active
November 20, 2018 16:07
-
-
Save mirland/aac2c1318a1a1facd811 to your computer and use it in GitHub Desktop.
Regular Interval Delay, custom rx operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Date; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Subscriber; | |
import timber.log.Timber; | |
/** | |
* Created by mirland on 26/02/16. | |
*/ | |
@SuppressWarnings("unused") | |
public class MinIntervalDelay<T> implements Observable.Operator<T, T> { | |
private final static GeneralObserver<Object> observer = new GeneralObserver<>(); | |
private long lastCall; | |
private long intervalTimeMilliseconds; | |
public MinIntervalDelay(long intervalTime, TimeUnit intervalUnit) { | |
this(TimeUnit.MILLISECONDS.convert(intervalTime, intervalUnit)); | |
} | |
public MinIntervalDelay(long intervalTimeMilliseconds) { | |
this.intervalTimeMilliseconds = intervalTimeMilliseconds; | |
lastCall = 0L; | |
} | |
long calculateNextCall() { | |
long now = new Date().getTime(); | |
if (lastCall == 0 || now > lastCall) { | |
if (now - lastCall >= intervalTimeMilliseconds) { | |
lastCall = now; | |
return 0; | |
} else { | |
long difference = intervalTimeMilliseconds - (now - lastCall); | |
lastCall = now + difference; | |
return difference; | |
} | |
} else { | |
lastCall += intervalTimeMilliseconds; | |
return lastCall - now; | |
} | |
} | |
@Override | |
public Subscriber<? super T> call(final Subscriber<? super T> s) { | |
return new Subscriber<T>(s) { | |
@Override | |
public void onCompleted() { | |
Observable.just(null) | |
.delay(calculateNextCall(), TimeUnit.MILLISECONDS) | |
.doOnNext(t -> Timber.d("Call onCompleted: %d", new Date().getTime())) | |
.doOnNext(t -> { | |
if (!s.isUnsubscribed()) { | |
s.onCompleted(); | |
} | |
}) | |
.subscribe(observer); | |
} | |
@Override | |
public void onError(Throwable error) { | |
Observable.just(error) | |
.delay(calculateNextCall(), TimeUnit.MILLISECONDS) | |
.doOnNext(t1 -> Timber.d("Call onError: %d", new Date().getTime())) | |
.doOnNext(t -> { | |
if (!s.isUnsubscribed()) { | |
s.onError(t); | |
} | |
}) | |
.subscribe(observer); | |
} | |
@Override | |
public void onNext(T item) { | |
Observable.just(item) | |
.delay(calculateNextCall(), TimeUnit.MILLISECONDS) | |
.doOnNext(t -> Timber.d("Call onNext: %d", new Date().getTime())) | |
.doOnNext(t -> { | |
if (!s.isUnsubscribed()) { | |
s.onNext(item); | |
} | |
}) | |
.subscribe(observer); | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment