Skip to content

Instantly share code, notes, and snippets.

@schickling
Created August 16, 2022 08:13
Show Gist options
  • Save schickling/243c10e48b43b731943d8160141fed9e to your computer and use it in GitHub Desktop.
Save schickling/243c10e48b43b731943d8160141fed9e to your computer and use it in GitHub Desktop.
Effect Worker Pool
import { M, OT, pipe, T } from '@mytunes/utils/effect'
import { provideOtelTracer } from '../utils/tracer.js'
import { makeWorkerPool } from './worker-pool.js'
export const startWorkerPool = () =>
pipe(startWorkerPool_, OT.withSpan('worker-pool'), provideOtelTracer(), T.runPromise)
export const startWorkerPool_ = T.gen(function* ($) {
const workerUrl = new URL('./worker.ts', import.meta.url).href
yield* $(
M.use_(makeWorkerPool<number, string>(4, workerUrl), ({ doWork }) =>
T.gen(function* ($) {
yield* $(
T.forEachParWithIndex_(Array.from({ length: 10 }), (_, i) =>
pipe(doWork(i), T.tapError(T.log), T.tap(T.log)),
),
)
}),
),
)
})
import { F, M, OT, P, pipe, Q, Semaphore, T } from '@mytunes/utils/effect'
import * as Comlink from 'comlink'
import type { OtelCtx, WrappedWorker } from './worker.js'
export const makeWorkerPool = <I, O>(numWorkers: number, workerFilePath: string) =>
M.gen(function* ($) {
type QueueItem = readonly [
msg: I,
resultPromise: P.Promise<TODO, O>,
/** Needed to make Otel work */
env: {},
]
// type Msg = Parameters<WrappedWorker['handleMsg']>[0]
type ComlinkWorker = Comlink.Remote<WrappedWorker>
const queue = yield* $(Q.makeUnbounded<QueueItem>())
const availableWorkers: ComlinkWorker[] = []
for (let i = 0; i < numWorkers; i++) {
const worker = new Worker(workerFilePath, { type: 'module' })
const wrappedWorker = Comlink.wrap<WrappedWorker>(worker)
availableWorkers.push(wrappedWorker)
}
yield* $(
pipe(
T.forEachParWithIndex_(availableWorkers, (worker, index) =>
pipe(
callWorker((otelCtx) => worker.initialize({ workerId: index, otelCtx })),
OT.withSpan('worker-pool:initialize'),
),
),
OT.withSpan('worker-pool:initializeWorkers'),
),
)
const semaphore = yield* $(Semaphore.makeSemaphore(numWorkers))
yield* $(
pipe(
Q.take(queue),
T.chain(([item, resultPromise, env]) =>
pipe(
T.gen(function* ($) {
yield* $(OT.addEvent('new-item'))
const worker = availableWorkers.pop()!
const result = yield* $(
pipe(
callWorker((otelCtx) => worker.handleMsg({ data: item, otelCtx }) as Promise<O>),
OT.withSpan('worker-pool:handleMsg', { attributes: { item: JSON.stringify(item) } }),
),
)
availableWorkers.push(worker)
return result
}),
T.provide(env),
T.to(resultPromise),
Semaphore.withPermit(semaphore),
T.fork,
),
),
T.forever,
T.interruptible,
T.forkDaemon,
OT.withSpan('worker-pool:workerLoop'),
T.toManagedRelease(F.interrupt),
),
)
return {
doWork: (msg: I) =>
pipe(
T.do,
T.bind('resultPromise', () => P.make<TODO, O>()),
// NOTE needed for otel
T.bind('env', () => T.environment<{}>()),
T.tap(({ resultPromise, env }) => Q.offer_(queue, [msg, resultPromise, env])),
T.chain(({ resultPromise }) => P.await(resultPromise)),
),
}
})
const callWorker = <Res>(cb: (otelCtx: OtelCtx) => Promise<Res>) =>
T.accessServiceM(OT.Span)(({ span }) =>
T.tryCatchPromise(
() => {
const { traceId, spanId } = span.spanContext()
return cb({ traceId, spanId })
},
(err) => err as TODO,
),
)
import { Cause, OT, pipe, T } from '@mytunes/utils/effect'
import * as otel from '@opentelemetry/api'
import * as Comlink from 'comlink'
import { provideOtelTracer } from '../utils/tracer.js'
if (typeof window === 'undefined') {
globalThis.window = globalThis as TODO
}
const workerData = {
workerId: -1,
}
export type OtelCtx = { traceId: string; spanId: string }
const initialize = ({ workerId, otelCtx }: { workerId: number; otelCtx: OtelCtx }) =>
pipe(
T.succeedWith(() => {
workerData.workerId = workerId
return true
}),
// NOTE passing the `workerId` here explicitly since `workerData.workerId` is not yet set
runMain({ operationName: 'initialize', otelCtx, workerId }),
)
const handleMsg = async <I, O>({ data, otelCtx }: { data: I; otelCtx: OtelCtx }): Promise<O> =>
pipe(
T.tryPromise<O>(
() => new Promise((resolve) => setTimeout(() => resolve(`result for promise ${data}` as TODO), 2000)),
),
runMain({ operationName: 'handleMsg', otelCtx }),
)
const wrappedWorker = { initialize, handleMsg }
export type WrappedWorker = typeof wrappedWorker
Comlink.expose(wrappedWorker)
const runMain =
({ operationName, otelCtx, workerId }: { operationName: string; otelCtx: OtelCtx; workerId?: number }) =>
<E, A>(eff: T.Effect<OT.HasTracer, E, A>): Promise<A> =>
pipe(
eff,
OT.withSpan(`worker:${operationName}`, {}, makeParentContext(otelCtx)),
provideOtelTracer(`overtone-worker-${workerId ?? workerData.workerId}`),
T.tapCause((err) => T.logError(Cause.pretty(err))),
T.runPromise,
)
const makeParentContext = ({ spanId, traceId }: OtelCtx) =>
otel.trace.setSpanContext(otel.context.active(), {
traceId,
spanId,
traceFlags: otel.TraceFlags.SAMPLED,
isRemote: true,
})
@schickling
Copy link
Author

CleanShot 2022-08-16 at 10 13 44@2x

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment