Skip to content

Instantly share code, notes, and snippets.

@kevinavery
Last active March 28, 2016 02:56
Show Gist options
  • Save kevinavery/941e7d67c4f8b104f610 to your computer and use it in GitHub Desktop.
Save kevinavery/941e7d67c4f8b104f610 to your computer and use it in GitHub Desktop.
// Project coordinates.
name := "akka-stream-example"

version := "1.0"

scalaVersion := "2.11.8"

// Single source of truth for the Akka module versions below.
val akkaVersion = "2.4.2"

// `%%` appends the Scala binary version (e.g. _2.11) to each artifact name.
// NOTE(review): akka-kernel does not appear to be referenced by the sources
// in this project; confirm before keeping it on the classpath.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-kernel" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion
)
import akka.stream.{Attributes, Outlet, Inlet, FlowShape}
import akka.stream.stage.{OutHandler, InHandler, GraphStageLogic, GraphStage}
import akka.util.ByteString
/**
 * Converts a stream of ByteString chunks into a stream of Strings split on
 * newline ('\n') bytes.
 *
 * Each emitted String is the UTF-8 decoding of one line including its
 * terminating '\n'. Bytes after the last '\n' are buffered until more input
 * arrives. When upstream completes, any non-empty remainder is emitted as a
 * final line (without a trailing '\n') rather than being silently dropped.
 *
 * Splitting on the raw '\n' byte is safe for UTF-8 input because 0x0A never
 * occurs inside a multi-byte UTF-8 sequence, so no character is cut in half.
 *
 * NOTE(review): a line with no '\n' is buffered without bound; consider a
 * maximum line length if the input is untrusted.
 */
class BytesToLinesFlow extends GraphStage[FlowShape[ByteString, String]] {
  val in: Inlet[ByteString] = Inlet[ByteString]("BytesToLinesFlow.in")
  val out: Outlet[String] = Outlet[String]("BytesToLinesFlow.out")
  override val shape: FlowShape[ByteString, String] = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      // Bytes received from upstream that are not yet terminated by '\n'.
      var byteBuffer = ByteString()

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          byteBuffer ++= grab(in)
          val (lines, remainingBytes) = extractLines(byteBuffer)
          byteBuffer = remainingBytes
          if (lines.isEmpty) {
            // No complete line yet; downstream demand is still outstanding,
            // so request another chunk.
            pull(in)
          } else {
            // Pushes the lines as downstream demand allows, then restores
            // the regular out-handler.
            emitMultiple(out, lines)
          }
        }

        override def onUpstreamFinish(): Unit = {
          // Flush a trailing line that was not terminated by '\n'.
          if (byteBuffer.nonEmpty) {
            emit(out, byteBuffer.utf8String)
            byteBuffer = ByteString.empty
          }
          // complete(out) defers completion until any pending emits are done.
          complete(out)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })

      /**
       * Splits `bytes` at every '\n'.
       *
       * @param bytes buffered input
       * @return the complete lines found (each decoded as UTF-8 and still
       *         ending in '\n'), in order, paired with the leftover bytes
       *         after the last '\n' (empty if `bytes` ends with '\n')
       */
      def extractLines(bytes: ByteString): (Vector[String], ByteString) = {
        val newline: Byte = '\n'.toByte

        @scala.annotation.tailrec
        def rec(bytes: ByteString, foundLines: Vector[String]): (Vector[String], ByteString) = {
          val idx = bytes.indexOf(newline)
          if (idx == -1) {
            (foundLines, bytes)
          } else {
            // idx + 1 keeps the '\n' with the line it terminates.
            val (lineBytes, rest) = bytes.splitAt(idx + 1)
            rec(rest, foundLines :+ lineBytes.utf8String)
          }
        }

        rec(bytes, Vector.empty[String])
      }
    }
  }
}
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.FileIO
import java.io.File
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Random
/**
 * Example using Akka Stream for the parallel file-processing problem described by Julia Evans.
 * http://jvns.ca/blog/2016/03/27/thread-pools-how-do-i-use-them/
 *
 * The program reads `input.txt` from the working directory and splits it into
 * lines with [[BytesToLinesFlow]]. It runs `expensiveComputation` on up to 32
 * lines concurrently and prints each result as soon as it completes, so output
 * follows completion order rather than input order. Finally it terminates the
 * actor system, reporting any stream failure to stderr first.
 */
object Main extends App {
  implicit val system: ActorSystem = ActorSystem("my-actor-system")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Simulates slow work on one line: sleeps for a random 0-999 ms, then
   * returns the line unchanged.
   *
   * The sleep is wrapped in `blocking` so the global ExecutionContext can
   * compensate with extra threads. Without it, only about as many sleeps as
   * there are pool threads run at once, and `parallelism = 32` below is never
   * actually reached.
   */
  def expensiveComputation(line: String): Future[String] = {
    Future {
      scala.concurrent.blocking {
        Thread.sleep(Random.nextInt(1000))
      }
      line
    }
  }

  val inputFile = new File("input.txt")
  println(inputFile.getAbsoluteFile.toString)

  FileIO.fromFile(inputFile)
    .via(new BytesToLinesFlow)
    .mapAsyncUnordered(parallelism = 32)(expensiveComputation)
    // `print`, not `println`: BytesToLinesFlow keeps the '\n' on each line.
    .runForeach(print)
    .onComplete { result =>
      // Surface stream failures (e.g. a missing input file) instead of
      // exiting silently.
      result.failed.foreach(e => Console.err.println(s"Stream failed: $e"))
      system.terminate()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment