Skip to content

Instantly share code, notes, and snippets.

@wesleytodd
Last active January 11, 2020 00:20
Show Gist options
  • Save wesleytodd/ad293325a80de4be6cc4dd804441a99f to your computer and use it in GitHub Desktop.
Save wesleytodd/ad293325a80de4be6cc4dd804441a99f to your computer and use it in GitHub Desktop.
'use strict'
const SUBS = Symbol('subs')
const DONE = Symbol('done')
module.exports = class Channel {
constructor () {
this[SUBS] = []
}
subscribe (evt) {
const sub = {
finished: false,
queue: [],
emitResolve: null,
nextResolve: null,
evt: evt,
[Symbol.asyncIterator]: () => {
return {
next: async () => {
if (sub.nextResolve) {
sub.nextResolve()
sub.nextResolve = null
}
if (sub.finished && !sub.queue.length) {
return { done: true }
}
if (sub.queue.length) {
const [evt, resolve] = sub.queue.shift()
// End was called when we still had a queue
if (evt === DONE) {
resolve()
return { done: true }
}
sub.nextResolve = resolve
return {
done: false,
value: evt
}
}
return new Promise((resolve) => {
sub.emitResolve = resolve
})
},
return: () => {
this[SUBS] = this[SUBS].filter((s) => s !== sub)
// Resolve outstanding promises & Empty queue
if (sub.nextResolve) {
sub.nextResolve()
sub.nextResolve = null
}
let item = sub.queue.shift()
while (item) {
const [evt, resolve] = item
resolve()
item = sub.queue.shift()
}
return { done: true }
}
}
}
}
this[SUBS].push(sub)
return sub
}
emit (event, payload) {
const evt = [event, payload]
const pending = []
// Remove finished subs with empty queue
// if we called end when there was a queu
// there might be leftovers
this[SUBS] = this[SUBS].filter((s) => {
return !s.finished || s.queue.length !== 0
})
// Do we have waiting subscribers?
for (const sub of this[SUBS]) {
if (sub.evt !== event) continue
pending.push(new Promise((resolve) => {
if (sub.emitResolve) {
sub.emitResolve({
done: false,
value: evt
})
sub.emitResolve = null
sub.nextResolve = resolve
} else {
sub.queue.push([evt, resolve])
}
}))
}
if (pending.length) {
return Promise.all(pending)
}
return Promise.resolve()
}
end (event) {
const pending = []
for (const sub of this[SUBS]) {
if (event && sub.evt !== event) continue
sub.finished = true
if (sub.queue.length) {
pending.push(new Promise((resolve) => {
sub.queue.push([DONE, resolve])
}))
} else if (sub.emitResolve) {
sub.emitResolve({
done: true
})
sub.emitResolve = null
}
}
// Remove finished subs with empty queue
this[SUBS] = this[SUBS].filter((s) => {
return !s.finished || s.queue.length !== 0
})
if (pending.length) {
return Promise.all(pending)
}
return Promise.resolve()
}
}
'use strict'
const Channel = require('./channel')
const emitter = new Channel()
async function sleep () {
return new Promise((resolve) => {
setTimeout(resolve, 10000 * Math.random())
})
}
;(async () => {
for await (const [evt, payload] of emitter.subscribe('foo')) {
await sleep()
if (payload === 3) {
return console.log(`got the ${payload} I was loooking for!`)
}
console.log(`Sub1: ${payload}`)
}
})()
;(async () => {
for await (const [evt, payload] of emitter.subscribe('foo')) {
await sleep()
console.log(`Sub2: ${payload}`)
}
})()
;(async () => {
for await (const [evt, payload] of emitter.subscribe('bar')) {
await sleep()
console.log(`Sub3: ${payload}`)
}
})()
;(async () => {
await emitter.emit('bar', 'bar')
await emitter.emit('foo', 1)
await Promise.all([
emitter.emit('foo', 2),
emitter.emit('foo', 3),
emitter.emit('foo', 4),
emitter.end('foo').then(() => { console.log('foo end') })
])
await emitter.emit('foo', 5),
await emitter.emit('bar', 'bar')
await emitter.end()
await emitter.emit('bar', 'FAIL')
console.log('The End!!')
})()
Sub3: bar
Sub1: 1
Sub2: 1
Sub1: 2
Sub2: 2
got the 3 I was loooking for!
Sub2: 3
Sub2: 4
foo end
Sub3: bar
The End!!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment