-
-
Save Baccata/b63f6176ca4f90670d69115d703f67a5 to your computer and use it in GitHub Desktop.
Convert Java futures to cats-effect IO
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package teikametrics | |
import cats.effect.{Sync, Timer} | |
import scala.concurrent.duration.FiniteDuration | |
import scala.concurrent.blocking | |
import cats.syntax.all._ | |
object JavaFuture {

  /** Shorthand for `java.util.concurrent.Future`, so call sites only need to import `JFuture`. */
  type JFuture[A] = java.util.concurrent.Future[A]

  /** Turns an effect that yields a Java future into an effect that yields the future's result,
    * checking for completion once every `pollInterval`.
    *
    * `java.util.concurrent.Future` can only be consumed by blocking (`get`) or by polling
    * (`isDone`); this overload polls. Between checks it waits with `timer.sleep`, so no thread
    * is parked as long as the supplied `Timer` sleeps asynchronously.
    *
    * @param fa           effect producing the Java future to wait for
    * @param pollInterval delay inserted between successive `isDone` checks
    * @return the value returned by `get`; anything `get` throws is captured by `F.delay`
    */
  def toIO[F[_], A](fa: F[JFuture[A]], pollInterval: FiniteDuration)(
      implicit F: Sync[F],
      timer: Timer[F]
  ): F[A] = {
    // A single polling step. The retry is built inside `flatMap`, so the recursion is
    // driven by F's run loop rather than by nested JVM stack frames.
    def poll(future: JFuture[A]): F[A] =
      F.delay(future.isDone).flatMap {
        // `get` is intentionally not wrapped in `blocking` here: that could trigger a thread
        // hop, and since the future already reports completion the call should return
        // almost immediately.
        case true  => F.delay(future.get)
        case false => timer.sleep(pollInterval).flatMap(_ => poll(future))
      }

    fa.flatMap(poll)
  }

  /** Turns an effect that yields a Java future into an effect that yields the future's result
    * by blocking on `get`.
    *
    * The wait is preceded by `timer.shift` and the call to `get` is wrapped in
    * `scala.concurrent.blocking`. The thread running the wait is therefore held until the
    * future completes, but the underlying `ExecutionContext` is told about it through
    * `BlockContext` and may compensate.
    *
    * `java.util.concurrent.Future` can only be consumed by blocking or by polling; this
    * overload blocks.
    *
    * @param fa effect producing the Java future to wait for
    * @return the value returned by `get`; anything `get` throws is captured by `F.delay`
    */
  def toIO[F[_], A](fa: F[JFuture[A]])(
      implicit F: Sync[F],
      timer: Timer[F]
  ): F[A] =
    fa.flatMap { future =>
      val awaitResult = F.delay(blocking(future.get))
      timer.shift.flatMap(_ => awaitResult)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package teikametrics | |
import java.util.concurrent.FutureTask | |
import cats.effect.IO | |
import org.scalatest.{FlatSpec, Matchers} | |
import teikametrics.JavaFuture._ | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.ExecutionContext.global | |
import scala.concurrent.duration._ | |
/** Specs for the polling variant of `JavaFuture.toIO`, run against `cats.effect.IO`. */
class JavaFutureTest extends FlatSpec with Matchers {

  // Implicit execution context for the specs below.
  // NOTE(review): presumably what the implicit `Timer[IO]` is derived from — confirm.
  implicit def ec: ExecutionContext = global

  /** Lifts a lazily constructed Java future into `IO` through the polling `toIO`,
    * using a 10 millisecond poll interval.
    */
  def jfio[A](jf: => JFuture[A]): IO[A] =
    toIO[IO, A](IO(jf), 10.milliseconds)

  behavior of "JavaFuture.toIO"

  it should "return the value" in {
    val program = jfio {
      val task = new FutureTask[Int](() => 1)
      task.run() // complete the task before it is handed to toIO
      task
    }

    program.unsafeRunSync() shouldEqual 1
  }

  it should "not stack overflow" in {
    // The task is never run, so polling keeps going until the one second timeout expires.
    val program = jfio(new FutureTask[Int](() => 1))

    program.unsafeRunTimed(1.second) shouldBe None
  }

  it should "propagate errors" in {
    val program = jfio {
      val task = new FutureTask[Int](() => throw new Exception("foo"))
      task.run() // the task finishes exceptionally
      task
    }

    program.attempt.unsafeRunSync() shouldBe Symbol("left")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment