@szmarczak
Created April 22, 2020 19:30
const assert = require('assert');
const {Duplex, PassThrough} = require('stream');
// PLEASE READ THIS BEFORE YOU SCROLL DOWN
//
// The goals for the GotStream are to:
// 1. NOT emit a single byte if the response is not successful (e.g. 404).
// 2. Call `beforeError` hooks.
// 3. Read the response body in the _beforeError function,
// so that the thrown error is user-friendly to read.
//
// The goals for the PromisableStream & asPromise are to:
// 1. Emit the progress callback regardless
// of whether the response is successful or not.
// 2. Call `beforeError` hooks.
// 3. The HTTPError should have `response.body`.
//
// There are two comments that describe the problem.
// The comments include these words: THE PROBLEM IS HERE / START
// Or just see these lines: 181 and 198
(async () => {
  // UTILS
  async function readStream(readable) {
    const chunks = [];
    for await (const chunk of readable) {
      chunks.push(chunk);
    }
    return Buffer.concat(chunks).toString();
  }
  // HTTP REQUEST
  class HTTPError extends Error {
    constructor(response) {
      super(`Status code: ${response.statusCode}`);
      this.response = response;
    }
  }
  class GotStream extends Duplex {
    constructor() {
      super();
    }

    // Writable side (aka ClientRequest)
    _write(chunk, encoding, callback) {
      callback();
    }

    _final(callback) {
      callback();
      // Simulate network processing
      process.nextTick(() => {
        const message = 'The requested page does not exist.';
        const response = new PassThrough();
        response.contentLength = message.length;
        response.statusCode = 404;
        response.write(message);
        response.end();
        this._onResponse(response);
      });
    }

    // Readable side (aka IncomingMessage)
    _read() {
      // Readable only on success
      if (!this.response) {
        return;
      }
      let chunk;
      let downloadedLength = 0;
      console.log(`progress callback - 0%`);
      while ((chunk = this.response.read()) !== null) {
        downloadedLength += chunk.length;
        console.log(`progress callback - ${Math.floor(downloadedLength / this.response.contentLength * 100)}%`);
        this.push(chunk);
      }
      this.push(null);
    }

    // Got logic
    async _onResponse(response) {
      if (response.statusCode === 404) {
        await this._beforeError(new HTTPError(response));
        // In the stream API, _beforeError() has already destroyed the stream
        if (this.destroyed) {
          return;
        }
      }
      this.response = response;
      this.emit('response', response);
    }

    async _beforeError(error) {
      console.log('Calling user-defined async `beforeError` hooks which are useful when logging errors');
      // HTTPError? We can read it.
      if (error.response) {
        error.response.body = await readStream(error.response);
      }
      this.destroy(error);
    }
  }
  // Test stream
  console.log('### BEGIN STREAM TEST ###');
  const stream = new GotStream();
  stream.end();
  let responseOfStream;
  stream.once('response', response => {
    if (response.statusCode === 404) {
      // Cannot read if the response is not successful
      assert.equal(stream.read(), null);
    } else {
      assert.ok(stream.read());
    }
    responseOfStream = response;
  });
  try {
    const data = await readStream(stream);
    console.log(`statusCode: ${responseOfStream.statusCode}`);
    console.log(`body: ${data}`);
  } catch (error) {
    console.log(`error: ${error.message}`);
    console.log(`statusCode: ${error.response.statusCode}`);
    console.log(`body: ${error.response.body}`);
  }
  console.log('### END STREAM TEST ###\n');
  // PROMISE
  class PromisableStream extends GotStream {
    async _beforeError(error) {
      console.log('Calling user-defined async `beforeError` hooks which are useful when logging errors');
      // Let the promise be responsible for HTTPError,
      // because the promise is responsible for reading.
      this.emit('error', error);
    }
  }
  const asPromise = async () => {
    return new Promise((resolve, reject) => {
      // Note: this is NOT ClientRequest.
      // This is a Duplex stream which combines
      // ClientRequest and IncomingMessage together.
      const request = new PromisableStream();
      request.end();

      request.once('response', async response => {
        try {
          // We need to read from request instead of response,
          // so the progress events are emitted
          // --------------- THE PROBLEM IS HERE / START --------------------
          // This destroys the request
          response.body = await readStream(request);
          // --------------- THE PROBLEM IS HERE / END --------------------
        } catch (error) {
          reject(new Error(`Read error: ${error.message}`));
          return;
        }

        if (response.statusCode === 404) {
          request._beforeError(new HTTPError(response));
          return;
        }

        resolve(response);
      });

      // --------------- THE PROBLEM IS HERE / START --------------------
      // If you use `once` here, you get a hanging promise,
      // whereas it should throw an uncaught exception.
      request.on('error', error => {
        // --------------- THE PROBLEM IS HERE / END --------------------
        // Migrating from the `response` event
        // IOW: reached the retry count limit
        if (error.response && error.response.body) {
          reject(error);
        }

        // Retry logic here...
        // Assume we retried and it's still failing
        if (error instanceof HTTPError) {
          // The `response` event is responsible for throwing HTTPError,
          // because it's responsible for downloading the data.
          return;
        }
      });
    });
  };
  // Test promise API
  console.log('### BEGIN PROMISE TEST ###');
  try {
    const response = await asPromise();
    console.log(`statusCode: ${response.statusCode}`);
    console.log(`body: ${response.body}`);
  } catch (error) {
    console.log(`error: ${error.message}`);
    console.log(`statusCode: ${error.response.statusCode}`);
    console.log(`body: ${error.response.body}`);
  }
  console.log('### END PROMISE TEST ###');
})();

ronag commented Apr 22, 2020

Sorry, this example is too complicated. I did try to wrap my head around it.

All I can say is that doing emit('error', err) is a no-no when it comes to streams. It can break in strange ways. You should use stream.destroy(err) instead. Don't use an async iterator to read the stream; then you can inject your own error through a manual stream.destroy(err) call.

const request = new Duplex({ autoDestroy: false })

...
try {
  response.body = ''
  await new Promise((resolve, reject) => {
    request
      .on('data', chunk => {
        response.body += chunk;
      })
      .on('end', resolve)
      .on('error', reject)
  })

  if (response.statusCode === 404) {
    request.destroy(new HTTPError(response));
    return;
  }
} finally {
  request.destroy();
}

I don't understand what this has to do with dangling error listeners though.
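A runnable sketch of the pattern suggested above, under stated assumptions: a PassThrough created with autoDestroy: false stands in for Got's Duplex request, a plain object stands in for the response, HTTPError mirrors the gist's class, and readBodyAsPromise is a hypothetical helper name. The body is collected with 'data'/'end' instead of an async iterator, and the failure is reported with destroy(err) rather than emit('error', err).

```javascript
const { PassThrough } = require('stream');

// Mirrors the gist's HTTPError.
class HTTPError extends Error {
  constructor(response) {
    super(`Status code: ${response.statusCode}`);
    this.response = response;
  }
}

// Hypothetical helper: downloads the body from `request` (so progress could
// be tracked on it) and settles the promise. Failures go through destroy().
function readBodyAsPromise(request, response) {
  return new Promise((resolve, reject) => {
    let body = '';
    // A single 'error' listener; destroy(err) emits 'error' at most once.
    request.once('error', reject);
    request.on('data', chunk => {
      body += chunk;
    });
    request.on('end', () => {
      response.body = body;
      if (response.statusCode === 404) {
        // Tear the stream down and let the 'error' listener reject.
        request.destroy(new HTTPError(response));
        return;
      }
      resolve(response);
    });
  });
}

// Usage: simulate a 404 whose body has already been written to the stream.
const request = new PassThrough({ autoDestroy: false });
request.end('The requested page does not exist.');
readBodyAsPromise(request, { statusCode: 404 }).catch(error => {
  console.log(`${error.message}: ${error.response.body}`);
});
```

Because the error travels through destroy(), the stream is marked as destroyed and the rejection carries the downloaded body, which is what goal 3 of the promise API asks for.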


ronag commented Apr 22, 2020

btw, your _read code is missing this.response.on('readable', ...)
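A sketch of what the missing 'readable' subscription does, assuming a plain Readable wrapper instead of Got's Duplex; ForwardingReadable and _forward are hypothetical names. When source.read() returns null because the data has not arrived yet, the wrapper resumes forwarding once the source emits 'readable', and it ends only when the source itself ends, rather than as soon as read() comes back empty (which is what the gist's _read does).

```javascript
const { Readable, PassThrough } = require('stream');

// Hypothetical wrapper exposing `source` (standing in for IncomingMessage)
// through its own readable side, like the readable half of Got's Duplex.
class ForwardingReadable extends Readable {
  constructor(source) {
    super();
    this.source = source;
    // Resume forwarding whenever new data is buffered in the source.
    source.on('readable', () => this._forward());
    // End only when the source itself has ended.
    source.on('end', () => this.push(null));
    source.on('error', error => this.destroy(error));
  }

  _read() {
    this._forward();
  }

  _forward() {
    let chunk;
    // Drain what is available now; stop early if push() reports backpressure.
    while ((chunk = this.source.read()) !== null) {
      if (!this.push(chunk)) {
        break;
      }
    }
  }
}

// Usage: the data arrives only after the consumer has started reading.
const source = new PassThrough();
const forwarded = new ForwardingReadable(source);
forwarded.on('data', chunk => console.log(`forwarded: ${chunk}`));
setTimeout(() => source.end('late data'), 10);
```

Breaking out of the loop when push() returns false leaves the remaining data buffered in the source; the next _read() call picks it up, so backpressure is respected without losing chunks.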

@szmarczak
Author

> Sorry, this example is too complicated. I did try to wrap my head around it.

Thanks, I really appreciate it. The Got project is quite big; I've spent endless hours working on it :P I even rewrote it from scratch to fix 20 bugs and to use the latest Node.js v10 features.

> Don't use async iterator to read the stream and then you can inject your own error through a manual stream.destroy(err) call.

That would work too. I fixed the problem by exporting a separate function based on _beforeError and combining it with the original reject function (https://github.com/sindresorhus/got/blob/731e4fa235ae9b43e03e74fee34b8b32d8ebb705/source/as-promise/index.ts#L46-L58), so I no longer need to call this.emit('error', ...).
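The fix described above can be sketched roughly as follows. This is a hypothetical reconstruction, not Got's actual code (the linked source is authoritative): createRejecter and requestAsPromise are made-up names, the body is assumed to be downloaded already, and hooks are assumed to be async functions that receive the error and return it (possibly replaced). The point is that the hooks run and the promise is rejected directly, so nothing calls this.emit('error', ...) on the stream.

```javascript
// Mirrors the gist's HTTPError.
class HTTPError extends Error {
  constructor(response) {
    super(`Status code: ${response.statusCode}`);
    this.response = response;
  }
}

// Hypothetical helper: wraps the promise's `reject` so that user-defined
// async beforeError-style hooks run before the promise settles.
function createRejecter(hooks, reject) {
  return async error => {
    try {
      for (const hook of hooks) {
        // Assumption: each hook returns the (possibly replaced) error.
        error = await hook(error);
      }
    } catch (hookError) {
      // A throwing hook should still settle the promise.
      error = hookError;
    }
    reject(error);
  };
}

// Simplified wrapper: the response body is assumed to be downloaded already.
function requestAsPromise(response, hooks) {
  return new Promise((resolve, reject) => {
    const rejectWithHooks = createRejecter(hooks, reject);
    if (response.statusCode === 404) {
      rejectWithHooks(new HTTPError(response));
      return;
    }
    resolve(response);
  });
}

// Usage: a logging hook sees the error, then the caller gets the rejection.
const logHook = async error => {
  console.log(`beforeError: ${error.message}`);
  return error;
};
requestAsPromise({ statusCode: 404, body: 'The requested page does not exist.' }, [logHook])
  .catch(error => console.log(`rejected with body: ${error.response.body}`));
```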

> I don't understand what this has to do with dangling error listeners though.

There was a regression in Got which led to hanging promises. If you replace on with once on line 201, you will see that it indeed just hangs (in this case the process simply exits because there are no timers). With once, it should throw an uncaught exception, so I thought it was a bug. I wasn't aware that it's done on purpose.
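One documented mechanism that swallows such a late error is stream.finished(): the Node.js docs state that it intentionally leaves its listeners (including 'error') attached after its callback has run, so that unexpected late 'error' events do not crash the process, and that calling the returned cleanup function removes them. Whether the gist's async-iterator read goes through exactly this path is an assumption; the sketch only demonstrates the finished() behavior on a plain PassThrough, and lateErrorIsSwallowed and checkLateError are hypothetical names.

```javascript
const { PassThrough, finished } = require('stream');

// Emits a late 'error' and reports whether some listener absorbed it.
// With no 'error' listener at all, EventEmitter throws the error instead.
function lateErrorIsSwallowed(stream) {
  try {
    stream.emit('error', new Error('late error'));
    return true;
  } catch (error) {
    return false;
  }
}

// Runs a stream to completion under finished(), optionally calling the
// returned cleanup function, then probes it with a late 'error' event.
function checkLateError(callCleanup) {
  return new Promise(resolve => {
    const stream = new PassThrough();
    const cleanup = finished(stream, () => {
      if (callCleanup) {
        cleanup();
      }
      // Probe once the stream has fully settled.
      setImmediate(() => resolve(lateErrorIsSwallowed(stream)));
    });
    stream.resume();
    stream.end('data');
  });
}

// Usage: compare the default behavior with an explicit cleanup.
Promise.all([checkLateError(false), checkLateError(true)]).then(([dangling, cleaned]) => {
  console.log(`without cleanup, swallowed: ${dangling}`);
  console.log(`with cleanup, swallowed: ${cleaned}`);
});
```

Without the cleanup call, the dangling 'error' listener absorbs the late emission, which is the kind of silence that turns an expected uncaught exception into a hanging promise.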

> btw, your _read code is missing this.response.on('readable', ...)

ikr, it's just an example :P The real Got does have it: https://github.com/sindresorhus/got/blob/731e4fa235ae9b43e03e74fee34b8b32d8ebb705/source/core/index.ts#L1088-L1092

http-timer silently listens for the error event; that's why I wanted to emit error so badly. Because of this buggy feature, error.response.timings makes the request look successful, since the error timing is not present. But that's just a nitpick that I don't think is worth fixing. I'll maybe fix it when users notice :P

Thank you very much for looking into this issue. I'm sorry if I pushed things. I was really convinced this was a bug. My bad, I didn't read the Node.js docs closely. That said, let's close this topic. Have a good day / night!
