Last active
January 11, 2020 00:20
-
-
Save wesleytodd/ad293325a80de4be6cc4dd804441a99f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
const SUBS = Symbol('subs') | |
const DONE = Symbol('done') | |
module.exports = class Channel { | |
constructor () { | |
this[SUBS] = [] | |
} | |
subscribe (evt) { | |
const sub = { | |
finished: false, | |
queue: [], | |
emitResolve: null, | |
nextResolve: null, | |
evt: evt, | |
[Symbol.asyncIterator]: () => { | |
return { | |
next: async () => { | |
if (sub.nextResolve) { | |
sub.nextResolve() | |
sub.nextResolve = null | |
} | |
if (sub.finished && !sub.queue.length) { | |
return { done: true } | |
} | |
if (sub.queue.length) { | |
const [evt, resolve] = sub.queue.shift() | |
// End was called when we still had a queue | |
if (evt === DONE) { | |
resolve() | |
return { done: true } | |
} | |
sub.nextResolve = resolve | |
return { | |
done: false, | |
value: evt | |
} | |
} | |
return new Promise((resolve) => { | |
sub.emitResolve = resolve | |
}) | |
}, | |
return: () => { | |
this[SUBS] = this[SUBS].filter((s) => s !== sub) | |
// Resolve outstanding promises & Empty queue | |
if (sub.nextResolve) { | |
sub.nextResolve() | |
sub.nextResolve = null | |
} | |
let item = sub.queue.shift() | |
while (item) { | |
const [evt, resolve] = item | |
resolve() | |
item = sub.queue.shift() | |
} | |
return { done: true } | |
} | |
} | |
} | |
} | |
this[SUBS].push(sub) | |
return sub | |
} | |
emit (event, payload) { | |
const evt = [event, payload] | |
const pending = [] | |
// Remove finished subs with empty queue | |
// if we called end when there was a queu | |
// there might be leftovers | |
this[SUBS] = this[SUBS].filter((s) => { | |
return !s.finished || s.queue.length !== 0 | |
}) | |
// Do we have waiting subscribers? | |
for (const sub of this[SUBS]) { | |
if (sub.evt !== event) continue | |
pending.push(new Promise((resolve) => { | |
if (sub.emitResolve) { | |
sub.emitResolve({ | |
done: false, | |
value: evt | |
}) | |
sub.emitResolve = null | |
sub.nextResolve = resolve | |
} else { | |
sub.queue.push([evt, resolve]) | |
} | |
})) | |
} | |
if (pending.length) { | |
return Promise.all(pending) | |
} | |
return Promise.resolve() | |
} | |
end (event) { | |
const pending = [] | |
for (const sub of this[SUBS]) { | |
if (event && sub.evt !== event) continue | |
sub.finished = true | |
if (sub.queue.length) { | |
pending.push(new Promise((resolve) => { | |
sub.queue.push([DONE, resolve]) | |
})) | |
} else if (sub.emitResolve) { | |
sub.emitResolve({ | |
done: true | |
}) | |
sub.emitResolve = null | |
} | |
} | |
// Remove finished subs with empty queue | |
this[SUBS] = this[SUBS].filter((s) => { | |
return !s.finished || s.queue.length !== 0 | |
}) | |
if (pending.length) { | |
return Promise.all(pending) | |
} | |
return Promise.resolve() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
const Channel = require('./channel') | |
const emitter = new Channel() | |
/**
 * Waits for a random amount of time to simulate a slow consumer.
 *
 * @returns {Promise<undefined>} Settles after `10000 * Math.random()`
 *   milliseconds, i.e. somewhere below ten seconds.
 */
function sleep () {
  // The promise is built by hand around setTimeout, so an `async` wrapper
  // would add nothing.
  const delay = 10000 * Math.random()
  return new Promise((resolve) => {
    setTimeout(resolve, delay)
  })
}
// Subscriber 1: listens to 'foo' and stops as soon as it sees payload 3.
// Returning from inside `for await` closes the subscription's iterator.
;(async () => {
  for await (const [, payload] of emitter.subscribe('foo')) {
    await sleep()
    if (payload === 3) {
      return console.log(`got the ${payload} I was loooking for!`)
    }
    console.log(`Sub1: ${payload}`)
  }
})()

// Subscriber 2: listens to 'foo' until the channel ends it.
;(async () => {
  for await (const [, payload] of emitter.subscribe('foo')) {
    await sleep()
    console.log(`Sub2: ${payload}`)
  }
})()

// Subscriber 3: listens to 'bar' until the channel ends it.
;(async () => {
  for await (const [, payload] of emitter.subscribe('bar')) {
    await sleep()
    console.log(`Sub3: ${payload}`)
  }
})()

// Producer: each awaited emit() waits for the subscribers to catch up.
;(async () => {
  await emitter.emit('bar', 'bar')
  await emitter.emit('foo', 1)

  // Emit a burst without waiting in between, then end 'foo' behind it
  await Promise.all([
    emitter.emit('foo', 2),
    emitter.emit('foo', 3),
    emitter.emit('foo', 4),
    emitter.end('foo').then(() => { console.log('foo end') })
  ])

  // 'foo' has been ended above; the sample output shows no 'Sub…: 5' line
  await emitter.emit('foo', 5)
  await emitter.emit('bar', 'bar')

  // End every remaining subscriber
  await emitter.end()

  // Emitted after end(): the sample output shows no 'Sub3: FAIL' line
  await emitter.emit('bar', 'FAIL')
  console.log('The End!!')
})()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sub3: bar | |
Sub1: 1 | |
Sub2: 1 | |
Sub1: 2 | |
Sub2: 2 | |
got the 3 I was loooking for! | |
Sub2: 3 | |
Sub2: 4 | |
foo end | |
Sub3: bar | |
The End!! |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment