Skip to content

Instantly share code, notes, and snippets.

@Ichoran
Created October 9, 2024 06:25
Show Gist options
  • Save Ichoran/a4c797f16a968e9e85c4492efc6cb474 to your computer and use it in GitHub Desktop.
Save Ichoran/a4c797f16a968e9e85c4492efc6cb474 to your computer and use it in GitHub Desktop.
A reimplementation of parallel gzip to use Java threads directly, since there seems to be some weird bug on JVM 8 that impacts the original Scala Future version
// Begin Gist Information
// Original code by Stefan Zieger (see https://github.com/szeiger/zinc/blob/1d296b2fbeaae1cf14e4c00db0bbc2203f9783a4/internal/zinc-persist/src/main/scala/sbt/internal/inc/consistent/ParallelGzipOutputStream.scala)
// Modified by Rex Kerr to use Java threads directly rather than Future
// Probably Apache Commons License? Whatever the original one was is fine.
// NOT HEAVILY TESTED! (I did verify binary-identical results on one test data file.)
// End Gist Information
import java.io.{ ByteArrayOutputStream, FilterOutputStream, OutputStream }
import java.util.zip.{ CRC32, Deflater, DeflaterOutputStream }
import java.util.concurrent.{SynchronousQueue, ArrayBlockingQueue, LinkedTransferQueue}
import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.collection.mutable
/**
* Parallel gzip compression. Algorithm based on https://github.com/shevek/parallelgzip
* with additional optimization and simplification. This is essentially a block-buffered
* stream but instead of writing a full block to the underlying output, it is passed to a
* thread pool for compression and the compressed blocks are collected when flushing.
*/
object ParallelGzipOutputStream {
  /** Number of uncompressed bytes collected into a block before it is handed to a worker. */
  private val blockSize = 64 * 1024

  /** Deflate compression level used for every block. */
  private val compression = Deflater.DEFAULT_COMPRESSION

  /**
   * Fixed 10-byte gzip member header: magic bytes, the DEFLATE method id, and
   * zeroed flags / mtime / extra-flags / OS fields.
   */
  private val header = Array[Byte](0x1f.toByte, 0x8b.toByte, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0)

  /**
   * Holds an input buffer to load data into and an output buffer that receives the
   * compressed form of that data. Compressing clears the input buffer.
   * Compressed data can be retrieved with `output.writeTo(OutputStream)`.
   *
   * @param index position of this block in the output sequence; reassigned each
   *              time the block is reused
   */
  private final class Block(var index: Int) {
    val input = new Array[Byte](blockSize)
    // Number of valid bytes currently held in `input`.
    var inputN = 0
    val output = new ByteArrayOutputStream(blockSize + (blockSize >> 3))
    // `nowrap = true`: emit raw deflate data; the gzip framing is written separately.
    val deflater = new Deflater(compression, true)
    // `syncFlush = true`: `flush()` pushes all pending compressed bytes into `output`.
    val dos = new DeflaterOutputStream(output, deflater, true)

    /**
     * Copies as much of `buf(off until off + len)` as fits into the remaining
     * space of `input` and advances `inputN` past the copied bytes.
     *
     * @return the number of bytes actually copied (possibly fewer than `len`)
     */
    def write(buf: Array[Byte], off: Int, len: Int): Int = {
      val n = math.min(len, input.length - inputN)
      System.arraycopy(buf, off, input, inputN, n)
      // Without this the next call would overwrite the bytes just copied and
      // `compress()` would see an empty block.
      inputN += n
      n
    }

    /** Compresses the first `inputN` bytes of `input` into `output`, then empties the input. */
    def compress(): Unit = {
      deflater.reset()
      output.reset()
      dos.write(input, 0, inputN)
      dos.flush()
      inputN = 0
    }
  }

  /**
   * Compression thread. Waits for a message on its own `work` queue:
   * `Some(block)` means compress the block, pass it on to `compressed`, and
   * re-register in `workers` as available; `None` means stop running (without
   * re-registering).
   */
  private final class Worker(
      val workers: ArrayBlockingQueue[Worker],
      val compressed: LinkedTransferQueue[Either[Int, Block]]
  ) extends Thread {
    val work = new SynchronousQueue[Option[Block]]

    @tailrec
    def loop(): Unit = {
      work.take() match {
        case Some(block) =>
          block.compress()
          compressed.put(Right(block))
          workers.put(this)
          loop()
        case None =>
          // Stop signal: fall out of the loop so the thread terminates.
      }
    }

    override def run(): Unit = loop()
  }

  /**
   * Output thread. Writes the gzip header, then repeatedly takes messages from `work`:
   * `Right(block)` is a compressed block, which is parked in a map sorted by index until
   * every earlier block has been written; `Left(n)` asks the thread to finish once `n`
   * blocks in total have been written. Each block is handed to `completed` after its
   * bytes have been written so that it can be reused.
   */
  private final class Scribe(out: OutputStream, val completed: LinkedTransferQueue[Block]) extends Thread {
    val work = new LinkedTransferQueue[Either[Int, Block]]
    // Compressed blocks that arrived ahead of their turn, keyed by block index.
    private val tree = new collection.mutable.TreeMap[Int, Block]
    // Index of the next block that must be written to `out`.
    private var next = 0
    // Total number of blocks to write before stopping; unknown until a `Left` arrives.
    private var stopAt = Int.MaxValue

    /** Writes out every parked block that is now contiguous with what has been written. */
    @tailrec
    private def writeReady(): Unit = {
      tree.remove(next) match {
        case Some(block) =>
          block.output.writeTo(out)
          completed.put(block)
          next += 1
          writeReady()
        case None =>
      }
    }

    @tailrec
    def loop(): Unit = {
      work.take() match {
        case Right(block) => tree(block.index) = block
        case Left(limit)  => stopAt = limit
      }
      writeReady()
      if (next < stopAt) loop()
    }

    override def run(): Unit = {
      out.write(header)
      loop()
    }
  }
}
/**
* Implements a parallel chunked compression algorithm (using a minimum of two extra threads).
* Note that the methods in this class are not themselves threadsafe; this class
* has "interior concurrency" (cf. interior mutability). In particular, the behavior of
* writing concurrently with, or after, a close operation is undefined.
*/
final class ParallelGzipOutputStream(out: OutputStream, parallelism: Int)
    extends FilterOutputStream(out) {
  import ParallelGzipOutputStream._

  // Running checksum and length of all uncompressed bytes, needed for the gzip trailer.
  private val crc = new CRC32
  private var totalCount = 0L
  // Number of blocks submitted so far; also the index given to the next block.
  private var totalBlocks = 0
  // At most `bufferLimit` blocks are ever allocated; `bufferCount` is how many exist now.
  private val bufferLimit = parallelism * 3
  private var bufferCount = 1
  // The block currently being filled by `write`.
  private var current = new Block(0)
  private val workerCount = math.max(1, parallelism - 1)
  // Workers that are idle and ready to accept a block.
  private val workers = new ArrayBlockingQueue[Worker](workerCount)
  // Blocks that have been written out by the scribe and may be reused.
  private val buffers = new LinkedTransferQueue[Block]()
  private val scribe = new Scribe(out, buffers)
  // Set by the first `close()` so that later calls are no-ops.
  private var closed = false

  scribe.start()
  while (workers.remainingCapacity() > 0) {
    val w = new Worker(workers, scribe.work)
    workers.put(w)
    w.start()
  }

  override def write(b: Int): Unit = write(Array[Byte]((b & 0xff).toByte))

  override def write(b: Array[Byte]): Unit = write(b, 0, b.length)

  /**
   * Appends `len` bytes of `b`, starting at `off`, to the current block. Whenever the
   * data does not fit, the full block is submitted for compression and the remainder
   * goes into a fresh block.
   */
  @tailrec
  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    val copy = math.min(len, blockSize - current.inputN)
    crc.update(b, off, copy)
    totalCount += copy
    System.arraycopy(b, off, current.input, current.inputN, copy)
    current.inputN += copy
    if (copy < len) {
      submit()
      write(b, off + copy, len - copy)
    }
  }

  /**
   * Hands the current block to an idle worker (blocking until one is available) and
   * replaces it with an empty one: a recycled block if one is ready, else a newly
   * allocated one while under `bufferLimit`, else whichever block is recycled next.
   */
  private def submit(): Unit = {
    val w = workers.take()
    w.work.put(Some(current))
    totalBlocks += 1
    // `poll()` returns null when no recycled block is ready; wrap it rather than test for null.
    current = Option(buffers.poll()).getOrElse {
      if (bufferCount < bufferLimit) {
        bufferCount += 1
        new Block(totalBlocks)
      }
      else buffers.take()
    }
    current.index = totalBlocks
  }

  /**
   * Blocks until every submitted block has been written to `out`.
   *
   * @param shutdown if true, additionally stops all workers and the scribe and waits for
   *                 the scribe thread to terminate; if false, the blocks are returned to
   *                 the pool so that writing can continue
   */
  private def flushImpl(shutdown: Boolean): Unit = {
    // A block only reappears in `buffers` after the scribe has written it, so once we
    // hold every block other than `current`, all pending work is done.
    val fetched = Array.fill(bufferCount - 1)(buffers.take())
    if (shutdown) {
      // Send stop signal to workers and scribe.
      (0 until workerCount).foreach(_ => workers.take().work.put(None))
      scribe.work.put(Left(totalBlocks))
      // If no block was ever submitted, nothing above has synchronized with the scribe,
      // so it may not have written the gzip header yet. Wait for it to finish so the
      // caller can safely append the trailer to `out`.
      scribe.join()
    }
    else {
      // Put all the buffers back so we can keep accepting data.
      fetched.foreach(b => buffers.put(b))
    }
  }

  /**
   * Blocks until all pending data is written. Note that this is a poor use of a parallel data writing class.
   * It is preferable to write all data and then close the stream. Note also that a flushed stream will not
   * have the trailing CRC checksum and therefore will not be a valid compressed file, so there is little point
   * flushing early.
   */
  override def flush(): Unit = {
    if (current.inputN > 0) submit()
    flushImpl(false)
    super.flush()
  }

  /**
   * Compresses and writes all remaining data, stops the helper threads, appends the
   * gzip trailer, and closes the underlying stream. Calling `close()` again afterwards
   * has no effect.
   */
  override def close(): Unit = {
    if (!closed) {
      closed = true
      if (current.inputN > 0) submit()
      flushImpl(true)
      val buf = new Array[Byte](10)
      val bb = java.nio.ByteBuffer.wrap(buf)
      bb.order(java.nio.ByteOrder.LITTLE_ENDIAN)
      // Bytes 03 00: an empty final deflate block that terminates the deflate stream.
      bb.putShort(3)
      // gzip trailer: CRC-32 and length (mod 2^32) of the uncompressed data.
      bb.putInt(crc.getValue.toInt)
      bb.putInt((totalCount & 0xFFFFFFFFL).toInt)
      // Close the underlying stream even if writing the trailer fails.
      try out.write(buf)
      finally out.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment