Last active
March 26, 2018 07:52
-
-
Save evxn/6b9e292a7755219ff4d10b0eb43f6aeb to your computer and use it in GitHub Desktop.
Add Observable-like entities (Promises, Observables, Subjects, Arrays, generators, Iterables) to a queue. Once a stream completes (e.g. a Promise resolves), its elements are emitted to the result stream in the order in which the streams were added (first in, first out). Each subscriber receives results only for items added to the queue after it subscribed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {Subject} from 'rxjs/Subject'; | |
import {BehaviorSubject} from 'rxjs/BehaviorSubject'; | |
import {Observable, ObservableInput} from 'rxjs/Observable'; | |
import {concatMap, switchMap} from 'rxjs/operators'; | |
/** Kinds of commands understood by the queue's internal command stream. */
enum QueueActions {
  /** Replace the active queue with a fresh, empty one. */
  RESET = 'RESET',
  /** Append an item to the active queue. */
  ADD = 'ADD',
}
/** A single message sent over the queue's internal command stream. */
interface QueueCommand {
  /** Which operation to perform. */
  action: QueueActions;
  /** Item to enqueue; supplied with `ADD` commands, omitted for `RESET`. */
  payload?: any;
}
/**
 * FIFO queue of observable-like inputs (anything accepted as an
 * `ObservableInput<T>`: Promises, Observables, arrays, iterables, ...).
 *
 * Items are flattened one at a time with `concatMap`, so the values of an
 * item are emitted only after every previously added item has completed.
 * `reset()` swaps in a fresh queue; subscribers switch to it via `switchMap`
 * and therefore stop receiving values from items added before the reset.
 */
export class ObservableQueue<T> {
  /** Internal command bus; `add()` and `reset()` publish to it. */
  private readonly _commands = new Subject<QueueCommand>();

  /**
   * Stream of queues. The current value is the active queue; a new Subject is
   * pushed on every reset. Because each queue is a plain Subject, items pushed
   * into it before a subscriber arrives are not replayed to that subscriber.
   */
  private readonly _queues = new BehaviorSubject<Subject<ObservableInput<T>>>(
    new Subject<ObservableInput<T>>()
  );

  constructor() {
    this._commands.subscribe((command) => {
      switch (command && command.action) {
        case QueueActions.RESET:
          // Start a new queue; subscribers move over to it through switchMap.
          this._queues.next(new Subject<ObservableInput<T>>());
          break;
        case QueueActions.ADD:
          this._queues.value.next(command.payload);
          break;
        default:
          // Signal the failure with a real Error (not a bare string) so that
          // error handlers receive a message and a stack trace.
          this._queues.value.error(
            new Error(`[ObservableQueue] unknown command: ${JSON.stringify(command)}`)
          );
      }
    });
  }

  /**
   * Appends an item to the end of the active queue.
   *
   * @param item Promise, Observable, array, or other observable-like input.
   * @returns This queue, to allow fluent chaining.
   */
  add(item: ObservableInput<T>): this {
    this._commands.next({action: QueueActions.ADD, payload: item});
    return this;
  }

  /**
   * Replaces the active queue with a new, empty one.
   *
   * @returns This queue, to allow fluent chaining.
   */
  reset(): this {
    this._commands.next({action: QueueActions.RESET});
    return this;
  }

  /**
   * Returns the result stream: the values of every queued item, emitted in
   * the order the items were added.
   *
   * @returns Observable that follows the active queue and flattens its items
   *     sequentially.
   */
  asObservable(): Observable<T> {
    return this._queues.pipe(
      // Follow the most recent queue; a reset drops the previous one.
      switchMap((queue) =>
        // Flatten items strictly one after another to keep FIFO order.
        queue.pipe(concatMap((item) => item))
      )
    );
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example 1: Basic usage
import {ObservableQueue} from './observable-queue' | |
/**
 * Produces a promise that resolves with `ms` after `ms` milliseconds.
 *
 * Logs `sending <ms>` when called and `resolving <ms>` right before the
 * promise resolves, so the example output shows when work starts and ends.
 *
 * @param ms Delay in milliseconds; also the resolved value.
 * @returns Promise resolving to `ms`.
 */
function delay(ms: number): Promise<number> {
  return new Promise<number>((resolve) => {
    console.log('sending ' + ms);
    setTimeout(() => {
      console.log('resolving ' + ms);
      resolve(ms);
    }, ms);
  });
}
// One subscriber attached before anything is queued, so it sees every result.
const queue = new ObservableQueue();
queue.asObservable().subscribe((res) => console.log('result: ' + res));

// All promises start immediately; their results are still reported in the
// order they were added, so 1000 waits for 3000 to resolve first.
queue
  .add(delay(100)) // supports fluent style calls
  .add(delay(3000))
  .add(delay(1000))
  .add([42, 43]);

// The queue stays open: items added later are processed as they arrive.
setTimeout(() => queue.add(delay(111)), 5000);

// Output:
// "sending 100"
// "sending 3000"
// "sending 1000"
// "resolving 100"
// "result: 100"
// "resolving 1000"
// "resolving 3000"
// "result: 3000"
// "result: 1000"
// "result: 42"
// "result: 43"
// "sending 111"
// "resolving 111"
// "result: 111"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example 2: Multiple subscriptions
import {ObservableQueue} from './observable-queue' | |
/**
 * Produces a promise that resolves with `ms` after `ms` milliseconds.
 *
 * Logs `sending <ms>` when called and `resolving <ms>` right before the
 * promise resolves, so the example output shows when work starts and ends.
 *
 * @param ms Delay in milliseconds; also the resolved value.
 * @returns Promise resolving to `ms`.
 */
function delay(ms: number): Promise<number> {
  return new Promise<number>((resolve) => {
    console.log('sending ' + ms);
    setTimeout(() => {
      console.log('resolving ' + ms);
      resolve(ms);
    }, ms);
  });
}
const queue = new ObservableQueue();

// First subscriber: attached before anything is queued.
queue.asObservable().subscribe((res) => console.log('result: ' + res));

// delay(100) is queued before the reset, so its result is never reported;
// delay(3000) goes into the fresh queue created by reset().
queue
  .add(delay(100)) // supports fluent style calls
  .reset()
  .add(delay(3000));

// Second subscriber: attached after delay(3000) was queued, so it neither
// receives that result nor waits for it.
queue.asObservable().subscribe((res) => console.log('result2: ' + res));

queue
  .add(delay(1000))
  .add([42, 43]);

setTimeout(() => queue.add(delay(333)), 5000);
setTimeout(() => queue.add(delay(222)), 5000);

// Third subscriber: attached at 3 s, so it only sees the items added at 5 s.
setTimeout(
  () => queue.asObservable().subscribe((res) => console.log('result3: ' + res)),
  3000
);

// Output
// sending 100
// sending 3000
// sending 1000
// resolving 100
// resolving 1000
// result2: 1000
// result2: 42
// result2: 43
// resolving 3000
// result: 3000
// result: 1000
// result: 42
// result: 43
// sending 333
// sending 222
// resolving 222
// resolving 333
// result: 333
// result2: 333
// result3: 333
// result: 222
// result2: 222
// result3: 222
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment