Skip to content

Instantly share code, notes, and snippets.

@schickling
Created April 10, 2023 20:37
Show Gist options
  • Save schickling/0be90afbf0ba1ea916eaa5d977225c9f to your computer and use it in GitHub Desktop.
Save schickling/0be90afbf0ba1ea916eaa5d977225c9f to your computer and use it in GitHub Desktop.
Effect Dataloader
import * as Chunk from '@effect/data/Chunk'
import * as Context from '@effect/data/Context'
import * as Duration from '@effect/data/Duration'
import type * as Either from '@effect/data/Either'
import { pipe } from '@effect/data/Function'
import * as Deferred from '@effect/io/Deferred'
import * as FiberRefs from '@effect/io/FiberRefs'
import * as Queue from '@effect/io/Queue'
import * as Effect from './Effect.js'
import * as OT from './Tracing/index.js'
/** Configuration accepted by `makeDataLoader`. */
export type DataLoaderOptions = {
  /**
   * Identifier attached as `batchKey` to the tracing span and the batch-size histogram
   * emitted for every batch.
   */
  otelBatchKey: string
  /**
   * Time in milliseconds the dataloader waits for further keys (measured from the first key
   * of a batch) before calling `batchCallback`.
   *
   * @default 10
   *
   * TODO replace with a more sophisticated scheduler (e.g. https://github.com/graphql/dataloader#batch-scheduling)
   */
  batchTimeoutMs?: number
  /**
   * Upper bound on the number of keys handed to a single `batchCallback` invocation.
   * `makeDataLoader` falls back to `Number.POSITIVE_INFINITY` when omitted.
   */
  maxBatchSize?: number
}
/**
 * Builds a data loader that coalesces individual `load` calls into batches and resolves each
 * batch with a single `batchCallback` invocation.
 *
 * How it works:
 * - Every `load(key)` creates a Deferred, captures the caller's context and FiberRefs, enqueues
 *   `[key, deferred, context, fiberRefs]` and then waits on the Deferred.
 * - A background worker (started with `Effect.forkScoped`) repeatedly collects a batch: it blocks
 *   for the first item(s), then keeps taking items until either `batchTimeoutMs` has elapsed
 *   since the first take or `maxBatchSize` items have been collected.
 * - The batch is passed to `batchCallback`, running with the context and FiberRefs captured by the
 *   FIRST item of the batch. Each result is matched to its request by index.
 *
 * @param batchCallback Resolves a list of keys. Results are paired with keys by position, so it
 *   must return exactly one `Either` per key, in the same order as `keys`. A `Left` fails the
 *   corresponding `load`, a `Right` succeeds it.
 * @param options See `DataLoaderOptions`. `batchTimeoutMs` defaults to 10 and `maxBatchSize` to
 *   `Number.POSITIVE_INFINITY`.
 * @returns An effect yielding `{ load }`, where `load` accepts a single key or an array of keys.
 */
export const makeDataLoader = <R, Err, TKey, TOut>(
  batchCallback: (keys: readonly TKey[]) => Effect.Effect<R, never, readonly Either.Either<Err, TOut>[]>,
  { batchTimeoutMs = 10, maxBatchSize = Number.POSITIVE_INFINITY, otelBatchKey }: DataLoaderOptions,
) =>
  Effect.gen(function* ($) {
    type QueueItem = [
      key: TKey,
      promise: Deferred.Deferred<Err, TOut>,
      env: Context.Context<R>,
      refs: FiberRefs.FiberRefs,
    ]

    const queue = yield* $(Queue.unbounded<QueueItem>())

    /** Enqueues one key together with the caller's context/FiberRefs and waits for its result. */
    const loadSingle = (key: TKey): Effect.Effect<R, Err, TOut> =>
      pipe(
        Effect.Do(),
        Effect.bind('promise', () => Deferred.make<Err, TOut>()),
        Effect.bind('env', () => Effect.context<R>()),
        Effect.bind('refs', () => Effect.getFiberRefs()),
        Effect.tap(({ promise, env, refs }) => Queue.offer(queue, [key, promise, env, refs])),
        Effect.flatMap(({ promise }) => Deferred.await(promise)),
      )

    function load(key: TKey): Effect.Effect<R, Err, TOut>
    function load(key: readonly TKey[]): Effect.Effect<R, Err, readonly TOut[]>
    // eslint-disable-next-line prefer-arrow/prefer-arrow-functions
    function load(key: TKey | readonly TKey[]): Effect.Effect<R, Err, TOut | readonly TOut[]> {
      // NOTE: an array argument is always treated as a list of keys, never as a single key.
      return Array.isArray(key)
        ? Effect.map(Effect.forEachPar(key, loadSingle), Chunk.toReadonlyArray)
        : loadSingle(key as TKey)
    }

    // Counts completed batches; reported as a span attribute.
    let batchRunNumber = 0

    /**
     * Collects the next batch: blocks until at least one item is queued, then keeps taking items
     * until the batching window has elapsed or the batch is full.
     */
    const collectBatch = Effect.gen(function* ($$) {
      const firstChunk = yield* $$(Queue.takeBetween(queue, 1, maxBatchSize))
      const batch: QueueItem[] = [...firstChunk]
      // The batching window starts once the first item(s) have been taken.
      const timeOfFirstItemMs = Date.now()

      while (batch.length < maxBatchSize) {
        const remainingTime = batchTimeoutMs - (Date.now() - timeOfFirstItemMs)
        // Check the deadline BEFORE taking from the queue: anything taken must end up in the
        // batch, otherwise its Deferred would never be completed and the caller would hang.
        if (remainingTime <= 0) {
          break
        }

        const itemChunkOption = yield* $$(
          pipe(
            Queue.takeBetween(queue, 1, maxBatchSize - batch.length),
            Effect.timeout(Duration.millis(remainingTime)),
          ),
        )

        // `None` means the timeout fired before any further item arrived.
        if (itemChunkOption._tag === 'None') {
          break
        }

        // Items were removed from the queue, so they are always added to the batch, even if the
        // window elapsed while waiting (the next loop iteration then stops collecting).
        for (const item of itemChunkOption.value) {
          batch.push(item)
        }
      }

      return batch
    })

    /** Runs `batchCallback` for one collected batch and completes every request's Deferred. */
    const runBatch = (queueItemsBatch: QueueItem[]) => {
      const keys = queueItemsBatch.map(([key]) => key)
      // Context and FiberRefs of the first requester are used for the whole batch.
      const [, , env, refs] = queueItemsBatch[0]!

      return Effect.acquireUseRelease(
        // Remember the worker's own FiberRefs so they can be restored after the batch.
        Effect.getFiberRefs(),
        () =>
          Effect.contextWithEffect((parentContext: Context.Context<never>) =>
            Effect.flatMap(FiberRefs.setAll(refs), () =>
              pipe(
                batchCallback([...keys]),
                Effect.tap((results) =>
                  // Results are matched to requests by position.
                  Effect.forEachWithIndex(results, (result, index) => {
                    const promise = queueItemsBatch[index]![1]
                    return result._tag === 'Left'
                      ? Deferred.fail(promise, result.left)
                      : Deferred.succeed(promise, result.right)
                  }),
                ),
                OT.withSpan('DataLoader:batchedLoad', {
                  attributes: {
                    batchKey: otelBatchKey,
                    maxBatchSize,
                    batchTimeoutMs,
                    batchSize: queueItemsBatch.length,
                    batchRunNumber,
                  },
                }),
                OT.histogram(`dataloader_batch_size`, queueItemsBatch.length, { batchKey: otelBatchKey }),
                Effect.provideSomeContext(Context.merge(parentContext, env)),
              ),
            ),
          ),
        (oldRefs) => Effect.setFiberRefs(oldRefs),
      )
    }

    // Background worker: collect a batch, run it, bump the counter, repeat forever.
    yield* $(
      pipe(
        collectBatch,
        Effect.flatMap(runBatch),
        Effect.tap(() => Effect.sync(() => batchRunNumber++)),
        Effect.forever,
        Effect.tapCauseLogPretty,
        Effect.interruptible,
        Effect.forkScoped,
      ),
    )

    // Inactive, commented-out alternative implementation based on stream aggregation:
    // const sink = collectUpTo<never, QueueItem>(maxBatchSize)
    // const schedule = Schedule.spaced(batchTimeoutMs)
    // yield* $(
    // pipe(
    // S.fromQueue_(queue),
    // // Defers the stream until either `batchTimeoutMs` has passed or `maxBatchSize` has been reached
    // S.aggregateAsyncWithin(sink, schedule),
    // S.mapEffect((queueItems) => {
    // const keys = Chunk.map_(queueItems, ([key]) => key)
    // const env = Chunk.unsafeHead(queueItems)![2]
    // return pipe(
    // batchCallback([...keys]),
    // T.provide(env),
    // T.tap((results) =>
    // T.forEachWithIndex_(results, (result, index) => {
    // const promise = Chunk.unsafeGet_(queueItems, index)[1]
    // return result._tag === 'Left' ? P.fail_(promise, result.left) : P.succeed_(promise, result.right)
    // }),
    // ),
    // )
    // }),
    // S.runDrain,
    // tapCauseLogPretty,
    // T.interruptible,
    // T.forkDaemon,
    // T.toManagedRelease(F.interrupt),
    // ),
    // )

    return { load }
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment