Created
October 14, 2019 16:23
-
-
Save crakjie/d8389212500bb18c3995162b5d853bc5 to your computer and use it in GitHub Desktop.
cats effect transaction keeper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats._ | |
import implicits._ | |
import cats.effect._ | |
import concurrent._ | |
import cats.effect.implicits._ | |
/**
 * Guards the life cycle of a transaction: it makes sure the transaction has been opened before
 * any task runs, and that every running task has ended before the transaction is closed.
 *
 * The failure conditions documented below are those of the implementation returned by
 * `Transactioner.create`, which signals them with an `IllegalStateException` raised in `F`.
 *
 * @tparam F the effect type in which every operation is expressed
 */
trait Transactioner[F[_]] {

  /**
   * Opens the transaction.
   *
   * @return an effect that succeeds if the transaction had not been opened yet, and fails if it
   *         is already open, closing or closed
   */
  def open: F[Unit]

  /**
   * Runs `t` as a task belonging to the transaction.
   *
   * @param t the effect to run inside the transaction
   * @tparam A the result type of the task
   * @return an effect producing the result of `t` if the transaction is open, and failing if the
   *         transaction is not opened, closing or closed
   */
  def task[A](t: F[A]): F[A]

  /**
   * Signals the end of the transaction: semantically waits for the end of all running tasks and
   * then closes the transaction.
   *
   * @return an effect that succeeds if the transaction is open, and fails if it is not opened,
   *         already closing or already closed
   */
  def close: F[Unit]
}
object Transactioner {

  /**
   * Builds a new [[Transactioner]] whose life cycle is tracked in a `Ref`.
   *
   * The life cycle is a small state machine:
   *  - `NotOpened`: initial state, only `open` is accepted;
   *  - `Opened(cpt)`: tasks may start, `cpt` is the number of tasks currently running;
   *  - `Closing(cpt, d)`: `close` was requested while `cpt` tasks were still running, no new task
   *    may start and `d` is completed when the last running task ends;
   *  - `Closed`: terminal state, every operation fails.
   *
   * Every illegal transition is reported as an `IllegalStateException` raised in `F`.
   *
   * @param F the `Concurrent` instance used to create the `Ref` / `Deferred` and to raise errors
   * @tparam F the effect type
   * @return an effect creating a transactioner in the `NotOpened` state
   */
  def create[F[_]](implicit F: Concurrent[F]): F[Transactioner[F]] = {
    sealed trait State
    case class Closing(cpt: Int, d: Deferred[F, Unit]) extends State
    case object NotOpened extends State
    case object Closed extends State
    case class Opened(cpt: Int) extends State

    // Single place where illegal transitions are turned into failed effects.
    def illegal(message: String): F[Unit] =
      F.raiseError[Unit](new IllegalStateException(message))

    Ref.of[F, State](NotOpened).map { state =>
      new Transactioner[F] {

        override def open: F[Unit] =
          state.modify[F[Unit]] {
            case NotOpened   => (Opened(0), F.unit)
            case Closed      => (Closed, illegal("Transaction is already closed"))
            case st: Opened  => (st, illegal("Transaction is open"))
            case st: Closing => (st, illegal("Transaction is open"))
          }.flatten

        // `bracket` guarantees that `endTask` runs whether `t` succeeds, fails or is cancelled.
        // Without it a failing or cancelled task would never decrement the counter and `close`
        // would wait forever. `endTask` is not run when `startTask` itself fails.
        override def task[A](t: F[A]): F[A] =
          F.bracket(startTask)(_ => t)(_ => endTask)

        /** Registers one more running task; only allowed while the transaction is `Opened`. */
        private def startTask: F[Unit] =
          state.modify[F[Unit]] {
            case Opened(cpt) => (Opened(cpt + 1), F.unit)
            case NotOpened   => (NotOpened, illegal("Transaction is not opened"))
            case st: Closing => (st, illegal("Transaction is closed"))
            case Closed      => (Closed, illegal("Transaction is closed"))
          }.flatten

        /** Unregisters a running task and wakes up a pending `close` when it was the last one. */
        private def endTask: F[Unit] =
          state.modify[F[Unit]] {
            case Opened(cpt) => (Opened(cpt - 1), F.unit)
            // Last running task of a closing transaction: the transaction is now really closed,
            // so record `Closed` (instead of staying in `Closing(0, _)`) before waking `close`.
            case Closing(1, df)   => (Closed, df.complete(()))
            case Closing(cpt, df) => (Closing(cpt - 1, df), F.unit)
            case NotOpened        => (NotOpened, illegal("Transaction is not opened"))
            case Closed           => (Closed, illegal("Transaction is closed"))
          }.flatten

        override def close: F[Unit] =
          Deferred[F, Unit].flatMap { drained =>
            state.modify[F[Unit]] {
              // Nothing is running: close immediately.
              case Opened(0) =>
                (Closed, F.unit)
              // Tasks are still running: forbid new ones and wait for the last one to end.
              case Opened(pending) =>
                val awaitDrain = drained.get.onCancel {
                  // On cancel, if it is still closing, go back to the open status.
                  state.update {
                    case Closing(stillRunning, _) => Opened(stillRunning)
                    case other                    => other
                  }
                }
                (Closing(pending, drained), awaitDrain)
              case st: Closing =>
                (st, illegal("Transaction is already closing"))
              case Closed =>
                (Closed, illegal("Transaction is closed"))
              case NotOpened =>
                (NotOpened, illegal("Transaction is not opened"))
            }.flatten
          }
      }
    }
  }
}
import monix.eval.Task | |
import org.scalatest.{AsyncFlatSpec, FlatSpec, Matchers} | |
import monix.execution.Scheduler.Implicits.global | |
import scala.language.postfixOps | |
import scala.concurrent.Await | |
import scala.concurrent.duration.Duration | |
import scala.concurrent.duration._ | |
/**
 * Behavioural tests for the `Transactioner` built by `Transactioner.create`, run with Monix `Task`.
 *
 * Each scenario is assembled as a single `Task` and executed through `runBlocking`, which waits
 * for a bounded amount of time so that a transactioner that never completes makes the test fail
 * instead of blocking the whole suite.
 */
class TransactionerTest extends FlatSpec with Matchers {

  /** Upper bound on how long a single scenario may take (the slowest one sleeps about 1 second). */
  private val awaitLimit: FiniteDuration = 10.seconds

  /** Runs `task` on the global scheduler and blocks for its result, for at most `awaitLimit`. */
  private def runBlocking[A](task: Task[A]): A =
    Await.result(task.runToFuture, awaitLimit)

  it should "not allow close on not open transaction" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.close
    } yield ()
    assertThrows[IllegalStateException](runBlocking(task))
  }

  it should "not allow open transaction twice" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.open
      _ <- t.open
    } yield ()
    assertThrows[IllegalStateException](runBlocking(task))
  }

  it should "not allow open on closed transaction" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.open
      _ <- t.close
      _ <- t.open
    } yield ()
    assertThrows[IllegalStateException](runBlocking(task))
  }

  it should "not allow close transaction twice" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.open
      _ <- t.close
      _ <- t.close
    } yield ()
    assertThrows[IllegalStateException](runBlocking(task))
  }

  it should "not allow new task on not open transaction" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.task(Task.unit)
    } yield ()
    assertThrows[IllegalStateException](runBlocking(task))
  }

  it should "not allow new task on closed transaction" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.open
      _ <- t.close
      _ <- t.task(Task.unit)
    } yield ()
    assertThrows[IllegalStateException](runBlocking(task))
  }

  it should "allow new task on while transaction is not closed" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.open
      a <- t.task(Task(1))
      b <- t.task(Task(2))
      _ <- t.close
    } yield a + b
    runBlocking(task) shouldBe 3
  }

  it should "allow to call close before the end of tasks" in {
    // The steps are sequenced by the for-comprehension, so `close` is only reached once both
    // tasks have produced their result.
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.open
      a <- t.task(Task(1).delayResult(1.second)).asyncBoundary
      b <- t.task(Task(2)).asyncBoundary
      _ <- t.close
    } yield a + b
    runBlocking(task) shouldBe 3
  }

  it should "allow to call close before the end of tasks second way" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.open
      a <- Task.parMap2(
        // The close comes 500 millis after the task has started.
        t.close.delayExecution(500.millis),
        // The task starts immediately and ends after one second.
        t.task(Task(1).delayResult(1.second)).asyncBoundary
      )((_, b) => b)
    } yield a
    runBlocking(task) shouldBe 1
  }

  it should "not allow new task on closed transaction even in concurent case" in {
    val task = for {
      t <- Transactioner.create[Task]
      _ <- t.open
      a <- Task.parMap2(
        // The close runs right away; only its result is delayed.
        t.close.delayResult(500.millis),
        // The task is submitted one second later, once the transaction is closed.
        Task.shift.delayExecution(1.second).flatMap(_ => t.task(Task(1)))
      )((_, b) => b)
    } yield a
    assertThrows[IllegalStateException](runBlocking(task))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment