Created
September 26, 2025 22:54
-
-
Save wch/e46de182e20aa7ee22016300d4c10e8f to your computer and use it in GitHub Desktop.
Key-value store with subscriptions using websockets, in R
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env Rscript | |
| # WebSocket Key-Value Store Server with ID scoping | |
| # | |
| # This server provides a WebSocket-based key-value store with publish/subscribe | |
| # functionality. All operations are scoped by an application ID, allowing multiple | |
| # independent applications to use the same server concurrently. | |
| # | |
| # COMMUNICATION PROTOCOL (JSON): | |
| # ============================== | |
| # | |
| # Client -> Server Messages: | |
| # -------------------------- | |
| # Set a value: | |
| # {"type": "set", "id": "app1", "key": "mykey", "value": <any JSON value>} | |
| # Response: {"type": "value", "id": "app1", "key": "mykey", "value": <value>} | |
| # | |
| # Get a value: | |
| # {"type": "get", "id": "app1", "key": "mykey"} | |
| # Response: {"type": "value", "id": "app1", "key": "mykey", "value": <value or null>} | |
| # | |
| # Delete a value: | |
| # {"type": "delete", "id": "app1", "key": "mykey"} | |
| # Response: {"type": "deleted", "id": "app1", "key": "mykey"} | |
| # | |
| # Subscribe to changes: | |
| # {"type": "subscribe", "id": "app1", "key": "mykey"} | |
| # Response (sent only if the key currently has a value): | |
| # {"type": "value", "id": "app1", "key": "mykey", "value": <current value>} | |
| # Afterwards the client receives an update whenever the value changes or is deleted | |
| # | |
| # Unsubscribe from changes: | |
| # {"type": "unsubscribe", "id": "app1", "key": "mykey"} | |
| # No response | |
| # | |
| # Server -> Client Messages: | |
| # -------------------------- | |
| # Value notification (sent to subscribers when value changes): | |
| # {"type": "value", "id": "app1", "key": "mykey", "value": <new value>} | |
| # | |
| # Deletion notification (sent to subscribers when key is deleted): | |
| # {"type": "deleted", "id": "app1", "key": "mykey"} | |
| # | |
| # Error message: | |
| # {"type": "error", "message": "error description"} | |
| # | |
| # FEATURES: | |
| # - All operations are scoped by application ID for multi-tenant support | |
| # - Real-time notifications to all subscribers when values change | |
| # - Automatic cleanup of subscriptions when WebSocket connections close | |
| # - In-memory storage (data is not persisted between server restarts) | |
| # - HTML test client served at http://localhost:8080 | |
| # | |
| # USAGE: | |
| # source("kv_store_server.R") # Server starts automatically | |
| # # Server runs in blocking mode. To stop, press Ctrl+C | |
| library(httpuv) | |
| library(jsonlite) | |
# ---- In-memory server state ----
# Everything below lives in the script's top-level environment and is
# modified by the handlers via `<<-`.

# Counter used to build connection IDs: it is incremented when a WebSocket
# opens and the new connection is named "ws_<counter>".
ws_counter <- 0

# Lookup table from a key derived from a WebSocket object to its connection ID.
ws_to_id <- new.env(hash = TRUE)

# Stored values, addressed as values[[id]][[key]].
values <- list()

# Active subscriptions, addressed as subscriptions[[ws_id]][[id]][[key]].
# The stored element is the subscriber's WebSocket object, which the notify_*
# functions call $send() on.
subscriptions <- list()
# Create and run the key-value store server.
#
# Builds an httpuv application that serves the HTML test page over HTTP and
# handles the JSON key-value protocol over WebSockets, then runs it with
# runServer(), i.e. in blocking mode. For a non-blocking server, replace the
# runServer() call with startServer(host, port, app).
#
# Arguments:
#   host  Address to listen on.
#   port  TCP port to listen on; also passed to serve_html_page().
#
# Returns the result of runServer().
kv_store_server <- function(host = "127.0.0.1", port = 8080) {
  app <- list(
    call = function(req) {
      serve_html_page(req, port)
    },
    onWSOpen = function(ws) {
      # Assign a unique ID to this WebSocket connection.
      ws_counter <<- ws_counter + 1
      ws_id <- paste0("ws_", ws_counter)

      # Register the connection under the printed-representation key that
      # get_ws_id() uses for its lookups.
      ws_key <- paste0(capture.output(print(ws)), collapse = "")
      ws_to_id[[ws_key]] <- ws_id

      # Initialize subscription tracking for this connection.
      subscriptions[[ws_id]] <<- list()

      ws$onMessage(function(binary, message) {
        handle_ws_message(ws, binary, message)
      })

      ws$onClose(function() {
        # Clean up with the ID and key captured when the connection opened.
        # Re-printing `ws` here is not reliable: if the printed text differs
        # at close time, or is shared with another connection, the lookup
        # would miss or return a different connection's ID.
        subscriptions[[ws_id]] <<- NULL

        # Drop the mapping only if it still belongs to this connection, so a
        # later connection that registered the same key keeps its entry, and
        # rm() never warns about a name that is already gone.
        still_ours <- exists(ws_key, envir = ws_to_id, inherits = FALSE) &&
          identical(ws_to_id[[ws_key]], ws_id)
        if (still_ours) {
          rm(list = ws_key, envir = ws_to_id)
        }
      })
    }
  )

  runServer(host, port, app)
}
# Serve the HTML page used to test the WebSocket API from a browser.
#
# Arguments:
#   req   Request object; only HTTP_HOST and SERVER_NAME are read.
#   port  Port the server listens on. Used to build the WebSocket URL when
#         the request has no HTTP_HOST and SERVER_NAME has to be used instead.
#
# Returns a response list with `status`, `headers` and `body`, where `body`
# is the HTML page with the WebSocket URL substituted in.
serve_html_page <- function(req, port) {
  # HTTP_HOST is used as-is. The SERVER_NAME fallback gets the port appended
  # explicitly, otherwise the browser would connect to the default port.
  http_host <- req[["HTTP_HOST"]]
  if (is.null(http_host)) {
    ws_host <- paste0(req[["SERVER_NAME"]], ":", port)
  } else {
    ws_host <- http_host
  }
  ws_url <- paste0("ws://", ws_host)

  # sprintf() template: "%s" is the WebSocket URL; literal percent signs in
  # the CSS are written as "%%".
  page_template <- '
<!DOCTYPE html>
<html>
<head>
<title>Key-Value Store WebSocket Client</title>
<style>
body {
font-family: monospace;
padding: 20px;
max-width: 1200px;
margin: 0 auto;
}
.controls-container {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
margin-bottom: 20px;
}
.panel {
border: 1px solid #ccc;
padding: 10px;
border-radius: 5px;
}
h2 { margin-top: 0; }
input, button {
margin: 5px 0;
padding: 5px;
}
input[type="text"] {
width: 200px;
}
textarea {
width: 100%%;
height: 120px;
font-family: monospace;
}
#output {
background: #f0f0f0;
padding: 10px;
height: 400px;
overflow-y: auto;
white-space: pre-wrap;
}
.message {
margin: 2px 0;
padding: 2px;
}
.sent { color: blue; }
.received { color: green; }
.error { color: red; }
#status {
padding: 10px;
margin-bottom: 10px;
border-radius: 5px;
}
.connected { background: #d4ffd4; }
.disconnected { background: #ffd4d4; }
</style>
</head>
<body>
<h1>Key-Value Store WebSocket Client</h1>
<div id="status" class="disconnected">Disconnected</div>
<div class="controls-container">
<div class="panel">
<h2>Controls</h2>
<div>
<label>App ID: <input type="text" id="appId" value="app1" /></label>
</div>
<div>
<label>Key: <input type="text" id="key" value="mykey" /></label>
</div>
<div>
<label>Value: <input type="text" id="value" value="myvalue" /></label>
</div>
<div style="margin-top: 15px;">
<button onclick="sendSet()">Set</button>
<button onclick="sendGet()">Get</button>
<button onclick="sendDelete()">Delete</button>
</div>
<div style="margin-top: 15px;">
<button onclick="sendSubscribe()">Subscribe</button>
<button onclick="sendUnsubscribe()">Unsubscribe</button>
</div>
</div>
<div class="panel">
<h2>Custom JSON Message</h2>
<textarea id="customJson">{
"type": "get",
"id": "app1",
"key": "mykey"
}</textarea>
<br>
<button onclick="sendCustom()">Send Custom</button>
</div>
</div>
<div class="panel">
<div style="display: flex; justify-content: space-between; align-items: center;">
<h2>Messages</h2>
<button onclick="clearOutput()">Clear</button>
</div>
<div id="output"></div>
</div>
<script>
const ws = new WebSocket("%s");
const output = document.getElementById("output");
const status = document.getElementById("status");
function log(message, className) {
const div = document.createElement("div");
div.className = "message " + className;
div.textContent = new Date().toLocaleTimeString() + " - " + message;
output.appendChild(div);
output.scrollTop = output.scrollHeight;
}
ws.onopen = function() {
status.textContent = "Connected";
status.className = "connected";
log("Connected to WebSocket server", "received");
};
ws.onmessage = function(event) {
log("Received: " + event.data, "received");
};
ws.onerror = function(error) {
log("Error: " + error, "error");
};
ws.onclose = function() {
status.textContent = "Disconnected";
status.className = "disconnected";
log("Disconnected from server", "error");
};
function getInputs() {
return {
id: document.getElementById("appId").value,
key: document.getElementById("key").value,
value: document.getElementById("value").value
};
}
function sendMessage(msg) {
const json = JSON.stringify(msg);
ws.send(json);
log("Sent: " + json, "sent");
}
function sendSet() {
const inputs = getInputs();
sendMessage({
type: "set",
id: inputs.id,
key: inputs.key,
value: inputs.value
});
}
function sendGet() {
const inputs = getInputs();
sendMessage({
type: "get",
id: inputs.id,
key: inputs.key
});
}
function sendDelete() {
const inputs = getInputs();
sendMessage({
type: "delete",
id: inputs.id,
key: inputs.key
});
}
function sendSubscribe() {
const inputs = getInputs();
sendMessage({
type: "subscribe",
id: inputs.id,
key: inputs.key
});
}
function sendUnsubscribe() {
const inputs = getInputs();
sendMessage({
type: "unsubscribe",
id: inputs.id,
key: inputs.key
});
}
function sendCustom() {
try {
const json = document.getElementById("customJson").value;
const msg = JSON.parse(json);
ws.send(json);
log("Sent: " + json, "sent");
} catch(e) {
log("Invalid JSON: " + e.message, "error");
}
}
function clearOutput() {
output.innerHTML = "";
}
</script>
</body>
</html>
'

  list(
    status = 200L,
    headers = list("Content-Type" = "text/html"),
    body = sprintf(page_template, ws_url)
  )
}
# Handle one incoming WebSocket message.
#
# Parses the message as JSON, checks the 'type' field and dispatches to the
# matching handle_* function. Any problem is reported back to the client via
# send_error().
#
# Arguments:
#   ws       WebSocket object the message arrived on.
#   binary   TRUE when the frame was binary.
#   message  Message payload.
handle_ws_message <- function(ws, binary, message) {
  tryCatch(
    {
      # Binary frames are decoded to text before parsing.
      if (is.raw(message)) {
        message <- rawToChar(message)
      }

      # parse_json() only ever parses the string it is given. fromJSON() also
      # accepts a URL or a file path in place of JSON text, which must not be
      # reachable from client-supplied input.
      msg <- parse_json(message, simplifyVector = FALSE)

      # Validate required fields. [[ ]] is used so that a field such as
      # "types" is not partially matched as "type".
      if (!is.list(msg) || is.null(msg[["type"]])) {
        send_error(ws, "Missing 'type' field")
        return(invisible(NULL))
      }
      type <- msg[["type"]]
      if (!is.character(type) || length(type) != 1L || is.na(type)) {
        send_error(ws, "'type' must be a single string")
        return(invisible(NULL))
      }

      # Dispatch on the message type; the unnamed last entry is the default.
      switch(type,
        set = handle_set(ws, msg),
        get = handle_get(ws, msg),
        delete = handle_delete(ws, msg),
        subscribe = handle_subscribe(ws, msg),
        unsubscribe = handle_unsubscribe(ws, msg),
        send_error(ws, paste("Unknown message type:", type))
      )
    },
    error = function(e) {
      send_error(ws, paste("Error processing message:", conditionMessage(e)))
    }
  )
}
# Handle the 'set' operation.
#
# Stores msg$value under values[[id]][[key]], sends a "value" confirmation to
# the sender and notifies subscribers of that id/key.
#
# Arguments:
#   ws   WebSocket object of the sender.
#   msg  Parsed message; must contain 'id' and 'key', may contain 'value'.
handle_set <- function(ws, msg) {
  # Exact field lookup: `$` would partially match e.g. "identifier" as "id".
  id <- msg[["id"]]
  key <- msg[["key"]]
  value <- msg[["value"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Set requires 'id' and 'key' fields")
    return(invisible(NULL))
  }

  # A number used with [[ ]] indexes by position and an empty string never
  # matches a list name, so only non-empty strings are usable as id/key.
  is_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_name(id) || !is_name(key)) {
    send_error(ws, "Set requires 'id' and 'key' to be non-empty strings")
    return(invisible(NULL))
  }

  # Initialize storage for this ID if needed.
  if (is.null(values[[id]])) {
    values[[id]] <<- list()
  }

  # Store the value. Assigning NULL (a JSON null or a missing 'value')
  # removes the key instead of storing it.
  values[[id]][[key]] <<- value

  # Do not leave an empty namespace behind when nothing ended up stored.
  if (length(values[[id]]) == 0) {
    values[[id]] <<- NULL
  }

  # Send confirmation to the setter; null = "null" makes a NULL value appear
  # as JSON null, matching what the 'get' response uses.
  ws$send(toJSON(
    list(type = "value", id = id, key = key, value = value),
    auto_unbox = TRUE,
    null = "null"
  ))

  # Notify all subscribers of this id/key.
  notify_subscribers(id, key, value)
}
# Handle the 'get' operation.
#
# Replies to the sender with a "value" message holding the stored value, or
# JSON null when nothing is stored under that id/key.
#
# Arguments:
#   ws   WebSocket object of the sender.
#   msg  Parsed message; must contain 'id' and 'key'.
handle_get <- function(ws, msg) {
  # Exact field lookup: `$` would partially match e.g. "identifier" as "id".
  id <- msg[["id"]]
  key <- msg[["key"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Get requires 'id' and 'key' fields")
    return(invisible(NULL))
  }

  # A number used with [[ ]] indexes by position and an empty string never
  # matches a list name, so only non-empty strings are usable as id/key.
  is_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_name(id) || !is_name(key)) {
    send_error(ws, "Get requires 'id' and 'key' to be non-empty strings")
    return(invisible(NULL))
  }

  # NULL when either the ID namespace or the key does not exist.
  stored <- values[[id]][[key]]

  # Send response; null = "null" serialises a missing value as JSON null.
  ws$send(toJSON(
    list(type = "value", id = id, key = key, value = stored),
    auto_unbox = TRUE,
    null = "null"
  ))
}
# Handle the 'delete' operation.
#
# Removes values[[id]][[key]] if present, sends a "deleted" confirmation to
# the sender and notifies subscribers of that id/key. The confirmation and
# the notification are sent whether or not the key existed.
#
# Arguments:
#   ws   WebSocket object of the sender.
#   msg  Parsed message; must contain 'id' and 'key'.
handle_delete <- function(ws, msg) {
  # Exact field lookup: `$` would partially match e.g. "identifier" as "id".
  id <- msg[["id"]]
  key <- msg[["key"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Delete requires 'id' and 'key' fields")
    return(invisible(NULL))
  }

  # A number used with [[ ]] indexes by position and an empty string never
  # matches a list name, so only non-empty strings are usable as id/key.
  is_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_name(id) || !is_name(key)) {
    send_error(ws, "Delete requires 'id' and 'key' to be non-empty strings")
    return(invisible(NULL))
  }

  # Delete the value if it exists.
  if (!is.null(values[[id]][[key]])) {
    values[[id]][[key]] <<- NULL

    # Clean up the ID namespace once it is empty.
    if (length(values[[id]]) == 0) {
      values[[id]] <<- NULL
    }
  }

  # Send confirmation.
  ws$send(toJSON(
    list(type = "deleted", id = id, key = key),
    auto_unbox = TRUE
  ))

  # Notify subscribers that the key was deleted.
  notify_subscribers_deleted(id, key)
}
# Look up the connection ID registered for a WebSocket object.
#
# The lookup key is the object's printed output collapsed into one string.
# Returns the value stored under that key in `ws_to_id`.
# NOTE(review): this assumes the printed output is unique per connection and
# does not change while the connection is open -- confirm against httpuv.
get_ws_id <- function(ws) {
  printed_lines <- capture.output(print(ws))
  lookup_key <- paste0(printed_lines, collapse = "")
  ws_to_id[[lookup_key]]
}
# Handle the 'subscribe' operation.
#
# Records the sender's WebSocket under subscriptions[[ws_id]][[id]][[key]]
# and, if a value is currently stored for that id/key, sends it to the sender
# as a "value" message.
#
# Arguments:
#   ws   WebSocket object of the subscriber.
#   msg  Parsed message; must contain 'id' and 'key'.
handle_subscribe <- function(ws, msg) {
  # Exact field lookup: `$` would partially match e.g. "identifier" as "id".
  id <- msg[["id"]]
  key <- msg[["key"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Subscribe requires 'id' and 'key' fields")
    return(invisible(NULL))
  }

  # A number used with [[ ]] indexes by position and an empty string never
  # matches a list name, so only non-empty strings are usable as id/key.
  is_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_name(id) || !is_name(key)) {
    send_error(ws, "Subscribe requires 'id' and 'key' to be non-empty strings")
    return(invisible(NULL))
  }

  # Without a registered connection ID there is nowhere to file the
  # subscription; indexing with NULL would raise a subscript error.
  ws_id <- get_ws_id(ws)
  if (is.null(ws_id)) {
    send_error(ws, "Connection is not registered with the server")
    return(invisible(NULL))
  }

  # Initialize the subscription structure if needed. The per-connection entry
  # can be absent because handle_unsubscribe() removes it once it is empty.
  if (is.null(subscriptions[[ws_id]])) {
    subscriptions[[ws_id]] <<- list()
  }
  if (is.null(subscriptions[[ws_id]][[id]])) {
    subscriptions[[ws_id]][[id]] <<- list()
  }

  # Add the subscription; the WebSocket object itself is stored so the
  # notify_* functions can send to it.
  subscriptions[[ws_id]][[id]][[key]] <<- ws

  # Send the current value if it exists.
  current <- values[[id]][[key]]
  if (!is.null(current)) {
    ws$send(toJSON(
      list(type = "value", id = id, key = key, value = current),
      auto_unbox = TRUE
    ))
  }
}
# Handle the 'unsubscribe' operation.
#
# Removes the sender's subscription for id/key if there is one and prunes
# subscription lists that become empty. Nothing is sent on success.
#
# Arguments:
#   ws   WebSocket object of the subscriber.
#   msg  Parsed message; must contain 'id' and 'key'.
handle_unsubscribe <- function(ws, msg) {
  # Exact field lookup: `$` would partially match e.g. "identifier" as "id".
  id <- msg[["id"]]
  key <- msg[["key"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Unsubscribe requires 'id' and 'key' fields")
    return(invisible(NULL))
  }

  # A number used with [[ ]] indexes by position and an empty string never
  # matches a list name, so only non-empty strings are usable as id/key.
  is_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_name(id) || !is_name(key)) {
    send_error(
      ws,
      "Unsubscribe requires 'id' and 'key' to be non-empty strings"
    )
    return(invisible(NULL))
  }

  # A connection without a registered ID cannot hold subscriptions, and
  # indexing with NULL would raise a subscript error.
  ws_id <- get_ws_id(ws)
  if (is.null(ws_id)) {
    return(invisible(NULL))
  }

  # Remove the subscription if it exists.
  if (!is.null(subscriptions[[ws_id]][[id]][[key]])) {
    subscriptions[[ws_id]][[id]][[key]] <<- NULL

    # Clean up empty structures, innermost first.
    if (length(subscriptions[[ws_id]][[id]]) == 0) {
      subscriptions[[ws_id]][[id]] <<- NULL
    }
    if (length(subscriptions[[ws_id]]) == 0) {
      subscriptions[[ws_id]] <<- NULL
    }
  }

  invisible(NULL)
}
# Notify all subscribers of id/key that the value changed.
#
# Sends a "value" message to every WebSocket stored under
# subscriptions[[ws_id]][[id]][[key]].
#
# Arguments:
#   id     Application ID of the changed key.
#   key    Key whose value changed.
#   value  New value; NULL is sent as JSON null.
notify_subscribers <- function(id, key, value) {
  # Collect the subscribed WebSocket objects across all connections.
  targets <- lapply(subscriptions, function(conn) conn[[id]][[key]])
  targets <- Filter(Negate(is.null), targets)
  if (length(targets) == 0L) {
    return(invisible(NULL))
  }

  # The message is the same for every subscriber, so encode it once.
  # null = "null" keeps a NULL value as JSON null rather than {}.
  payload <- toJSON(
    list(type = "value", id = id, key = key, value = value),
    auto_unbox = TRUE,
    null = "null"
  )

  for (target in targets) {
    tryCatch(
      target$send(payload),
      error = function(e) {
        # Best effort: the connection might be closed, so a failed send to
        # one subscriber must not stop delivery to the others.
        NULL
      }
    )
  }

  invisible(NULL)
}
# Tell every subscriber of id/key that the key has been deleted.
#
# Walks all connection entries in `subscriptions` and sends a "deleted"
# message to each WebSocket stored for this id/key. Errors raised while
# building or sending a message are ignored (the connection might be closed).
notify_subscribers_deleted <- function(id, key) {
  for (conn_id in names(subscriptions)) {
    subscriber <- subscriptions[[conn_id]][[id]][[key]]
    if (is.null(subscriber)) {
      next
    }

    tryCatch(
      {
        notice <- list(type = "deleted", id = id, key = key)
        subscriber$send(toJSON(notice, auto_unbox = TRUE))
      },
      error = function(e) {
        # Connection might be closed, ignore error
      }
    )
  }
}
# Report an error to a client.
#
# Sends {"type": "error", "message": <message>} as JSON over `ws`.
send_error <- function(ws, message) {
  error_payload <- list(type = "error", message = message)
  encoded <- toJSON(error_payload, auto_unbox = TRUE)
  ws$send(encoded)
}
# ---- Script entry point ----
# Sourcing this file prints a short banner and then starts the server with
# the default host and port.
cat(
  "Starting Key-Value Store WebSocket Server...\n",
  "Open http://127.0.0.1:8080 in your browser to test\n",
  sep = ""
)

# kv_store_server() uses runServer(), which runs in blocking mode, so this
# call keeps the server running until it is interrupted.
# NOTE(review): `s$stop()` would presumably only apply if kv_store_server()
# were switched to the non-blocking startServer() -- confirm before relying
# on `s`.
s <- kv_store_server()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment