Skip to content

Instantly share code, notes, and snippets.

@GibsonRuitiari
Created August 24, 2025 11:38
Show Gist options
  • Save GibsonRuitiari/d1d25bc6f778aa659d6bace41b8dd660 to your computer and use it in GitHub Desktop.
Save GibsonRuitiari/d1d25bc6f778aa659d6bace41b8dd660 to your computer and use it in GitHub Desktop.
Round robin scheduling and lock-free queue implementation
import java.net.ConnectException
import java.net.URI
import java.net.UnknownHostException
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.net.http.HttpTimeoutException
import java.nio.channels.UnresolvedAddressException
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicLongArray
import java.util.concurrent.locks.LockSupport
/**
 * Small HTTP client that hands requests to a fixed set of daemon worker threads through a
 * bounded multi-producer / multi-consumer ring buffer.
 *
 * Design notes:
 * - Request payloads are staged in pre-allocated per-slot byte arrays. A per-slot sequence
 *   number passes ownership of each slot between producers and workers, so no locks are taken.
 * - A worker copies the request out of its slot and releases the slot *before* it performs the
 *   blocking HTTP call, so a slow request never pins ring-buffer capacity and can never be
 *   overwritten by a producer while it is still being read.
 * - Callbacks run on a worker thread (or on the submitting thread when the request is rejected)
 *   and receive a fresh two-element [LongArray]:
 *   `[0]` is `(statusCode shl 32) or bodyLength`, or `-1` on failure; `[1]` is the error code,
 *   where `0` means success.
 *
 * Error codes: 1 = generic failure / queue full, 2 = timeout, 3 = malformed request or size limit
 * exceeded, 4..15 = HTTP status mapping, 16 = connection refused, 17 = unknown host.
 */
object NetworkClient {
    private const val QUEUE_SIZE = 8192 // Must be a power of two so QUEUE_MASK is a valid index mask.
    private const val QUEUE_MASK = QUEUE_SIZE - 1
    private const val MAX_URL_LENGTH = 512
    private const val MAX_BODY_LENGTH = 4096
    private const val MAX_CONTENT_TYPE_LENGTH = 64
    private const val METHOD_GET = 0
    private const val METHOD_POST = 1

    // Upper bound on how far a cause chain is followed when classifying an exception.
    private const val MAX_CAUSE_DEPTH = 16

    private val WORKER_COUNT = Runtime.getRuntime().availableProcessors() * 2

    // One HTTP client shared by all workers.
    private val HTTP_CLIENT = HttpClient.newBuilder()
        .connectTimeout(Duration.ofSeconds(10))
        .followRedirects(HttpClient.Redirect.NORMAL)
        .build()

    // Ring-buffer storage: one entry per slot in each parallel array.
    private val requestIds = LongArray(QUEUE_SIZE)
    private val requestMethods = IntArray(QUEUE_SIZE) // METHOD_GET or METHOD_POST
    private val requestUrls = Array(QUEUE_SIZE) { ByteArray(MAX_URL_LENGTH) }
    private val requestBodies = Array(QUEUE_SIZE) { ByteArray(MAX_BODY_LENGTH) }
    private val requestContentTypes = Array(QUEUE_SIZE) { ByteArray(MAX_CONTENT_TYPE_LENGTH) }
    private val requestBodyLengths = IntArray(QUEUE_SIZE)
    private val requestContentTypeLengths = IntArray(QUEUE_SIZE)
    private val requestUrlLengths = IntArray(QUEUE_SIZE)
    private val requestCallbacks = Array<((LongArray) -> Unit)?>(QUEUE_SIZE) { null }

    // Per-slot hand-off state. For a slot at ring position p:
    //   sequence == p              -> free, a producer may claim it
    //   sequence == p + 1          -> published, a worker may claim it
    //   sequence == p + QUEUE_SIZE -> released, free again for the next lap
    // The volatile write to the sequence publishes the plain writes made to the slot arrays.
    private val slotSequences = AtomicLongArray(LongArray(QUEUE_SIZE) { it.toLong() })

    private val head = AtomicLong(0) // Next position a producer will claim.
    private val tail = AtomicLong(0) // Next position a worker will claim.

    // Statistics counters.
    private val requestIdCounter = AtomicLong(0)
    private val processedRequests = AtomicLong(0)
    private val failedRequests = AtomicLong(0)

    @Volatile
    private var isRunning = true

    // Declared after all state above so the textual order matches the order the workers need it in.
    private val workerThreads = Array(WORKER_COUNT) { workerId ->
        Thread({ workerLoop() }, "HttpWorker-$workerId").apply {
            isDaemon = true
            start()
        }
    }

    /**
     * Queues a GET request.
     *
     * @param url target URL; its UTF-8 encoding must not exceed 512 bytes.
     * @param callback invoked exactly once with the result array. On rejection it is invoked
     *   synchronously on the calling thread with `[-1, 3]` (URL too long) or `[-1, 1]` (queue full).
     * @return the request id, or `-1` if the request was rejected.
     */
    fun submitGetRequest(url: String, callback: (LongArray) -> Unit): Long {
        val requestId = requestIdCounter.incrementAndGet()
        val urlBytes = url.toByteArray()
        if (urlBytes.size > MAX_URL_LENGTH) {
            callback(longArrayOf(-1L, 3L)) // URL too long
            return -1L
        }
        return enqueueRequest(requestId, METHOD_GET, urlBytes, null, null, callback)
    }

    /**
     * Queues a POST request. The body is copied at submission time, so the caller may reuse
     * [body] as soon as this function returns.
     *
     * @param url target URL; its UTF-8 encoding must not exceed 512 bytes.
     * @param body request payload, at most 4096 bytes; may be empty.
     * @param contentType value for the `Content-Type` header, at most 64 bytes; an empty string
     *   sends no such header.
     * @param callback invoked exactly once with the result array. On rejection it is invoked
     *   synchronously on the calling thread with `[-1, 3]` (size limit) or `[-1, 1]` (queue full).
     * @return the request id, or `-1` if the request was rejected.
     */
    fun submitPostRequest(
        url: String,
        body: ByteArray,
        contentType: String,
        callback: (LongArray) -> Unit
    ): Long {
        val requestId = requestIdCounter.incrementAndGet()
        val urlBytes = url.toByteArray()
        val contentTypeBytes = contentType.toByteArray()
        val exceedsLimits = urlBytes.size > MAX_URL_LENGTH ||
            body.size > MAX_BODY_LENGTH ||
            contentTypeBytes.size > MAX_CONTENT_TYPE_LENGTH
        if (exceedsLimits) {
            callback(longArrayOf(-1L, 3L)) // Size limit error
            return -1L
        }
        return enqueueRequest(requestId, METHOD_POST, urlBytes, body, contentTypeBytes, callback)
    }

    /** Maps a monotonically increasing ring position onto its slot index. */
    private fun slotIndex(position: Long): Int = (position and QUEUE_MASK.toLong()).toInt()

    /**
     * Claims the next position of [cursor] once its slot has reached the expected state.
     *
     * @param readyOffset `0` for producers (slot must be free), `1` for workers (slot must be
     *   published).
     * @return the claimed position, or `-1` if the slot is not ready yet, which means the queue
     *   is full for producers and empty for workers.
     */
    private fun claimPosition(cursor: AtomicLong, readyOffset: Long): Long {
        while (true) {
            val position = cursor.get()
            val lag = slotSequences.get(slotIndex(position)) - (position + readyOffset)
            if (lag == 0L) {
                if (cursor.compareAndSet(position, position + 1)) return position
            } else if (lag < 0L) {
                return -1L
            }
            // lag > 0 or a lost CAS: another thread advanced the cursor; reload and retry.
        }
    }

    /**
     * Copies the request into a free slot and publishes it to the workers.
     *
     * The slot is only made visible to workers after every field has been written; publishing
     * first would let a worker read a half-written request.
     *
     * @return [requestId], or `-1` after invoking [callback] with `[-1, 1]` when the queue is full.
     */
    private fun enqueueRequest(
        requestId: Long,
        method: Int,
        urlBytes: ByteArray,
        body: ByteArray?,
        contentTypeBytes: ByteArray?,
        callback: (LongArray) -> Unit
    ): Long {
        val position = claimPosition(head, readyOffset = 0L)
        if (position < 0L) {
            callback(longArrayOf(-1L, 1L)) // Queue full error
            return -1L
        }

        val index = slotIndex(position)
        requestIds[index] = requestId
        requestMethods[index] = method

        requestUrlLengths[index] = urlBytes.size
        System.arraycopy(urlBytes, 0, requestUrls[index], 0, urlBytes.size)

        requestBodyLengths[index] = body?.size ?: 0
        if (body != null) {
            System.arraycopy(body, 0, requestBodies[index], 0, body.size)
        }

        requestContentTypeLengths[index] = contentTypeBytes?.size ?: 0
        if (contentTypeBytes != null) {
            System.arraycopy(contentTypeBytes, 0, requestContentTypes[index], 0, contentTypeBytes.size)
        }

        requestCallbacks[index] = callback

        // Publish: this volatile write makes all slot writes above visible to the claiming worker.
        slotSequences.set(index, position + 1)
        return requestId
    }

    /**
     * Takes one published request off the queue and executes it on the calling worker thread.
     *
     * @return `false` if no published request was available.
     */
    private fun dequeueAndProcess(): Boolean {
        val position = claimPosition(tail, readyOffset = 1L)
        if (position < 0L) return false

        // Copy everything out of the slot so it can be released before the slow HTTP call.
        val index = slotIndex(position)
        val method = requestMethods[index]
        val url = String(requestUrls[index], 0, requestUrlLengths[index], Charsets.UTF_8)
        val body = requestBodies[index].copyOf(requestBodyLengths[index])
        val contentType =
            String(requestContentTypes[index], 0, requestContentTypeLengths[index], Charsets.UTF_8)
        val callback = requestCallbacks[index]
        requestCallbacks[index] = null // Do not keep the callback reachable from the ring.

        // Release the slot for the producers' next lap.
        slotSequences.set(index, position + QUEUE_SIZE)

        executeRequest(method, url, body, contentType, callback)
        return true
    }

    /**
     * Worker main loop: processes requests while the client is running and backs off in three
     * stages (spin, yield, park for 1 microsecond) while the queue is empty.
     */
    private fun workerLoop() {
        var idleCount = 0
        while (isRunning) {
            if (dequeueAndProcess()) {
                idleCount = 0
                processedRequests.incrementAndGet()
            } else {
                idleCount++
                when {
                    idleCount < 100 -> { /* Spin without yielding */ }
                    idleCount < 1000 -> Thread.yield()
                    else -> LockSupport.parkNanos(1000)
                }
            }
        }
    }

    /**
     * Performs the HTTP call and reports the outcome to [callback].
     *
     * The result array is allocated per request, so a callback may keep it and concurrent
     * workers never write into the same array. The body length stored in the result is the
     * number of characters in the decoded response body.
     */
    private fun executeRequest(
        method: Int,
        url: String,
        body: ByteArray,
        contentType: String,
        callback: ((LongArray) -> Unit)?
    ) {
        val result = LongArray(2)
        try {
            val requestBuilder = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .timeout(Duration.ofSeconds(30))
            when (method) {
                METHOD_POST -> {
                    // An empty body must still be sent as POST; the builder defaults to GET.
                    val publisher = if (body.isEmpty()) {
                        HttpRequest.BodyPublishers.noBody()
                    } else {
                        HttpRequest.BodyPublishers.ofByteArray(body)
                    }
                    requestBuilder.POST(publisher)
                    if (contentType.isNotEmpty()) {
                        requestBuilder.header("Content-Type", contentType)
                    }
                }
                else -> requestBuilder.GET()
            }

            val response = HTTP_CLIENT.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString())
            val statusCode = response.statusCode()
            val bodyLength = response.body().length
            result[0] = (statusCode.toLong() shl 32) or bodyLength.toLong()
            result[1] = if (statusCode >= 400) mapHttpStatusToErrorCode(statusCode).toLong() else 0L
        } catch (e: Exception) {
            result[0] = -1L
            result[1] = mapExceptionToErrorCode(e).toLong()
            failedRequests.incrementAndGet()
        }

        try {
            callback?.invoke(result)
        } catch (e: Exception) {
            // A faulty callback must not take the worker down; report it and carry on.
            System.err.println("HTTP callback failed on ${Thread.currentThread().name}: $e")
        }
    }

    /**
     * Blocking GET: submits the request and waits for its callback.
     *
     * @param timeoutMs maximum time to wait for the result, in milliseconds.
     * @return the result array, or `[-1, 2]` if no result arrived within [timeoutMs].
     */
    fun getSync(url: String, timeoutMs: Long = 5000): LongArray {
        val latch = CountDownLatch(1)
        val result = LongArray(2)
        submitGetRequest(url) { response ->
            System.arraycopy(response, 0, result, 0, 2)
            latch.countDown()
        }
        return if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            result
        } else {
            longArrayOf(-1L, 2L) // Timeout error
        }
    }

    /**
     * Returns a snapshot of the counters:
     * `[0]` requests currently queued, `[1]` total processed, `[2]` total failed,
     * `[3]` total submitted (including rejected submissions).
     */
    fun getStats(): LongArray {
        // Read tail before head: both only grow and head >= tail, so the difference cannot go negative.
        val currentTail = tail.get()
        val currentHead = head.get()
        return longArrayOf(
            currentHead - currentTail,
            processedRequests.get(),
            failedRequests.get(),
            requestIdCounter.get()
        )
    }

    /** Maps an HTTP status of 400 or above to this client's error code (15 = any other status). */
    private fun mapHttpStatusToErrorCode(statusCode: Int): Int = when (statusCode) {
        400 -> 4
        401 -> 5
        403 -> 6
        404 -> 7
        405 -> 8
        408 -> 9
        429 -> 10
        500 -> 11
        502 -> 12
        503 -> 13
        504 -> 14
        else -> 15
    }

    /**
     * Classifies a failure by exception type, looking through the cause chain, and falls back
     * to message matching only when no known type is found. Exception messages are not a stable
     * contract (a timeout reads "request timed out", an unknown host is often just the host name),
     * so the type is checked first.
     *
     * @return 2 = timeout, 17 = unknown host, 16 = connection refused, 3 = malformed request,
     *   1 = anything else.
     */
    private fun mapExceptionToErrorCode(ex: Throwable): Int {
        val chain = generateSequence(ex) { it.cause }.take(MAX_CAUSE_DEPTH).toList()
        return when {
            chain.any { it is HttpTimeoutException || it is java.net.SocketTimeoutException } -> 2
            // Checked before ConnectException: an unresolved host can surface wrapped in one.
            chain.any { it is UnknownHostException || it is UnresolvedAddressException } -> 17
            chain.any { it is ConnectException } -> 16
            chain.any { it is IllegalArgumentException } -> 3
            else -> {
                val message = ex.message?.lowercase() ?: ""
                when {
                    message.contains("timeout") || message.contains("timed out") -> 2
                    message.contains("malformed") -> 3
                    message.contains("connection refused") -> 16
                    message.contains("unknown host") -> 17
                    else -> 1
                }
            }
        }
    }

    /**
     * Stops the workers and waits up to 5 seconds for each to finish, then prints final counters.
     *
     * If the calling thread is interrupted while waiting, the worker being waited on is
     * interrupted and the caller's interrupt status is restored; the remaining joins then fail
     * fast, so every remaining worker is interrupted as well.
     */
    fun shutdown() {
        isRunning = false
        workerThreads.forEach { worker ->
            try {
                worker.join(5000)
            } catch (_: InterruptedException) {
                worker.interrupt()
                Thread.currentThread().interrupt()
            }
        }
        val stats = getStats()
        println("Shutdown complete - Queue: ${stats[0]}, Processed: ${stats[1]}, Failed: ${stats[2]}")
    }
}
// Extension functions for result arrays (no object allocation)
/** Extracts the HTTP status code stored in the upper 32 bits of element 0 (`-1` for a failed request). */
fun LongArray.statusCode(): Int {
    val packed = this[0]
    val upperWord = packed shr 32
    return upperWord.toInt()
}
/** Extracts the response body length stored in the lower 32 bits of element 0. */
fun LongArray.bodyLength(): Int {
    val lowerWordMask = 0xFFFFFFFFL
    val lowerWord = this[0] and lowerWordMask
    return lowerWord.toInt()
}
/** Reports whether the error code in element 1 is zero, i.e. the request succeeded. */
fun LongArray.isSuccess(): Boolean {
    val errorCode = this[1]
    return errorCode == 0L
}
/**
 * Usage example: submits a mix of GET and POST requests, waits until every callback has fired
 * (or a bounded timeout expires), prints the final counters and shuts the client down.
 */
fun main() {
    val requestCount = 10
    // Every submission invokes its callback exactly once, whether it is accepted or rejected,
    // so the latch reaches zero as soon as all requests have completed.
    val pending = CountDownLatch(requestCount)

    repeat(requestCount) { i ->
        if (i % 2 == 0) {
            NetworkClient.submitGetRequest("https://httpbin.org/get?id=$i") { result ->
                println("GET $i: Status=${result.statusCode()}, Success=${result.isSuccess()}")
                pending.countDown()
            }
        } else {
            // Open question from the author: is there any performance benefit to building the
            // payload with a StringBuilder before converting it to a byte array, compared with
            // converting a plain String?
            val postData = """{"request_id":$i,"data":"test"}""".toByteArray()
            NetworkClient.submitPostRequest(
                "https://httpbin.org/post",
                postData,
                "application/json"
            ) { result ->
                println("POST $i: Status=${result.statusCode()}, Success=${result.isSuccess()}")
                pending.countDown()
            }
        }
    }

    // Wait for the requests instead of sleeping for a fixed period. The bound covers the case
    // where few workers serve the requests in several batches, each limited by the request timeout.
    val maxWaitSeconds = 180L
    if (!pending.await(maxWaitSeconds, TimeUnit.SECONDS)) {
        println("Timed out after ${maxWaitSeconds}s with ${pending.count} request(s) still pending")
    }

    // Show final stats
    val finalStats = NetworkClient.getStats()
    println("\nFinal Stats:")
    println("Queue Size: ${finalStats[0]}")
    println("Processed: ${finalStats[1]}")
    println("Failed: ${finalStats[2]}")
    println("Total Submitted: ${finalStats[3]}")
    NetworkClient.shutdown()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment