Created
October 13, 2016 17:26
-
-
Save dwhitney/fe5480fc666f56942a863b5632074376 to your computer and use it in GitHub Desktop.
Added gzip compression and decompression (not yet submitted as a pull request).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fs2 | |
import java.util.zip.{CRC32, DataFormatException, Deflater, Inflater} | |
import scala.annotation.tailrec | |
import scala.collection.mutable.ArrayBuffer | |
/** Provides utilities for compressing/decompressing byte streams. */
object compress {

  private val GZIP_MAGIC_NUMBER = 0x8b1f

  // Bits of the gzip header FLG byte (RFC 1952, section 2.3.1).
  private val FHCRC = 0x02
  private val FEXTRA = 0x04
  private val FNAME = 0x08
  private val FCOMMENT = 0x10
  private val FRESERVED = 0xE0

  // Fixed 10-byte member header emitted by `gzip`: no optional fields, MTIME/XFL/OS all zero.
  private val gzipHeader: Array[Byte] = Array(
    GZIP_MAGIC_NUMBER.toByte, (GZIP_MAGIC_NUMBER >> 8).toByte, // ID1, ID2
    Deflater.DEFLATED.toByte,                                  // CM
    0.toByte,                                                  // FLG
    0.toByte, 0.toByte, 0.toByte, 0.toByte,                    // MTIME
    0.toByte,                                                  // XFL
    0.toByte                                                   // OS
  )

  /**
   * Returns a `Pipe` that compresses its input into a single-member gzip stream (RFC 1952):
   * a fixed 10-byte header, raw DEFLATE data, and an 8-byte trailer holding the CRC-32 and
   * the length (modulo 2^32) of the uncompressed input.
   *
   * The `Deflater`, `CRC32` and buffer are created inside the `pull`, not when the pipe is
   * constructed, so streams built from the same pipe never share compressor state.
   * An empty input still yields a complete, valid gzip stream.
   *
   * @param level the compression level (0-9, or `Deflater.DEFAULT_COMPRESSION`)
   * @param bufferSize size of the internal buffer used by the compressor; must be positive.
   *                   Default size is 32 KB.
   * @param strategy compression strategy -- see `java.util.zip.Deflater` for details
   */
  def gzip[F[_]](level: Int = Deflater.DEFAULT_COMPRESSION,
                 bufferSize: Int = 1024 * 32,
                 strategy: Int = Deflater.DEFAULT_STRATEGY): Pipe[F,Byte,Byte] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val pure: Pipe[Pure,Byte,Byte] =
      _ pull { h =>
        // gzip wraps raw DEFLATE data, hence nowrap = true.
        val deflater = new Deflater(level, true)
        deflater.setStrategy(strategy)
        val crc32 = new CRC32()
        val buffer = new Array[Byte](bufferSize)
        // Emit a copy so downstream code cannot mutate the shared header constant.
        Pull.output(Chunk.bytes(gzipHeader.clone())) >> _gzip_handle(deflater, crc32, buffer)(h)
      }
    pipe.covary[F,Byte,Byte](pure)
  }

  /** Compresses every remaining chunk of the handle, folding each into `crc32`, then finishes the member. */
  private def _gzip_handle(deflater: Deflater, crc32: CRC32, buffer: Array[Byte]): Handle[Pure, Byte] => Pull[Pure, Byte, Handle[Pure, Byte]] =
    h => _awaitOption(h) flatMap {
      case Some((c, next)) =>
        val bytes = c.toArray
        crc32.update(bytes)
        _deflate_chunk(deflater, buffer, bytes) >> _gzip_handle(deflater, crc32, buffer)(next)
      case None =>
        _gzip_finish(deflater, crc32, buffer)
    }

  /** Flushes the compressor, appends the CRC-32/ISIZE trailer and releases the deflater. */
  private def _gzip_finish(deflater: Deflater, crc32: CRC32, buffer: Array[Byte]): Pull[Pure, Byte, Nothing] = {
    val body = _deflate_drain(deflater, buffer)
    // ISIZE is the input length modulo 2^32 (`_le32` keeps only the low 32 bits).
    // Both values must be read before `end()` releases the native deflater.
    val trailer = _le32(crc32.getValue) ++ _le32(deflater.getBytesRead)
    deflater.end()
    Pull.output(Chunk.bytes(body ++ trailer)) >> Pull.done
  }

  /**
   * Returns a `Pipe` that deflates (compresses) its input elements using
   * a `java.util.zip.Deflater` with the parameters `level`, `nowrap` and `strategy`.
   *
   * The deflater is created inside the `pull`, so each stream gets its own; an empty input
   * still produces a complete (empty) compressed stream.
   *
   * @param level the compression level (0-9, or `Deflater.DEFAULT_COMPRESSION`)
   * @param nowrap if true, emit raw DEFLATE data without the zlib header and checksum
   *               (the format used inside GZIP and PKZIP); this alone is not a gzip file --
   *               use `gzip` for that
   * @param bufferSize size of the internal buffer that is used by the
   *                   compressor; must be positive. Default size is 32 KB.
   * @param strategy compression strategy -- see `java.util.zip.Deflater` for details
   */
  def deflate[F[_]](level: Int = Deflater.DEFAULT_COMPRESSION,
                    nowrap: Boolean = false,
                    bufferSize: Int = 1024 * 32,
                    strategy: Int = Deflater.DEFAULT_STRATEGY): Pipe[F,Byte,Byte] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val pure: Pipe[Pure,Byte,Byte] =
      _ pull { h =>
        val deflater = new Deflater(level, nowrap)
        deflater.setStrategy(strategy)
        _deflate_handle(deflater, new Array[Byte](bufferSize))(h)
      }
    pipe.covary[F,Byte,Byte](pure)
  }

  /** Compresses one chunk, then continues with the rest of the handle. */
  private def _deflate_step(deflater: Deflater, buffer: Array[Byte]): ((Chunk[Byte], Handle[Pure, Byte])) => Pull[Pure, Byte, Handle[Pure, Byte]] = {
    case (c, h) =>
      _deflate_chunk(deflater, buffer, c.toArray) >> _deflate_handle(deflater, buffer)(h)
  }

  /** Compresses the next chunk if there is one; otherwise finishes and releases the deflater. */
  private def _deflate_handle(deflater: Deflater, buffer: Array[Byte]): Handle[Pure, Byte] => Pull[Pure, Byte, Handle[Pure, Byte]] =
    h => _awaitOption(h) flatMap {
      case Some(step) => _deflate_step(deflater, buffer)(step)
      case None => _deflate_finish(deflater, buffer)
    }

  /** Feeds `bytes` to the deflater and emits whatever compressed output is ready. */
  private def _deflate_chunk(deflater: Deflater, buffer: Array[Byte], bytes: Array[Byte]): Pull[Pure, Byte, Unit] = {
    deflater.setInput(bytes)
    Pull.output(Chunk.bytes(_deflate_collect(deflater, buffer, ArrayBuffer.empty, false).toArray))
  }

  /**
   * Runs the deflater into `buffer`, appending the produced bytes to `acc` (which is mutated).
   * With `fin = false` it stops once all current input is consumed; with `fin = true` it stops
   * once the compressed stream is complete.
   */
  @tailrec
  private def _deflate_collect(deflater: Deflater, buffer: Array[Byte], acc: ArrayBuffer[Byte], fin: Boolean): ArrayBuffer[Byte] = {
    if ((fin && deflater.finished) || (!fin && deflater.needsInput)) acc
    else {
      val count = deflater deflate buffer
      // Append in place; `acc ++ ...` would copy the whole accumulator on every round.
      _deflate_collect(deflater, buffer, acc ++= buffer.iterator.take(count), fin)
    }
  }

  /** Signals end of input and returns all remaining compressed bytes; does not end the deflater. */
  private def _deflate_drain(deflater: Deflater, buffer: Array[Byte]): Array[Byte] = {
    deflater.finish()
    _deflate_collect(deflater, buffer, ArrayBuffer.empty, true).toArray
  }

  /** Emits the final compressed bytes and releases the deflater. */
  private def _deflate_finish(deflater: Deflater, buffer: Array[Byte]): Pull[Pure, Byte, Nothing] = {
    val result = _deflate_drain(deflater, buffer)
    deflater.end()
    Pull.output(Chunk.bytes(result)) >> Pull.done
  }

  /**
   * Returns a `Pipe` that decompresses gzip data (RFC 1952).
   *
   * The optional header fields (FEXTRA, FNAME, FCOMMENT, FHCRC) are skipped, and every member's
   * CRC-32 and ISIZE trailer is verified. Concatenated members are decompressed in sequence;
   * as `java.util.zip.GZIPInputStream` does, bytes following a complete member that do not
   * begin a well-formed header are ignored.
   *
   * The stream fails with a `DataFormatException` if the input does not begin with a complete,
   * valid gzip header, if the compressed data or trailer is truncated or corrupt, or if the
   * checksum or length check fails.
   *
   * @param bufferSize size of the internal buffer used by the decompressor; must be positive.
   *                   Default size is 32 KB.
   */
  def gunzip[F[_]](bufferSize: Int = 1024 * 32): Pipe[F,Byte,Byte] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val pure: Pipe[Pure,Byte,Byte] =
      _ pull { h => _gunzip_header(Array.emptyByteArray, new Array[Byte](bufferSize), true, h) }
    pipe.covary[F,Byte,Byte](pure)
  }

  /**
   * Accumulates bytes until a complete member header is available, then inflates the member.
   * A missing or malformed header fails the stream for the first member; for later members it
   * simply ends decompression.
   */
  private def _gunzip_header(acc: Array[Byte], buffer: Array[Byte], firstMember: Boolean, h: Handle[Pure, Byte]): Pull[Pure, Byte, Handle[Pure, Byte]] = {
    def reject(message: String): Pull[Pure, Byte, Nothing] =
      if (firstMember) _fail(message) else Pull.done
    _gzip_header_length(acc) match {
      case Left(message) => reject(message)
      case Right(Some(length)) =>
        // Member data is raw DEFLATE, hence nowrap = true.
        _gunzip_body(new Inflater(true), new CRC32(), buffer, acc.drop(length), h)
      case Right(None) =>
        _awaitOption(h) flatMap {
          case Some((c, next)) => _gunzip_header(acc ++ c.toArray, buffer, firstMember, next)
          case None => reject("Not in gzip format. Input ended before a complete gzip header was read")
        }
    }
  }

  /**
   * Feeds `input` to the member's inflater, emits the output and folds it into `crc32`.
   * Once the DEFLATE data ends, hands the unconsumed bytes over to the trailer check.
   */
  private def _gunzip_body(inflater: Inflater, crc32: CRC32, buffer: Array[Byte], input: Array[Byte], h: Handle[Pure, Byte]): Pull[Pure, Byte, Handle[Pure, Byte]] = {
    inflater.setInput(input)
    val out = _inflate_collect(inflater, buffer, ArrayBuffer.empty).toArray
    crc32.update(out)
    Pull.output(Chunk.bytes(out)) >> {
      if (inflater.finished) {
        // Bytes the inflater did not consume start the trailer (and possibly the next member).
        val rest = input.takeRight(inflater.getRemaining)
        val crc = crc32.getValue
        val size = inflater.getBytesWritten
        inflater.end()
        _gunzip_trailer(crc, size, rest, buffer, h)
      } else
        _awaitOption(h) flatMap {
          case Some((c, next)) => _gunzip_body(inflater, crc32, buffer, c.toArray, next)
          case None =>
            inflater.end()
            _fail("Insufficient data")
        }
    }
  }

  /**
   * Collects the 8-byte member trailer (CRC-32, then ISIZE), verifies it against the inflated
   * data, then looks for a further member in whatever follows.
   */
  private def _gunzip_trailer(crc: Long, size: Long, acc: Array[Byte], buffer: Array[Byte], h: Handle[Pure, Byte]): Pull[Pure, Byte, Handle[Pure, Byte]] =
    if (acc.length < 8)
      _awaitOption(h) flatMap {
        case Some((c, next)) => _gunzip_trailer(crc, size, acc ++ c.toArray, buffer, next)
        case None => _fail("Insufficient data: truncated gzip trailer")
      }
    else if (_readLe32(acc, 0) != crc) _fail("Invalid gzip trailer: CRC-32 mismatch")
    // ISIZE only records the uncompressed length modulo 2^32.
    else if (_readLe32(acc, 4) != (size & 0xFFFFFFFFL)) _fail("Invalid gzip trailer: length mismatch")
    else _gunzip_header(acc.drop(8), buffer, false, h)

  /**
   * Returns a `Pipe` that inflates (decompresses) its input elements using
   * a `java.util.zip.Inflater` with the parameter `nowrap`.
   *
   * An empty input stream produces an empty output. A stream that ends before the compressed
   * data is complete fails with `DataFormatException("Insufficient data")`; corrupt data fails
   * with the `DataFormatException` raised by the inflater. Bytes after the end of the
   * compressed data are ignored.
   *
   * @param nowrap if true, expect raw DEFLATE data without the zlib header and checksum
   *               (the format used inside GZIP and PKZIP); use `gunzip` for gzip files
   * @param bufferSize size of the internal buffer that is used by the
   *                   decompressor; must be positive. Default size is 32 KB.
   */
  def inflate[F[_]](nowrap: Boolean = false,
                    bufferSize: Int = 1024 * 32): Pipe[F,Byte,Byte] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val pure: Pipe[Pure,Byte,Byte] =
      _ pull { _.await flatMap { step =>
        // Created on the first chunk, so an empty input never allocates an inflater.
        val inflater = new Inflater(nowrap)
        _inflate_step(inflater, new Array[Byte](bufferSize))(step)
      }}
    pipe.covary[F,Byte,Byte](pure)
  }

  /** Inflates one chunk, emits the result, then continues with the rest of the handle. */
  private def _inflate_step(inflater: Inflater, buffer: Array[Byte]): ((Chunk[Byte], Handle[Pure, Byte])) => Pull[Pure, Byte, Handle[Pure, Byte]] = {
    case (c, h) =>
      inflater.setInput(c.toArray)
      val result = _inflate_collect(inflater, buffer, ArrayBuffer.empty).toArray
      Pull.output(Chunk.bytes(result)) >> _inflate_handle(inflater, buffer)(h)
  }

  /** Inflates the next chunk if there is one; otherwise checks completeness and releases the inflater. */
  private def _inflate_handle(inflater: Inflater, buffer: Array[Byte]): Handle[Pure, Byte] => Pull[Pure, Byte, Handle[Pure, Byte]] =
    h => _awaitOption(h) flatMap {
      case Some(step) => _inflate_step(inflater, buffer)(step)
      case None => _inflate_finish(inflater)
    }

  /**
   * Runs the inflater into `buffer`, appending the produced bytes to `acc` (which is mutated),
   * until the compressed stream ends or the current input is used up.
   *
   * @throws DataFormatException if the data is corrupt or requires a preset dictionary
   */
  @tailrec
  private def _inflate_collect(inflater: Inflater, buffer: Array[Byte], acc: ArrayBuffer[Byte]): ArrayBuffer[Byte] = {
    if (inflater.finished) acc
    // Without this check `inflate` keeps returning 0 and the loop never ends.
    else if (inflater.needsDictionary) throw new DataFormatException("Compressed data requires a preset dictionary, which is not supported")
    else {
      val count = inflater inflate buffer
      acc ++= buffer.iterator.take(count)
      // A completely filled buffer may leave pending output inside zlib even after all input
      // was consumed, so only stop on a short read.
      if (inflater.needsInput && count < buffer.length) acc
      else _inflate_collect(inflater, buffer, acc)
    }
  }

  /** Releases the inflater; fails with "Insufficient data" if the compressed stream never ended. */
  private def _inflate_finish(inflater: Inflater): Pull[Pure, Nothing, Nothing] = {
    val complete = inflater.finished
    inflater.end()
    if (complete) Pull.done else Pull.fail(new DataFormatException("Insufficient data"))
  }

  /**
   * Awaits the next chunk of `h`, yielding `None` instead of terminating once `h` is exhausted.
   * The `or` wraps only the `await`, so a `Pull.done` produced later by a continuation can never
   * be mistaken for the end of input.
   */
  private def _awaitOption(h: Handle[Pure, Byte]): Pull[Pure, Nothing, Option[(Chunk[Byte], Handle[Pure, Byte])]] =
    h.await.map(step => Option(step)) or Pull.pure(None)

  /** A pull that fails with a `DataFormatException` carrying `message`. */
  private def _fail(message: String): Pull[Pure, Byte, Nothing] =
    Pull.fail(new DataFormatException(message))

  /** Little-endian encoding of the low 32 bits of `value`. */
  private def _le32(value: Long): Array[Byte] =
    Array.tabulate(4)(i => ((value >>> (8 * i)) & 0xFF).toByte)

  /** Reads an unsigned little-endian 32-bit value from `bytes` starting at `offset`. */
  private def _readLe32(bytes: Array[Byte], offset: Int): Long =
    (0 until 4).foldLeft(0L)((acc, i) => acc | ((bytes(offset + i) & 0xFFL) << (8 * i)))

  /**
   * Parses the gzip member header at the start of `bytes`.
   *
   * @return `Left(message)` if the bytes cannot start a valid header, `Right(None)` if more
   *         bytes are needed to decide, or `Right(Some(n))` where `n` is the header length
   */
  private def _gzip_header_length(bytes: Array[Byte]): Either[String, Option[Int]] = {
    def u8(i: Int): Int = bytes(i) & 0xFF
    // Offset just past a fixed-size field of `len` bytes at `from`, once it is fully present.
    def skipFixed(from: Int, len: Int): Option[Int] =
      if (bytes.length >= from + len) Some(from + len) else None
    // Offset just past a NUL-terminated field starting at `from`, once its terminator is present.
    def skipCString(from: Int): Option[Int] = {
      val nul = bytes.indexOf(0.toByte, from)
      if (nul < 0) None else Some(nul + 1)
    }
    // Reject as soon as either magic byte is known to be wrong.
    val magicOk =
      (bytes.length < 1 || u8(0) == (GZIP_MAGIC_NUMBER & 0xFF)) &&
      (bytes.length < 2 || u8(1) == (GZIP_MAGIC_NUMBER >> 8))
    if (!magicOk) Left("Not in gzip format. Gzip Magic Number missing from beginning of header")
    else if (bytes.length < 10) Right(None)
    else if (u8(2) != Deflater.DEFLATED) Left("Unsupported gzip compression method")
    else if ((u8(3) & FRESERVED) != 0) Left("Unsupported gzip header flags")
    else {
      val flags = u8(3)
      // Optional fields appear in this fixed order, each only when its flag bit is set.
      def field(flag: Int, from: Option[Int])(skip: Int => Option[Int]): Option[Int] =
        from.flatMap(p => if ((flags & flag) != 0) skip(p) else Some(p))
      // FEXTRA: 2-byte little-endian XLEN followed by XLEN bytes.
      val afterExtra = field(FEXTRA, Some(10))(p => skipFixed(p, 2).flatMap(q => skipFixed(q, u8(p) | (u8(p + 1) << 8))))
      val afterName = field(FNAME, afterExtra)(skipCString)
      val afterComment = field(FCOMMENT, afterName)(skipCString)
      // FHCRC: 2-byte header CRC16, skipped without verification.
      Right(field(FHCRC, afterComment)(skipFixed(_, 2)))
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment