Created
May 26, 2024 13:45
-
-
Save yus-ham/36f75733f26f832efe071664cf638054 to your computer and use it in GitHub Desktop.
Bun.serve() workaround for emulated CPU
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os from 'os'; | |
import http from 'http'; | |
import { HTTPParser } from 'http-parser-js'; | |
import { styleText } from 'util'; | |
import type { ServeOptions } from 'bun'; | |
const runningOnEmulatedCPUEnv = os.cpus().find(cpu => cpu.model.includes('KVM')) | |
if (runningOnEmulatedCPUEnv) { | |
try { | |
Bun.serve({ fetch() {} }).stop(true) | |
} catch (err) { | |
const req_buffer = [] | |
/**
 * Parses one complete raw HTTP/1.x request, hands it to the user's fetch
 * handler as a `Request`, and writes the handler's `Response` back to the
 * socket.
 *
 * The whole request must be contained in `input`: the parser is fed once and
 * finished immediately, so every parser callback below fires synchronously.
 *
 * @param {Buffer} input raw request bytes (request line, headers and body)
 * @param {import("bun").TCPSocket} socket client socket; its `ctrl` property
 *   must hold an AbortController, whose signal is forwarded to the Request
 * @param {(req: Request) => Response | Promise<Response>} onRequest handler
 * @throws {Error} `Could not parse request` when `input` is not one complete
 *   HTTP message
 */
async function parseRequest(input, socket, onRequest) {
  const log = (...args) => `${Bun.env.APP_ENV}`.startsWith('dev') && console.info(styleText(`blue`, `[req handler]`), ...args)
  const parser = new HTTPParser(HTTPParser.REQUEST);
  let complete = false;
  let shouldKeepAlive;
  let upgrade;
  let method;
  let url;
  let versionMajor;
  let versionMinor;
  let headers_ = [];
  let trailers = [];
  const bodyChunks = [];

  parser[HTTPParser.kOnHeadersComplete] = function (req) {
    shouldKeepAlive = req.shouldKeepAlive;
    upgrade = req.upgrade;
    method = HTTPParser.methods[req.method];
    url = req.url;
    versionMajor = req.versionMajor;
    versionMinor = req.versionMinor;
    headers_ = req.headers;
  };
  parser[HTTPParser.kOnBody] = function (chunk, offset, length) {
    bodyChunks.push(chunk.slice(offset, offset + length));
  };
  // This is actually the event for trailers, go figure.
  parser[HTTPParser.kOnHeaders] = function (t) {
    trailers = t;
  };
  parser[HTTPParser.kOnMessageComplete] = function () {
    complete = true;
  };

  // The entire buffer is handed over at once, so all callbacks above have
  // already run by the time finish() returns.
  parser.execute(input);
  parser.finish();
  if (!complete) {
    throw new Error('Could not parse request');
  }

  const body = Buffer.concat(bodyChunks);
  // Debug dump goes through the dev-only logger so request bodies are not
  // printed unconditionally.
  log({
    shouldKeepAlive,
    upgrade,
    method,
    url,
    versionMajor,
    versionMinor,
    headers_,
    body,
    trailers,
  });

  // The parser reports headers as a flat [name, value, name, value, ...] list.
  // append() keeps repeated headers instead of letting the last one win.
  const headers = new Headers()
  for (let i = 0; i < headers_.length; i += 2) {
    headers.append(headers_[i].toLowerCase(), headers_[i + 1])
  }

  // Resolve the request target against the Host header so the handler sees
  // the real path and query string, not just the origin.
  const target = new URL(url, `http://${headers.get('host') ?? 'localhost'}`)
  // The Request constructor rejects a body on GET/HEAD.
  const carriesBody = body.length > 0 && method !== 'GET' && method !== 'HEAD'

  /** @type {Request} */
  const req = new Request(target, {
    method,
    headers,
    body: carriesBody ? body : undefined,
    signal: socket.ctrl.signal,
  })
  /** @type {Response} */
  const res = await onRequest(req)

  const statusText = res.statusText || http.STATUS_CODES[res.status] || ''
  let head = `HTTP/${versionMajor}.${versionMinor} ${res.status} ${statusText}\r\n`
  for (const [key, value] of res.headers) {
    log(`writeHeader:`, key, value)
    head += `${key}: ${value}\r\n`
  }
  // The blank line ending the header section is always written, even when
  // the response carries no headers at all.
  head += `\r\n`
  // NOTE(review): the return value of socket.write() is ignored here and
  // below, so partial writes under backpressure are not retried - confirm
  // against Bun's TCP socket documentation.
  socket.write(head)

  // Neither of these gets a response body; HEAD responses must never have one.
  if (method === 'OPTIONS' || method === 'HEAD') {
    log(`stop on ${method} req`)
    return
  }

  // Responses built with a null body have nothing to stream.
  if (!res.body) {
    log('response has no body')
    return
  }

  log('res_body_reader ready')
  const res_body_reader = res.body.getReader()
  for (;;) {
    const { done, value } = await res_body_reader.read()
    if (done) break
    log('respon from user req handler:', styleText(`red`, `|`) + Buffer.from(value.subarray(0, 50)).toString() + styleText(`red`, `|`))
    socket.write(value)
  }
  // NOTE(review): the socket is neither ended nor given a Content-Length
  // here; a client can only tell the body is finished if the handler set
  // Content-Length itself - confirm this is intended.
}
/**
 * Replacement for `Bun.serve()` built on `Bun.listen()`: accepts raw TCP
 * connections, buffers incoming bytes per connection until one full HTTP
 * request has arrived, then passes it to `parseRequest` together with
 * `opts.fetch`.
 *
 * @param opts serve options; `fetch` is required, `port` and `hostname` are
 *   optional (port falls back to `Bun.env.PORT`, then 3000)
 * @returns an object with `address`, `url`, `hostname`, `port` and a
 *   `stop()` method that stops the underlying listener
 * @throws {TypeError} with `code` `ERR_INVALID_ARG_TYPE` when `opts` is not
 *   an object or `opts.fetch` is not a function
 * @throws the listener's error, renamed `EADDRINUSE` when its message
 *   contains `Failed to listen`
 */
Bun.serve = function (opts: ServeOptions) {
  // typeof null is 'object', so null has to be rejected explicitly.
  if (typeof opts !== 'object' || opts === null) {
    throw Object.assign(new TypeError('Bun.serve expects an object'), { code: 'ERR_INVALID_ARG_TYPE' })
  }
  if (typeof opts.fetch !== 'function') {
    throw Object.assign(new TypeError('Expected fetch() to be a function'), { code: 'ERR_INVALID_ARG_TYPE' })
  }

  const log = (...args) => console.info(styleText(`cyan`, `[Bun.serve]`), ...args)
  // `??` keeps an explicit port 0; the environment value is a string, so the
  // result is normalised to a number.
  const port = Number(opts.port ?? (Bun.env.PORT || 3000))

  const HEADER_END = Buffer.from('\r\n\r\n')

  // Decides whether `buffered` holds one full request. With a Content-Length
  // header the declared number of body bytes must have arrived; without one
  // the request counts as complete when the buffered bytes end in CRLFCRLF.
  const isRequestComplete = (buffered: Buffer): boolean => {
    const headerEnd = buffered.indexOf(HEADER_END)
    if (headerEnd === -1) return false
    const head = buffered.subarray(0, headerEnd).toString('latin1')
    const declared = /^content-length:[ \t]*(\d+)[ \t]*$/im.exec(head)
    if (declared) {
      return buffered.length >= headerEnd + HEADER_END.length + Number(declared[1])
    }
    return buffered.subarray(-HEADER_END.length).equals(HEADER_END)
  }

  try {
    const tcp = Bun.listen({
      port,
      hostname: opts.hostname || '0.0.0.0',
      socket: {
        open(socket) {
          log(`socket client open ...`)
          socket.ctrl = new AbortController()
          // Each connection gets its own buffer so concurrent clients and
          // consecutive requests never see each other's bytes.
          socket.pending = []
        },
        drain() {
          log(`drain`)
        },
        data(socket, data) {
          log(`received:`, styleText(`red`, `|`) + data.toString().slice(0, 50) + styleText(`red`, `|`))
          socket.pending.push(data)
          const buffered = Buffer.concat(socket.pending)
          if (!isRequestComplete(buffered)) return
          // Reset before dispatching so the next request starts clean.
          socket.pending = []
          parseRequest(buffered, socket, opts.fetch).catch((err) => {
            // A failed request must not surface as an unhandled rejection.
            log(`request failed: ` + err)
            socket.end()
          })
        },
        end(socket) {
          log(`end`)
          socket.end()
        },
        close(socket) {
          log(`close`)
          log(`socket.ctrl.abort()`)
          socket.ctrl.abort()
        },
        error(socket, error) {
          log(`error: ` + error)
        },
        connectError(socket, error) {
          log(`connectError: ` + error)
        },
      }
    })

    const isIPv6 = tcp.hostname.includes(':')
    // IPv6 literals must be bracketed inside a URL.
    const urlHost = isIPv6 ? `[${tcp.hostname}]` : tcp.hostname

    return {
      address: {
        address: tcp.hostname,
        family: isIPv6 ? 'IPv6' : 'IPv4',
        port: tcp.port,
      },
      url: new URL(`http://${urlHost}:${tcp.port}/`),
      hostname: tcp.hostname,
      port: tcp.port,
      // Lets callers shut the server down, as the native server object does.
      stop(closeActiveConnections = false) {
        tcp.stop(closeActiveConnections)
      },
    }
  } catch (err) {
    if (err instanceof Error && err.message.includes('Failed to listen')) {
      err.message = `Failed to start server. Is port ${port} in use?`
      err.name = 'EADDRINUSE'
    }
    throw err
  }
}
} | |
} | |
/**
 * Wraps an iterator in an async iterator that reads one item ahead, so the
 * consumer can ask whether another item follows the one it just received.
 *
 * Up to two items are pulled from `iterator` before this function resolves.
 * Sync iterators are accepted as well, because each `next()` result is
 * awaited.
 *
 * If the consumer stops early (for example `break` inside `for await`) after
 * iteration has started, the source's `return()` is called so it can release
 * its resources. An exhausted source is not asked for further items.
 *
 * @param iterator source iterator; consumed by the returned wrapper
 * @returns an async iterable iterator with an extra `hasNext()` method that
 *   reports whether an item follows the most recently yielded one
 */
async function extendIterator<T>(
  iterator: AsyncIterator<T> | Iterator<T>,
): Promise<AsyncIterableIterator<T> & { hasNext(): boolean }> {
  let curr = await iterator.next()
  // Nothing to look ahead at when the source is already exhausted.
  let next = curr.done ? curr : await iterator.next()
  let hasNext = !next.done

  async function* innerIterator(): AsyncIterableIterator<T> {
    try {
      while (!curr.done) {
        yield curr.value
        curr = next
        if (!curr.done) {
          next = await iterator.next()
        }
        hasNext = !next.done
      }
    } finally {
      // Reached with an unfinished `curr` only on early termination or an
      // error; forward the shutdown to the source.
      if (!curr.done) {
        await iterator.return?.()
      }
    }
  }

  const wrapper = innerIterator()
  Object.defineProperty(wrapper, 'hasNext', { writable: false, value: () => hasNext })
  return wrapper as AsyncIterableIterator<T> & { hasNext(): boolean }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment