Skip to content

Instantly share code, notes, and snippets.

@gamlerhart
Created March 22, 2012 23:44
Show Gist options
  • Save gamlerhart/2165529 to your computer and use it in GitHub Desktop.
Save gamlerhart/2165529 to your computer and use it in GitHub Desktop.
AsyncIO
case class LineItem(number: Int, content: String)
val parser = for {
numberOfItem <- IO.takeUntil(ByteString(":"))
lineContent <- IO.takeUntil(ByteString("\n"))
} yield LineItem(Integer.parseInt(numberOfItem.utf8String), lineContent.utf8String)
// will close the file when returned future has finished
val onlyCoolLines = FileIO.withFile(Paths.get("aFile.txt")) {
file =>
val linesFuture = file.readSegments(parser)
val coolLinesFuture = linesFuture.map(
lines => lines.filter(
line => line.content.contains("cool")))
coolLinesFuture
}
onlyCoolLines.onSuccess {
case LineItem(no, line) => println(line)
}
import akka.dispatch.{ ExecutionContext, Promise }
import info.gamlor.io.FileIO
// A plain execution context
implicit val dispatcher = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere)
// or within an actor
implicit val dispatcher = context.dispatcher
val file = FileIO.open("myFile.data")
// read 200 bytes from the beginning of the file
val readResultFuture = file.read(0,200)
// do stuff with the future
readResultFuture.onSuccess({
case bytes:ByteString=>{
println(bytes.utf8String)
}
}).andThen{ case _ => file.close()} // Close when we're done reading
resolvers += "Gamlor-Repo" at "https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots"
libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0"
libraryDependencies += "info.gamlor.akkaasync" %% "akka-io" % "1.0-SNAPSHOT"
case class Page(header: String, body: String, footer: String)
val parser = for {
headerLine <- IO.takeUntil(ByteString("\n"))
body <- IO.takeUntil(ByteString("[End-Body]"))
footer <- IO.takeUntil(ByteString("[End-Footer]"))
} yield Page(headerLine.utf8String, body.utf8String, footer.utf8String)
// We can use an iteraree as parser.
// The parse result will be in the future.
// There are overload available to read from certain positions.
val readResultFuture = file.readAll(parser)
readResultFuture.onSuccess {
case Page(header, body, footer) => {
println(header)
println(body)
println(footer)
}
}
case class LineItem(number: Int, content:String)
val parser = for {
numberOfItem <- IO.takeUntil(ByteString(":"))
lineContent <- IO.takeUntil(ByteString("\n"))
} yield LineItem(Integer.parseInt(numberOfItem.utf8String), lineContent.utf8String)
// We can use an iteraree as parser.
// This keep parsing until file ends/max amount is reached.
// Every time the iteraree is done parsing it will add that
// to the result.
// There are overload available to read from certain positions.
val readResultFuture = file.readSegments(parser)
readResultFuture.onSuccess {
case items:Seq[LineItem] => {
items.foreach({i=>
println(i.number)
println(i.content)
})
}
}
protected def receive = {
case path: Path => {
// will be created with the context,
// so this actor supervises this file reading actor
val fileReadingActor = IOActors.createForFile(path)
// The actor will respond with a ReadResponse
// which will contain the read data
fileReadingActor ! ReadInChunks(0, Int.MaxValue,path)
}
case ReadInChunksResponse(data, _) => {
data match{
case IO.Chunk(bytes) =>processPartOfPicture(bytes)
case IO.EOF => finishPicture()
}
}
}
def processPartOfPicture(string: ByteString){
// do stuff
}
def finishPicture(){
// done
}
class PictureIO extends Actor {
import info.gamlor.io.IOActors._
override val supervisorStrategy = OneForOneStrategy(5, Duration(60, TimeUnit.SECONDS)) {
case ex: IOException => {
println("Couldn't read file. Giving up on this file "+ex)
Stop
}
case ex: Exception => Escalate
}
protected def receive = {
case path: Path => {
// will be created with the context,
// so this actor supervises this file reading actor
val fileReadingActor = IOActors.createForFile(path)
// The actor will respond with a ReadResponse
// which will contain the read data
fileReadingActor ! Read(0, Int.MaxValue)
}
case ReadResponse(data, _, _) => {
processTheBytesOfThisPicture(data)
}
}
def processTheBytesOfThisPicture(data: ByteString) {
// do something
}
}
val textFile = FileIO.openText("lines.txt")
val allLinesFuture = textFile.readAllLines()
allLinesFuture.onSuccess({
case line:Seq[String]=>{
println(line)
}
}).andThen{ case _ => file.close()}
val file = FileIO.open(Paths.get("myFile.data"),StandardOpenOption.CREATE,StandardOpenOption.WRITE,StandardOpenOption.READ)
file.write(ByteString("data data"),0).onComplete{
_=>file.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment