Skip to content

Instantly share code, notes, and snippets.

@thcolin
Last active April 19, 2017 09:22
Show Gist options
  • Save thcolin/11e6077d308001d87310843abc7c83b1 to your computer and use it in GitHub Desktop.
Save thcolin/11e6077d308001d87310843abc7c83b1 to your computer and use it in GitHub Desktop.
RxJS 5 pausableBuffered
// Inspired by: [RxJS 5 pausableBuffered operation declaration](http://codepen.io/elhigu/pen/jqZmpV)
// Simplified (at some cost in readability) and improved by sending the final buffer when source$ completes, using flusher$.next(1)
import Rx from 'rxjs/Rx'
/**
 * Operator that mirrors the source observable but holds back values while paused.
 *
 * Values emitted by the source while paused are buffered and replayed, in order,
 * as soon as `pauser$` emits a falsy value (RESUME). If the source completes
 * while values are still buffered, they are flushed before completion is forwarded.
 * Source errors are forwarded immediately; buffered values are not flushed on error.
 *
 * Must be called with the source observable as `this`.
 *
 * @param {Rx.Observable} pauser$ - emits a truthy value to pause and a falsy value to resume.
 * @returns {Rx.Observable} observable of the source values, delayed while paused.
 */
function pausableBuffered(pauser$) {
  const source$ = this

  return Rx.Observable.create(subscriber$ => {
    const buffer$ = new Rx.Subject()
    const flusher$ = new Rx.Subject()
    let paused = false

    // every flusher$ ping, send bufferized data to subscriber$
    const bufferSubscription = buffer$
      .buffer(flusher$)
      .concatAll()
      .subscribe(v => subscriber$.next(v))

    const pauserSubscription = pauser$.subscribe(to => {
      paused = to
      if (!paused) {
        // flush buffer$ when RESUME
        flusher$.next(1)
      }
    })

    const sourceSubscription = source$.subscribe(
      // if paused, send data to buffer$ waiting to be flushed, else send data directly to subscriber$
      v => paused ? buffer$.next(v) : subscriber$.next(v),
      e => subscriber$.error(e),
      () => {
        // flush buffer$ when source$ is completed
        flusher$.next(1)
        subscriber$.complete()
      }
    )

    // Teardown: without it, unsubscribing from the result (or the source ending)
    // would leave the source$, pauser$ and buffer subscriptions alive.
    return () => {
      sourceSubscription.unsubscribe()
      pauserSubscription.unsubscribe()
      bufferSubscription.unsubscribe()
    }
  })
}
// Install the operator on the Observable prototype so it can be chained like a built-in one
Rx.Observable.prototype.pausableBuffered = pausableBuffered

// Demo: a 200 ms ticker that gets paused after 1 s and resumed after 3 s
const pauser = new Rx.Subject()

const logNext = n => console.log('next', n)
const logError = e => console.log('error', e)
const logComplete = () => console.log('complete')

// take(10) would complete before RESUME; take(20) keeps emitting after RESUME
const ticks$ = Rx.Observable.interval(200).take(20)
ticks$.pausableBuffered(pauser).subscribe(logNext, logError, logComplete)

// After `delay` ms, log `label` and then push `state` through the pauser
const schedulePauserState = (label, state, delay) => setTimeout(() => {
  console.log(label)
  pauser.next(state)
}, delay)

schedulePauserState('PAUSE', true, 1000)
schedulePauserState('RESUME', false, 3000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment