|
/**
 * Reader backed by a remote file that is fetched lazily over HTTP using
 * `Range` requests.
 *
 * Each range request asks for at least {@link HttpReader.#readahead} bytes so
 * that consecutive forward reads can be served from one open response body.
 * A new request is issued only when a read starts before the current stream
 * position or extends past the end of the fetched window.
 *
 * All streams created by one instance share a single response reader and
 * position, so streams must be consumed one at a time; interleaving two
 * streams from the same instance is not supported.
 */
export class HttpReader extends yauzl.Reader {
  /** Minimum number of bytes requested per range fetch. */
  static readonly #readahead = 1024 * 1024;

  /** Number of retries attempted after the first failed range fetch. */
  static readonly #maxRetryCount = 3;

  /** Base delay in milliseconds used to compute the retry backoff. */
  static readonly #retryDelay = 250;

  /** Multiplier applied per retry: delay = factor ** retryNumber * base. */
  static readonly #retryBackoffFactor = 2;

  /**
   * Creates a reader for `url`, determining the file size with a `HEAD` request.
   *
   * @param url - Location of the remote file.
   * @returns A reader whose total size is the reported `Content-Length`.
   * @throws Error if the `HEAD` request fails or `Content-Length` is missing
   *   or not a non-negative integer.
   */
  static async open(url: string): Promise<HttpReader> {
    const response = await fetch(url, { method: "HEAD" });
    if (!response.ok) throw new Error(`Failed to fetch ${url}: ${response.statusText}`);

    const contentLength = response.headers.get("Content-Length");
    if (!contentLength) throw new Error("Content-Length header is missing");

    const size = Number.parseInt(contentLength, 10);
    if (!Number.isSafeInteger(size) || size < 0) {
      throw new Error(`Invalid Content-Length header: ${contentLength}`);
    }

    return new HttpReader(url, size);
  }

  readonly #url: string;
  readonly #size: number;

  /** Reader of the currently open range response, or `null` when none is usable. */
  #reader: ReadableStreamDefaultReader<Uint8Array> | null = null;

  /** Absolute offset of the next unread byte, i.e. of `#pendingChunk[0]`. */
  #start = 0;

  /** Absolute offset one past the last byte of the currently fetched window. */
  #end = 0;

  /** Bytes already received from the response but not yet consumed. */
  #pendingChunk: Uint8Array<ArrayBufferLike> = new Uint8Array();

  /**
   * @param url - Location of the remote file.
   * @param size - Total size of the remote file in bytes.
   */
  constructor(url: string, size: number) {
    super();
    this.#url = url;
    this.#size = size;
  }

  /** Total size of the remote file in bytes. */
  get totalSize(): number {
    return this.#size;
  }

  /**
   * Returns a stream of `length` bytes of the remote file starting at `start`.
   *
   * @param start - Absolute offset of the first byte.
   * @param length - Number of bytes to stream.
   */
  override _createReadStream(start: number, length: number): Readable {
    return Readable.from(this.#getStream(start, length));
  }

  /**
   * Yields exactly `length` bytes starting at absolute offset `start`.
   *
   * Reuses the open response when the requested span lies at or after the
   * current position and inside the fetched window; otherwise cancels it and
   * fetches a new window beginning at `start`.
   *
   * @throws Error with message "Unexpected end of stream" if the response ends
   *   before all requested bytes were received.
   */
  async *#getStream(start: number, length: number): AsyncIterable<Buffer> {
    // Nothing to deliver; also avoids building an invalid Range header when
    // `start` equals the file size.
    if (length <= 0) return;

    if (!this.#reader || start < this.#start || start + length > this.#end) {
      // Release the previous response before opening a new one.
      const stale = this.#reader;
      this.#reader = null;
      if (stale) void stale.cancel().catch(() => {});

      const end = Math.min(start + Math.max(length, HttpReader.#readahead), this.#size);
      this.#reader = await this.#fetchRange(start, end);
      this.#start = start;
      this.#end = end;
      this.#pendingChunk = new Uint8Array();
    }

    // Discard bytes between the current position and the requested start,
    // keeping `#start` equal to the offset of `#pendingChunk[0]` at every step.
    while (this.#start < start) {
      if (this.#pendingChunk.length === 0) this.#pendingChunk = await this.#readChunk();
      const skip = Math.min(start - this.#start, this.#pendingChunk.length);
      this.#pendingChunk = this.#pendingChunk.subarray(skip);
      this.#start += skip;
    }

    let remaining = length;
    while (remaining > 0) {
      if (this.#pendingChunk.length === 0) {
        this.#pendingChunk = await this.#readChunk();
        continue; // re-check: a stream may deliver an empty chunk
      }

      const chunk = this.#pendingChunk;
      const bytesToYield = Math.min(remaining, chunk.length);

      // Advance the shared position BEFORE yielding: if the consumer stops
      // early, nothing after the `yield` runs, and the bookkeeping must
      // already reflect the bytes handed out.
      this.#pendingChunk = chunk.subarray(bytesToYield);
      this.#start += bytesToYield;
      remaining -= bytesToYield;

      // The Buffer is a view over the chunk's memory, not a copy.
      yield Buffer.from(chunk.buffer, chunk.byteOffset, bytesToYield);
    }
  }

  /**
   * Reads the next chunk from the open response.
   *
   * On any failure the reader is dropped so that the next request starts from
   * a fresh range fetch instead of reusing a dead stream.
   *
   * @throws Error with message "Unexpected end of stream" if no response is
   *   open or the response has ended.
   */
  async #readChunk(): Promise<Uint8Array> {
    const reader = this.#reader;
    if (!reader) throw new Error("Unexpected end of stream");

    try {
      const result = await reader.read();
      if (!result.done) return result.value;
    } catch (error) {
      this.#reader = null;
      throw error;
    }

    this.#reader = null;
    throw new Error("Unexpected end of stream");
  }

  /**
   * Requests bytes `start` (inclusive) to `end` (exclusive) and returns a
   * reader over the response body.
   *
   * Failed requests (non-OK status or a rejected `fetch`) are retried up to
   * {@link HttpReader.#maxRetryCount} times with exponential backoff. The
   * retry counter is local to each call, so one exhausted request does not
   * disable retries for later ones.
   *
   * @throws Error if the server keeps failing, answers a non-zero-offset range
   *   request with a full (non-206) response, or returns no body. When every
   *   attempt was rejected by `fetch` itself, the last rejection is rethrown.
   */
  async #fetchRange(start: number, end: number): Promise<ReadableStreamDefaultReader<Uint8Array>> {
    for (let retry = 0; ; retry++) {
      let response: Response | undefined;
      let networkError: unknown;
      try {
        response = await fetch(this.#url, { headers: { Range: `bytes=${start}-${end - 1}` } });
      } catch (error) {
        networkError = error;
      }

      if (response?.ok) {
        // A server that ignores `Range` answers 200 with the whole file. That
        // body starts at offset 0, so it is only usable when `start` is 0.
        if (response.status !== 206 && start > 0) {
          void response.body?.cancel().catch(() => {});
          throw new Error(
            `Server ignored range request ${start}-${end - 1} (status ${response.status})`,
          );
        }
        if (!response.body) throw new Error(`Response for range ${start}-${end - 1} has no body`);
        return response.body.getReader();
      }

      // Release the failed response so its connection is not left open.
      void response?.body?.cancel().catch(() => {});

      if (retry >= HttpReader.#maxRetryCount) {
        if (!response) throw networkError;
        throw new Error(`Failed to fetch range ${start}-${end - 1}: ${response.statusText}`);
      }

      const retryNumber = retry + 1;
      console.warn(
        `Failed to fetch range ${start}-${end - 1}, retrying (${retryNumber}/${HttpReader.#maxRetryCount})...`,
      );

      const delay = HttpReader.#retryBackoffFactor ** retryNumber * HttpReader.#retryDelay;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}