Skip to content

Instantly share code, notes, and snippets.

@patroza
Last active July 15, 2025 06:03
Show Gist options
  • Save patroza/2664b1373cccc0f6cf366df958b9d453 to your computer and use it in GitHub Desktop.
Save patroza/2664b1373cccc0f6cf366df958b9d453 to your computer and use it in GitHub Desktop.
Get weight from Weight Scale implementation for RHEWA 82c

RHEWA 82c

  • Exposes the scale weight over Server-Sent Events, publishing only changes
  • The scale accepts only one TCP connection at a time
  • Weight requests are shared between clients
  • Connects to the scale only while clients are connected

Running

Client Proxy

Exposes the scale weight as a Server-Sent Events endpoint by connecting over TCP to the scale specified at request time.

  • tsx ./main.ts

listens on port 3334 by default

Arguments

  • --fake: use a randomly generated weight instead of connecting to a real scale

Visit http://localhost:3334/?address=scale-ip:scale-port. For example, if you run the Scale Emulator, you can use: http://localhost:3334/?address=localhost:3335

Scale Emulator

  • tsx ./main-server.ts

listens on port 3335 by default

import { NodeSocket } from "@effect/platform-node"
import { Context, Effect, flow, Layer, Mailbox, Schedule, Stream } from "effect-app"
/** Value passed as the `timeout` option of the TCP channel. */
const TIMEOUT_MS = 2000

/**
 * Builds a text-based duplex connection to `ip:port`.
 *
 * Outgoing messages are queued in a mailbox, text-encoded and piped through
 * the TCP channel; whatever comes back is text-decoded and collected in a
 * second mailbox.
 *
 * @param ip host to connect to
 * @param port TCP port to connect to
 * @returns `write` (enqueue a message), `read` (stream of decoded replies) and
 * `request` (write a message, then stream the replies)
 */
const makeDuplexSocket = Effect.fnUntraced(function*(ip: string, port: number) {
  // queue of outgoing messages; shut down together with the surrounding scope
  const outbound = yield* Mailbox.make<string>()
  yield* Effect.addFinalizer(() => outbound.shutdown)

  const channel = NodeSocket.makeNetChannel({ host: ip, port, timeout: TIMEOUT_MS })

  // outgoing text -> bytes -> socket -> bytes -> incoming text
  const inbound = yield* Mailbox.toStream(outbound).pipe(
    Stream.encodeText,
    Stream.pipeThroughChannel(channel),
    // retry on failure: exponential schedule (base 100) limited to 3 recurrences
    Stream.retry(Schedule.exponential(100).pipe(Schedule.intersect(Schedule.recurs(3)))),
    Stream.decodeText(),
    Mailbox.fromStream()
  )

  const write = (msg: string) => outbound.offer(msg)
  const read = Mailbox.toStream(inbound)
  // enqueue the message first, then hand out the reply stream
  const request = (msg: string) => Stream.unwrap(Effect.map(write(msg), () => read))

  return { write, read, request }
})
/**
 * Context tag for the duplex socket service produced by `makeDuplexSocket`
 * (`write`, `read` and `request`).
 */
export class DuplexSocket extends Context.Tag("DuplexSocket")<
  DuplexSocket,
  Effect.Success<ReturnType<typeof makeDuplexSocket>>
>() {
  /**
   * Scoped layer that builds the socket service for the given endpoint.
   *
   * @param ip host to connect to
   * @param port TCP port to connect to
   */
  static layer = (ip: string, port: number) => {
    const acquire = makeDuplexSocket(ip, port)
    return Layer.scoped(DuplexSocket, acquire)
  }
}
import { makeSSE } from "#lib/middleware"
import { type Weight } from "#models/shared"
import { ScaleEvents } from "#resources/Events"
import { Effect, Either, Schedule, Stream } from "effect-app"
import { HttpServerRequest } from "effect-app/http"
import * as S from "effect-app/Schema"
import { ScaleConnections } from "./ScaleConnections.js"
import { ServerAddressFromString } from "./serverAddress.js"
/**
 * Pairs an event with the moment it was observed.
 *
 * @param evt the event payload, passed through untouched
 * @returns the payload as `evt` together with a `timestamp` set to the current date
 */
function addTimestamp<X>(evt: X): { evt: X; timestamp: Date } {
  const timestamp = new Date()
  return { evt, timestamp }
}
/**
 * Builds the request handler that serves scale readings as Server Sent Events.
 *
 * A scale pool polling once per second is created up front. The returned
 * handler reads the `address` search parameter, decodes it with
 * `ServerAddressFromString`, obtains the stream for that scale from the pool
 * and publishes every change as an event in the "primary" namespace.
 */
export const makeScaleEvents = Effect.gen(function*() {
  const { makeScalePool } = yield* ScaleConnections

  const pool = makeScalePool({
    schedule: Schedule.spaced("1 seconds"),
    mapStream: (readings) =>
      readings.pipe(
        // a "not_still" reading is shown to clients as "..."
        Stream.map(Either.mapLeft(() => "...")),
        // ScaleErrors become messages for the clients instead of ending the stream,
        // so that the stream keeps repeating even after an error
        Stream.catchAll((err) => Stream.succeed(Either.left(err.message) as Either.Either<Weight, string>))
      )
  })

  return Effect.gen(function*() {
    const searchParams = yield* HttpServerRequest.ParsedSearchParams
    const address = searchParams["address"]

    const addressMissing = !address || (Array.isArray(address) && address.length === 0)
    if (addressMissing) {
      return yield* Effect.dieMessage("Missing address")
    }

    // when the parameter is repeated, only the first occurrence is used
    const rawAddress = Array.isArray(address) ? address[0]! : address
    const serverAddress = yield* S.decodeUnknown(ServerAddressFromString)(rawAddress)

    // per-request event counter
    let nextId = BigInt(0)
    const readings = yield* pool.getOrCreateScale(serverAddress)
    const sse = makeSSE(ScaleEvents)

    const events = readings.pipe(
      // only publish when the reading actually changed
      Stream.changes,
      Stream.map((reading) => addTimestamp(reading)),
      Stream.map(({ evt, timestamp }) => ({
        evt: { id: nextId++, result: evt, timestamp },
        namespace: "primary"
      }))
    )
    return yield* sse(events)
  })
})
import { NodeRuntime, NodeSocketServer } from "@effect/platform-node"
import { Console, Effect, Layer, Mailbox } from "effect-app"
/**
 * Starts a TCP server on `port` that answers every received message with the
 * reply computed by `handleMessage`.
 *
 * Both the accept loop and every per-connection read loop are forked into the
 * enclosing scope, so this effect returns as soon as the server is set up.
 *
 * @param port TCP port to listen on
 * @param handleMessage computes the reply for one decoded incoming message
 */
const makeDuplexServerSocket = Effect.fnUntraced(
  function*(port: number, handleMessage: (msg: string) => Effect.Effect<string>) {
    const server = yield* NodeSocketServer.make({ port })
    const decoder = new TextDecoder("ascii")

    yield* server
      .run(Effect.fnUntraced(function*(socket) {
        const write = yield* socket.writer
        yield* socket
          .run((data) =>
            // decode the incoming bytes, compute the reply and write it back on the same socket
            Effect.sync(() => decoder.decode(data)).pipe(
              Effect.tap((msg) => Effect.flatMap(handleMessage(msg), write))
            )
          )
          .pipe(Effect.forkScoped)
      }))
      .pipe(Effect.forkScoped)
  }
)
/** TCP port the scale emulator listens on. */
const port = 3335

/** Draws `Math.random() * 100` and formats it with two decimals. */
const newWeight = Effect.sync(() => (Math.random() * 100).toFixed(2))

// Scale emulator: answers "<Gs...>" (stillness) queries with "1<Gs>" or "0<Gs>"
// and every other message with the current weight followed by "<GB>".
Effect
  .gen(function*() {
    console.log("listening on port", port)
    let weight = yield* newWeight

    const respond = Effect.fnUntraced(function*(msg: string) {
      if (!msg.startsWith("<Gs")) {
        return weight + "<GB>"
      }
      const still = Math.random() > 0.1
      if (!still) {
        // the emulated scale moved: pick a new weight and report "not still"
        weight = yield* newWeight
        return `0<Gs>`
      }
      return `1<Gs>`
    })

    yield* makeDuplexServerSocket(port, respond)
  })
  .pipe(Layer.scopedDiscard, Layer.launch, Effect.onExit(Console.log), NodeRuntime.runMain())
import { HttpMiddleware, HttpRouter, HttpServer } from "@effect/platform"
import { NodeHttpServer, NodeRuntime } from "@effect/platform-node"
import { captureException } from "@sentry/node"
import * as Sentry from "@sentry/node"
import { Cause, Config, ConfigProvider, Console, Effect, Layer } from "effect-app"
import { createServer } from "http"
import { makeScaleEvents } from "./events.js"
import { ScaleConnections } from "./ScaleConnections.js"
// Log any exception that nothing else caught, then rethrow it from the handler.
process.on("uncaughtException", (err) => {
  console.log("uncaughtException", err)
  throw err
})
// Sentry.init({ ... })
/**
 * Turns any failure of `self` into a Sentry report: the cause is squashed into
 * a single error, handed to `captureException`, and the effect then completes
 * with `void`.
 */
const catchAndReport = <A, E, R>(self: Effect.Effect<A, E, R>) =>
  self.pipe(
    Effect.catchAllCause((cause) => Effect.sync(() => captureException(Cause.squash(cause)))),
    Effect.asVoid
  )
/**
 * HTTP server layer: serves the scale SSE handler on `GET /` with CORS enabled.
 * The port comes from the `PORT` config key and defaults to 3334.
 */
const HttpLive = Layer
  .unwrapEffect(
    Effect.gen(function*() {
      const port = yield* Config.integer("PORT").pipe(Config.withDefault(3334))
      const sseHandler = yield* makeScaleEvents
      const routes = HttpRouter.empty.pipe(HttpRouter.get("/", sseHandler))

      yield* Console.log(`Starting to listen on port: ${port}`)

      const app = HttpServer.serve(routes, HttpMiddleware.cors())
      const nodeServer = NodeHttpServer.layer(() => createServer(), { port })
      return Layer.provide(app, nodeServer)
    })
  )
  .pipe(Layer.provide(ScaleConnections.Default))
// The `--fake` command line flag is surfaced as the `scale.fake` config key;
// every other key falls back to the environment.
const cliConfigProvider = ConfigProvider
  .fromMap(new Map([["scale.fake", String(process.argv.includes("--fake"))]]))
  .pipe(ConfigProvider.orElse(ConfigProvider.fromEnv))

const cliConfigLayer = Layer.setConfigProvider(cliConfigProvider)

// Launch the HTTP layer, report failures to Sentry and flush Sentry on exit.
NodeRuntime.runMain(
  Layer.launch(HttpLive.pipe(Layer.provide(cliConfigLayer))).pipe(
    catchAndReport,
    Effect.onExit(() => Effect.promise(() => Sentry.flush(10_000)))
  )
)
import { Weight } from "#models/shared"
import { NonNegativeNumberFromString } from "#resources/Events"
import { Chunk, Data, Effect, Either, Option, pipe, Schedule, Schema, Stream } from "effect-app"
import { DuplexSocket } from "./DuplexSocket.js"
/** Parameter sent along with the "Gs" and "GB" scale commands. */
const OUTPUT_INTERFACE = "1"

/** Upper bound on the number of repeated stillness polls while waiting for the scale to settle. */
const WAIT_FOR_STILL_CNT = 50

/** Timeout applied to the reply stream of a single scale request. */
const TIMEOUT_MS = 2_000

/** Accumulated reply length above which a request is failed as a buffer overflow. */
const MAX_BUFFER = 200
/** Result of one reading: a `Weight` on the right, or "not_still" on the left. */
type ScaleResponse = Either.Either<Weight, "not_still">

/** Constructors for the two shapes of `ScaleResponse`. */
const ScaleResponse = {
  /** Wraps a successfully read weight. */
  weight: (weight: Weight): ScaleResponse => Either.right(weight),
  /** The single "scale is not still" value. */
  notStill: Either.left("not_still") as ScaleResponse
}
/**
 * Tagged error ("ScaleError") raised for failures while talking to the scale.
 * `message` carries the human-readable description of the failure.
 */
export class ScaleError extends Data.TaggedError("ScaleError")<{
message: string
}> {}
/**
 * Sends a command to the scale and waits for its response.
 *
 * The command is written as `<cmd+params>`; the reply is considered complete
 * once the accumulated text ends with `<cmd>`.
 *
 * @param strScaleCmd command name, e.g. "Gs" or "GB"
 * @param strScaleParams parameters appended to the command name
 * @returns the reply text in front of the `<cmd>` suffix, trimmed; an empty
 * string when the reply stream produced nothing
 * @throws ScaleError (as a typed failure) when the scale answers with one of
 * the known error suffixes or the accumulated reply exceeds `MAX_BUFFER`
 */
const scaleRequest = Effect.fnUntraced(
  function*(strScaleCmd: string, strScaleParams: string) {
    const command = `<${strScaleCmd}${strScaleParams}>`
    const expectedSuffix = `<${strScaleCmd}>`
    yield* Effect.logDebug(" writing command", command)
    const { request } = yield* DuplexSocket
    return request(command).pipe(
      // replies may arrive in pieces, so keep a running concatenation
      Stream.scan("", (acc, chunk) => acc + chunk),
      Stream.mapEffect(Effect.fn(function*(received) {
        yield* Effect.logDebug(`Received: ${received}`, received.endsWith(expectedSuffix), expectedSuffix)
        if (received.endsWith(expectedSuffix)) {
          return Option.some(
            received
              .substring(0, received.length - expectedSuffix.length)
              .trim()
          )
        } else if (received.length > MAX_BUFFER) {
          return yield* new ScaleError({ message: `ScaleRequest buffer overflow for command ${command}` })
        } else if (received.endsWith("%!>")) {
          // the more specific "...!>" suffixes must be tested before the generic "!>",
          // otherwise the generic branch would shadow them
          return yield* new ScaleError({ message: "Prozent Fehler" })
        } else if (received.endsWith("~!>")) {
          return yield* new ScaleError({ message: "Unbekannter Fehler" })
        } else if (received.endsWith("!>")) {
          return yield* new ScaleError({ message: "Nicht erfolgreich" })
        } else if (received.endsWith("\">")) {
          return yield* new ScaleError({ message: "Länge fehlerhaft" })
        } else {
          // reply not complete yet, keep accumulating
          return Option.none()
        }
      })),
      Stream.filter(Option.isSome),
      Stream.timeout(TIMEOUT_MS),
      Stream.map((option) => option.value),
      // the first complete reply ends the request
      Stream.take(1)
    )
  },
  Stream.unwrap,
  Stream.tapBoth({
    onFailure: (err) => Effect.logError(err.message),
    onSuccess: (msg) => Effect.logDebug(`msg ${msg}`)
  }),
  Stream.runCollect,
  Effect.map(Chunk.join(""))
)
/** Sends the "Gs" command; resolves to `true` exactly when the scale replies "1". */
const isScaleStill = Effect.map(
  scaleRequest("Gs", OUTPUT_INTERFACE),
  (reply) => reply === "1"
)
/**
 * Polls `isScaleStill` on a schedule spaced by 100, limited to
 * `WAIT_FOR_STILL_CNT` recurrences, and stops at the first "still" answer.
 *
 * Resolves to the last polled answer, or `false` when no answer was produced.
 */
const waitUntilScaleStill = Stream.fromEffect(isScaleStill).pipe(
  Stream.repeat(Schedule.spaced(100).pipe(Schedule.intersect(Schedule.recurs(WAIT_FOR_STILL_CNT)))),
  // consecutive identical answers carry no information
  Stream.changes,
  Stream.takeUntil((still) => still),
  Stream.runLast,
  Effect.map(Option.getOrElse(() => false))
)
/**
 * Sends the "GB" command and emits the reply as a single `ScaleResponse`
 * weight in "kg". A reply that does not decode as a non-negative number fails
 * the stream with a `ScaleError`.
 */
const getScaleWeight = scaleRequest("GB", OUTPUT_INTERFACE).pipe(
  Effect.flatMap((raw) =>
    Schema.decodeUnknown(NonNegativeNumberFromString)(raw).pipe(
      Effect.map((amount) => new Weight({ amount, unit: "kg" })),
      // report the offending reply text instead of the raw parse error
      Effect.catchTag(
        "ParseError",
        () => new ScaleError({ message: `Kein gültiger Wert (${JSON.stringify(raw)})` })
      )
    )
  ),
  Effect.map(ScaleResponse.weight),
  Stream.fromEffect
)
/**
 * One complete weight cycle.
 *
 * - Scale still: emits the weight right away.
 * - Scale not still: emits "not_still" once, waits for the scale to settle and
 *   then emits the weight; fails with a `ScaleError` if it never settles.
 *
 * Socket errors are converted into `ScaleError`s.
 */
export const getWeight = isScaleStill.pipe(
  Stream.flatMap((stillNow) => {
    if (stillNow) {
      // nothing to wait for: read and return the weight immediately
      return getScaleWeight
    }
    // tell the client once that the scale is moving ...
    const announce = Stream.succeed(ScaleResponse.notStill)
    // ... then wait for it to settle before reading
    const settleThenRead = waitUntilScaleStill.pipe(
      Stream.flatMap((settled) =>
        settled
          ? getScaleWeight
          // still moving after the whole wait: give up on this cycle
          : Stream.fail(new ScaleError({ message: "Waage nicht in Ruhe!" }))
      )
    )
    return Stream.concat(announce, settleThenRead)
  }),
  Stream.tapBoth({
    onFailure: (err) => Effect.logError(err.message),
    onSuccess: (weight) => Effect.logDebug(`Weight read: ${weight}`)
  }),
  Stream.catchTag(
    "SocketError",
    (err) => Stream.fail(new ScaleError({ message: `Socket error: ${err.message}` }))
  )
)
import { Weight } from "#models/shared"
import { Config, Effect, Either, HashMap, identity, Layer, Option, pipe, S, type Schedule, Scope, Stream } from "effect-app"
import { DuplexSocket } from "./DuplexSocket.js"
import { getWeight, type ScaleError } from "./scale.RHEWA82c.js"
import { type ServerAddress } from "./serverAddress.js"
/** Schedule type accepted for repeating a scale stream. */
type StreamSchedule = Schedule.Schedule<number, unknown, never>

/** Options controlling how a scale stream is built. */
export interface ScaleOptions<A, E, R = never> {
  /** When set, the weight stream is repeated on this schedule. */
  schedule?: StreamSchedule
  /**
   * When true, the socket layer is provided around the repeated stream;
   * otherwise it is provided around each weight cycle before repeating.
   */
  persistentConnection?: boolean
  /** Transformation applied to the raw weight stream. */
  mapStream: (
    input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError>
  ) => Stream.Stream<A, E, R>
}
/** Returns a stream transformer that provides a `DuplexSocket` connected to `addr`. */
const provideDuplexSocket = (addr: ServerAddress) => {
  const socketLayer = DuplexSocket.layer(addr.ip, addr.port)
  return Stream.provideSomeLayer(socketLayer)
}

/** Returns a stream transformer that repeats on `schedule`, or `identity` when none is given. */
const optionalSchedule = (schedule?: StreamSchedule) => {
  return schedule ? Stream.repeat(schedule) : identity
}
/**
 * Type-level cast only: re-types `mapStream` so that it accepts (and returns)
 * a stream that still requires `DuplexSocket`. The function itself is returned
 * unchanged.
 */
function lie<A, E, R>(
  mapStream: (input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError, never>) => Stream.Stream<A, E, R>
) {
  return mapStream as unknown as (
    input: Stream.Stream<Either.Either<Weight, "not_still">, ScaleError, DuplexSocket>
  ) => Stream.Stream<A, E, R | DuplexSocket>
}
/**
 * Service that creates the weight stream for one scale.
 *
 * `Default` talks to a real scale, `Fake` produces random weights, and `Auto`
 * picks between the two based on the `scale.fake` config flag.
 */
export class ScaleConnection extends Effect.Service<ScaleConnection>()("ScaleConnection", {
  effect: Effect.succeed({
    /**
     * Connects to the scale at `addr` and reads the weight, repeating on
     * `options.schedule` when one is given; `options.mapStream` is applied to
     * each weight cycle.
     *
     * By default the socket is provided per weight cycle (stillness check,
     * optional wait, weight read). With `persistentConnection` set to true the
     * socket is provided around the repeated stream instead.
     */
    createScale: <A, E, R>(
      addr: ServerAddress,
      options: ScaleOptions<A, E, R>
    ): Stream.Stream<A, E, R> => {
      const cycle = lie(options.mapStream)(getWeight)
      const repeatOnSchedule = optionalSchedule(options.schedule)
      const withSocket = provideDuplexSocket(addr)
      // The order of the two steps decides the socket lifetime:
      // repeat-then-provide shares one socket across repetitions,
      // provide-then-repeat opens the socket for each cycle.
      return options.persistentConnection
        ? pipe(cycle, repeatOnSchedule, withSocket)
        : pipe(cycle, withSocket, repeatOnSchedule)
    }
  })
}) {
  /** Chooses `Fake` or `Default` according to the boolean config `scale.fake`. */
  static Auto = Config.boolean("fake").pipe(
    Config.nested("scale"),
    Effect.map((fake) => fake ? this.Fake : this.Default),
    Layer.unwrapEffect
  )

  /** Implementation that ignores the address and emits a random weight (10 * Math.random(), in "kg"). */
  static Fake = Layer.succeed(
    this,
    this.make({
      createScale: (_, options) =>
        Stream
          .sync(() => {
            const amount = S.NonNegativeNumber(10 * Math.random())
            return Either.right(new Weight({ amount, unit: "kg" }))
          })
          .pipe(
            options.mapStream,
            optionalSchedule(options.schedule)
          )
    })
  )
}
/**
 * Service handing out pools of shared scale streams, one stream per address.
 */
export class ScaleConnections extends Effect.Service<ScaleConnections>()("ScaleConnections", {
  dependencies: [ScaleConnection.Auto],
  scoped: Effect.gen(function*() {
    const { createScale } = yield* ScaleConnection
    // shared streams are run in the scope of this service
    const scope = yield* Effect.scope

    const makeScalePool = <A, E, R>(options: ScaleOptions<A, E, R>) => {
      // streams created so far, keyed by scale address
      let scalesByAddress = HashMap.empty<
        ServerAddress,
        Stream.Stream<A, E, never>
      >()
      // single permit: lookups and creations of scales are mutually exclusive
      const sema = Effect.unsafeMakeSemaphore(1)

      /**
       * Returns the stream for `addr`, creating and sharing it on first use.
       * The shared stream replays its latest value (replay: 1) to subscribers
       * that join later.
       *
       * NOTE(review): entries are never removed from the map in this block;
       * whether the upstream is released when the last client disconnects
       * depends on `Stream.share` - confirm.
       */
      const getOrCreateScale = Effect.fn(
        function*(addr: ServerAddress) {
          const cached = HashMap.get(scalesByAddress, addr)
          if (Option.isNone(cached)) {
            const shared = yield* createScale(addr, options).pipe(
              // one upstream connection per scale, fanned out to all clients
              Stream.share({ capacity: 100, replay: 1 }),
              Effect.provideService(Scope.Scope, scope)
            )
            scalesByAddress = HashMap.set(scalesByAddress, addr, shared)
            return shared
          }
          return cached.value
        },
        sema.withPermits(1)
      )

      return { getOrCreateScale }
    }

    return { makeScalePool }
  })
}) {
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment