Skip to content

Instantly share code, notes, and snippets.

@dain
Created June 21, 2011 22:32
Show Gist options
  • Save dain/1039101 to your computer and use it in GitHub Desktop.
Save dain/1039101 to your computer and use it in GitHub Desktop.
Hadoop CompressionCodec for com.ning.compress-lzf
import com.ning.compress.lzf.LZFInputStream;
import com.ning.compress.lzf.LZFOutputStream;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class LZFCodec implements CompressionCodec
{
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException
{
return new LZFCompressionOutputStream(out);
}
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor)
throws IOException
{
return createOutputStream(out);
}
@Override
public Class<? extends Compressor> getCompressorType()
{
return LZFDummyCompressor.class;
}
@Override
public LZFDummyCompressor createCompressor()
{
return new LZFDummyCompressor();
}
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException
{
throw new UnsupportedOperationException();
}
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor)
throws IOException
{
throw new UnsupportedOperationException();
}
@Override
public Class<? extends Decompressor> getDecompressorType()
{
return LZFDummyDecompressor.class;
}
@Override
public Decompressor createDecompressor()
{
return new LZFDummyDecompressor();
}
@Override
public String getDefaultExtension()
{
return ".lzf";
}
private static class LZFCompressionInputStream extends CompressionInputStream
{
private LZFInputStream input;
public LZFCompressionInputStream(InputStream in)
throws IOException
{
super(in);
input = new LZFInputStream(super.in);
}
public void close()
throws IOException
{
input.close();
}
public int read(byte[] b, int off, int len)
throws IOException
{
return input.read(b, off, len);
}
public void resetState()
throws IOException
{
input = new LZFInputStream(super.in);
}
public int read()
throws IOException
{
return input.read();
}
}
private static class LZFCompressionOutputStream extends CompressionOutputStream
{
private final LZFOutputStream output;
private boolean needsFlush;
public LZFCompressionOutputStream(OutputStream out)
throws IOException
{
super(out);
output = new LZFOutputStream(out);
}
public void finish()
throws IOException
{
output.flush();
needsFlush = true;
}
private void flushIfNeeded()
throws IOException
{
if (needsFlush) {
if (output != null) {
output.flush();
}
needsFlush = false;
}
}
public void resetState()
throws IOException
{
// Cannot write to out at this point because out might not be ready
// yet, as in SequenceFile.Writer implementation.
needsFlush = true;
}
public void write(int b)
throws IOException
{
flushIfNeeded();
output.write(b);
}
public void write(byte[] b, int off, int len)
throws IOException
{
flushIfNeeded();
output.write(b, off, len);
}
public void close()
throws IOException
{
flushIfNeeded();
output.flush();
output.close();
needsFlush = true;
}
}
public static class LZFDummyCompressor implements Compressor
{
@Override
public void setInput(byte[] b, int off, int len)
{
throw new UnsupportedOperationException();
}
@Override
public boolean needsInput()
{
throw new UnsupportedOperationException();
}
@Override
public void setDictionary(byte[] b, int off, int len)
{
throw new UnsupportedOperationException();
}
@Override
public long getBytesRead()
{
throw new UnsupportedOperationException();
}
@Override
public long getBytesWritten()
{
throw new UnsupportedOperationException();
}
@Override
public void finish()
{
throw new UnsupportedOperationException();
}
@Override
public boolean finished()
{
throw new UnsupportedOperationException();
}
@Override
public int compress(byte[] b, int off, int len)
throws IOException
{
throw new UnsupportedOperationException();
}
@Override
public void reset()
{
throw new UnsupportedOperationException();
}
@Override
public void end()
{
throw new UnsupportedOperationException();
}
}
public static class LZFDummyDecompressor implements Decompressor
{
@Override
public void setInput(byte[] b, int off, int len)
{
throw new UnsupportedOperationException();
}
@Override
public boolean needsInput()
{
throw new UnsupportedOperationException();
}
@Override
public void setDictionary(byte[] b, int off, int len)
{
throw new UnsupportedOperationException();
}
@Override
public boolean needsDictionary()
{
throw new UnsupportedOperationException();
}
@Override
public boolean finished()
{
throw new UnsupportedOperationException();
}
@Override
public int decompress(byte[] b, int off, int len)
throws IOException
{
throw new UnsupportedOperationException();
}
@Override
public void reset()
{
throw new UnsupportedOperationException();
}
@Override
public void end()
{
throw new UnsupportedOperationException();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment