Skip to content

Instantly share code, notes, and snippets.

@pauca
Last active September 22, 2017 12:51
Show Gist options
  • Save pauca/0f2f67113c2fe330a6bc2c75cd3b712c to your computer and use it in GitHub Desktop.
Save pauca/0f2f67113c2fe330a6bc2c75cd3b712c to your computer and use it in GitHub Desktop.
Gist to stream strings to a akka queue that gzips and stores
import akka.actor._
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.actor.ActorSubscriberMessage._
import akka.stream.actor._
import akka.stream.scaladsl._
import akka.{ NotUsed, Done }
import akka.util.ByteString
import java.io._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import akka.stream.scaladsl._
class GzipCompressor(
ofile : String ,
bufferMaxSize:Int = 100,
queueSize:Int = 1000
)(implicit val materializer : ActorMaterializer ){
var isOpen = true
val buffer = new StringBuilder(capacity = bufferMaxSize)
// val overflowStrategy = akka.stream.OverflowStrategy.fail
val overflowStrategy = akka.stream.OverflowStrategy.backpressure
assert(ofile.endsWith(".gz"))
val fileSink = FileIO.toPath((new File(ofile)).toPath)
var queue = akka.stream.scaladsl.Source.queue[String](queueSize, overflowStrategy).
map(x=> {
ByteString.fromArray(x.getBytes("UTF-8"))
}).via( Compression.gzip ).
to( fileSink).run
def add(z:String) : Unit = {
assert(isOpen)
z.sliding( bufferMaxSize, bufferMaxSize).foreach( s=> {
if( ( s.size + buffer.size ) >= bufferMaxSize ){
queue offer buffer.toString
buffer.clear
}
buffer ++= s
})
}
def close = {
queue offer buffer.toString
isOpen = false
queue complete
}
}
/*
implicit val system = ActorSystem("AkkaGzipCompressor")
implicit val materializer = ActorMaterializer()
val gz = new GzipCompressor("wow.gz")
gz.add("sfsds1")
gz.add("sfsds2")
gz.add("sfsds3")
gz.close
*/
/*
val gz1 = new GzipCompressor("wow1.gz")
val gz2 = new GzipCompressor("wow2.gz")
val gz3 = new GzipCompressor("wow3.gz")
for( i <- 1 to 1000000 ){
gz1.add("sfsds1")
gz2.add("sfsds2sfsds2sfsds2")
gz3.add("sfsds3sfsds2sfsds2sfsds2")
}
gz3.close
gz1.close
gz2.close
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment