Skip to content

Instantly share code, notes, and snippets.

@betafcc
Last active December 28, 2024 04:48
Show Gist options
  • Save betafcc/cd11dca95c1bccdfc5899f61cf7a0a8d to your computer and use it in GitHub Desktop.
Save betafcc/cd11dca95c1bccdfc5899f61cf7a0a8d to your computer and use it in GitHub Desktop.
Node and Deno child process helper
/**
* Creates a process.
*
* Used as a template string will spawn a shell
* @example ```
* $`seq 1 100`
* ````
* Used as a function will not
* @example ```
* $('seq', 1, 100)
* ```
*
* @example
* // can await the output as a string
* const textOuput = await $`seq 1 100`
*
* // can iterate over the output line by line
* for await (const line of $`seq 1 100`)
* console.log(line)
*
* // can pipe the output to another process
* for await (const line of $`seq 1 100`.pipe($`head -3`).pipe($`sort -rn`))
* console.log(line)
*
* // can pipe an input to a process
* for await (const line of $`head -3`.input($`seq 1 100`).pipe($`sort -rn`))
* console.log(line)
*
* // input can be an array of lines to be written to stdin
* for await (const line of $`head -3`.input([1, 2, 3, 4, 5]).pipe($`sort -rn`))
* console.log(line)
*/
export const $: {
(command: string, ...args: ToString[]): Process
(strings: TemplateStringsArray, ...values: ToString[]): Process
} = (strings, ...values) => {
if (typeof strings === 'string')
return new Process(
new Deno.Command(strings, {
args: values.map(e => e.toString()),
stdin: 'piped',
stdout: 'piped',
}).spawn(),
)
const shell =
Deno.build.os === 'windows'
? Deno.env.get('COMSPEC') ?? 'cmd.exe'
: Deno.env.get('SHELL') ?? 'sh'
return new Process(
new Deno.Command(shell, {
args: [
'-c',
strings.map((str, i) => str + (values[i] ?? '').toString()).join(''),
],
stdin: 'piped',
stdout: 'piped',
}).spawn(),
)
}
export type ToString = { toString(): string }
export type Input = ToString | Iterable<ToString> | AsyncIterable<ToString>
export const serialize = async function* (input: Input): AsyncIterable<string> {
if (Symbol.asyncIterator in input)
for await (const el of input) yield el.toString()
else if (Symbol.iterator in input) for (const el of input) yield el.toString()
else yield input.toString()
}
export class Process {
private closed = false
constructor(public readonly child: Deno.ChildProcess) {
this.child.status.finally(() => (this.closed = true))
}
[Symbol.asyncIterator]() {
return this.lines()
}
async then(onfulfilled: (output: string) => void) {
return onfulfilled((await Array.fromAsync(this)).join('\n'))
}
async *lines(): AsyncGenerator<string> {
const decoder = new TextDecoder()
let leftover = ''
for await (const chunk of this.chunks()) {
leftover += decoder.decode(chunk, { stream: true })
const lines = leftover.split(/\r?\n|\n|\r/)
leftover = lines.pop() ?? ''
for (const line of lines) yield line
}
if (leftover) yield leftover
}
async *chunks(): AsyncGenerator<Uint8Array> {
const reader = this.child.stdout.getReader()
try {
while (true) {
const result = await reader.read()
if (result.done) break
yield result.value
}
} finally {
reader.releaseLock()
}
}
pipe(other: Process): Process {
this.child.stdout
.pipeTo(other.child.stdin, { preventClose: false })
.catch(err => {
if (!(err.message.includes('closed') || err.message.includes('broken')))
console.error('Pipe error:', err)
})
return other
}
input(input: Input | Process): Process {
if (input instanceof Process) return input.pipe(this)
;(async () => {
const writer = this.child.stdin.getWriter()
const encoder = new TextEncoder()
try {
for await (const line of serialize(input))
if (this.closed) break
else await writer.write(encoder.encode(line + '\n'))
} catch (err) {
console.error('Error writing to stdin:', err)
} finally {
try {
writer.close()
} catch (_) {
// Usually means the stream was already closed.
}
}
})()
return this
}
}
import readline from 'node:readline'
import { once } from 'node:events'
import { spawn, type ChildProcessWithoutNullStreams } from 'node:child_process'
/**
* Creates a process.
*
* Used as a template string will spawn a shell
* @example ```
* $`seq 1 100`
* ````
* Used as a function will not
* @example ```
* $('seq', 1, 100)
* ```
*
* @example
* // can await the output as a string
* const textOuput = await $`seq 1 100`
*
* // can iterate over the output line by line
* for await (const line of $`seq 1 100`)
* console.log(line)
*
* // can pipe the output to another process
* for await (const line of $`seq 1 100`.pipe($`head -3`).pipe($`sort -rn`))
* console.log(line)
*
* // can pipe an input to a process
* for await (const line of $`head -3`.input($`seq 1 100`).pipe($`sort -rn`))
* console.log(line)
*
* // input can be an array of lines to be written to stdin
* for await (const line of $`head -3`.input([1, 2, 3, 4, 5]).pipe($`sort -rn`))
* console.log(line)
*/
export const $: {
(command: string, ...args: ToString[]): Process
(strings: TemplateStringsArray, ...values: ToString[]): Process
} = (strings, ...values) => {
if (typeof strings === 'string')
return new Process(
spawn(
strings,
values.map(e => e.toString()),
),
)
else
return new Process(
spawn(
strings.map((str, i) => str + (values[i] ?? '').toString()).join(''),
{ shell: true },
),
)
}
export type ToString = { toString(): string }
export type Input = ToString | Iterable<ToString> | AsyncIterable<ToString>
export const serialize = async function* (input: Input): AsyncIterable<string> {
if (Symbol.asyncIterator in input)
for await (const el of input) yield el.toString()
else if (Symbol.iterator in input) for (const el of input) yield el.toString()
else yield input.toString()
}
export class Process {
constructor(readonly process: ChildProcessWithoutNullStreams) {}
[Symbol.asyncIterator]() {
return this.lines()[Symbol.asyncIterator]()
}
async then(onfulfilled: (output: string) => void) {
return onfulfilled((await Array.fromAsync(this)).join('\n'))
}
async *chunks() {
for await (const chunk of this.process.stdout) yield chunk
}
lines() {
return readline.createInterface({
input: this.process.stdout,
crlfDelay: Infinity,
})
}
pipe(other: Process) {
this.process.stdout.pipe(other.process.stdin)
return other
}
// UNSAFE: $`seq 1 100`.pipe($`head -3`).input($`sort -rn`)
// the returning process from the pipe will already have a piped stdin
input(input: Input | Process) {
if (input instanceof Process) return input.pipe(this)
;(async () => {
let closed = false
this.process.once('close', () => (closed = true))
this.process.stdin.on(
'error',
err => (err as any).code === 'EPIPE' && (closed = true),
)
for await (const el of serialize(input))
if (closed) break
else if (!this.process.stdin.write(el + '\n') && !closed)
try {
await once(this.process.stdin, 'drain')
} catch (err) {
console.error('Error waiting for drain:', err)
break
}
if (!closed) this.process.stdin.end()
})()
return this
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment