Skip to content

Instantly share code, notes, and snippets.

@schickling
Created April 26, 2023 10:00
Show Gist options
  • Save schickling/82b0c6a923d74ad8f35a6056ecca970c to your computer and use it in GitHub Desktop.
Save schickling/82b0c6a923d74ad8f35a6056ecca970c to your computer and use it in GitHub Desktop.
// /// <reference no-default-lib="true" />
/// <reference lib="esnext" />
/// <reference lib="webworker" />
import { casesHandled } from '@mytunes/utils'
import { Effect, Either, Layer, Otel, pipe, ServiceContext, Stream, WorkerLive } from '@mytunes/utils/effect'
import { OtelMeterLive, OtelTracerLive } from '@mytunes/utils-frontend/tracing'
import type {
EventId,
WorkerInitHandler,
WorkerInitPayload,
WorkerRequestHandler,
WorkerRequestPayload,
WorkerResponseData,
} from '../common.js'
import { isWorkerRequestDataEvent, makeParentContext } from '../common.js'
import type { AnyWorkerCodec } from '../WorkerCodec.js'
declare let self: DedicatedWorkerGlobalScope
type OtelEnv = Otel.Tracer | Otel.Meter | Otel.Span
// TODO shutdown logic
// TODO make `R` in handleRequest` is provided via `layers`
// TODO refactor to share logic with `makeSharedWorker`
// TODO make `TInitExtraPayload` type-safe across worker and worker-client code
export const makeWorkerUnsafe = <TCodec extends AnyWorkerCodec, Ctx>({
handleInit,
// layers = Layer.Empty as Layer.Layer<Otel.Tracer | Otel.Meter | Otel.Span | DefaultEnv, never, R>,
handleRequest,
codec,
otelServiceName,
}: {
// TODO possibly refactor into a layer instead of an effect that returns a layer
handleInit: WorkerInitHandler<TCodec, Ctx>
handleRequest: WorkerRequestHandler<TCodec, Ctx>
codec: TCodec
otelServiceName: string
}) => {
let mainLayer: ServiceContext.MainLayer<OtelEnv | WorkerGlobalScope | Ctx>
const handleInitEncoded = (
{ workerContext, otelContext, extraPayload }: WorkerInitPayload<TCodec>,
eventId: EventId,
): Effect.Effect<never, never, void> => {
const parentSpanCtx = makeParentContext(otelContext)
const initLayer = Layer.provideMerge(
Layer.mergeAll(OtelTracerLive(otelServiceName, workerContext), OtelMeterLive(otelServiceName, workerContext)),
Layer.mergeAll(Otel.spanLayerFromContext(parentSpanCtx), WorkerLive),
)
const initMainLayer = ServiceContext.unsafeMainLayer(initLayer)
return pipe(
handleInit(extraPayload),
Effect.tap((extraLayers) =>
pipe(
Effect.sync(() => {
const mergedLayers = Layer.provideMerge(initLayer, extraLayers)
mainLayer = ServiceContext.unsafeMainLayer(mergedLayers)
}),
// NOTE This "warms up" the cache of the main layer
Effect.tap(() => Effect.provideLayer(Effect.unit(), mainLayer.layer)),
),
),
Effect.tap(() => sendToUiThread({ _tag: 'init', eventId })),
Otel.withSpan(`worker:init`, {}, makeParentContext(otelContext)),
Effect.provideSomeLayer(initMainLayer.layer),
)
}
const handleRequestEncoded = ({
payload: { encodedRequest, otelContext },
eventId,
}: {
payload: WorkerRequestPayload<TCodec>
eventId: EventId
}): Stream.Stream<
OtelEnv | WorkerGlobalScope | Ctx,
unknown,
[msg: TCodec['ResEncoded'], transfer: Transferable[]]
> =>
pipe(
handleRequest(codec.decodeReq(encodedRequest)),
Stream.map((decodedOut) => codec.encodeRes(Either.right(decodedOut))),
Stream.catchAll((error) => Stream.succeed(codec.encodeRes(Either.left(error)))),
Stream.tap(([encodedResponse, transfer]) =>
sendToUiThread({ _tag: 'response-value', eventId, encodedResponse }, transfer),
),
Otel.withSpanStream(`worker:handleRequest`, {}, makeParentContext(otelContext)),
Stream.tapCauseLogPretty,
)
const sendToUiThread = (encodedRequest: WorkerResponseData<TCodec>, transfer?: Transferable[]) =>
pipe(
Effect.sync(() => self.postMessage(encodedRequest, { transfer })),
Effect.tap(() => Otel.addEvent('sendToUiThread')),
)
self.addEventListener('message', async (messageEvent) => {
if (isWorkerRequestDataEvent<TCodec>(messageEvent.data)) {
switch (messageEvent.data._tag) {
case 'init': {
// TODO rename to "envelop" vs "payload"
const { eventId, payload } = messageEvent.data
await pipe(handleInitEncoded(payload, eventId), Effect.tapCauseLogPretty, Effect.runPromise)
break
}
case 'request': {
const { eventId, payload } = messageEvent.data
await pipe(
handleRequestEncoded({ payload, eventId }),
Stream.runDrain,
Effect.tap(() => sendToUiThread({ _tag: 'response-end', eventId })),
Effect.provideLayer(mainLayer.layer),
Effect.tapCauseLogPretty,
Effect.runPromise,
)
break
}
case 'request-batch': {
const { batch } = messageEvent.data
await pipe(
Effect.forEachPar(batch, ({ eventId, payload }) =>
pipe(
handleRequestEncoded({ payload, eventId }),
Stream.runDrain,
Effect.tap(() => sendToUiThread({ _tag: 'response-end', eventId })),
),
),
Effect.provideLayer(mainLayer.layer),
Effect.tapCauseLogPretty,
Effect.runPromise,
)
break
}
default: {
casesHandled(messageEvent.data[0])
}
}
} else {
console.log('unhandled message', messageEvent.data)
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment