Created
February 6, 2018 11:38
-
-
Save kakai248/592d8526b601e02286c531de772482c0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.mesmo.app.utils.cache; | |
import com.jakewharton.rxrelay2.PublishRelay; | |
import com.jakewharton.rxrelay2.Relay; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicReference; | |
import io.reactivex.Completable; | |
import io.reactivex.Observable; | |
import io.reactivex.Observer; | |
import io.reactivex.Single; | |
import io.reactivex.internal.functions.ObjectHelper; | |
/** | |
* Simulates a BehaviorRelay but with a timed value. This value expires after a certain time has | |
* passed and will no longer be emitted to the subscribers. | |
* Also supports manual invalidation. | |
* | |
* @param <T> The type to hold on this relay | |
*/ | |
public class TimedBehaviorRelay<T> extends Relay<T> { | |
private static final int DEFAULT_TIMEOUT_MS = 300000; // 5 minutes | |
private final long timeout; | |
private final TimeUnit unit; | |
private final AtomicReference<TimedValue<T>> valueRef = new AtomicReference<>(); | |
// We hold the value ourselves so we don't need an actual BehaviorRelay, just a Publish. | |
private final PublishRelay<T> relay = PublishRelay.create(); | |
public static <T> TimedBehaviorRelay<T> create() { | |
return new TimedBehaviorRelay<>(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); | |
} | |
public static <T> TimedBehaviorRelay<T> createWithTimeout(long timeout, TimeUnit unit) { | |
return new TimedBehaviorRelay<>(timeout, unit); | |
} | |
private TimedBehaviorRelay(long timeout, TimeUnit unit) { | |
this.timeout = timeout; | |
this.unit = ObjectHelper.requireNonNull(unit, "unit is null"); | |
} | |
public void invalidate() { | |
valueRef.set(null); | |
} | |
public boolean isValid() { | |
TimedValue<T> value = valueRef.get(); | |
return value != null && insideTimeWindow(value.time); | |
} | |
/** | |
* Observes this source but first runs the completable if the cache is invalid. | |
* | |
* @param completable The completable to run | |
* @return A subscription to this observable | |
*/ | |
public Observable<T> observeStartWith(Completable completable) { | |
if (!isValid()) { | |
return completable.andThen(this); | |
} | |
return this; | |
} | |
/** | |
* Observes this source but first runs the single and caches it's value if the cache is invalid. | |
* | |
* @param single The single to run and cache | |
* @return A subscription to this observable | |
*/ | |
public Observable<T> observeAndCache(Single<T> single) { | |
if (!isValid()) { | |
return single.doOnSuccess(this).toCompletable().andThen(this); | |
} | |
return this; | |
} | |
@Override | |
public void accept(T value) { | |
valueRef.set(new TimedValue<>(value, System.currentTimeMillis())); | |
relay.accept(value); | |
} | |
@Override | |
public boolean hasObservers() { | |
return relay.hasObservers(); | |
} | |
@Override | |
protected void subscribeActual(Observer<? super T> observer) { | |
relay.subscribeActual(observer); | |
// Only emit upon subscription if the value is still valid | |
if (isValid()) { | |
relay.accept(valueRef.get().value); | |
} | |
} | |
private boolean insideTimeWindow(long time) { | |
long now = System.currentTimeMillis(); | |
return now - time < unit.toMillis(timeout); | |
} | |
private static class TimedValue<T> { | |
private final T value; | |
private final long time; | |
TimedValue(T value, long time) { | |
this.value = value; | |
this.time = time; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment