Created
September 28, 2017 21:16
-
-
Save joaocsousa/4d6929f8f7af5a7af44cba8581675450 to your computer and use it in GitHub Desktop.
CacheObservable that allows you to cache an emission for a certain period of time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit; | |
import io.reactivex.Observable; | |
import io.reactivex.ObservableSource; | |
import io.reactivex.ObservableTransformer; | |
import io.reactivex.Observer; | |
import io.reactivex.functions.Consumer; | |
public static class CacheObservable<T> implements ObservableTransformer<T, T> { | |
private final long timeout; | |
private final TimeUnit unit; | |
private CacheObservable(long timeout, TimeUnit unit) { | |
this.timeout = timeout; | |
this.unit = unit; | |
} | |
public static <T> CacheObservable<T> cache(long timeout, TimeUnit unit) { | |
return new CacheObservable<T>(timeout, unit); | |
} | |
@Override | |
public ObservableSource<T> apply(Observable<T> upstream) { | |
LastSeen<T> lastSeen = new LastSeen<>(timeout, unit); | |
return new LastSeenObservable<>(upstream.doOnNext(lastSeen), lastSeen); | |
} | |
} | |
static final class LastSeen<T> implements Consumer<T> { | |
private final long timeout; | |
private final TimeUnit unit; | |
private long lastEmissionTimestamp; | |
private volatile T value; | |
LastSeen(long timeout, TimeUnit unit) { | |
this.timeout = timeout; | |
this.unit = unit; | |
} | |
@Override | |
public void accept(T latest) { | |
lastEmissionTimestamp = System.currentTimeMillis(); | |
value = latest; | |
} | |
boolean isValid() { | |
return value != null && System.currentTimeMillis() - lastEmissionTimestamp <= unit.toMillis(timeout); | |
} | |
} | |
static final class LastSeenObservable<T> extends Observable<T> { | |
private final Observable<T> upstream; | |
private final LastSeen<T> lastSeen; | |
LastSeenObservable(Observable<T> upstream, LastSeen<T> lastSeen) { | |
this.upstream = upstream; | |
this.lastSeen = lastSeen; | |
} | |
@Override | |
protected void subscribeActual(Observer<? super T> observer) { | |
if (lastSeen.isValid()) { | |
observer.onNext(lastSeen.value); | |
} else { | |
upstream.subscribe(observer); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Observable<Integer> observable = Observable.fromCallable(new Callable<Integer>() { | |
@Override | |
public Integer call() throws Exception { | |
System.out.println("Creating observable"); | |
return 156; | |
} | |
}).compose(CacheObservable.<Integer>cache(5, TimeUnit.SECONDS)); | |
--------------subscribed multiple times-------------- | |
// Prints every value delivered to the subscriber.
Consumer<Integer> printer = new Consumer<Integer>() {
    @Override
    public void accept(Integer value) {
        System.out.println("New Value: " + value);
    }
};
observable.subscribe(printer);
--------------output-------------- | |
// The source is only invoked again once the 5-second cache window has expired
00:00:28.015 Creating observable | |
00:00:28.015 New Value: 156 | |
00:00:30.655 New Value: 156 | |
00:00:32.335 New Value: 156 | |
00:00:34.455 Creating observable | |
00:00:34.455 New Value: 156 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment