|
import { Weight } from "#models/shared" |
|
import { Config, Effect, Either, HashMap, identity, Layer, Option, pipe, S, type Schedule, Scope, Stream } from "effect-app" |
|
import { DuplexSocket } from "./DuplexSocket.js" |
|
import { getWeight, type ScaleError } from "./scale.RHEWA82c.js" |
|
import { type ServerAddress } from "./serverAddress.js" |
|
|
|
/**
 * Schedule accepted by the scale streams to drive repetition.
 *
 * It outputs a `number`, accepts any input (`unknown`) and has no
 * environment requirements (`never`).
 */
type StreamSchedule = Schedule.Schedule<number, unknown, never>
|
|
|
/**
 * Options controlling how a scale stream is built.
 *
 * @typeParam A - element type produced by `mapStream`
 * @typeParam E - error type produced by `mapStream`
 * @typeParam R - environment required by the stream returned from `mapStream`
 */
export interface ScaleOptions<A, E, R = never> {
  /**
   * When provided, the mapped stream is repeated according to this schedule.
   * When omitted, the stream is left as-is (no repetition is added).
   */
  schedule?: StreamSchedule

  /**
   * When truthy, the socket layer is provided around the repeated stream, so a
   * single connection spans all repetitions. Otherwise the socket layer is
   * provided to the stream before it is repeated, so the connection only
   * covers a single weight cycle.
   */
  persistentConnection?: boolean

  /**
   * Transforms the raw weight stream into the stream handed to consumers.
   * The raw stream emits `Either.right(Weight)` for a reading and
   * `Either.left("not_still")` when the scale is not still; it can fail with
   * a `ScaleError`. The transformation is applied before any repetition.
   */
  mapStream: (
    input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError>
  ) => Stream.Stream<A, E, R>
}
|
|
|
/**
 * Builds a stream operator that satisfies the `DuplexSocket` requirement of a
 * stream with a socket layer created from the given address.
 *
 * @param addr - address of the scale; its `ip` and `port` are passed to `DuplexSocket.layer`
 * @returns a function that provides the socket layer to the stream it is applied to
 */
const provideDuplexSocket = (addr: ServerAddress) => {
  const socketLayer = DuplexSocket.layer(addr.ip, addr.port)
  return Stream.provideSomeLayer(socketLayer)
}
|
/**
 * Builds a stream operator that repeats a stream on `schedule` when one is
 * given, and leaves the stream untouched otherwise.
 *
 * @param schedule - optional repetition schedule
 * @returns `Stream.repeat(schedule)` when a schedule is supplied, `identity` when it is not
 */
const optionalSchedule = (schedule?: StreamSchedule) => {
  if (!schedule) {
    return identity
  }
  return Stream.repeat(schedule)
}
|
|
|
/**
 * Re-types a user supplied `mapStream` so it can be applied to a stream that
 * still requires a `DuplexSocket`.
 *
 * This is a type-level cast only: the very same function is returned at
 * runtime. The public `mapStream` signature takes a stream without
 * requirements; the returned signature accepts a stream requiring
 * `DuplexSocket` and marks the output as requiring it as well, so the
 * requirement can be satisfied afterwards by providing the socket layer.
 *
 * @param mapStream - the caller's stream transformation
 * @returns `mapStream` itself, typed to carry the `DuplexSocket` requirement through
 */
const lie = <A, E, R>(
  mapStream: (input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError, never>) => Stream.Stream<A, E, R>
): (
  input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError, DuplexSocket>
) => Stream.Stream<A, E, R | DuplexSocket> => {
  // Go through `unknown` on purpose: the two signatures are not directly compatible.
  const untyped: unknown = mapStream
  return untyped as (
    input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError, DuplexSocket>
  ) => Stream.Stream<A, E, R | DuplexSocket>
}
|
|
|
/**
 * Service that builds the weight stream for a single scale.
 *
 * - `Default` reads from the scale via `getWeight` over a `DuplexSocket`.
 * - `Fake` emits randomly generated weights and never opens a socket.
 * - `Auto` chooses between the two based on configuration.
 */
export class ScaleConnection extends Effect.Service<ScaleConnection>()("ScaleConnection", {
  effect: Effect.gen(function*() {
    return {
      /**
       * Connects to the scale at `addr` and fetches the weight, repeating on
       * `options.schedule` when one is given.
       *
       * The raw stream emits `Either.right(Weight)` for a reading and
       * `Either.left("not_still")` when the scale is not still;
       * `options.mapStream` is applied to it before any repetition.
       *
       * The connection is open for a whole weight cycle, unless
       * `persistentConnection` is set to true. In that case, the connection
       * is kept open as long as there are clients connected.
       *
       * @param addr - address of the scale to connect to
       * @param options - schedule, connection mode and stream transformation
       * @returns the mapped stream with the socket requirement already provided
       */
      createScale: <A, E, R>(
        addr: ServerAddress,
        options: ScaleOptions<A, E, R>
      ): Stream.Stream<A, E, R> => {
        const mapped = getWeight.pipe(lie(options.mapStream))

        // If we do not want a persistent connection we provide the DuplexSocket layer first,
        // so that the socket is open and shared only for requests within a getWeight cycle
        // (still/waitUntilStill->getWeight) and the repetition happens around it.
        // Otherwise the layer wraps the repeated stream, so the socket stays open and shared
        // as long as there are clients.
        return options.persistentConnection
          ? pipe(mapped, optionalSchedule(options.schedule), provideDuplexSocket(addr))
          : pipe(mapped, provideDuplexSocket(addr), optionalSchedule(options.schedule))
      }
    }
  })
}) {
  /**
   * Layer that reads the boolean config `fake` nested under `scale` and
   * resolves to `Fake` when it is true, or to `Default` otherwise.
   */
  static Auto = Config.boolean("fake").pipe(
    Config.nested("scale"),
    Effect.map((fake) => fake ? this.Fake : this.Default),
    Layer.unwrapEffect
  )

  /**
   * Layer for running without a physical scale: every evaluation emits a
   * stable reading with a random amount in `[0, 10)` kg. The address is
   * ignored; `mapStream` and `schedule` are honoured in the same order as in
   * the real implementation.
   */
  static Fake = Layer.succeed(
    this,
    this.make({
      createScale: (_, options) =>
        Stream
          .sync(() =>
            Either.right(
              new Weight({
                amount: S.NonNegativeNumber(10 * Math.random()),
                unit: "kg"
              })
            )
          )
          .pipe(
            options.mapStream,
            // same helper as the real implementation, so both repeat identically
            optionalSchedule(options.schedule)
          )
    })
  )
}
|
|
|
/**
 * Scoped service that hands out pools of shared scale streams, keyed by the
 * address of the scale.
 */
export class ScaleConnections extends Effect.Service<ScaleConnections>()("ScaleConnections", {
  dependencies: [ScaleConnection.Auto],
  scoped: Effect.gen(function*() {
    const { createScale } = yield* ScaleConnection

    // Scope this service is constructed in; shared streams are created inside it.
    const serviceScope = yield* Effect.scope

    /**
     * Creates an independent pool of shared streams; every stream in the pool
     * is built with the same `options`.
     */
    const makeScalePool = <A, E, R>(options: ScaleOptions<A, E, R>) => {
      let pool = HashMap.empty<
        ServerAddress,
        Stream.Stream<A, E, never>
      >()

      // Single permit: lookup and creation run under mutual exclusion, so
      // concurrent callers cannot both create a stream for the same scale.
      const mutex = Effect.unsafeMakeSemaphore(1)

      const getOrCreateScale = Effect.fn(
        function*(addr: ServerAddress) {
          const cached = HashMap.get(pool, addr)

          if (Option.isNone(cached)) {
            const shared = yield* pipe(
              createScale(addr, options),
              // the stream is shared between clients so that we only ever connect once per scale
              Stream.share({
                capacity: 100,
                // new clients should get the latest value right away
                replay: 1
              }),
              Effect.provideService(Scope.Scope, serviceScope)
            )
            pool = HashMap.set(pool, addr, shared)
            return shared
          }

          return cached.value
        },
        mutex.withPermits(1)
      )

      return {
        /**
         * It creates a stream for each scale upon first use and shares it with all clients.
         * It destroys the stream when the last client disconnects.
         * When a new client joins an existing scale, it will receive the latest value immediately.
         */
        getOrCreateScale
      }
    }

    return {
      makeScalePool
    }
  })
}) {}