Skip to content

Instantly share code, notes, and snippets.

View cartant's full-sized avatar

Nicholas Jamieson cartant

View GitHub Profile
/**
 * Builds an operator function that behaves like `takeWhile`, but additionally
 * emits the first value for which `predicate` returns false before completing.
 *
 * @param predicate Test applied to each value from the source.
 * @returns A function that maps a source observable to the resulting observable.
 */
function takeWhileInclusive<T>(
  predicate: (value: T) => boolean
): (source: Observable<T>) => Observable<T> {
  return source => source.multicast(
    // Factory form, so a new single-value replay subject is created rather
    // than one instance being shared.
    () => new ReplaySubject<T>(1),
    shared => shared
      .takeWhile(predicate)
      // Once takeWhile has finished, pick up the value buffered by the replay
      // subject and pass it on only if it is the one that failed the
      // predicate; a buffered value that still passes the predicate (source
      // ended without a failure) is dropped here.
      .concat(shared.take(1).filter(t => !predicate(t)))
  );
}
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
// Deferred source: the factory runs for each subscription, producing an
// observable of one freshly generated integer in the range 0..99.
const source = Observable.defer(() => {
  const value = Math.floor(Math.random() * 100);
  return Observable.of(value);
});
function observer(name: string) {
/**
 * Minimal hand-rolled multicast: subscribes a new `Subject` to `source` and
 * returns that subject, so observers attached to it share the one
 * subscription made here.
 *
 * The subscription to `source` is made before the subject is returned, so
 * anything `source` emits synchronously during that call is delivered before
 * a caller has the chance to attach an observer.
 *
 * @param source Observable to subscribe to.
 * @returns The subject that has been subscribed to `source`.
 */
function multicast<T>(source: Observable<T>) {
  const relay = new Subject<T>();
  source.subscribe(relay);
  return relay;
}
// Route `source` through the subject returned by multicast(), then attach two
// observers, "a" and "b", to that subject one after the other.
const m = multicast(source);
m.subscribe(observer("a"));
m.subscribe(observer("b"));
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/multicast";
// Deferred source: each subscription invokes the factory, which yields an
// observable of a single newly generated integer between 0 and 99.
const source = Observable.defer(() => {
  const picked = Math.floor(Math.random() * 100);
  return Observable.of(picked);
});
// Multicast `source` through a single Subject instance and manage the
// connection with refCount(); observers "a" and "b" then subscribe in turn.
// NOTE(review): because one Subject instance is supplied, it is presumably
// reused across reconnections — confirm against the RxJS version in use.
const m = source.multicast(new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));
// Multicast `source` using a Subject factory instead of an instance, with the
// connection managed by refCount(); observers "a" and "b" subscribe in turn.
// NOTE(review): the factory presumably yields a fresh Subject for each
// connection — confirm against the RxJS version in use.
const m = source.multicast(() => new Subject<number>()).refCount();
m.subscribe(observer("a"));
m.subscribe(observer("b"));
import { Observable } from "rxjs/Observable";
import { Subject } from "rxjs/Subject";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/multicast";
// Deferred source of a single integer in the range 0..99, generated anew for
// each subscription and passed through delay(0).
const source = Observable.defer(() => {
  const value = Math.floor(Math.random() * 100);
  return Observable.of(value);
}).delay(0);
import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/concat";
import "rxjs/add/observable/defer";
import "rxjs/add/observable/of";
import "rxjs/add/operator/delay";
import "rxjs/add/operator/publish";
/**
 * Produces a pseudo-random integer.
 *
 * @returns An integer from 0 to 99 inclusive.
 */
function random() {
  const scaled = Math.random() * 100;
  return Math.floor(scaled);
}
// publishLast() demo. Observer "a" subscribes before connect(), "b" subscribes
// immediately after connect(), and "c" subscribes 10 ms later via setTimeout.
// NOTE(review): `source` is not defined in the visible part of this snippet;
// what each observer receives depends on it — confirm.
const p = source.publishLast();
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);
// publishReplay(1) demo. Observer "a" subscribes before connect(), "b"
// subscribes immediately after connect(), and "c" subscribes 10 ms later via
// setTimeout. The argument 1 is the replay buffer size passed to publishReplay.
// NOTE(review): `source` is not defined in the visible part of this snippet;
// what each observer receives depends on it — confirm.
const p = source.publishReplay(1);
p.subscribe(observer("a"));
p.connect();
p.subscribe(observer("b"));
setTimeout(() => p.subscribe(observer("c")), 10);