Skip to content

Instantly share code, notes, and snippets.

@patrickhulce
Created July 12, 2023 02:25
Show Gist options
  • Save patrickhulce/0b87fcb0866f34f061585eb0b190e5cb to your computer and use it in GitHub Desktop.
Save patrickhulce/0b87fcb0866f34f061585eb0b190e5cb to your computer and use it in GitHub Desktop.
Cheatsheet for web events.
import {ReadableStream} from 'node:stream/web';
// Custom logger that accumulates log statements
/**
 * In-memory logger: rather than printing, every call appends one formatted
 * line to `logs`, so a test can assert on the whole transcript afterwards.
 */
const logger = {
  /** Recorded lines, oldest first. */
  logs: [],

  /** Records the arguments, space-joined, as a single line. */
  log(...parts) {
    const line = parts.join(' ');
    this.logs.push(line);
  },

  /** Like `log`, but the recorded line is prefixed with `ERROR: `. */
  error(...parts) {
    const line = `ERROR: ${parts.join(' ')}`;
    this.logs.push(line);
  },

  /** Forgets everything recorded so far by swapping in a fresh array. */
  reset() {
    this.logs = [];
  },
};
/**
 * Yields to the event loop: the returned promise is resolved by a
 * zero-delay `setTimeout`, i.e. on a later timer turn.
 *
 * @returns {Promise<void>}
 */
function tick() {
  return new Promise((resolve) => {
    setTimeout(resolve, 0);
  });
}
/**
 * Builds a ReadableStream whose `start` hook hands the controller to
 * `controllerFn` and logs (rather than propagates) any failure of it.
 *
 * `controllerFn` is invoked synchronously while the stream is constructed.
 * It may be async or sync: a rejection and a synchronous throw are both
 * reported through `logger.error('controllerFn threw', …)`. The failure is
 * deliberately NOT forwarded to `controller.error()`, so a producer that
 * fails without erroring/closing the controller leaves the stream open.
 *
 * @param {(controller: ReadableStreamDefaultController) => (Promise<void>|void)} controllerFn
 *   Producer that drives the stream through its controller.
 * @returns {ReadableStream}
 */
function createReadableStream(controllerFn) {
  return new ReadableStream({
    // An async `start` lets a single try/catch cover a synchronous throw, a
    // non-promise return value and a rejected promise alike (the previous
    // `controllerFn(controller).catch(...)` only handled the last case).
    start: async (controller) => {
      try {
        await controllerFn(controller);
      } catch (error) {
        // Non-Error throws (strings, null, ...) have no `.message`; fall back
        // to their string form so the log line still says what was thrown.
        logger.error('controllerFn threw', error?.message ?? String(error));
      }
    },
  });
}
/**
 * Drains `stream` chunk by chunk, recording every event in the shared
 * `logger`, and returns the accumulated transcript.
 *
 * Each `read()` is guarded by a timer: if no result arrives within
 * `timeoutMs`, the reader is cancelled, which makes the pending `read()`
 * settle so the loop can finish. The first read error is logged and the loop
 * tries once more; a second error ends the loop.
 *
 * @param {ReadableStream} stream Stream to consume; it must not be locked.
 * @param {number} [timeoutMs=1_000] How long a single read may stay pending
 *   before the reader is cancelled.
 * @returns {Promise<string>} All lines held by `logger` (including ones
 *   logged by other code before/during the drain), newline-joined and trimmed.
 */
async function printStream(stream, timeoutMs = 1_000) {
  const reader = stream.getReader();
  let hasError = false;
  try {
    // eslint-disable-next-line no-constant-condition
    while (true) {
      let cancellationTimeout;
      try {
        cancellationTimeout = setTimeout(() => {
          logger.log('[reader] stream read timed out');
          // cancel() returns a promise; attach a handler so a failing
          // cancellation is logged instead of becoming an unhandled rejection.
          reader.cancel().catch((err) => {
            logger.error('[reader] stream cancel failed:', err);
          });
          logger.log('[reader] stream cancelled');
        }, timeoutMs);
        const {done, value} = await reader.read();
        if (done) {
          logger.log('[reader] stream done');
          break;
        }
        logger.log('[reader] stream chunk:', value);
      } catch (err) {
        if (hasError) {
          logger.log('[reader] 2nd error, breaking...');
          break;
        }
        // Remember the first failure but keep looping once more to observe
        // what a subsequent read() does.
        hasError = true;
        logger.error('[reader] stream error:', err);
      } finally {
        // Runs on every iteration, including the `break` paths, so no timer
        // outlives its read().
        clearTimeout(cancellationTimeout);
      }
    }
  } finally {
    // No read is pending once the loop exits; give the stream back unlocked.
    reader.releaseLock();
  }
  logger.log('[reader] stream closed\n\n');
  return logger.logs.join('\n').trim();
}
// Jest tests
// Wipe the shared logger once a test finishes so the next test's transcript
// contains only its own lines.
afterEach(() => logger.reset());
test('forgetting to close results in read() never finishing, cancel results in read() -> done', async () => {
  // The producer enqueues two chunks and then returns without ever calling
  // controller.close(); the snapshot shows the reader only finishing after
  // printStream's read timeout cancels the stream.
  const neverClosed = createReadableStream(async function produce(controller) {
    logger.log('emit 1');
    controller.enqueue('hello');
    await tick();
    logger.log('emit 2');
    controller.enqueue('world');
  });

  const transcript = await printStream(neverClosed);

  expect(transcript).toMatchInlineSnapshot(`
"emit 1
[reader] stream chunk: hello
emit 2
[reader] stream chunk: world
[reader] stream read timed out
[reader] stream cancelled
[reader] stream done
[reader] stream closed"
`);
});
test('errors close the stream', async () => {
  // After controller.error(), the producer also tries controller.close();
  // the attempt is wrapped so that its failure message lands in the log.
  const erroring = createReadableStream(async function produce(controller) {
    logger.log('emit 1');
    controller.enqueue('hello');
    await tick();
    logger.log('emit error');
    controller.error(new Error('Failed'));
    try {
      controller.close();
    } catch (closeError) {
      logger.error('close threw', closeError.message);
    }
  });

  const transcript = await printStream(erroring);

  expect(transcript).toMatchInlineSnapshot(`
"emit 1
[reader] stream chunk: hello
emit error
ERROR: close threw Invalid state: Controller is already closed
ERROR: [reader] stream error: Error: Failed
[reader] 2nd error, breaking...
[reader] stream closed"
`);
});
test('enqueues after an error throw', async () => {
  // The producer errors the controller and then tries to enqueue again. The
  // enqueue failure is logged and rethrown, so the statements after the
  // try/catch only run if that enqueue unexpectedly succeeds.
  //
  // NOTE(review): the rethrown error rejects the producer's promise, which
  // createReadableStream handles by logging 'controllerFn threw …' via
  // logger.error. The snapshot below contains no such line — confirm it
  // against an actual run.
  const erroring = createReadableStream(async function produce(controller) {
    logger.log('emit 1');
    controller.enqueue('hello');
    await tick();
    logger.log('emit error');
    controller.error(new Error('Failed'));
    try {
      controller.enqueue('world'); // this will throw
    } catch (enqueueError) {
      logger.error('enqueue failed', enqueueError.message);
      throw enqueueError;
    }
    await tick();
    logger.log('emit 2');
    controller.enqueue('ignored');
  });

  const transcript = await printStream(erroring);

  expect(transcript).toMatchInlineSnapshot(`
"emit 1
[reader] stream chunk: hello
emit error
ERROR: enqueue failed Invalid state: Controller is already closed
ERROR: [reader] stream error: Error: Failed
[reader] 2nd error, breaking...
[reader] stream closed"
`);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment