Last active
December 1, 2017 18:54
-
-
Save detro/1b52ef899632634bd7ac7ff4c0eb0119 to your computer and use it in GitHub Desktop.
XZCompressorInputStreamAdapter: shows how to overcome a limitation of Apache Commons Compress (https://commons.apache.org/proper/commons-compress/index.html), which doesn't provide a "compressing" InputStream (the same approach can be used to create a "decompressing" OutputStream). It avoids the use of any threads and uses just a single buffer to hold data in transit.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; | |
import javax.validation.constraints.NotNull; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
/** | |
* Wraps a generic InputStream into a XZ Compressed InputStream. | |
* | |
* It handles internally the necessary use of XZCompressorOutputStream, and it does it within the main thread, | |
* without any support thread. The only memory it allocates to handle the conversion is a ByteArrayOutputStream. | |
* | |
* It's all self contained, so a call to ".close()" will be enough to take care of all the internal cleanup. | |
*/ | |
public class XZCompressorInputStreamAdapter extends InputStream { | |
private static final int DEFAULT_COMPRESSION_PRESET = 2; | |
private final InputStream source; | |
private final int sourceSize; | |
private int positionInSource; | |
private final ExposedByteArrayOutputStream compressedBytesStream; | |
private final XZCompressorOutputStream compressorOutputStream; | |
private boolean compressorFinished = false; | |
// NOTE: The compressor buffers internally, so sometimes the compressed byte array it produces might be | |
// larger than the destination array passed at the "read()" call. We keep track of this and, in case there are | |
// compressed data not read yet, we pass those back without furthering the compression. Once it's all done, | |
// we resume the normal compression process. | |
private int preCompressedBytesOffset = -1; | |
private int preCompressedBytesLength = -1; | |
public XZCompressorInputStreamAdapter(InputStream source) throws IOException { | |
this(source, DEFAULT_COMPRESSION_PRESET); | |
} | |
public XZCompressorInputStreamAdapter(InputStream source, int xzCompressionPreset) throws IOException { | |
this.source = source; | |
this.positionInSource = 0; | |
this.sourceSize = source.available(); | |
this.compressedBytesStream = new ExposedByteArrayOutputStream(8 * 1_024); //< 8K: an ideal buffer size used in lots of places (this class auto-adjusts if needed anyway) | |
this.compressorOutputStream = new XZCompressorOutputStream(compressedBytesStream, xzCompressionPreset); | |
} | |
@Override | |
public int read() throws IOException { | |
if (calculateAvailable() > 0) { | |
byte[] singleByte = new byte[1]; | |
if (read(singleByte, 0, 1) > 0) { | |
return singleByte[0]; | |
} | |
} | |
return -1; | |
} | |
@Override | |
public synchronized int read(@NotNull final byte destination[], final int offset, int length) throws IOException { | |
// Validate input | |
if (destination == null) { | |
throw new NullPointerException(); | |
} else if (offset < 0 || length < 0 || length > destination.length - offset) { | |
throw new IndexOutOfBoundsException(); | |
} | |
// If there were compressed data that we couldn't read yet, read them now until it's all been consumed | |
if (preCompressedBytesOffset >= 0) { | |
// Determine how much can be read into destination of the pre-compressed data, and read it | |
int howMuchCanBeRead = Math.min(preCompressedBytesLength - preCompressedBytesOffset, destination.length); | |
System.arraycopy(compressedBytesStream.byteArray(), preCompressedBytesOffset, destination, offset, howMuchCanBeRead); | |
// Update the pre-compressed data offset | |
preCompressedBytesOffset += howMuchCanBeRead; | |
// If we are done reading the pre-compressed data, reset offset/length so we can resume compression in the next call | |
if (preCompressedBytesOffset >= preCompressedBytesLength) { | |
preCompressedBytesOffset = -1; | |
preCompressedBytesLength = -1; | |
} | |
return howMuchCanBeRead; | |
} | |
try { | |
do { | |
// First, write into the buffer uncompressed | |
// NOTE: the destination buffer is used as a "support" buffer while we pass data into the compressor. | |
// At the end we will replace it's content with the final compressed data | |
final int amountRead = source.read(destination, offset, length); | |
boolean endOfInputReached = -1 == amountRead || 0 == amountRead; | |
if (endOfInputReached && compressorFinished && compressedBytesStream.size() == 0) { | |
// If the EOI was reached, the compressor has already finished and the there was nothing else compressed, it's time to quit it | |
return -1; | |
} | |
// If any amount of bytes was read, we need to compress it | |
if (amountRead > 0) { | |
// Update the position along the source | |
positionInSource += amountRead; | |
// Write what we just read into the Compressor | |
compressorOutputStream.write(destination, offset, amountRead); | |
} | |
// If we are done reading the Stream, we need to finish it (i.e. let it flush and append the compression footer) | |
if (endOfInputReached) { | |
compressorOutputStream.finish(); | |
compressorFinished = true; | |
} | |
} while (compressedBytesStream.size() == 0); //< Keep reading until some compressed data were flushed | |
// Copy the compressed data into the destination | |
if (compressedBytesStream.length() > destination.length) { | |
// Copy compressed bytes to destination, but don't overflow it | |
System.arraycopy(compressedBytesStream.byteArray(), 0, destination, offset, destination.length); | |
// Remember up to where we read the compressed bytes so we can finish returning those in the next call | |
preCompressedBytesOffset = destination.length; | |
preCompressedBytesLength = compressedBytesStream.length(); | |
return destination.length; | |
} else { | |
// Copy ALL compressed bytes on the destination | |
System.arraycopy(compressedBytesStream.byteArray(), 0, destination, offset, compressedBytesStream.length()); | |
// Zero-out the bytes not used by the final compressed data | |
for (int i = offset + compressedBytesStream.length(); i < offset + length; ++i) { | |
destination[i] = 0x0; | |
} | |
return compressedBytesStream.length(); | |
} | |
} finally { | |
compressedBytesStream.reset(); | |
} | |
} | |
@Override | |
public int available() throws IOException { | |
return calculateAvailable(); | |
} | |
@Override | |
public void close() throws IOException { | |
source.close(); | |
compressorOutputStream.close(); | |
} | |
private int calculateAvailable() { | |
return sourceSize - positionInSource; | |
} | |
/** | |
* Allows access to the buffer in ByteArrayOutputStream without unnecessary copying | |
*/ | |
private static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream { | |
ExposedByteArrayOutputStream(int expectedInputSize) { | |
super(expectedInputSize); | |
} | |
byte[] byteArray() { | |
return buf; | |
} | |
int length() { | |
return count; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; | |
import org.apache.commons.lang3.RandomStringUtils; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import java.io.*; | |
import java.nio.file.Files; | |
import java.nio.file.Path; | |
import java.nio.file.StandardCopyOption; | |
import java.nio.file.StandardOpenOption; | |
import static org.junit.Assert.*; | |
public class XZCompressorInputStreamAdapterTest { | |
private static final Logger LOG = Loggers.build(); | |
@Test | |
public void bla() throws IOException { | |
final Path temp = Files.createTempFile("uncompressed", null); | |
final Path tempCompressed = Files.createTempFile("compressed", null); | |
final Path copy = Files.createTempFile("uncompressed-copy", null); | |
try { | |
final InputStream tempIn = Files.newInputStream(temp); | |
Files.write(temp, RandomStringUtils.randomAlphanumeric(100 * 1024 * 1024).getBytes(), StandardOpenOption.TRUNCATE_EXISTING); | |
LOG.info("Created uncompressed file of {} bytes: {}", Files.size(temp), temp); | |
final XZCompressorInputStreamAdapter compressedTempIn = new XZCompressorInputStreamAdapter(tempIn); | |
Files.copy(compressedTempIn, tempCompressed, StandardCopyOption.REPLACE_EXISTING); | |
LOG.info("Created compressed file of {} bytes: {}", Files.size(tempCompressed), tempCompressed); | |
compressedTempIn.close(); | |
final XZCompressorInputStream compressedIn = new XZCompressorInputStream(new BufferedInputStream(Files.newInputStream(tempCompressed))); | |
final OutputStream out = Files.newOutputStream(copy); | |
final byte[] buffer = new byte[1024 * 8]; | |
int n; | |
while (-1 != (n = compressedIn.read(buffer))) { | |
out.write(buffer, 0, n); | |
} | |
out.close(); | |
compressedIn.close(); | |
LOG.info("Created unmpressed file of {} bytes: {}", Files.size(copy), copy); | |
assertArrayEquals(Files.readAllBytes(temp), Files.readAllBytes(copy)); | |
} finally { | |
Files.deleteIfExists(temp); | |
Files.deleteIfExists(tempCompressed); | |
Files.deleteIfExists(copy); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment