Last active
December 13, 2022 22:39
-
-
Save guidoschmidt17/4fc855677e3ee0c5d9eba4fcd982c56f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cqrs | |
package eventstore | |
package server | |
import cats.effect.std.Console | |
import fs2.io.net.SocketOption.* | |
import natchez.Trace.Implicits.noop | |
import skunk.Session | |
import zio.* | |
import zio.interop.catz.* | |
import EventStoreImpl.* | |
/** Builds the pooled skunk [[Session]] resource and exposes it as a ZIO layer. */
private object SessionPool:

  /** Scoped effect that creates the session pool.
    *
    * The pool returned by `Session.pooled` is a resource that yields another
    * resource (one session lease). Both levels are converted with
    * `toManagedZIO`, and the outer one is tied to the surrounding scope via
    * `scoped`.
    *
    * NOTE(review): host, port, user, database and password are hard-coded
    * literals here — presumably development defaults; confirm before reuse.
    */
  val make =
    // cats-effect Console instance for ZIO's Task; picked up implicitly below
    // (presumably by Session.pooled — TODO confirm).
    inline given Console[([A] =>> Task[A])] = Console.make

    // The same buffer size is applied to both the receive and the send side.
    val socketTuning = List(
      noDelay(true),
      receiveBufferSize(DefaultBufferSize),
      sendBufferSize(DefaultBufferSize)
    )

    Session
      .pooled(
        host = "localhost",
        port = 5432,
        user = "jimmy",
        database = "db",
        password = Some("banana"),
        socketOptions = socketTuning,
        max = 64,
        debug = false
      )
      .toManagedZIO
      .map(_.toManagedZIO)
      .scoped

  /** Layer form of [[make]]; the pool lives as long as the layer's scope. */
  val live = ZLayer.scoped(make)
/** Default buffer size: 2 * 1024 * 1024 = 2,097,152.
  *
  * Used in this file as the argument to both `receiveBufferSize` and
  * `sendBufferSize` when the session pool's socket options are built
  * (presumably a size in bytes, i.e. 2 MiB — TODO confirm).
  */
inline final val DefaultBufferSize = 2 * 1024 * 1024
/** Reads the rows of `query` into a bounded queue on a background fiber and
  * returns a stream that drains that queue.
  *
  * The number of rows already delivered is tracked in a `Ref`. When the
  * reading loop is retried it re-issues `query` starting at that offset, so
  * rows that were already offered to the queue are not read again.
  *
  * @param check
  *   optional count query, run with `args` before reading. When present, its
  *   result caps the number of rows read, and a result that is not greater
  *   than zero fails with `NoResult(message)`. When absent, no cap is applied.
  * @param query
  *   the row query; it is executed with `args ~ offset ~ remaining`, where
  *   `remaining` is the row cap minus the rows already delivered.
  * @param args
  *   user arguments shared by `check` and `query`.
  * @param message
  *   text used in retry labels, in `NoResult` and by `catchAllErrors`.
  * @param chunksize
  *   chunk size passed to both `stream` and `toZStream`.
  * @param limit
  *   maximum number of rows to deliver; values below 1 mean "no limit".
  * @return
  *   an effect producing the output stream; failures of that effect are
  *   handled by `catchAllErrors(message)`.
  */
inline final private def readResultWithOffset[A, B](
  check: Option[Query[A, Long]],
  query: Query[A ~ Long ~ Long, B],
  args: A,
  message: String,
  chunksize: Int,
  limit: Long = Long.MaxValue
) =
  // Runs the count query once (with retries) and rejects a non-positive count.
  // Taking the query as a parameter avoids calling `.get` on the Option.
  def countRows(countQuery: Query[A, Long]) =
    sessionPool.scoped
      .flatMap { session =>
        session
          .prepare(countQuery)
          .toManagedZIO
          .scoped
          .flatMap(_.unique(args).retry(DebugRetrySchedule(s"countResultWithOffset $message $chunksize $limit")))
      }
      .flatMap(n => if n > 0 then ZIO.succeed(n) else ZIO.fail(NoResult(message)))

  // One pass over the remaining rows: read from the current offset, push every
  // row to `out`, and signal the end of the stream once the pass completes.
  def loopResult(out: Queue[Take[Nothing, B]], offset: Ref[Long], n: Long) =
    // Prepares `query` on a pooled session and returns the row stream for the
    // window described by (from, remaining); errors are printed via ZIO.debug.
    def resultStream(from: Long, remaining: Long) =
      sessionPool.scoped
        .flatMap { session =>
          session
            .prepare(query)
            .toManagedZIO
            .scoped
            .flatMap { prepared =>
              ZIO.succeed(
                prepared
                  .stream(args ~ from ~ remaining, chunksize)
                  .toZStream(chunksize)
                  .tapErrorCause(ZIO.debug(_))
              )
            }
        }

    for
      rowsread <- offset.get
      result   <- resultStream(rowsread, n - rowsread)
      // the offset is advanced per row so that a retry resumes after it
      _ <- result.runForeach(b => offset.update(_ + 1) *> out.offer(Take.single(b)))
      _ <- out.offer(Take.end)
    yield ()

  // Upper bound from the optional count query; unbounded when there is none.
  val maxCount = check match
    case Some(countQuery) => countRows(countQuery).tapErrorCause(ZIO.debug(_))
    case None             => ZIO.succeed(Long.MaxValue)

  val result = for
    maxcount <- maxCount
    // a limit below 1 is treated as "no limit"
    n = math.min(maxcount, if limit < 1 then Long.MaxValue else limit)
    offset: Ref[Long] <- Ref.make(0L)
    out: Queue[Take[Nothing, B]] <- Queue.bounded(SmallSize)
    outstream = ZStream
      .fromQueue(out, out.capacity)
      .flattenTake
    // NOTE(review): `$offset` interpolates the Ref itself at the moment this
    // label is built, not the number of rows read when a retry happens.
    // NOTE(review): if the retries are exhausted, nothing in this method
    // offers Take.end, so a consumer of `outstream` would presumably keep
    // waiting — confirm against DebugRetrySchedule.
    _ <- loopResult(out, offset, n)
      .retry(DebugRetrySchedule(s"readResultWithOffset $message $chunksize $offset $limit"))
      .forkDaemon
  yield outstream

  result.catchAll(catchAllErrors(message))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment