Last active
March 21, 2025 11:53
-
-
Save tim-smart/8d7153d30b90af2b63be99713d6c759a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { Chunk, Scope } from "effect" | |
import { Channel, Data, Effect, Mailbox } from "effect" | |
import type { AsyncInputProducer } from "effect/SingleProducerAsyncInput" | |
/**
 * Tagged error (`_tag: "WebSocketError"`) used as the failure type of the
 * WebSocket channel defined in this file.
 */
class WebSocketError extends Data.TaggedError("WebSocketError")<{
  /** The underlying value (event, exception, ...) that triggered the error. */
  readonly cause: unknown
}> {}
/**
 * Builds a duplex `Channel` constructor around a scoped resource.
 *
 * A fresh mailbox is created for every channel and handed to `f`. Whatever is
 * offered to that mailbox becomes the channel's output (via
 * `Mailbox.toChannel`), while every element of every input chunk is passed,
 * in order, to the function that `f` returns.
 *
 * @param f - Receives the output mailbox and returns a scoped effect that
 *   yields the "send one item to the resource" function.
 * @returns A function that, given the upstream error type `IE`, produces the
 *   channel. Any `Scope` requirement of `f` is removed from the channel's
 *   requirements by `Channel.unwrapScoped`.
 */
const makeChannelDuplex = <Out, In, Err, R = never>(
  f: (mailbox: Mailbox.Mailbox<Out, Err>) => Effect.Effect<(item: In) => Effect.Effect<void>, Err, R | Scope.Scope>
) =>
<IE>(): Channel.Channel<
  Chunk.Chunk<Out>,
  Chunk.Chunk<In>,
  Err | IE,
  IE,
  void,
  unknown,
  Exclude<R, Scope.Scope>
> => {
  const build = Effect.gen(function*() {
    // The mailbox may also be failed with upstream (`IE`) errors, hence the wider error type.
    const outbox = yield* Mailbox.make<Out, IE | Err>()

    // `f` is declared against `Mailbox<Out, Err>`; the cast hides the extra `IE` from it.
    const send = yield* f(outbox as any)

    const producer: AsyncInputProducer<IE, Chunk.Chunk<In>, unknown> = {
      // Never blocks the upstream: reads are always allowed immediately.
      awaitRead: () => Effect.void,
      // Forward each incoming item, one after another, to the client / resource.
      emit: (items) => Effect.forEach(items, send, { discard: true }),
      // Hand upstream failures to the mailbox so they surface on the output side.
      error: (cause) => outbox.failCause(cause),
      // Upstream completion is ignored; the mailbox is left untouched.
      done: () => Effect.void
    }

    return Channel.embedInput(Mailbox.toChannel(outbox), producer)
  })

  return Channel.unwrapScoped(build)
}
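// `IE` is the error type of the upstream input; it is supplied when the channel is created and appears in the channel's output error type alongside `Err`.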
/**
 * Duplex channel over a WebSocket connected to `wss://echo.websocket.org`.
 *
 * - Input chunks of strings are sent to the socket one message at a time.
 * - Every `message` event's `data` is offered to the output mailbox.
 * - The socket is closed with code 1000 when the channel's scope closes.
 *
 * Failure handling:
 * - If the socket emits `error` before `open`, construction fails with a
 *   `WebSocketError` instead of waiting forever.
 * - After the socket is open, an `error` event fails the mailbox with a
 *   `WebSocketError`, and a `close` event ends the mailbox.
 */
export const websocketChannel = makeChannelDuplex<string, string, WebSocketError>(
  Effect.fnUntraced(function*(mailbox) {
    // Allocate the resource; the finalizer closes it when the scope ends.
    const ws = yield* Effect.acquireRelease(
      Effect.sync(() => new WebSocket("wss://echo.websocket.org")),
      (ws) => Effect.sync(() => ws.close(1000))
    )

    // Wait for the handshake: succeed on `open`, fail on `error`.
    yield* Effect.async<void, WebSocketError>((resume) => {
      const removeListeners = () => {
        ws.removeEventListener("open", onOpen)
        ws.removeEventListener("error", onError)
      }
      const onOpen = () => {
        removeListeners()
        resume(Effect.void)
      }
      const onError = (cause: Event) => {
        removeListeners()
        resume(Effect.fail(new WebSocketError({ cause })))
      }
      ws.addEventListener("open", onOpen)
      ws.addEventListener("error", onError)
      // Returned effect detaches the handshake listeners if the wait is interrupted.
      return Effect.sync(removeListeners)
    })

    // Incoming messages become channel output.
    ws.addEventListener("message", (event) => {
      mailbox.unsafeOffer(event.data)
    })
    // A socket error after opening fails the output side with a typed error.
    ws.addEventListener("error", (cause) => {
      Effect.runFork(mailbox.fail(new WebSocketError({ cause })))
    })
    // Once the socket is closed no more messages can arrive, so end the mailbox.
    ws.addEventListener("close", () => {
      Effect.runFork(mailbox.end)
    })

    return (message) => Effect.sync(() => ws.send(message))
  })
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment