Skip to content

Instantly share code, notes, and snippets.

@dwhitney
Created October 13, 2016 17:26
Show Gist options
  • Save dwhitney/fe5480fc666f56942a863b5632074376 to your computer and use it in GitHub Desktop.
Save dwhitney/fe5480fc666f56942a863b5632074376 to your computer and use it in GitHub Desktop.
added gzip compression -- haven't made a PR yet
package fs2
import java.util.zip.{CRC32, DataFormatException, Deflater, Inflater}
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
/**
 * Provides utilities for compressing and decompressing byte streams with
 * `java.util.zip`: zlib or raw DEFLATE via `deflate`/`inflate`, and
 * single-member gzip (RFC 1952) via `gzip`/`gunzip`.
 *
 * The `Deflater`/`Inflater` behind each pipe is created when the resulting
 * stream runs, not when the pipe is built, so one pipe value can be applied to
 * any number of streams. Native zlib memory is released with `end()` once the
 * input is exhausted (or, when decompressing, once the data is rejected); if
 * the consumer stops early, it is left to the JDK's own cleanup.
 */
object compress {

  private val GZIP_MAGIC_NUMBER = 0x8b1f

  /**
   * The fixed 10-byte gzip member header written by `gzip`: ID1/ID2 (the magic
   * number, low byte first), CM = 8 (deflate), FLG = 0 (no optional fields),
   * MTIME = 0, XFL = 0 and OS = 0.
   */
  private val gzipHeader: Array[Byte] = Array(
    GZIP_MAGIC_NUMBER.toByte, (GZIP_MAGIC_NUMBER >> 8).toByte, Deflater.DEFLATED.toByte,
    0.toByte, 0.toByte, 0.toByte, 0.toByte, 0.toByte, 0.toByte, 0.toByte)

  /**
   * Returns a `Pipe` that compresses its input into a single gzip member
   * (RFC 1952): the fixed 10-byte header, a raw DEFLATE body, and an 8-byte
   * trailer holding the CRC-32 and the input length modulo 2^32, both
   * little-endian. An empty input still produces a complete gzip member.
   *
   * @param level the compression level (0-9, or `Deflater.DEFAULT_COMPRESSION`)
   * @param bufferSize size of the internal buffer that is used by the
   *                   compressor; must be positive. Default size is 32 KB.
   * @param strategy compression strategy -- see `java.util.zip.Deflater` for details
   */
  def gzip[F[_]](level: Int = Deflater.DEFAULT_COMPRESSION,
                 bufferSize: Int = 1024 * 32,
                 strategy: Int = Deflater.DEFAULT_STRATEGY): Pipe[F,Byte,Byte] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val pure: Pipe[Pure,Byte,Byte] = in =>
      Stream.emits(gzipHeader) ++
        in.pull(_deflate_pull(level = level, nowrap = true, bufferSize = bufferSize,
                              strategy = strategy, gzipTrailer = true))
    pipe.covary[F,Byte,Byte](pure)
  }

  /**
   * Returns a `Pipe` that deflates (compresses) its input elements using
   * a `java.util.zip.Deflater` with the parameters `level`, `nowrap` and `strategy`.
   * An empty input still produces a complete (empty) compressed stream.
   *
   * @param level the compression level (0-9, or `Deflater.DEFAULT_COMPRESSION`)
   * @param nowrap if true, emit raw DEFLATE data without the zlib header and
   *               checksum (the form embedded in GZIP and PKZIP)
   * @param bufferSize size of the internal buffer that is used by the
   *                   compressor; must be positive. Default size is 32 KB.
   * @param strategy compression strategy -- see `java.util.zip.Deflater` for details
   */
  def deflate[F[_]](level: Int = Deflater.DEFAULT_COMPRESSION,
                    nowrap: Boolean = false,
                    bufferSize: Int = 1024 * 32,
                    strategy: Int = Deflater.DEFAULT_STRATEGY): Pipe[F,Byte,Byte] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val pure: Pipe[Pure,Byte,Byte] = in =>
      in.pull(_deflate_pull(level = level, nowrap = nowrap, bufferSize = bufferSize,
                            strategy = strategy, gzipTrailer = false))
    pipe.covary[F,Byte,Byte](pure)
  }

  /**
   * Awaits the next chunk of `h`, yielding `None` once `h` is exhausted.
   * `or` is applied to the bare `await` only, so completion of whatever is
   * sequenced after this pull can never trigger the `None` branch.
   */
  private def _awaitOption(h: Handle[Pure, Byte]): Pull[Pure, Nothing, Option[(Chunk[Byte], Handle[Pure, Byte])]] =
    h.await.map(Some(_)) or Pull.pure(None)

  /**
   * Builds the pull shared by `deflate` and `gzip`. The compressor state is
   * allocated inside the continuation, i.e. afresh each time the stream runs.
   * When `gzipTrailer` is set, a CRC-32 of the input is maintained and the gzip
   * trailer is emitted after the DEFLATE data.
   */
  private def _deflate_pull(level: Int,
                            nowrap: Boolean,
                            bufferSize: Int,
                            strategy: Int,
                            gzipTrailer: Boolean): Handle[Pure, Byte] => Pull[Pure, Byte, Nothing] =
    h => _awaitOption(h) flatMap { first =>
      val deflater = new Deflater(level, nowrap)
      deflater.setStrategy(strategy)
      val buffer = new Array[Byte](bufferSize)
      val crc32 = if (gzipTrailer) Some(new CRC32()) else None
      _deflate_next(deflater, buffer, crc32)(first)
    }

  /** Compresses the next chunk, or finishes the stream once the input is exhausted. */
  private def _deflate_next(deflater: Deflater, buffer: Array[Byte], crc32: Option[CRC32])
      : Option[(Chunk[Byte], Handle[Pure, Byte])] => Pull[Pure, Byte, Nothing] = {
    case Some(step) => _deflate_step(deflater, buffer, crc32)(step)
    case None => _deflate_finish(deflater, buffer, crc32)
  }

  /** Feeds one chunk to `deflater`, emits what it produces, then continues with the rest of the input. */
  private def _deflate_step(deflater: Deflater, buffer: Array[Byte], crc32: Option[CRC32])
      : ((Chunk[Byte], Handle[Pure, Byte])) => Pull[Pure, Byte, Nothing] = {
    case (c, h) =>
      val bytes = c.toArray
      crc32.foreach(_.update(bytes))
      deflater.setInput(bytes)
      val result = _deflate_collect(deflater, buffer, ArrayBuffer.empty, fin = false).toArray
      Pull.output(Chunk.bytes(result)) >> _deflate_handle(deflater, buffer, crc32)(h)
  }

  /** Awaits the next chunk and hands it (or end of input) to `_deflate_next`. */
  private def _deflate_handle(deflater: Deflater, buffer: Array[Byte], crc32: Option[CRC32])
      : Handle[Pure, Byte] => Pull[Pure, Byte, Nothing] =
    h => _awaitOption(h) flatMap _deflate_next(deflater, buffer, crc32)

  /**
   * Drains `deflater` into `acc`. Without `fin`, it stops once the current input
   * has been consumed (`needsInput`); with `fin`, it stops once the stream is
   * `finished`. Output is appended to `acc` in place rather than copying `acc`
   * on every iteration.
   */
  @tailrec
  private def _deflate_collect(deflater: Deflater, buffer: Array[Byte], acc: ArrayBuffer[Byte], fin: Boolean): ArrayBuffer[Byte] =
    if ((fin && deflater.finished) || (!fin && deflater.needsInput)) acc
    else {
      val count = deflater.deflate(buffer)
      _deflate_collect(deflater, buffer, acc ++= buffer.iterator.take(count), fin)
    }

  /**
   * Flushes the remaining DEFLATE data, appends the gzip trailer when `crc32`
   * is defined, releases the deflater and ends the pull.
   */
  private def _deflate_finish(deflater: Deflater, buffer: Array[Byte], crc32: Option[CRC32]): Pull[Pure, Byte, Nothing] = {
    deflater.setInput(Array.emptyByteArray)
    deflater.finish()
    val out = _deflate_collect(deflater, buffer, ArrayBuffer.empty, fin = true)
    // getBytesRead (total uncompressed input) must be read before end() releases the deflater.
    crc32.foreach(crc => out ++= _gzip_trailer(crc.getValue, deflater.getBytesRead))
    deflater.end()
    Pull.output(Chunk.bytes(out.toArray)) >> Pull.done
  }

  /** The 8-byte gzip trailer: CRC-32, then ISIZE (input length mod 2^32), each as 4 little-endian bytes. */
  private def _gzip_trailer(crc: Long, size: Long): Array[Byte] =
    Array(crc, crc >> 8, crc >> 16, crc >> 24, size, size >> 8, size >> 16, size >> 24)
      .map(v => (v & 0xFF).toByte)

  /**
   * Returns a `Pipe` that decompresses a single gzip member (RFC 1952).
   *
   * The header is checked for the magic number, CM = 8 (deflate) and clear
   * reserved flag bits. The optional FEXTRA, FNAME, FCOMMENT and FHCRC fields
   * are skipped without inspecting their contents. The remainder is inflated as
   * raw DEFLATE data. The CRC-32/ISIZE trailer is not verified, and any bytes
   * after the end of the DEFLATE data (including further members) are ignored.
   * Header problems surface as a stream failing with `DataFormatException`.
   *
   * @param bufferSize size of the internal buffer that is used by the
   *                   decompressor; must be positive. Default size is 32 KB.
   */
  def gunzip[F[_]](bufferSize: Int = 1024 * 32): Pipe[F,Byte,Byte] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val pure: Pipe[Pure,Byte,Byte] = { stream =>
      // The header is located eagerly by re-reading prefixes of the (pure) input.
      def slice(from: Long, n: Long): List[Byte] = stream.drop(from).take(n).toList
      def runToZero(from: Long): Int = stream.drop(from).takeWhile(_ != 0).toList.length
      _gzip_header_length(slice, runToZero) match {
        case Left(message) => Stream.fail(new DataFormatException(message))
        case Right(length) => stream.drop(length).through(inflate(true, bufferSize))
      }
    }
    pipe.covary[F,Byte,Byte](pure)
  }

  /**
   * Validates the gzip member header (RFC 1952, section 2.3) and returns its
   * length in bytes, or a message describing why it is unacceptable.
   *
   * @param slice returns the input bytes in `[from, from + n)`, fewer if the input ends first
   * @param runToZero returns how many bytes follow offset `from` before the next
   *                  zero byte (or before the end of the input)
   */
  private def _gzip_header_length(slice: (Long, Long) => List[Byte],
                                  runToZero: Long => Int): Either[String, Long] = {
    val FlagHeaderCrc = 0x02
    val FlagExtra = 0x04
    val FlagName = 0x08
    val FlagComment = 0x10
    val FlagsReserved = 0xE0
    val fixed = slice(0L, 10L).toVector
    def byteAt(pos: Long): Option[Int] = slice(pos, 1L).headOption.map(_ & 0xFF)

    if (fixed.lift(0) != Some(GZIP_MAGIC_NUMBER.toByte) || fixed.lift(1) != Some((GZIP_MAGIC_NUMBER >> 8).toByte))
      Left("Not in gzip format. Gzip Magic Number missing from beginning of header")
    else if (fixed.length < 10) Left("Unexpected end of gzip header")
    else if (fixed(2) != Deflater.DEFLATED.toByte) Left("Unsupported gzip compression method")
    else {
      val flags = fixed(3) & 0xFF
      def has(flag: Int): Boolean = (flags & flag) != 0
      // FEXTRA: a 2-byte little-endian length XLEN followed by XLEN bytes.
      def skipExtra(pos: Long): Option[Long] =
        if (!has(FlagExtra)) Some(pos)
        else for (lo <- byteAt(pos); hi <- byteAt(pos + 1)) yield pos + 2 + (lo | (hi << 8))
      // FNAME / FCOMMENT: zero-terminated byte strings.
      def skipZeroTerminated(flag: Int, pos: Long): Option[Long] =
        if (!has(flag)) Some(pos)
        else {
          val length = runToZero(pos)
          byteAt(pos + length).map(_ => pos + length + 1)
        }

      if (has(FlagsReserved)) Left("Reserved gzip header flags are set")
      else {
        val end = for {
          afterExtra <- skipExtra(10L)
          afterName <- skipZeroTerminated(FlagName, afterExtra)
          afterComment <- skipZeroTerminated(FlagComment, afterName)
        } yield if (has(FlagHeaderCrc)) afterComment + 2 else afterComment // FHCRC: 2-byte header CRC
        // The header is only complete if its last byte is actually present.
        end.filter(e => byteAt(e - 1).isDefined).toRight("Unexpected end of gzip header")
      }
    }
  }

  /**
   * Returns a `Pipe` that inflates (decompresses) its input elements using
   * a `java.util.zip.Inflater` with the parameter `nowrap`.
   * An empty input produces an empty output. The stream fails with
   * `DataFormatException` if the data is corrupt, requires a preset dictionary,
   * or ends before the compressed stream is complete.
   *
   * @param nowrap if true, expect raw DEFLATE data without the zlib header and
   *               checksum (the form embedded in GZIP and PKZIP)
   * @param bufferSize size of the internal buffer that is used by the
   *                   decompressor; must be positive. Default size is 32 KB.
   */
  def inflate[F[_]](nowrap: Boolean = false,
                    bufferSize: Int = 1024 * 32): Pipe[F,Byte,Byte] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val pure: Pipe[Pure,Byte,Byte] = in =>
      in.pull { h =>
        h.await flatMap { first =>
          // One Inflater per run; released by `_inflate_finish` or on failure.
          val inflater = new Inflater(nowrap)
          _inflate_step(inflater, new Array[Byte](bufferSize))(first)
        }
      }
    pipe.covary[F,Byte,Byte](pure)
  }

  /**
   * Feeds one chunk to `inflater`, emits the decompressed bytes, then continues
   * with the rest of the input. Corrupt data, or data that needs a preset
   * dictionary, ends the inflater and fails the pull.
   */
  private def _inflate_step(inflater: Inflater, buffer: Array[Byte])
      : ((Chunk[Byte], Handle[Pure, Byte])) => Pull[Pure, Byte, Nothing] = {
    case (c, h) =>
      inflater.setInput(c.toArray)
      val inflated: Either[DataFormatException, Array[Byte]] =
        try Right(_inflate_collect(inflater, buffer, ArrayBuffer.empty).toArray)
        catch { case e: DataFormatException => Left(e) }
      inflated match {
        case Left(e) =>
          inflater.end()
          Pull.fail(e)
        case Right(_) if inflater.needsDictionary =>
          inflater.end()
          Pull.fail(new DataFormatException("Preset dictionary required"))
        case Right(result) =>
          Pull.output(Chunk.bytes(result)) >> _inflate_handle(inflater, buffer)(h)
      }
  }

  /** Awaits the next chunk and inflates it, or checks completion once the input is exhausted. */
  private def _inflate_handle(inflater: Inflater, buffer: Array[Byte]): Handle[Pure, Byte] => Pull[Pure, Byte, Nothing] =
    h => _awaitOption(h) flatMap {
      case Some(step) => _inflate_step(inflater, buffer)(step)
      case None => _inflate_finish(inflater)
    }

  /**
   * Drains `inflater` into `acc` until it has finished or a call produces no
   * output. Stopping as soon as `needsInput` is true is not safe: zlib may
   * still hold pending output after consuming all of its input when the
   * output buffer filled up. Output is appended to `acc` in place.
   */
  @tailrec
  private def _inflate_collect(inflater: Inflater, buffer: Array[Byte], acc: ArrayBuffer[Byte]): ArrayBuffer[Byte] =
    if (inflater.finished) acc
    else {
      val count = inflater.inflate(buffer)
      if (count == 0) acc
      else _inflate_collect(inflater, buffer, acc ++= buffer.iterator.take(count))
    }

  /** Releases the inflater and fails unless the compressed stream was complete. */
  private def _inflate_finish(inflater: Inflater): Pull[Pure, Byte, Nothing] = {
    val complete = inflater.finished
    inflater.end()
    if (complete) Pull.done else Pull.fail(new DataFormatException("Insufficient data"))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment