This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Scheduler that runs each unit of work immediately.
 *
 * `schedule` calls `work` directly, so the work has finished (or thrown)
 * by the time `schedule` returns.
 */
const syncScheduler: Scheduler = {
  schedule(work) {
    work();
  },
};
const asyncScheduler: Scheduler = { | |
schedule(work) { | |
setTimeout(work, 0); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Strategy object that decides when a unit of work gets executed.
 */
interface Scheduler {
  /**
   * Arranges for `work` to be executed. Whether that happens before
   * `schedule` returns or at some later point is up to the implementation.
   *
   * @param work - Callback to run; it is invoked without arguments and its
   *   return value is ignored.
   */
  schedule(work: () => void): void;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo: pipe `outer` through `concatMap(createInner)` and log what comes out.
// The second subscribe argument is left undefined (presumably the error
// callback — confirm against this file's Observable#subscribe signature).
outer
  .pipe(concatMap(createInner))
  .subscribe(
    x => console.log("[concatMap] outer:", x),
    undefined,
    () => console.log("[concatMap] done!")
  );
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function concatMap<T, R>( | |
project: (value: T, index: number) => Observable<R> | |
): OperatorFunction<T, R> { | |
let currentIndex = 0; | |
const buffer: Array<Observable<R>> = []; | |
const subscribeTo = ( | |
projected: Observable<R>, | |
obs: Observer<R>, | |
subscriptions: Set<Subscription> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Creates a promise that settles once a `setTimeout` timer of `ms`
 * milliseconds fires.
 *
 * @param ms - Delay handed to `setTimeout`, in milliseconds.
 * @returns A promise that resolves with no value when the timer fires.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>(resolve => {
    setTimeout(resolve, ms);
  });
}
function createInner(x: number) { | |
return new Observable(obs => { | |
(async () => { | |
for (let i = 0; i < 3; i++) { | |
if (i > 0) await sleep(500); | |
obs.next(10 * x); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function mergeMap<T, R>( | |
project: (value: T, index: number) => Observable<R> | |
): OperatorFunction<T, R> { | |
let currentIndex = 0; | |
return source => | |
new Observable(obs => { | |
const subscriptions = new Set<Subscription>(); | |
const sub = source.subscribe( | |
x => { | |
const projected = project(x, currentIndex++); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Builds a reusable operator by composing two existing ones: `map` with a
 * squaring function, followed by `reduce` with addition and a seed of 0.
 *
 * @returns An operator function that pipes its source through
 *   `map(n => n * n)` and then `reduce((s, n) => s + n, 0)`.
 */
function computeSquaredSum(): OperatorFunction<number, number> {
  return source =>
    source.pipe(
      map(n => n * n),
      reduce((s, n) => s + n, 0)
    );
}
// Demo: run `oneThroughTen` through the custom `computeSquaredSum` operator
// and log every value the resulting pipeline hands to the subscriber.
oneThroughTen
  .pipe(computeSquaredSum())
  .subscribe(x => console.log("Squared sum w/ custom op =", x));
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Source created by `of` from the integers 1 through 10.
const oneThroughTen = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// Demo: square each value with `map`, fold the squares with `reduce`
// (addition, seed 0), and log whatever the pipeline delivers.
oneThroughTen
  .pipe(
    map(n => n * n),
    reduce((s, n) => s + n, 0)
  )
  .subscribe(sqSum => console.log("Squared sum =", sqSum));
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function fromEvent<T extends Event = Event>( | |
target: EventTarget, | |
eventName: string | |
): Observable<T> { | |
return new Observable(obs => { | |
const listener: EventListener = (evt: T) => { | |
obs.next(evt); | |
}; | |
target.addEventListener(eventName, listener); | |
return () => target.removeEventListener(eventName, listener); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function from<T>(convertible: Promise<T> | ArrayLike<T>): Observable<T> { | |
if (typeof (convertible as any).length === "number") { | |
const arrayLike = convertible as ArrayLike<T>; | |
return new Observable(obs => { | |
for (let i = 0; i < arrayLike.length; i++) { | |
obs.next(arrayLike[i]); | |
} | |
obs.complete(); | |
}); | |
} |