-
-
Save timperrett/1193049 to your computer and use it in GitHub Desktop.
Scatter-Gather with Akka Dataflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Scatter-gather actor.
 *
 * For every incoming `Message` it sends the message to each of `recipients`,
 * binds one promise per recipient to that recipient's answer, and replies to
 * the sender with the `result` promise, which is bound to the concatenation
 * of all answers.
 *
 * @param recipients the actors every message is sent to
 */
class Aggregator(recipients: Iterable[ActorRef]) extends Actor {
  def receive = {
    case msg @ Message(text) =>
      println("Started processing message `%s`" format(text))

      // Promise for the aggregated answer; this is what the sender receives.
      val result = Promise[String]()
      // One promise per recipient, paired with the recipients by position below.
      val promises = List.fill(recipients.size)(Promise[String]())

      // Scatter. The pairs are traversed only for their side effects, so
      // `foreach` is used instead of `map` (no throwaway collection is built).
      recipients.zip(promises).foreach { case (recipient, promise) =>
        // The callback parameter is called `response` so that it does not
        // shadow the outer `result` promise.
        (recipient !!! msg).map { response: String =>
          println("Binding recipient's response: %s" format(response))
          flow {
            promise << response
          }
        }
      }

      // Gather. Each promise's value is put in front of the accumulator, so
      // the final string lists the answers in reverse order of `promises`.
      flow {
        // Parameters are named `pending`/`acc` to avoid shadowing the outer
        // `promises` list and `result` promise.
        def gather(pending: List[CompletableFuture[String]], acc: String = ""): String @cps[Future[Any]] =
          pending match {
            // NOTE(review): `head()` presumably suspends the flow until that
            // promise has been bound - confirm against the Akka dataflow docs.
            case head :: tail => gather(tail, head() + acc)
            case Nil => acc
          }
        println("Binding result...")
        result << gather(promises)
      }

      // The promise itself is the reply; the caller reads its value later.
      self.reply(result)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Declaration of the Aggregator actor; it is constructed with the collection
// of recipient actor references.
class Aggregator(recipients: Iterable[ActorRef]) extends Actor |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Scatter step: send the message to every recipient and bind one promise per
// recipient to that recipient's answer.
case msg @ Message(text) =>
  // One promise per recipient, paired with the recipients by position below.
  val promises = List.fill(recipients.size)(Promise[String]())
  // The pairs are traversed only for their side effects, so `foreach` is used
  // instead of `map` (no throwaway collection is built).
  recipients.zip(promises).foreach { case (recipient, promise) =>
    (recipient !!! msg).map { result: String =>
      println("Binding recipient's response: %s" format(result))
      flow {
        promise << result
      }
    }
  }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Gather step: inside a dataflow `flow` block, concatenate the values of all
// recipient promises and bind the outcome to `result` (`result` and `promises`
// are defined outside this snippet).
flow{ | |
// Walks the list recursively; each promise's value is put in front of the
// accumulated string, so the final text is in reverse order of the list.
def gather(promises: List[CompletableFuture[String]], result: String = ""): String @cps[Future[Any]] = | |
promises match { | |
// NOTE(review): `head()` presumably suspends the flow until that promise is
// bound - confirm against the Akka dataflow documentation.
case head :: tail => gather(tail, head() + result) | |
case Nil => result | |
} | |
println("Binding result...") | |
// Here `result` is the outer value being bound, not gather's parameter.
result << gather(promises) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Worker actor that answers every `Message` with the message text followed by
 * its own id.
 *
 * @param id number embedded in each reply
 */
class Recipient(id: Int) extends Actor {
  def receive = {
    case Message(text) =>
      // NOTE(review): the one-second pause presumably simulates slow work.
      Thread.sleep(1000)
      val answer = "%s, [%s]! ".format(text, id)
      self.reply(answer)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo driver: start five recipients and one aggregator, send two messages
// and print the aggregated answer of each.
val recipients = (1 to 5).map(i => actorOf(new Recipient(i)).start)
val aggregator = actorOf(new Aggregator(recipients)).start

val results1 = aggregator !! Message("Hello")
val results2 = aggregator !! Message("world")

// Printing is a side effect, so the replies are traversed with a `for` loop
// (foreach semantics) rather than `map`; one loop replaces the two duplicated
// print blocks while keeping the results1-then-results2 order.
for {
  reply <- Seq(results1, results2)
  res <- reply
} {
  // The reply is untyped; the Aggregator replies with its result promise, so
  // it is cast back to a Future[String] and `get` reads the promise's value.
  println("Result: %s" format(res.asInstanceOf[Future[String]].get))
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Started processing message `Hello` | |
Binding result... | |
Started processing message `world` | |
Binding result... | |
Binding recipient's response: Hello, [3]! | |
Binding recipient's response: Hello, [1]! | |
Binding recipient's response: Hello, [4]! | |
Binding recipient's response: Hello, [2]! | |
Binding recipient's response: Hello, [5]! | |
Result: Hello, [5]! Hello, [4]! Hello, [3]! Hello, [2]! Hello, [1]! | |
Binding recipient's response: world, [4]! | |
Binding recipient's response: world, [3]! | |
Binding recipient's response: world, [5]! | |
Binding recipient's response: world, [2]! | |
Binding recipient's response: world, [1]! | |
Result: world, [5]! world, [4]! world, [3]! world, [2]! world, [1]! |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment