Effect Dataloader
import * as Chunk from '@effect/data/Chunk'
import * as Context from '@effect/data/Context'
import * as Duration from '@effect/data/Duration'
import type * as Either from '@effect/data/Either'
import { pipe } from '@effect/data/Function'
import * as Deferred from '@effect/io/Deferred'
import * as FiberRefs from '@effect/io/FiberRefs'
import * as Queue from '@effect/io/Queue'

import * as Effect from './Effect.js'
import * as OT from './Tracing/index.js'

export type DataLoaderOptions = {
  maxBatchSize?: number
  /**
   * Time in milliseconds the dataloader waits for new keys before calling `batchCallback`
   *
   * @default 10
   *
   * TODO replace with a more sophisticated scheduler (e.g. https://github.com/graphql/dataloader#batch-scheduling)
   */
  batchTimeoutMs?: number
  otelBatchKey: string
}
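
/**
 * Creates a dataloader that collects individual `load` calls into batches and resolves
 * them via `batchCallback`. A batch is flushed once `maxBatchSize` items have been
 * collected or once `batchTimeoutMs` has elapsed since the first queued item.
 */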
export const makeDataLoader = <R, Err, TKey, TOut>(
  batchCallback: (keys: readonly TKey[]) => Effect.Effect<R, never, readonly Either.Either<Err, TOut>[]>,
  { batchTimeoutMs = 10, maxBatchSize = Number.POSITIVE_INFINITY, otelBatchKey }: DataLoaderOptions,
) =>
  Effect.gen(function* ($) {
    type QueueItem = [
      key: TKey,
      promise: Deferred.Deferred<Err, TOut>,
      env: Context.Context<R>,
      refs: FiberRefs.FiberRefs,
    ]

    const queue = yield* $(Queue.unbounded<QueueItem>())

    const loadSingle = (key: TKey): Effect.Effect<R, Err, TOut> =>
      pipe(
        Effect.Do(),
        Effect.bind('promise', () => Deferred.make<Err, TOut>()),
        Effect.bind('env', () => Effect.context<R>()),
        Effect.bind('refs', () => Effect.getFiberRefs()),
        Effect.tap(({ promise, env, refs }) => Queue.offer(queue, [key, promise, env, refs])),
        Effect.flatMap(({ promise }) => Deferred.await(promise)),
      )

    function load(key: TKey): Effect.Effect<R, Err, TOut>
    function load(key: readonly TKey[]): Effect.Effect<R, Err, readonly TOut[]>
    // eslint-disable-next-line prefer-arrow/prefer-arrow-functions
    function load(key: TKey | readonly TKey[]): Effect.Effect<R, Err, TOut | readonly TOut[]> {
      return Array.isArray(key)
        ? pipe(Effect.map(Effect.forEachPar(key, loadSingle), Chunk.toReadonlyArray))
        : loadSingle(key as TKey)
    }

    let batchRunNumber = 0
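
    // Background fiber: drains the queue into batches (up to `maxBatchSize` items, or until
    // `batchTimeoutMs` has passed since the first item of the batch), runs `batchCallback`
    // for each batch and settles the corresponding deferreds with the per-key results.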
    yield* $(
      pipe(
        Effect.gen(function* ($) {
          const batch: QueueItem[] = []
          let timeOfFirstItemMs: number | undefined

          while (true) {
            if (batch.length === 0) {
              const itemChunk = yield* $(Queue.takeBetween(queue, 1, maxBatchSize - batch.length))
              batch.push(...itemChunk)
              timeOfFirstItemMs = Date.now()
            } else {
              const remainingTime = batchTimeoutMs - (Date.now() - timeOfFirstItemMs!)
              const itemChunkOption = yield* $(
                pipe(
                  Queue.takeBetween(queue, 1, maxBatchSize - batch.length),
                  Effect.timeout(Duration.millis(remainingTime)),
                ),
              )
              const newRemainingTime = batchTimeoutMs - (Date.now() - timeOfFirstItemMs!)
              if (itemChunkOption._tag === 'None') {
                break
              }
              // Keep items that arrived right at the deadline as well, otherwise their deferreds would never settle
              batch.push(...itemChunkOption.value)
              if (newRemainingTime <= 0) {
                break
              }
            }
            if (batch.length === maxBatchSize) {
              break
            }
          }

          return batch
        }),
        Effect.flatMap((queueItemsBatch) => {
          const keys = queueItemsBatch.map(([key]) => key)
          const env = queueItemsBatch[0]![2]
          const refs = queueItemsBatch[0]![3]

          return Effect.acquireUseRelease(
            Effect.getFiberRefs(),
            () =>
              Effect.contextWithEffect((parentContext: Context.Context<never>) =>
                Effect.flatMap(FiberRefs.setAll(refs), () =>
                  pipe(
                    batchCallback([...keys]),
                    Effect.tap((results) =>
                      Effect.forEachWithIndex(results, (result, index) => {
                        const promise = queueItemsBatch[index]![1]
                        return result._tag === 'Left'
                          ? Deferred.fail(promise, result.left)
                          : Deferred.succeed(promise, result.right)
                      }),
                    ),
                    OT.withSpan('DataLoader:batchedLoad', {
                      attributes: {
                        batchKey: otelBatchKey,
                        maxBatchSize,
                        batchTimeoutMs,
                        batchSize: queueItemsBatch.length,
                        batchRunNumber,
                      },
                    }),
                    OT.histogram(`dataloader_batch_size`, queueItemsBatch.length, { batchKey: otelBatchKey }),
                    Effect.provideSomeContext(Context.merge(parentContext, env)),
                  ),
                ),
              ),
            (oldRefs) => Effect.setFiberRefs(oldRefs),
          )
        }),
        Effect.tap(() => Effect.sync(() => batchRunNumber++)),
        Effect.forever,
        Effect.tapCauseLogPretty,
        Effect.interruptible,
        Effect.forkScoped,
      ),
    )
    // const sink = collectUpTo<never, QueueItem>(maxBatchSize)
    // const schedule = Schedule.spaced(batchTimeoutMs)
    // yield* $(
    //   pipe(
    //     S.fromQueue_(queue),
    //     // Defers the stream until either `batchTimeoutMs` has passed or `maxBatchSize` has been reached
    //     S.aggregateAsyncWithin(sink, schedule),
    //     S.mapEffect((queueItems) => {
    //       const keys = Chunk.map_(queueItems, ([key]) => key)
    //       const env = Chunk.unsafeHead(queueItems)![2]
    //       return pipe(
    //         batchCallback([...keys]),
    //         T.provide(env),
    //         T.tap((results) =>
    //           T.forEachWithIndex_(results, (result, index) => {
    //             const promise = Chunk.unsafeGet_(queueItems, index)[1]
    //             return result._tag === 'Left' ? P.fail_(promise, result.left) : P.succeed_(promise, result.right)
    //           }),
    //         ),
    //       )
    //     }),
    //     S.runDrain,
    //     tapCauseLogPretty,
    //     T.interruptible,
    //     T.forkDaemon,
    //     T.toManagedRelease(F.interrupt),
    //   ),
    // )
    return { load }
  })
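
// ------------------------------------------------------------------------------------------
// Usage sketch (illustrative). It assumes a hypothetical `fetchUsersByIds` batch function
// and a `User` type, that `./Effect.js` re-exports the standard `@effect/io` combinators
// (`succeed`, `gen`, `scoped`), and that `Either` is imported as a value (the file above
// imports it as type-only). Because `makeDataLoader` forks its batching fiber with
// `forkScoped`, it has to be acquired inside a scope, e.g. via `Effect.scoped`.
// ------------------------------------------------------------------------------------------
// import * as Either from '@effect/data/Either'
//
// type User = { id: string; name: string }
//
// // Pretend batch lookup: returns one `Either` per requested id, in the same order as `ids`
// const fetchUsersByIds = (ids: readonly string[]) =>
//   Effect.succeed(ids.map((id) => Either.right({ id, name: `user-${id}` })))
//
// const program = Effect.scoped(
//   Effect.gen(function* ($) {
//     const loader = yield* $(
//       makeDataLoader(fetchUsersByIds, { batchTimeoutMs: 10, maxBatchSize: 100, otelBatchKey: 'users' }),
//     )
//     // Loads issued within the same `batchTimeoutMs` window are collected into a single
//     // `fetchUsersByIds` call; the array overload of `load` fans the keys out in parallel.
//     return yield* $(loader.load(['alice', 'bob', 'carol']))
//   }),
// )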