Last active
October 2, 2020 21:02
-
-
Save rkrzewski/a85aaae0dc16b6b43923a08d7f62fdb0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats._ | |
import cats.data._ | |
import cats.effect._ | |
import cats.syntax.all._ | |
import doobie._ | |
import doobie.syntax.stream._ | |
import fs2._ | |
object Streaming {

  /**
    * Inserts a stream of validated rows into the database in batches, within a single transaction.
    *
    * Requirements:
    *  - incoming data from an earlier processing stage is a `Stream[F, Either[NonEmptyChain[E], A]]`
    *  - data should be inserted into the db in batches; batches should not linger in memory longer than
    *    necessary
    *  - before writing the data begins, a metadata record needs to be inserted, returning a generated key
    *  - subsequent data inserts may refer to the aforementioned generated key (say, an FK to `file_imports.id`)
    *  - the whole stream should be processed in a single transaction (assuming a sufficient transaction timeout)
    *  - as long as `Right[A]` elements are coming, they should be inserted, but when `Left[NonEmptyChain[E]]`
    *    elements appear, whatever has already been inserted should be rolled back
    *  - the incoming stream should be fully drained even in the presence of errors
    *  - errors should be accumulated, up to a defined number; after exceeding the limit, further errors are
    *    discarded
    *  - at the end, an `Either[NonEmptyChain[E], Int]` should be emitted, where `Int` represents the total
    *    number of inserted rows
    *
    * @tparam F polymorphic effect type
    * @tparam M metadata type
    * @tparam A data type
    * @tparam E error type
    * @param insertMeta DB operation that inserts the metadata record and yields the metadata value
    * @param insertRow  prepares an `Update` for batch insert for the given metadata value
    * @param batchSize  number of data rows to be inserted in a single batch (assumed to be positive)
    * @param maxErrors  maximum number of errors to be accumulated; at least one error is always kept,
    *                   because a `Left` result must be non-empty
    * @param xa         DB transactor
    * @return a `Pipe` that converts incoming `Either[NonEmptyChain[E], A]` elements into a stream that emits a
    *         single `Either[NonEmptyChain[E], Int]` value, where `Int` represents the total number of inserted
    *         rows
    */
  def insert[F[_]: Effect, M, A, E](
      insertMeta: ConnectionIO[M],
      insertRow: M => Update[A],
      batchSize: Int,
      maxErrors: Int,
      xa: Transactor[F]
  ): Pipe[F, Either[NonEmptyChain[E], A], Either[NonEmptyChain[E], Int]] = {
    // A `Left` result must carry at least one error, so the effective limit is never below 1.
    val errorLimit = math.max(maxErrors, 1)

    // Keeps at most `errorLimit` errors. Always returns a non-empty chain: once an error has been seen,
    // the error state must never fall back to `None`, otherwise later `Right` batches would be inserted
    // after the rollback and reported as a success.
    def truncate(errs: NonEmptyChain[E]): NonEmptyChain[E] =
      NonEmptyChain.fromChainPrepend(errs.head, Chain.fromSeq(errs.tail.toList.take(errorLimit - 1)))

    // Appends newly found errors, unless the limit has already been reached (then they are discarded).
    def accumulate(prev: NonEmptyChain[E], next: NonEmptyChain[E]): NonEmptyChain[E] =
      if (prev.length >= errorLimit) prev else truncate(prev.concat(next))

    val toConnectionIO = λ[F ~> ConnectionIO](fa =>
      Async[ConnectionIO].async(cb =>
        // `runAsync` only describes starting `fa`; running the returned value here starts it and
        // routes its result into the `ConnectionIO` callback.
        Effect[F].runAsync(fa)(res => IO(cb(res))).unsafeRunSync()
      )
    )

    in =>
      Stream
        .eval(insertMeta)
        .flatMap { meta =>
          val update = insertRow(meta)

          // Consumes the stream one chunk (batch) at a time, threading the accumulated errors and the
          // running count of inserted rows.
          def go(
              stream: Stream[ConnectionIO, Either[NonEmptyChain[E], A]],
              errors: Option[NonEmptyChain[E]],
              written: Int
          ): Pull[ConnectionIO, Either[NonEmptyChain[E], Int], Unit] =
            stream.pull.uncons.flatMap {
              case Some((chunk, tail)) =>
                // `parSequence` combines every `Left` in the chunk, or yields all rows if there are none.
                (errors, chunk.toVector.parSequence) match {
                  case (None, Right(rows)) =>
                    Pull.eval(update.updateMany(rows)).flatMap(n => go(tail, None, written + n))
                  case (None, Left(firstErrors)) =>
                    // First failure: undo everything written in this transaction so far, then keep
                    // draining. The chunk's errors are capped like any later ones.
                    Pull.eval(FC.rollback) >> go(tail, Some(truncate(firstErrors)), 0)
                  case (Some(prevErrors), Right(_)) =>
                    // Already failed: drain the input without inserting anything.
                    go(tail, Some(prevErrors), 0)
                  case (Some(prevErrors), Left(newErrors)) =>
                    go(tail, Some(accumulate(prevErrors, newErrors)), 0)
                }
              case None =>
                // Input exhausted: report the collected errors, or the number of inserted rows.
                Pull.output1(errors.toLeft(written))
            }

          go(in.buffer(batchSize).translate(toConnectionIO), None, 0).stream
        }
        .transact(xa)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment