Skip to content

Instantly share code, notes, and snippets.

@entzik
Created February 24, 2018 22:53
Show Gist options
  • Save entzik/547586e9952919271be4e9bde0861d60 to your computer and use it in GitHub Desktop.
Save entzik/547586e9952919271be4e9bde0861d60 to your computer and use it in GitHub Desktop.
/**
 * Takes a {@link FilePart}, transfers it to disk using an {@link AsynchronousFileChannel} and returns a
 * {@link Mono} representing the result.
 *
 * <p>The file is written to the working directory under the base name of the uploaded file; any directory
 * components sent by the client are dropped. An existing file with the same name is truncated and
 * overwritten.
 *
 * <p>NOTE: the disk writes are asynchronous and nothing waits for them. The returned {@link Mono} completes
 * once every part has been <em>submitted</em> for writing, so a write failure that happens afterwards is
 * only logged and is not reflected in the returned status string.
 *
 * @param filePart - the request part containing the file to be saved
 * @return a {@link Mono} emitting {@code "<filename> uploaded"} or {@code "<filename> error"}, or failing
 *         with the underlying exception if the target file cannot be opened or the upload stream fails
 */
private Mono<String> saveFile(FilePart filePart) {
    final String filename = filePart.filename();
    LOGGER.info("handling file upload {}", filename);

    // The file name is supplied by the client: keep only its last path segment so that names such as
    // "../../etc/passwd" cannot escape the working directory.
    final File file = new File(new File(filename).getName());
    if (file.getName().isEmpty()) {
        return Mono.error(new IllegalArgumentException("upload has no usable file name: " + filename));
    }

    final AsynchronousFileChannel fileChannel;
    try {
        // CREATE + TRUNCATE_EXISTING replaces the former exists()/delete()/createNewFile() sequence:
        // a missing file is created, an existing one is emptied, in a single call.
        fileChannel = AsynchronousFileChannel.open(file.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
    } catch (IOException e) {
        // unable to create/open the target file, return an error
        LOGGER.error("error opening the file channel", e);
        return Mono.error(e);
    }

    final CloseCondition closeCondition = new CloseCondition();
    // Offset at which the next part is written. A long is required: an int would overflow on uploads
    // larger than 2 GiB. (Fully qualified so that no additional import is needed.)
    final java.util.concurrent.atomic.AtomicLong fileWriteOffset = new java.util.concurrent.atomic.AtomicLong(0L);
    // set once any disk write has failed; later parts are then skipped
    final AtomicBoolean errorFlag = new AtomicBoolean(false);

    LOGGER.info("subscribing to file parts");
    // FilePart.content produces a flux of data buffers, each of which needs to be written to the file
    return filePart.content().doOnEach(dataBufferSignal -> {
        if (!dataBufferSignal.hasValue() || errorFlag.get()) {
            return;
        }
        // copy the incoming data buffer into a byte array and wrap it for the file channel
        // NOTE(review): the DataBuffer is never released here; if the server hands out pooled buffers
        // they presumably need DataBufferUtils.release(dataBuffer) after this read - confirm.
        final DataBuffer dataBuffer = dataBufferSignal.get();
        final byte[] bytes = new byte[dataBuffer.readableByteCount()];
        dataBuffer.read(bytes);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);

        // reserve this part's region of the file
        final long filePartOffset = fileWriteOffset.getAndAdd(bytes.length);
        LOGGER.info("processing file part at offset {}", filePartOffset);

        // write the buffer to disk
        closeCondition.onTaskSubmitted();
        fileChannel.write(byteBuffer, filePartOffset, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer written, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    // A single write may transfer fewer bytes than requested; the buffer's position
                    // tells how far we got, so continue from there with the same handler.
                    fileChannel.write(buffer, filePartOffset + buffer.position(), buffer, this);
                    return;
                }
                // file part successfully written to disk
                LOGGER.info("done saving file part {}", filePartOffset);
                if (closeCondition.onTaskCompleted()) {
                    LOGGER.info("closing after last part");
                    closeUploadChannel(fileChannel);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer buffer) {
                // there was an error while writing to disk, set the error flag
                errorFlag.set(true);
                LOGGER.error("error saving file part {}", filePartOffset, exc);
                // Balance the onTaskSubmitted() call above; presumably without this the close
                // condition can never be met after a failed write and the channel stays open.
                if (closeCondition.onTaskCompleted()) {
                    closeUploadChannel(fileChannel);
                }
            }
        });
    }).doOnComplete(() -> {
        // all parts received; close now unless writes are still pending
        LOGGER.info("done processing file parts");
        if (closeCondition.canCloseOnComplete()) {
            LOGGER.info("closing after complete");
            closeUploadChannel(fileChannel);
        }
    }).doOnError(t -> {
        // the upload stream itself failed
        LOGGER.error("error processing file parts", t);
        closeUploadChannel(fileChannel);
        // then(...) instead of last(): an empty upload (no data buffers at all) must still yield a status
        // string rather than fail with NoSuchElementException
    }).then(Mono.fromSupplier(() -> filename + " " + (errorFlag.get() ? "error" : "uploaded")));
}

/**
 * Closes the upload channel, logging (rather than propagating or silently dropping) any failure.
 *
 * @param channel the channel to close; closing an already closed channel has no effect
 */
private void closeUploadChannel(AsynchronousFileChannel channel) {
    try {
        channel.close();
    } catch (IOException e) {
        LOGGER.warn("error closing the file channel", e);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment