Last active
May 9, 2018 15:54
-
-
Save dalegaspi/2d494f37bec09f11171fec406865102c to your computer and use it in GitHub Desktop.
Simple wrapper object for various compression algorithms (GZip, LZ4, Snappy) using generics
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} | |
import java.util.zip.{GZIPInputStream, GZIPOutputStream} | |
import scala.util.Try | |
/**
 * Serialization helpers and compression wrappers (GZip, LZ4, Snappy).
 *
 * Adapted from https://gist.github.com/owainlewis/1e7d1e68a6818ee4d50e (gzip compression)
 * and from https://stackoverflow.com/a/39371571/918858 (ser/deser from Array[Byte])
 */
object CompressionUtils { | |
/**
 * Serializes `value` to a byte array using standard Java object serialization.
 *
 * @param value the object to serialize; it must be serializable at runtime, otherwise
 *              `writeObject` throws `java.io.NotSerializableException`
 * @tparam T the static type of `value`
 * @return the Java-serialized bytes of `value`
 */
def serialize[T](value: T): Array[Byte] = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  // Close in `finally` so the stream is released even when writeObject throws.
  // close() also flushes any data still buffered by the ObjectOutputStream, so it
  // must run before the bytes are read back from `buffer`.
  try out.writeObject(value)
  finally out.close()
  buffer.toByteArray
}
/**
 * Deserializes a Java-serialized byte array back into a value of type `T`.
 *
 * The cast to `T` is unchecked because of type erasure: if the stored object is not
 * actually a `T`, the resulting `ClassCastException` surfaces where the caller first
 * uses the value, not here.
 *
 * NOTE(security): Java object deserialization can execute code from classes on the
 * classpath. Only pass bytes from a trusted source.
 *
 * @param bytes bytes previously produced by Java object serialization
 * @tparam T the type the caller expects the stored object to have
 * @return the deserialized object, cast to `T`
 */
def deserialize[T](bytes: Array[Byte]): T = {
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
  // Close in `finally` so the stream is released even when readObject throws.
  try in.readObject().asInstanceOf[T]
  finally in.close()
}
/**
 * Common interface of the compression back ends defined in this file.
 */
trait Compressor {

  /**
   * Turns a value into a compressed byte array.
   *
   * @param input the value to compress
   * @tparam T the type of `input`
   * @return the compressed representation of `input`
   */
  def compress[T](input: T): Array[Byte]

  /**
   * Restores a value from a compressed byte array.
   *
   * @param input the compressed bytes
   * @tparam T the type the caller expects back
   * @return the restored value, or `None` when the implementation cannot restore it
   */
  def decompress[T](input: Array[Byte]): Option[T]
}
/**
 * LZ4 implementation of [[Compressor]]: values are Java-serialized, then LZ4-compressed.
 */
object Lz4 extends Compressor {

  /**
   * Serializes `input` and compresses the resulting bytes with LZ4.
   *
   * @param input the value to compress
   * @tparam T the type of `input`
   * @return a 4-byte uncompressed-length header followed by the LZ4 payload
   */
  override def compress[T](input: T): Array[Byte] = ByteArray.compress(serialize(input))

  /**
   * Decompresses `input` and deserializes the result as a `T`.
   *
   * Only decompression failures are reported as `None`; an exception thrown by
   * `deserialize` propagates to the caller.
   *
   * @param input bytes previously produced by `compress`
   * @tparam T the type the caller expects back
   * @return the restored value, or `None` when decompression fails
   */
  override def decompress[T](input: Array[Byte]): Option[T] =
    ByteArray.decompress(input).map(deserialize[T])

  /** Raw byte-array LZ4 compression, without the serialization step. */
  object ByteArray {
    import net.jpountz.lz4.{LZ4Compressor, LZ4Factory, LZ4FastDecompressor}

    val factory: LZ4Factory = LZ4Factory.fastestInstance
    val compressor: LZ4Compressor = factory.fastCompressor
    val decompressor: LZ4FastDecompressor = factory.fastDecompressor

    /**
     * Size in bytes of the header that precedes the LZ4 payload. Despite its name,
     * the header stores the *uncompressed* length as a big-endian Int.
     */
    val COMPRESSED_LENGTH: Int = 4

    /**
     * Compresses a byte array with LZ4.
     *
     * The fast decompressor has to be told the uncompressed size up front, and the LZ4
     * block format does not record it. The size is therefore stored in the first
     * `COMPRESSED_LENGTH` bytes of the result, followed by the compressed payload.
     *
     * @param input the bytes to compress
     * @return the length header followed by exactly the compressed bytes
     */
    def compress(input: Array[Byte]): Array[Byte] = {
      val decompressedLength = input.length
      val maxCompressedLength = compressor.maxCompressedLength(decompressedLength)
      // One scratch buffer: header first, then room for the worst-case payload.
      val scratch = new Array[Byte](COMPRESSED_LENGTH + maxCompressedLength)
      java.nio.ByteBuffer.wrap(scratch).putInt(decompressedLength)
      val compressedLength = compressor.compress(
        input, 0, decompressedLength, scratch, COMPRESSED_LENGTH, maxCompressedLength)
      // Keep only the bytes the compressor actually wrote; the rest of the worst-case
      // buffer is unused padding.
      java.util.Arrays.copyOf(scratch, COMPRESSED_LENGTH + compressedLength)
    }

    /**
     * Decompresses a byte array produced by `compress`.
     *
     * The uncompressed size is read from the first `COMPRESSED_LENGTH` bytes, and the
     * remainder of the array is handed to the LZ4 fast decompressor.
     *
     * @param compressed the length header followed by the LZ4 payload
     * @return the restored bytes, or `None` if the input is too short, the header is
     *         invalid, or the payload cannot be decompressed (any non-fatal exception)
     */
    def decompress(compressed: Array[Byte]): Option[Array[Byte]] =
      Try {
        val decompressedLength =
          java.nio.ByteBuffer.wrap(compressed, 0, COMPRESSED_LENGTH).getInt
        val restored = new Array[Byte](decompressedLength)
        // Read the payload in place, starting right after the header and running to the
        // end of the array (no bytes are cut off the tail).
        decompressor.decompress(compressed, COMPRESSED_LENGTH, restored, 0, decompressedLength)
        restored
      }.toOption
  }
}
/**
 * Snappy implementation of [[Compressor]]: values are Java-serialized, then
 * Snappy-compressed.
 */
object Snappy extends Compressor {

  /**
   * Serializes `input` and compresses the resulting bytes with Snappy.
   *
   * @param input the value to compress
   * @tparam T the type of `input`
   * @return the Snappy-compressed serialized form of `input`
   */
  override def compress[T](input: T): Array[Byte] = ByteArray.compress(serialize(input))

  /**
   * Decompresses `input` and deserializes the result as a `T`.
   *
   * Only decompression failures are reported as `None`; an exception thrown by
   * `deserialize` propagates to the caller.
   *
   * @param input bytes previously produced by `compress`
   * @tparam T the type the caller expects back
   * @return the restored value, or `None` when decompression fails
   */
  override def decompress[T](input: Array[Byte]): Option[T] =
    ByteArray.decompress(input).map(deserialize[T])

  /** Raw byte-array Snappy compression, without the serialization step. */
  object ByteArray {
    import org.xerial.snappy.Snappy.{compress => snappycompress, uncompress}

    /**
     * Compresses a byte array with Snappy.
     *
     * @param input the bytes to compress
     * @return the compressed bytes
     */
    def compress(input: Array[Byte]): Array[Byte] = snappycompress(input)

    /**
     * Decompresses a Snappy-compressed byte array.
     *
     * @param compressed the bytes to decompress
     * @return the restored bytes, or `None` if `uncompress` throws a non-fatal exception
     */
    def decompress(compressed: Array[Byte]): Option[Array[Byte]] =
      Try(uncompress(compressed)).toOption
  }
}
/**
 * GZip implementation of [[Compressor]]: values are Java-serialized, then gzipped.
 */
object Gzip extends Compressor {

  /**
   * Serializes `input` and gzips the resulting bytes.
   *
   * @param input the value to compress
   * @tparam T the type of `input`
   * @return the gzipped serialized form of `input`
   */
  override def compress[T](input: T): Array[Byte] = ByteArray.compress(serialize(input))

  /**
   * Gunzips `input` and deserializes the result as a `T`.
   *
   * Only decompression failures are reported as `None`; an exception thrown by
   * `deserialize` propagates to the caller.
   *
   * @param input bytes previously produced by `compress`
   * @tparam T the type the caller expects back
   * @return the restored value, or `None` when decompression fails
   */
  override def decompress[T](input: Array[Byte]): Option[T] =
    ByteArray.decompress(input).map(deserialize[T])

  /** Raw byte-array GZip compression, without the serialization step. */
  object ByteArray {

    /**
     * Compresses a byte array with GZip.
     *
     * @param input the bytes to compress
     * @return the gzipped bytes
     */
    def compress(input: Array[Byte]): Array[Byte] = {
      val buffer = new ByteArrayOutputStream(input.length)
      val gzip = new GZIPOutputStream(buffer)
      // close() finishes the GZip stream (writes the trailer), so it must run before
      // the bytes are read back; `finally` also releases the stream if write throws.
      try gzip.write(input)
      finally gzip.close()
      buffer.toByteArray
    }

    /**
     * Decompresses a gzipped byte array.
     *
     * @param compressed the gzipped bytes
     * @return the restored bytes, or `None` if reading the GZip stream throws a
     *         non-fatal exception
     */
    def decompress(compressed: Array[Byte]): Option[Array[Byte]] =
      Try {
        val gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))
        // Always close the stream so the resources it holds are released promptly,
        // whether or not reading succeeds.
        try org.apache.commons.io.IOUtils.toByteArray(gzip)
        finally gzip.close()
      }.toOption
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment