I have a program which looks roughly like this
- Waits for some files
 - When they arrive, parses them to extract data
 - Reads database state
 - Performs calculation
 - Saves new state to database
 - Generates emails on the basis of computed data
 - Repeat
 
It's critical that steps 3 & 5 runs within a single database transaction on each iteration, so it's currently a program written in Kleisli[IO, Connection, ?] thus:
// External Input
val waitForFiles:                IO[my.RawInput]       = ???
val parseFiles:   my.RawInput => IO[my.ProcessedInput] = ???
// Database interaction
val readPersistedState:               Kleisli[IO, Connection, my.PersistedState] = ???
val writePersistedState: my.Output => Kleisli[IO, Connection, Unit]              = ???
//Computation
val compute: (my.ProcessedInput, my.PersistedState) => my.Output = ???
//single run
for {
  f <- waitForFiles.liftKleisli[Connection]   //step 1
  g <- parseFiles(f).liftKleisli[Connection]  //setp 2
  h <- readPersistedState                     //step 3
  i =  compute(g, h)                          //setp 4
  _ <- writePersistedState(i)                 //step 5
  _ <- sendEmail(i).liftKleisli[Connection]   //step 6
}
yield ()
We generate one big singleRun: Kleisli[IO, Connection, Unit] and then run it using a library method of ours:
def runTransaction[M[_] : Monad : Catchable, A](ds: DataSource, f: Connection => M[A], isolation: Option[Isolation] = None): M[A]
That is:
val db: IO[DataSource] = ???
val singleRun: Kleisli[IO, Connection, Unit] = /* as above */
IO.ioMonad.forever { //step 7 (spin here)
  db
    .flatMap { ds => 
      runTransaction[IO, Unit](ds, singleRun.run, Some(Serialized))
    }
}
I don't wish a failed email to bail out my whole system and neither do I want the next calculation to wait on the email succeeding. I want to do sth like this:
for {
  q <- Queue.unbounded[my.Output]
 
  x <- waitForFiles
         .flatMap { f =>                          //step 1
           parseFiles(f)                          //step 2
         }
         .flatMap { g => 
           readPersistentState                    //step 3
             .map(compute(g, _))                  //step 4
             .flatMap { i =>      
               writePersistentState(i)            //step 5 
                 .flatMap(_ => q.offer(i)) 
             }
         }
         .forever                                 //step 7 (spin here) 
         .fork               
 
  y <- q
         .take
         .flatMap(attemptSendEmailUntilSucceeded) //step 6 (asynchronously)
         .forever
         .fork
 
  _ <- x.join
  _ <- y.join
}
yield ExitStatus.ExitNow(0)
However, I'm not clear on the best way to turn steps 3 to 5, which must happen in a database transaction ...
readPersistentState
  .map(compute(g, _))
  .flatMap { i => 
    writePersistentState(i).flatMap(_ => q.offer(i)) 
  }
... into a zio.IO whilst creating, threading, committing (or aborting) that transaction. You can assume I can get my hands on an IO[DataSource] in there.
- 
I assume I'm going to have to write a ZIO version of
runTransactionMonad - 
Given that ZIO doesn't ship with typeclass instances for
zio.IOI presume I shouldn't useKleisli[zio.IO[E, ?], Connection, A]. How then to composereadPersistentStateandwritePersistentStatetogether? That is, givenval a: Connection => zio.IO[Throwable, H] = ??? val f: H => I = ??? val b: I => Connection => zio.IO[Throwable, Unit] = ??? 
Is there no simpler way than:
val c: Connection => zio.IO[Throwable, I] = 
  conn => {
    a(conn).flatMap { h =>
      val i = f(h)
      b(i)(conn).map(_ => i)
    }
  }
Let's say that I have these:
val readPersistentState:               Connection => zio.IO[Throwable, my.PersistedState] = ???
val writePersistentState: my.Output => Connection => zio.IO[Throwable, Unit]              = ???
def runTransactionMonad[A](
  ds: DataSource, 
  f:  Connection => zio.IO[Throwable, A], 
  isolation: Option[Isolation]
): zio.IO[Throwable, A] = ???
Then my program is:
 for {
  ds <- IO.syncThrowable(unsafeDataSourceLookup)
 
  q <- Queue.unbounded[my.Output]
 
  x <- waitForFiles
         .flatMap { f =>                                   //step 1
           parseFiles(f)                                   //step 2
         }
         .flatMap { g => 
           runTransactionMonad(
             ds,
             conn => 
               readPersistentState(conn).flatMap { h =>     //step 3
                 val i = compute(g, h)                      //step 4
                 writePersistentState(i)(conn).map(_ => i)  //step 5
               }
           
           )
         }
         .flatMap { i =>
           q.offer(i)
         }  
         .forever                                 //step 7 (spin here) 
         .fork               
 
  y <- q
         .take
         .flatMap(attemptSendEmailUntilSucceeded) //step 6 (asynchronously)
         .forever
         .fork
 
  _ <- x.join
  _ <- y.join
}
yield ExitStatus.ExitNow(0)
Hi Chris, here's a (simplified, untested) snippet that demonstrates how it would look with FS2:
I'm not sure where the cats-effect instances stand for ZIO, but if they exist, you could just use FS2 with ZIO as the effect monad.