Skip to content

Instantly share code, notes, and snippets.

@robwormald
Last active December 7, 2016 03:49
Show Gist options
  • Save robwormald/19dea0c70a6e01aadced6731aed4f9f7 to your computer and use it in GitHub Desktop.
Save robwormald/19dea0c70a6e01aadced6731aed4f9f7 to your computer and use it in GitHub Desktop.

RxJS Cache Operator

RxJS RC0 removed the cache operator - see ReactiveX/rxjs#2012 for more detail

This gist includes the removed operator in case you are using it in an existing app. We recommend migrating off of this operator in favor of a supported one, but you may use this one temporarily.

If you're using TypeScript, simply include the .ts file from this gist somewhere in your project, and import it once somewhere in your app.

If you're using an ES5 build of Angular, simply require or include the cache-operator.umd.js file included here.

// somewhere in your app (omit the `.ts` extension in the import path)
import 'path/to/cache-operator';
import { Observable } from 'rxjs/Observable';
import { Scheduler } from 'rxjs/Scheduler';
/**
 * Ambient declaration of the `cache` operator implemented in the accompanying
 * cache-operator source files. The operator is written to be invoked with an
 * Observable as its `this` value (it is installed on `Observable.prototype`).
 *
 * @param bufferSize Buffer size forwarded to the ReplaySubject that backs the
 * cache; the implementation defaults it to `Number.POSITIVE_INFINITY`.
 * @param windowTime Window time forwarded to the same ReplaySubject; the
 * implementation defaults it to `Number.POSITIVE_INFINITY`.
 * @param scheduler Optional scheduler forwarded to the same ReplaySubject.
 * @return {Observable<any>} An Observable that relays what the backing
 * ReplaySubject emits.
 * @method cache
 * @owner Observable
 */
export declare function cache<T>(bufferSize?: number, windowTime?: number, scheduler?: Scheduler): Observable<T>;
/**
 * Call signature of `cache` when it is used as a method on an Observable:
 * all three arguments are optional and the result is an Observable of the
 * same element type `T`.
 */
export interface CacheSignature<T> {
// Same parameter list as the standalone `cache` function declared in this file.
(bufferSize?: number, windowTime?: number, scheduler?: Scheduler): Observable<T>;
}
// Module augmentation: merges a `cache` member into the `Observable<T>`
// interface exported by 'rxjs/Observable', so that `someObservable.cache(...)`
// type-checks for consumers that import this file.
declare module 'rxjs/Observable' {
interface Observable<T> {
// Typed with the call signature declared by CacheSignature.
cache: CacheSignature<T>;
}
}
import { Observable } from 'rxjs/Observable';
import { ReplaySubject } from 'rxjs/ReplaySubject';
/**
 * Shares one subscription to the source Observable (`this`) between all
 * subscribers and replays its notifications to each of them through a
 * ReplaySubject.
 *
 * Behaviour worth knowing:
 * - The source is subscribed lazily, on the first subscription to the result.
 * - When the source errors, the cached subject is discarded, so the next
 *   subscription subscribes to the source again. Subscribers that were already
 *   attached (including one whose subscription triggered a synchronous error)
 *   still receive the buffered values followed by the error.
 * - When the subscriber count drops to zero the source subscription is
 *   unsubscribed, but the subject is kept; later subscribers only receive what
 *   had been buffered up to that point.
 *
 * @param bufferSize Buffer size passed to the ReplaySubject (default: Number.POSITIVE_INFINITY).
 * @param windowTime Window time passed to the ReplaySubject (default: Number.POSITIVE_INFINITY).
 * @param scheduler Optional scheduler passed to the ReplaySubject.
 * @return {Observable<any>}
 * @method cache
 * @owner Observable
 */
export function cache(bufferSize, windowTime, scheduler) {
    if (bufferSize === void 0) { bufferSize = Number.POSITIVE_INFINITY; }
    if (windowTime === void 0) { windowTime = Number.POSITIVE_INFINITY; }
    var source = this;
    // Subject shared by current subscribers; null until the first subscription
    // and again after the source has errored.
    var subject = null;
    // Subscription to `source` created for the current subject.
    var outerSub;
    // Number of live subscriptions to the returned Observable.
    var refs = 0;
    return new Observable(function (observer) {
        var current = subject;
        if (!current) {
            var live = new ReplaySubject(bufferSize, windowTime, scheduler);
            current = subject = live;
            // The callbacks close over `live` rather than the shared `subject`
            // variable so they always talk to the subject they were created for.
            outerSub = source.subscribe(function (value) { live.next(value); }, function (err) {
                // Drop the cached subject *before* notifying, so a subscription
                // made from inside an error callback starts over on the source.
                subject = null;
                live.error(err);
            }, function () { live.complete(); });
        }
        refs++;
        // Subscribe to `current`, not to `subject`: if the source errored
        // synchronously above, `subject` is already null, and `current` is the
        // subject that holds the buffered values and the error to replay.
        var innerSub = current.subscribe(observer);
        return function () {
            refs--;
            innerSub.unsubscribe();
            if (refs === 0 && outerSub) {
                outerSub.unsubscribe();
            }
        };
    });
}
// Install the operator on the prototype so it can be called as a method
// (`someObservable.cache(...)`); `cache` reads its source from `this`.
Observable.prototype.cache = cache;
import { Observable } from 'rxjs/Observable';
import { Scheduler } from 'rxjs/Scheduler';
import { ReplaySubject } from 'rxjs/ReplaySubject';
import { Observer } from 'rxjs/Observer';
import { Subscription } from 'rxjs/Subscription';
/**
 * Shares one subscription to the source Observable (`this`) between all
 * subscribers and replays its notifications to each of them through a
 * ReplaySubject.
 *
 * Behaviour worth knowing:
 * - The source is subscribed lazily, on the first subscription to the result.
 * - When the source errors, the cached subject is discarded, so the next
 *   subscription subscribes to the source again. Subscribers that were already
 *   attached (including one whose subscription triggered a synchronous error)
 *   still receive the buffered values followed by the error.
 * - When the subscriber count drops to zero the source subscription is
 *   unsubscribed, but the subject is kept; later subscribers only receive what
 *   had been buffered up to that point.
 *
 * @param bufferSize Buffer size passed to the ReplaySubject (default: Number.POSITIVE_INFINITY).
 * @param windowTime Window time passed to the ReplaySubject (default: Number.POSITIVE_INFINITY).
 * @param scheduler Optional scheduler passed to the ReplaySubject.
 * @return {Observable<any>}
 * @method cache
 * @owner Observable
 */
export function cache<T>(bufferSize: number = Number.POSITIVE_INFINITY,
                         windowTime: number = Number.POSITIVE_INFINITY,
                         scheduler?: Scheduler): Observable<T> {
  const source = this;
  // Subject shared by current subscribers; null until the first subscription
  // and again after the source has errored.
  let subject: ReplaySubject<T> | null = null;
  // Subscription to `source` created for the current subject.
  let outerSub: Subscription | undefined;
  // Number of live subscriptions to the returned Observable.
  let refs = 0;

  return new Observable<T>((observer: Observer<T>) => {
    let current = subject;
    if (!current) {
      const live = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
      current = subject = live;
      // The callbacks close over `live` rather than the shared `subject`
      // variable so they always talk to the subject they were created for.
      outerSub = source.subscribe(
        (value: T) => live.next(value),
        (err: any) => {
          // Drop the cached subject *before* notifying, so a subscription made
          // from inside an error callback starts over on the source.
          subject = null;
          live.error(err);
        },
        () => live.complete()
      );
    }

    refs++;

    // Subscribe to `current`, not to `subject`: if the source errored
    // synchronously above, `subject` is already null, and `current` is the
    // subject that holds the buffered values and the error to replay.
    const innerSub = current.subscribe(observer);

    return () => {
      refs--;
      innerSub.unsubscribe();
      if (refs === 0 && outerSub) {
        outerSub.unsubscribe();
      }
    };
  });
}
/**
 * Call signature of `cache` when it is used as a method on an Observable:
 * all three arguments are optional and the result is an Observable of the
 * same element type `T`.
 */
export interface CacheSignature<T> {
// Mirrors the parameter list of the `cache` function defined in this file.
(bufferSize?: number, windowTime?: number, scheduler?: Scheduler): Observable<T>;
}
// Install the operator on the prototype so it can be called as a method
// (`someObservable.cache(...)`); `cache` reads its source from `this`.
Observable.prototype.cache = cache;
// Module augmentation: merges a `cache` member into the `Observable<T>`
// interface exported by 'rxjs/Observable', matching the prototype assignment
// made in this file so that `someObservable.cache(...)` type-checks.
declare module 'rxjs/Observable' {
interface Observable<T> {
// Typed with the call signature declared by CacheSignature.
cache: CacheSignature<T>;
}
}
// UMD wrapper: hands the factory its dependencies from CommonJS (`require`),
// AMD (`define`), or — as a fallback — from the `Rx` global, in which case the
// exports are attached to `global.cacheOperator`.
(function (global, factory) {
    typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports, require('rxjs/Observable'), require('rxjs/ReplaySubject')) :
    typeof define === 'function' && define.amd ? define(['exports', 'rxjs/Observable', 'rxjs/ReplaySubject'], factory) :
    (factory((global.cacheOperator = global.cacheOperator || {}),global.Rx,global.Rx));
}(this, (function (exports,rxjs_Observable,rxjs_ReplaySubject) { 'use strict';
/**
 * Shares one subscription to the source Observable (`this`) between all
 * subscribers and replays its notifications to each of them through a
 * ReplaySubject.
 *
 * Behaviour worth knowing:
 * - The source is subscribed lazily, on the first subscription to the result.
 * - When the source errors, the cached subject is discarded, so the next
 *   subscription subscribes to the source again. Subscribers that were already
 *   attached (including one whose subscription triggered a synchronous error)
 *   still receive the buffered values followed by the error.
 * - When the subscriber count drops to zero the source subscription is
 *   unsubscribed, but the subject is kept; later subscribers only receive what
 *   had been buffered up to that point.
 *
 * @param bufferSize Buffer size passed to the ReplaySubject (default: Number.POSITIVE_INFINITY).
 * @param windowTime Window time passed to the ReplaySubject (default: Number.POSITIVE_INFINITY).
 * @param scheduler Optional scheduler passed to the ReplaySubject.
 * @return {Observable<any>}
 * @method cache
 * @owner Observable
 */
function cache(bufferSize, windowTime, scheduler) {
    if (bufferSize === void 0) { bufferSize = Number.POSITIVE_INFINITY; }
    if (windowTime === void 0) { windowTime = Number.POSITIVE_INFINITY; }
    var source = this;
    // Subject shared by current subscribers; null until the first subscription
    // and again after the source has errored.
    var subject = null;
    // Subscription to `source` created for the current subject.
    var outerSub;
    // Number of live subscriptions to the returned Observable.
    var refs = 0;
    return new rxjs_Observable.Observable(function (observer) {
        var current = subject;
        if (!current) {
            var live = new rxjs_ReplaySubject.ReplaySubject(bufferSize, windowTime, scheduler);
            current = subject = live;
            // The callbacks close over `live` rather than the shared `subject`
            // variable so they always talk to the subject they were created for.
            outerSub = source.subscribe(function (value) { live.next(value); }, function (err) {
                // Drop the cached subject *before* notifying, so a subscription
                // made from inside an error callback starts over on the source.
                subject = null;
                live.error(err);
            }, function () { live.complete(); });
        }
        refs++;
        // Subscribe to `current`, not to `subject`: if the source errored
        // synchronously above, `subject` is already null, and `current` is the
        // subject that holds the buffered values and the error to replay.
        var innerSub = current.subscribe(observer);
        return function () {
            refs--;
            innerSub.unsubscribe();
            if (refs === 0 && outerSub) {
                outerSub.unsubscribe();
            }
        };
    });
}
// Install the operator on the prototype so it can be called as a method.
rxjs_Observable.Observable.prototype.cache = cache;
exports.cache = cache;
Object.defineProperty(exports, '__esModule', { value: true });
})));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment