// Source: GitHub Gist guidoschmidt17/52f7f781af775c9906fb1954b9d7c07e
// Author: @guidoschmidt17
// Created: November 14, 2022 18:27
package cqrs
package eventstore
package client
import sttp.capabilities.WebSockets
import sttp.capabilities.zio.ZioStreams
import sttp.client3.*
import sttp.model.Uri
import sttp.ws.*
import zio.*
import zio.stream.*
import java.nio.ByteBuffer
import Command.*
import Completion.*
import Fact.*
import Payload.*
import Transport.*
/** Event store client that talks to the server over a single WebSocket connection,
  * multiplexing requests onto logical channels identified by a `Short` id.
  *
  * Concrete subclasses supply the sttp backend, the wire transport and the buffer pool.
  */
abstract class AbstractEventStore extends EventStore:
// Effect yielding the sttp backend used to open the WebSocket (consumed in sendReceive).
val backend: Task[SttpBackend[Task, ZioStreams & WebSockets]]
// Wire codec: used for toStream/fromStream/fromBytes/decode and to draw client logical ids.
val transport: Transport
// Pool from which decode buffers are acquired when streaming responses.
val bufferpool: BufferPool
/** Sends a `CreateFacts` command for `events` and `tags` on a logical id taken from
  * `nextLogicalId`, with no additional payload stream.
  *
  * Succeeds with `Unit` when the server accepts the command; fails with `ServerSideError`.
  */
def createFacts(events: Events, tags: Tags = Tags.empty) =
  val command = CreateFacts(events, tags)
  sendReceive[Unit](command, ZStream.empty, nextLogicalId, uri)
/** Sends a `CreateProperty` command followed by the bytes of `stream` on the caller-supplied
  * logical channel and yields the `OutOfBandElement` carried by the server's acceptance.
  *
  * @param stream    payload bytes sent after the command
  * @param logicalId logical channel to use; must be defined
  * @return the created element, or a `ServerSideError` failure. A missing `logicalId` is
  *         reported as a failure of the returned effect instead of throwing
  *         `NoSuchElementException` while the effect is being built.
  */
def createProperty(
    stream: Stream[Throwable, Array[Byte]],
    logicalId: Option[Short]
): IO[ServerSideError, OutOfBandElement] =
  logicalId match
    case Some(id) =>
      sendReceive[OutOfBandElement](CreateProperty, stream, id, uri)
    case None =>
      // sendReceive reports every failure as ServerSideError, so the same error type is used here.
      ZIO.fail(ServerSideError("createProperty failed: no logicalId was supplied"))
/** Sends a `ReadFactsBySerialId` command for `serialId` and `tags` on a new logical id.
  *
  * On acceptance the effect yields a stream of `Raw` values decoded from the channel's
  * incoming frames; otherwise it fails with `ServerSideError`.
  */
def readFactsBySerialId(serialId: Long, tags: Tags = Tags.empty) =
  sendReceive[Stream[ReadError, Raw]](
    command = ReadFactsBySerialId(serialId, tags),
    outputstream = ZStream.empty,
    logicalId = nextLogicalId,
    uri = uri
  )
/** Sends a `ReadFactsByAggregateRootId` command for `uuid` on a new logical id.
  *
  * On acceptance the effect yields a stream of `Raw` values decoded from the channel's
  * incoming frames; otherwise it fails with `ServerSideError`.
  */
def readFactsByAggregateRootId(uuid: Uuid) =
  val command = ReadFactsByAggregateRootId(uuid)
  sendReceive[Stream[ReadError, Raw]](command, ZStream.empty, nextLogicalId, uri)
/** Sends a `ReadFactsByAggregateId` command for `uuid` on a new logical id.
  *
  * On acceptance the effect yields a stream of `Raw` values decoded from the channel's
  * incoming frames; otherwise it fails with `ServerSideError`.
  */
def readFactsByAggregateId(uuid: Uuid) =
  val command = ReadFactsByAggregateId(uuid)
  sendReceive[Stream[ReadError, Raw]](command, ZStream.empty, nextLogicalId, uri)
/** Sends a `ReadProperty` command for `property` on a new logical id.
  *
  * On acceptance the effect yields the channel's incoming stream, typed here as
  * `(Long, Array[Byte])` elements; otherwise it fails with `ServerSideError`.
  */
def readProperty(property: OutOfBandElement) =
  sendReceive[Stream[ReadError, (Long, Array[Byte])]](
    command = ReadProperty(property),
    outputstream = ZStream.empty,
    logicalId = nextLogicalId,
    uri = uri
  )
// a lot of internal stuff here
// Queue of raw frames received for one logical channel (filled by consume).
private type Incoming = Queue[Array[Byte]]
// The sttp WebSocket handle handed to the `pipe` callback in sendReceive.
private type WebSocketChannel = WebSocket[Task[_]]
// Fiber running the receive loop (makeIncoming repeated forever, forked as a daemon).
private type Consumer = Fiber.Runtime[Throwable, Nothing]
// Sink that writes binary frames to the WebSocket (built by makeOutgoing).
private type Outgoing = ZSink[Any, Throwable, Array[Byte], Array[Byte], Unit]
// What a live connection consists of: endpoint, send sink and receive-loop fiber.
private type ConnectionData = (Uri, Outgoing, Consumer)
// Promise completed with the ConnectionData once the WebSocket has been set up.
private type Established = Promise[Nothing, ConnectionData]
/** Lifecycle handle for one WebSocket connection.
  *
  * Holds a mutable state flag plus the promise that `connect` completes with the
  * connection data; `await` suspends until that promise is fulfilled.
  */
private case class Connection(private val established: Established):
  import Connection.State
  import State.*

  // Mutable holders for the connection promise and the current lifecycle state.
  private final val data = Wrapped[Established](established)
  private final val state = Wrapped[State](NotYetConnected)

  /** Suspends until `connect` has published the connection data. */
  def await: IO[Nothing, ConnectionData] = data.get.await

  /** Current lifecycle state. */
  def getState = state.get

  // State predicates, all reading the current state flag.
  def notYetConnected = getState == NotYetConnected
  def isRequested = getState == Requested
  def isConnected = getState == Connected
  def isClosed = getState == Closed

  /** Marks the connection as requested, i.e. a connect attempt is under way. */
  def request = ZIO.succeed(state.set(Requested))

  /** Marks the connection as connected, fulfils the promise and logs the connection data. */
  def connect(connectiondata: ConnectionData) =
    for
      _ <- ZIO.succeed(state.set(Connected))
      _ <- data.get.succeed(connectiondata)
      _ <- ZIO.logInfo(s"\n\nconnected $connectiondata")
    yield ()

  /** Marks the connection as closed, waits for the connection data, interrupts the
    * receive-loop fiber and removes this connection from the registry.
    */
  def disconnect =
    ZIO.succeed(state.set(Closed)) *>
      await.flatMap { case (uri, _, consumer) =>
        consumer.interrupt.map(_ => Connection.connections.remove(uri))
      }
end Connection
/** Registry of connections keyed by endpoint, plus the connection lifecycle states. */
private object Connection:
  enum State:
    case NotYetConnected, Requested, Connected, Closed

  // One entry per endpoint; entries are removed again by Connection#disconnect.
  private final val connections = collection.mutable.Map[Uri, Connection]()

  /** Returns the connection registered for `uri`, registering a fresh one if none exists. */
  def get(uri: Uri): UIO[Connection] =
    connections.get(uri).fold(register(uri))(existing => ZIO.succeed(existing))

  // Creates a connection backed by a new, unfulfilled promise and stores it under `uri`.
  private def register(uri: Uri): UIO[Connection] =
    Promise.make[Nothing, ConnectionData].map { established =>
      val connection = Connection(established)
      connections.update(uri, connection)
      connection
    }
// State of one multiplexed request: its id, the shared send sink, the queue that consume
// fills with received frames, and a stream view over that queue.
private case class LogicalChannel(logicalId: Short, outgoing: Outgoing, incoming: Incoming, inputstream: Stream[Throwable, Array[Byte]])
// Fixed WebSocket endpoint used by all public operations of this class.
private final val uri = uri"ws://localhost:8090/.ws"
// Logical channels currently registered, keyed by logical id (added by produce, removed by closeLogical).
private final val logicalChannels = collection.mutable.Map[Short, LogicalChannel]()
//
/** Sends `command`, followed by `outputstream`, on logical channel `logicalId` of the
  * WebSocket connection to `uri`, then reads the server's completion and turns it into `A`.
  *
  * If the connection is neither connected nor requested, this call starts the connect;
  * in every case it then waits for the connection data before registering the channel.
  * Any failure that is not already a `ServerSideError` is wrapped in one.
  *
  * NOTE(review): the result is produced through unchecked `asInstanceOf[A]` casts chosen
  * by the command, so `A` has to match the command at each call site — confirm.
  *
  * @param command      command to encode and send
  * @param outputstream additional bytes sent after the command
  * @param logicalId    id of the logical channel used for this request
  * @param uri          WebSocket endpoint
  */
private def sendReceive[A](
command: Command,
outputstream: Stream[Throwable, Array[Byte]],
logicalId: Short,
uri: Uri
): IO[ServerSideError, A] =
// Issues a GET to `uri` whose response is handled as a WebSocket by `pipe`.
def doConnect(
uri: Uri,
backend: SttpBackend[Task, ZioStreams & WebSockets]
) =
// Unless the connection is already connected: builds the send sink, forks the receive
// loop as a daemon fiber and publishes both through connection.connect.
// NOTE(review): `pipe` returns right after forking; check sttp's asWebSocketAlways
// contract for whether the socket is closed once this callback completes.
def pipe(websocket: WebSocketChannel): Task[Unit] =
for
connection <- Connection.get(uri)
open <- websocket.isOpen()
_ <- ZIO.logInfo(s"\n\npipe connection $connection ${connection.getState} $websocket $open")
_ <-
if connection.isConnected then ZIO.unit
else
for
outgoing <- makeOutgoing(websocket)
consumer <- makeIncoming(websocket).forever.forkDaemon
_ <- connection.connect((uri, outgoing, consumer))
yield ()
yield ()
val websocket = asWebSocketAlways(pipe)
basicRequest
.get(uri)
.response(websocket)
.send(backend)
end doConnect
// Maps an accepted completion to the result type expected for `command`.
transparent inline def handleCompletion[A](accepted: Accepted, inputstream: Stream[Throwable, Array[Byte]]): IO[Throwable, A] =
// Decodes every incoming frame with transport.fromBytes, reusing a single buffer, and
// keeps only the frames for which decoding yields a defined value.
// NOTE(review): the buffer acquired from the pool is not released in this block — confirm ownership.
def handleStream[B]: ZStream[Any, Throwable, B] =
val buffer = Wrapped[ByteBuffer](bufferpool.buffers.acquire(MaxFrameBufferSize))
inputstream
.mapZIO(bytes => transport.fromBytes[B](bytes, buffer))
.filter(_.isDefined)
.map(_.get)
// NOTE(review): ReadProperty hands back the raw frame stream cast to A, while readProperty
// asks for Stream[ReadError, (Long, Array[Byte])] — confirm the element types line up.
command match
case CreateFacts(_, _) => ZIO.succeed(().asInstanceOf[A])
case CreateProperty => accepted.as[A]
case ReadFactsBySerialId(_, _) => ZIO.succeed(handleStream[Raw].asInstanceOf[A])
case ReadFactsByAggregateRootId(_) => ZIO.succeed(handleStream[Raw].asInstanceOf[A])
case ReadFactsByAggregateId(_) => ZIO.succeed(handleStream[Raw].asInstanceOf[A])
case ReadProperty(_) => ZIO.succeed(inputstream.asInstanceOf[A])
// Looks up the registered channel, writes the encoded command and then the output stream
// to the send sink, reads one Completion from the channel's input stream and dispatches on it.
transparent inline def handle[A]: IO[Throwable, A] =
for
logicalChannel <- logicalChannels.get(logicalId) match
case Some(logicalchannel) => ZIO.succeed(logicalchannel)
case _ => ZIO.fail(ClientSideError(s"logicalId not found: $logicalId"))
outgoing = logicalChannel.outgoing
commandstream = ZStream.unwrap(transport.toStream(command, true, logicalId, 0))
_ <- commandstream.run(outgoing)
_ <- outputstream.run(outgoing)
completion <- transport.fromStream[Completion](logicalChannel.inputstream)
_ <- ZIO.debug(s"completion $logicalId $completion")
response <- completion match
case a: Accepted => handleCompletion[A](a, logicalChannel.inputstream)
case WithError(error) => ZIO.fail(ServerSideError(error.getMessage.nn))
yield response
// Full request: obtain the backend, connect if nobody has yet, wait for the connection
// data, register the logical channel (produce) and run the exchange (handle).
val requestResponse = for
_ <- ZIO.debug(s"request $logicalId $command")
client <- backend
connection <- Connection.get(uri)
_ <-
if connection.isConnected || connection.isRequested then ZIO.unit
else connection.request *> doConnect(uri, client)
(_, outgoing, _) <- connection.await
_ <- produce(logicalId, outgoing)
response <- handle[A]
yield response
// Narrow the error channel: pass ServerSideError through, wrap everything else.
requestResponse.catchAll {
case e: ServerSideError => ZIO.fail(e)
case e: Throwable => ZIO.fail(ServerSideError(errorMsg(s"request $logicalId", e)))
}
end sendReceive
/** Routes one received frame to its logical channel.
  *
  * The frame is decoded to find its logical id. A frame with an empty body and group id 0
  * closes the channel; any other frame is offered, undecoded, to the channel's incoming
  * queue. Fails with `ClientSideError` when no channel is registered for the id.
  */
final transparent inline private def consume(payload: Array[Byte]) =
  ZIO.succeed(transport.decode(payload)).flatMap { case (logicalId, groupId, _, _, _, bytes) =>
    val channelLookup = logicalChannels.get(logicalId) match
      case Some(logicalchannel) => ZIO.succeed(logicalchannel)
      case _                    => ZIO.fail(ClientSideError(s"logicalId not found: $logicalId"))
    channelLookup.flatMap { logicalChannel =>
      val endOfChannel = bytes.length <= 0 && groupId == 0
      val routed =
        if endOfChannel then closeLogical(logicalId)
        else logicalChannel.incoming.offerAll(Chunk(payload))
      routed.map(_ => ())
    }
  }
/** Registers a logical channel for `logicalId`: creates a bounded queue for incoming
  * frames, a stream over that queue, and stores both together with `outgoing` in
  * `logicalChannels` (replacing any entry already stored under the same id).
  */
final transparent inline private def produce(logicalId: Short, outgoing: Outgoing) =
  Queue.bounded[Array[Byte]](LargeSize).map { incoming =>
    val inputstream = ZStream.fromQueue(incoming, incoming.capacity)
    val channel = LogicalChannel(logicalId, outgoing, incoming, inputstream)
    logicalChannels.update(logicalId, channel)
  }
/** One step of the receive loop: logs whether the socket is open, receives a single
  * binary frame and hands it to `consume`. Callers repeat this effect to keep reading.
  */
final transparent inline private def makeIncoming(websocket: WebSocketChannel): IO[Throwable, Unit] =
  websocket
    .isOpen()
    .flatMap(open => ZIO.debug(s"makeIncoming $websocket ${open}"))
    .flatMap(_ => websocket.receiveBinary(false))
    .flatMap(payload => consume(payload))
/** Builds the sink used to send frames: each chunk of bytes is written to `websocket`
  * as one binary message, with a debug line before and after the send.
  */
final transparent inline private def makeOutgoing(websocket: WebSocketChannel) =
  ZIO.succeed(
    ZSink.foreach[Any, Throwable, Array[Byte]] { b =>
      for
        open <- websocket.isOpen()
        _ <- ZIO.debug(s"before send ${b.length} $websocket ${open}")
        _ <- websocket.sendBinary(b)
        _ <- ZIO.debug(s"after send ${b.length}")
      yield ()
    }
  )
/** Unregisters the logical channel `logicalId` and shuts down its incoming queue.
  *
  * A channel that is no longer registered is treated as already closed, so the call is a
  * no-op instead of throwing `NoSuchElementException` from `Option.get`.
  */
final transparent inline private def closeLogical(logicalId: Short) =
  logicalChannels.remove(logicalId) match
    case Some(logicalchannel) => logicalchannel.incoming.shutdown
    case None                 => ZIO.unit
/** Draws ids from `transport.clientLogicalId` until one is found that is not currently
  * registered in `logicalChannels`, and returns it.
  */
final transparent inline private def nextLogicalId =
  Iterator
    .continually(transport.clientLogicalId)
    .dropWhile(candidate => logicalChannels.contains(candidate))
    .next()
/** Formats a failure as `"<prefix> failed with <exception class>(<message>)"`.
  *
  * @param prefix description of the operation that failed
  * @param e      the failure to describe
  * @return the formatted text; an exception without a message is rendered as
  *         `no message` instead of triggering a `NullPointerException` through `.nn`
  */
final transparent inline def errorMsg(prefix: String, e: Throwable) =
  // getMessage is null for exceptions constructed without a message.
  val message = Option(e.getMessage).getOrElse("no message")
  s"$prefix failed with ${e.getClass.getName.nn}($message)"
end AbstractEventStore
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment