Created
March 21, 2025 11:23
-
-
Save tim-smart/2d7416fddce06710ca918362782b7288 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Equal, Hash } from "effect" | |
import * as Context from "effect/Context" | |
import * as Data from "effect/Data" | |
import * as Deferred from "effect/Deferred" | |
import * as Effect from "effect/Effect" | |
import * as Exit from "effect/Exit" | |
import * as MutableHashMap from "effect/MutableHashMap" | |
import * as MutableRef from "effect/MutableRef" | |
import * as Option from "effect/Option" | |
import * as Scope from "effect/Scope" | |
import * as Stream from "effect/Stream" | |
/**
 * Payload shapes for each span lifecycle instruction, keyed by tag name.
 *
 * - `Start` carries the span id, its name and optional attributes.
 * - `End` carries only the id of the span to finish.
 */
type SpanDecisionCases = {
  Start: { id: string; name: string; attributes?: Record<string, unknown> }
  End: { id: string }
}

/** Tagged union (`_tag` of `"Start"` or `"End"`) describing what to do with a span. */
export type SpanDecision = Data.TaggedEnum<SpanDecisionCases>

/** Constructors and matchers for {@link SpanDecision}, produced by `Data.taggedEnum`. */
export const SpanDecision = Data.taggedEnum<SpanDecision>()
/**
 * Map key identifying a span.
 *
 * Hashing and equality look at `id` only, so a key built from just an id
 * matches a key that also carries `name` and `attributes`.
 */
class SpanKey extends Data.Class<{
  id: string
  name?: string
  attributes?: Record<string, unknown> | undefined
}> {
  /** Hash derived solely from the span id. */
  [Hash.symbol](): number {
    return Hash.string(this.id)
  }

  /** Two keys are equal when their ids are strictly equal. */
  [Equal.symbol](other: SpanKey): boolean {
    return other.id === this.id
  }
}
/**
 * Opens and closes tracing spans as elements flow through a stream.
 *
 * For every element, `f` yields a sequence of decisions that are applied in
 * order:
 * - `Start` fetches the span registered under the decision's id, creating it
 *   with the given name and attributes when no entry exists yet.
 * - `End` removes the entry for the id, which closes the scope the span was
 *   created in.
 *
 * The span map is created through `Stream.unwrapScoped`, so it is tied to the
 * scope of the returned stream. Elements themselves pass through unchanged.
 *
 * @param f - derives the span decisions for a single element
 * @returns a function that instruments a stream with those decisions
 */
export const withSpans = <A>(f: (element: A) => Iterable<SpanDecision>) =>
<E, R>(self: Stream.Stream<A, E, R>): Stream.Stream<A, E, R> => {
  // `name` is absent on keys built for `End`, but those keys are only passed
  // to `remove`, which never invokes this lookup.
  const openSpan = (key: SpanKey) => Effect.makeSpanScoped(key.name!, { attributes: key.attributes })

  const instrumented = Effect.map(ResourceMap.make(openSpan), (spans) =>
    Stream.tap(self, (element) =>
      Effect.gen(function*() {
        for (const decision of f(element)) {
          if (decision._tag === "Start") {
            yield* spans.get(
              new SpanKey({ id: decision.id, name: decision.name, attributes: decision.attributes })
            )
          } else {
            yield* spans.remove(new SpanKey({ id: decision.id }))
          }
        }
      })))

  return Stream.unwrapScoped(instrumented)
}
// ResourceMap | |
/**
 * A keyed collection of lazily acquired resources, each living in its own
 * closeable scope.
 *
 * Every entry pairs the scope the resource was acquired in with a deferred
 * that holds the outcome of the acquisition, so concurrent `get` calls for
 * the same key share a single lookup.
 */
export class ResourceMap<K, A, E> {
  /**
   * @param lookup - acquires the resource for `key` inside the given scope
   * @param entries - live entries, keyed by `K`
   * @param isClosed - set to `true` once the owning scope has been finalized
   */
  constructor(
    readonly lookup: (key: K, scope: Scope.Scope) => Effect.Effect<A, E>,
    readonly entries: MutableHashMap.MutableHashMap<K, {
      readonly scope: Scope.CloseableScope
      readonly deferred: Deferred.Deferred<A, E>
    }>,
    readonly isClosed: MutableRef.MutableRef<boolean>
  ) {}

  /**
   * Creates a `ResourceMap` bound to the current scope.
   *
   * The ambient context is captured once and provided to every `lookup` call,
   * with the `Scope` service replaced by the per-key scope. When the current
   * scope closes, the map is marked closed and every remaining entry is
   * removed and its scope closed with the same exit.
   *
   * @param lookup - acquires the resource for a key
   */
  static make = Effect.fnUntraced(function*<K, A, E, R>(lookup: (key: K) => Effect.Effect<A, E, R>) {
    const scope = yield* Effect.scope
    const context = yield* Effect.context<R>()
    const isClosed = MutableRef.make(false)

    const entries = MutableHashMap.empty<K, {
      scope: Scope.CloseableScope
      deferred: Deferred.Deferred<A, E>
    }>()

    yield* Scope.addFinalizerExit(
      scope,
      (exit) => {
        // Flip the flag first so `get` stops creating entries during teardown.
        MutableRef.set(isClosed, true)
        return Effect.forEach(entries, ([key, { scope }]) => {
          MutableHashMap.remove(entries, key)
          // `Effect.exit` keeps one failing close from aborting the others.
          return Effect.exit(Scope.close(scope, exit))
        }, { concurrency: "unbounded", discard: true })
      }
    )

    return new ResourceMap(
      (key, scope) => Effect.provide(lookup(key), Context.add(context, Scope.Scope, scope)),
      entries,
      isClosed
    )
  })

  /**
   * Returns the resource for `key`, acquiring it if no entry exists.
   *
   * - If the map is closed, the calling fiber is interrupted.
   * - If an entry exists, its deferred is awaited, so the caller sees the
   *   result of the lookup that created the entry.
   * - Otherwise a new scope and deferred are registered and `lookup` runs.
   *   On success the deferred is completed with the value. On failure the
   *   entry is removed, the scope is closed with the failing exit so that
   *   anything registered during the partial acquisition is released, and
   *   the deferred is completed with the failure.
   */
  get(key: K): Effect.Effect<A, E> {
    return Effect.withFiberRuntime((fiber) => {
      if (MutableRef.get(this.isClosed)) {
        return Effect.interrupt
      }
      const existing = MutableHashMap.get(this.entries, key)
      if (Option.isSome(existing)) {
        return Deferred.await(existing.value.deferred)
      }
      const scope = Effect.runSync(Scope.make())
      const deferred = Deferred.unsafeMake<A, E>(fiber.id())
      MutableHashMap.set(this.entries, key, { scope, deferred })
      return Effect.onExit(this.lookup(key, scope), (exit) => {
        if (exit._tag === "Success") {
          return Deferred.done(deferred, exit)
        }
        // Only drop the entry this call installed: it may have been replaced
        // by a newer one (via `remove` followed by `get`) while the lookup
        // was still running.
        const current = MutableHashMap.get(this.entries, key)
        if (Option.isSome(current) && current.value.deferred === deferred) {
          MutableHashMap.remove(this.entries, key)
        }
        // Close the scope of the failed acquisition before releasing waiters;
        // a failing close is captured so the deferred is still completed.
        return Effect.zipRight(
          Effect.exit(Scope.close(scope, exit)),
          Deferred.done(deferred, exit)
        )
      })
    })
  }

  /**
   * Removes the entry for `key`, if any, and closes its scope with a
   * successful exit. Does nothing when the key has no entry.
   */
  remove(key: K): Effect.Effect<void> {
    return Effect.suspend(() => {
      const entry = MutableHashMap.get(this.entries, key)
      if (Option.isNone(entry)) {
        return Effect.void
      }
      MutableHashMap.remove(this.entries, key)
      return Scope.close(entry.value.scope, Exit.void)
    })
  }

  /**
   * Like `remove`, but any failure cause is logged at debug level (annotated
   * with module, method and key) instead of being propagated.
   */
  removeIgnore(key: K): Effect.Effect<void> {
    return Effect.catchAllCause(this.remove(key), (cause) =>
      Effect.annotateLogs(Effect.logDebug(cause), {
        module: "ResourceMap",
        method: "removeIgnore",
        key
      }))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment