Created
October 31, 2018 16:04
-
-
Save fmoraesmeli/9d39f9696b3920ac7c822499cef7cbd4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@Slf4j | |
public final class Zipper { | |
private static final Integer DEFAULT_PACKET_MINIMUM_SIZE = 24 * 1024 * 8; | |
private final DefaultDataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); | |
private final WrappedDataBufferOutputStream wrappedDataBufferOutputStream = | |
new WrappedDataBufferOutputStream(dataBufferFactory.allocateBuffer()); | |
private final ZipOutputStream zos = new ZipOutputStream(wrappedDataBufferOutputStream); | |
private final Integer packetMinimumSize; | |
@Getter | |
private final Flux<DataBuffer> zipStream; | |
public Zipper(final Integer packetMinimumSize, final Publisher<Entry> fileStream) { | |
this.packetMinimumSize = packetMinimumSize; | |
this.zipStream = Flux.from(fileStream) | |
.switchIfEmpty(Flux.error(new EmptySourceException())) | |
.handle(this::processEntry) | |
.concatWith(lastOutputMono()); | |
} | |
public Zipper(final Publisher<Entry> fileStream) { | |
this(DEFAULT_PACKET_MINIMUM_SIZE, fileStream); | |
} | |
private void processEntry(final Entry entry, final SynchronousSink<DataBuffer> sink) { | |
writeEntryBytes(entry); | |
if(wrappedDataBufferOutputStream.getCurrentDataBuffer().readableByteCount() >= packetMinimumSize) { | |
sink.next(getOutput()); | |
} | |
} | |
private DataBuffer getOutput() { | |
final DataBuffer dataBuffer = wrappedDataBufferOutputStream.getCurrentDataBuffer(); | |
wrappedDataBufferOutputStream.newDataBuffer(dataBufferFactory.allocateBuffer()); | |
return dataBuffer; | |
} | |
private void writeEntryBytes(final Entry entry) { | |
final ZipEntry zipEntry = new ZipEntry(entry.name); | |
uncheckedRun(() -> { | |
zos.putNextEntry(zipEntry); | |
zos.write(entry.content); | |
zos.closeEntry(); | |
}); | |
log.debug("written entry: {}", entry.name); | |
} | |
private Mono<DataBuffer> lastOutputMono() { | |
return Mono.fromSupplier(() -> { | |
uncheckedRun(zos::close); | |
return wrappedDataBufferOutputStream.getCurrentDataBuffer(); | |
}); | |
} | |
private void uncheckedRun(final CheckedIORunnable fn) { | |
try { | |
fn.run(); | |
} catch (IOException io) { | |
throw new ZipIOException(io); | |
} | |
} | |
@FunctionalInterface | |
private interface CheckedIORunnable { | |
void run() throws IOException; | |
} | |
@EqualsAndHashCode(callSuper = true) | |
private static final class WrappedDataBufferOutputStream extends OutputStream { | |
@Getter | |
private DataBuffer currentDataBuffer; | |
private OutputStream dataBufferOutputStream; | |
private WrappedDataBufferOutputStream(final DataBuffer dataBuffer) { | |
newDataBuffer(dataBuffer); | |
} | |
@Override | |
public void write(int b) throws IOException { | |
dataBufferOutputStream.write(b); | |
} | |
private void newDataBuffer(final DataBuffer dataBuffer) { | |
this.currentDataBuffer = dataBuffer; | |
this.dataBufferOutputStream = currentDataBuffer.asOutputStream(); | |
} | |
} | |
@Value | |
public static final class Entry { | |
private final String name; | |
private final byte[] content; | |
} | |
@EqualsAndHashCode(callSuper = true) | |
@RequiredArgsConstructor(access = AccessLevel.PRIVATE) | |
public static final class ZipIOException extends RuntimeException { | |
@Getter | |
private final IOException inner; | |
} | |
public static final class EmptySourceException extends RuntimeException {} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment