Created
August 24, 2025 11:38
-
-
Save GibsonRuitiari/d1d25bc6f778aa659d6bace41b8dd660 to your computer and use it in GitHub Desktop.
Round robin scheduling and lock-free queue implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.ConnectException
import java.net.URI
import java.net.UnknownHostException
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.net.http.HttpTimeoutException
import java.nio.channels.UnresolvedAddressException
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicLongArray
import java.util.concurrent.locks.LockSupport
/**
 * HTTP client that stages GET/POST requests in a fixed-size ring buffer and executes them on
 * daemon worker threads.
 *
 * - Request data is staged in pre-allocated arrays indexed by ring-buffer slot.
 * - Producers and workers claim positions with compare-and-set on [head] / [tail]; a per-slot
 *   sequence number publishes a slot to workers only after it is fully written and hands it back
 *   to producers only after its contents have been copied out.
 * - Each worker reuses one result array, so the `LongArray` passed to a callback is only valid
 *   for the duration of that callback; copy it if it must be kept.
 *
 * Result layout: `result[0]` is `(statusCode shl 32) or bodyLength` (or `-1` on failure) and
 * `result[1]` is `0` on success or an error code: `1` queue full / unclassified failure,
 * `2` timeout, `3` invalid request (size limit exceeded or malformed URL), `4..15` HTTP
 * status >= 400 (see [mapHttpStatusToErrorCode]), `16` connection refused, `17` unknown host.
 *
 * Callbacks run on the submitting thread when a request is rejected at submission time,
 * otherwise on a worker thread.
 */
object NetworkClient {
    // Queue geometry and per-request size limits
    private const val QUEUE_SIZE = 8192 // Must be a power of 2 so QUEUE_MASK can replace modulo
    private const val QUEUE_MASK = QUEUE_SIZE - 1
    private const val MAX_URL_LENGTH = 512
    private const val MAX_BODY_LENGTH = 4096
    private const val MAX_CONTENT_TYPE_LENGTH = 64

    // Method codes stored in requestMethods
    private const val METHOD_GET = 0
    private const val METHOD_POST = 1

    // Error codes reported in result[1]
    private const val ERROR_GENERIC = 1L // queue full, or an exception that could not be classified
    private const val ERROR_TIMEOUT = 2L
    private const val ERROR_INVALID_REQUEST = 3L // size limit exceeded or malformed URL
    private const val ERROR_CONNECTION_REFUSED = 16L
    private const val ERROR_UNKNOWN_HOST = 17L

    // Upper bound on how far an exception's cause chain is inspected
    private const val MAX_CAUSE_DEPTH = 8

    private val WORKER_COUNT = Runtime.getRuntime().availableProcessors() * 2

    // HTTP client shared across workers
    private val HTTP_CLIENT: HttpClient = HttpClient.newBuilder()
        .connectTimeout(Duration.ofSeconds(10))
        .followRedirects(HttpClient.Redirect.NORMAL)
        .build()

    // Ring buffer storage, one entry per slot
    private val requestIds = LongArray(QUEUE_SIZE) // stored per slot; not read by the workers
    private val requestMethods = IntArray(QUEUE_SIZE) // METHOD_GET or METHOD_POST
    private val requestUrls = Array(QUEUE_SIZE) { ByteArray(MAX_URL_LENGTH) }
    private val requestBodies = Array(QUEUE_SIZE) { ByteArray(MAX_BODY_LENGTH) }
    private val requestContentTypes = Array(QUEUE_SIZE) { ByteArray(MAX_CONTENT_TYPE_LENGTH) }
    private val requestBodyLengths = IntArray(QUEUE_SIZE)
    private val requestContentTypeLengths = IntArray(QUEUE_SIZE)
    private val requestUrlLengths = IntArray(QUEUE_SIZE)
    private val requestCallbacks = arrayOfNulls<(LongArray) -> Unit>(QUEUE_SIZE)

    // Per-slot sequence numbers. For a slot at ring position p:
    //   sequence == p               -> free, a producer may claim position p
    //   sequence == p + 1           -> fully written, a worker may claim position p
    //   sequence == p + QUEUE_SIZE  -> copied out, free for the next lap
    // The volatile set/get on this array is what makes the plain slot writes visible to readers.
    private val slotSequences = AtomicLongArray(LongArray(QUEUE_SIZE) { it.toLong() })

    // Queue positions (monotonically increasing; masked to obtain a slot index)
    private val head = AtomicLong(0) // Next position to write
    private val tail = AtomicLong(0) // Next position to read

    // One reusable result array per worker, so concurrent requests never share a result
    private val workerResults = Array(WORKER_COUNT) { LongArray(2) }

    // Statistics counters
    private val requestIdCounter = AtomicLong(0)
    private val processedRequests = AtomicLong(0)
    private val failedRequests = AtomicLong(0)

    @Volatile
    private var isRunning = true

    // Worker threads are created here but started in the init block below, after every other
    // property has been initialized.
    private val workerThreads = Array(WORKER_COUNT) { workerId ->
        Thread({ workerLoop(workerId) }, "HttpWorker-$workerId").apply { isDaemon = true }
    }

    init {
        for (worker in workerThreads) worker.start()
    }

    /**
     * Queues a GET request.
     *
     * @param url target URL; its UTF-8 encoding must not exceed 512 bytes
     * @param callback receives the result array (see the class documentation for its layout)
     * @return the request ID, or -1 if the URL is too long or the queue is full (in both cases
     *   [callback] has already been invoked with an error result)
     */
    fun submitGetRequest(url: String, callback: (LongArray) -> Unit): Long {
        val requestId = requestIdCounter.incrementAndGet()
        val urlBytes = url.toByteArray()
        if (urlBytes.size > MAX_URL_LENGTH) {
            callback(longArrayOf(-1L, ERROR_INVALID_REQUEST)) // URL too long
            return -1L
        }
        return enqueueRequest(requestId, METHOD_GET, urlBytes, null, null, callback)
    }

    /**
     * Queues a POST request.
     *
     * @param url target URL; its UTF-8 encoding must not exceed 512 bytes
     * @param body request body, at most 4096 bytes; may be empty
     * @param contentType value for the Content-Type header, at most 64 bytes; an empty string
     *   means no header is sent
     * @param callback receives the result array (see the class documentation for its layout)
     * @return the request ID, or -1 if a size limit is exceeded or the queue is full (in both
     *   cases [callback] has already been invoked with an error result)
     */
    fun submitPostRequest(
        url: String,
        body: ByteArray,
        contentType: String,
        callback: (LongArray) -> Unit
    ): Long {
        val requestId = requestIdCounter.incrementAndGet()
        val urlBytes = url.toByteArray()
        val contentTypeBytes = contentType.toByteArray()
        if (urlBytes.size > MAX_URL_LENGTH ||
            body.size > MAX_BODY_LENGTH ||
            contentTypeBytes.size > MAX_CONTENT_TYPE_LENGTH
        ) {
            callback(longArrayOf(-1L, ERROR_INVALID_REQUEST)) // Size limit exceeded
            return -1L
        }
        return enqueueRequest(requestId, METHOD_POST, urlBytes, body, contentTypeBytes, callback)
    }

    /**
     * Claims the next ring-buffer position, copies the request into its slot and publishes it.
     *
     * The slot's sequence number is advanced only after all fields are written, so a worker can
     * never observe a partially written request.
     *
     * @return [requestId], or -1 after invoking [callback] with a queue-full error
     */
    private fun enqueueRequest(
        requestId: Long,
        method: Int,
        urlBytes: ByteArray,
        body: ByteArray?,
        contentTypeBytes: ByteArray?,
        callback: (LongArray) -> Unit
    ): Long {
        while (true) {
            val position = head.get()
            val index = (position and QUEUE_MASK.toLong()).toInt()
            val lag = slotSequences.get(index) - position
            when {
                lag == 0L -> {
                    if (head.compareAndSet(position, position + 1)) {
                        requestIds[index] = requestId
                        requestMethods[index] = method
                        requestUrlLengths[index] = urlBytes.size
                        System.arraycopy(urlBytes, 0, requestUrls[index], 0, urlBytes.size)
                        if (body != null) {
                            requestBodyLengths[index] = body.size
                            System.arraycopy(body, 0, requestBodies[index], 0, body.size)
                        } else {
                            requestBodyLengths[index] = 0
                        }
                        if (contentTypeBytes != null) {
                            requestContentTypeLengths[index] = contentTypeBytes.size
                            System.arraycopy(
                                contentTypeBytes, 0, requestContentTypes[index], 0, contentTypeBytes.size
                            )
                        } else {
                            requestContentTypeLengths[index] = 0
                        }
                        requestCallbacks[index] = callback
                        // Publish: workers waiting on this position may now read the slot.
                        slotSequences.set(index, position + 1)
                        return requestId
                    }
                }
                lag < 0L -> {
                    // The slot still holds a request from the previous lap: the queue is full.
                    callback(longArrayOf(-1L, ERROR_GENERIC))
                    return -1L
                }
                // lag > 0: another producer already claimed this position; reload head and retry.
            }
        }
    }

    /**
     * Claims the next published request, copies it out of the ring buffer, releases the slot and
     * executes the request.
     *
     * The slot is released before the (blocking) HTTP call, so producers can reuse it without
     * racing against this worker.
     *
     * @return `true` if a request was processed, `false` if none was ready
     */
    private fun dequeueRequest(workerId: Int): Boolean {
        while (true) {
            val position = tail.get()
            val index = (position and QUEUE_MASK.toLong()).toInt()
            val lag = slotSequences.get(index) - (position + 1)
            when {
                lag < 0L -> return false // Nothing published at this position yet
                lag == 0L -> {
                    if (tail.compareAndSet(position, position + 1)) {
                        val method = requestMethods[index]
                        val url = String(requestUrls[index], 0, requestUrlLengths[index], Charsets.UTF_8)
                        val body =
                            if (method == METHOD_POST) requestBodies[index].copyOf(requestBodyLengths[index])
                            else null
                        val contentTypeLength = requestContentTypeLengths[index]
                        val contentType =
                            if (contentTypeLength > 0) {
                                String(requestContentTypes[index], 0, contentTypeLength, Charsets.UTF_8)
                            } else {
                                null
                            }
                        val callback = requestCallbacks[index]
                        // Drop the reference so the slot does not keep the callback alive.
                        requestCallbacks[index] = null
                        // Release: producers one lap ahead may now overwrite the slot.
                        slotSequences.set(index, position + QUEUE_SIZE)

                        executeRequest(workerId, method, url, body, contentType, callback)
                        return true
                    }
                }
                // lag > 0: another worker already claimed this position; reload tail and retry.
            }
        }
    }

    /**
     * Worker thread main loop: processes requests while running, backing off when idle
     * (spin, then yield, then park for 1 microsecond).
     */
    private fun workerLoop(workerId: Int) {
        var idleCount = 0
        while (isRunning) {
            if (dequeueRequest(workerId)) {
                idleCount = 0
                processedRequests.incrementAndGet()
            } else {
                idleCount++
                when {
                    idleCount < 100 -> { /* Spin without yielding */ }
                    idleCount < 1000 -> Thread.yield()
                    else -> LockSupport.parkNanos(1000) // 1 microsecond
                }
            }
        }
    }

    /**
     * Sends one HTTP request synchronously and reports the outcome to [callback] using this
     * worker's reusable result array.
     *
     * A POST always uses the POST method, even when [body] is empty, and sends the Content-Type
     * header whenever [contentType] is non-empty.
     */
    private fun executeRequest(
        workerId: Int,
        method: Int,
        url: String,
        body: ByteArray?,
        contentType: String?,
        callback: ((LongArray) -> Unit)?
    ) {
        val result = workerResults[workerId]
        try {
            val requestBuilder = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .timeout(Duration.ofSeconds(30))
            when (method) {
                METHOD_GET -> requestBuilder.GET()
                METHOD_POST -> {
                    val publisher =
                        if (body == null || body.isEmpty()) HttpRequest.BodyPublishers.noBody()
                        else HttpRequest.BodyPublishers.ofByteArray(body)
                    requestBuilder.POST(publisher)
                    if (!contentType.isNullOrEmpty()) {
                        requestBuilder.header("Content-Type", contentType)
                    }
                }
            }
            val response = HTTP_CLIENT.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString())
            val statusCode = response.statusCode()
            val bodyLength = response.body().length // length of the decoded body in chars
            // Pack status code into the upper 32 bits and body length into the lower 32 bits.
            result[0] = (statusCode.toLong() shl 32) or (bodyLength.toLong() and 0xFFFFFFFFL)
            result[1] = if (statusCode >= 400) mapHttpStatusToErrorCode(statusCode).toLong() else 0L
        } catch (e: Exception) {
            // Preserve the interrupt so the worker loop / shutdown can observe it.
            if (e is InterruptedException) Thread.currentThread().interrupt()
            result[0] = -1L
            result[1] = mapExceptionToErrorCode(e)
            failedRequests.incrementAndGet()
        }
        try {
            callback?.invoke(result)
        } catch (ignored: Exception) {
            // Deliberately best-effort: a failing callback must not take the worker down.
        }
    }

    /**
     * Blocking GET: submits the request and waits up to [timeoutMs] milliseconds for its result.
     *
     * @return a private copy of the result array, or `[-1, 2]` if the wait timed out
     */
    fun getSync(url: String, timeoutMs: Long = 5000): LongArray {
        val latch = CountDownLatch(1)
        val result = LongArray(2)
        submitGetRequest(url) { response ->
            // Copy: the array passed to callbacks is reused by the worker.
            System.arraycopy(response, 0, result, 0, 2)
            latch.countDown()
        }
        return if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            result
        } else {
            longArrayOf(-1L, ERROR_TIMEOUT)
        }
    }

    /**
     * Returns a snapshot of the counters:
     * `[0]` requests currently queued, `[1]` total processed, `[2]` total failed with an
     * exception, `[3]` total submission attempts.
     */
    fun getStats(): LongArray {
        val currentHead = head.get()
        val currentTail = tail.get()
        return longArrayOf(
            currentHead - currentTail,
            processedRequests.get(),
            failedRequests.get(),
            requestIdCounter.get()
        )
    }

    /** Maps an HTTP status code >= 400 to the error code reported in `result[1]`. */
    private fun mapHttpStatusToErrorCode(statusCode: Int): Int = when (statusCode) {
        400 -> 4
        401 -> 5
        403 -> 6
        404 -> 7
        405 -> 8
        408 -> 9
        429 -> 10
        500 -> 11
        502 -> 12
        503 -> 13
        504 -> 14
        else -> 15
    }

    /**
     * Classifies an exception (and its causes, up to a fixed depth) into an error code.
     *
     * Exception types are checked first because JDK messages are not stable keywords (HTTP
     * timeouts say "timed out", not "timeout"); the message checks remain as a fallback.
     * Unknown-host is tested before connection-refused because the JDK HTTP client can report an
     * unresolved address as the cause of a `ConnectException`.
     */
    private fun mapExceptionToErrorCode(ex: Throwable): Long {
        val chain = generateSequence(ex) { it.cause }.take(MAX_CAUSE_DEPTH).toList()
        fun anyMessageContains(text: String): Boolean =
            chain.any { it.message?.lowercase()?.contains(text) == true }

        return when {
            chain.any { it is HttpTimeoutException } ||
                anyMessageContains("timeout") || anyMessageContains("timed out") -> ERROR_TIMEOUT
            chain.any { it is IllegalArgumentException } ||
                anyMessageContains("malformed") -> ERROR_INVALID_REQUEST
            chain.any { it is UnknownHostException || it is UnresolvedAddressException } ||
                anyMessageContains("unknown host") -> ERROR_UNKNOWN_HOST
            chain.any { it is ConnectException } ||
                anyMessageContains("connection refused") -> ERROR_CONNECTION_REFUSED
            else -> ERROR_GENERIC
        }
    }

    /**
     * Stops the workers and waits up to 5 seconds for each of them to finish.
     *
     * If the calling thread is interrupted while waiting, the remaining workers are interrupted
     * instead of joined and the caller's interrupt status is restored. Requests still queued at
     * this point are not processed.
     */
    fun shutdown() {
        isRunning = false
        var callerInterrupted = false
        for (worker in workerThreads) {
            if (callerInterrupted) {
                worker.interrupt()
                continue
            }
            try {
                worker.join(5000) // 5 second timeout
            } catch (ignored: InterruptedException) {
                callerInterrupted = true
                worker.interrupt()
            }
        }
        if (callerInterrupted) Thread.currentThread().interrupt()
        val stats = getStats()
        println("Shutdown complete - Queue: ${stats[0]}, Processed: ${stats[1]}, Failed: ${stats[2]}")
    }
}
// Extension functions for result arrays (no object allocation)

/** Returns the HTTP status code stored in the upper 32 bits of element 0 (-1 for a failed request). */
fun LongArray.statusCode(): Int {
    val packed = this[0]
    return (packed shr 32).toInt()
}
/** Returns the response body length stored in the lower 32 bits of element 0. */
fun LongArray.bodyLength(): Int {
    // Narrowing to Int keeps exactly the low 32 bits, which is where the length is packed.
    return get(0).toInt()
}
/** Returns `true` when the error code in element 1 is zero. */
fun LongArray.isSuccess(): Boolean {
    val errorCode = get(1)
    return errorCode == 0L
}
/**
 * Usage example: submits ten requests (alternating GET and POST) to httpbin.org, waits for all
 * callbacks to fire, prints the final statistics and shuts the client down.
 *
 * Waiting uses a latch with an upper bound instead of a fixed sleep, so the program ends as soon
 * as every request has reported a result.
 */
fun main() {
    val requestCount = 10
    // Each callback fires once per request, including submissions rejected synchronously,
    // so one count per request is enough.
    val pending = CountDownLatch(requestCount)

    repeat(requestCount) { i ->
        if (i % 2 == 0) {
            NetworkClient.submitGetRequest("https://httpbin.org/get?id=$i") { result ->
                println("GET $i: Status=${result.statusCode()}, Success=${result.isSuccess()}")
                pending.countDown()
            }
        } else {
            // Open question: what are the performance benefits of using a string builder and
            // turning it into a byte array vs. a string turned into a byte array?
            val postData = """{"request_id":$i,"data":"test"}""".toByteArray()
            NetworkClient.submitPostRequest(
                "https://httpbin.org/post",
                postData,
                "application/json"
            ) { result ->
                println("POST $i: Status=${result.statusCode()}, Success=${result.isSuccess()}")
                pending.countDown()
            }
        }
    }

    // Let requests complete, but never wait longer than a minute.
    if (!pending.await(60, TimeUnit.SECONDS)) {
        println("Timed out waiting for ${pending.count} request(s)")
    }

    // Show final stats
    val finalStats = NetworkClient.getStats()
    println("\nFinal Stats:")
    println("Queue Size: ${finalStats[0]}")
    println("Processed: ${finalStats[1]}")
    println("Failed: ${finalStats[2]}")
    println("Total Submitted: ${finalStats[3]}")
    NetworkClient.shutdown()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment