Created
January 16, 2022 08:47
-
-
Save CMCDragonkai/983bce4816977607e822f63fc015108f to your computer and use it in GitHub Desktop.
EventBus with `emitAsync` #javascript #eventemitter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { EventEmitter } from 'events'; | |
class EventBus extends EventEmitter { | |
protected kCapture: symbol; | |
constructor (...args: ConstructorParameters<typeof EventEmitter>) { | |
super(...args); | |
// EventEmitter's captureRejections option is only accessible through a private symbol | |
// Here we augment the construction and save it as a property | |
const symbols = Object.getOwnPropertySymbols(this); | |
this.kCapture = symbols[0]; | |
} | |
public async emitAsync(event: string | symbol, ...args: Array<any>): Promise<boolean> { | |
const listeners = this.rawListeners(event); | |
if (listeners.length < 1) { | |
return false; | |
} | |
let result; | |
try { | |
for (const listener of listeners) { | |
result = listener.apply(this, args); | |
await result; | |
} | |
} catch (e) { | |
if (!this[this.kCapture]) { | |
throw e; | |
} | |
// The capture rejections mechanism only applies to promises | |
if (!(result instanceof Promise)) { | |
throw e; | |
} | |
// Queues error handling asynchronously to avoid bubbling up rejections | |
// This matches the behaviour of EventEmitter which uses `process.nextTick` | |
queueMicrotask(() => { | |
if (typeof this[EventEmitter.captureRejectionSymbol] === 'function') { | |
this[EventEmitter.captureRejectionSymbol](e, event, ...args); | |
} else { | |
// Disable the capture rejections mechanism to avoid infinite loop | |
const prev = this[this.kCapture]; | |
// If the error handler throws, it results in `uncaughtException` | |
try { | |
this[this.kCapture] = false; | |
this.emit('error', e); | |
} finally { | |
this[this.kCapture] = prev; | |
} | |
} | |
}); | |
} | |
return true; | |
} | |
} | |
export default EventBus; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It turns out that the behaviour of error handling is quite complex. Some notes:
emit
synchronously iterates over all the handlers and executes them. This means any synchronous exceptions will interrupt the loop andemit
will bubble up a synchronous exception. Furthermore it also runs in sequence of the registration. Which means it's not really parallel.setImmediate
wrapper around the handlers, and this would makeemit
a sort of "synchronously asynchronous". This would make event emitter much closer to pub-sub, where errors don't interrupt the loop of running each handler.captureRejections
mechanism ONLY applies to promises. Which means if errors are emitted synchronously, they are always just going to bubble up theemit
. It only works if the handlers are returning promises. And if it is true, then promise rejections will instead be handled by the error handler.emitAsync
, I decided to create a sort of "asynchronously synchronous" system. It follows theemit
logic, in that it loops over the handlers, and executes them one at a time. It also awaits for each handler, which means it does not execute all handlers concurrently. Just likeemit
, any synchronous exception will be thrown and bubble up, however asynchronous exceptions will now be handled by thecaptureRejections
mechanism.captureRejections
mechanism, I had to look over the node source code and observe how they are handling errors, but I selected to use tools that were more portable likequeueMicrotask
overprocess.nextTick
.emit
andemitAsync
is that when asynchronous errors occur inemit
, they are only handled after all handlers have already executed. This is becauseemit
is not promise-aware. ButemitAsync
is, so both synchronous and asynchronous exceptions will interrupt the handler loop.emitAsync
that could be done, one which does proper broadcasting usingPromise.allSettled
or even cascading. But this goes into more complicating messaging system architecture, which we can extend later in the future.