|  | import { Weight } from "#models/shared" | 
        
          |  | import { Config, Effect, Either, HashMap, identity, Layer, Option, pipe, S, type Schedule, Scope, Stream } from "effect-app" | 
        
          |  | import { DuplexSocket } from "./DuplexSocket.js" | 
        
          |  | import { getWeight, type ScaleError } from "./scale.RHEWA82c.js" | 
        
          |  | import { type ServerAddress } from "./serverAddress.js" | 
        
          |  |  | 
        
          /**
           * Schedule shape accepted for repeating a weight stream.
           * It is handed to `Stream.repeat` whenever a caller supplies one.
           */
          type StreamSchedule = Schedule.Schedule<number, unknown, never>
        
          |  |  | 
        
          /**
           * Options accepted when creating a scale stream.
           *
           * `A`, `E` and `R` are the element, error and requirement types of the stream
           * returned by `mapStream`.
           */
          export interface ScaleOptions<A, E, R = never> {
            /**
             * Transformation applied to the raw reading stream. Each raw element is either a
             * `Weight` (right) or the literal `"not_still"` (left); the raw stream fails with `ScaleError`.
             */
            mapStream: (
              input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError>
            ) => Stream.Stream<A, E, R>

            /** When provided, the stream is repeated according to this schedule; otherwise it is left as is. */
            schedule?: StreamSchedule

            /**
             * When truthy, the socket layer is provided outside of the repetition, otherwise inside of it
             * (see `ScaleConnection.createScale`).
             */
            persistentConnection?: boolean
          }
        
          |  |  | 
        
          /**
           * Builds a stream transformer that provides the `DuplexSocket` layer for the given address.
           *
           * @param addr - address whose `ip` and `port` are passed to `DuplexSocket.layer`
           */
          const provideDuplexSocket = (addr: ServerAddress) => {
            const socketLayer = DuplexSocket.layer(addr.ip, addr.port)
            return Stream.provideSomeLayer(socketLayer)
          }
        
          /**
           * Builds a stream transformer from an optional schedule.
           *
           * @param schedule - when falsy (i.e. omitted) the transformer is `identity`;
           *                   otherwise it is `Stream.repeat(schedule)`
           */
          const optionalSchedule = (schedule?: StreamSchedule) =>
            !schedule
              ? identity
              : Stream.repeat(schedule)
        
          |  |  | 
        
          /**
           * Re-types `mapStream` so that it can be applied to a stream which still requires `DuplexSocket`.
           *
           * This is purely a compile-time cast (`as unknown as`): the very same function is returned and
           * nothing is wrapped at runtime. The resulting signature accepts an input requiring `DuplexSocket`
           * and reports `R | DuplexSocket` as the requirements of the output.
           *
           * @param mapStream - user transformation typed against a stream with no requirements
           * @returns the same function, typed against a stream that requires `DuplexSocket`
           */
          const lie = <A, E, R>(
            mapStream: (input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError, never>) => Stream.Stream<A, E, R>
          ) => {
            // the signature we pretend `mapStream` has
            type SocketBound = (
              input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError, DuplexSocket>
            ) => Stream.Stream<A, E, R | DuplexSocket>

            return mapStream as unknown as SocketBound
          }
        
          |  |  | 
        
          /**
           * Service that builds the weight stream of a single scale.
           *
           * - `Default` (generated by `Effect.Service`) reads from `getWeight` and provides `DuplexSocket`.
           * - `Fake` emits a random weight and never touches a socket.
           * - `Auto` chooses between the two based on the boolean config `fake` nested under `scale`.
           */
          export class ScaleConnection extends Effect.Service<ScaleConnection>()("ScaleConnection", {
            effect: Effect.gen(function*() {
              return {
                /**
                 * Connects to the scale at `addr` and fetches the weight, repeating on `options.schedule`
                 * when one is given.
                 *
                 * The raw stream handed to `options.mapStream` emits either a `Weight` (right) or
                 * `"not_still"` (left) and fails with `ScaleError`.
                 *
                 * The position of the socket layer relative to the schedule depends on
                 * `options.persistentConnection`:
                 * - falsy: the layer is provided first and the result is then repeated, so the socket is
                 *   shared only within one `getWeight` cycle (still/waitUntilStill -> getWeight);
                 * - truthy: the repetition is applied first and the layer is provided around it, so the
                 *   socket stays open for as long as there are clients.
                 */
                createScale: <A, E, R>(
                  addr: ServerAddress,
                  options: ScaleOptions<A, E, R>
                ): Stream.Stream<A, E, R> => {
                  // `lie` only changes the static type of `mapStream` so it accepts the socket-requiring stream
                  const mapped = pipe(getWeight, lie(options.mapStream))

                  if (options.persistentConnection) {
                    // schedule inside, socket outside
                    return pipe(mapped, optionalSchedule(options.schedule), provideDuplexSocket(addr))
                  }

                  // socket inside, schedule outside
                  return pipe(mapped, provideDuplexSocket(addr), optionalSchedule(options.schedule))
                }
              }
            })
          }) {
            /** Layer that resolves to `Fake` when the config boolean `fake` (nested under `scale`) is true, else to `Default`. */
            static Auto = pipe(
              Config.boolean("fake"),
              Config.nested("scale"),
              Effect.map((fake) => fake ? this.Fake : this.Default),
              Layer.unwrapEffect
            )

            /**
             * Layer with a socket-less implementation: the address is ignored and the source stream is a
             * single synchronously computed right value holding a random amount in [0, 10) "kg".
             * `mapStream` and the optional schedule are applied exactly as for the real scale;
             * `persistentConnection` is not read.
             */
            static Fake = Layer.succeed(
              this,
              this.make({
                createScale: (_, options) => {
                  const randomReading = () =>
                    Either.right(
                      new Weight({
                        amount: S.NonNegativeNumber(10 * Math.random()),
                        unit: "kg"
                      })
                    )

                  return pipe(
                    Stream.sync(randomReading),
                    options.mapStream,
                    optionalSchedule(options.schedule)
                  )
                }
              })
            )
          }
        
          |  |  | 
        
          /**
           * Service that hands out pools of shared scale streams, one shared stream per address.
           *
           * It is built on top of `ScaleConnection` (provided through `ScaleConnection.Auto`) and is scoped:
           * the scope of the service itself is the one supplied to every shared stream it creates.
           */
          export class ScaleConnections extends Effect.Service<ScaleConnections>()("ScaleConnections", {
            dependencies: [ScaleConnection.Auto],
            scoped: Effect.gen(function*() {
              const { createScale } = yield* ScaleConnection

              // scope of this service; supplied below as the `Scope` required by `Stream.share`
              const serviceScope = yield* Effect.scope

              /**
               * Creates an independent pool whose streams are all built with the same `options`.
               */
              const makeScalePool = <A, E, R>(options: ScaleOptions<A, E, R>) => {
                // address -> shared stream; entries are only ever added, never removed
                // NOTE(review): lookups rely on HashMap key equality for `ServerAddress` — confirm it compares by value
                let sharedByAddress = HashMap.empty<
                  ServerAddress,
                  Stream.Stream<A, E, never>
                >()

                // single permit: lookups and creations are mutually exclusive, so a scale is never created twice
                const creationLock = Effect.unsafeMakeSemaphore(1)

                const getOrCreateScale = Effect.fn(
                  function*(addr: ServerAddress) {
                    const cached = HashMap.get(sharedByAddress, addr)

                    if (Option.isNone(cached)) {
                      const shared = yield* pipe(
                        createScale(addr, options),
                        // share one upstream between all clients so that we connect only once per scale
                        Stream.share({
                          capacity: 100,
                          // new clients should immediately get the latest value
                          replay: 1
                        }),
                        Effect.provideService(Scope.Scope, serviceScope)
                      )
                      sharedByAddress = HashMap.set(sharedByAddress, addr, shared)
                      return shared
                    }

                    return cached.value
                  },
                  // the whole body above runs while holding the permit
                  creationLock.withPermits(1)
                )

                return {
                  /**
                   * Returns the shared stream for `addr`, creating it upon first use.
                   * The stream is shared with all clients and, per `Stream.share`, the upstream is torn down
                   * when the last client disconnects.
                   * A client joining an existing scale receives the latest value immediately (`replay: 1`).
                   */
                  getOrCreateScale
                }
              }

              return {
                makeScalePool
              }
            })
          }) {
          }