Last active
December 28, 2024 04:48
-
-
Save betafcc/cd11dca95c1bccdfc5899f61cf7a0a8d to your computer and use it in GitHub Desktop.
Node and Deno child process helper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Creates a process. | |
* | |
* Used as a template string will spawn a shell | |
* @example ``` | |
* $`seq 1 100` | |
* ```` | |
* Used as a function will not | |
* @example ``` | |
* $('seq', 1, 100) | |
* ``` | |
* | |
* @example | |
* // can await the output as a string | |
* const textOuput = await $`seq 1 100` | |
* | |
* // can iterate over the output line by line | |
* for await (const line of $`seq 1 100`) | |
* console.log(line) | |
* | |
* // can pipe the output to another process | |
* for await (const line of $`seq 1 100`.pipe($`head -3`).pipe($`sort -rn`)) | |
* console.log(line) | |
* | |
* // can pipe an input to a process | |
* for await (const line of $`head -3`.input($`seq 1 100`).pipe($`sort -rn`)) | |
* console.log(line) | |
* | |
* // input can be an array of lines to be written to stdin | |
* for await (const line of $`head -3`.input([1, 2, 3, 4, 5]).pipe($`sort -rn`)) | |
* console.log(line) | |
*/ | |
export const $: { | |
(command: string, ...args: ToString[]): Process | |
(strings: TemplateStringsArray, ...values: ToString[]): Process | |
} = (strings, ...values) => { | |
if (typeof strings === 'string') | |
return new Process( | |
new Deno.Command(strings, { | |
args: values.map(e => e.toString()), | |
stdin: 'piped', | |
stdout: 'piped', | |
}).spawn(), | |
) | |
const shell = | |
Deno.build.os === 'windows' | |
? Deno.env.get('COMSPEC') ?? 'cmd.exe' | |
: Deno.env.get('SHELL') ?? 'sh' | |
return new Process( | |
new Deno.Command(shell, { | |
args: [ | |
'-c', | |
strings.map((str, i) => str + (values[i] ?? '').toString()).join(''), | |
], | |
stdin: 'piped', | |
stdout: 'piped', | |
}).spawn(), | |
) | |
} | |
export type ToString = { toString(): string } | |
export type Input = ToString | Iterable<ToString> | AsyncIterable<ToString> | |
export const serialize = async function* (input: Input): AsyncIterable<string> { | |
if (Symbol.asyncIterator in input) | |
for await (const el of input) yield el.toString() | |
else if (Symbol.iterator in input) for (const el of input) yield el.toString() | |
else yield input.toString() | |
} | |
export class Process { | |
private closed = false | |
constructor(public readonly child: Deno.ChildProcess) { | |
this.child.status.finally(() => (this.closed = true)) | |
} | |
[Symbol.asyncIterator]() { | |
return this.lines() | |
} | |
async then(onfulfilled: (output: string) => void) { | |
return onfulfilled((await Array.fromAsync(this)).join('\n')) | |
} | |
async *lines(): AsyncGenerator<string> { | |
const decoder = new TextDecoder() | |
let leftover = '' | |
for await (const chunk of this.chunks()) { | |
leftover += decoder.decode(chunk, { stream: true }) | |
const lines = leftover.split(/\r?\n|\n|\r/) | |
leftover = lines.pop() ?? '' | |
for (const line of lines) yield line | |
} | |
if (leftover) yield leftover | |
} | |
async *chunks(): AsyncGenerator<Uint8Array> { | |
const reader = this.child.stdout.getReader() | |
try { | |
while (true) { | |
const result = await reader.read() | |
if (result.done) break | |
yield result.value | |
} | |
} finally { | |
reader.releaseLock() | |
} | |
} | |
pipe(other: Process): Process { | |
this.child.stdout | |
.pipeTo(other.child.stdin, { preventClose: false }) | |
.catch(err => { | |
if (!(err.message.includes('closed') || err.message.includes('broken'))) | |
console.error('Pipe error:', err) | |
}) | |
return other | |
} | |
input(input: Input | Process): Process { | |
if (input instanceof Process) return input.pipe(this) | |
;(async () => { | |
const writer = this.child.stdin.getWriter() | |
const encoder = new TextEncoder() | |
try { | |
for await (const line of serialize(input)) | |
if (this.closed) break | |
else await writer.write(encoder.encode(line + '\n')) | |
} catch (err) { | |
console.error('Error writing to stdin:', err) | |
} finally { | |
try { | |
writer.close() | |
} catch (_) { | |
// Usually means the stream was already closed. | |
} | |
} | |
})() | |
return this | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import readline from 'node:readline' | |
import { once } from 'node:events' | |
import { spawn, type ChildProcessWithoutNullStreams } from 'node:child_process' | |
/** | |
* Creates a process. | |
* | |
* Used as a template string will spawn a shell | |
* @example ``` | |
* $`seq 1 100` | |
* ```` | |
* Used as a function will not | |
* @example ``` | |
* $('seq', 1, 100) | |
* ``` | |
* | |
* @example | |
* // can await the output as a string | |
* const textOuput = await $`seq 1 100` | |
* | |
* // can iterate over the output line by line | |
* for await (const line of $`seq 1 100`) | |
* console.log(line) | |
* | |
* // can pipe the output to another process | |
* for await (const line of $`seq 1 100`.pipe($`head -3`).pipe($`sort -rn`)) | |
* console.log(line) | |
* | |
* // can pipe an input to a process | |
* for await (const line of $`head -3`.input($`seq 1 100`).pipe($`sort -rn`)) | |
* console.log(line) | |
* | |
* // input can be an array of lines to be written to stdin | |
* for await (const line of $`head -3`.input([1, 2, 3, 4, 5]).pipe($`sort -rn`)) | |
* console.log(line) | |
*/ | |
export const $: { | |
(command: string, ...args: ToString[]): Process | |
(strings: TemplateStringsArray, ...values: ToString[]): Process | |
} = (strings, ...values) => { | |
if (typeof strings === 'string') | |
return new Process( | |
spawn( | |
strings, | |
values.map(e => e.toString()), | |
), | |
) | |
else | |
return new Process( | |
spawn( | |
strings.map((str, i) => str + (values[i] ?? '').toString()).join(''), | |
{ shell: true }, | |
), | |
) | |
} | |
export type ToString = { toString(): string } | |
export type Input = ToString | Iterable<ToString> | AsyncIterable<ToString> | |
export const serialize = async function* (input: Input): AsyncIterable<string> { | |
if (Symbol.asyncIterator in input) | |
for await (const el of input) yield el.toString() | |
else if (Symbol.iterator in input) for (const el of input) yield el.toString() | |
else yield input.toString() | |
} | |
export class Process { | |
constructor(readonly process: ChildProcessWithoutNullStreams) {} | |
[Symbol.asyncIterator]() { | |
return this.lines()[Symbol.asyncIterator]() | |
} | |
async then(onfulfilled: (output: string) => void) { | |
return onfulfilled((await Array.fromAsync(this)).join('\n')) | |
} | |
async *chunks() { | |
for await (const chunk of this.process.stdout) yield chunk | |
} | |
lines() { | |
return readline.createInterface({ | |
input: this.process.stdout, | |
crlfDelay: Infinity, | |
}) | |
} | |
pipe(other: Process) { | |
this.process.stdout.pipe(other.process.stdin) | |
return other | |
} | |
// UNSAFE: $`seq 1 100`.pipe($`head -3`).input($`sort -rn`) | |
// the returning process from the pipe will already have a piped stdin | |
input(input: Input | Process) { | |
if (input instanceof Process) return input.pipe(this) | |
;(async () => { | |
let closed = false | |
this.process.once('close', () => (closed = true)) | |
this.process.stdin.on( | |
'error', | |
err => (err as any).code === 'EPIPE' && (closed = true), | |
) | |
for await (const el of serialize(input)) | |
if (closed) break | |
else if (!this.process.stdin.write(el + '\n') && !closed) | |
try { | |
await once(this.process.stdin, 'drain') | |
} catch (err) { | |
console.error('Error waiting for drain:', err) | |
break | |
} | |
if (!closed) this.process.stdin.end() | |
})() | |
return this | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment