@Daenyth
Last active July 11, 2024 20:05
OutputStreamStream [fs2 0.10] - write into a java.io.OutputStream, read from fs2.Stream.
import java.io.OutputStream
import java.util.concurrent.Executors
import cats.effect.{Async, Effect, IO, Timer}
import cats.implicits._
import fs2.async.mutable.Queue
import fs2.{Chunk, Stream}
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, SyncVar}

/**
 * Provides an [[java.io.OutputStream OutputStream]] where all bytes written via the
 * [[java.io.OutputStream#write write]] methods will be emitted by the fs2 stream returned
 * from [[OutputStreamStream#stream stream]].
 *
 * When maxSize is set, [[java.io.OutputStream#write write]] will block until there is room.
 * Consume elements from [[OutputStreamStream#stream stream]] concurrently to make room available.
 *
 * The chunk size determines how large the emitted fs2 Stream chunks are, so downstream work is
 * done per chunk rather than per individual byte.
 *
 * Callers should call [[java.io.OutputStream#close close]] to terminate the fs2 stream;
 * if this is not done, the stream will run forever.
 *
 * @param maxSize   The maximum number of byte chunks allowed to be buffered at once.
 *                  If unset, the buffer is unbounded and memory usage may grow without limit.
 * @param chunkSize The size of the chunks to put into the fs2 stream.
 * @param ec        The context used to manage the internal buffer's concurrency.
 *                  Can be as small as one thread, as long as `write` is called from some other thread.
 */
class OutputStreamStream[F[_]](
    maxSize: Option[Int],
    chunkSize: Int
)(
    implicit ec: ExecutionContext,
    F: Effect[F]
) extends OutputStream {

  private var bufferedChunk: Vector[Byte] = Vector.empty

  private val queue: Queue[F, Option[Vector[Byte]]] = unsafeRunSync(
    maxSize
      .fold(Queue.unbounded[F, Option[Vector[Byte]]])(size =>
        Queue.bounded[F, Option[Vector[Byte]]](size)))

  /** Gets an fs2 stream which will emit each byte sent via OutputStream#write. This should not be
    * passed to multiple consumers; the likely outcome will be each consumer getting a subset of
    * the bytes, with neither getting the full data set. */
  val stream: Stream[F, Vector[Byte]] = queue.dequeueAvailable.unNoneTerminate

  @tailrec
  private def addChunk(newChunk: Vector[Byte]): Unit = {
    val newChunkSize = newChunk.size
    val bufferedChunkSize = bufferedChunk.size
    val spaceLeftInTheBuffer = chunkSize - bufferedChunkSize

    if (newChunkSize > spaceLeftInTheBuffer) {
      // Not enough space in the buffer to contain the whole new chunk.
      // Recursively slice and enqueue the chunk in order to preserve the chunk size.
      bufferedChunk = bufferedChunk ++ newChunk.take(spaceLeftInTheBuffer)
      flushBuffer()
      addChunk(newChunk.drop(spaceLeftInTheBuffer))
    } else {
      // There is enough space in the buffer for the whole new chunk
      bufferedChunk = bufferedChunk ++ newChunk
    }
  }

  private def flushBuffer(): Unit = {
    enqueueChunkSync(Some(bufferedChunk))
    bufferedChunk = Vector.empty
  }

  private def enqueueChunkSync(a: Option[Vector[Byte]]): Unit =
    unsafeRunSync(queue.enqueue1(a))

  override def write(bytes: Array[Byte]): Unit =
    addChunk(Vector(bytes: _*))

  override def write(bytes: Array[Byte], off: Int, len: Int): Unit =
    addChunk(Chunk.bytes(bytes, off, len).toVector)

  override def write(b: Int): Unit =
    addChunk(Vector(b.toByte))

  /** Flushes any incomplete byte chunks into the fs2 Stream */
  override def flush(): Unit = flushBuffer()

  /** Terminates the fs2 Stream provided by `OutputStreamStream#stream`,
    * flushing any incomplete byte chunks before doing so. */
  override def close(): Unit = {
    flush()
    enqueueChunkSync(None)
  }

  /** Runs `fa`, blocking the calling thread until it completes, and rethrows any failure. */
  private def unsafeRunSync[A](fa: F[A]): A = {
    val done = new SyncVar[Either[Throwable, A]]
    F.runAsync(fa)(r => IO(done.put(r))).unsafeRunSync
    done.get.fold(throw _, identity)
  }
}
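
// Hypothetical usage sketch (not part of the original gist): write and close on the calling
// side, then drain `stream`. With maxSize = None the internal queue is unbounded, so the
// writes complete without a concurrent consumer.
object OutputStreamStreamExample {
  import scala.concurrent.ExecutionContext.Implicits.global

  def helloBytes(): Vector[Byte] = {
    val oss = new OutputStreamStream[IO](maxSize = None, chunkSize = 4096)
    oss.write("hello".getBytes("UTF-8"))
    oss.close() // flushes the partial chunk and terminates the stream
    oss.stream.compile.toVector.unsafeRunSync().flatten
  }
}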

object OutputStreamStream {

  /**
   * Take a function that emits to an [[java.io.OutputStream OutputStream]] effectfully,
   * and return a stream which, when run, will perform that function and emit
   * the bytes recorded in the OutputStream as an fs2.Stream.
   *
   * The stream produced by this will terminate if:
   *   - `f` returns
   *   - `f` calls `OutputStream#close`
   *   - The sink that you compose the stream with terminates.
   *
   * If none of those happens, the stream will run forever.
   *
   * `f` will be run on a newly created single-thread pool to prevent deadlocks.
   */
  def withOutputStream[F[_]](
      f: OutputStream => F[_],
      maxSize: Option[Int],
      chunkSize: Int
  )(
      implicit ec: ExecutionContext,
      F: Effect[F],
      timer: Timer[F]
  ): Stream[F, Vector[Byte]] =
    Stream.bracket(F.delay(Executors.newFixedThreadPool(1)))(
      use = p =>
        withOutputStream[F](
          f,
          maxSize,
          chunkSize,
          ExecutionContext.fromExecutor(p)
        ),
      release = p => F.delay(p.shutdown()))

  /**
   * Take a function that emits to an [[java.io.OutputStream OutputStream]] effectfully,
   * and return a stream which, when run, will perform that function and emit
   * the bytes recorded in the OutputStream as an fs2.Stream.
   *
   * The stream produced by this will terminate if:
   *   - `f` returns
   *   - `f` calls `OutputStream#close`
   *   - The sink that you compose the stream with terminates.
   *
   * If none of those happens, the stream will run forever.
   *
   * Like `withOutputStream(f, maxSize, chunkSize)`, except that instead of spawning
   * a new thread it runs `f` on the explicitly provided blocking thread pool.
   */
  def withOutputStream[F[_]](
      f: OutputStream => F[_],
      maxSize: Option[Int],
      chunkSize: Int,
      blocking: ExecutionContext
  )(
      implicit ec: ExecutionContext,
      F: Effect[F],
      timer: Timer[F]
  ): Stream[F, Vector[Byte]] =
    Stream
      .eval(F.delay(new OutputStreamStream(maxSize, chunkSize)))
      .flatMap { oss =>
        // Shift to the blocking pool, run `f`, close the OutputStream to terminate the
        // stream, then shift back to the main pool.
        val write: F[Unit] =
          Async.shift[F](blocking) *>
            f(oss) *>
            F.delay(oss.close()) *>
            Async.shift[F](ec)
        oss.stream.concurrently(Stream.eval(write))
      }
}
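
// Hypothetical usage sketch (not part of the original gist): collect everything `f` writes
// as chunks of bytes. Assumes the imports at the top of this file and that Effect[IO] and
// Timer[IO] resolve with an implicit ExecutionContext in scope, as in the test suite below.
object WithOutputStreamExample {
  import scala.concurrent.ExecutionContext.Implicits.global

  val written: Stream[IO, Vector[Byte]] =
    OutputStreamStream.withOutputStream[IO](
      (os: OutputStream) => IO(os.write("hello".getBytes("UTF-8"))),
      maxSize = None,
      chunkSize = 4096
    )

  // written.compile.toVector.unsafeRunSync().flatten == "hello".getBytes("UTF-8").toVector
}

// ---- OutputStreamStreamTest: test suite (a separate file in the gist) ----
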
import java.io.OutputStream
import java.util.concurrent.Executors
import cats.effect.IO
import cats.syntax.apply._
import fs2.Stream
import org.scalatest.{FunSpec, Matchers, OptionValues}
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._

class OutputStreamStreamTest extends FunSpec with Matchers with OptionValues {
  implicit val ec: ExecutionContext = global

  def runToFlatVec[A](s: Stream[IO, Vector[A]]): Vector[A] =
    runToVec(s).flatten

  def runToVec[A](s: Stream[IO, A]): Vector[A] =
    s.compile.toVector.unsafeRunSync()

  val chunkSize = 16

  def newOSS = new OutputStreamStream[IO](maxSize = None, chunkSize = chunkSize)

  describe("OutputStreamStream") {
    it("reads written bytes") {
      val oss = newOSS
      oss.write(1)
      oss.flush()
      runToFlatVec(oss.stream.take(1)) shouldEqual Vector(1.toByte)
    }

    it("finishes when closed") {
      val oss = newOSS
      oss.write(1)
      oss.close()
      runToFlatVec(oss.stream) shouldEqual Vector(1.toByte)
    }

    it("preserves order of encoded bytes") {
      val oss = newOSS
      val s = "тℯḯкα"
      oss.write(s.getBytes("UTF-8"))
      oss.close()
      runToVec(
        oss.stream
          .flatMap(Stream.emits(_).covary[IO])
          .through(fs2.text.utf8Decode)
      ).mkString shouldEqual s
    }
  }

  describe("withOutputStream[IO]") {
    it("writes data and terminates when `f` returns") {
      OutputStreamStream
        .withOutputStream[IO]((os: OutputStream) => IO(os.write(1)),
                              maxSize = None,
                              chunkSize = chunkSize)
        .compile
        .toVector
        .unsafeRunSync()
        .flatten shouldEqual Vector(1.toByte)
    }

    it("can be manually closed from inside `f`") {
      OutputStreamStream
        .withOutputStream[IO]((os: OutputStream) => IO(os.close()) *> IO.never,
                              maxSize = None,
                              chunkSize = chunkSize)
        .compile
        .toVector
        .unsafeRunSync()
        .flatten shouldEqual Vector.empty
    }

    it("fails when `f` fails") {
      val e = new Exception("boom")
      OutputStreamStream
        .withOutputStream[IO]((_: OutputStream) => IO.raiseError(e),
                              maxSize = None,
                              chunkSize = chunkSize)
        .compile
        .toVector
        .attempt
        .unsafeRunSync() shouldBe Left(e)
    }

    it("Doesn't deadlock") {
      val oneThread: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

      def write(os: OutputStream): IO[Unit] = IO {
        os.write(1)
        os.write(1)
        os.write(1)
        os.write(1)
        os.write(1)
        os.write(1)
      }

      val s = OutputStreamStream
        .withOutputStream[IO](write _, maxSize = Some(1), chunkSize = 1)(
          oneThread,
          implicitly,
          implicitly)
        .take(5)

      val result =
        withClue(".value failing means that it timed out due to deadlock") {
          s.compile.toVector
            .unsafeRunTimed(1.second)
            .value
            .flatten
        }
      result should have size 5
    }
  }
}

Daenyth commented Sep 22, 2021:
Note: This is obsolete; it's now built into the library as readOutputStream in fs2-io.
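
A minimal sketch of the built-in replacement, assuming fs2 3.x and cats-effect 3 (in fs2 2.x the signature also takes a Blocker):

import java.io.OutputStream
import cats.effect.IO
import fs2.Stream

val bytes: Stream[IO, Byte] =
  fs2.io.readOutputStream[IO](chunkSize = 4096) { (os: OutputStream) =>
    // runs on the blocking pool; fs2 terminates the stream when this action completes
    IO.blocking(os.write("hello".getBytes("UTF-8")))
  }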
