Skip to content

Instantly share code, notes, and snippets.

@schickling
Created December 18, 2024 14:45
Show Gist options
  • Save schickling/1d09d74fd84233185384c9a8fb69f53e to your computer and use it in GitHub Desktop.
Save schickling/1d09d74fd84233185384c9a8fb69f53e to your computer and use it in GitHub Desktop.
Effect Node Child Process Worker
/* eslint-disable prefer-arrow/prefer-arrow-functions */
import process from 'node:process'
import { WorkerError } from '@effect/platform/WorkerError'
import * as Runner from '@effect/platform/WorkerRunner'
import * as Context from 'effect/Context'
import * as Deferred from 'effect/Deferred'
import * as Effect from 'effect/Effect'
import * as Exit from 'effect/Exit'
import * as FiberId from 'effect/FiberId'
import * as FiberSet from 'effect/FiberSet'
import * as Layer from 'effect/Layer'
import * as Runtime from 'effect/Runtime'
import * as Scope from 'effect/Scope'
/**
 * `PlatformRunner` implementation for a worker that runs inside a Node.js
 * child process and talks to its parent over the IPC channel
 * (`process.send` / `process.on`).
 *
 * Wire format as used below:
 * - incoming `[0, payload]` is dispatched to the handler, any other incoming
 *   message is treated as a shutdown request
 * - outgoing `[1, payload]` carries a handler response
 * - outgoing `[0]` is posted once, after the listeners are attached
 *   (presumably the "ready" signal for the parent — confirm against the
 *   platform worker protocol)
 */
const platformRunnerImpl = Runner.PlatformRunner.of({
  [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
  /**
   * Builds the `run` / `send` pair for the current process.
   *
   * Fails with a `WorkerError` (reason `spawn`) when `process.send` is not
   * defined, i.e. there is no IPC channel to send on.
   */
  start<I, O>() {
    return Effect.gen(function* () {
      if (!process.send) {
        return yield* new WorkerError({ reason: 'spawn', cause: new Error('not in a child process') })
      }

      // Minimal port-like facade over the process-level IPC API.
      const port = {
        postMessage: (message: any) => process.send!(message),
        on: (event: string, handler: (message: any) => void) => process.on(event, handler),
        // The 'message' listener registered in `run` keeps the IPC channel
        // referenced, which keeps the event loop alive. Unref the channel on
        // shutdown so the process can exit on its own once nothing else is
        // pending, instead of lingering until the parent kills it.
        close: () => {
          process.channel?.unref()
        },
      }

      // Only a single port exists, so the port id is ignored. Transfer lists
      // are not forwarded: `process.send` is called with the message only.
      const send = (_portId: number, message: O, _transfers?: ReadonlyArray<unknown>) =>
        Effect.sync(() => port.postMessage([1, message]))

      /**
       * Attaches the IPC listeners, forks `handler` for every incoming
       * request and then waits on the fiber set.
       */
      const run = <A, E, R>(handler: (portId: number, message: I) => Effect.Effect<A, E, R>) =>
        Effect.uninterruptibleMask((restore) =>
          Effect.gen(function* () {
            // Capture the current runtime, with the `Scope` service removed
            // from the context that forked handlers will see.
            const runtime = (yield* Effect.runtime<R | Scope.Scope>()).pipe(
              Runtime.updateContext(Context.omit(Scope.Scope)),
            ) as Runtime.Runtime<R>
            const fiberSet = yield* FiberSet.make<any, WorkerError | E>()
            const runFork = Runtime.runFork(runtime)

            port.on('message', (message: Runner.BackingRunner.Message<I>) => {
              if (message[0] === 0) {
                // Request: run the handler in its own fiber (always port id 0).
                FiberSet.unsafeAdd(fiberSet, runFork(restore(handler(0, message[1]))))
              } else {
                // Shutdown request: release the channel and complete the
                // fiber set's deferred with an interruption.
                port.close()
                Deferred.unsafeDone(fiberSet.deferred, Exit.interrupt(FiberId.none))
              }
            })
            port.on('messageerror', (cause) => {
              Deferred.unsafeDone(fiberSet.deferred, new WorkerError({ reason: 'decode', cause }))
            })
            port.on('error', (cause) => {
              Deferred.unsafeDone(fiberSet.deferred, new WorkerError({ reason: 'unknown', cause }))
            })

            // Posted only after all listeners above are registered.
            port.postMessage([0])

            return (yield* restore(FiberSet.join(fiberSet))) as never
          }).pipe(Effect.scoped),
        )

      return { run, send }
    })
  },
})
/**
 * Layer that provides `Runner.PlatformRunner` with the child-process based
 * `platformRunnerImpl`.
 *
 * @internal
 */
export const layer = Layer.succeed(Runner.PlatformRunner, platformRunnerImpl)
/* eslint-disable prefer-arrow/prefer-arrow-functions */
import type * as ChildProcess from 'node:child_process'
import * as Worker from '@effect/platform/Worker'
import { WorkerError } from '@effect/platform/WorkerError'
import * as Deferred from 'effect/Deferred'
import * as Effect from 'effect/Effect'
import * as Exit from 'effect/Exit'
import * as Layer from 'effect/Layer'
import * as Scope from 'effect/Scope'
/**
 * Platform worker backed by a Node.js `ChildProcess`; messages travel through
 * `childProcess.send` and the child process' events.
 */
const platformWorkerImpl = Worker.makePlatform<ChildProcess.ChildProcess>()({
  /**
   * Registers a shutdown finalizer on `scope` and returns the port object
   * used to talk to the child process.
   */
  setup({ scope, worker: childProcess }) {
    return Effect.gen(function* () {
      // Completed as soon as the child process emits 'exit'.
      const exited = yield* Deferred.make<void, WorkerError>()
      childProcess.on('exit', () => {
        Deferred.unsafeDone(exited, Exit.void)
      })

      // Shutdown: send `[1]` to the child and wait for it to exit.
      const requestShutdown = Effect.suspend(() => {
        childProcess.send([1])
        return Deferred.await(exited)
      })
      // Fallback used when the wait above fails for any cause, including
      // exceeding the 5000 ms timeout.
      const killChild = Effect.sync(() => childProcess.kill())
      const shutdown = requestShutdown.pipe(
        Effect.interruptible,
        Effect.timeout(5000),
        Effect.catchAllCause(() => killChild),
      )
      yield* Scope.addFinalizer(scope, shutdown)

      return {
        postMessage: (message: any) => childProcess.send(message),
        on: (event: string, handler: (message: any) => void) => childProcess.on(event, handler),
      }
    })
  },
  /**
   * Forwards incoming messages to `emit` and completes `deferred` with a
   * `WorkerError` on 'messageerror', 'error' or 'exit'.
   */
  listen({ deferred, emit, port }) {
    const failWith = (reason: 'decode' | 'unknown', cause: unknown) => {
      Deferred.unsafeDone(deferred, new WorkerError({ reason, cause }))
    }

    // Wrapped so that only the first event argument reaches `emit`.
    port.on('message', (message) => {
      emit(message)
    })
    port.on('messageerror', (cause) => {
      failWith('decode', cause)
    })
    port.on('error', (cause) => {
      failWith('unknown', cause)
    })
    port.on('exit', (code) => {
      failWith('unknown', new Error(`exited with code ${code}`))
    })

    return Effect.void
  },
})
/**
 * Layer that provides `Worker.PlatformWorker` with the child-process based
 * `platformWorkerImpl`.
 *
 * @internal
 */
export const layerWorker = Layer.succeed(Worker.PlatformWorker, platformWorkerImpl)
/**
 * `Worker.layerManager` with `layerWorker` provided to it.
 *
 * @internal
 */
export const layerManager = Layer.provide(Worker.layerManager, layerWorker)
/**
 * Combines `layerManager` with a spawner layer built from `spawn`.
 *
 * @param spawn - receives a numeric id and returns the `ChildProcess` to use.
 * @internal
 */
export const layer = (spawn: (id: number) => ChildProcess.ChildProcess) => {
  const spawnerLayer = Worker.layerSpawner(spawn)
  return Layer.merge(layerManager, spawnerLayer)
}
export * as ChildProcessRunner from './ChildProcessRunner.js'
export * as ChildProcessWorker from './ChildProcessWorker.js'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment