Created
January 16, 2022 08:47
-
-
Save CMCDragonkai/983bce4816977607e822f63fc015108f to your computer and use it in GitHub Desktop.
EventBus with `emitAsync` #javascript #eventemitter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { EventEmitter } from 'events'; | |
| class EventBus extends EventEmitter { | |
| protected kCapture: symbol; | |
| constructor (...args: ConstructorParameters<typeof EventEmitter>) { | |
| super(...args); | |
| // EventEmitter's captureRejections option is only accessible through a private symbol | |
| // Here we augment the construction and save it as a property | |
| const symbols = Object.getOwnPropertySymbols(this); | |
| this.kCapture = symbols[0]; | |
| } | |
| public async emitAsync(event: string | symbol, ...args: Array<any>): Promise<boolean> { | |
| const listeners = this.rawListeners(event); | |
| if (listeners.length < 1) { | |
| return false; | |
| } | |
| let result; | |
| try { | |
| for (const listener of listeners) { | |
| result = listener.apply(this, args); | |
| await result; | |
| } | |
| } catch (e) { | |
| if (!this[this.kCapture]) { | |
| throw e; | |
| } | |
| // The capture rejections mechanism only applies to promises | |
| if (!(result instanceof Promise)) { | |
| throw e; | |
| } | |
| // Queues error handling asynchronously to avoid bubbling up rejections | |
| // This matches the behaviour of EventEmitter which uses `process.nextTick` | |
| queueMicrotask(() => { | |
| if (typeof this[EventEmitter.captureRejectionSymbol] === 'function') { | |
| this[EventEmitter.captureRejectionSymbol](e, event, ...args); | |
| } else { | |
| // Disable the capture rejections mechanism to avoid infinite loop | |
| const prev = this[this.kCapture]; | |
| // If the error handler throws, it results in `uncaughtException` | |
| try { | |
| this[this.kCapture] = false; | |
| this.emit('error', e); | |
| } finally { | |
| this[this.kCapture] = prev; | |
| } | |
| } | |
| }); | |
| } | |
| return true; | |
| } | |
| } | |
| export default EventBus; |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It turns out that the behaviour of error handling is quite complex. Some notes:
emitsynchronously iterates over all the handlers and executes them. This means any synchronous exceptions will interrupt the loop andemitwill bubble up a synchronous exception. Furthermore it also runs in sequence of the registration. Which means it's not really parallel.setImmediatewrapper around the handlers, and this would makeemita sort of "synchronously asynchronous". This would make event emitter much closer to pub-sub, where errors don't interrupt the loop of running each handler.captureRejectionsmechanism ONLY applies to promises. Which means if errors are emitted synchronously, they are always just going to bubble up theemit. It only works if the handlers are returning promises. And if it is true, then promise rejections will instead be handled by the error handler.emitAsync, I decided to create a sort of "asynchronously synchronous" system. It follows theemitlogic, in that it loops over the handlers, and executes them one at a time. It also awaits for each handler, which means it does not execute all handlers concurrently. Just likeemit, any synchronous exception will be thrown and bubble up, however asynchronous exceptions will now be handled by thecaptureRejectionsmechanism.captureRejectionsmechanism, I had to look over the node source code and observe how they are handling errors, but I selected to use tools that were more portable likequeueMicrotaskoverprocess.nextTick.emitandemitAsyncis that when asynchronous errors occur inemit, they are only handled after all handlers have already executed. This is becauseemitis not promise-aware. ButemitAsyncis, so both synchronous and asynchronous exceptions will interrupt the handler loop.emitAsyncthat could be done, one which does proper broadcasting usingPromise.allSettledor even cascading. But this goes into more complicating messaging system architecture, which we can extend later in the future.