Created
August 16, 2022 08:13
-
-
Save schickling/243c10e48b43b731943d8160141fed9e to your computer and use it in GitHub Desktop.
Effect Worker Pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { M, OT, pipe, T } from '@mytunes/utils/effect' | |
import { provideOtelTracer } from '../utils/tracer.js' | |
import { makeWorkerPool } from './worker-pool.js' | |
export const startWorkerPool = () => | |
pipe(startWorkerPool_, OT.withSpan('worker-pool'), provideOtelTracer(), T.runPromise) | |
export const startWorkerPool_ = T.gen(function* ($) { | |
const workerUrl = new URL('./worker.ts', import.meta.url).href | |
yield* $( | |
M.use_(makeWorkerPool<number, string>(4, workerUrl), ({ doWork }) => | |
T.gen(function* ($) { | |
yield* $( | |
T.forEachParWithIndex_(Array.from({ length: 10 }), (_, i) => | |
pipe(doWork(i), T.tapError(T.log), T.tap(T.log)), | |
), | |
) | |
}), | |
), | |
) | |
}) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { F, M, OT, P, pipe, Q, Semaphore, T } from '@mytunes/utils/effect' | |
import * as Comlink from 'comlink' | |
import type { OtelCtx, WrappedWorker } from './worker.js' | |
export const makeWorkerPool = <I, O>(numWorkers: number, workerFilePath: string) => | |
M.gen(function* ($) { | |
type QueueItem = readonly [ | |
msg: I, | |
resultPromise: P.Promise<TODO, O>, | |
/** Needed to make Otel work */ | |
env: {}, | |
] | |
// type Msg = Parameters<WrappedWorker['handleMsg']>[0] | |
type ComlinkWorker = Comlink.Remote<WrappedWorker> | |
const queue = yield* $(Q.makeUnbounded<QueueItem>()) | |
const availableWorkers: ComlinkWorker[] = [] | |
for (let i = 0; i < numWorkers; i++) { | |
const worker = new Worker(workerFilePath, { type: 'module' }) | |
const wrappedWorker = Comlink.wrap<WrappedWorker>(worker) | |
availableWorkers.push(wrappedWorker) | |
} | |
yield* $( | |
pipe( | |
T.forEachParWithIndex_(availableWorkers, (worker, index) => | |
pipe( | |
callWorker((otelCtx) => worker.initialize({ workerId: index, otelCtx })), | |
OT.withSpan('worker-pool:initialize'), | |
), | |
), | |
OT.withSpan('worker-pool:initializeWorkers'), | |
), | |
) | |
const semaphore = yield* $(Semaphore.makeSemaphore(numWorkers)) | |
yield* $( | |
pipe( | |
Q.take(queue), | |
T.chain(([item, resultPromise, env]) => | |
pipe( | |
T.gen(function* ($) { | |
yield* $(OT.addEvent('new-item')) | |
const worker = availableWorkers.pop()! | |
const result = yield* $( | |
pipe( | |
callWorker((otelCtx) => worker.handleMsg({ data: item, otelCtx }) as Promise<O>), | |
OT.withSpan('worker-pool:handleMsg', { attributes: { item: JSON.stringify(item) } }), | |
), | |
) | |
availableWorkers.push(worker) | |
return result | |
}), | |
T.provide(env), | |
T.to(resultPromise), | |
Semaphore.withPermit(semaphore), | |
T.fork, | |
), | |
), | |
T.forever, | |
T.interruptible, | |
T.forkDaemon, | |
OT.withSpan('worker-pool:workerLoop'), | |
T.toManagedRelease(F.interrupt), | |
), | |
) | |
return { | |
doWork: (msg: I) => | |
pipe( | |
T.do, | |
T.bind('resultPromise', () => P.make<TODO, O>()), | |
// NOTE needed for otel | |
T.bind('env', () => T.environment<{}>()), | |
T.tap(({ resultPromise, env }) => Q.offer_(queue, [msg, resultPromise, env])), | |
T.chain(({ resultPromise }) => P.await(resultPromise)), | |
), | |
} | |
}) | |
const callWorker = <Res>(cb: (otelCtx: OtelCtx) => Promise<Res>) => | |
T.accessServiceM(OT.Span)(({ span }) => | |
T.tryCatchPromise( | |
() => { | |
const { traceId, spanId } = span.spanContext() | |
return cb({ traceId, spanId }) | |
}, | |
(err) => err as TODO, | |
), | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Cause, OT, pipe, T } from '@mytunes/utils/effect' | |
import * as otel from '@opentelemetry/api' | |
import * as Comlink from 'comlink' | |
import { provideOtelTracer } from '../utils/tracer.js' | |
if (typeof window === 'undefined') { | |
globalThis.window = globalThis as TODO | |
} | |
const workerData = { | |
workerId: -1, | |
} | |
export type OtelCtx = { traceId: string; spanId: string } | |
const initialize = ({ workerId, otelCtx }: { workerId: number; otelCtx: OtelCtx }) => | |
pipe( | |
T.succeedWith(() => { | |
workerData.workerId = workerId | |
return true | |
}), | |
// NOTE passing the `workerId` here explicitly since `workerData.workerId` is not yet set | |
runMain({ operationName: 'initialize', otelCtx, workerId }), | |
) | |
const handleMsg = async <I, O>({ data, otelCtx }: { data: I; otelCtx: OtelCtx }): Promise<O> => | |
pipe( | |
T.tryPromise<O>( | |
() => new Promise((resolve) => setTimeout(() => resolve(`result for promise ${data}` as TODO), 2000)), | |
), | |
runMain({ operationName: 'handleMsg', otelCtx }), | |
) | |
const wrappedWorker = { initialize, handleMsg } | |
export type WrappedWorker = typeof wrappedWorker | |
Comlink.expose(wrappedWorker) | |
const runMain = | |
({ operationName, otelCtx, workerId }: { operationName: string; otelCtx: OtelCtx; workerId?: number }) => | |
<E, A>(eff: T.Effect<OT.HasTracer, E, A>): Promise<A> => | |
pipe( | |
eff, | |
OT.withSpan(`worker:${operationName}`, {}, makeParentContext(otelCtx)), | |
provideOtelTracer(`overtone-worker-${workerId ?? workerData.workerId}`), | |
T.tapCause((err) => T.logError(Cause.pretty(err))), | |
T.runPromise, | |
) | |
const makeParentContext = ({ spanId, traceId }: OtelCtx) => | |
otel.trace.setSpanContext(otel.context.active(), { | |
traceId, | |
spanId, | |
traceFlags: otel.TraceFlags.SAMPLED, | |
isRemote: true, | |
}) |
Author
schickling
commented
Aug 16, 2022
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment