Last active
March 28, 2016 02:56
-
-
Save kevinavery/941e7d67c4f8b104f610 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "akka-stream-example"

version := "1.0"

scalaVersion := "2.11.8"

// akka-stream depends on akka-actor, which provides everything the sources
// import (akka.actor, akka.stream, akka.util.ByteString). The previously listed
// akka-kernel (Microkernel) module was never used by the code and is deprecated
// as of Akka 2.4, so it is no longer declared.
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.4.2"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.{Attributes, Outlet, Inlet, FlowShape} | |
import akka.stream.stage.{OutHandler, InHandler, GraphStageLogic, GraphStage} | |
import akka.util.ByteString | |
/**
 * Converts a stream of ByteString into a stream of Strings based on newlines.
 *
 * Each emitted String is one line decoded as UTF-8, including its trailing
 * '\n'. When upstream completes, any bytes left after the last '\n' (input
 * that does not end with a newline) are emitted as a final line without a
 * trailing newline, instead of being dropped.
 *
 * Splitting on the raw '\n' byte before decoding is safe for UTF-8, because
 * the byte 0x0A never appears inside a multi-byte UTF-8 sequence.
 */
class BytesToLinesFlow extends GraphStage[FlowShape[ByteString, String]] {
  val in: Inlet[ByteString] = Inlet[ByteString]("BytesToLinesFlow.in")
  val out: Outlet[String] = Outlet[String]("BytesToLinesFlow.out")
  override val shape: FlowShape[ByteString, String] = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      // Bytes received from upstream that do not yet form a complete line.
      var byteBuffer: ByteString = ByteString.empty

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          byteBuffer ++= grab(in)
          val (lines, remainingBytes) = extractLines(byteBuffer)
          byteBuffer = remainingBytes
          if (lines.isEmpty) {
            // No complete line yet; downstream demand is still outstanding,
            // so request more bytes from upstream.
            pull(in)
          } else {
            emitMultiple(out, lines)
          }
        }

        override def onUpstreamFinish(): Unit = {
          if (byteBuffer.nonEmpty) {
            // Flush the trailing partial line. emit queues it behind any
            // emissions still pending and completes the stage only after it
            // has been pushed downstream.
            val lastLine = byteBuffer.utf8String
            byteBuffer = ByteString.empty
            emit(out, lastLine, () => completeStage())
          } else {
            completeStage()
          }
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })

      /**
       * Returns a tuple containing the complete lines found in the provided
       * bytes (each decoded as UTF-8 and keeping its '\n'), and the bytes
       * remaining after the last '\n'.
       */
      def extractLines(bytes: ByteString): (Vector[String], ByteString) = {
        @scala.annotation.tailrec
        def rec(bytes: ByteString, foundLines: Vector[String]): (Vector[String], ByteString) = {
          // Search for the newline as a Byte to avoid boxed Byte/Char equality.
          val idx = bytes.indexOf('\n'.toByte)
          if (idx == -1) {
            (foundLines, bytes)
          } else {
            val (lineBytes, rest) = bytes.splitAt(idx + 1)
            rec(rest, foundLines :+ lineBytes.utf8String)
          }
        }
        rec(bytes, Vector.empty[String])
      }
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.stream._ | |
import akka.stream.scaladsl.FileIO | |
import java.io.File | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.Future | |
import scala.util.Random | |
/**
 * Example using Akka Stream for the parallel file-processing problem by Julia Evans.
 * http://jvns.ca/blog/2016/03/27/thread-pools-how-do-i-use-them/
 *
 * Reads "input.txt" from the working directory and splits it into lines. It runs
 * `expensiveComputation` on up to 32 lines concurrently and prints each result as
 * soon as it is ready, in completion order rather than input order. The actor
 * system is terminated when the stream completes or fails; a failure is reported
 * on stderr.
 */
object Main extends App {
  implicit val system: ActorSystem = ActorSystem("my-actor-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Simulates a slow, blocking per-line computation: sleeps for a random time
   * below one second, then returns the line unchanged.
   *
   * The sleep is wrapped in `blocking` so the global ExecutionContext can add
   * threads while it waits. Without it, that pool runs roughly one sleep per
   * core at a time, and `parallelism = 32` below is never reached.
   */
  def expensiveComputation(line: String): Future[String] =
    Future {
      scala.concurrent.blocking {
        Thread.sleep(Random.nextInt(1000))
      }
      line
    }

  val inputFile: File = new File("input.txt")
  println(inputFile.getAbsoluteFile.toString)

  FileIO.fromFile(inputFile)
    .via(new BytesToLinesFlow)
    .mapAsyncUnordered(parallelism = 32)(expensiveComputation)
    .runForeach(print)
    .onComplete { result =>
      // Report failures (e.g. a missing input file) instead of exiting silently.
      result.failed.foreach(e => Console.err.println(s"Stream failed: $e"))
      system.terminate()
    }
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.