Created
April 28, 2025 08:25
-
-
Save L-Briand/d189089e7a3ef2c2033cd34cd9773cb3 to your computer and use it in GitHub Desktop.
Basic input and output stream from a Channel<ByteArray>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.DelicateCoroutinesApi | |
import kotlinx.coroutines.channels.ReceiveChannel | |
import kotlinx.coroutines.runBlocking | |
import kotlinx.coroutines.sync.Mutex | |
import kotlinx.coroutines.sync.withLock | |
import java.io.InputStream | |
/**
 * Blocking [InputStream] that reads the bytes of the [ByteArray] chunks delivered by [channel].
 *
 * Chunks are consumed in the order they are received. End of stream (`-1`) is reported once the
 * channel is closed AND every byte of the chunk currently being drained has been handed out.
 * A channel closed with a failure cause is reported as a plain end of stream as well.
 *
 * Every read runs under [runBlocking] and therefore blocks the calling thread until data is
 * available or the channel is closed. Reads are serialized through an internal [Mutex].
 * NOTE(review): because of `runBlocking`, this presumably should not be called from a coroutine
 * running on a confined/limited dispatcher — confirm against callers.
 *
 * @property channel source of byte chunks; empty chunks are skipped.
 */
class ChannelInputStream(val channel: ReceiveChannel<ByteArray>) : InputStream() {
    /** Chunk currently being drained, or `null` when the next read has to fetch a new one. */
    private var buffer: ByteArray? = null

    /** Index of the next unread byte inside [buffer]. */
    private var bufferIndex = 0

    /** Serializes readers so [buffer] and [bufferIndex] are only touched by one read at a time. */
    private val readLock = Mutex()

    /** Runs [block] on the calling thread (blocking it) while holding [readLock]. */
    private inline fun <T> withLock(crossinline block: suspend () -> T) = runBlocking { readLock.withLock { block() } }

    /**
     * Returns a chunk that still has at least one unread byte at [bufferIndex], suspending until
     * one arrives, or `null` once the channel is closed and nothing is left to read.
     */
    private suspend fun getBuffer(): ByteArray? {
        while (true) {
            // Drain what we already hold first: bytes received before the channel was closed
            // must still be delivered to the reader.
            val current = buffer
            if (current != null && bufferIndex < current.size) return current

            buffer = null
            bufferIndex = 0
            // receiveCatching() reports "closed" as a result instead of throwing, so there is no
            // window between a closed-check and the receive call. Empty chunks loop around and
            // are skipped, so callers never see a chunk without readable bytes.
            buffer = channel.receiveCatching().getOrNull() ?: return null
        }
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as an unsigned value in `0..255`, or `-1` at end of stream.
     */
    override fun read(): Int = withLock {
        val chunk = getBuffer() ?: return@withLock -1
        val value = chunk[bufferIndex]
        bufferIndex++
        // Mask to avoid sign extension: 0xFF must be returned as 255, not as -1 (EOF).
        value.toInt() and 0xFF
    }

    /**
     * Reads up to [len] bytes into [b] starting at [off]. At most the remainder of the current
     * chunk is copied per call, so fewer than [len] bytes may be returned.
     *
     * @return the number of bytes copied (at least 1 when [len] > 0), `0` when [len] is 0,
     *   or `-1` at end of stream.
     * @throws NullPointerException if [b] is `null`.
     * @throws IndexOutOfBoundsException if [off]/[len] do not describe a valid range of [b].
     */
    override fun read(b: ByteArray?, off: Int, len: Int): Int {
        if (b == null) {
            throw NullPointerException("Received byte array is null")
        } else if (off < 0 || len < 0 || len > b.size - off) {
            throw IndexOutOfBoundsException("off: $off, len: $len, b.length: ${b.size}")
        } else if (len == 0) {
            return 0
        }
        return withLock {
            val chunk = getBuffer() ?: return@withLock -1
            val bytesToRead = minOf(len, chunk.size - bufferIndex)
            chunk.copyInto(b, off, bufferIndex, bufferIndex + bytesToRead)
            bufferIndex += bytesToRead
            bytesToRead
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.channels.SendChannel | |
import kotlinx.coroutines.runBlocking | |
import kotlinx.coroutines.sync.Mutex | |
import kotlinx.coroutines.sync.withLock | |
import java.io.OutputStream | |
/**
 * Blocking [OutputStream] that forwards every write to [channel] as one [ByteArray] chunk.
 *
 * Each chunk sent is a private copy of the written bytes, so the caller is free to reuse or
 * modify its array as soon as `write` returns (as `copyTo`-style loops and buffered streams do).
 *
 * Every write runs under [runBlocking] and blocks the calling thread until `channel.send`
 * completes. Writes are serialized through an internal [Mutex]. Failures of `channel.send`
 * propagate to the caller unchanged.
 * NOTE(review): this stream does not override `close()`, so closing it does not close
 * [channel] — confirm that the channel's owner is responsible for that.
 *
 * @property channel destination of the written chunks.
 */
class ChannelOutputStream(val channel: SendChannel<ByteArray>) : OutputStream() {
    /** Serializes writers so chunks from concurrent writes are sent one at a time. */
    private val mutex = Mutex()

    /** Runs [block] on the calling thread (blocking it) while holding [mutex]. */
    private fun withLock(block: suspend () -> Unit) = runBlocking { mutex.withLock { block() } }

    /** Sends the low eight bits of [b] as a one-byte chunk. */
    override fun write(b: Int) = withLock {
        channel.send(byteArrayOf((b and 0xFF).toByte()))
    }

    /**
     * Sends a copy of [b] as a single chunk. A `null` or empty array is ignored.
     */
    override fun write(b: ByteArray?) {
        val bytes = b ?: return
        if (bytes.isEmpty()) return
        // Snapshot before sending: the receiver keeps the array it gets, while the caller may
        // overwrite its own buffer right after this call returns.
        val chunk = bytes.copyOf()
        withLock { channel.send(chunk) }
    }

    /**
     * Sends a copy of the [len] bytes of [b] starting at [off] as a single chunk.
     * A `null` array or a zero [len] is ignored.
     *
     * @throws IndexOutOfBoundsException if [len] is non-zero and [off]/[len] do not describe
     *   a valid range of [b].
     */
    override fun write(b: ByteArray?, off: Int, len: Int) {
        val bytes = b ?: return
        if (len == 0) return
        if (off < 0 || len < 0 || len > bytes.size - off) {
            throw IndexOutOfBoundsException("off: $off, len: $len, b.length: ${bytes.size}")
        }
        // Always copy, even when the range covers the whole array, so the chunk handed to the
        // channel never aliases the caller's (possibly reused) buffer.
        val chunk = bytes.copyOfRange(off, off + len)
        withLock { channel.send(chunk) }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment