I have a program which looks roughly like this:
1. Waits for some files
2. When they arrive, parses them to extract data
3. Reads database state
4. Performs calculation
5. Saves new state to database
6. Generates emails on the basis of computed data
7. Repeat
It's critical that steps 3 and 5 run within a single database transaction on each iteration, so the program is currently written in Kleisli[IO, Connection, ?], thus:
// External Input
val waitForFiles: IO[my.RawInput] = ???
val parseFiles: my.RawInput => IO[my.ProcessedInput] = ???
// Database interaction
val readPersistedState: Kleisli[IO, Connection, my.PersistedState] = ???
val writePersistedState: my.Output => Kleisli[IO, Connection, Unit] = ???
//Computation
val compute: (my.ProcessedInput, my.PersistedState) => my.Output = ???
//single run
for {
f <- waitForFiles.liftKleisli[Connection] //step 1
g <- parseFiles(f).liftKleisli[Connection] //step 2
h <- readPersistedState //step 3
i = compute(g, h) //step 4
_ <- writePersistedState(i) //step 5
_ <- sendEmail(i).liftKleisli[Connection] //step 6
}
yield ()
We generate one big singleRun: Kleisli[IO, Connection, Unit] and then run it using a library method of ours:
def runTransaction[M[_] : Monad : Catchable, A](ds: DataSource, f: Connection => M[A], isolation: Option[Isolation] = None): M[A]
That is:
val db: IO[DataSource] = ???
val singleRun: Kleisli[IO, Connection, Unit] = /* as above */
IO.ioMonad.forever { //step 7 (spin here)
db
.flatMap { ds =>
runTransaction[IO, Unit](ds, singleRun.run, Some(Serialized))
}
}
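To illustrate the transaction lifecycle involved (open a connection, run the function, commit on success, roll back on failure, always close), here is a minimal synchronous sketch. It is not our actual runTransaction: the name runTransactionSketch is hypothetical, scala.util.Try stands in for M[_], and the isolation level is a plain JDBC Int constant rather than our Isolation type.

```scala
import java.sql.Connection
import javax.sql.DataSource
import scala.util.Try

object TransactionSketch {
  // Hypothetical, simplified stand-in for runTransaction (assumption, not library code).
  // Try plays the role of the effect M[_]; isolation is a java.sql.Connection constant.
  def runTransactionSketch[A](ds: DataSource, isolation: Option[Int] = None)(
      f: Connection => A
  ): Try[A] =
    Try(ds.getConnection).flatMap { conn =>
      val result = Try {
        conn.setAutoCommit(false)
        isolation.foreach(level => conn.setTransactionIsolation(level))
        val a = f(conn) // steps 3 to 5 would run here, sharing one connection
        conn.commit()
        a
      }
      // Undo partial work if anything above failed (including the commit itself).
      if (result.isFailure) Try(conn.rollback())
      // Always release the connection, ignoring errors from close.
      Try(conn.close())
      result
    }
}
```

The ZIO version I'm after would need the same commit, rollback and close guarantees, but expressed in terms of zio.IO rather than Try.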
I don't want a failed email to bring down my whole system, nor do I want the next calculation to wait for the email to succeed. I want to do something like this:
for {
q <- Queue.unbounded[my.Output]
x <- waitForFiles
.flatMap { f => //step 1
parseFiles(f) //step 2
}
.flatMap { g =>
readPersistentState //step 3
.map(compute(g, _)) //step 4
.flatMap { i =>
writePersistentState(i) //step 5
.flatMap(_ => q.offer(i))
}
}
.forever //step 7 (spin here)
.fork
y <- q
.take
.flatMap(attemptSendEmailUntilSucceeded) //step 6 (asynchronously)
.forever
.fork
_ <- x.join
_ <- y.join
}
yield ExitStatus.ExitNow(0)
However, I'm not clear on the best way to turn steps 3 to 5, which must happen in a database transaction ...
readPersistentState
.map(compute(g, _))
.flatMap { i =>
writePersistentState(i).flatMap(_ => q.offer(i))
}
... into a zio.IO whilst creating, threading, committing (or aborting) that transaction. You can assume I can get my hands on an IO[DataSource] in there.
- I assume I'm going to have to write a ZIO version of runTransactionMonad
- Given that ZIO doesn't ship with typeclass instances for zio.IO, I presume I shouldn't use Kleisli[zio.IO[E, ?], Connection, A]. How then to compose readPersistentState and writePersistentState together? That is, given
val a: Connection => zio.IO[Throwable, H] = ???
val f: H => I = ???
val b: I => Connection => zio.IO[Throwable, Unit] = ???
Is there no simpler way than:
val c: Connection => zio.IO[Throwable, I] =
conn => {
a(conn).flatMap { h =>
val i = f(h)
b(i)(conn).map(_ => i)
}
}
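To make concrete the kind of composition I'm hoping for, here is a sketch of a hand-rolled reader with its own map and flatMap, so that no typeclass instance is needed. Everything in it is an assumption for illustration: Tx, Eff and readThenWrite are made-up names, Either[Throwable, A] stands in for zio.IO[Throwable, A] (both have map and flatMap), and the environment R would be Connection in my case.

```scala
object TxSketch {
  // Stand-in effect (assumption): Either[Throwable, A] plays the role of zio.IO[Throwable, A].
  type Eff[A] = Either[Throwable, A]

  // Hypothetical Kleisli-like wrapper around R => Eff[A]; R would be java.sql.Connection.
  final case class Tx[R, A](run: R => Eff[A]) {
    def map[B](g: A => B): Tx[R, B] =
      Tx(r => run(r).map(g))

    // Threads the same environment r through both computations.
    def flatMap[B](g: A => Tx[R, B]): Tx[R, B] =
      Tx(r => run(r).flatMap(a => g(a).run(r)))
  }

  // Mirrors the a / f / b composition: read, compute, write, and return the computed value.
  def readThenWrite[R, H, I](a: Tx[R, H], f: H => I, b: I => Tx[R, Unit]): Tx[R, I] =
    for {
      h <- a
      i = f(h)
      _ <- b(i)
    } yield i
}
```

With something like this, the connection no longer has to be passed by hand at each step, but I don't know whether writing such a wrapper myself is the intended approach with ZIO.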
Let's say that I have these:
val readPersistentState: Connection => zio.IO[Throwable, my.PersistedState] = ???
val writePersistentState: my.Output => Connection => zio.IO[Throwable, Unit] = ???
def runTransactionMonad[A](
ds: DataSource,
f: Connection => zio.IO[Throwable, A],
isolation: Option[Isolation]
): zio.IO[Throwable, A] = ???
Then my program is:
for {
ds <- IO.syncThrowable(unsafeDataSourceLookup)
q <- Queue.unbounded[my.Output]
x <- waitForFiles
.flatMap { f => //step 1
parseFiles(f) //step 2
}
.flatMap { g =>
runTransactionMonad(
ds,
conn =>
readPersistentState(conn).flatMap { h => //step 3
val i = compute(g, h) //step 4
writePersistentState(i)(conn).map(_ => i) //step 5
},
Some(Serialized) //same isolation level as before
)
}
.flatMap { i =>
q.offer(i)
}
.forever //step 7 (spin here)
.fork
y <- q
.take
.flatMap(attemptSendEmailUntilSucceeded) //step 6 (asynchronously)
.forever
.fork
_ <- x.join
_ <- y.join
}
yield ExitStatus.ExitNow(0)
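attemptSendEmailUntilSucceeded is not defined above. One possible shape for it, sketched synchronously with scala.util.Try instead of zio.IO, is a loop that retries after a fixed delay; the name retryUntilSucceeded and the delayMillis parameter are hypothetical.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Hypothetical helper: keeps calling `attempt` until it succeeds,
  // sleeping for delayMillis between failed attempts. Blocks the calling thread.
  @tailrec
  def retryUntilSucceeded[A](delayMillis: Long)(attempt: () => Try[A]): A =
    attempt() match {
      case Success(a) => a
      case Failure(_) =>
        Thread.sleep(delayMillis)
        retryUntilSucceeded(delayMillis)(attempt)
    }
}
```

Because this runs on the email fiber only, a long run of failures delays later emails but never the file-processing loop.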
Thanks, but I think you are answering a question that I am not asking. I want to understand how I can get Kleisli composition interacting with the effect