Last active
July 11, 2024 20:05
-
-
Save Daenyth/720f357a47f04e779992b21f0c6f32f5 to your computer and use it in GitHub Desktop.
OutputStreamStream [fs2 0.10] - write into a java.io.OutputStream, read from an fs2.Stream.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.OutputStream | |
import java.util.concurrent.Executors | |
import cats.effect.{Async, Effect, IO, Timer} | |
import cats.implicits._ | |
import fs2.async.mutable.Queue | |
import fs2.{Chunk, Stream} | |
import scala.annotation.tailrec | |
import scala.concurrent.{ExecutionContext, SyncVar} | |
/**
 * Provides an [[java.io.OutputStream OutputStream]] where all bytes written to the
 * [[java.io.OutputStream#write write]] methods will be emitted in the fs2 stream from
 * [[OutputStreamStream#stream stream]].
 *
 * Bytes are grouped into chunks of `chunkSize` bytes, so the fs2 stream does not do work for
 * every single byte. A chunk is emitted as soon as it is full; a partially filled chunk is
 * emitted only by [[java.io.OutputStream#flush flush]] or [[java.io.OutputStream#close close]].
 * Empty chunks are never emitted.
 *
 * When maxSize is set, [[java.io.OutputStream#write write]] will block until there is room.
 * Consume elements from [[OutputStreamStream#stream stream]] concurrently to make room available.
 *
 * Callers should call [[java.io.OutputStream#close close]] to terminate the fs2 stream;
 * if this is not done, the stream will run forever. Calling `close` more than once has no
 * further effect, and writing after `close` throws a `java.io.IOException`.
 *
 * @param maxSize The maximum number of byte chunks to buffer at once.
 *                If unset, the buffer is unbounded and memory usage may grow without limit.
 * @param chunkSize The size of the chunks put into the fs2 stream; must be positive.
 * @param ec The context used to manage the internal buffer's concurrency.
 *           Can be as low as one thread as long as `write` is called from some other thread.
 * @throws IllegalArgumentException if `chunkSize` is not positive.
 */
class OutputStreamStream[F[_]](
    maxSize: Option[Int],
    chunkSize: Int
)(
    implicit ec: ExecutionContext,
    F: Effect[F]
) extends OutputStream {
  // A non-positive chunk size would make addChunk loop forever without making progress.
  require(chunkSize > 0, s"chunkSize must be positive, but was $chunkSize")

  // Bytes not yet emitted. Invariant between calls: bufferedChunk.size < chunkSize,
  // because addChunk flushes the buffer as soon as it fills up.
  private var bufferedChunk: Vector[Byte] = Vector.empty

  // Set by the first close(); makes close() idempotent and makes later writes fail.
  private var closed: Boolean = false

  // `Some(bytes)` carries data; `None` is the end-of-stream marker enqueued by close().
  private val queue: Queue[F, Option[Vector[Byte]]] = unsafeRunSync(
    maxSize.fold(Queue.unbounded[F, Option[Vector[Byte]]])(size =>
      Queue.bounded[F, Option[Vector[Byte]]](size)))

  /** Gets an fs2 stream which will emit each byte sent via OutputStream#write. This should not be passed
    * to multiple consumers; the likely outcome will be each consumer getting a subset
    * of bytes, with neither getting the full data set. */
  val stream: Stream[F, Vector[Byte]] = queue.dequeueAvailable.unNoneTerminate

  /** Appends `newChunk` to the buffer, emitting every chunk that reaches `chunkSize` bytes. */
  @tailrec
  private def addChunk(newChunk: Vector[Byte]): Unit =
    if (newChunk.nonEmpty) {
      // The buffer invariant guarantees at least one free slot, so every step consumes input.
      val (fits, rest) = newChunk.splitAt(chunkSize - bufferedChunk.size)
      bufferedChunk = bufferedChunk ++ fits
      if (bufferedChunk.size == chunkSize) flushBuffer()
      addChunk(rest)
    }

  /** Emits the buffered bytes as a single chunk; does nothing when the buffer is empty. */
  private def flushBuffer(): Unit =
    if (bufferedChunk.nonEmpty) {
      enqueueChunkSync(Some(bufferedChunk))
      bufferedChunk = Vector.empty
    }

  /** Enqueues `a`, blocking the calling thread while a bounded queue is full. */
  private def enqueueChunkSync(a: Option[Vector[Byte]]): Unit =
    unsafeRunSync(queue.enqueue1(a))

  /** Fails writes issued after close(); otherwise they would sit behind the end marker forever. */
  private def ensureOpen(): Unit =
    if (closed) throw new java.io.IOException("Stream closed")

  override def write(bytes: Array[Byte]): Unit =
    write(bytes, 0, bytes.length)

  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
    ensureOpen()
    // Bounds check required by the java.io.OutputStream#write(byte[], int, int) contract.
    if (off < 0 || len < 0 || len > bytes.length - off)
      throw new IndexOutOfBoundsException(
        s"off=$off, len=$len, array length=${bytes.length}")
    addChunk(Chunk.bytes(bytes, off, len).toVector)
  }

  override def write(b: Int): Unit = {
    ensureOpen()
    // Per the OutputStream contract only the low-order eight bits of `b` are written.
    addChunk(Vector(b.toByte))
  }

  /** Flushes any incomplete byte chunk into the fs2 Stream. A no-op when nothing is buffered,
    * including after close. */
  override def flush(): Unit = flushBuffer()

  /** Terminates the fs2 Stream provided by `OutputStreamStream#stream`,
    * flushing any incomplete byte chunk before doing so. Subsequent calls have no effect. */
  override def close(): Unit =
    if (!closed) {
      closed = true
      flushBuffer()
      enqueueChunkSync(None)
    }

  /** Runs `fa` and blocks the current thread until it completes, rethrowing any failure. */
  private def unsafeRunSync[A](fa: F[A]): A = {
    val done = new SyncVar[Either[Throwable, A]]
    F.runAsync(fa)(r => IO(done.put(r))).unsafeRunSync()
    done.get.fold(e => throw e, identity)
  }
}
object OutputStreamStream {

  /**
   * Take a function that emits to an [[java.io.OutputStream OutputStream]] effectfully,
   * and return a stream which, when run, will perform that function and emit
   * the bytes recorded in the OutputStream as an fs2.Stream
   *
   * The stream produced by this will terminate if:
   *  - `f` returns
   *  - `f` calls `OutputStream#close`
   *  - The sink that you compose the stream with terminates.
   *
   * If none of those happens, the stream will run forever.
   *
   * `f` will be run in a new thread to prevent deadlocks. That thread is a daemon thread, so a
   * writer left blocked after the sink has terminated does not prevent the JVM from exiting.
   * Its pool is shut down when the returned stream finishes.
   */
  def withOutputStream[F[_]](
      f: OutputStream => F[_],
      maxSize: Option[Int],
      chunkSize: Int
  )(
      implicit ec: ExecutionContext,
      F: Effect[F],
      timer: Timer[F]
  ): Stream[F, Vector[Byte]] =
    Stream.bracket(F.delay(newWriterPool()))(
      use = pool =>
        withOutputStream[F](f, maxSize, chunkSize, ExecutionContext.fromExecutor(pool)),
      release = pool => F.delay(pool.shutdown()))

  /**
   * Take a function that emits to an [[java.io.OutputStream OutputStream]] effectfully,
   * and return a stream which, when run, will perform that function and emit
   * the bytes recorded in the OutputStream as an fs2.Stream
   *
   * The stream produced by this will terminate if:
   *  - `f` returns
   *  - `f` calls `OutputStream#close`
   *  - The sink that you compose the stream with terminates.
   *
   * If none of those happens, the stream will run forever.
   *
   * Like `withOutputStream(f, maxSize, chunkSize)` except that, instead of spawning
   * a new thread, `f` runs on the explicitly supplied `blocking` context.
   */
  def withOutputStream[F[_]](
      f: OutputStream => F[_],
      maxSize: Option[Int],
      chunkSize: Int,
      blocking: ExecutionContext
  )(
      implicit ec: ExecutionContext,
      F: Effect[F],
      timer: Timer[F]
  ): Stream[F, Vector[Byte]] =
    Stream
      .eval(F.delay(new OutputStreamStream[F](maxSize, chunkSize)))
      .flatMap { oss =>
        // `*>` takes its argument strictly, so `f(oss)` must be suspended: otherwise an `f` that
        // writes as soon as it is applied would run on the caller's thread, before the shift.
        val runF: F[Unit] = F.suspend(F.void(f(oss)))
        val write: F[Unit] =
          Async.shift[F](blocking) *>
            runF *>
            F.delay(oss.close()) *>
            // Return to `ec` so that nothing downstream keeps running on the blocking context.
            Async.shift[F](ec)
        oss.stream.concurrently(Stream.eval(write))
      }

  /** Creates a single-thread pool with a daemon thread, used to run the writer function. */
  private def newWriterPool(): java.util.concurrent.ExecutorService =
    Executors.newFixedThreadPool(1, new java.util.concurrent.ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "OutputStreamStream-writer")
        t.setDaemon(true)
        t
      }
    })
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.OutputStream | |
import java.util.concurrent.Executors | |
import cats.effect.IO | |
import cats.syntax.apply._ | |
import fs2.Stream | |
import org.scalatest.{FunSpec, Matchers, OptionValues} | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.ExecutionContext.global | |
import scala.concurrent.duration._ | |
class OutputStreamStreamTest extends FunSpec with Matchers with OptionValues {
  implicit val ec: ExecutionContext = global

  /** Runs `s` to completion and concatenates the emitted chunks. */
  def runToFlatVec[A](s: Stream[IO, Vector[A]]): Vector[A] =
    runToVec(s).flatten

  /** Runs `s` to completion, collecting every emitted element. */
  def runToVec[A](s: Stream[IO, A]): Vector[A] =
    s.compile.toVector.unsafeRunSync()

  val chunkSize = 16

  def newOSS = new OutputStreamStream[IO](maxSize = None, chunkSize = chunkSize)

  describe("OutputStreamStream") {
    it("reads written bytes") {
      val oss = newOSS
      oss.write(1)
      oss.flush()
      runToFlatVec(oss.stream.take(1)) shouldEqual Vector(1.toByte)
    }

    it("finishes when closed") {
      val oss = newOSS
      oss.write(1)
      oss.close()
      runToFlatVec(oss.stream) shouldEqual Vector(1.toByte)
    }

    it("preserves order of encoded bytes") {
      val oss = newOSS
      val s = "тℯḯкα"
      oss.write(s.getBytes("UTF-8"))
      oss.close()
      runToVec(
        oss.stream
          .flatMap(Stream.emits(_).covary[IO])
          .through(fs2.text.utf8Decode)
      ).mkString shouldEqual s
    }

    it("splits large writes into chunks of chunkSize bytes") {
      val oss = newOSS
      oss.write(Array.fill[Byte](40)(7.toByte))
      oss.close()
      runToVec(oss.stream).map(_.size) shouldEqual Vector(16, 16, 8)
    }
  }

  describe("withOutputStream[IO]") {
    it("writes data and terminates when `f` returns") {
      OutputStreamStream
        .withOutputStream[IO]((os: OutputStream) => IO(os.write(1)),
                              maxSize = None,
                              chunkSize = chunkSize)
        .compile
        .toVector
        .unsafeRunSync()
        .flatten shouldEqual Vector(1.toByte)
    }

    it("can be manually closed from inside `f`") {
      OutputStreamStream
        .withOutputStream[IO]((os: OutputStream) => IO(os.close()) *> IO.never,
                              maxSize = None,
                              chunkSize = chunkSize)
        .compile
        .toVector
        .unsafeRunSync()
        .flatten shouldEqual Vector.empty
    }

    it("fails when `f` fails") {
      val e = new Exception("boom")
      OutputStreamStream
        .withOutputStream[IO]((_: OutputStream) => IO.raiseError(e),
                              maxSize = None,
                              chunkSize = chunkSize)
        .compile
        .toVector
        .attempt
        .unsafeRunSync() shouldBe Left(e)
    }

    it("Doesn't deadlock") {
      // Single-thread pool for the queue's concurrency; shut down afterwards so the test
      // does not leak a non-daemon thread.
      val pool = Executors.newFixedThreadPool(1)
      try {
        val oneThread: ExecutionContext = ExecutionContext.fromExecutor(pool)

        // Writes one byte more than the consumer takes, with a queue that holds a single chunk.
        def write(os: OutputStream): IO[Unit] = IO {
          (1 to 6).foreach(_ => os.write(1))
        }

        val s = OutputStreamStream
          .withOutputStream[IO](write _, maxSize = Some(1), chunkSize = 1)(
            oneThread,
            implicitly,
            implicitly)
          .take(5)

        val result =
          withClue(".value failing means that it timed out due to deadlock") {
            s.compile.toVector
              .unsafeRunTimed(1.second)
              .value
              .flatten
          }
        result should have size 5
      } finally pool.shutdown()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note: This is obsolete; it is now built into the library as `readOutputStream` in fs2-io.