Skip to content

Instantly share code, notes, and snippets.

@schickling
Created July 9, 2023 15:06
Show Gist options
  • Save schickling/2cf370eae19ffd700d5121a8dc86bc18 to your computer and use it in GitHub Desktop.
Save schickling/2cf370eae19ffd700d5121a8dc86bc18 to your computer and use it in GitHub Desktop.
import { pipe } from '@effect/data/Function'
import * as Option from '@effect/data/Option'
import * as Cause from '@effect/io/Cause'
import * as Effect from '@effect/io/Effect'
import * as Exit from '@effect/io/Exit'
import * as Fiber from '@effect/io/Fiber'
import * as Queue from '@effect/io/Queue'
import * as Stream from '@effect/stream/Stream'
/**
 * Switching flat-map for streams: every element of `self` is mapped through `f`
 * to an inner stream, and only the most recently started inner stream is
 * forwarded to the output. When a new upstream element arrives, the previous
 * inner stream's fiber is interrupted before the next one is started.
 *
 * Protocol used on both internal queues: each entry is an `Exit`;
 * `Exit.succeed(x)` carries an element, `Exit.fail(Option.none())` means
 * end-of-stream, and any other failure cause is propagated as a stream failure.
 *
 * Completion: the output ends once upstream has ended AND the latest inner
 * stream has finished (or immediately when upstream ends before producing any
 * element). Upstream failures/defects fail the output right away.
 *
 * @param f - maps an upstream element to the inner stream to switch to
 * @returns a function turning the upstream stream into the switched stream
 */
export const chainSwitch =
  <R1, E1, A, B>(f: (a: A) => Stream.Stream<R1, E1, B>) =>
  <R, E>(self: Stream.Stream<R, E, A>): Stream.Stream<R | R1, E1 | E, B> =>
    Stream.unwrapScoped(
      Effect.gen(function* ($) {
        // Capacity 1 on both queues: producers suspend until the consumer catches up.
        const outputQueue = yield* $(Queue.bounded<Exit.Exit<Option.Option<E | E1>, B>>(1))
        const inputQueue = yield* $(Queue.bounded<Exit.Exit<Option.Option<E>, A>>(1))

        // Pump upstream into `inputQueue`; `propagate: true` makes the pump enqueue
        // the end-of-stream marker when upstream completes.
        yield* $(Effect.forkScoped(drainInto(self, inputQueue, { skip: false, propagate: true })))

        // Mutable state shared between the switching loop and the inner pump fibers.
        let currentFiber: undefined | Fiber.RuntimeFiber<any, any>
        let currentRef = { skip: false, propagate: false }

        // Inner pumps are forked as daemons, so nothing else ties them to this scope.
        // Interrupt the latest one when the scope closes so it cannot outlive the
        // stream (e.g. stay suspended forever on the bounded output queue).
        // Registered before the loop is forked: finalizers run in reverse order, so
        // the loop is stopped first and `currentFiber` is stable when this runs.
        yield* $(
          Effect.addFinalizer(() => {
            currentRef.skip = true
            return currentFiber ? Fiber.interrupt(currentFiber) : Effect.unit
          }),
        )

        yield* $(
          pipe(
            Effect.gen(function* ($) {
              // `Effect.done` re-raises the queued Exit, so an upstream end/failure
              // surfaces as a failure of this iteration and is handled below.
              const input = yield* $(pipe(Queue.take(inputQueue), Effect.flatMap(Effect.done)))
              const latest = f(input)
              if (currentFiber) {
                // Silence the old pump before interrupting it so it cannot push
                // anything further into the output queue.
                currentRef.skip = true
                yield* $(Fiber.interrupt(currentFiber))
              }
              // Fresh flag object per inner stream; the old pump keeps its own (skipped) one.
              currentRef = { skip: false, propagate: false }
              currentFiber = (yield* $(
                Effect.forkDaemon(drainInto(latest, outputQueue, currentRef)),
              )) as unknown as Fiber.RuntimeFiber<any, any>
            }),
            Effect.matchCauseEffect({
              onFailure: (cause) => {
                const maybeFailure = Cause.failureOrCause(cause)
                if (maybeFailure._tag === 'Left' && currentFiber && maybeFailure.left._tag === 'None') {
                  // Upstream ended while an inner stream exists. From now on the inner
                  // pump is responsible for signalling completion when it finishes.
                  currentRef.propagate = true
                  const exit = currentFiber.unsafePoll()
                  // If the pump already exited it never saw `propagate`, so signal the
                  // end here. Otherwise emit nothing: offering the end marker now would
                  // terminate the output while the inner stream is still producing.
                  return exit ? Queue.offer(outputQueue, Exit.fail(Option.none())) : Effect.unit
                }
                // Upstream ended with no inner stream, or failed/died: forward the cause as-is.
                return Queue.offer(outputQueue, Exit.failCause(cause))
              },
              onSuccess: () => Effect.unit,
            }),
            Effect.forever,
            Effect.forkScoped,
          ),
        )

        // Consumer side: re-raise each queued Exit; `Option.none()` failure ends the stream.
        const pull = pipe(Queue.take(outputQueue), Effect.flatten)
        const stream = Stream.repeatEffectOption(pull)
        return stream
      }),
    )
/**
 * Runs `self` and forwards its outcome into `queue` as `Exit` values.
 *
 * - Each element is offered as `Exit.succeed(a)`.
 * - A failure is offered as `Exit.failCause(cause)` with the error wrapped in `Option.some`.
 * - On normal completion, `Exit.fail(Option.none())` is offered only if `ref.propagate` is set.
 *
 * `ref` is read at the moment each event occurs, so callers can mutate it while
 * the stream is running: setting `ref.skip` suppresses all further element and
 * failure offers; setting `ref.propagate` enables the completion marker.
 *
 * @param self - stream to drain
 * @param queue - destination queue of `Exit` values
 * @param ref - mutable flags controlling what gets forwarded
 */
const drainInto = <R, E, A>(
  self: Stream.Stream<R, E, A>,
  queue: Queue.Queue<Exit.Exit<Option.Option<E>, A>>,
  ref: { skip: boolean; propagate: boolean },
) =>
  pipe(
    self,
    Stream.runForEach((a) => (ref.skip ? Effect.unit : Queue.offer(queue, Exit.succeed(a)))),
    // Wrap real errors in `Some` so they stay distinguishable from the `None` end marker.
    Effect.mapError(Option.some),
    // Options-object form, matching the `Effect.matchCauseEffect` call in `chainSwitch`.
    Effect.matchCauseEffect({
      onFailure: (cause) => (ref.skip ? Effect.unit : Queue.offer(queue, Exit.failCause(cause))),
      onSuccess: () => (ref.propagate ? Queue.offer(queue, Exit.fail(Option.none())) : Effect.unit),
    }),
  )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment