@CMCDragonkai
Created January 16, 2022 08:47
EventBus with `emitAsync` #javascript #eventemitter
import { EventEmitter } from 'events';

class EventBus extends EventEmitter {
  protected kCapture: symbol;

  constructor (...args: ConstructorParameters<typeof EventEmitter>) {
    super(...args);
    // EventEmitter's captureRejections option is only accessible through a private symbol
    // Here we augment the construction and save it as a property
    // The symbol is assumed to be described as 'kCapture'; looking it up by description
    // is less brittle than taking the first own symbol, but both rely on Node internals
    const symbols = Object.getOwnPropertySymbols(this);
    this.kCapture = symbols.find((s) => s.description === 'kCapture') ?? symbols[0];
  }

  public async emitAsync(event: string | symbol, ...args: Array<any>): Promise<boolean> {
    const listeners = this.rawListeners(event);
    if (listeners.length < 1) {
      return false;
    }
    let result;
    try {
      // Run the listeners one at a time, awaiting each before starting the next
      for (const listener of listeners) {
        result = listener.apply(this, args);
        await result;
      }
    } catch (e) {
      if (!this[this.kCapture]) {
        throw e;
      }
      // The capture rejections mechanism only applies to promises
      if (!(result instanceof Promise)) {
        throw e;
      }
      // Queues error handling asynchronously to avoid bubbling up rejections
      // EventEmitter itself uses `process.nextTick` here; `queueMicrotask` is the portable analogue
      queueMicrotask(() => {
        if (typeof this[EventEmitter.captureRejectionSymbol] === 'function') {
          this[EventEmitter.captureRejectionSymbol](e, event, ...args);
        } else {
          // Disable the capture rejections mechanism to avoid infinite loop
          const prev = this[this.kCapture];
          // If the error handler throws, it results in `uncaughtException`
          try {
            this[this.kCapture] = false;
            this.emit('error', e);
          } finally {
            this[this.kCapture] = prev;
          }
        }
      });
    }
    return true;
  }
}

export default EventBus;
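
The effect of awaiting each listener can be seen in isolation with a minimal sketch. `emitSequential`, `makeEmitter`, and `sleep` are hypothetical helpers introduced only for this illustration; they are not part of the gist, and the sketch leaves out the `captureRejections` handling entirely.

```typescript
import { EventEmitter } from 'events';

// Hypothetical helper: awaits each listener in turn, like the loop inside
// `emitAsync`, but with no captureRejections handling at all.
async function emitSequential(
  emitter: EventEmitter,
  event: string | symbol,
  ...args: Array<any>
): Promise<boolean> {
  const listeners = emitter.rawListeners(event);
  if (listeners.length < 1) {
    return false;
  }
  for (const listener of listeners) {
    await listener.apply(emitter, args);
  }
  return true;
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical fixture: one slow async handler followed by one sync handler
function makeEmitter(log: Array<string>): EventEmitter {
  const emitter = new EventEmitter();
  emitter.on('job', async () => {
    log.push('first:start');
    await sleep(10);
    log.push('first:end');
  });
  emitter.on('job', () => {
    log.push('second');
  });
  return emitter;
}

// Plain `emit` does not wait for the promise returned by the first handler,
// so the second handler runs before the first one has finished.
const emitLog: Array<string> = [];
makeEmitter(emitLog).emit('job');

// Sequential awaiting lets the first handler finish before the second starts.
const sequentialLog: Array<string> = [];
const sequentialDone = emitSequential(makeEmitter(sequentialLog), 'job');
```

Right after the plain `emit` call the log holds `first:start` followed by `second`, whereas once `sequentialDone` resolves the sequential log holds `first:start`, `first:end`, `second`.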
@CMCDragonkai
Author

It turns out that the behaviour of error handling is quite complex. Some notes:

  1. Node's event emitter could be described as "synchronously synchronous": `emit` iterates over all the handlers synchronously and executes each one in turn. Any synchronous exception therefore interrupts the loop and bubbles up out of `emit`. The handlers also run in registration order, so nothing runs in parallel.
  2. The Node.js documentation notes that handlers can be made asynchronous by wrapping their bodies in `setImmediate`, which makes `emit` a sort of "synchronously asynchronous". This brings the event emitter much closer to pub-sub, where an error in one handler does not interrupt the loop over the others (such an error then surfaces as an uncaught exception unless it is handled inside the wrapper).
  3. The `captureRejections` mechanism ONLY applies to promises. Errors thrown synchronously always bubble up out of `emit`, regardless of the option. It only takes effect when handlers return promises: if `captureRejections` is true, promise rejections are routed to the error handler instead.
  4. In creating `emitAsync`, I decided to build a sort of "asynchronously synchronous" system. It follows the `emit` logic in that it loops over the handlers and executes them one at a time. It also awaits each handler, which means it does not execute the handlers concurrently. Just like `emit`, any synchronous exception is thrown and bubbles up, whereas asynchronous exceptions are handled by the `captureRejections` mechanism.
  5. To implement the `captureRejections` mechanism, I had to read the Node source code and observe how it handles errors, but I chose more portable tools such as `queueMicrotask` over `process.nextTick`.
  6. One major difference between `emit` and `emitAsync` is that asynchronous errors in `emit` are only handled after all handlers have already been called. This is because `emit` is not promise-aware. `emitAsync` is, so both synchronous and asynchronous exceptions interrupt the handler loop.
  7. An "asynchronously asynchronous" system would be interesting, and there are many more variants of `emitAsync` that could be written, such as one that does proper broadcasting using `Promise.allSettled`, or even cascading. But this leads into more complicated messaging-system architecture, which we can extend later.
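
The behaviour of the built-in `emit` described in notes 1 to 3 can be checked with a short sketch. It uses only the standard `events` module; the emitter names, event names, and error messages are made up for illustration.

```typescript
import { EventEmitter } from 'events';

// Note 1: a synchronous throw interrupts the handler loop and bubbles out of emit
const syncEmitter = new EventEmitter();
const calls: Array<string> = [];
syncEmitter.on('tick', () => {
  calls.push('first');
  throw new Error('boom');
});
syncEmitter.on('tick', () => {
  calls.push('second'); // never runs, the first handler already threw
});
let bubbled: unknown;
try {
  syncEmitter.emit('tick');
} catch (e) {
  bubbled = e; // the 'boom' error, thrown straight out of emit
}

// Note 2: deferring the handler body with setImmediate lets emit return first
const deferredEmitter = new EventEmitter();
const order: Array<string> = [];
deferredEmitter.on('tick', () => {
  setImmediate(() => order.push('handler body'));
});
deferredEmitter.emit('tick');
order.push('after emit');

// Note 3: with captureRejections enabled, a rejected promise returned by a
// handler is routed to the 'error' handler instead of going unhandled
const capturingEmitter = new EventEmitter({ captureRejections: true });
const captured = new Promise<Error>((resolve) => {
  capturingEmitter.on('error', resolve);
});
capturingEmitter.on('tick', async () => {
  throw new Error('async boom');
});
capturingEmitter.emit('tick'); // does not throw; the rejection surfaces later
```

After the first `emit`, `calls` contains only `first`. In the second case `after emit` is recorded before `handler body`. In the third, `captured` resolves with the `async boom` error once Node has routed the rejection to the `'error'` listener.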

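One possible shape for the `Promise.allSettled` broadcasting variant mentioned in note 7 is sketched below. `emitAllSettled` is a hypothetical name and this is only one way it might look, not the author's design: every listener is started without waiting for the others, and failures are collected rather than thrown.

```typescript
import { EventEmitter } from 'events';

// Hypothetical broadcasting variant: starts every listener without awaiting
// the previous one and reports each outcome instead of throwing.
async function emitAllSettled(
  emitter: EventEmitter,
  event: string | symbol,
  ...args: Array<any>
): Promise<Array<PromiseSettledResult<unknown>>> {
  const listeners = emitter.rawListeners(event);
  return Promise.allSettled(
    // The async wrapper turns a synchronous throw into a rejection, so one
    // failing listener cannot stop the others from being called.
    listeners.map(async (listener) => listener.apply(emitter, args)),
  );
}

const bus = new EventEmitter();
bus.on('notify', async () => 'ok');
bus.on('notify', () => {
  throw new Error('sync failure');
});
bus.on('notify', async () => {
  throw new Error('async failure');
});

// Resolves to one result per listener, in registration order
const settled = emitAllSettled(bus, 'notify');
```

Here `settled` resolves to three results with statuses `fulfilled`, `rejected`, `rejected`. Unlike `emitAsync`, neither kind of exception interrupts the other listeners, and the caller decides what to do with the collected failures.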