Last active
September 2, 2019 20:33
-
-
Save michaelsbradleyjr/5abf097e9c2b99d101003d6c2cafb1ed to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* global require setImmediate setTimeout */

// Node core modules: the event source and a value pretty-printer for logging.
const {EventEmitter} = require('events');
const {inspect} = require('util');

// RxJS creators/operators, aliased with an `rx` prefix so they cannot be
// confused with the same-named IxJS helpers imported below.
const {fromEvent: rxFromEvent, interval: rxInterval} = require('rxjs');
const {debounce: rxDebounce, filter: rxFilter} = require('rxjs/operators');

// IxJS async-iterable helpers, aliased with an `ix` prefix.
const {
  AsyncIterableX: {from: ixFrom},
  batch: ixBatch
} = require('ix/asynciterable');
// NOTE: some of these names (like `rxFilter` above) are referenced only by
// commented-out experiment toggles in the visible part of this file; they are
// kept so the toggles can be re-enabled without touching the imports.
const {
  debounce: ixDebounce,
  map: ixMap,
  tap: ixTap
} = require('ix/asynciterable/pipe/');
// -----------------------------------------------------------------------------

// Event source for the demo: every 'foo' event emitted here is observed
// through `foo$`.
const emitter = new EventEmitter();

/* derive Rx observable from emitter */
const foo$ = rxFromEvent(emitter, 'foo');
/**
 * Promise-based sleep.
 *
 * @param {number} ms - delay, in milliseconds, passed straight to setTimeout
 * @returns {Promise<undefined>} promise that fulfills (with no value) once the
 *   timeout fires; it never rejects
 */
const timer = ms => new Promise(resolve => setTimeout(resolve, ms));
// Pipeline under test:
//   foo$ (Rx) -> debounce -> async iterable (Ix) -> tap/log -> batch -> consumer
// The consumer is deliberately slow (5s per item) so that values arriving
// while it is busy have to be handled by `ixBatch`.
ixBatch(
  ixFrom(foo$.pipe(
    // rxFilter(v => v % 2),
    // Debounce on the Rx side, using a 50ms interval as the duration selector
    rxDebounce(() => rxInterval(50))
  )).pipe(
    // ixDebounce(50),
    // Log each value as it crosses from the observable into the async iterable
    ixTap(v => {
      console.log('from:', inspect(v));
    })
  )
).forEach(async v => {
  console.log('batched:', inspect(v));
  // the timer here represents something async like compiling
  await timer(5000);
}).catch(err => {
  // Attach a handler so a failure anywhere in the pipeline is reported here
  // rather than surfacing as an unhandled promise rejection.
  console.error('batch pipeline failed:', err);
});
// Driver: emits 'foo' events carrying the values 1..12 in five bursts
// (3, 2, 3, 1, 3 events) separated by pauses of 1s, 2s, 1s and 2s, to exercise
// the debounce/batch pipeline against the slow consumer.
(async () => {
  let x = 0;

  // Schedule `count` emissions via setImmediate; each one increments the
  // shared counter at the moment it fires, so values stay strictly sequential.
  const emitBurst = count => {
    for (let i = 0; i < count; i += 1) {
      setImmediate(() => emitter.emit('foo', ++x));
    }
  };

  emitBurst(3);
  await timer(1000);
  emitBurst(2);
  await timer(2000);
  emitBurst(3);
  await timer(1000);
  emitBurst(1);
  await timer(2000);
  emitBurst(3);
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment