Last active
May 25, 2024 10:20
-
-
Save dacr/4ff8b96c05ef238b24a8fdf041221ac6 to your computer and use it in GitHub Desktop.
scala asynchronous operations / published by https://github.com/dacr/code-examples-manager #37088128-23be-4c5b-a15d-ab898ea131ce/7b8396f536b44fa8e208ad03c8ae058d39bdb05e
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : scala asynchronous operations | |
// keywords : scala, asynchronous, learning, futures, @testable | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : 37088128-23be-4c5b-a15d-ab898ea131ce | |
// created-on : 2020-09-06T20:35:57Z | |
// managed-by : https://github.com/dacr/code-examples-manager | |
// run-with : scala-cli $file | |
// --------------------- | |
//> using scala "3.4.2" | |
//> using dep "org.scalatest::scalatest:3.2.16" | |
//> using objectWrapper | |
// --------------------- | |
import org.scalatest._, flatspec._, matchers._, OptionValues._, concurrent._ | |
import scala.concurrent.{Future, Promise, Await} | |
import scala.concurrent.duration._ | |
import scala.util.{Failure, Success} | |
class ScalaLanguageAsyncBasicsTest extends AsyncFlatSpec with should.Matchers with ScalaFutures { | |
override def suiteName: String = "Scala Language Asynchronous Basics" | |
implicit override def executionContext = scala.concurrent.ExecutionContext.Implicits.global | |
implicit val defaultPatience: PatienceConfig = { | |
import org.scalatest.time._ | |
PatienceConfig(timeout = Span(100, Millis), interval = Span(5, Millis)) | |
} | |
// ========================================================================== | |
"Promise" can "be used to provide a future result " in { | |
val myGiftPromise = Promise[String]() | |
Future(myGiftPromise.success("gift")) | |
myGiftPromise.future.map { gift => | |
gift shouldBe "gift" | |
} | |
} | |
it can "fail" in { | |
val myGiftPromise = Promise[String]() | |
Future(myGiftPromise.failure(new Exception("you don't deserve it"))) | |
info("myGiftPromise.future.failed :") | |
info("The returned Future will be successfully completed with the Throwable of the original Future if the original Future fails.") | |
info("If the original Future is successful, the returned Future is failed with a NoSuchElementException.") | |
myGiftPromise.future.failed.map { ex => | |
ex.getMessage should include regex "deserve" | |
} | |
} | |
// ========================================================================== | |
"Future" can "be created" in { | |
Future(1) | |
.map(_ shouldBe 1) | |
} | |
// ------------------------------------------------- | |
it can "complete" in { | |
val f = Future(1) | |
whenReady(f) { _ => // scalatest in async mode requires a future[Assertion] to be returned | |
f.isCompleted shouldBe true | |
} | |
} | |
// ------------------------------------------------- | |
it can "complete in success" in { | |
val f = Future(1) | |
whenReady(f) { _ => // scalatest in async mode requires a future[Assertion] to be returned | |
f.value.filter(_.isSuccess) shouldBe defined | |
} | |
} | |
// ------------------------------------------------- | |
it can "complete in failure" in { | |
val f = Future(throw new Exception("boom")) | |
f.failed.map { ex => // scalatest in async mode requires a future[Assertion] to be returned | |
ex.getMessage shouldBe "boom" | |
} | |
} | |
// ------------------------------------------------- | |
it can "wait for completion" in { | |
val f = Future("hello") | |
Await.ready(f, 10.millis) | |
info("Await.ready : OF COURSE DON'T USE IT !") | |
Future(()).map(_ => f.isCompleted shouldBe true) // Because scalatest want a future return value... (of course) | |
} | |
// ------------------------------------------------- | |
it can "wait for result" in { | |
val f = Future("hello") | |
val r = Await.result(f, 10.millis) | |
info("Await.result : OF COURSE DON'T USE IT !") | |
Future(()).map(_ => r shouldBe "hello") // Because scalatest want a future return value... (of course) | |
} | |
// ------------------------------------------------- | |
it can "be badly composed" in { | |
def f1() = Future(1) | |
def f2() = Future(2) | |
val futureResult = for { | |
r1 <- f1() | |
r2 <- f2() // will start only once f1 has finished | |
} yield r1 + r2 | |
futureResult.map(_ shouldBe 3) | |
} | |
// ------------------------------------------------- | |
it can "be better composed" in { | |
val f1 = Future(1) | |
val f2 = Future(2) | |
val futureResult = for { | |
r1 <- f1 | |
r2 <- f2 | |
} yield r1 + r2 | |
futureResult.map(_ shouldBe 3) | |
} | |
// ------------------------------------------------- | |
it can "allow transparent operation" in { | |
Future("A") | |
.andThen { case Success(value) => value + "B" } | |
.map(_ shouldBe "A") | |
} | |
// ------------------------------------------------- | |
it should "recover from failure" in { | |
Future(throw new Exception("BAD")) | |
.recover { _ => "A" } | |
.map(_ shouldBe "A") | |
} | |
// ========================================================================== | |
"Futures" can "be possible to react when all futures are completed successfully" in { | |
val futures: Seq[Future[Int]] = Seq(Future(1), Future(2), Future(3)) | |
info("Take care here as all futures are 'started'") | |
val future: Future[Seq[Int]] = Future.sequence(futures) | |
future.map(_.sum shouldBe 6) | |
} | |
// ------------------------------------------------- | |
it can "be possible to get the first completed but they are all started" in { | |
val futures: Seq[Future[Int]] = Seq(Future(1), Future(2), Future(3)) | |
info("Take care here as all futures are 'started'") | |
val future: Future[Int] = Future.firstCompletedOf(futures) | |
future.map(_ should be > 0) | |
} | |
// ------------------------------------------------- | |
it can "be possible to get the first completed event with lazy list (just a check)" in { | |
def slowish(delayMs: Int, value: Int): Int = { | |
Thread.sleep(delayMs) // Of course never use sleep :) | |
value | |
} | |
val futures: LazyList[Future[Int]] = { | |
Future(slowish(10, 1)) #:: | |
Future(slowish(5, 2)) #:: | |
Future(slowish(1, 3)) #:: | |
LazyList.empty | |
} | |
val future: Future[Int] = Future.firstCompletedOf(futures) | |
future.map(_ shouldBe 3) | |
} | |
// ------------------------------------------------- | |
it should "execute sequentially what ever their result states" in { | |
val p0 = Promise[String]() | |
def f1 = Future(throw new Exception("err1")) | |
def f2 = Future(p0.success("success")) | |
info("f1 andThen f2 => always execute f2 but always return f1") | |
f1.andThen(_ => f2) | |
p0.future.map(result => result shouldBe "success") | |
} | |
// ------------------------------------------------- | |
it should "execute sequentially but stops on first on error" in { | |
val p0 = Promise[String]() | |
def f1 = Future(throw new Exception("err1")) | |
def f2 = Future(p0.success("good")) | |
f1.map(_ => f2) | |
Future { | |
Thread.sleep(100) | |
p0.future.isCompleted shouldBe false | |
} | |
} | |
// ------------------------------------------------- | |
val checkLimit = 1_000_000 | |
val checkGroupSize = 1_000 | |
val checkExpectedResult = LazyList.from(0).take(checkLimit).map(v => BigInt(v)).sum | |
it should "be possible to execute a lot of futures sequentially#1" in { | |
def someGroupedCompute(values: Iterable[Int]) = Future(BigInt(values.sum)) | |
note("it works without any memory issues, was trying to reproduce an issue seen elsewhere...") | |
val futureResult = | |
LazyList | |
.from(0) | |
.take(checkLimit) | |
.grouped(checkGroupSize) | |
.foldLeft(someGroupedCompute(Nil)) { (prevFuture, nextGroup) => | |
prevFuture.flatMap( sumA => someGroupedCompute(nextGroup).map(sumB => sumA+sumB)) | |
} | |
futureResult.map(result => result shouldBe checkExpectedResult) | |
} | |
it should "be possible to execute a lot of futures sequentially#2" in { | |
def someGroupedCompute(values: Iterable[Int]) = Future(BigInt(values.sum)) | |
note("it works without any memory issues, was trying to reproduce an issue seen elsewhere...") | |
def generator(from:Int) = new Iterator[Int]() { | |
var current = from | |
override def hasNext = true | |
override def next() = { | |
val prev = current | |
current+=1 | |
prev | |
} | |
} | |
val futureResult = | |
generator(0) | |
.to(LazyList) | |
.take(checkLimit) | |
.grouped(checkGroupSize) | |
.foldLeft(someGroupedCompute(Nil)) { (prevFuture, nextGroup) => | |
prevFuture.flatMap( sumA => someGroupedCompute(nextGroup).map(sumB => sumA+sumB)) | |
} | |
futureResult.map(result => result shouldBe checkExpectedResult) | |
} | |
it should "be possible to execute a lot of futures sequentially#3" in { | |
def someGroupedCompute(values: Iterable[Int]) = Future(BigInt(values.sum)) | |
note("THIS has been a good workaround to a seen issue with flatmap (linked to java future conversion ?)") | |
val futureResult = | |
LazyList | |
.from(0) | |
.take(checkLimit) | |
.grouped(checkGroupSize) | |
.map(someGroupedCompute) | |
.reduceLeft{ (futureA,futureB) => | |
for { | |
resultA <- futureA | |
resultB <- futureB | |
} yield resultA + resultB | |
} | |
futureResult.map(result => result shouldBe checkExpectedResult) | |
} | |
// ========================================================================== | |
"Executors" can "be customized" in { | |
implicit val customexecutor = { | |
import scala.concurrent.ExecutionContext | |
//ExecutionContext.Implicits.global // the default one | |
//ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(4)) | |
ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(4)) | |
//ExecutionContext.fromExecutor(java.util.concurrent.Executors.newWorkStealingPool(4)) | |
//ExecutionContext.fromExecutor(java.util.concurrent.Executors.newSingleThreadExecutor()) | |
//ExecutionContext.fromExecutor(java.util.concurrent.Executors.newScheduledThreadPool(4)) | |
//ExecutionContext.fromExecutor(java.util.concurrent.Executors.newCachedThreadPool()) | |
} | |
val fut = Future(1)::Future(2)::Future(3)::Nil | |
Future.sequence(fut) map {f => | |
f should have size 3 | |
f.sum shouldBe 6 | |
} | |
} | |
// ========================================================================== | |
} | |
org.scalatest.tools.Runner.main(Array("-oDF", "-s", classOf[ScalaLanguageAsyncBasicsTest].getName)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment