Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save xgrommx/a1ba5fb2475a2b3f901592964afe2408 to your computer and use it in GitHub Desktop.
Save xgrommx/a1ba5fb2475a2b3f901592964afe2408 to your computer and use it in GitHub Desktop.
RxJS 5 Operators By Example

#RxJS 5 Operators By Example A (soon to be) complete list of RxJS 5 operators with easy to understand explanations and runnable examples.

(I will be adding one operator per day until all operators are included.)

Table of Contents

buffer

#####signature: buffer<T>(closingNotifier: Observable<any>): Observable<T[]> The gist: Collect output values until something happens then hand them over. Repeat...

(demo | official docs)

//Create an observable that emits a value every second
const myInterval = Rx.Observable.interval(1000);
//Create an observable that emits every time document is clicked
const bufferBy = Rx.Observable.fromEvent(document, 'click');
/*
Collect all values emitted by our interval observable until we click document. This will cause the bufferBy Observable to emit a value, satisfying the buffer. Pass us all collected values since last buffer as an array.
*/
const myBufferedInterval = myInterval.buffer(bufferBy);
//Print values to console
//ex. output: [1,2,3] ... [4,5,6,7,8]
const subscribe = myBufferedInterval.subscribe(val => console.log(' Buffered Values:', val));

bufferCount

#####signature: bufferCount<T>(bufferSize: number, startBufferEvery: number = null): Observable<T[]> The gist: Collect output values until specified number is fulfilled then hand them over. Repeat...

(demo | official docs)

//Create an observable that emits a value every second
const myInterval = Rx.Observable.interval(1000);
//After three values are emitted, pass on as an array of buffered values
const bufferThree = myInterval.bufferCount(3);
//Print values to console
//ex. output [0,1,2]...[3,4,5]
const subscribe = bufferThree.subscribe(val => console.log('Buffered Values:', val));

/*
bufferCount also takes second argument, when to start the next buffer
for instance, if we have a bufferCount of 3 but second argument (startBufferEvery) of 1:
1st interval value:
buffer 1: [0]
2nd interval value:
buffer 1: [0,1]
buffer 2: [1]
3rd interval value:
buffer 1: [0,1,2] Buffer of 3, emit buffer
buffer 2: [1,2]
buffer 3: [2]
4th interval value:
buffer 2: [1,2,3] Buffer of 3, emit buffer
buffer 3: [2, 3]
buffer 4: [3]
*/
const bufferEveryOne = myInterval.bufferCount(3,1);
//Print values to console
const secondSubscribe = bufferEveryOne.subscribe(val => console.log('Start Buffer Every 1:', val))

bufferTime

#####signature: bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, scheduler: Scheduler): Observable<T[]> The gist: Collect output values until specified time has passed then hand them over. Repeat...

(demo | official docs)

//Create an observable that emits a value every 500ms
const myInterval = Rx.Observable.interval(500);
//After 2 seconds have passed, emit buffered values as an array
const bufferTime = myInterval.bufferTime(2000);
//Print values to console
//ex. output [0,1,2]...[3,4,5,6]
const subscribe = bufferTime.subscribe(val => console.log('Buffered with Time:', val));

/*
bufferTime also takes second argument, when to start the next buffer (time in ms)
for instance, if we have a bufferTime of 2 seconds but second argument (bufferCreationInterval) of 1 second:
ex. output: [0,1,2]...[1,2,3,4,5]...[3,4,5,6,7]
*/
const bufferTimeTwo = myInterval.bufferTime(2000,1000);
//Print values to console
const secondSubscribe = bufferTimeTwo.subscribe(val => console.log('Start Buffer Every 1s:', val));

bufferToggle

#####signature: bufferToggle(openings: Observable<O>, closingSelector: Function): Observable<T[]> The gist: Toggle buffer on to catch emitted values from source, toggle buffer off to emit buffered values

(demo | official docs)

//emit value every second
const sourceInterval = Rx.Observable.interval(1000);
//start first buffer after 5s, and every 5s after
const startInterval = Rx.Observable.interval(5000);
//emit value after 3s, closing corresponding buffer
const closingInterval = val => {
	console.log(`Value ${val} emitted, starting buffer! Closing in 3s!`)
	return Rx.Observable.interval(3000);
}
//every 5s a new buffer will start, collecting emitted values for 3s then emitting buffered values
const bufferToggleInterval = sourceInterval.bufferToggle(startInterval, closingInterval);
//log to console
//ex. emitted buffers [4,5,6]...[9,10,11]
const subscribe = bufferToggleInterval.subscribe(val => console.log('Emitted Buffer:', val));

bufferWhen

#####signature: bufferWhen(closingSelector: function): Observable<T[]> The gist: Buffer all values until closing selector emits, emit buffered values, repeat...

(demo | official docs)

//emit value every 1 second
const oneSecondInterval = Rx.Observable.interval(1000);
//return an observable that emits value every 5 seconds
const fiveSecondInterval = () => Rx.Observable.interval(5000);
//every five seconds, emit buffered values
const bufferWhenExample = oneSecondInterval.bufferWhen(fiveSecondInterval);
//log values
//ex. output: [0,1,2,3]...[4,5,6,7,8]
const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer: ', val));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment