Last active
February 8, 2026 20:12
-
-
Save ericjuta/abbbb74cbd71a22b01db1484f6482240 to your computer and use it in GitHub Desktop.
redis effect TS subscriptions.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { Context, Effect, Layer, Ref, Runtime, type Scope } from "effect"; | |
| import type { RedisMessageHandler } from "@/lib/clients/redis-types"; | |
| import { redisMetrics } from "@/lib/telemetry/metrics"; | |
| import { type RedisError, redisErrorFromUnknown } from "./errors"; | |
| import { RedisCircuitBreaker } from "./circuit-breaker"; | |
| import { RedisRetryPolicy } from "./retry-policy"; | |
| import { getSubscriberPool } from "./subscriber-pool-transport"; | |
| export interface ChannelLease { | |
| readonly channel: string; | |
| readonly leaseId: string; | |
| readonly release: Effect.Effect<void, never, never>; | |
| } | |
| export interface RedisChannelRegistryService { | |
| readonly acquireLease: ( | |
| channel: string, | |
| handler: (message: string) => Effect.Effect<void, never, never>, | |
| ) => Effect.Effect<ChannelLease, RedisError, Scope.Scope>; | |
| readonly listenerCount: (channel: string) => Effect.Effect<number, never, never>; | |
| } | |
| export class RedisChannelRegistry extends Context.Tag("RedisChannelRegistry")< | |
| RedisChannelRegistry, | |
| RedisChannelRegistryService | |
| >() {} | |
| export interface RedisChannelRegistrySubscriberPool { | |
| readonly subscribe: ( | |
| channel: string, | |
| handler: RedisMessageHandler, | |
| ) => Promise<() => Promise<void>>; | |
| readonly subscribeOnce?: ( | |
| channel: string, | |
| handler: RedisMessageHandler, | |
| ) => Promise<() => Promise<void>>; | |
| } | |
| export interface RedisChannelRegistryOptions { | |
| readonly pool?: RedisChannelRegistrySubscriberPool; | |
| readonly leaseIdPrefix?: string; | |
| } | |
| type ListenerCounts = ReadonlyMap<string, number>; | |
| const incrementListenerCount = ( | |
| listenerCountsRef: Ref.Ref<ListenerCounts>, | |
| channel: string, | |
| ): Effect.Effect<number, never, never> => | |
| Ref.modify(listenerCountsRef, (listenerCounts) => { | |
| const next = new Map(listenerCounts); | |
| const nextCount = (next.get(channel) ?? 0) + 1; | |
| next.set(channel, nextCount); | |
| return [nextCount, next] as const; | |
| }); | |
| const decrementListenerCount = ( | |
| listenerCountsRef: Ref.Ref<ListenerCounts>, | |
| channel: string, | |
| ): Effect.Effect<number, never, never> => | |
| Ref.modify(listenerCountsRef, (listenerCounts) => { | |
| const next = new Map(listenerCounts); | |
| const current = next.get(channel) ?? 0; | |
| if (current <= 1) { | |
| next.delete(channel); | |
| return [0, next] as const; | |
| } | |
| const nextCount = current - 1; | |
| next.set(channel, nextCount); | |
| return [nextCount, next] as const; | |
| }); | |
| const makeRedisMessageHandler = ( | |
| runFork: (effect: Effect.Effect<void, never, never>) => void, | |
| handler: (message: string) => Effect.Effect<void, never, never>, | |
| ): RedisMessageHandler => (message) => { | |
| runFork(handler(message)); | |
| }; | |
| const subscribeOnce = ( | |
| pool: RedisChannelRegistrySubscriberPool, | |
| channel: string, | |
| handler: RedisMessageHandler, | |
| ): Promise<() => Promise<void>> => { | |
| if (typeof pool.subscribeOnce === "function") { | |
| return pool.subscribeOnce(channel, handler); | |
| } | |
| return pool.subscribe(channel, handler); | |
| }; | |
| const makeIdempotentRelease = ({ | |
| channel, | |
| cleanup, | |
| listenerCountsRef, | |
| }: { | |
| channel: string; | |
| cleanup: () => Promise<void>; | |
| listenerCountsRef: Ref.Ref<ListenerCounts>; | |
| }): Effect.Effect<Effect.Effect<void, never, never>, never, never> => | |
| Effect.gen(function* () { | |
| const releasedRef = yield* Ref.make(false); | |
| return Ref.modify(releasedRef, (isReleased) => { | |
| if (isReleased) { | |
| return [Effect.void, true] as const; | |
| } | |
| const releaseEffect = Effect.tryPromise({ | |
| try: () => cleanup(), | |
| catch: () => undefined, | |
| }).pipe( | |
| Effect.catchAll(() => Effect.void), | |
| Effect.flatMap(() => decrementListenerCount(listenerCountsRef, channel)), | |
| Effect.tap((listenerCount) => | |
| Effect.sync(() => { | |
| redisMetrics.recordChannelLeaseReleased(channel, listenerCount); | |
| if (listenerCount === 0) { | |
| redisMetrics.recordChannelUnsubscribe(channel); | |
| } | |
| }), | |
| ), | |
| Effect.asVoid, | |
| ); | |
| return [releaseEffect, true] as const; | |
| }).pipe(Effect.flatten); | |
| }); | |
| export const makeRedisChannelRegistry = ( | |
| options: RedisChannelRegistryOptions = {}, | |
| ): Effect.Effect< | |
| RedisChannelRegistryService, | |
| never, | |
| RedisRetryPolicy | RedisCircuitBreaker | |
| > => | |
| Effect.gen(function* () { | |
| const retryPolicy = yield* RedisRetryPolicy; | |
| const breaker = yield* RedisCircuitBreaker; | |
| const runtime = yield* Effect.runtime<never>(); | |
| const listenerCountsRef = yield* Ref.make<ListenerCounts>(new Map()); | |
| const leaseIdCounterRef = yield* Ref.make(0); | |
| const leaseIdPrefix = options.leaseIdPrefix ?? "lease"; | |
| const pool = options.pool ?? getSubscriberPool(); | |
| const runHandler = (effect: Effect.Effect<void, never, never>): void => { | |
| Runtime.runFork(runtime, effect); | |
| }; | |
| const acquireLease = ( | |
| channel: string, | |
| handler: (message: string) => Effect.Effect<void, never, never>, | |
| ) => | |
| Effect.gen(function* () { | |
| const leaseId = yield* Ref.modify(leaseIdCounterRef, (current) => { | |
| const next = current + 1; | |
| return [`${leaseIdPrefix}-${next}`, next] as const; | |
| }); | |
| const release = yield* Effect.acquireRelease( | |
| Effect.gen(function* () { | |
| yield* breaker.allow; | |
| const cleanup = yield* retryPolicy.retrySubscribe( | |
| Effect.tryPromise({ | |
| try: () => | |
| subscribeOnce( | |
| pool, | |
| channel, | |
| makeRedisMessageHandler(runHandler, handler), | |
| ), | |
| catch: redisErrorFromUnknown, | |
| }), | |
| ).pipe( | |
| Effect.tap(() => breaker.recordSuccess), | |
| Effect.tapError((error) => breaker.recordTerminalFailure(error)), | |
| ); | |
| yield* Effect.sync(() => { | |
| redisMetrics.recordChannelSubscribe(channel); | |
| }); | |
| const listenerCount = yield* incrementListenerCount( | |
| listenerCountsRef, | |
| channel, | |
| ); | |
| yield* Effect.sync(() => { | |
| redisMetrics.recordChannelLeaseAcquired(channel, listenerCount); | |
| }); | |
| return yield* makeIdempotentRelease({ | |
| channel, | |
| cleanup, | |
| listenerCountsRef, | |
| }); | |
| }), | |
| (releaseEffect) => releaseEffect, | |
| ); | |
| return { | |
| channel, | |
| leaseId, | |
| release, | |
| } satisfies ChannelLease; | |
| }); | |
| const listenerCount = (channel: string) => | |
| Ref.get(listenerCountsRef).pipe( | |
| Effect.map((listenerCounts) => listenerCounts.get(channel) ?? 0), | |
| ); | |
| return { | |
| acquireLease, | |
| listenerCount, | |
| } satisfies RedisChannelRegistryService; | |
| }); | |
| export const RedisChannelRegistryLayer = ( | |
| options: RedisChannelRegistryOptions = {}, | |
| ): Layer.Layer< | |
| RedisChannelRegistry, | |
| never, | |
| RedisRetryPolicy | RedisCircuitBreaker | |
| > => Layer.effect(RedisChannelRegistry, makeRedisChannelRegistry(options)); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { Effect, Exit, Ref, Scope } from "effect"; | |
| import { runPromiseWithNodeRuntime } from "@/lib/effect/runtime"; | |
| import { | |
| RedisChannelRegistry, | |
| makeRedisChannelRegistry, | |
| type RedisChannelRegistryService, | |
| type RedisChannelRegistrySubscriberPool, | |
| } from "./channel-registry"; | |
| import { | |
| RedisCircuitBreaker, | |
| makeRedisCircuitBreakerService, | |
| } from "./circuit-breaker"; | |
| import type { RedisError } from "./errors"; | |
| import { | |
| RedisRetryPolicy, | |
| makeRedisRetryPolicyService, | |
| } from "./retry-policy"; | |
| import { getSubscriberPool } from "./subscriber-pool-transport"; | |
| export interface RedisSubscriptionParams { | |
| readonly channel: string; | |
| readonly handler: (message: string) => Promise<void>; | |
| readonly pool?: RedisChannelRegistrySubscriberPool; | |
| } | |
| interface ManagedRedisChannelLease { | |
| readonly channel: string; | |
| readonly leaseId: string; | |
| readonly release: () => Promise<void>; | |
| } | |
| const sharedRegistryServiceByPool = new WeakMap< | |
| RedisChannelRegistrySubscriberPool, | |
| Promise<RedisChannelRegistryService> | |
| >(); | |
| const buildRegistryService = ( | |
| pool: RedisChannelRegistrySubscriberPool, | |
| ): Promise<RedisChannelRegistryService> => | |
| runPromiseWithNodeRuntime( | |
| Effect.gen(function* () { | |
| const retryPolicy = yield* makeRedisRetryPolicyService(); | |
| const breaker = yield* makeRedisCircuitBreakerService(); | |
| return yield* makeRedisChannelRegistry({ pool }).pipe( | |
| Effect.provideService(RedisRetryPolicy, retryPolicy), | |
| Effect.provideService(RedisCircuitBreaker, breaker), | |
| ); | |
| }), | |
| ); | |
| const getSharedRegistryService = ( | |
| pool: RedisChannelRegistrySubscriberPool, | |
| ): Promise<RedisChannelRegistryService> => { | |
| const cached = sharedRegistryServiceByPool.get(pool); | |
| if (cached) { | |
| return cached; | |
| } | |
| const registryServicePromise = buildRegistryService(pool).catch((error) => { | |
| if (sharedRegistryServiceByPool.get(pool) === registryServicePromise) { | |
| sharedRegistryServiceByPool.delete(pool); | |
| } | |
| throw error; | |
| }); | |
| sharedRegistryServiceByPool.set(pool, registryServicePromise); | |
| return registryServicePromise; | |
| }; | |
| const makeHandlerEffect = ( | |
| handler: (message: string) => Promise<void>, | |
| message: string, | |
| ) => | |
| Effect.tryPromise({ | |
| try: () => handler(message), | |
| catch: () => undefined, | |
| }).pipe(Effect.catchAll(() => Effect.void)); | |
| const acquireManagedLease = ( | |
| params: RedisSubscriptionParams, | |
| ): Effect.Effect<ManagedRedisChannelLease, RedisError, RedisChannelRegistry> => | |
| Effect.gen(function* () { | |
| const scope = yield* Scope.make(); | |
| const registry = yield* RedisChannelRegistry; | |
| const leaseExit = yield* Effect.exit( | |
| Scope.extend( | |
| registry.acquireLease(params.channel, (message) => | |
| makeHandlerEffect(params.handler, message), | |
| ), | |
| scope, | |
| ), | |
| ); | |
| if (leaseExit._tag === "Failure") { | |
| yield* Scope.close(scope, leaseExit); | |
| return yield* Effect.failCause(leaseExit.cause); | |
| } | |
| const lease = leaseExit.value; | |
| const releasedRef = yield* Ref.make(false); | |
| const releaseEffect = Ref.modify(releasedRef, (released) => { | |
| if (released) { | |
| return [Effect.void, true] as const; | |
| } | |
| const cleanupEffect = Effect.zipRight( | |
| lease.release, | |
| Scope.close(scope, Exit.void), | |
| ).pipe(Effect.catchAll(() => Effect.void)); | |
| return [cleanupEffect, true] as const; | |
| }).pipe(Effect.flatten); | |
| return { | |
| channel: lease.channel, | |
| leaseId: lease.leaseId, | |
| release: () => runPromiseWithNodeRuntime(releaseEffect), | |
| }; | |
| }); | |
| export const subscribeWithRedisEffectRuntime = async ( | |
| params: RedisSubscriptionParams, | |
| ): Promise<() => Promise<void>> => { | |
| const pool = params.pool ?? getSubscriberPool(); | |
| const registryService = await getSharedRegistryService(pool); | |
| const managedLease = await runPromiseWithNodeRuntime( | |
| acquireManagedLease(params).pipe( | |
| Effect.provideService(RedisChannelRegistry, registryService), | |
| ), | |
| ); | |
| return managedLease.release; | |
| }; | |
| export const releaseRedisChannel = async ({ | |
| channel, | |
| pool, | |
| }: { | |
| readonly channel: string; | |
| readonly pool?: { releaseChannel: (channel: string) => Promise<void> }; | |
| }): Promise<void> => { | |
| const subscriberPool = pool ?? getSubscriberPool(); | |
| await subscriberPool.releaseChannel(channel); | |
| }; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment