Created
September 7, 2016 13:32
-
-
Save pyldin601/a528d6a525295bfc11ba7f81b6c544e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package biz.radioteria.flow.stream | |
import java.io.IOException | |
import java.io.OutputStream | |
import java.util.concurrent.LinkedBlockingQueue | |
import java.util.concurrent.ThreadPoolExecutor | |
import java.util.concurrent.TimeUnit | |
/**
 * An [OutputStream] decorator that never makes the caller wait for the wrapped stream.
 *
 * Every operation is handed to a single worker thread through a bounded queue, so operations reach
 * the wrapped stream in the order they were issued. When the queue has no free slot the operation
 * fails immediately with an [IOException] instead of waiting.
 *
 * Because the real I/O happens later on the worker thread, an [IOException] raised by the wrapped
 * stream cannot be thrown from the call that caused it. The first such failure is remembered and
 * reported by every subsequent [write] or [flush] call.
 *
 * @param outputStream the stream that ultimately receives the data.
 */
class NonblockingOutputStream(private val outputStream: OutputStream) : OutputStream() {
    /** Maximum number of operations that may wait in the queue; one more may be executing. */
    val QUEUE_SIZE = 128

    /** Single-worker executor (core = max = 1) that performs the queued operations in FIFO order. */
    val threadPoolExecutor: ThreadPoolExecutor =
        ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue(QUEUE_SIZE))

    /** First [IOException] thrown by the wrapped stream on the worker thread, if any. */
    @Volatile
    private var failure: IOException? = null

    /**
     * Queues a single byte for writing.
     *
     * @throws IOException if the queue is full, the stream is closed, or an earlier queued
     * operation failed.
     */
    override fun write(b: Int) {
        blockingAction {
            outputStream.write(b)
        }
    }

    /**
     * Queues the whole array for writing.
     *
     * The bytes are copied before queueing, so the caller may reuse [b] as soon as this returns.
     *
     * @throws IOException if the queue is full, the stream is closed, or an earlier queued
     * operation failed.
     */
    override fun write(b: ByteArray) {
        // Reject before copying so a saturated queue does not cost an allocation per call.
        ensureAvailable()
        val snapshot = b.copyOf()
        enqueue {
            outputStream.write(snapshot)
        }
    }

    /**
     * Queues [len] bytes of [b] starting at [off] for writing.
     *
     * The range is validated and copied before queueing, so the caller may reuse [b] as soon as
     * this returns.
     *
     * @throws IndexOutOfBoundsException if [off] or [len] do not describe a range inside [b].
     * @throws IOException if the queue is full, the stream is closed, or an earlier queued
     * operation failed.
     */
    override fun write(b: ByteArray, off: Int, len: Int) {
        // Validate here: on the worker thread a bad range would fail where nobody sees it.
        if (off < 0 || len < 0 || len > b.size - off) {
            throw IndexOutOfBoundsException("off=$off, len=$len, size=${b.size}")
        }
        ensureAvailable()
        val snapshot = b.copyOfRange(off, off + len)
        enqueue {
            outputStream.write(snapshot, 0, snapshot.size)
        }
    }

    /**
     * Queues a flush of the wrapped stream.
     *
     * @throws IOException if the queue is full, the stream is closed, or an earlier queued
     * operation failed.
     */
    override fun flush() {
        blockingAction {
            outputStream.flush()
        }
    }

    /**
     * Queues the close of the wrapped stream behind all pending operations and then shuts the
     * executor down, so its worker thread exits once the queue has drained.
     *
     * Calling this again after a successful close has no effect. If the queue is full the close
     * is not queued, nothing is shut down, and the call may be retried later.
     *
     * @throws IOException if the queue is full.
     */
    override fun close() {
        if (threadPoolExecutor.isShutdown) {
            return
        }
        if (isBlocked()) {
            throw IOException("Queue size exceeded limit ($QUEUE_SIZE)")
        }
        // A previously recorded failure is deliberately not checked here: the wrapped stream
        // must still be closed and the worker thread released.
        enqueue {
            outputStream.close()
        }
        threadPoolExecutor.shutdown()
    }

    /** Returns `true` when the queue has no free slot, i.e. the next operation would be rejected. */
    fun isBlocked(): Boolean {
        return threadPoolExecutor.queue.remainingCapacity() == 0
    }

    /** Checks that new work is acceptable and then queues [block] for the worker thread. */
    private fun blockingAction(block: () -> Unit) {
        ensureAvailable()
        enqueue(block)
    }

    /** Throws if an earlier queued operation failed or if the queue is currently full. */
    private fun ensureAvailable() {
        failure?.let { throw IOException("Asynchronous operation on the wrapped stream failed", it) }
        if (isBlocked()) {
            throw IOException("Queue size exceeded limit ($QUEUE_SIZE)")
        }
    }

    /**
     * Submits [block] to the executor, recording the first [IOException] it throws and turning an
     * executor rejection into the [IOException] that callers of an [OutputStream] expect.
     */
    private fun enqueue(block: () -> Unit) {
        val task = Runnable {
            try {
                block()
            } catch (e: IOException) {
                // Only the single worker thread writes this field, so check-then-set is safe.
                if (failure == null) {
                    failure = e
                }
                throw e
            }
        }
        try {
            threadPoolExecutor.submit(task)
        } catch (e: java.util.concurrent.RejectedExecutionException) {
            // Reached when another producer took the last slot after the capacity check,
            // or when the executor has already been shut down by close().
            val reason = if (threadPoolExecutor.isShutdown) {
                "Stream closed"
            } else {
                "Queue size exceeded limit ($QUEUE_SIZE)"
            }
            throw IOException(reason, e)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment