Skip to content

Instantly share code, notes, and snippets.

@sammyrulez
Created August 26, 2015 09:57
Show Gist options
  • Save sammyrulez/cde553f5c055ab49a9b6 to your computer and use it in GitHub Desktop.
Save sammyrulez/cde553f5c055ab49a9b6 to your computer and use it in GitHub Desktop.
Read a file in logical blocks, grouping lines by key, and process the groups in parallel.
package astrm
import java.io.File
import akka.actor.ActorSystem
import akka.stream.io.Framing
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.util.ByteString
import scala.concurrent.Future
/** Reads a key-prefixed text file, groups its lines by key and folds each group in parallel.
  *
  * Each line is split into a key (its first `KeyLength` characters) and a value (everything from
  * index `ValueStart` on). One `(key, values)` pair is printed per distinct key, with the values
  * in the order their lines were read.
  *
  * NOTE(review): the layout "6-char key, 1 separator char, value" is inferred from the offsets
  * used by the original code — confirm against the real data format.
  */
object MapApp extends App {
  implicit val actorSystem = ActorSystem()
  import actorSystem.dispatcher
  implicit val flowMaterializer = ActorMaterializer()

  /** Number of leading characters of a line that form its grouping key. */
  private val KeyLength = 6
  /** Index at which a line's value starts (one separator character follows the key). */
  private val ValueStart = KeyLength + 1

  val logFile = new File("src/main/resources/data.txt")

  import akka.stream.io.Implicits._

  // Frame on '\n' and strip an optional trailing '\r', so LF and CRLF files parse the same way
  // regardless of the platform's System.lineSeparator.
  private val stringSource: Source[String, Future[Long]] =
    Source.synchronousFile(logFile)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 512, allowTruncation = true))
      .map { frame =>
        val line = frame.utf8String.stripSuffix("\r")
        println("\t" + line)
        line
      }
      // A blank line carries no key; skip it instead of crashing or creating an empty-key group.
      .filter(_.nonEmpty)

  // Split the lines into one substream per key. take() never throws: a line shorter than
  // KeyLength simply uses the whole line as its key.
  val grouped: Source[(String, Source[String, Unit]), Future[Long]] =
    stringSource.groupBy(_.take(KeyLength))

  // Run each substream right away and fold it into (key, values). Values are prepended (O(1)) and
  // reversed once at the end, rather than appended with ++ (O(n) per line on a List).
  // drop() never throws: a line without a value contributes "".
  val valuesByKey: Source[Future[(String, List[String])], Future[Long]] =
    grouped.map { case (key, lines) =>
      lines
        .runFold(List.empty[String])((acc, line) => line.drop(ValueStart) :: acc)
        .map(reversedValues => (key, reversedValues.reverse))
    }

  // NOTE(review): a group's fold presumably completes only once its substream ends, which with
  // groupBy is when the whole file has been read. Pending groups therefore pile up in this buffer,
  // so roughly more than 100 buffered + 4 in-flight distinct keys make the stream fail (by design
  // of OverflowStrategy.fail) rather than stall — confirm this limit fits the data.
  val endResult: Source[(String, List[String]), Future[Long]] =
    valuesByKey
      .buffer(100, OverflowStrategy.fail)
      .mapAsync(4)(identity)

  // Report a failure (buffer overflow, I/O error, ...) instead of silently discarding it, then
  // shut the system down. awaitTermination() is deliberately not called: this callback runs on the
  // system's own dispatcher, and blocking one of its threads while it terminates is unnecessary.
  endResult.runWith(Sink.foreach(f => println(f))).onComplete { result =>
    result.failed.foreach(e => System.err.println(s"Stream failed: $e"))
    actorSystem.shutdown()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment