Created
April 26, 2023 10:00
-
-
Save schickling/82b0c6a923d74ad8f35a6056ecca970c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// /// <reference no-default-lib="true" /> | |
/// <reference lib="esnext" /> | |
/// <reference lib="webworker" /> | |
import { casesHandled } from '@mytunes/utils' | |
import { Effect, Either, Layer, Otel, pipe, ServiceContext, Stream, WorkerLive } from '@mytunes/utils/effect' | |
import { OtelMeterLive, OtelTracerLive } from '@mytunes/utils-frontend/tracing' | |
import type { | |
EventId, | |
WorkerInitHandler, | |
WorkerInitPayload, | |
WorkerRequestHandler, | |
WorkerRequestPayload, | |
WorkerResponseData, | |
} from '../common.js' | |
import { isWorkerRequestDataEvent, makeParentContext } from '../common.js' | |
import type { AnyWorkerCodec } from '../WorkerCodec.js' | |
declare let self: DedicatedWorkerGlobalScope | |
type OtelEnv = Otel.Tracer | Otel.Meter | Otel.Span | |
// TODO shutdown logic | |
// TODO make `R` in handleRequest` is provided via `layers` | |
// TODO refactor to share logic with `makeSharedWorker` | |
// TODO make `TInitExtraPayload` type-safe across worker and worker-client code | |
export const makeWorkerUnsafe = <TCodec extends AnyWorkerCodec, Ctx>({ | |
handleInit, | |
// layers = Layer.Empty as Layer.Layer<Otel.Tracer | Otel.Meter | Otel.Span | DefaultEnv, never, R>, | |
handleRequest, | |
codec, | |
otelServiceName, | |
}: { | |
// TODO possibly refactor into a layer instead of an effect that returns a layer | |
handleInit: WorkerInitHandler<TCodec, Ctx> | |
handleRequest: WorkerRequestHandler<TCodec, Ctx> | |
codec: TCodec | |
otelServiceName: string | |
}) => { | |
let mainLayer: ServiceContext.MainLayer<OtelEnv | WorkerGlobalScope | Ctx> | |
const handleInitEncoded = ( | |
{ workerContext, otelContext, extraPayload }: WorkerInitPayload<TCodec>, | |
eventId: EventId, | |
): Effect.Effect<never, never, void> => { | |
const parentSpanCtx = makeParentContext(otelContext) | |
const initLayer = Layer.provideMerge( | |
Layer.mergeAll(OtelTracerLive(otelServiceName, workerContext), OtelMeterLive(otelServiceName, workerContext)), | |
Layer.mergeAll(Otel.spanLayerFromContext(parentSpanCtx), WorkerLive), | |
) | |
const initMainLayer = ServiceContext.unsafeMainLayer(initLayer) | |
return pipe( | |
handleInit(extraPayload), | |
Effect.tap((extraLayers) => | |
pipe( | |
Effect.sync(() => { | |
const mergedLayers = Layer.provideMerge(initLayer, extraLayers) | |
mainLayer = ServiceContext.unsafeMainLayer(mergedLayers) | |
}), | |
// NOTE This "warms up" the cache of the main layer | |
Effect.tap(() => Effect.provideLayer(Effect.unit(), mainLayer.layer)), | |
), | |
), | |
Effect.tap(() => sendToUiThread({ _tag: 'init', eventId })), | |
Otel.withSpan(`worker:init`, {}, makeParentContext(otelContext)), | |
Effect.provideSomeLayer(initMainLayer.layer), | |
) | |
} | |
const handleRequestEncoded = ({ | |
payload: { encodedRequest, otelContext }, | |
eventId, | |
}: { | |
payload: WorkerRequestPayload<TCodec> | |
eventId: EventId | |
}): Stream.Stream< | |
OtelEnv | WorkerGlobalScope | Ctx, | |
unknown, | |
[msg: TCodec['ResEncoded'], transfer: Transferable[]] | |
> => | |
pipe( | |
handleRequest(codec.decodeReq(encodedRequest)), | |
Stream.map((decodedOut) => codec.encodeRes(Either.right(decodedOut))), | |
Stream.catchAll((error) => Stream.succeed(codec.encodeRes(Either.left(error)))), | |
Stream.tap(([encodedResponse, transfer]) => | |
sendToUiThread({ _tag: 'response-value', eventId, encodedResponse }, transfer), | |
), | |
Otel.withSpanStream(`worker:handleRequest`, {}, makeParentContext(otelContext)), | |
Stream.tapCauseLogPretty, | |
) | |
const sendToUiThread = (encodedRequest: WorkerResponseData<TCodec>, transfer?: Transferable[]) => | |
pipe( | |
Effect.sync(() => self.postMessage(encodedRequest, { transfer })), | |
Effect.tap(() => Otel.addEvent('sendToUiThread')), | |
) | |
self.addEventListener('message', async (messageEvent) => { | |
if (isWorkerRequestDataEvent<TCodec>(messageEvent.data)) { | |
switch (messageEvent.data._tag) { | |
case 'init': { | |
// TODO rename to "envelop" vs "payload" | |
const { eventId, payload } = messageEvent.data | |
await pipe(handleInitEncoded(payload, eventId), Effect.tapCauseLogPretty, Effect.runPromise) | |
break | |
} | |
case 'request': { | |
const { eventId, payload } = messageEvent.data | |
await pipe( | |
handleRequestEncoded({ payload, eventId }), | |
Stream.runDrain, | |
Effect.tap(() => sendToUiThread({ _tag: 'response-end', eventId })), | |
Effect.provideLayer(mainLayer.layer), | |
Effect.tapCauseLogPretty, | |
Effect.runPromise, | |
) | |
break | |
} | |
case 'request-batch': { | |
const { batch } = messageEvent.data | |
await pipe( | |
Effect.forEachPar(batch, ({ eventId, payload }) => | |
pipe( | |
handleRequestEncoded({ payload, eventId }), | |
Stream.runDrain, | |
Effect.tap(() => sendToUiThread({ _tag: 'response-end', eventId })), | |
), | |
), | |
Effect.provideLayer(mainLayer.layer), | |
Effect.tapCauseLogPretty, | |
Effect.runPromise, | |
) | |
break | |
} | |
default: { | |
casesHandled(messageEvent.data[0]) | |
} | |
} | |
} else { | |
console.log('unhandled message', messageEvent.data) | |
} | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment