Skip to content

Instantly share code, notes, and snippets.

@ericjuta
Last active February 8, 2026 20:12
Show Gist options
  • Select an option

  • Save ericjuta/abbbb74cbd71a22b01db1484f6482240 to your computer and use it in GitHub Desktop.

Select an option

Save ericjuta/abbbb74cbd71a22b01db1484f6482240 to your computer and use it in GitHub Desktop.
redis effect TS subscriptions.ts
import { Context, Effect, Layer, Ref, Runtime, type Scope } from "effect";
import type { RedisMessageHandler } from "@/lib/clients/redis-types";
import { redisMetrics } from "@/lib/telemetry/metrics";
import { type RedisError, redisErrorFromUnknown } from "./errors";
import { RedisCircuitBreaker } from "./circuit-breaker";
import { RedisRetryPolicy } from "./retry-policy";
import { getSubscriberPool } from "./subscriber-pool-transport";
/**
 * Handle returned to a caller that has acquired a listener lease on a Redis
 * channel.
 */
export interface ChannelLease {
/** Name of the channel this lease was acquired for. */
readonly channel: string;
/** Identifier assigned to the lease when it was acquired. */
readonly leaseId: string;
/** Effect that gives the lease back; its type says it cannot fail. */
readonly release: Effect.Effect<void, never, never>;
}
/**
 * Service contract of the channel registry: hands out scoped leases on Redis
 * channels and reports how many listeners a channel currently has.
 */
export interface RedisChannelRegistryService {
/**
 * Acquires a lease on `channel`, delivering each received message to
 * `handler`. Requires a `Scope` in the environment and may fail with a
 * `RedisError`.
 */
readonly acquireLease: (
channel: string,
handler: (message: string) => Effect.Effect<void, never, never>,
) => Effect.Effect<ChannelLease, RedisError, Scope.Scope>;
/** Yields the number of listeners currently tracked for `channel`. */
readonly listenerCount: (channel: string) => Effect.Effect<number, never, never>;
}
/**
 * Context tag (key "RedisChannelRegistry") under which a
 * `RedisChannelRegistryService` is provided to and requested from the Effect
 * environment.
 */
export class RedisChannelRegistry extends Context.Tag("RedisChannelRegistry")<
RedisChannelRegistry,
RedisChannelRegistryService
>() {}
/**
 * Minimal shape of the subscriber pool the registry talks to. Both methods
 * resolve to an async function, which the registry later calls as the
 * lease's cleanup.
 */
export interface RedisChannelRegistrySubscriberPool {
/** Registers `handler` for `channel`; resolves to the cleanup function. */
readonly subscribe: (
channel: string,
handler: RedisMessageHandler,
) => Promise<() => Promise<void>>;
/**
 * Optional variant with the same signature. When the pool provides it, the
 * registry calls it instead of `subscribe`.
 */
readonly subscribeOnce?: (
channel: string,
handler: RedisMessageHandler,
) => Promise<() => Promise<void>>;
}
/** Optional settings accepted when constructing the channel registry. */
export interface RedisChannelRegistryOptions {
/** Pool to subscribe through; when omitted, `getSubscriberPool()` is used. */
readonly pool?: RedisChannelRegistrySubscriberPool;
/** Prefix of generated lease ids; when omitted, "lease" is used. */
readonly leaseIdPrefix?: string;
}
/** Read-only snapshot mapping a channel name to its current listener count. */
type ListenerCounts = ReadonlyMap<string, number>;
/**
 * Atomically registers one more listener for `channel`.
 *
 * @param listenerCountsRef Ref holding the per-channel listener counts.
 * @param channel Channel whose count is raised by one.
 * @returns An effect yielding the count after the increment.
 */
const incrementListenerCount = (
  listenerCountsRef: Ref.Ref<ListenerCounts>,
  channel: string,
): Effect.Effect<number, never, never> =>
  Ref.modify(listenerCountsRef, (counts) => {
    // A channel that is not in the map yet starts from zero.
    const updatedCount = (counts.get(channel) ?? 0) + 1;
    // Write into a copy so the previously stored map is never mutated.
    const updatedCounts = new Map(counts).set(channel, updatedCount);
    return [updatedCount, updatedCounts] as const;
  });
/**
 * Atomically removes one listener from `channel`.
 *
 * The channel's entry is dropped from the map once no listeners remain, so
 * the count never goes below zero.
 *
 * @param listenerCountsRef Ref holding the per-channel listener counts.
 * @param channel Channel whose count is lowered by one.
 * @returns An effect yielding the count after the decrement (0 when removed).
 */
const decrementListenerCount = (
  listenerCountsRef: Ref.Ref<ListenerCounts>,
  channel: string,
): Effect.Effect<number, never, never> =>
  Ref.modify(listenerCountsRef, (counts) => {
    // Clamp at zero: unknown channels and single-listener channels both end at 0.
    const remaining = Math.max((counts.get(channel) ?? 0) - 1, 0);
    const updatedCounts = new Map(counts);
    if (remaining === 0) {
      updatedCounts.delete(channel);
    } else {
      updatedCounts.set(channel, remaining);
    }
    return [remaining, updatedCounts] as const;
  });
/**
 * Adapts an Effect-returning message handler to the callback shape expected
 * by the subscriber pool.
 *
 * @param runFork Function that starts the given effect; its result is ignored.
 * @param handler Builds the effect to run for a received message.
 * @returns A callback that, per message, builds the handler effect and hands
 *   it to `runFork`.
 */
const makeRedisMessageHandler = (
  runFork: (effect: Effect.Effect<void, never, never>) => void,
  handler: (message: string) => Effect.Effect<void, never, never>,
): RedisMessageHandler => {
  const forwardMessage: RedisMessageHandler = (message) => {
    const handlerEffect = handler(message);
    runFork(handlerEffect);
  };
  return forwardMessage;
};
/**
 * Subscribes `handler` to `channel` through the pool, using the pool's
 * `subscribeOnce` method when it provides one and `subscribe` otherwise.
 *
 * @param pool Subscriber pool to register with.
 * @param channel Channel to subscribe to.
 * @param handler Callback invoked by the pool for received messages.
 * @returns The promise returned by the pool, resolving to a cleanup function.
 */
const subscribeOnce = (
  pool: RedisChannelRegistrySubscriberPool,
  channel: string,
  handler: RedisMessageHandler,
): Promise<() => Promise<void>> =>
  typeof pool.subscribeOnce === "function"
    ? pool.subscribeOnce(channel, handler)
    : pool.subscribe(channel, handler);
/**
 * Builds the release effect for one lease. The returned effect does real work
 * only the first time it runs; later runs are no-ops.
 *
 * On its first run the release effect:
 *  1. calls the pool `cleanup` (best-effort: a rejection is logged as a
 *     warning and otherwise ignored, so release itself never fails),
 *  2. decrements the listener count for `channel`,
 *  3. records the lease-released metric, plus the channel-unsubscribe metric
 *     when the count reached zero.
 *
 * @param channel Channel the lease belongs to.
 * @param cleanup Async function returned by the pool subscription.
 * @param listenerCountsRef Ref holding the per-channel listener counts.
 * @returns An effect that yields the idempotent release effect.
 */
const makeIdempotentRelease = ({
  channel,
  cleanup,
  listenerCountsRef,
}: {
  channel: string;
  cleanup: () => Promise<void>;
  listenerCountsRef: Ref.Ref<ListenerCounts>;
}): Effect.Effect<Effect.Effect<void, never, never>, never, never> =>
  Effect.gen(function* () {
    const releasedRef = yield* Ref.make(false);

    // Best-effort cleanup: surface the failure in the logs instead of
    // discarding it, but keep the release path infallible.
    const runCleanup = Effect.tryPromise({
      try: () => cleanup(),
      catch: (cause) => cause,
    }).pipe(
      Effect.catchAll((cause) =>
        Effect.logWarning(
          `Redis channel cleanup failed for "${channel}"`,
          cause,
        ),
      ),
    );

    const releaseOnce = runCleanup.pipe(
      Effect.flatMap(() => decrementListenerCount(listenerCountsRef, channel)),
      Effect.tap((listenerCount) =>
        Effect.sync(() => {
          redisMetrics.recordChannelLeaseReleased(channel, listenerCount);
          if (listenerCount === 0) {
            redisMetrics.recordChannelUnsubscribe(channel);
          }
        }),
      ),
      Effect.asVoid,
    );

    // The flag is flipped atomically before the cleanup runs, so only the
    // first caller gets the real release effect; everyone else gets a no-op.
    return Ref.modify(releasedRef, (isReleased) =>
      [isReleased ? Effect.void : releaseOnce, true] as const,
    ).pipe(Effect.flatten);
  });
/**
 * Builds the channel registry service.
 *
 * The registry subscribes handlers through a subscriber pool, tracks how many
 * listeners each channel has, and hands out scoped leases whose release undoes
 * the subscription.
 *
 * @param options Optional pool (defaults to `getSubscriberPool()`) and lease
 *   id prefix (defaults to "lease").
 * @returns An effect that yields the service; it requires `RedisRetryPolicy`
 *   and `RedisCircuitBreaker` in the environment.
 */
export const makeRedisChannelRegistry = (
  options: RedisChannelRegistryOptions = {},
): Effect.Effect<
  RedisChannelRegistryService,
  never,
  RedisRetryPolicy | RedisCircuitBreaker
> =>
  Effect.gen(function* () {
    const retryPolicy = yield* RedisRetryPolicy;
    const breaker = yield* RedisCircuitBreaker;
    // Captured so message callbacks fired by the pool can start effects.
    const runtime = yield* Effect.runtime<never>();
    const listenerCountsRef = yield* Ref.make<ListenerCounts>(new Map());
    const leaseIdCounterRef = yield* Ref.make(0);
    const leaseIdPrefix = options.leaseIdPrefix ?? "lease";
    const pool = options.pool ?? getSubscriberPool();

    // Starts a handler effect on the captured runtime without awaiting it.
    const runHandler = (effect: Effect.Effect<void, never, never>): void => {
      Runtime.runFork(runtime, effect);
    };

    /**
     * Acquires a lease on `channel`. The subscription is registered as a
     * scoped resource, so closing the surrounding scope releases it; the same
     * (idempotent) release is also exposed on the returned lease.
     */
    const acquireLease = (
      channel: string,
      handler: (message: string) => Effect.Effect<void, never, never>,
    ) =>
      Effect.gen(function* () {
        // Lease ids are "<prefix>-<n>" with n taken from a shared counter.
        const leaseId = yield* Ref.modify(leaseIdCounterRef, (current) => {
          const next = current + 1;
          return [`${leaseIdPrefix}-${next}`, next] as const;
        });

        const release = yield* Effect.acquireRelease(
          Effect.gen(function* () {
            // Ask the circuit breaker before attempting to subscribe.
            yield* breaker.allow;

            const cleanup = yield* retryPolicy
              .retrySubscribe(
                Effect.tryPromise({
                  try: () =>
                    subscribeOnce(
                      pool,
                      channel,
                      makeRedisMessageHandler(runHandler, handler),
                    ),
                  catch: redisErrorFromUnknown,
                }),
              )
              .pipe(
                Effect.tap(() => breaker.recordSuccess),
                Effect.tapError((error) =>
                  breaker.recordTerminalFailure(error),
                ),
              );

            const listenerCount = yield* incrementListenerCount(
              listenerCountsRef,
              channel,
            );

            yield* Effect.sync(() => {
              // Mirror the release path, which records the channel-level
              // unsubscribe only when the last listener leaves: record the
              // channel-level subscribe only when the first listener arrives.
              // Recording it per lease would leave N subscribes against one
              // unsubscribe for a channel with N leases.
              if (listenerCount === 1) {
                redisMetrics.recordChannelSubscribe(channel);
              }
              redisMetrics.recordChannelLeaseAcquired(channel, listenerCount);
            });

            return yield* makeIdempotentRelease({
              channel,
              cleanup,
              listenerCountsRef,
            });
          }),
          // The acquired value is itself the release effect.
          (releaseEffect) => releaseEffect,
        );

        return {
          channel,
          leaseId,
          release,
        } satisfies ChannelLease;
      });

    /** Yields the tracked listener count for `channel` (0 when untracked). */
    const listenerCount = (channel: string) =>
      Ref.get(listenerCountsRef).pipe(
        Effect.map((listenerCounts) => listenerCounts.get(channel) ?? 0),
      );

    return {
      acquireLease,
      listenerCount,
    } satisfies RedisChannelRegistryService;
  });
/**
 * Layer that provides `RedisChannelRegistry`, built with
 * `makeRedisChannelRegistry`.
 *
 * @param options Forwarded unchanged to `makeRedisChannelRegistry`.
 * @returns A layer requiring `RedisRetryPolicy` and `RedisCircuitBreaker`.
 */
export const RedisChannelRegistryLayer = (
  options: RedisChannelRegistryOptions = {},
): Layer.Layer<
  RedisChannelRegistry,
  never,
  RedisRetryPolicy | RedisCircuitBreaker
> => {
  const registryEffect = makeRedisChannelRegistry(options);
  return Layer.effect(RedisChannelRegistry, registryEffect);
};
import { Effect, Exit, Ref, Scope } from "effect";
import { runPromiseWithNodeRuntime } from "@/lib/effect/runtime";
import {
RedisChannelRegistry,
makeRedisChannelRegistry,
type RedisChannelRegistryService,
type RedisChannelRegistrySubscriberPool,
} from "./channel-registry";
import {
RedisCircuitBreaker,
makeRedisCircuitBreakerService,
} from "./circuit-breaker";
import type { RedisError } from "./errors";
import {
RedisRetryPolicy,
makeRedisRetryPolicyService,
} from "./retry-policy";
import { getSubscriberPool } from "./subscriber-pool-transport";
/** Arguments for subscribing to a Redis channel from Promise-based code. */
export interface RedisSubscriptionParams {
/** Channel to subscribe to. */
readonly channel: string;
/** Async callback invoked with each received message. */
readonly handler: (message: string) => Promise<void>;
/** Pool to subscribe through; when omitted, `getSubscriberPool()` is used. */
readonly pool?: RedisChannelRegistrySubscriberPool;
}
/**
 * Promise-facing view of a channel lease: the same channel and lease id, with
 * the release exposed as an async function instead of an Effect.
 */
interface ManagedRedisChannelLease {
/** Channel the lease was acquired for. */
readonly channel: string;
/** Identifier of the underlying lease. */
readonly leaseId: string;
/** Releases the lease; resolves when the release effect has finished. */
readonly release: () => Promise<void>;
}
/**
 * Cache of registry services keyed by subscriber pool. Values are promises,
 * so callers that arrive while a registry is still being built share the same
 * in-flight construction. Keys are held weakly (WeakMap).
 */
const sharedRegistryServiceByPool = new WeakMap<
RedisChannelRegistrySubscriberPool,
Promise<RedisChannelRegistryService>
>();
/**
 * Constructs a registry service bound to `pool`, wiring in a freshly created
 * retry policy and circuit breaker.
 *
 * @param pool Subscriber pool the registry will subscribe through.
 * @returns A promise for the constructed registry service.
 */
const buildRegistryService = (
  pool: RedisChannelRegistrySubscriberPool,
): Promise<RedisChannelRegistryService> => {
  const program = Effect.gen(function* () {
    // Both dependencies are created per registry, retry policy first.
    const retryPolicy = yield* makeRedisRetryPolicyService();
    const breaker = yield* makeRedisCircuitBreakerService();

    const withRetryPolicy = Effect.provideService(
      makeRedisChannelRegistry({ pool }),
      RedisRetryPolicy,
      retryPolicy,
    );
    const fullyProvided = Effect.provideService(
      withRetryPolicy,
      RedisCircuitBreaker,
      breaker,
    );
    return yield* fullyProvided;
  });

  return runPromiseWithNodeRuntime(program);
};
/**
 * Returns the registry service shared by all callers using `pool`, building
 * it on first use.
 *
 * The in-flight promise is cached immediately, so concurrent callers share
 * one construction. If construction rejects, the cache entry is removed so a
 * later call can try again, and the rejection is passed on to the caller.
 *
 * @param pool Subscriber pool used as the cache key.
 * @returns A promise for the shared registry service.
 */
const getSharedRegistryService = (
  pool: RedisChannelRegistrySubscriberPool,
): Promise<RedisChannelRegistryService> => {
  const existing = sharedRegistryServiceByPool.get(pool);
  if (existing !== undefined) {
    return existing;
  }

  const pending: Promise<RedisChannelRegistryService> = buildRegistryService(
    pool,
  ).catch((error: unknown) => {
    // Evict only our own entry; a newer promise for this pool is left alone.
    const stillOurs = sharedRegistryServiceByPool.get(pool) === pending;
    if (stillOurs) {
      sharedRegistryServiceByPool.delete(pool);
    }
    throw error;
  });

  sharedRegistryServiceByPool.set(pool, pending);
  return pending;
};
/**
 * Wraps one invocation of a Promise-based message handler in an effect that
 * cannot fail.
 *
 * A handler that throws or rejects must not break the subscription, so the
 * failure is not propagated. It is logged as a warning with its cause rather
 * than silently discarded, so handler bugs stay visible. The message payload
 * is deliberately left out of the log.
 *
 * @param handler Caller-supplied async message handler.
 * @param message Message to pass to the handler.
 * @returns An effect that runs the handler and always succeeds with `void`.
 */
const makeHandlerEffect = (
  handler: (message: string) => Promise<void>,
  message: string,
) =>
  Effect.tryPromise({
    try: () => handler(message),
    catch: (cause) => cause,
  }).pipe(
    Effect.catchAll((cause) =>
      Effect.logWarning("Redis subscription handler failed", cause),
    ),
  );
/**
 * Acquires a channel lease inside a manually managed scope and exposes it
 * with a Promise-based, idempotent `release`.
 *
 * The scope is created here rather than taken from the caller, so the lease
 * outlives this effect and is only torn down when `release` is called. If
 * acquisition fails, the scope is closed with the failure exit and the
 * original cause is re-raised.
 *
 * @param params Channel, Promise-based handler and optional pool.
 * @returns An effect yielding the managed lease; it may fail with
 *   `RedisError` and requires `RedisChannelRegistry`.
 */
const acquireManagedLease = (
  params: RedisSubscriptionParams,
): Effect.Effect<ManagedRedisChannelLease, RedisError, RedisChannelRegistry> =>
  Effect.gen(function* () {
    const scope = yield* Scope.make();
    const registry = yield* RedisChannelRegistry;

    const onMessage = (message: string) =>
      makeHandlerEffect(params.handler, message);
    const scopedAcquire = Scope.extend(
      registry.acquireLease(params.channel, onMessage),
      scope,
    );
    // Capture the outcome as a value so the scope can be closed on failure.
    const leaseExit = yield* Effect.exit(scopedAcquire);

    if (Exit.isFailure(leaseExit)) {
      yield* Scope.close(scope, leaseExit);
      return yield* Effect.failCause(leaseExit.cause);
    }

    const { channel, leaseId, release: releaseLease } = leaseExit.value;
    const releasedRef = yield* Ref.make(false);

    // Release the lease first, then close the scope that owns it.
    const teardown = releaseLease.pipe(
      Effect.zipRight(Scope.close(scope, Exit.void)),
      Effect.catchAll(() => Effect.void),
    );
    // Atomically mark as released; only the first caller runs the teardown.
    const releaseEffect = Ref.getAndSet(releasedRef, true).pipe(
      Effect.flatMap((alreadyReleased) =>
        alreadyReleased ? Effect.void : teardown,
      ),
    );

    return {
      channel,
      leaseId,
      release: () => runPromiseWithNodeRuntime(releaseEffect),
    };
  });
/**
 * Promise-based entry point for subscribing to a Redis channel.
 *
 * Looks up (or builds) the registry shared by everyone using the same pool,
 * acquires a managed lease through it, and returns the lease's release
 * function.
 *
 * @param params Channel, handler and optional pool (defaults to
 *   `getSubscriberPool()`).
 * @returns A promise for the async function that releases the subscription.
 */
export const subscribeWithRedisEffectRuntime = async (
  params: RedisSubscriptionParams,
): Promise<() => Promise<void>> => {
  const registryService = await getSharedRegistryService(
    params.pool ?? getSubscriberPool(),
  );
  const leaseProgram = Effect.provideService(
    acquireManagedLease(params),
    RedisChannelRegistry,
    registryService,
  );
  const { release } = await runPromiseWithNodeRuntime(leaseProgram);
  return release;
};
/**
 * Asks a subscriber pool to release `channel`.
 *
 * @param request.channel Channel to release.
 * @param request.pool Pool exposing `releaseChannel`; when omitted,
 *   `getSubscriberPool()` is used.
 * @returns A promise that settles with the pool's `releaseChannel` call.
 */
export const releaseRedisChannel = async (request: {
  readonly channel: string;
  readonly pool?: { releaseChannel: (channel: string) => Promise<void> };
}): Promise<void> => {
  const subscriberPool = request.pool ?? getSubscriberPool();
  await subscriberPool.releaseChannel(request.channel);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment