Skip to content

Instantly share code, notes, and snippets.

@dubiousjim
Created January 25, 2020 16:54
Show Gist options
  • Save dubiousjim/f0d8c2299c5ea7f94e249553c6335323 to your computer and use it in GitHub Desktop.
Save dubiousjim/f0d8c2299c5ea7f94e249553c6335323 to your computer and use it in GitHub Desktop.
reworked deno Process, readAll, Buffer.readFrom
// Local bindings for the Deno runtime APIs used throughout this module.
let { EOF } = Deno; // may move to Deno.symbols
// `run` is aliased to `oldDenoRun` so that newDenoRun() can wrap it.
let { Buffer, ErrorKind, read, close, run: oldDenoRun, Process } = Deno;
// Minimum spare capacity requested from the buffer before each read.
const MIN_READ = 512;

/**
 * Reads from `r` into this buffer until `r.read` reports EOF.
 *
 * @param r - object exposing `read(Uint8Array)`, resolving to a byte count or EOF.
 * @returns {Promise<number>} the number of bytes appended (>= 0) on EOF; or
 *   `-(bytes appended) - 1` (always < 0) when an error whose `kind` is
 *   `ErrorKind.BadResource` interrupts the loop, so callers can tell the two
 *   outcomes apart while still recovering the byte count.
 * @throws any error whose `kind` is not `ErrorKind.BadResource`.
 */
Buffer.prototype.readFrom = async function readFrom(r) {
  let total = 0;
  for (;;) {
    try {
      const start = this._grow(MIN_READ);
      this._reslice(start);
      // View over everything from `start` to the end of the backing store.
      const view = new Uint8Array(this.buf.buffer, start);
      const got = await r.read(view);
      if (got === EOF) {
        return total;
      }
      this._reslice(start + got);
      total += got;
    } catch (err) {
      if (err.kind !== ErrorKind.BadResource) {
        throw err;
      }
      return -total - 1; // FIXME
    }
  }
};
/**
 * Reads at most `nbytes` bytes from `r` into this buffer, stopping early on EOF.
 *
 * Each read is issued against a view of exactly `needed` bytes, so the reader
 * can never hand back more than the remaining budget (previously the view
 * covered all spare capacity and the limit could be overshot).
 *
 * @param r - object exposing `read(Uint8Array)`, resolving to a byte count or EOF.
 * @param {number} nbytes - maximum number of bytes to append; must be >= 0.
 * @returns {Promise<number>} the number of bytes appended (>= 0) when the
 *   budget is used up or EOF is hit; or `-(bytes appended) - 1` (always < 0)
 *   when an error whose `kind` is `ErrorKind.BadResource` interrupts the loop.
 * @throws {Error} (as a rejection) when `nbytes` is negative, and any read
 *   error whose `kind` is not `ErrorKind.BadResource`.
 */
Buffer.prototype.readFromN = async function readFromN(r, nbytes) {
  if (nbytes < 0)
    throw new Error("Buffer.readFromN: negative nbytes");
  let n = 0;
  while (true) {
    try {
      // Floor so that a fractional budget cannot produce a zero-length view
      // that never makes progress; integer budgets are unaffected.
      const needed = Math.floor((nbytes < MIN_READ) ? nbytes : MIN_READ);
      if (needed <= 0)
        return n;
      const i = this._grow(needed);
      this._reslice(i);
      // Cap the view at `needed` bytes so a single read cannot exceed the budget.
      const fub = new Uint8Array(this.buf.buffer, i, needed);
      const nread = await r.read(fub);
      if (nread === EOF)
        return n;
      this._reslice(i + nread);
      n += nread;
      console.assert(nread <= needed && needed <= nbytes);
      nbytes -= nread;
    } catch (e) {
      if (e.kind === ErrorKind.BadResource)
        return -n - 1; // FIXME
      throw e;
    }
  }
};
// TODO readAll(reader, opts?:{ maxBytes, rejector }) : { closed?:boolean, aborted?:any, contents:Uint8Array }
/**
 * Reads `reader` to the end via `Buffer.prototype.readFrom`, with optional
 * external cancellation.
 *
 * @param reader - passed straight to `Buffer.prototype.readFrom`.
 * @param {(abort: (reason: any) => void) => void} [rejector] - if given, it is
 *   called synchronously with an `abort` function; calling `abort(reason)`
 *   before the read completes makes the result `{ aborted, contents }`.
 * @returns {Promise<{closed: boolean, contents: Uint8Array} |
 *                   {aborted: any, contents: Uint8Array}>}
 *   `closed` is true when `readFrom` reported a negative count.
 *   `contents` is whatever the buffer holds at that moment.
 * @throws (as a rejection) any error `readFrom` itself rejects with. Such
 *   failures used to be dropped as an unhandled rejection, leaving the
 *   returned promise pending forever.
 */
export function readAll(reader, rejector) {
  const buf = new Buffer();
  // Unique per-call marker so a read failure can be told apart from an abort
  // reason supplied through `rejector`.
  const readFailure = { error: undefined };
  const p = new Promise((res, rej) => {
    buf.readFrom(reader).then(res, (err) => {
      readFailure.error = err;
      rej(readFailure);
    });
    if (rejector) rejector(rej);
  });
  return p.then(
    (n) => ({ closed: n < 0, contents: buf.bytes() }),
    (reason) => {
      if (reason === readFailure) throw readFailure.error;
      return { aborted: reason, contents: buf.bytes() };
    },
  );
}
// One-byte scratch buffer used to probe for data beyond the maxBytes limit.
const readAllN_EOF_Buffer = new Uint8Array(1);

/**
 * Reads at most `maxBytes` bytes from `reader` via
 * `Buffer.prototype.readFromN`, with optional external cancellation, and
 * reports whether more data was available.
 *
 * @param reader - passed to `readFromN`; `reader.rid` is used for the
 *   one-byte probe read.
 * @param {number} maxBytes - byte limit; must be >= 0.
 * @param {(abort: (reason: any) => void) => void} [rejector] - if given, it is
 *   called synchronously with an `abort` function; calling `abort(reason)`
 *   before `readFromN` completes makes the result `{ aborted, contents }`.
 * @returns {Promise<{truncated: boolean|null, closed: boolean, contents: Uint8Array} |
 *                   {aborted: any, contents: Uint8Array}>}
 *   `truncated` is true when the probe found another byte, false when it did
 *   not (or no probe was needed), and null when the probe failed with
 *   `ErrorKind.BadResource`. `closed` is true when `readFromN` reported a
 *   negative count.
 * @throws {Error} synchronously when `maxBytes` is negative; as a rejection
 *   for any error from `readFromN` or a non-BadResource probe error. Errors
 *   from `readFromN` used to be dropped as an unhandled rejection, leaving
 *   the returned promise pending forever.
 */
export function readAllN(reader, maxBytes, rejector) {
  if (maxBytes < 0)
    throw new Error("readAllN: negative maxBytes");
  const buf = new Buffer();
  // Unique per-call marker so a read failure can be told apart from an abort
  // reason supplied through `rejector`.
  const readFailure = { error: undefined };
  const p = new Promise((res, rej) => {
    buf.readFromN(reader, maxBytes).then(res, (err) => {
      readFailure.error = err;
      rej(readFailure);
    });
    if (rejector) rejector(rej);
  });

  async function finish(n) {
    // A negative n encodes "-(bytes read) - 1"; decode it.
    const was_negative = n < 0;
    const count = was_negative ? -n - 1 : n;
    let truncated = false;
    if (count === maxBytes) {
      // file may be longer than maxBytes
      try {
        const m = await read(reader.rid, readAllN_EOF_Buffer);
        console.assert(m === EOF || m === 1);
        console.assert(!was_negative);
        if (m !== EOF)
          truncated = true;
      } catch (e) {
        if (e.kind !== ErrorKind.BadResource)
          throw e;
        truncated = null; // fd closed before we could check
      }
    }
    if (truncated) console.assert(!was_negative);
    return { truncated, closed: was_negative, contents: buf.bytes() };
  }

  return p.then(finish, (reason) => {
    if (reason === readFailure) throw readFailure.error;
    return { aborted: reason, contents: buf.bytes() };
  });
}
/**
 * Spawns a subprocess through the original `Deno.run` and wraps the result in
 * a LimitedProcess.
 *
 * @param opts - forwarded unchanged to the original `run`; additionally
 *   `opts.timeout` (null, undefined or false all become 0) and
 *   `opts.maxBuffer` (undefined becomes null) are read from it.
 * @returns {LimitedProcess} wrapper around the spawned process.
 */
export function newDenoRun(opts) {
  const timeoutDisabled = [null, undefined, false].includes(opts.timeout);
  const timeout = timeoutDisabled ? 0 : opts.timeout;
  // Number.MAX_SAFE_INTEGER or 1024*1024?
  const maxBuffer = opts.maxBuffer ?? null;
  const child = oldDenoRun(opts);
  return new LimitedProcess(child, timeout, maxBuffer);
}
/**
 * Error signalling that more than `nbytes` bytes of data were available.
 *
 * Extends `Error` so that throwing it yields a stack trace and it passes
 * `instanceof Error` checks; the fields and `toString()` output are as before.
 */
export class Exceeded extends Error {
  /**
   * @param {number} nbytes - the byte limit that was exceeded.
   * @param buf - the data collected up to the limit.
   * @param {string} [caller] - optional description of where the limit applied.
   */
  constructor(nbytes, buf, caller) {
    super(caller ? `Exceeded ${nbytes} bytes (${caller})` : `Exceeded ${nbytes} bytes`);
    this.name = "Exceeded";
    this.nbytes = nbytes;
    this.buf = buf;
    this.caller = caller;
  }

  /** @returns {string} `Exceeded N bytes`, plus ` (caller)` when a caller is set. */
  toString() {
    let s = `Exceeded ${this.nbytes} bytes`;
    if (this.caller) s += ` (${this.caller})`;
    return s;
  }
}
/**
 * Error signalling that an operation ran longer than `ms` milliseconds.
 *
 * Extends `Error` so that throwing it (or rejecting with it) yields a stack
 * trace and it passes `instanceof Error` checks; the fields and `toString()`
 * output are as before.
 */
export class Timeout extends Error {
  /**
   * @param {number} ms - the time limit, in milliseconds.
   * @param {string} [caller] - optional description of what timed out.
   */
  constructor(ms, caller) {
    super(caller ? `Timeout after ${ms} ms (${caller})` : `Timeout after ${ms} ms`);
    this.name = "Timeout";
    this.ms = ms;
    this.caller = caller;
  }

  /** @returns {string} `Timeout after N ms`, plus ` (caller)` when a caller is set. */
  toString() {
    let s = `Timeout after ${this.ms} ms`;
    if (this.caller) s += ` (${this.caller})`;
    return s;
  }
}
// TODO implement Timeout
/**
 * A Process wrapper that caches the exit-status promise, caps how much of
 * stdout `output()` will collect, and lets pending `output()` /
 * `stderrOutput()` reads be aborted once the status promise settles.
 *
 * The timeout value is stored but not yet acted upon (see TODO above).
 */
export class LimitedProcess extends Process {
  #maxBuffer;       // byte cap for output(), or null for no cap
  #timeout;         // stored only; not used yet
  #statusProm;      // promise returned by status()
  #exited = false;  // set once the status resolved or close() was called
  #reject;          // rejects #statusProm (used by close())
  #outReject;       // abort hook for an in-flight output(); doubles as its lock
  #errReject;       // abort hook for an in-flight stderrOutput(); doubles as its lock

  /**
   * @param p - the process returned by the original `Deno.run`; its rid, pid
   *   and any stdin/stdout/stderr streams are adopted.
   * @param timeout - stored for future use.
   * @param maxBuffer - byte cap applied by output(), or null for no cap.
   */
  constructor(p, timeout, maxBuffer) {
    super({ rid: p.rid, pid: p.pid });
    if (p.stdin) this.stdin = p.stdin;
    if (p.stdout) this.stdout = p.stdout;
    if (p.stderr) this.stderr = p.stderr;
    this.#maxBuffer = maxBuffer;
    this.#timeout = timeout;
    this.#statusProm = new Promise((res, rej) => {
      this.#reject = rej;
      // TODO if this lands in main code, might delay generating #statusProm and #reject until requested
      p.status().then(
        // FIXME cleanup
        data => {
          // Already marked exited (close() ran first): abort any pending reads.
          if (this.#exited) {
            if (this.#outReject) this.#outReject('alpha');
            if (this.#errReject) this.#errReject('alpha');
          }
          this.#exited = true;
          res(data);
        },
        err => {
          console.assert(this.#exited && err.kind===ErrorKind.BadResource);
          if (this.#outReject) this.#outReject('beta');
          if (this.#errReject) this.#errReject('beta');
          // status already settled
        }
      );
    });
  }

  /**
   * Closes the process resource, ignoring a BadResource error, and rejects the
   * status promise with 'explicitly closed' if it has not settled yet.
   */
  close() {
    try {
      close(this.rid);
    } catch(e) {
      if (e.kind!==ErrorKind.BadResource)
        throw e;
    }
    if (!this.#exited) {
      this.#exited = true;
      // TODO assumes we've generated #statusProm and #reject
      this.#reject('explicitly closed'); // FIXME cleanup
    } // else status already settled
  }

  /**
   * Sends signal `signo` to the process.
   * @throws {Error} if the process is already marked as exited.
   */
  kill(signo) {
    if (this.#exited)
      throw new Error('Process.kill: already exited');
    // `kill` is not among the names destructured from Deno at the top of the
    // file, so a bare kill() would be a ReferenceError; call it via the namespace.
    return Deno.kill(this.pid, signo);
  }

  /** @returns the cached status promise created in the constructor. */
  status() { return this.#statusProm; }

  /**
   * Collects stdout and then attempts to close it.
   *
   * @returns {Promise<Uint8Array>} the bytes collected (possibly partial if
   *   the read was aborted or the stream was closed underneath it).
   * @throws {Error} if stdout is undefined or another output() is in flight.
   * @throws {Exceeded} if a byte cap is set and more data was available.
   */
  async output() {
    if (!this.stdout)
      throw new Error("Process.output: stdout is undefined");
    if (this.#outReject!==undefined)
      throw new Error("Process.output: couldn't get lock");
    if (this.#exited)
      console.info(`Process ${this.rid} already exited, but attempting output`);
    let try_close = true, was_aborted = false;
    try {
      if (this.#maxBuffer===null) {
        const r = await readAll(this.stdout, rej => this.#outReject = rej);
        console.assert('aborted' in r || 'closed' in r);
        if (r.aborted) {
          was_aborted = true;
        } else if (r.closed) {
          try_close = false;
        }
        return r.contents;
      } else {
        const r = await readAllN(this.stdout, this.#maxBuffer, rej => this.#outReject = rej);
        console.assert('aborted' in r || 'closed' in r);
        if (r.aborted) {
          was_aborted = true;
        } else if (r.closed) {
          try_close = false;
        }
        if (r.truncated) {
          throw new Exceeded(this.#maxBuffer, r.contents, `Process ${this.rid}'s stdout`); // FIXME cleanup
        }
        return r.contents;
      }
    } finally {
      // Release the lock.
      this.#outReject = undefined;
      // if readFrom/readFromN returned negative n, this would fail
      // but it might also fail anyway
      if (try_close) {
        try {
          this.stdout.close();
        } catch(e) {
          if (e.kind!==ErrorKind.BadResource)
            throw e;
        }
      } else {
        console.assert(!was_aborted);
      }
    }
  }

  /**
   * Collects all of stderr (no byte cap is applied) and then attempts to
   * close it.
   *
   * @returns {Promise<Uint8Array>} the bytes collected (possibly partial if
   *   the read was aborted or the stream was closed underneath it).
   * @throws {Error} if stderr is undefined or another stderrOutput() is in flight.
   */
  async stderrOutput() {
    if (!this.stderr)
      throw new Error("Process.stderrOutput: stderr is undefined");
    if (this.#errReject!==undefined)
      throw new Error("Process.stderrOutput: couldn't get lock");
    if (this.#exited)
      console.info(`Process ${this.rid} already exited, but attempting stderrOutput`);
    let try_close = true, was_aborted = false;
    try {
      const r = await readAll(this.stderr, rej => this.#errReject = rej);
      console.assert('aborted' in r || 'closed' in r);
      if (r.aborted) {
        was_aborted = true;
      } else if (r.closed) {
        try_close = false;
      }
      return r.contents;
    } finally {
      // Release the lock.
      this.#errReject = undefined;
      if (try_close) {
        try {
          this.stderr.close();
        } catch(e) {
          if (e.kind!==ErrorKind.BadResource)
            throw e;
        }
      } else {
        console.assert(!was_aborted);
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment