Skip to content

Instantly share code, notes, and snippets.

@tim-smart
Created March 21, 2025 11:23
Show Gist options
  • Save tim-smart/2d7416fddce06710ca918362782b7288 to your computer and use it in GitHub Desktop.
Save tim-smart/2d7416fddce06710ca918362782b7288 to your computer and use it in GitHub Desktop.
import { Equal, Hash } from "effect"
import * as Context from "effect/Context"
import * as Data from "effect/Data"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import * as MutableHashMap from "effect/MutableHashMap"
import * as MutableRef from "effect/MutableRef"
import * as Option from "effect/Option"
import * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
/**
 * A span instruction emitted for a stream element and interpreted by
 * `withSpans`.
 *
 * Tagged union with two cases, both correlated by `id`.
 */
export type SpanDecision = Data.TaggedEnum<{
// Start: request the span registered under `id`, created with `name` and the
// optional `attributes` (see the `Start` handler in `withSpans`).
Start: { id: string; name: string; attributes?: Record<string, unknown> }
// End: release the span registered under `id` (see the `End` handler in
// `withSpans`).
End: { id: string }
}>
/**
 * Companion value for the `SpanDecision` union, built with `Data.taggedEnum`.
 * `withSpans` uses its `$match` helper to dispatch on the case tag.
 */
export const SpanDecision = Data.taggedEnum<SpanDecision>()
/**
 * Key under which `withSpans` stores a span in its `ResourceMap`.
 *
 * Identity is defined by `id` alone: `name` and `attributes` are only carried
 * along so the lookup function can create the span, and are ignored by both
 * equality and hashing. This is what lets an `End` decision (which only knows
 * the `id`) address the entry created by the matching `Start` decision.
 */
class SpanKey extends Data.Class<{
  id: string
  name?: string
  attributes?: Record<string, unknown> | undefined
}> {
  /**
   * Two keys are equal when both are `SpanKey` instances with the same `id`.
   *
   * The `instanceof` guard prevents an unrelated value that merely exposes a
   * matching `id` (and hash) from being treated as the same key.
   */
  [Equal.symbol](that: unknown): boolean {
    return that instanceof SpanKey && this.id === that.id
  }

  /** Hash derived from `id` only, consistent with the equality above. */
  [Hash.symbol](): number {
    return Hash.string(this.id)
  }
}
/**
 * Opens and closes tracing spans as a side effect of the elements flowing
 * through a stream.
 *
 * For every element, `f` yields zero or more `SpanDecision`s which are applied
 * in order: `Start` requests the span for that `id` from a `ResourceMap`
 * (creating it via `Effect.makeSpanScoped` on first use), `End` removes the
 * entry for that `id`. Elements themselves are passed through unchanged.
 *
 * The `ResourceMap` is created in the scope supplied by
 * `Stream.unwrapScoped`.
 *
 * @param f - derives the span decisions for a single element
 * @returns a function that wraps a stream with the span bookkeeping
 */
export const withSpans =
  <A>(f: (element: A) => Iterable<SpanDecision>) =>
  <E, R>(self: Stream.Stream<A, E, R>): Stream.Stream<A, E, R> =>
    Stream.unwrapScoped(
      Effect.gen(function*() {
        // One scoped span per key; only `Start` keys ever reach the lookup,
        // and those always carry a `name`.
        const openSpans = yield* ResourceMap.make((spanKey: SpanKey) =>
          Effect.makeSpanScoped(spanKey.name!, { attributes: spanKey.attributes })
        )

        // Interpreter turning a single decision into the matching map operation.
        const interpret = SpanDecision.$match({
          Start: (start) =>
            openSpans.get(
              new SpanKey({ id: start.id, name: start.name, attributes: start.attributes })
            ),
          End: (end) => openSpans.remove(new SpanKey({ id: end.id }))
        })

        // Applies the decisions of one element sequentially, in iteration order.
        const applyDecisions = Effect.fnUntraced(function*(element: A) {
          for (const step of f(element)) {
            yield* interpret(step)
          }
        })

        return Stream.tap(self, applyDecisions)
      })
    )
// ResourceMap
/**
 * A map of lazily acquired, individually scoped resources.
 *
 * Every key owns its own closeable scope plus a `Deferred` holding the outcome
 * of the lookup for that key. Concurrent `get` calls for the same key share the
 * one lookup by awaiting the same `Deferred`. Removing a key closes that key's
 * scope; closing the scope the map was created in closes all remaining entries.
 */
export class ResourceMap<K, A, E> {
  /**
   * Prefer `ResourceMap.make`, which wires up the finalizer and the context.
   *
   * @param lookup - acquires the resource for `key` using the given scope
   * @param entries - live entries, keyed by `K`
   * @param isClosed - set to `true` once the owning scope has been finalized
   */
  constructor(
    readonly lookup: (key: K, scope: Scope.Scope) => Effect.Effect<A, E>,
    readonly entries: MutableHashMap.MutableHashMap<K, {
      readonly scope: Scope.CloseableScope
      readonly deferred: Deferred.Deferred<A, E>
    }>,
    readonly isClosed: MutableRef.MutableRef<boolean>
  ) {}

  /**
   * Creates a `ResourceMap` bound to the current scope.
   *
   * The current context is captured once and provided to every `lookup` call,
   * with `Scope.Scope` replaced by the per-entry scope. A finalizer on the
   * current scope marks the map as closed and closes every remaining entry
   * scope concurrently, using the exit the owning scope was closed with.
   *
   * @param lookup - acquires the resource for a key; may require services `R`
   */
  static make = Effect.fnUntraced(function*<K, A, E, R>(lookup: (key: K) => Effect.Effect<A, E, R>) {
    const scope = yield* Effect.scope
    const context = yield* Effect.context<R>()
    const isClosed = MutableRef.make(false)

    const entries = MutableHashMap.empty<K, {
      scope: Scope.CloseableScope
      deferred: Deferred.Deferred<A, E>
    }>()

    yield* Scope.addFinalizerExit(
      scope,
      (exit) => {
        MutableRef.set(isClosed, true)
        return Effect.forEach(entries, ([key, { scope }]) => {
          MutableHashMap.remove(entries, key)
          // `Effect.exit` keeps one failing close from aborting the others.
          return Effect.exit(Scope.close(scope, exit))
        }, { concurrency: "unbounded", discard: true })
      }
    )

    return new ResourceMap(
      (key, scope) => Effect.provide(lookup(key), Context.add(context, Scope.Scope, scope)),
      entries,
      isClosed
    )
  })

  /**
   * Returns the resource for `key`, acquiring it on first use.
   *
   * - If the map is already closed, the returned effect interrupts.
   * - If an entry exists, its `Deferred` is awaited (sharing the in-flight or
   *   completed lookup).
   * - Otherwise a new scope and `Deferred` are registered and `lookup` is run.
   *   On success the entry is kept; on failure or interruption the entry is
   *   removed and its scope is closed, so a later `get` retries from scratch.
   */
  get(key: K): Effect.Effect<A, E> {
    return Effect.withFiberRuntime((fiber) => {
      if (MutableRef.get(this.isClosed)) {
        return Effect.interrupt
      }
      const existing = MutableHashMap.get(this.entries, key)
      if (Option.isSome(existing)) {
        return Deferred.await(existing.value.deferred)
      }
      // Scope and deferred are created synchronously so the entry is
      // registered before this callback returns.
      const scope = Effect.runSync(Scope.make())
      const deferred = Deferred.unsafeMake<A, E>(fiber.id())
      MutableHashMap.set(this.entries, key, { scope, deferred })
      return Effect.onExit(this.lookup(key, scope), (exit) => {
        if (exit._tag === "Success") {
          return Deferred.done(deferred, exit)
        }
        MutableHashMap.remove(this.entries, key)
        // The entry is gone, so nothing else will ever close this scope:
        // close it here to release whatever the failed lookup had already
        // acquired. Waiters are notified first so they are not held up by
        // the finalizers.
        return Effect.zipLeft(
          Deferred.done(deferred, exit),
          Scope.close(scope, exit)
        )
      })
    })
  }

  /**
   * Removes the entry for `key` and closes its scope with `Exit.void`.
   * Does nothing when there is no entry for `key`.
   */
  remove(key: K): Effect.Effect<void> {
    return Effect.suspend(() => {
      const entry = MutableHashMap.get(this.entries, key)
      if (Option.isNone(entry)) {
        return Effect.void
      }
      MutableHashMap.remove(this.entries, key)
      return Scope.close(entry.value.scope, Exit.void)
    })
  }

  /**
   * Like `remove`, but any failure cause raised while removing is logged at
   * debug level (annotated with module, method and key) instead of being
   * propagated.
   */
  removeIgnore(key: K): Effect.Effect<void> {
    return Effect.catchAllCause(this.remove(key), (cause) =>
      Effect.annotateLogs(Effect.logDebug(cause), {
        module: "ResourceMap",
        method: "removeIgnore",
        key
      }))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment