Last active
March 22, 2021 04:23
-
-
Save trajakovic/3b0239cae11e23c76b80 to your computer and use it in GitHub Desktop.
RxJS extension that caches an observable's result with a time-based expiration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo: a deliberately slow job whose result is cached for 5 seconds.

// Each subscription to slowJob produces a fresh random number after a 2000 ms delay.
var slowJob = Rx.Observable.defer(function () {
  return Rx.Observable.return(Math.random() * 1000).delay(2000);
});

// Wrap the job so that its result is shared until the 5000 ms expiration passes.
var cached = slowJob.cacheWithExpiration(5000);

// Timestamp of the most recent subscription; used to report how long each result took.
var last = Date.now();

/**
 * Subscribes to the cached job, logs the received number together with the
 * elapsed time since subscribing, and schedules itself again one second later.
 */
function repeat() {
  last = Date.now();
  cached.subscribe(function (data) {
    console.log("number:", data, 'took:', Date.now() - last, '[ms]');
    setTimeout(repeat, 1000);
  });
}

// Start polling after an initial one-second pause.
setTimeout(repeat, 1000);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Caches the final result of this observable and shares it with every
 * subscriber until `expirationMs` has elapsed after the source completed.
 *
 * The first subscriber (or the first one after the cache was cleared)
 * triggers a subscription to the source; the outcome is held in an
 * `Rx.AsyncSubject` and handed to all subscribers that arrive while the
 * cache entry is alive.
 *
 * Errors are not kept: when the source fails, the current subscribers are
 * notified and the cache entry is dropped right away, so the next
 * subscription runs the source again.
 *
 * @param {number} expirationMs Time, in milliseconds, the result stays cached
 *   after the source has completed.
 * @param {Object} [scheduler] Scheduler used for the expiration timer;
 *   defaults to `Rx.Scheduler.timeout`.
 * @returns {Rx.Observable} Observable that serves the cached result.
 */
Rx.Observable.prototype.cacheWithExpiration = function (expirationMs, scheduler) {
  var source = this;
  var cachedData = null;

  // Use timeout scheduler if scheduler not supplied
  scheduler = scheduler || Rx.Scheduler.timeout;

  return Rx.Observable.createWithDisposable(function (observer) {
    // Cache hit: hand the observer straight to the stored subject.
    if (cachedData) {
      return cachedData.subscribe(observer);
    }

    // Cache miss: create a subject to hold the result. A local reference is
    // kept so the final subscribe below never depends on `cachedData`, which
    // may already have been cleared again by the time we get there.
    var subject = new Rx.AsyncSubject();
    cachedData = subject;

    // Only clear the entry this subscription created.
    var invalidate = function () {
      if (cachedData === subject) {
        cachedData = null;
      }
    };

    subject.subscribe(
      function () { /* the value itself is delivered to real subscribers */ },
      // A failed query must not be cached.
      invalidate,
      // Start the expiration timer when the query completes, whether or not
      // it produced a value.
      function () {
        scheduler.scheduleWithRelative(expirationMs, invalidate);
      }
    );

    // subscribe to the query
    source.subscribe(subject);

    // subscribe the observer to the cached data
    return subject.subscribe(observer);
  });
};
And in case anyone needs a version that compiles correctly with RxJS v6, try this one:
import { asyncScheduler, AsyncSubject, Observable, SchedulerLike } from 'rxjs';
/**
 * Pipeable operator that caches the final result of the source observable and
 * shares it with every subscriber until `expirationMs` has elapsed after the
 * source completed.
 *
 * The first subscriber (or the first one after the cache was cleared)
 * subscribes to the source through an `AsyncSubject`; subscribers arriving
 * while the entry is alive receive the stored result without re-running the
 * source.
 *
 * Errors are not kept: when the source fails, current subscribers receive the
 * error and the cache entry is dropped immediately, so the next subscription
 * runs the source again.
 *
 * The spelling of the exported name is kept as-is for backward compatibility.
 *
 * @param expirationMs - Time in milliseconds the result stays cached after the
 *   source has completed.
 * @param scheduler - Scheduler used for the expiration timer.
 * @returns A function mapping a source observable to its caching counterpart.
 */
export const fromCacheWithExperation = <T>(expirationMs: number, scheduler: SchedulerLike = asyncScheduler) => (
  source: Observable<T>,
): Observable<T> => {
  let cached: AsyncSubject<T> | null = null;

  return new Observable<T>((observer) => {
    // Cache hit: serve the stored subject.
    const existing = cached;
    if (existing) {
      return existing.subscribe(observer);
    }

    // Cache miss. Keep a local reference so the final subscribe below never
    // dereferences `cached`, which may be reset to null before we get there
    // (synchronous error, or a scheduler that runs the expiry immediately).
    const subject = new AsyncSubject<T>();
    cached = subject;

    // Only clear the entry this subscription created.
    const invalidate = (): void => {
      if (cached === subject) {
        cached = null;
      }
    };

    subject.subscribe({
      // A failed source must not be cached; handling the error here also
      // keeps this internal subscription from reporting it as unhandled.
      error: invalidate,
      // Start the expiration timer on completion, whether or not the source
      // produced a value.
      complete: () => {
        scheduler.schedule(invalidate, expirationMs);
      },
    });

    source.subscribe(subject);

    return subject.subscribe(observer);
  });
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Here is a working TypeScript version, in case anyone needs it: