Skip to content

Instantly share code, notes, and snippets.

@arturaz
Created June 9, 2020 20:44
Show Gist options
  • Save arturaz/b340c2d8f221377c996e5084fd7b736b to your computer and use it in GitHub Desktop.
Save arturaz/b340c2d8f221377c996e5084fd7b736b to your computer and use it in GitHub Desktop.
Turn a Vertx file into Scala reactor flux which reads bytes upon demand
trait FileProps {
def size: Long
}
val vfs: FileSystem = ???
def readFlux(path: Path): Future[(FileProps, SFlux[Buffer])] =
vfs.propsFuture(path.toString).map { vertxFileProps =>
val props = new FileProps {
override def size = vertxFileProps.size()
}
val flux = SMono
.fromFuture(vfs.openFuture(path.toString, OpenOptions().setRead(true)))
.flatMapMany { file =>
SFlux.create[Buffer](sink => {
var read = 0L
sink.onRequest { bytesLong =>
val bytes = bytesLong.toInt
file.readFuture(Buffer.buffer(bytes), 0, read, bytes).onComplete {
case util.Failure(exception) =>
sink.error(exception)
case util.Success(buffer) =>
val bytesRead = buffer.length()
read += bytesRead
if (bytesRead == 0) {
file.close()
sink.complete()
}
else sink.next(buffer)
}
}
}, OverflowStrategy.ERROR)
}
(props, flux)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment