import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Random, Success}
object CompositeFutureChallenge extends App {
val random = new Random()
val f1 = Future {
random.nextInt(10)
}
val f2 = Future {
random.nextInt(10)
}
val f3 = Future {
random.nextInt(10)
}
val f4 = Future {
random.nextInt(10)
}
val compositeFuture: Future[Int] = for {
i1 <- f1
i2 <- f2
i3 <- f3
i4 <- f4
} yield i1 * i2 * i3 * i4
compositeFuture onComplete {
case Success(value) => println(value)
case Failure(throwable) => throwable.printStackTrace()
}
Await.result(compositeFuture, Duration.Inf)
}
以上のように、 for 文を利用して合成した Future を作成することができます。
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}
object PromiseStdIn extends App {
def applyFromStdIn(lineInputProcessor: Int => Unit): Unit = {
lineInputProcessor(io.StdIn.readLine().toInt)
}
val promise: Promise[Int] = Promise[Int]
applyFromStdIn((i) => promise.success(i * 7))
val future: Future[Int] = promise.future
future onComplete {
case Success(value) => println(value)
case Failure(throwable) => throwable.printStackTrace()
}
Await.result(future, Duration.Inf)
}
以上のようにコールバック関数を求める処理を Future に変換することができます。 また変換過程の中で、受け取った整数を 7 倍しています。
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Promise, Future}
import scala.util.Random
object CountDownLatchSample extends App {
val indexHolder = new AtomicInteger(0)
val random = new Random()
val promises: Seq[Promise[Int]] = for {i <- 1 to 3} yield Promise[Int]
val futures: Seq[Future[Int]] = for {i <- 1 to 8} yield Future[Int] {
val waitMilliSec = random.nextInt(1000)
Thread.sleep(waitMilliSec)
waitMilliSec
}
futures.foreach { f => f.foreach {case waitMilliSec =>
val index = indexHolder.getAndIncrement
if(index < promises.length) {
promises(index).success(waitMilliSec)
}
}}
promises.foreach { p => p.future.foreach { case waitMilliSec => println(waitMilliSec)}}
Thread.sleep(5000)
}
上記のコードを簡単に説明すると、指定された処理を行う Future の配列を用意し、 それらがそれぞれ成功した時に AtomicInteger で確保されている index をアトミックにインクリメントさせながら、 Promise の配列のそれぞれに成功結果を定義しています。
そして、最後に Promise の配列から作り出した全ての Future に対して、 コンソールに出力をさせる処理を定義します。
基本的な Future と Promise を使った処理で表現されていますが、 ひとつ気をつけなくてはいけないのは AtomicInteger の部分です。
これは Future に渡した関数の中では、同じスレッドが利用されているとは限らないために必要となる部分です。 別なスレッドから変更される値に関しては、値を原子的に更新するようにコードを書かなければなりません。 プリミティブな値に関して原子的な操作を提供するのが AtomicInteger という Java のクラスとなります。