|
'use strict';

/**
 * Promise-based event fan-out. A stream keeps a list of open connections;
 * each connection turns the next matching event into the result of an
 * awaited `accept()` call.
 */
let EventSocket = {};

/**
 * Creates a stream that delivers events to the connections opened with
 * `stream.listen()`.
 *
 * @param {Object} [eventTypes]
 * @param {Array<String>} [eventTypes.explicit] - events listed here are excluded from '*' (catchall)
 */
EventSocket.create = function (eventTypes) {
  let stream = {};

  // A missing list means that no event is excluded from the catchall.
  stream._explicitEvents = eventTypes?.explicit ?? [];

  /** @type {Array<any>} */
  stream._connections = [];

  /**
   * Opens a new connection on this stream.
   *
   * @param {Array<String>} events - ex: ['*', 'error'] for default events, or list by name
   */
  stream.listen = function (events = null) {
    let conn = EventSocket.createConnection(stream, events);
    return conn;
  };

  /**
   * Delivers `msg` to every connection that is subscribed to `eventname`
   * by name, or that has the '*' catchall when `eventname` is not one of
   * the explicit events.
   *
   * @param {String} eventname
   * @param {any} msg
   */
  stream.resolveAll = function (eventname, msg) {
    let isExplicit = stream._explicitEvents.includes(eventname);

    // Iterate over a snapshot: resolving a connection closes it, which
    // splices it out of stream._connections. Looping over the live array
    // would skip the connection that follows each resolved one.
    let connections = stream._connections.slice();
    for (let p of connections) {
      let isSubscribed = p._events.includes(eventname);
      if (isSubscribed) {
        p._resolve(msg);
        continue;
      }

      if (isExplicit) {
        continue;
      }

      let hasCatchall = p._events.includes('*');
      if (hasCatchall) {
        p._resolve(msg);
      }
    }
  };

  /**
   * Rejects every connection that lists 'error' among its events. If no
   * connection lists 'error', every connection is rejected instead.
   *
   * @param {Error} err
   */
  stream.rejectAll = function (err) {
    // Snapshot for the same reason as in resolveAll: rejecting a
    // connection removes it from stream._connections.
    let connections = stream._connections.slice();
    let errorHandlers = connections.filter(function (p) {
      return p._events.includes('error');
    });

    let targets = connections;
    if (errorHandlers.length > 0) {
      targets = errorHandlers;
    }
    for (let p of targets) {
      p._reject(err);
    }
  };

  return stream;
};

/**
 * Creates a connection and registers it in `stream._connections`.
 *
 * Until `accept()` is called, `_resolve` and `_reject` are no-ops, so
 * events that arrive while nothing is awaiting are dropped. The resolver
 * and rejecter installed by `accept()` close the connection before they
 * settle the promise, so a connection delivers at most one message or
 * error.
 *
 * @param {Object} stream - a stream made by `EventSocket.create()`
 * @param {Array<String>} [defaultEvents] - events used when `accept()` is called without a name
 */
EventSocket.createConnection = function (stream, defaultEvents = null) {
  let p = {};
  stream._connections.push(p);

  // Always an array, so the stream can test membership even if
  // accept() has not been called yet.
  p._events = defaultEvents ?? [];

  p.closed = false;
  p._settled = false;
  p._resolve = function (msg) {};
  p._reject = function (err) {};
  p._promise = Promise.resolve(null);

  // Installs a fresh promise together with the resolver / rejecter that
  // the stream calls. Calling it again before the previous promise has
  // settled replaces those functions, leaving the earlier promise pending.
  p._next = async function () {
    p._settled = false;
    p._promise = new Promise(function (_resolve, _reject) {
      p._resolve = function (msg) {
        p._close(true);
        _resolve(msg);
      };
      p._reject = function (err) {
        p._close(true);
        _reject(err);
      };
    });

    return await p._promise;
  };

  /**
   * Accepts the next message of the given event name,
   * or of any of the default event names.
   *
   * @param {String} eventname - '*' for default events, 'error' for error, or others by name
   * @returns {Promise<any>} the message passed to `stream.resolveAll()`
   * @throws {Error} code E_ALREADY_CLOSED if the connection is closed,
   *   E_NO_EVENTS if neither `eventname` nor default events were given
   */
  p.accept = async function (eventname) {
    if (p.closed) {
      let err = new Error('cannot accept new events after close');
      Object.assign(err, { code: 'E_ALREADY_CLOSED' });
      throw err;
    }

    // The subscription must be stored on `_events`, because that is the
    // property the stream inspects in resolveAll() / rejectAll().
    if (eventname) {
      p._events = [eventname];
    } else if (defaultEvents?.length) {
      p._events = defaultEvents;
    } else {
      let err = new Error(
        `call stream.listen(['*']) or conn.accept('*') for default events`,
      );
      Object.assign(err, { code: 'E_NO_EVENTS' });
      throw err;
    }
    // Mirror kept for callers that read the previously-written `events`.
    p.events = p._events;

    return await p._next();
  };

  /**
   * Marks the connection closed and removes it from the stream.
   *
   * @param {Boolean} _settle - true when the caller settles the pending
   *   promise itself; otherwise the pending promise (if not yet settled)
   *   is rejected with E_CLOSE
   */
  p._close = function (_settle) {
    if (p.closed) {
      return;
    }
    p.closed = true;

    let index = stream._connections.indexOf(p);
    if (index >= 0) {
      void stream._connections.splice(index, 1);
    }

    let alreadySettled = _settle || p._settled;
    p._settled = true;
    if (alreadySettled) {
      return;
    }

    let err = new Error('promise stream closed');
    Object.assign(err, { code: 'E_CLOSE' });
    p._reject(err);
  };

  /**
   * Causes `let msg = conn.accept()` to fail with E_CLOSE or E_ALREADY_CLOSED
   */
  p.close = function () {
    p._close(false);
  };

  return p;
};
In the initial version, this may not work as expected:
If an event comes before the next await accept(), that event will not be available.
Missed events were queued in an earlier draft, but that queueing was removed intentionally.
There's a simple solution but, of course, it comes with a caveat - it will leak memory unless 'close()' is called.
A best-set-of-trade-offs approach would be to have a buffered mode, with a configurable max number of messages to buffer.
Buffered, `listen()` is `on()` and `close()` is `off()`. Unbuffered, `accept()` is `once()`.