Created
November 14, 2022 18:27
-
-
Save guidoschmidt17/52f7f781af775c9906fb1954b9d7c07e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cqrs | |
package eventstore | |
package client | |
import sttp.capabilities.WebSockets | |
import sttp.capabilities.zio.ZioStreams | |
import sttp.client3.* | |
import sttp.model.Uri | |
import sttp.ws.* | |
import zio.* | |
import zio.stream.* | |
import java.nio.ByteBuffer | |
import Command.* | |
import Completion.* | |
import Fact.* | |
import Payload.* | |
import Transport.* | |
abstract class AbstractEventStore extends EventStore: | |
val backend: Task[SttpBackend[Task, ZioStreams & WebSockets]] | |
val transport: Transport | |
val bufferpool: BufferPool | |
def createFacts(events: Events, tags: Tags = Tags.empty) = | |
sendReceive[Unit]( | |
CreateFacts(events, tags), | |
ZStream.empty, | |
nextLogicalId, | |
uri | |
) | |
def createProperty(stream: Stream[Throwable, Array[Byte]], logicalId: Option[Short]) = | |
sendReceive[OutOfBandElement]( | |
CreateProperty, | |
stream, | |
logicalId.get, | |
uri | |
) | |
def readFactsBySerialId(serialId: Long, tags: Tags = Tags.empty) = | |
sendReceive[Stream[ReadError, Raw]]( | |
ReadFactsBySerialId(serialId, tags), | |
ZStream.empty, | |
nextLogicalId, | |
uri | |
) | |
def readFactsByAggregateRootId(uuid: Uuid) = | |
sendReceive[Stream[ReadError, Raw]]( | |
ReadFactsByAggregateRootId(uuid), | |
ZStream.empty, | |
nextLogicalId, | |
uri | |
) | |
def readFactsByAggregateId(uuid: Uuid) = | |
sendReceive[Stream[ReadError, Raw]]( | |
ReadFactsByAggregateId(uuid), | |
ZStream.empty, | |
nextLogicalId, | |
uri | |
) | |
def readProperty(property: OutOfBandElement) = | |
sendReceive[Stream[ReadError, (Long, Array[Byte])]]( | |
ReadProperty(property), | |
ZStream.empty, | |
nextLogicalId, | |
uri | |
) | |
// a lot of internal stuff here | |
private type Incoming = Queue[Array[Byte]] | |
private type WebSocketChannel = WebSocket[Task[_]] | |
private type Consumer = Fiber.Runtime[Throwable, Nothing] | |
private type Outgoing = ZSink[Any, Throwable, Array[Byte], Array[Byte], Unit] | |
private type ConnectionData = (Uri, Outgoing, Consumer) | |
private type Established = Promise[Nothing, ConnectionData] | |
private case class Connection(private val established: Established): | |
import Connection.State | |
import State.* | |
def await: IO[Nothing, ConnectionData] = data.get.await | |
def notYetConnected = state.get == NotYetConnected | |
def isRequested = state.get == Requested | |
def isConnected = state.get == Connected | |
def isClosed = state.get == Closed | |
def getState = state.get | |
def connect(connectiondata: ConnectionData) = | |
ZIO.succeed(state.set(Connected)) *> | |
data.get.succeed(connectiondata) *> ZIO.logInfo(s"\n\nconnected $connectiondata") | |
def request = ZIO.succeed(state.set(Requested)) | |
def disconnect = | |
for | |
_ <- ZIO.succeed(state.set(Closed)) | |
(uri, _, consumer) <- await | |
_ <- consumer.interrupt | |
result = Connection.connections.remove(uri) | |
yield result | |
private final val data = Wrapped[Established](established) | |
private final val state = Wrapped[State](NotYetConnected) | |
end Connection | |
private object Connection: | |
enum State: | |
case NotYetConnected, Requested, Connected, Closed | |
def get(uri: Uri): UIO[Connection] = | |
connections.get(uri) match | |
case Some(connection) => ZIO.succeed(connection) | |
case _ => | |
for | |
established <- Promise.make[Nothing, ConnectionData] | |
connection = Connection(established) | |
_ = connections.put(uri, connection) | |
yield connection | |
private final val connections = collection.mutable.Map[Uri, Connection]() | |
private case class LogicalChannel(logicalId: Short, outgoing: Outgoing, incoming: Incoming, inputstream: Stream[Throwable, Array[Byte]]) | |
private final val uri = uri"ws://localhost:8090/.ws" | |
private final val logicalChannels = collection.mutable.Map[Short, LogicalChannel]() | |
// | |
private def sendReceive[A]( | |
command: Command, | |
outputstream: Stream[Throwable, Array[Byte]], | |
logicalId: Short, | |
uri: Uri | |
): IO[ServerSideError, A] = | |
def doConnect( | |
uri: Uri, | |
backend: SttpBackend[Task, ZioStreams & WebSockets] | |
) = | |
def pipe(websocket: WebSocketChannel): Task[Unit] = | |
for | |
connection <- Connection.get(uri) | |
open <- websocket.isOpen() | |
_ <- ZIO.logInfo(s"\n\npipe connection $connection ${connection.getState} $websocket $open") | |
_ <- | |
if connection.isConnected then ZIO.unit | |
else | |
for | |
outgoing <- makeOutgoing(websocket) | |
consumer <- makeIncoming(websocket).forever.forkDaemon | |
_ <- connection.connect((uri, outgoing, consumer)) | |
yield () | |
yield () | |
val websocket = asWebSocketAlways(pipe) | |
basicRequest | |
.get(uri) | |
.response(websocket) | |
.send(backend) | |
end doConnect | |
transparent inline def handleCompletion[A](accepted: Accepted, inputstream: Stream[Throwable, Array[Byte]]): IO[Throwable, A] = | |
def handleStream[B]: ZStream[Any, Throwable, B] = | |
val buffer = Wrapped[ByteBuffer](bufferpool.buffers.acquire(MaxFrameBufferSize)) | |
inputstream | |
.mapZIO(bytes => transport.fromBytes[B](bytes, buffer)) | |
.filter(_.isDefined) | |
.map(_.get) | |
command match | |
case CreateFacts(_, _) => ZIO.succeed(().asInstanceOf[A]) | |
case CreateProperty => accepted.as[A] | |
case ReadFactsBySerialId(_, _) => ZIO.succeed(handleStream[Raw].asInstanceOf[A]) | |
case ReadFactsByAggregateRootId(_) => ZIO.succeed(handleStream[Raw].asInstanceOf[A]) | |
case ReadFactsByAggregateId(_) => ZIO.succeed(handleStream[Raw].asInstanceOf[A]) | |
case ReadProperty(_) => ZIO.succeed(inputstream.asInstanceOf[A]) | |
transparent inline def handle[A]: IO[Throwable, A] = | |
for | |
logicalChannel <- logicalChannels.get(logicalId) match | |
case Some(logicalchannel) => ZIO.succeed(logicalchannel) | |
case _ => ZIO.fail(ClientSideError(s"logicalId not found: $logicalId")) | |
outgoing = logicalChannel.outgoing | |
commandstream = ZStream.unwrap(transport.toStream(command, true, logicalId, 0)) | |
_ <- commandstream.run(outgoing) | |
_ <- outputstream.run(outgoing) | |
completion <- transport.fromStream[Completion](logicalChannel.inputstream) | |
_ <- ZIO.debug(s"completion $logicalId $completion") | |
response <- completion match | |
case a: Accepted => handleCompletion[A](a, logicalChannel.inputstream) | |
case WithError(error) => ZIO.fail(ServerSideError(error.getMessage.nn)) | |
yield response | |
val requestResponse = for | |
_ <- ZIO.debug(s"request $logicalId $command") | |
client <- backend | |
connection <- Connection.get(uri) | |
_ <- | |
if connection.isConnected || connection.isRequested then ZIO.unit | |
else connection.request *> doConnect(uri, client) | |
(_, outgoing, _) <- connection.await | |
_ <- produce(logicalId, outgoing) | |
response <- handle[A] | |
yield response | |
requestResponse.catchAll { | |
case e: ServerSideError => ZIO.fail(e) | |
case e: Throwable => ZIO.fail(ServerSideError(errorMsg(s"request $logicalId", e))) | |
} | |
end sendReceive | |
final transparent inline private def consume(payload: Array[Byte]) = | |
for | |
(logicalId, groupId, i, n, _, bytes) <- ZIO.succeed(transport.decode(payload)) | |
logicalChannel <- logicalChannels.get(logicalId) match | |
case Some(logicalchannel) => ZIO.succeed(logicalchannel) | |
case _ => ZIO.fail(ClientSideError(s"logicalId not found: $logicalId")) | |
_ <- | |
if bytes.length > 0 || groupId != 0 then logicalChannel.incoming.offerAll(Chunk(payload)) | |
else closeLogical(logicalId) | |
yield () | |
final transparent inline private def produce(logicalId: Short, outgoing: Outgoing) = | |
for | |
incoming <- Queue.bounded[Array[Byte]](LargeSize) | |
inputstream = ZStream.fromQueue(incoming, incoming.capacity) | |
_ = logicalChannels.put(logicalId, LogicalChannel(logicalId, outgoing, incoming, inputstream)) | |
yield () | |
final transparent inline private def makeIncoming(websocket: WebSocketChannel): IO[Throwable, Unit] = | |
for | |
open <- websocket.isOpen() | |
_ <- ZIO.debug(s"makeIncoming $websocket ${open}") | |
payload <- websocket.receiveBinary(false) | |
_ <- consume(payload) | |
yield () | |
final transparent inline private def makeOutgoing(websocket: WebSocketChannel) = | |
ZIO.succeed( | |
ZSink.foreach[Any, Throwable, Array[Byte]](b => | |
websocket.isOpen().flatMap(open => ZIO.debug(s"before send ${b.length} $websocket ${open}")) | |
*> websocket.sendBinary(b) | |
*> ZIO.debug(s"after send ${b.length}") | |
) | |
) | |
final transparent inline private def closeLogical(logicalId: Short) = | |
logicalChannels.remove(logicalId).get.incoming.shutdown | |
final transparent inline private def nextLogicalId = | |
var id = transport.clientLogicalId | |
while logicalChannels.contains(id) do id = transport.clientLogicalId | |
id | |
final transparent inline def errorMsg(prefix: String, e: Throwable) = | |
s"$prefix failed with ${e.getClass.getName.nn}(${e.getMessage.nn})" | |
end AbstractEventStore |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment