Created
December 18, 2024 14:45
-
-
Save schickling/1d09d74fd84233185384c9a8fb69f53e to your computer and use it in GitHub Desktop.
Effect Node Child Process Worker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* eslint-disable prefer-arrow/prefer-arrow-functions */ | |
import process from 'node:process' | |
import { WorkerError } from '@effect/platform/WorkerError' | |
import * as Runner from '@effect/platform/WorkerRunner' | |
import * as Context from 'effect/Context' | |
import * as Deferred from 'effect/Deferred' | |
import * as Effect from 'effect/Effect' | |
import * as Exit from 'effect/Exit' | |
import * as FiberId from 'effect/FiberId' | |
import * as FiberSet from 'effect/FiberSet' | |
import * as Layer from 'effect/Layer' | |
import * as Runtime from 'effect/Runtime' | |
import * as Scope from 'effect/Scope' | |
const platformRunnerImpl = Runner.PlatformRunner.of({ | |
[Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId, | |
start<I, O>() { | |
return Effect.gen(function* () { | |
if (!process.send) { | |
return yield* new WorkerError({ reason: 'spawn', cause: new Error('not in a child process') }) | |
} | |
const port = { | |
postMessage: (message: any) => process.send!(message), | |
on: (event: string, handler: (message: any) => void) => process.on(event, handler), | |
close: () => {}, | |
} | |
const send = (_portId: number, message: O, _transfers?: ReadonlyArray<unknown>) => | |
Effect.sync(() => port.postMessage([1, message] /*, transfers as any*/)) | |
const run = <A, E, R>(handler: (portId: number, message: I) => Effect.Effect<A, E, R>) => | |
Effect.uninterruptibleMask((restore) => | |
Effect.gen(function* () { | |
const runtime = (yield* Effect.runtime<R | Scope.Scope>()).pipe( | |
Runtime.updateContext(Context.omit(Scope.Scope)), | |
) as Runtime.Runtime<R> | |
const fiberSet = yield* FiberSet.make<any, WorkerError | E>() | |
const runFork = Runtime.runFork(runtime) | |
port.on('message', (message: Runner.BackingRunner.Message<I>) => { | |
if (message[0] === 0) { | |
FiberSet.unsafeAdd(fiberSet, runFork(restore(handler(0, message[1])))) | |
} else { | |
port.close() | |
Deferred.unsafeDone(fiberSet.deferred, Exit.interrupt(FiberId.none)) | |
} | |
}) | |
port.on('messageerror', (cause) => { | |
Deferred.unsafeDone(fiberSet.deferred, new WorkerError({ reason: 'decode', cause })) | |
}) | |
port.on('error', (cause) => { | |
Deferred.unsafeDone(fiberSet.deferred, new WorkerError({ reason: 'unknown', cause })) | |
}) | |
port.postMessage([0]) | |
return (yield* restore(FiberSet.join(fiberSet))) as never | |
}).pipe(Effect.scoped), | |
) | |
return { run, send } | |
}) | |
}, | |
}) | |
/** @internal */ | |
export const layer = Layer.succeed(Runner.PlatformRunner, platformRunnerImpl) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* eslint-disable prefer-arrow/prefer-arrow-functions */ | |
import type * as ChildProcess from 'node:child_process' | |
import * as Worker from '@effect/platform/Worker' | |
import { WorkerError } from '@effect/platform/WorkerError' | |
import * as Deferred from 'effect/Deferred' | |
import * as Effect from 'effect/Effect' | |
import * as Exit from 'effect/Exit' | |
import * as Layer from 'effect/Layer' | |
import * as Scope from 'effect/Scope' | |
const platformWorkerImpl = Worker.makePlatform<ChildProcess.ChildProcess>()({ | |
setup({ scope, worker: childProcess }) { | |
return Effect.flatMap(Deferred.make<void, WorkerError>(), (exitDeferred) => { | |
childProcess.on('exit', () => { | |
Deferred.unsafeDone(exitDeferred, Exit.void) | |
}) | |
return Effect.as( | |
Scope.addFinalizer( | |
scope, | |
Effect.suspend(() => { | |
childProcess.send([1]) | |
return Deferred.await(exitDeferred) | |
}).pipe( | |
Effect.interruptible, | |
Effect.timeout(5000), | |
Effect.catchAllCause(() => Effect.sync(() => childProcess.kill())), | |
), | |
), | |
{ | |
postMessage: (message: any) => childProcess.send(message), | |
on: (event: string, handler: (message: any) => void) => childProcess.on(event, handler), | |
}, | |
) | |
}) | |
}, | |
listen({ deferred, emit, port }) { | |
port.on('message', (message) => { | |
emit(message) | |
}) | |
port.on('messageerror', (cause) => { | |
Deferred.unsafeDone(deferred, new WorkerError({ reason: 'decode', cause })) | |
}) | |
port.on('error', (cause) => { | |
Deferred.unsafeDone(deferred, new WorkerError({ reason: 'unknown', cause })) | |
}) | |
port.on('exit', (code) => { | |
Deferred.unsafeDone( | |
deferred, | |
new WorkerError({ reason: 'unknown', cause: new Error(`exited with code ${code}`) }), | |
) | |
}) | |
return Effect.void | |
}, | |
}) | |
/** @internal */ | |
export const layerWorker = Layer.succeed(Worker.PlatformWorker, platformWorkerImpl) | |
/** @internal */ | |
export const layerManager = Layer.provide(Worker.layerManager, layerWorker) | |
/** @internal */ | |
export const layer = (spawn: (id: number) => ChildProcess.ChildProcess) => | |
Layer.merge(layerManager, Worker.layerSpawner(spawn)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export * as ChildProcessRunner from './ChildProcessRunner.js' | |
export * as ChildProcessWorker from './ChildProcessWorker.js' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment