Created
October 30, 2018 11:55
-
-
Save fmoraesmeli/b5f01b951725f54e98c46a99b8ebf849 to your computer and use it in GitHub Desktop.
Utility Class to Stream zip bytes using Reactor Core Publisher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.AccessLevel; | |
import lombok.EqualsAndHashCode; | |
import lombok.Getter; | |
import lombok.RequiredArgsConstructor; | |
import lombok.Value; | |
import org.reactivestreams.Publisher; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import reactor.core.publisher.SynchronousSink; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.util.zip.ZipEntry; | |
import java.util.zip.ZipOutputStream; | |
public final class Zipper { | |
private static final int SIZE_512K = 512 * 1024; | |
private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
private final ZipOutputStream zos = new ZipOutputStream(baos); | |
private final Integer outputPacketSizeThreshold; | |
@Getter | |
private final Flux<byte[]> zipStream; | |
public Zipper(final Integer outputPacketSizeThreshold, final Publisher<Entry> fileStream) { | |
this.outputPacketSizeThreshold = outputPacketSizeThreshold; | |
final Mono<byte[]> lastBytesMono = Mono.fromSupplier(this::getLastBytes); | |
this.zipStream = Flux.from(fileStream).handle(this::processEntry).concatWith(lastBytesMono); | |
} | |
public Zipper(final Publisher<Entry> fileStream) { | |
this(SIZE_512K, fileStream); | |
} | |
private void processEntry(final Entry entry, final SynchronousSink<byte[]> sink) { | |
writeEntryBytes(entry); | |
if(baos.size() >= outputPacketSizeThreshold) { | |
sink.next(getWrittenBytesAndReset()); | |
} | |
} | |
private byte[] getWrittenBytesAndReset() { | |
final byte[] chunk = baos.toByteArray(); | |
baos.reset(); | |
return chunk; | |
} | |
private void writeEntryBytes(final Entry entry) { | |
final ZipEntry zipEntry = new ZipEntry(entry.name); | |
uncheckedRun(() -> { | |
zos.putNextEntry(zipEntry); | |
zos.write(entry.content); | |
zos.closeEntry(); | |
}); | |
} | |
private byte[] getLastBytes() { | |
uncheckedRun(zos::close); | |
return baos.toByteArray(); | |
} | |
private void uncheckedRun(final CheckedIORunnable fn) { | |
try { | |
fn.run(); | |
} catch (IOException io) { | |
throw new ZipIOException(io); | |
} | |
} | |
@FunctionalInterface | |
private interface CheckedIORunnable { | |
void run() throws IOException; | |
} | |
@Value | |
public static final class Entry { | |
private final String name; | |
private final byte[] content; | |
} | |
@EqualsAndHashCode(callSuper = true) | |
@RequiredArgsConstructor(access = AccessLevel.PRIVATE) | |
public static final class ZipIOException extends RuntimeException { | |
@Getter | |
private final IOException inner; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment