Created
January 25, 2020 16:54
-
-
Save dubiousjim/f0d8c2299c5ea7f94e249553c6335323 to your computer and use it in GitHub Desktop.
reworked deno Process, readAll, Buffer.readFrom
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Bindings pulled from the Deno runtime namespace. None of them is reassigned
// in this file, so they are declared `const`.
const { EOF } = Deno; // may move to Deno.symbols
const { Buffer, ErrorKind, read, close, run: oldDenoRun, Process } = Deno;
// Amount of free space (in bytes) requested from the buffer before each read
// in Buffer.prototype.readFrom; also the per-read cap in readFromN.
const MIN_READ = 512;
/**
 * Appends everything `r` yields to this buffer until `r.read` reports EOF.
 *
 * Before each read the buffer is grown by at least MIN_READ bytes and the
 * reader is handed a view over the free tail of the backing ArrayBuffer.
 *
 * @param r - object with an async `read(Uint8Array)` method returning a byte count or EOF
 * @returns the number of bytes read on EOF; if a read fails with
 *   ErrorKind.BadResource, `-(bytesRead + 1)` instead, so callers can tell
 *   "resource was closed" apart from a normal EOF. Any other error is rethrown.
 */
Buffer.prototype.readFrom = async function readFrom(r) {
  let total = 0;
  for (;;) {
    try {
      const start = this._grow(MIN_READ);
      // Shrink the visible length back to the valid data; the grow above only reserved space.
      this._reslice(start);
      const dest = new Uint8Array(this.buf.buffer, start);
      const count = await r.read(dest);
      if (count !== EOF) {
        this._reslice(start + count);
        total += count;
        continue;
      }
      return total;
    } catch (err) {
      if (err.kind !== ErrorKind.BadResource) {
        throw err;
      }
      return -total - 1; // FIXME
    }
  }
};
/**
 * Appends at most `nbytes` bytes from `r` to this buffer.
 *
 * Reads are issued in chunks of at most MIN_READ bytes. The view handed to
 * `r.read` is limited to the number of bytes still wanted, so a single read
 * can never deliver more than the caller asked for (an unbounded view over
 * the rest of the backing ArrayBuffer would let the reader overshoot `nbytes`).
 *
 * @param r - object with an async `read(Uint8Array)` method returning a byte count or EOF
 * @param nbytes - maximum number of bytes to read; must not be negative
 * @returns the number of bytes read once `nbytes` were read or EOF was hit;
 *   if a read fails with ErrorKind.BadResource, `-(bytesRead + 1)` instead.
 * @throws Error when `nbytes` is negative (surfaces as a rejected promise);
 *   any read error other than BadResource is rethrown.
 */
Buffer.prototype.readFromN = async function readFromN(r, nbytes) {
  if (nbytes < 0) {
    throw new Error("Buffer.readFromN: negative nbytes");
  }
  let n = 0;
  while (true) {
    try {
      const needed = (nbytes < MIN_READ) ? nbytes : MIN_READ;
      if (needed <= 0) {
        return n;
      }
      const i = this._grow(needed);
      // Shrink the visible length back to the valid data; the grow above only reserved space.
      this._reslice(i);
      // Bound the view to `needed` bytes so the reader cannot exceed the limit.
      const fub = new Uint8Array(this.buf.buffer, i, needed);
      const nread = await r.read(fub);
      if (nread === EOF) {
        return n;
      }
      this._reslice(i + nread);
      n += nread;
      console.assert(nread <= needed && needed <= nbytes);
      nbytes -= nread;
    } catch (e) {
      if (e.kind === ErrorKind.BadResource) {
        return -n - 1; // FIXME
      }
      throw e;
    }
  }
};
// TODO readAll(reader, opts?:{ maxBytes, rejector }) : { closed?:boolean, aborted?:any, contents:Uint8Array }
/**
 * Reads `reader` to EOF into a fresh Buffer, with optional external abort.
 *
 * Without a `rejector` this behaves like
 * `await buf.readFrom(reader); return buf.bytes();` wrapped in a result object.
 *
 * @param reader - passed straight to `Buffer.prototype.readFrom`
 * @param rejector - optional callback; it is invoked synchronously with a
 *   function that, when called with a reason before the read finishes, makes
 *   the result `{ aborted: reason, contents }`
 * @returns a promise for `{ closed, contents }` when the read finished
 *   (`closed` is true when readFrom reported a negative count), or
 *   `{ aborted, contents }` when the abort function was called first.
 *   Errors thrown by the read itself reject the returned promise; previously
 *   they were dropped and the promise never settled.
 */
export function readAll(reader, rejector) {
  const buf = new Buffer();
  // `settled` fulfils with the outcome of the read (success or failure) and
  // rejects only through the abort function handed to `rejector`, so the two
  // cases stay distinguishable below.
  const settled = new Promise((resolve, reject) => {
    buf.readFrom(reader).then(
      n => resolve({ n }),
      error => resolve({ error }),
    );
    if (rejector) rejector(reject);
  });
  return settled.then(
    outcome => {
      if ('error' in outcome) throw outcome.error;
      return { closed: outcome.n < 0, contents: buf.bytes() };
    },
    reason => ({ aborted: reason, contents: buf.bytes() }),
  );
}
// One-byte scratch buffer used to probe for data beyond the limit; its contents are never inspected.
const readAllN_EOF_Buffer = new Uint8Array(1);

/**
 * Reads at most `maxBytes` bytes from `reader` into a fresh Buffer, with
 * optional external abort, and reports whether more data was available.
 *
 * @param reader - passed to `Buffer.prototype.readFromN`; its `rid` is used
 *   for the one-byte probe that detects truncation
 * @param maxBytes - maximum number of bytes to collect; must not be negative
 * @param rejector - optional callback; it is invoked synchronously with a
 *   function that, when called with a reason before the read finishes, makes
 *   the result `{ aborted: reason, contents }`
 * @returns a promise for `{ truncated, closed, contents }`:
 *   `truncated` is true when the probe found another byte, false when it did
 *   not (or the limit was not reached), and null when the resource was closed
 *   before the probe could run; `closed` is true when readFromN reported a
 *   negative count. When aborted the result is `{ aborted, contents }`.
 *   Errors thrown by the read itself reject the returned promise; previously
 *   they were dropped and the promise never settled.
 * @throws Error synchronously when `maxBytes` is negative
 */
export function readAllN(reader, maxBytes, rejector) {
  if (maxBytes < 0) {
    throw new Error("readAllN: negative maxBytes");
  }
  const buf = new Buffer();
  // `settled` fulfils with the outcome of the read (success or failure) and
  // rejects only through the abort function handed to `rejector`.
  const settled = new Promise((resolve, reject) => {
    buf.readFromN(reader, maxBytes).then(
      n => resolve({ n }),
      error => resolve({ error }),
    );
    if (rejector) rejector(reject);
  });

  async function finish(n) {
    let truncated = false;
    // readFromN encodes "resource closed after k bytes" as -(k + 1).
    const wasNegative = n < 0;
    const count = wasNegative ? -n - 1 : n;
    if (count === maxBytes) {
      // file may be longer than maxBytes
      try {
        const m = await read(reader.rid, readAllN_EOF_Buffer);
        console.assert(m === EOF || m === 1);
        console.assert(!wasNegative);
        if (m !== EOF) {
          truncated = true;
        }
      } catch (e) {
        if (e.kind !== ErrorKind.BadResource) {
          throw e;
        }
        truncated = null; // fd closed before we could check
      }
    }
    if (truncated) console.assert(!wasNegative);
    const b = buf.bytes();
    return { truncated, closed: wasNegative, contents: b };
  }

  return settled.then(
    outcome => {
      if ('error' in outcome) throw outcome.error;
      return finish(outcome.n);
    },
    reason => ({ aborted: reason, contents: buf.bytes() }),
  );
}
/**
 * Spawns a process through the original `Deno.run` and wraps it in a
 * LimitedProcess.
 *
 * @param opts - options object handed unchanged to the original run function.
 *   `opts.timeout` that is null, undefined or false becomes 0;
 *   `opts.maxBuffer` that is undefined becomes null.
 * @returns the LimitedProcess wrapping the spawned process
 */
export function newDenoRun(opts) {
  const requested = opts.timeout;
  const timeoutDisabled = requested === null || requested === undefined || requested === false;
  const timeout = timeoutDisabled ? 0 : requested;

  // Number.MAX_SAFE_INTEGER or 1024*1024?
  let maxBuffer = null;
  if (opts.maxBuffer !== undefined) {
    maxBuffer = opts.maxBuffer;
  }

  const spawned = oldDenoRun(opts);
  return new LimitedProcess(spawned, timeout, maxBuffer);
}
/**
 * Thrown when captured output is longer than the configured byte limit.
 *
 * Extends Error so that it carries a stack trace and passes
 * `instanceof Error` checks; the `nbytes`, `buf` and `caller` fields and the
 * `toString()` text are the same as before.
 */
export class Exceeded extends Error {
  /**
   * @param nbytes - the byte limit that was exceeded
   * @param buf - the bytes collected up to the limit
   * @param caller - optional label naming the source, appended to the text
   */
  constructor(nbytes, buf, caller) {
    super(caller ? `Exceeded ${nbytes} bytes (${caller})` : `Exceeded ${nbytes} bytes`);
    this.name = 'Exceeded';
    this.nbytes = nbytes;
    this.buf = buf;
    this.caller = caller;
  }

  /** @returns `Exceeded <nbytes> bytes`, followed by ` (<caller>)` when a caller is set */
  toString() {
    let s = `Exceeded ${this.nbytes} bytes`;
    if (this.caller) s += ` (${this.caller})`;
    return s;
  }
}
/**
 * Describes an operation that ran longer than its time limit.
 *
 * Extends Error so that, when used as a thrown value or rejection reason, it
 * carries a stack trace and passes `instanceof Error` checks; the `ms` and
 * `caller` fields and the `toString()` text are the same as before.
 */
export class Timeout extends Error {
  /**
   * @param ms - the time limit, in milliseconds
   * @param caller - optional label naming the source, appended to the text
   */
  constructor(ms, caller) {
    super(caller ? `Timeout after ${ms} ms (${caller})` : `Timeout after ${ms} ms`);
    this.name = 'Timeout';
    this.ms = ms;
    this.caller = caller;
  }

  /** @returns `Timeout after <ms> ms`, followed by ` (<caller>)` when a caller is set */
  toString() {
    let s = `Timeout after ${this.ms} ms`;
    if (this.caller) s += ` (${this.caller})`;
    return s;
  }
}
// TODO implement Timeout
/**
 * Wraps an already-spawned process `p`, reusing its rid, pid and streams.
 *
 * - `output()` can be capped at `maxBuffer` bytes; exceeding the cap throws Exceeded.
 * - A pending `output()` / `stderrOutput()` read is aborted when the wrapped
 *   status promise rejects, or when it resolves after `close()` was called.
 * - `timeout` is stored but not acted on yet (see the TODO above).
 */
export class LimitedProcess extends Process {
  #maxBuffer;
  #timeout;
  #statusProm;
  // Set once the status has settled or close() was called.
  #exited = false;
  // Rejects #statusProm; used by close().
  #reject;
  // Abort functions for an in-flight output() / stderrOutput(). They double as
  // locks: a defined value means a read is already in progress.
  #outReject;
  #errReject;

  /**
   * @param p - the process to wrap; must provide `rid`, `pid`, `status()` and
   *   optionally `stdin` / `stdout` / `stderr`
   * @param timeout - stored for future use
   * @param maxBuffer - byte cap for `output()`, or null for no cap
   */
  constructor(p, timeout, maxBuffer) {
    super({ rid: p.rid, pid: p.pid });
    if (p.stdin) this.stdin = p.stdin;
    if (p.stdout) this.stdout = p.stdout;
    if (p.stderr) this.stderr = p.stderr;
    this.#maxBuffer = maxBuffer;
    this.#timeout = timeout;
    this.#statusProm = new Promise((res, rej) => {
      this.#reject = rej;
      // TODO if this lands in main code, might delay generating #statusProm and #reject until requested
      p.status().then(
        // FIXME cleanup
        data => {
          if (this.#exited) {
            // close() ran before the status arrived: abort any pending reads.
            if (this.#outReject) this.#outReject('alpha');
            if (this.#errReject) this.#errReject('alpha');
          }
          this.#exited = true;
          res(data);
        },
        err => {
          console.assert(this.#exited && err.kind === ErrorKind.BadResource);
          if (this.#outReject) this.#outReject('beta');
          if (this.#errReject) this.#errReject('beta');
          // status already settled
        }
      );
    });
  }

  /**
   * Closes the process resource, ignoring BadResource errors. If the status
   * has not settled yet, the status promise is rejected with 'explicitly closed'.
   */
  close() {
    try {
      // Calls the module-level `close` taken from Deno, not this method.
      close(this.rid);
    } catch (e) {
      if (e.kind !== ErrorKind.BadResource) {
        throw e;
      }
    }
    if (!this.#exited) {
      this.#exited = true;
      // TODO assumes we've generated #statusProm and #reject
      this.#reject('explicitly closed'); // FIXME cleanup
    } // else status already settled
  }

  /**
   * Sends signal `signo` to the process.
   *
   * @throws Error when the process has already exited or was closed
   */
  kill(signo) {
    if (this.#exited) {
      throw new Error('Process.kill: already exited');
    }
    // A bare `kill` is never bound in this file (it would be a ReferenceError),
    // so go through the Deno namespace.
    return Deno.kill(this.pid, signo);
  }

  /** @returns the promise created in the constructor that tracks the process status */
  status() { return this.#statusProm; }

  /**
   * Collects stdout, then closes it (unless the read already found it closed).
   *
   * @returns the collected bytes; when the read was aborted, whatever was collected so far
   * @throws Error when stdout is undefined or another output() is in flight;
   *   Exceeded when a maxBuffer is set and more data than that was available
   */
  async output() {
    if (!this.stdout) {
      throw new Error("Process.output: stdout is undefined");
    }
    if (this.#outReject !== undefined) {
      throw new Error("Process.output: couldn't get lock");
    }
    if (this.#exited) {
      console.info(`Process ${this.rid} already exited, but attempting output`);
    }
    let tryClose = true;
    let wasAborted = false;
    try {
      const hold = rej => { this.#outReject = rej; };
      const r = (this.#maxBuffer === null)
        ? await readAll(this.stdout, hold)
        : await readAllN(this.stdout, this.#maxBuffer, hold);
      console.assert('aborted' in r || 'closed' in r);
      if (r.aborted) {
        wasAborted = true;
      } else if (r.closed) {
        tryClose = false;
      }
      // Only readAllN results carry `truncated`; it is undefined for readAll.
      if (r.truncated) {
        throw new Exceeded(this.#maxBuffer, r.contents, `Process ${this.rid}'s stdout`); // FIXME cleanup
      }
      return r.contents;
    } finally {
      // Release the lock.
      this.#outReject = undefined;
      // if readFrom/readFromN returned negative n, this would fail
      // but it might also fail anyway
      if (tryClose) {
        try {
          this.stdout.close();
        } catch (e) {
          if (e.kind !== ErrorKind.BadResource) {
            throw e;
          }
        }
      } else {
        console.assert(!wasAborted);
      }
    }
  }

  /**
   * Collects stderr (no byte cap), then closes it (unless the read already
   * found it closed).
   *
   * @returns the collected bytes; when the read was aborted, whatever was collected so far
   * @throws Error when stderr is undefined or another stderrOutput() is in flight
   */
  async stderrOutput() {
    if (!this.stderr) {
      throw new Error("Process.stderrOutput: stderr is undefined");
    }
    if (this.#errReject !== undefined) {
      throw new Error("Process.stderrOutput: couldn't get lock");
    }
    if (this.#exited) {
      console.info(`Process ${this.rid} already exited, but attempting stderrOutput`);
    }
    let tryClose = true;
    let wasAborted = false;
    try {
      const r = await readAll(this.stderr, rej => { this.#errReject = rej; });
      console.assert('aborted' in r || 'closed' in r);
      if (r.aborted) {
        wasAborted = true;
      } else if (r.closed) {
        tryClose = false;
      }
      return r.contents;
    } finally {
      // Release the lock.
      this.#errReject = undefined;
      if (tryClose) {
        try {
          this.stderr.close();
        } catch (e) {
          if (e.kind !== ErrorKind.BadResource) {
            throw e;
          }
        }
      } else {
        console.assert(!wasAborted);
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment