Skip to content

Instantly share code, notes, and snippets.

@L-Briand
Created April 28, 2025 08:25
Show Gist options
  • Save L-Briand/d189089e7a3ef2c2033cd34cd9773cb3 to your computer and use it in GitHub Desktop.
Save L-Briand/d189089e7a3ef2c2033cd34cd9773cb3 to your computer and use it in GitHub Desktop.
Basic input and output stream from a Channel<ByteArray>
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.InputStream
/**
 * Blocking [InputStream] view over a [ReceiveChannel] of byte chunks.
 *
 * Every received [ByteArray] is consumed in order; end of stream (`-1`) is reported once the
 * channel is closed *and* every byte of the last received chunk has been handed out.
 *
 * Reads are serialized through a [Mutex] and bridged to blocking code with [runBlocking], so each
 * `read` call blocks the calling thread until data is available or the channel is closed.
 *
 * @property channel source of byte chunks; empty chunks are skipped.
 */
class ChannelInputStream(val channel: ReceiveChannel<ByteArray>) : InputStream() {
    /** Chunk currently being consumed, or `null` when the next chunk has to be fetched. */
    private var buffer: ByteArray? = null

    /** Index of the next unread byte inside [buffer]. */
    private var bufferIndex = 0

    /** Guards [buffer] and [bufferIndex] against concurrent readers. */
    private val readLock = Mutex()

    /** Runs [block] on the calling thread while holding [readLock]. */
    private inline fun <T> withLock(crossinline block: suspend () -> T) =
        runBlocking { readLock.withLock { block() } }

    /**
     * Returns a chunk with at least one unread byte at [bufferIndex], suspending until one is
     * available, or `null` once the channel is closed and fully drained.
     */
    private suspend fun getBuffer(): ByteArray? {
        while (true) {
            // Serve what is left of the current chunk first: the channel may already be closed
            // while bytes of the last received chunk are still unread.
            val current = buffer
            if (current != null && bufferIndex < current.size) return current

            // receiveCatching() reports closure as a result instead of throwing, which avoids the
            // race between checking isClosedForReceive and calling receive().
            // A channel closed with a cause is treated as end of stream as well.
            val next = channel.receiveCatching().getOrNull()
            if (next == null) {
                buffer = null
                return null
            }

            // Empty chunks carry no data; the loop fetches the next one.
            buffer = next
            bufferIndex = 0
        }
    }

    /**
     * Reads the next byte.
     *
     * @return the byte as an unsigned value in `0..255`, or `-1` at end of stream.
     */
    override fun read(): Int = withLock {
        val chunk = getBuffer() ?: return@withLock -1
        // Mask so that negative bytes map to 128..255 instead of colliding with the -1 EOF marker.
        chunk[bufferIndex++].toInt() and 0xFF
    }

    /**
     * Copies up to [len] bytes from the current chunk into [b] starting at [off].
     *
     * At most one chunk is consumed per call, so fewer than [len] bytes may be returned even
     * though more data is pending in the channel.
     *
     * @return the number of bytes copied (at least 1 when [len] > 0), `0` when [len] is `0`,
     * or `-1` at end of stream.
     * @throws NullPointerException if [b] is `null`.
     * @throws IndexOutOfBoundsException if [off] or [len] is negative, or [len] exceeds
     * `b.size - off`.
     */
    override fun read(b: ByteArray?, off: Int, len: Int): Int {
        if (b == null) {
            throw NullPointerException("Received byte array is null")
        }
        if (off < 0 || len < 0 || len > b.size - off) {
            throw IndexOutOfBoundsException("off: $off, len: $len, b.length: ${b.size}")
        }
        if (len == 0) return 0

        return withLock {
            val chunk = getBuffer() ?: return@withLock -1
            val bytesToRead = minOf(len, chunk.size - bufferIndex)
            chunk.copyInto(b, off, bufferIndex, bufferIndex + bytesToRead)
            bufferIndex += bytesToRead
            bytesToRead
        }
    }
}
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.OutputStream
/**
 * Blocking [OutputStream] that forwards every write as one [ByteArray] chunk to a [SendChannel].
 *
 * Writes are serialized through a [Mutex] and bridged to blocking code with [runBlocking], so each
 * `write` call blocks the calling thread until the channel has accepted the chunk.
 *
 * Every chunk sent is a private copy of the written bytes: callers are free to reuse their
 * buffer as soon as `write` returns without affecting chunks still queued in the channel.
 *
 * This class does not override `close()`, so closing the stream leaves [channel] open.
 *
 * @property channel destination of the written chunks.
 */
class ChannelOutputStream(val channel: SendChannel<ByteArray>) : OutputStream() {
    /** Serializes concurrent writers so chunks are sent one at a time. */
    private val mutex = Mutex()

    /** Runs [block] on the calling thread while holding [mutex]. */
    private fun withLock(block: suspend () -> Unit) = runBlocking { mutex.withLock { block() } }

    /** Sends the low 8 bits of [b] as a single-byte chunk. */
    override fun write(b: Int) = withLock {
        channel.send(byteArrayOf((b and 0xFF).toByte()))
    }

    /**
     * Sends a copy of [b] as one chunk.
     *
     * A `null` or empty array is ignored and nothing is sent.
     */
    override fun write(b: ByteArray?) {
        val bytes = b ?: return
        if (bytes.isEmpty()) return
        // Defensive copy: the caller keeps ownership of its array and may overwrite it
        // before the receiver has consumed the chunk.
        val chunk = bytes.copyOf()
        withLock { channel.send(chunk) }
    }

    /**
     * Sends a copy of `b[off until off + len]` as one chunk.
     *
     * A `null` array is ignored; a valid range with [len] equal to `0` sends nothing.
     *
     * @throws IndexOutOfBoundsException if [off] or [len] is negative, or [len] exceeds
     * `b.size - off`.
     */
    override fun write(b: ByteArray?, off: Int, len: Int) {
        val bytes = b ?: return
        // Validate before the zero-length shortcut so invalid ranges are always reported.
        if (off < 0 || len < 0 || len > bytes.size - off) {
            throw IndexOutOfBoundsException("off: $off, len: $len, b.length: ${bytes.size}")
        }
        if (len == 0) return
        // Always copy, even for the full range: typical callers (e.g. stream copy loops)
        // reuse the same buffer for the next write.
        val chunk = bytes.copyOfRange(off, off + len)
        withLock { channel.send(chunk) }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment