Created
June 9, 2020 20:44
-
-
Save arturaz/b340c2d8f221377c996e5084fd7b736b to your computer and use it in GitHub Desktop.
Turn a Vert.x file into a Scala Reactor flux that reads bytes on demand
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Minimal, library-independent view of a file's properties. */
trait FileProps {
  /** Size of the file as a `Long`; presumably in bytes (the unit is defined by the implementer). */
  def size: Long
}
// File system handle used by `readFlux`. `???` is a placeholder that throws
// `NotImplementedError` when evaluated; supply a real instance before use.
val vfs: FileSystem = ???
/** Fetches the properties of `path` and builds a flux over the file's contents.
  *
  * Downstream demand is interpreted as a number of BYTES: `request(n)` causes up to `n`
  * bytes to be read and emitted, split into buffers of at most `maxChunkBytes` each.
  * The flux completes as soon as a read yields an empty buffer.
  *
  * The file is opened on subscription (not when this method is called), and it is closed
  * whenever the sink is disposed, i.e. on completion, on error and on cancellation.
  *
  * @param path location of the file; passed to the file system as `path.toString`
  * @return a future that completes with the file's properties and the (not yet subscribed)
  *         flux of buffers; it fails if the properties cannot be fetched
  */
def readFlux(path: Path): Future[(FileProps, SFlux[Buffer])] = {
  val fileName = path.toString

  // Upper bound for a single read. Demand arrives as a Long (an unbounded subscriber
  // requests Long.MaxValue), so it must never be narrowed to Int unchecked; requests up
  // to this size are still served with a single read.
  val maxChunkBytes = 1024L * 1024L

  vfs.propsFuture(fileName).map { vertxFileProps =>
    val props = new FileProps {
      override def size: Long = vertxFileProps.size()
    }

    val flux = SMono
      // Deferred so that each subscription opens (and later closes) its own handle and
      // nothing is opened if the flux is never subscribed.
      .defer(() => SMono.fromFuture(vfs.openFuture(fileName, OpenOptions().setRead(true))))
      .flatMapMany { file =>
        SFlux.create[Buffer](sink => {
          // Single place responsible for releasing the handle: runs after complete(),
          // error() and downstream cancellation.
          sink.onDispose(() => file.close())

          // Reads and emits up to `remaining` bytes starting at `position`; the returned
          // future yields the position at which the next read must start.
          def readUpTo(position: Long, remaining: Long): Future[Long] =
            if (remaining <= 0L || sink.isCancelled) Future.successful(position)
            else {
              val chunk = math.min(remaining, maxChunkBytes).toInt
              file.readFuture(Buffer.buffer(chunk), 0, position, chunk).flatMap { buffer =>
                val bytesRead = buffer.length()
                if (bytesRead == 0) {
                  // An empty read is treated as end of file.
                  sink.complete()
                  Future.successful(position)
                } else {
                  sink.next(buffer)
                  readUpTo(position + bytesRead, remaining - bytesRead)
                }
              }
            }

          // Every request is appended to this chain, so reads never overlap and each one
          // starts exactly where the previous one ended. Assumes request signals arrive
          // serially (Reactive Streams rule 2.7); @volatile covers cross-thread visibility.
          @volatile var readChain: Future[Long] = Future.successful(0L)

          sink.onRequest { requested =>
            val attempt = readChain.flatMap(position => readUpTo(position, requested))
            // Skip the error signal after cancellation: closing the handle may itself
            // make an in-flight read fail.
            attempt.failed.foreach(error => if (!sink.isCancelled) sink.error(error))
            readChain = attempt
          }
        }, OverflowStrategy.ERROR)
      }

    (props, flux)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment