Skip to content

Instantly share code, notes, and snippets.

@JoolsF
Last active January 8, 2019 16:01
Show Gist options
  • Select an option

  • Save JoolsF/a1bda11043200670ee41e3e323550da2 to your computer and use it in GitHub Desktop.

Select an option

Save JoolsF/a1bda11043200670ee41e3e323550da2 to your computer and use it in GitHub Desktop.
Akka stream example 1
import akka.Done
import akka.actor._
import akka.stream._
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/** Minimal Akka Streams example.
  *
  * Streams the first integers of an infinite lazy sequence through an
  * asynchronous processing stage, prints each processed element, reports the
  * overall outcome, and finally shuts the actor system down so the JVM can exit.
  */
object TestStreamApp extends App {
  // Explicit types on implicits keep implicit resolution predictable.
  private implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
  implicit val system: ActorSystem = ActorSystem("TestSystem")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Toy asynchronous task: returns the given page number in an already-completed future.
    *
    * @param page the page number to "process"
    * @return a completed future holding `page` unchanged
    */
  def process(page: Int): Future[Int] =
    Future.successful(page)

  // Infinite lazy sequence 1, 2, 3, ...; the tail is only evaluated on demand.
  val l: Stream[Int] = 1 #:: l.map(_ + 1)

  val streamedTasks: Future[Done] =
    Source(l) // toy example: streaming an infinite lazy sequence of ints
      .takeWhile(_ <= 2) // bounds the infinite source to the elements 1 and 2
      .mapAsync(parallelism = 2)(process)
      .map(s => s"Processed: $s")
      .runForeach(println)

  streamedTasks
    .andThen {
      case Success(_) => println("Completed tasks")
      // Interpolate the exception: println takes a single argument, so passing
      // (message, ex) would be auto-tupled and printed as "(message,ex)".
      case Failure(ex) => println(s"Failed to complete tasks: $ex")
    }
    // Without terminating the actor system its non-daemon threads keep the JVM
    // alive after the stream has finished. Chaining on the andThen result makes
    // sure the outcome is logged before shutdown starts.
    .onComplete(_ => system.terminate())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment