Skip to content

Instantly share code, notes, and snippets.

@kougazhang
Created September 1, 2021 05:50
Show Gist options
  • Save kougazhang/e7731d986224ced084337154a7b8e3a1 to your computer and use it in GitHub Desktop.
Save kougazhang/e7731d986224ced084337154a7b8e3a1 to your computer and use it in GitHub Desktop.
#java #flink
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;
import java.io.IOException;
/**
 * A {@link TextOutputFormat} whose output stream is wrapped in a
 * {@code GZIPWrappedStream} as soon as the format is opened.
 *
 * @param <T> type of the records handled by this format
 */
public class GZIPOutputFormat<T> extends TextOutputFormat<T> {

    /** Buffer size, in bytes, passed to the GZIP wrapper. */
    private static final int GZIP_BUFFER_BYTES = 4096;

    /** Sync-flush flag passed to the GZIP wrapper. */
    private static final boolean GZIP_SYNC_FLUSH = false;

    /**
     * Creates a format that writes to the given path.
     *
     * @param outputPath destination path, forwarded unchanged to the superclass
     */
    public GZIPOutputFormat(Path outputPath) {
        super(outputPath);
    }

    /**
     * Opens the format via the superclass and then replaces {@code this.stream}
     * with a GZIP-compressing wrapper around it, so that anything subsequently
     * written through {@code this.stream} passes through the wrapper.
     *
     * @param taskNumber forwarded unchanged to the superclass
     * @param numTasks   forwarded unchanged to the superclass
     * @throws IOException if the superclass fails to open or the wrapper cannot be created
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        // The superclass has just populated this.stream; swap it for the compressing wrapper.
        this.stream = new GZIPWrappedStream(this.stream, GZIP_BUFFER_BYTES, GZIP_SYNC_FLUSH);
    }
}
@kougazhang
Copy link
Author

import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;
import java.util.zip.GZIPOutputStream;

/**
 * An {@link FSDataOutputStream} that GZIP-compresses everything written to it and
 * forwards the compressed bytes to another {@code FSDataOutputStream}.
 *
 * <p>Writes, {@link #flush()} and {@link #close()} go through the internal
 * {@link GZIPOutputStream}; {@link #sync()} and {@link #getPos()} are answered by
 * the wrapped stream.
 */
public class GZIPWrappedStream extends FSDataOutputStream {

    /** Compressing stream layered on top of {@link #original}; all writes go through it. */
    private final GZIPOutputStream stream;

    /** The wrapped target stream, kept so that sync() and getPos() can be delegated to it. */
    private final FSDataOutputStream original;

    /**
     * Wraps {@code original} in a GZIP compressor.
     *
     * @param original  stream that receives the compressed bytes
     * @param size      buffer size handed to {@link GZIPOutputStream}
     * @param syncFlush sync-flush flag handed to {@link GZIPOutputStream}; when {@code true},
     *                  {@link #flush()} also pushes pending compressed data to {@code original}
     * @throws IOException if the {@link GZIPOutputStream} cannot be created
     */
    public GZIPWrappedStream(FSDataOutputStream original, int size, boolean syncFlush) throws IOException {
        this.stream = new GZIPOutputStream(original, size, syncFlush);
        this.original = original;
    }

    /**
     * Compresses and writes a single byte.
     *
     * @param b the byte to write
     * @throws IOException if the compressing stream fails
     */
    @Override
    public void write(final int b) throws IOException {
        this.stream.write(b);
    }

    /**
     * Compresses and writes {@code len} bytes of {@code b}, starting at {@code off}.
     *
     * @param b   source buffer
     * @param off offset of the first byte to write
     * @param len number of bytes to write
     * @throws IOException if the compressing stream fails
     */
    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
        this.stream.write(b, off, len);
    }

    /**
     * Closes the compressing stream, which in turn closes the wrapped stream.
     *
     * <p>If closing the compressing stream fails, the wrapped stream is closed
     * explicitly so that the underlying resource is not leaked; any failure of that
     * second close is attached to the original exception as suppressed.
     *
     * @throws IOException if closing the compressing stream fails
     */
    @Override
    public void close() throws IOException {
        try {
            this.stream.close();
        } catch (IOException | RuntimeException e) {
            // The GZIP layer may have failed before it got to close the wrapped stream.
            try {
                this.original.close();
            } catch (IOException | RuntimeException onCleanup) {
                // Guard against self-suppression, which Throwable.addSuppressed rejects.
                if (onCleanup != e) {
                    e.addSuppressed(onCleanup);
                }
            }
            throw e;
        }
    }

    /**
     * Flushes the compressing stream.
     *
     * @throws IOException if the flush fails
     */
    @Override
    public void flush() throws IOException {
        this.stream.flush();
    }

    /**
     * Flushes the compressing stream and then syncs the wrapped stream.
     *
     * <p>The flush comes first so that a stream created with {@code syncFlush == true}
     * hands its pending compressed data to the wrapped stream before that stream is synced.
     *
     * @throws IOException if the flush or the sync fails
     */
    @Override
    public void sync() throws IOException {
        this.stream.flush();
        this.original.sync();
    }

    /**
     * Returns the position reported by the wrapped stream.
     *
     * <p>Because the wrapped stream only ever receives the output of the compressor,
     * this is a position in the compressed output, not a count of the bytes passed
     * to {@code write}.
     *
     * @return the wrapped stream's current position
     * @throws IOException if the wrapped stream cannot report its position
     */
    @Override
    public long getPos() throws IOException {
        return this.original.getPos();
    }
}

@kougazhang
Copy link
Author

// Usage: write `sorted` through GZIPOutputFormat to jobKey.dest_path, replacing any
// existing output (WriteMode.OVERWRITE). The same path is given both to the format's
// constructor and to write(...).
// NOTE(review): `sorted` and `jobKey` are defined elsewhere; `sorted` is presumably a
// Flink DataSet — confirm against the calling code.
sorted.write(new GZIPOutputFormat<>(new Path(jobKey.dest_path)), jobKey.dest_path, FileSystem.WriteMode.OVERWRITE);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment