Last active
November 23, 2023 01:59
-
-
Save tim-smart/b33994646435aa860cc95c31ea5d5129 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Schema, TreeFormatter } from "@effect/schema" | |
import { | |
Cause, | |
Context, | |
Data, | |
Effect, | |
Either, | |
Layer, | |
Option, | |
PrimaryKey, | |
ReadonlyArray, | |
Request, | |
RequestResolver, | |
Scope, | |
pipe, | |
} from "effect" | |
import * as Lmdb from "lmdb" | |
/** Failure categories a `StorageError` can carry. */
type StorageErrorReason = "Unknown" | "Encode" | "Construction"

/**
 * Tagged error used on the failure channel of every storage operation in
 * this file.
 *
 * - `method`: name of the storage operation that failed (e.g. "getAll",
 *   "set", "delete").
 * - `reason`: coarse failure category; "Encode" is produced when a value
 *   cannot be encoded with its schema, "Unknown" wraps errors thrown by the
 *   backing store.
 * - `message`: human readable description of the failure.
 */
class StorageError extends Data.TaggedError("StorageError")<{
  readonly method: string
  readonly reason: StorageErrorReason
  readonly message: string
}> {}
/**
 * Options handed to a storage factory when a store is created.
 */
export interface StorageOptions {
  /**
   * Identifier of the store to open. The LMDB backend in this file uses it as
   * the database name; the in-memory backend ignores it.
   */
  readonly storeId: string
}
/**
 * Service that creates untyped key/value stores.
 *
 * `make` requires a `Scope`, so a store may register finalizers that run when
 * that scope is closed.
 */
export interface BackingStorageFactory {
  readonly make: (
    options: StorageOptions,
  ) => Effect.Effect<Scope.Scope, StorageError, BackingStorage>
}

/** Context tag used to look up the `BackingStorageFactory` service. */
export const BackingStorageFactory = Context.Tag<BackingStorageFactory>(
  "app/BackingStorageFactory",
)
/**
 * Untyped key/value store with string keys.
 */
export interface BackingStorage {
  /**
   * Looks up several keys at once. The result has one entry per requested
   * key, in the same order; callers in this file index it by key position.
   * A missing key is reported as `Option.none()`.
   */
  readonly getAll: (
    keys: string[],
  ) => Effect.Effect<never, StorageError, Option.Option<unknown>[]>

  /** Stores `value` under `key`. */
  readonly set: (
    key: string,
    value: unknown,
  ) => Effect.Effect<never, StorageError, void>

  /** Removes the entry stored under `key`. */
  readonly delete: (key: string) => Effect.Effect<never, StorageError, void>
}
/**
 * Service that creates typed stores: values are encoded and decoded with the
 * supplied schema on the way in and out.
 */
export interface SchemaStorageFactory {
  readonly make: <I, A>(
    schema: Schema.Schema<I, A>,
    options: StorageOptions,
  ) => Effect.Effect<Scope.Scope, StorageError, SchemaStorage<A>>
}

/** Context tag used to look up the `SchemaStorageFactory` service. */
export const SchemaStorageFactory = Context.Tag<SchemaStorageFactory>(
  "app/SchemaStorageFactory",
)
/**
 * Typed key/value store holding values of type `A`.
 */
export interface SchemaStorage<A> {
  /**
   * Looks up several keys at once, returning one `Option` per requested key
   * in the same order.
   */
  readonly getAll: (
    keys: string[],
  ) => Effect.Effect<never, StorageError, Option.Option<A>[]>

  /** Stores `value` under `key`. */
  readonly set: (
    key: string,
    value: A,
  ) => Effect.Effect<never, StorageError, void>
}
/**
 * Layer implementing `SchemaStorageFactory` on top of whatever
 * `BackingStorageFactory` is available in the environment.
 *
 * Each store created by `make` wraps a backing store and:
 * - `getAll`: decodes every present value with the schema. A value that fails
 *   to decode triggers a delete of its key and is reported as `Option.none()`.
 * - `set`: encodes the value with the schema before writing it; an encoding
 *   failure becomes a `StorageError` with reason "Encode".
 */
export const SchemaStorageFactoryLive = Layer.effect(
  SchemaStorageFactory,
  Effect.map(BackingStorageFactory, (backingFactory) =>
    SchemaStorageFactory.of({
      make: <I, A>(schema: Schema.Schema<I, A>, options: StorageOptions) =>
        Effect.gen(function* (_) {
          const backing = yield* _(backingFactory.make(options))
          const decode = Schema.parse(schema)
          const encode = Schema.encode(schema)

          const typed: SchemaStorage<A> = {
            getAll: (keys) =>
              backing.getAll(keys).pipe(
                Effect.flatMap((entries) =>
                  Effect.forEach(entries, (entry, index) =>
                    Option.match(entry, {
                      onNone: () => Effect.succeedNone,
                      onSome: (raw) =>
                        decode(raw).pipe(
                          // Drop entries that no longer match the schema.
                          Effect.tapError(() => backing.delete(keys[index])),
                          // Any failure (decode or delete) turns into None.
                          Effect.option,
                        ),
                    }),
                  ),
                ),
              ),
            set: (key, value) =>
              encode(value).pipe(
                Effect.mapError(
                  (parseError) =>
                    new StorageError({
                      method: "set",
                      reason: "Encode",
                      message: TreeFormatter.formatErrors(parseError.errors),
                    }),
                ),
                Effect.flatMap((encoded) => backing.set(key, encoded)),
              ),
          }
          return typed
        }),
    }),
  ),
)
/**
 * Options for `persisted`: the store to use (via `StorageOptions`) plus the
 * schemas used to serialise request results.
 */
export interface PersistedResolverOptions<EI, EA, AI, AA>
  extends StorageOptions {
  /** Schema for the request's typed failure. */
  readonly Failure: Schema.Schema<EI, EA>
  /** Schema for the request's success value. */
  readonly Success: Schema.Schema<AI, AA>
}
/**
 * Wraps a request resolver with a persistent result cache.
 *
 * Results are stored as `Either<Failure, Success>` in a `SchemaStorage`
 * obtained from `SchemaStorageFactory`, under the key
 * `<request._tag>:<request primary key>`.
 *
 * For every batch the returned resolver:
 * 1. looks up all request keys in storage; if the lookup itself fails, every
 *    request is treated as uncached;
 * 2. completes the cached requests straight from the stored results;
 * 3. runs the remaining requests through `self` (batched), writes each typed
 *    failure or success back to storage, and completes the request.
 *
 * Causes that carry no typed failure are not persisted, and storage write
 * errors are logged and ignored.
 *
 * @param self - the resolver used for requests that are not in storage
 * @param options - store id plus the failure/success schemas
 * @returns an effect producing the caching resolver
 */
export const persisted = <
  EI,
  EA,
  AI,
  AA,
  Req extends Request.Request<EA, AA> & {
    readonly _tag: string
  } & PrimaryKey.PrimaryKey,
>(
  self: RequestResolver.RequestResolver<Req, never>,
  options: PersistedResolverOptions<EI, EA, AI, AA>,
): Effect.Effect<
  SchemaStorageFactory | Scope.Scope,
  StorageError,
  RequestResolver.RequestResolver<Req, never>
> =>
  Effect.gen(function* (_) {
    const resultSchema = Schema.either(options.Failure, options.Success)
    const factory = yield* _(SchemaStorageFactory)
    const storage = yield* _(factory.make(resultSchema, options))

    // Storage key for a request: "<tag>:<primary key>".
    const keyOf = (request: Req) =>
      `${request._tag}:${request[PrimaryKey.symbol]()}`

    // Splits a batch into [requests without a stored result, [request, result] pairs].
    const lookup = (requests: ReadonlyArray<Req>) =>
      pipe(
        storage.getAll(requests.map(keyOf)),
        Effect.map(
          ReadonlyArray.partitionMap((cached, index) =>
            Option.match(cached, {
              onNone: () => Either.left(requests[index]),
              onSome: (hit) => Either.right([requests[index], hit] as const),
            }),
          ),
        ),
        // A failing store must not fail the batch: resolve everything afresh.
        Effect.orElseSucceed(() => [requests, []] as const),
      )

    // Writes a freshly computed result back to storage (best effort).
    const persist = (
      request: Req,
      result: Request.Request.Result<Req>,
    ): Effect.Effect<never, never, void> => {
      const key = keyOf(request)
      return result._tag === "Failure"
        ? Either.match(Cause.failureOrCause(result.cause), {
            onLeft: (error) =>
              Effect.ignoreLogged(storage.set(key, Either.left(error))),
            // No typed failure in the cause: nothing to persist.
            onRight: () => Effect.unit,
          })
        : Effect.ignoreLogged(storage.set(key, Either.right(result.value)))
    }

    return RequestResolver.makeBatched((requests: Array<Req>) =>
      lookup(requests).pipe(
        Effect.flatMap(([misses, hits]) => {
          const replayHits = Effect.forEach(
            hits,
            ([request, cached]) =>
              Request.completeEffect(request, cached as any) as Effect.Effect<
                never,
                never,
                void
              >,
            { discard: true },
          )

          const resolveMisses = Effect.forEach(
            misses,
            (request) => Effect.exit(Effect.request(request, self)),
            { batching: true },
          ).pipe(
            Effect.flatMap((exits) =>
              Effect.forEach(
                exits,
                (exit, index) => {
                  const request = misses[index]
                  return persist(request, exit as any).pipe(
                    Effect.zipRight(Request.complete(request, exit as any)),
                  )
                },
                { discard: true },
              ),
            ),
          )

          return replayHits.pipe(Effect.zipRight(resolveMisses))
        }),
      ),
    )
  })
// memory storage | |
/**
 * In-memory `BackingStorageFactory`.
 *
 * Every call to `make` creates a fresh, independent `Map`; the supplied
 * options (including `storeId`) are ignored.
 */
const MemoryJsonStorageLive = Layer.succeed(
  BackingStorageFactory,
  BackingStorageFactory.of({
    make: (_options) =>
      Effect.gen(function* (_) {
        const map = new Map<string, unknown>()
        return {
          getAll: (keys) =>
            Effect.sync(() =>
              keys.map((key) =>
                // Test for presence with `has` so that a stored `null` or
                // `undefined` is returned as a value instead of being
                // mistaken for a missing key.
                map.has(key) ? Option.some(map.get(key)) : Option.none(),
              ),
            ),
          set: (key, value) =>
            Effect.sync(() => {
              map.set(key, value)
            }),
          delete: (key) =>
            Effect.sync(() => {
              map.delete(key)
            }),
        }
      }),
  }),
)
/** `SchemaStorageFactory` layer backed by the in-memory storage. */
const MemorySchemaStorageLive = pipe(
  SchemaStorageFactoryLive,
  Layer.use(MemoryJsonStorageLive),
)
// lmdb storage | |
/**
 * Opens an LMDB environment and returns a `BackingStorageFactory` whose
 * stores are named databases inside that environment.
 *
 * Finalizers close the environment, and each database opened through `make`,
 * when their respective scopes close.
 *
 * @param options - LMDB root database options (including the path)
 */
const makeLmdb = (options: Lmdb.RootDatabaseOptionsWithPath) =>
  Effect.gen(function* (_) {
    // NOTE(review): a throw from `Lmdb.open` still surfaces as a defect here;
    // it is left untouched so this effect's error type does not change.
    const lmdb = Lmdb.open(options)
    yield* _(Effect.addFinalizer(() => Effect.promise(() => lmdb.close())))

    // Builds the `catch` handler shared by the read/write operations below.
    const unknownError = (method: string) => (cause: unknown) =>
      new StorageError({
        method,
        reason: "Unknown",
        message: `${cause}`,
      })

    return BackingStorageFactory.of({
      make: (options) =>
        Effect.gen(function* (_) {
          // Opening the database can throw; report that as a typed
          // "Construction" failure rather than letting it become a defect.
          const db = yield* _(
            Effect.try({
              try: () => lmdb.openDB({ name: options.storeId }),
              catch: (cause) =>
                new StorageError({
                  method: "make",
                  reason: "Construction",
                  message: `${cause}`,
                }),
            }),
          )
          yield* _(Effect.addFinalizer(() => Effect.promise(() => db.close())))
          return {
            getAll: (keys) =>
              Effect.map(
                Effect.tryPromise({
                  try: () => db.getMany(keys),
                  catch: unknownError("getAll"),
                }),
                // Null-ish entries from `getMany` are treated as missing keys.
                ReadonlyArray.map(Option.fromNullable),
              ),
            set: (key, value) =>
              Effect.tryPromise({
                try: () => db.put(key, value),
                catch: unknownError("set"),
              }),
            delete: (key) =>
              Effect.tryPromise({
                try: () => db.remove(key),
                catch: unknownError("delete"),
              }),
          }
        }),
    })
  })
/**
 * Scoped layer providing the LMDB-backed `BackingStorageFactory` built by
 * `makeLmdb` for the given options.
 */
const LmdbStorageLive = (options: Lmdb.RootDatabaseOptionsWithPath) => {
  const factory = makeLmdb(options)
  return Layer.scoped(BackingStorageFactory, factory)
}
/** `SchemaStorageFactory` layer backed by LMDB opened with `options`. */
const LmdbSchemaStorageLive = (options: Lmdb.RootDatabaseOptionsWithPath) =>
  pipe(SchemaStorageFactoryLive, Layer.use(LmdbStorageLive(options)))
// usage | |
/** Example domain model: a user with a numeric id and a name. */
class User extends Schema.Class<User>()({
  id: Schema.number,
  name: Schema.string,
}) {}
/**
 * Example request that resolves to a `User` and cannot fail with a typed
 * error. Its primary key is the string form of `id`.
 */
class MyRequest extends Request.TaggedClass("MyRequest")<
  never,
  User,
  { readonly id: number }
> {
  [PrimaryKey.symbol]() {
    return String(this.id)
  }
}
/**
 * Example batched resolver: answers every `MyRequest` with a `User` that has
 * the requested id and the fixed name "name".
 */
const baseResolver = RequestResolver.fromFunctionBatched(
  (requests: Array<MyRequest>) =>
    requests.map(({ id }) => new User({ id, name: "name" })),
)
/**
 * `baseResolver` wrapped with persistence: results are kept in the "user"
 * store, successes encoded with the `User` schema and failures with
 * `Schema.never`.
 */
const persistedResolver: Effect.Effect<
  SchemaStorageFactory | Scope.Scope,
  StorageError,
  RequestResolver.RequestResolver<MyRequest, never>
> = persisted(baseResolver, {
  storeId: "user",
  Failure: Schema.never,
  Success: User,
})
// Demo: resolve the same batch of requests twice through the persisted
// resolver and print both results. Storage is LMDB at "./db"; any failure
// cause is logged before the fiber ends.
pipe(
  Effect.gen(function* (_) {
    const resolver = yield* _(persistedResolver)
    const ids = ReadonlyArray.range(0, 10)
    const getUsers = Effect.forEach(
      ids,
      (id) => Effect.request(new MyRequest({ id }), resolver),
      { batching: true },
    )

    const firstRun = yield* _(getUsers)
    console.log(firstRun)

    const secondRun = yield* _(getUsers)
    console.log(secondRun)
  }),
  Effect.scoped,
  Effect.provide(LmdbSchemaStorageLive({ path: "./db" })),
  Effect.tapErrorCause(Effect.logError),
  Effect.runFork,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment