-
-
Save szmarczak/e7eb659bebb33bceb7577e90d7216aa3 to your computer and use it in GitHub Desktop.
const assert = require('assert'); | |
const {Duplex, PassThrough} = require('stream'); | |
// PLEASE READ THIS BEFORE YOU SCROLL DOWN | |
// | |
// The goals for the GotStream are to: | |
// 1. If the response is not successful (eg. 404),
// the stream should NOT emit a single byte. | |
// 2. Call `beforeError` hooks. | |
// 3. Read the response in a _beforeError function, | |
// so when it throws it's user-friendly to read. | |
// | |
// The goals for the PromisableStream & asPromise are to: | |
// 1. Emit the progress callback nevertheless | |
// (doesn't matter whether the response is successful or not).
// 2. Call `beforeError` hooks. | |
// 3. The HTTPError should have `response.body`. | |
// | |
// There are two comments that describe the problem. | |
// The comments include these words: THE PROBLEM IS HERE / START | |
// Or just see these lines: 181 and 198 | |
(async () => { | |
// UTILS | |
/**
 * Drains a readable stream and returns everything it produced as one string.
 *
 * Chunks are collected with `for await`, so a stream error surfaces as a
 * rejection of the returned promise.
 *
 * @param {AsyncIterable<Buffer|string>} readable - Stream (or any async iterable) to consume.
 * @returns {Promise<string>} The concatenated contents, decoded with the default `Buffer#toString()` encoding.
 */
async function readStream(readable) {
  const chunks = [];

  for await (const chunk of readable) {
    // Streams with an encoding set (or object-mode streams) yield strings,
    // which `Buffer.concat` rejects, so normalize every chunk to a Buffer.
    chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
  }

  return Buffer.concat(chunks).toString();
}
// HTTP REQUEST | |
/**
 * Error raised for an unsuccessful HTTP response.
 *
 * The originating response is kept on `error.response` so that handlers can
 * inspect `statusCode` (and `body`, once something has read it).
 */
class HTTPError extends Error {
  /**
   * @param {{statusCode: number}} response - The response that triggered the error.
   */
  constructor(response) {
    super(`Status code: ${response.statusCode}`);
    // Without this the error prints as a plain `Error` in logs and stack traces.
    this.name = 'HTTPError';
    this.response = response;
  }
}
/**
 * Duplex stream that combines the request (writable side) and the response
 * (readable side) of a simulated HTTP exchange.
 *
 * Finishing the writable side triggers a fake 404 response on the next tick.
 * A 404 is handed to `_beforeError`; if that destroys the stream, the
 * response is never exposed and the readable side never yields a byte.
 */
class GotStream extends Duplex {
  constructor() {
    super();

    // Set by `_onResponse` once a response has been accepted.
    this.response = undefined;
    // Total number of response bytes forwarded to the readable side so far.
    this._downloadedLength = 0;
    // Whether the initial 0% progress line has been logged.
    this._hasReportedStart = false;
    // Whether a consumer has asked for data yet (i.e. `_read` has been called).
    this._isReading = false;
  }

  // Writable side (aka ClientRequest)
  _write(chunk, encoding, callback) {
    callback();
  }

  _final(callback) {
    callback();

    // Simulate network processing
    process.nextTick(() => {
      const message = 'The requested page does not exist.';
      const response = new PassThrough();
      // Progress is computed from chunk byte lengths, so the total must be in bytes too.
      response.contentLength = Buffer.byteLength(message);
      response.statusCode = 404;
      response.write(message);
      response.end();

      // `_onResponse` is async: route a failure into the stream instead of
      // leaving a floating promise behind.
      this._onResponse(response).catch(error => {
        this.destroy(error);
      });
    });
  }

  // Readable side (aka IncomingMessage)
  _read() {
    this._isReading = true;

    // Readable only on success
    const {response} = this;
    if (!response) {
      return;
    }

    if (!this._hasReportedStart) {
      this._hasReportedStart = true;
      console.log(`progress callback - 0%`);
    }

    // Forward whatever the response has buffered right now. EOF is pushed by
    // the response's `end` listener, not here, so data that arrives later is
    // not cut off.
    let chunk;
    while ((chunk = response.read()) !== null) {
      this._downloadedLength += chunk.length;
      console.log(`progress callback - ${Math.floor(this._downloadedLength / response.contentLength * 100)}%`);
      this.push(chunk);
    }
  }

  // Got logic
  /**
   * Accepts a response: runs the error path for a 404, otherwise (or if the
   * error path left the stream alive) exposes it and emits `response`.
   *
   * @param {import('stream').Readable & {statusCode: number, contentLength: number}} response
   * @returns {Promise<void>}
   */
  async _onResponse(response) {
    if (response.statusCode === 404) {
      await this._beforeError(new HTTPError(response));

      // It's destroyed for streams API
      if (this.destroyed) {
        return;
      }
    }

    this.response = response;

    // Keep pumping as new data arrives, but only after a consumer has
    // started reading from this stream.
    response.on('readable', () => {
      if (this._isReading) {
        this._read();
      }
    });

    // The response emits `end` only after all of its data has been read,
    // so this is the right moment to end the readable side.
    response.once('end', () => {
      this.push(null);
    });

    response.once('error', error => {
      this.destroy(error);
    });

    this.emit('response', response);
  }

  /**
   * Error path: reads the response body onto `error.response.body` when the
   * error carries a response, then destroys the stream with the error.
   *
   * @param {Error & {response?: object}} error
   * @returns {Promise<void>}
   */
  async _beforeError(error) {
    console.log('Calling user-defined async `beforeError` hooks which are useful when logging errors');

    // HTTPError? We can read it.
    if (error.response) {
      error.response.body = await readStream(error.response);
    }

    this.destroy(error);
  }
}
// Test stream | |
// Streams API scenario: finish the request, then try to read the response
// through the very same stream.
console.log('### BEGIN STREAM TEST ###');

const stream = new GotStream();
stream.end();

let responseOfStream;
stream.once('response', response => {
  if (response.statusCode === 404) {
    // Cannot read if the response is not successful
    assert.strictEqual(stream.read(), null);
  } else {
    assert.ok(stream.read());
  }

  responseOfStream = response;
});

try {
  const data = await readStream(stream);

  console.log(`statusCode: ${responseOfStream.statusCode}`);
  console.log(`body: ${data}`);
} catch (error) {
  console.log(`error: ${error.message}`);

  // Only HTTP errors carry a response; anything else would otherwise crash
  // here with a TypeError that hides the real failure.
  if (error.response) {
    console.log(`statusCode: ${error.response.statusCode}`);
    console.log(`body: ${error.response.body}`);
  }
}

console.log('### END STREAM TEST ###\n');
// PROMISE | |
/**
 * GotStream variant used by the promise API.
 *
 * Its error path does not read the response and does not destroy the stream:
 * it only emits `error`, so that the promise wrapper can still read the body
 * through the stream and decide when to reject.
 */
class PromisableStream extends GotStream {
  /**
   * @param {Error} error - The error to announce on the stream.
   * @returns {Promise<void>}
   */
  async _beforeError(error) {
    console.log('Calling user-defined async `beforeError` hooks which are useful when logging errors');

    // Let the promise be responsible for HTTPError,
    // because the promise is responsible for reading.
    //
    // NOTE(review): emitting `error` by hand leaves the stream alive, which
    // is what lets the body be read afterwards. With no `error` listener
    // attached, `emit` throws and this method rejects instead.
    this.emit('error', error);
  }
}
/**
 * Promise API: performs the request and settles with the fully-read response.
 *
 * Resolves with the response (its `body` populated) on success. Rejects with
 * an HTTPError whose `response.body` is populated for a 404, with a
 * `Read error: ...` error if reading the body fails, or with the original
 * error for any other stream error.
 *
 * @returns {Promise<object>} The response, with `statusCode` and `body`.
 */
const asPromise = () => new Promise((resolve, reject) => {
  // Note: this is NOT ClientRequest
  // This is a Duplex stream which combines
  // ClientRequest and IncomingMessage together.
  const request = new PromisableStream();
  request.end();

  request.once('response', async response => {
    try {
      // We need to read from request instead of response,
      // so the progress events are emitted
      // --------------- THE PROBLEM IS HERE / START --------------------
      // This destroys the request
      response.body = await readStream(request);
      // --------------- THE PROBLEM IS HERE / END --------------------
    } catch (error) {
      reject(new Error(`Read error: ${error.message}`));
      return;
    }

    if (response.statusCode === 404) {
      // `_beforeError` is async; forward a failure to the promise instead of
      // leaving it floating.
      request._beforeError(new HTTPError(response)).catch(reject);
      return;
    }

    resolve(response);
  });

  // --------------- THE PROBLEM IS HERE / START --------------------
  // If put `once` here, you will get hanging promise,
  // while it should throw an uncaught exception.
  request.on('error', error => {
    // --------------- THE PROBLEM IS HERE / END --------------------
    // Migrating from the `response` event
    // IOW: reached the retry count limit
    //
    // Compare against `undefined` rather than truthiness: an empty body is
    // still a body, and treating it as missing would leave the promise pending.
    const hasBody = Boolean(error.response) && error.response.body !== undefined;
    if (hasBody) {
      reject(error);
      return;
    }

    // Retry logic here...
    // Assume we retried and it's still failing
    if (error instanceof HTTPError) {
      // The `response` event is responsible for throwing HTTPError,
      // because it's responsible for downloading the data.
      return;
    }

    // Any other error has nobody else to report it; reject so the promise
    // cannot hang forever.
    reject(error);
  });
});
// Test promise API | |
// Promise API scenario: the 404 must surface as a rejection that carries the
// response body.
console.log('### BEGIN PROMISE TEST ###');

try {
  const response = await asPromise();

  console.log(`statusCode: ${response.statusCode}`);
  console.log(`body: ${response.body}`);
} catch (error) {
  console.log(`error: ${error.message}`);

  // Only HTTP errors carry a response; read errors do not.
  if (error.response) {
    console.log(`statusCode: ${error.response.statusCode}`);
    console.log(`body: ${error.response.body}`);
  }
}

console.log('### END PROMISE TEST ###');
})(); |
btw, your _read
code is missing this.response.on('readable', ...)
Sorry, this example is too complicated. I did try to wrap my head around it.
Thanks, I really appreciate it. The Got project is quite big, I spent endless hours working on it :P I did even rewrite it from scratch to fix 20 bugs and to utilize latest Node.js v10 features.
Don't use async iterator to read the stream and then you can inject your own error through a manual stream.destroy(err) call.
That would work too. I fixed the problem by exporting a different function based on _beforeError
and combined it with the original reject function: https://github.com/sindresorhus/got/blob/731e4fa235ae9b43e03e74fee34b8b32d8ebb705/source/as-promise/index.ts#L46-L58 so I no longer need to this.emit('error', ...)
I don't understand what this has to do with dangling error listeners though.
There was a regression in Got which led to hanging promises. If you replace line 201, especially on
with once
, you will see that indeed it just hangs (in this case the process just exits because there are no timers). With once
it should throw an uncaught exception, so I thought it's a bug. I wasn't aware that it's done on purpose.
btw, your _read code is missing this.response.on('readable', ...)
ikr, it's just an example :P The real Got does have it: https://github.com/sindresorhus/got/blob/731e4fa235ae9b43e03e74fee34b8b32d8ebb705/source/core/index.ts#L1088-L1092
http-timer
silently listens for the error
event so that's why I wanted to emit error
so badly. Due to this buggy-feature error.response.timings
shows that the request was successful as the error
timing is not present. But that's just a nitpick that I don't think it's worth fixing. I'll fix it maybe when users notice :P
Thank you very much for looking into this issue. I'm sorry if I pushed things. I was really convinced this was a bug. My bad I didn't read the Node.js docs closely. Having this said, let's close this topic. Have a good day / night!
Sorry, this example is too complicated. I did try to wrap my head around it.
All I can say is that doing
emit('error', err)
is a no-no when it comes to streams. It can break in strange ways. You should use stream.destroy(err)
. Don't use an async iterator to read the stream and then you can inject your own error through a manual stream.destroy(err)
call. I don't understand what this has to do with dangling error listeners though.