Created
June 9, 2017 15:34
-
-
Save pauca/6f8a382223d47f1c47c836c512c0b023 to your computer and use it in GitHub Desktop.
Basic Akka Stream Template: read, transform, write
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
build.sbt | |
name := "AkkaStreamTemplate" | |
version := "0.1-SNAPSHOT" | |
scalaVersion := "2.12.2" | |
libraryDependencies ++= { | |
Seq( | |
"com.typesafe.akka" %% "akka-stream" % "2.5.1", | |
"com.github.scopt" %% "scopt" % "3.5.0" | |
) | |
} | |
resolvers += Resolver.sonatypeRepo("public") | |
*/ | |
import akka.actor._ | |
import akka.actor.ActorLogging | |
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.actor.ActorSubscriberMessage._ | |
import akka.stream.actor._ | |
import akka.stream.scaladsl._ | |
import akka.{ NotUsed, Done } | |
import scala.io.Source | |
import java.io._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent._ | |
case class Config(ifile: String="" , ofile :String="" ) | |
class WriterSubscriber( ofile :String) extends ActorSubscriber with ActorLogging { | |
val requestStrategy = WatermarkRequestStrategy(500) | |
val bw = new PrintWriter( new FileOutputStream( new File(ofile ))) | |
def save( s : String ) = { | |
bw.write(s) | |
} | |
def receive = { | |
case OnNext(s : String) => { | |
bw.write(s) | |
} | |
case OnError(err: Exception) => | |
log.error(err, "Receieved Exception") | |
bw.close | |
context.stop(self) | |
context.system.terminate() | |
System.exit(1) | |
case OnComplete => | |
log.info("Stream Completed!") | |
bw.close | |
context.stop(self) | |
context.system.terminate() | |
println(s"Output@\n${ofile}\n") | |
case _ =>{ | |
log.error("Unexpected Stream !") | |
bw.close | |
context.stop(self) | |
context.system.terminate() | |
System.exit(1) | |
} | |
} | |
} | |
object AkkaStreamTemplate extends App { | |
val parser = new scopt.OptionParser[Config]("") { | |
head(""" """ , " ") | |
opt[String]( "ifile") required() action { (x, c) => c.copy(ifile = x)} | |
opt[String]( "ofile") required() action { (x, c) => c.copy(ofile = x)} | |
} | |
parser.parse(args, Config()) match { | |
case Some(config) =>{ | |
try { | |
implicit val system = ActorSystem("AkkaStreamTemplate") | |
implicit val materializer = ActorMaterializer() | |
val props = new scala.sys.SystemProperties | |
val nrThreads: Int = props.get("scala.concurrent.context.maxThreads") match { | |
case Some(v) => v.toInt | |
case None => Runtime.getRuntime.availableProcessors | |
} | |
println("nrOfThreads: " + nrThreads) | |
// val lines = Source.fromFile(config.ifile).getLines | |
val lines = List("a","b","c") | |
val source = akka.stream.scaladsl.Source.fromIterator(() => lines.toIterator ) | |
val step1 = source.mapAsync(Math.max( 1,nrThreads))( line => Future{ | |
// do something | |
line | |
}) | |
step1.runWith(Sink.actorSubscriber(Props(new WriterSubscriber(config.ofile)))) | |
} catch { | |
case ex: IOException => println("Had an Exception: " +ex) | |
} | |
} | |
case None => | |
println("Arguments are bad.") | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment