Skip to content

Instantly share code, notes, and snippets.

@wch
Created September 26, 2025 22:54
Show Gist options
  • Select an option

  • Save wch/e46de182e20aa7ee22016300d4c10e8f to your computer and use it in GitHub Desktop.

Select an option

Save wch/e46de182e20aa7ee22016300d4c10e8f to your computer and use it in GitHub Desktop.
Key-value store with subscriptions using websockets, in R
#!/usr/bin/env Rscript
# WebSocket Key-Value Store Server with ID scoping
#
# This server provides a WebSocket-based key-value store with publish/subscribe
# functionality. All operations are scoped by an application ID, allowing multiple
# independent applications to use the same server concurrently.
#
# COMMUNICATION PROTOCOL (JSON):
# ==============================
#
# Client -> Server Messages:
# --------------------------
# Set a value:
# {"type": "set", "id": "app1", "key": "mykey", "value": <any JSON value>}
# Response: {"type": "value", "id": "app1", "key": "mykey", "value": <value>}
#
# Get a value:
# {"type": "get", "id": "app1", "key": "mykey"}
# Response: {"type": "value", "id": "app1", "key": "mykey", "value": <value or null>}
#
# Delete a value:
# {"type": "delete", "id": "app1", "key": "mykey"}
# Response: {"type": "deleted", "id": "app1", "key": "mykey"}
#
# Subscribe to changes:
# {"type": "subscribe", "id": "app1", "key": "mykey"}
# Response: {"type": "value", "id": "app1", "key": "mykey", "value": <current value if exists>}
# Then receives updates when value changes or is deleted
#
# Unsubscribe from changes:
# {"type": "unsubscribe", "id": "app1", "key": "mykey"}
# No response
#
# Server -> Client Messages:
# --------------------------
# Value notification (sent to subscribers when value changes):
# {"type": "value", "id": "app1", "key": "mykey", "value": <new value>}
#
# Deletion notification (sent to subscribers when key is deleted):
# {"type": "deleted", "id": "app1", "key": "mykey"}
#
# Error message:
# {"type": "error", "message": "error description"}
#
# FEATURES:
# - All operations are scoped by application ID for multi-tenant support
# - Real-time notifications to all subscribers when values change
# - Automatic cleanup of subscriptions when WebSocket connections close
# - In-memory storage (data is not persisted between server restarts)
# - HTML test client served at http://localhost:8080
#
# USAGE:
# source("kv_store_server.R") # Server starts automatically
# # Server runs in blocking mode. To stop, press Ctrl+C
library(httpuv)
library(jsonlite)
# Global storage for values and subscriptions
# Structure: values[[id]][[key]] = value
values <- list()
# Track subscriptions per WebSocket connection
# Structure: subscriptions[[ws_id]][[id]][[key]] = <WebSocket object>
# (handle_subscribe stores the subscribing `ws` itself; the notify_* functions
# later call $send() on that stored object)
subscriptions <- list()
# Map WebSocket objects to their IDs
# Keys are the collapsed print() output of the WebSocket object; values are the
# "ws_<n>" connection IDs assigned in onWSOpen
ws_to_id <- new.env(hash = TRUE)
# Counter for WebSocket connection IDs (incremented once per opened connection)
ws_counter <- 0
# Create and run the key-value store server.
#
# Builds an httpuv application that serves the HTML test client over HTTP and
# speaks the JSON key-value protocol over WebSockets, then runs it.
#
# Args:
#   host: Interface address to bind to.
#   port: TCP port to listen on.
#
# Returns:
#   Whatever runServer() returns. runServer() is blocking, so this function
#   does not return while the server is running.
kv_store_server <- function(host = "127.0.0.1", port = 8080) {
  # Same key derivation that get_ws_id() uses to look a connection up.
  connection_key <- function(ws) {
    paste0(capture.output(print(ws)), collapse = "")
  }

  app <- list(
    call = function(req) {
      serve_html_page(req, port)
    },
    onWSOpen = function(ws) {
      # Assign a unique ID to this WebSocket connection. The ID is captured by
      # the closures below, so they never have to re-derive it from `ws`.
      ws_counter <<- ws_counter + 1
      ws_id <- paste0("ws_", ws_counter)

      # Register the mapping consulted by get_ws_id()
      ws_to_id[[connection_key(ws)]] <- ws_id

      # Initialize subscription tracking for this connection
      subscriptions[[ws_id]] <<- list()

      ws$onMessage(function(binary, message) {
        # The print()-derived key describes the object's structure, not its
        # identity, so it is not guaranteed to be unique per connection and
        # another connection may have overwritten the entry. Re-assert the
        # mapping right before dispatch so that get_ws_id(ws) resolves to
        # *this* connection while its message is being handled.
        ws_to_id[[connection_key(ws)]] <- ws_id
        handle_ws_message(ws, binary, message)
      })

      ws$onClose(function() {
        # Drop this connection's subscriptions using the captured ID; looking
        # the ID up through the shared mapping could hit another connection.
        subscriptions[[ws_id]] <<- NULL

        # Remove the mapping only if it still points at this connection. This
        # avoids unregistering a different live connection and avoids the
        # rm() warning for a key that has already been removed.
        ws_key <- connection_key(ws)
        if (identical(ws_to_id[[ws_key]], ws_id)) {
          rm(list = ws_key, envir = ws_to_id)
        }
      })
    }
  )

  # Run in blocking mode. For non-blocking, use startServer()
  runServer(host, port, app)
  # startServer(host, port, app)
}
# Serve the HTML page for WebSocket testing.
#
# Args:
#   req:  The HTTP request; only `HTTP_HOST` and `SERVER_NAME` are read.
#   port: Port the server listens on. Used to build the WebSocket URL when the
#         request carries no usable Host header.
#
# Returns:
#   A response list with `status` (200L), `headers` (Content-Type text/html)
#   and `body` (the test-client page with the WebSocket URL filled in).
serve_html_page <- function(req, port) {
  # The Host header is client-controlled and ends up inside a JavaScript string
  # literal, so only accept values made of host/port characters.
  is_safe_host <- function(h) {
    is.character(h) && length(h) == 1L && !is.na(h) &&
      grepl("^[A-Za-z0-9._:\\[\\]-]+$", h, perl = TRUE)
  }

  host <- req$HTTP_HOST
  if (!is_safe_host(host)) {
    # No (usable) Host header: fall back to the server name plus the listening
    # port, otherwise the browser would try to connect on the default port.
    server_name <- req$SERVER_NAME
    if (!is_safe_host(server_name)) {
      server_name <- "127.0.0.1"
    }
    host <- paste0(server_name, ":", port)
  }
  wsUrl <- paste0("ws://", host)

  # Page template for sprintf(): the single %s is the WebSocket URL and literal
  # percent signs are written as %%. The closing quote must stay at column 0 so
  # that no extra whitespace is appended to the page.
  template <- '
<!DOCTYPE html>
<html>
<head>
<title>Key-Value Store WebSocket Client</title>
<style>
body {
font-family: monospace;
padding: 20px;
max-width: 1200px;
margin: 0 auto;
}
.controls-container {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
margin-bottom: 20px;
}
.panel {
border: 1px solid #ccc;
padding: 10px;
border-radius: 5px;
}
h2 { margin-top: 0; }
input, button {
margin: 5px 0;
padding: 5px;
}
input[type="text"] {
width: 200px;
}
textarea {
width: 100%%;
height: 120px;
font-family: monospace;
}
#output {
background: #f0f0f0;
padding: 10px;
height: 400px;
overflow-y: auto;
white-space: pre-wrap;
}
.message {
margin: 2px 0;
padding: 2px;
}
.sent { color: blue; }
.received { color: green; }
.error { color: red; }
#status {
padding: 10px;
margin-bottom: 10px;
border-radius: 5px;
}
.connected { background: #d4ffd4; }
.disconnected { background: #ffd4d4; }
</style>
</head>
<body>
<h1>Key-Value Store WebSocket Client</h1>
<div id="status" class="disconnected">Disconnected</div>
<div class="controls-container">
<div class="panel">
<h2>Controls</h2>
<div>
<label>App ID: <input type="text" id="appId" value="app1" /></label>
</div>
<div>
<label>Key: <input type="text" id="key" value="mykey" /></label>
</div>
<div>
<label>Value: <input type="text" id="value" value="myvalue" /></label>
</div>
<div style="margin-top: 15px;">
<button onclick="sendSet()">Set</button>
<button onclick="sendGet()">Get</button>
<button onclick="sendDelete()">Delete</button>
</div>
<div style="margin-top: 15px;">
<button onclick="sendSubscribe()">Subscribe</button>
<button onclick="sendUnsubscribe()">Unsubscribe</button>
</div>
</div>
<div class="panel">
<h2>Custom JSON Message</h2>
<textarea id="customJson">{
"type": "get",
"id": "app1",
"key": "mykey"
}</textarea>
<br>
<button onclick="sendCustom()">Send Custom</button>
</div>
</div>
<div class="panel">
<div style="display: flex; justify-content: space-between; align-items: center;">
<h2>Messages</h2>
<button onclick="clearOutput()">Clear</button>
</div>
<div id="output"></div>
</div>
<script>
const ws = new WebSocket("%s");
const output = document.getElementById("output");
const status = document.getElementById("status");
function log(message, className) {
const div = document.createElement("div");
div.className = "message " + className;
div.textContent = new Date().toLocaleTimeString() + " - " + message;
output.appendChild(div);
output.scrollTop = output.scrollHeight;
}
ws.onopen = function() {
status.textContent = "Connected";
status.className = "connected";
log("Connected to WebSocket server", "received");
};
ws.onmessage = function(event) {
log("Received: " + event.data, "received");
};
ws.onerror = function(error) {
log("Error: " + error, "error");
};
ws.onclose = function() {
status.textContent = "Disconnected";
status.className = "disconnected";
log("Disconnected from server", "error");
};
function getInputs() {
return {
id: document.getElementById("appId").value,
key: document.getElementById("key").value,
value: document.getElementById("value").value
};
}
function sendMessage(msg) {
const json = JSON.stringify(msg);
ws.send(json);
log("Sent: " + json, "sent");
}
function sendSet() {
const inputs = getInputs();
sendMessage({
type: "set",
id: inputs.id,
key: inputs.key,
value: inputs.value
});
}
function sendGet() {
const inputs = getInputs();
sendMessage({
type: "get",
id: inputs.id,
key: inputs.key
});
}
function sendDelete() {
const inputs = getInputs();
sendMessage({
type: "delete",
id: inputs.id,
key: inputs.key
});
}
function sendSubscribe() {
const inputs = getInputs();
sendMessage({
type: "subscribe",
id: inputs.id,
key: inputs.key
});
}
function sendUnsubscribe() {
const inputs = getInputs();
sendMessage({
type: "unsubscribe",
id: inputs.id,
key: inputs.key
});
}
function sendCustom() {
try {
const json = document.getElementById("customJson").value;
const msg = JSON.parse(json);
ws.send(json);
log("Sent: " + json, "sent");
} catch(e) {
log("Invalid JSON: " + e.message, "error");
}
}
function clearOutput() {
output.innerHTML = "";
}
</script>
</body>
</html>
'

  list(
    status = 200L,
    headers = list('Content-Type' = 'text/html'),
    body = sprintf(template, wsUrl)
  )
}
# Entry point for every WebSocket frame received from a client.
#
# Decodes the frame as JSON, checks that it names a message type and forwards
# it to the matching handle_*() function. Any error raised along the way is
# reported back to the client as an "error" message instead of propagating.
#
# Args:
#   ws:      The WebSocket connection the frame arrived on.
#   binary:  Binary-frame flag supplied by the onMessage callback (not used).
#   message: The frame payload, expected to be a JSON document.
handle_ws_message <- function(ws, binary, message) {
  report_failure <- function(e) {
    send_error(ws, paste("Error processing message:", e$message))
  }

  process <- function() {
    # Decode without simplification so JSON values keep their structure
    msg <- fromJSON(message, simplifyVector = FALSE)

    # A message without a type cannot be routed
    if (is.null(msg$type)) {
      send_error(ws, "Missing 'type' field")
      return()
    }

    # Routing table, checked in order: message type -> handler
    type_names <- c("set", "get", "delete", "subscribe", "unsubscribe")
    type_handlers <- list(
      handle_set,
      handle_get,
      handle_delete,
      handle_subscribe,
      handle_unsubscribe
    )
    for (i in seq_along(type_names)) {
      if (msg$type == type_names[[i]]) {
        return(type_handlers[[i]](ws, msg))
      }
    }

    # Nothing matched
    send_error(ws, paste("Unknown message type:", msg$type))
  }

  tryCatch(process(), error = report_failure)
}
# Handle 'set' operation.
#
# Stores `value` under values[[id]][[key]], echoes the stored value back to
# the sender and notifies subscribers of that id/key.
#
# Args:
#   ws:  The WebSocket connection that sent the request.
#   msg: The decoded message; reads the `id`, `key` and `value` fields.
#
# A JSON null value cannot be stored as a list element, so setting null
# removes the key; a later 'get' then reports null, which is the same answer.
handle_set <- function(ws, msg) {
  # Exact field lookup ([[ ]] does not partial-match names the way $ does)
  id <- msg[["id"]]
  key <- msg[["key"]]
  value <- msg[["value"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Set requires 'id' and 'key' fields")
    return()
  }
  # id/key are used as list names. A number would index the store by position
  # (reaching into another application's data), and an empty name can never be
  # looked up again, so accept single non-empty strings only.
  is_valid_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_valid_name(id) || !is_valid_name(key)) {
    send_error(ws, "Set requires 'id' and 'key' to be non-empty strings")
    return()
  }

  if (is.null(value)) {
    # Setting null: drop the key and any namespace left empty by that
    if (!is.null(values[[id]])) {
      values[[id]][[key]] <<- NULL
      if (length(values[[id]]) == 0) {
        values[[id]] <<- NULL
      }
    }
  } else {
    # Initialize storage for this ID if needed
    if (is.null(values[[id]])) {
      values[[id]] <<- list()
    }
    # Store the value
    values[[id]][[key]] <<- value
  }

  # Send confirmation to setter; null = "null" keeps a null value as JSON null
  # rather than an empty object
  ws$send(toJSON(
    list(
      type = "value",
      id = id,
      key = key,
      value = value
    ),
    auto_unbox = TRUE,
    null = "null"
  ))

  # Notify all subscribers of this id/key
  notify_subscribers(id, key, value)
}
# Handle 'get' operation.
#
# Replies with a "value" message carrying the value stored under
# values[[id]][[key]], or JSON null when nothing is stored there.
#
# Args:
#   ws:  The WebSocket connection that sent the request.
#   msg: The decoded message; reads the `id` and `key` fields.
handle_get <- function(ws, msg) {
  # Exact field lookup ([[ ]] does not partial-match names the way $ does)
  id <- msg[["id"]]
  key <- msg[["key"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Get requires 'id' and 'key' fields")
    return()
  }
  # id/key are used as list names. A number would index the store by position
  # and could return another application's data, so accept single non-empty
  # strings only.
  is_valid_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_valid_name(id) || !is_valid_name(key)) {
    send_error(ws, "Get requires 'id' and 'key' to be non-empty strings")
    return()
  }

  # Get the value if it exists (NULL when the namespace or the key is absent)
  namespace <- values[[id]]
  value <- if (is.null(namespace)) NULL else namespace[[key]]

  # Send response
  ws$send(toJSON(
    list(
      type = "value",
      id = id,
      key = key,
      value = value
    ),
    auto_unbox = TRUE,
    null = "null"
  ))
}
# Handle 'delete' operation.
#
# Removes values[[id]][[key]] if present, confirms with a "deleted" message
# to the sender and sends the deletion notification to subscribers. The
# confirmation and notification are sent whether or not the key existed.
#
# Args:
#   ws:  The WebSocket connection that sent the request.
#   msg: The decoded message; reads the `id` and `key` fields.
handle_delete <- function(ws, msg) {
  # Exact field lookup ([[ ]] does not partial-match names the way $ does)
  id <- msg[["id"]]
  key <- msg[["key"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Delete requires 'id' and 'key' fields")
    return()
  }
  # id/key are used as list names. A number would index the store by position
  # and could delete another application's data, so accept single non-empty
  # strings only.
  is_valid_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_valid_name(id) || !is_valid_name(key)) {
    send_error(ws, "Delete requires 'id' and 'key' to be non-empty strings")
    return()
  }

  # Delete the value if it exists
  if (!is.null(values[[id]][[key]])) {
    values[[id]][[key]] <<- NULL
    # Clean up empty ID namespace
    if (length(values[[id]]) == 0) {
      values[[id]] <<- NULL
    }
  }

  # Send confirmation
  ws$send(toJSON(
    list(
      type = "deleted",
      id = id,
      key = key
    ),
    auto_unbox = TRUE
  ))

  # Notify subscribers that the key was deleted
  notify_subscribers_deleted(id, key)
}
# Look up the connection ID registered for a WebSocket object.
#
# The lookup key is the object's printed representation collapsed into a
# single string, matching how the mapping is written when a connection opens.
# Yields NULL when no entry exists under that key.
get_ws_id <- function(ws) {
  printed_lines <- capture.output(print(ws))
  ws_to_id[[paste(printed_lines, collapse = "")]]
}
# Handle 'subscribe' operation.
#
# Records the connection as a subscriber of id/key (the WebSocket object
# itself is stored so notifications can be sent through it) and, if a value
# is currently stored, sends it to the new subscriber right away.
#
# Args:
#   ws:  The WebSocket connection that sent the request.
#   msg: The decoded message; reads the `id` and `key` fields.
handle_subscribe <- function(ws, msg) {
  # Exact field lookup ([[ ]] does not partial-match names the way $ does)
  id <- msg[["id"]]
  key <- msg[["key"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Subscribe requires 'id' and 'key' fields")
    return()
  }
  # id/key are used as list names. A number would index by position instead of
  # by name, so accept single non-empty strings only.
  is_valid_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_valid_name(id) || !is_valid_name(key)) {
    send_error(ws, "Subscribe requires 'id' and 'key' to be non-empty strings")
    return()
  }

  ws_id <- get_ws_id(ws)
  if (is.null(ws_id)) {
    # Without a connection ID there is nowhere to record the subscription
    send_error(ws, "Subscribe failed: connection is not registered")
    return()
  }

  # Build each level explicitly; the per-connection entry may be missing
  # (handle_unsubscribe removes it once it becomes empty)
  conn_subs <- subscriptions[[ws_id]]
  if (is.null(conn_subs)) {
    conn_subs <- list()
  }
  app_subs <- conn_subs[[id]]
  if (is.null(app_subs)) {
    app_subs <- list()
  }

  # Add subscription
  app_subs[[key]] <- ws
  conn_subs[[id]] <- app_subs
  subscriptions[[ws_id]] <<- conn_subs

  # Send current value if it exists
  current <- values[[id]][[key]]
  if (!is.null(current)) {
    ws$send(toJSON(
      list(
        type = "value",
        id = id,
        key = key,
        value = current
      ),
      auto_unbox = TRUE
    ))
  }
}
# Handle 'unsubscribe' operation.
#
# Removes this connection's subscription to id/key if there is one and prunes
# subscription lists left empty by the removal. Nothing is sent to the client
# on success.
#
# Args:
#   ws:  The WebSocket connection that sent the request.
#   msg: The decoded message; reads the `id` and `key` fields.
handle_unsubscribe <- function(ws, msg) {
  # Exact field lookup ([[ ]] does not partial-match names the way $ does)
  id <- msg[["id"]]
  key <- msg[["key"]]

  if (is.null(id) || is.null(key)) {
    send_error(ws, "Unsubscribe requires 'id' and 'key' fields")
    return()
  }
  # id/key are used as list names. A number would index by position and could
  # remove a different subscription, so accept single non-empty strings only.
  is_valid_name <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x) && nzchar(x)
  }
  if (!is_valid_name(id) || !is_valid_name(key)) {
    send_error(
      ws,
      "Unsubscribe requires 'id' and 'key' to be non-empty strings"
    )
    return()
  }

  ws_id <- get_ws_id(ws)
  if (is.null(ws_id)) {
    # Report the condition explicitly rather than failing on a NULL subscript
    send_error(ws, "Unsubscribe failed: connection is not registered")
    return()
  }

  # Remove subscription if it exists
  if (!is.null(subscriptions[[ws_id]][[id]][[key]])) {
    subscriptions[[ws_id]][[id]][[key]] <<- NULL
    # Clean up empty structures
    if (length(subscriptions[[ws_id]][[id]]) == 0) {
      subscriptions[[ws_id]][[id]] <<- NULL
    }
    if (length(subscriptions[[ws_id]]) == 0) {
      subscriptions[[ws_id]] <<- NULL
    }
  }
}
# Notify all subscribers when a value changes.
#
# Sends a "value" message to every connection that has a subscription
# recorded under subscriptions[[ws_id]][[id]][[key]].
#
# Args:
#   id:    Application ID the key belongs to.
#   key:   Key whose value changed.
#   value: The new value; NULL is sent as JSON null.
notify_subscribers <- function(id, key, value) {
  # Every subscriber receives the same text, so it is serialized at most once
  # (lazily, when the first subscriber is found).
  payload <- NULL

  for (ws_id in names(subscriptions)) {
    subscriber <- subscriptions[[ws_id]][[id]][[key]]
    if (is.null(subscriber)) {
      next
    }
    tryCatch(
      {
        if (is.null(payload)) {
          # null = "null" matches handle_get: a NULL value is JSON null, not
          # an empty object
          payload <- toJSON(
            list(
              type = "value",
              id = id,
              key = key,
              value = value
            ),
            auto_unbox = TRUE,
            null = "null"
          )
        }
        subscriber$send(payload)
      },
      error = function(e) {
        # Connection might be closed, ignore error so the remaining
        # subscribers are still notified
      }
    )
  }
}
# Broadcast a key deletion.
#
# Walks every connection's subscription table and sends a "deleted" message
# through each WebSocket registered for the given id/key. A failed send is
# ignored so that one dead connection does not stop the remaining ones.
notify_subscribers_deleted <- function(id, key) {
  for (conn_id in names(subscriptions)) {
    target <- subscriptions[[conn_id]][[id]][[key]]
    if (is.null(target)) {
      next
    }
    tryCatch(
      target$send(
        toJSON(list(type = "deleted", id = id, key = key), auto_unbox = TRUE)
      ),
      # The connection may already be closed; nothing to do in that case
      error = function(e) NULL
    )
  }
}
# Report a problem to a client.
#
# Wraps `message` in an {"type": "error", "message": ...} JSON object and
# sends it over the given WebSocket connection.
send_error <- function(ws, message) {
  payload <- list(type = "error", message = message)
  encoded <- toJSON(payload, auto_unbox = TRUE)
  ws$send(encoded)
}
# Script entry point: print the startup banner, then launch the server with
# its default host and port.
cat(
  "Starting Key-Value Store WebSocket Server...\n",
  "Open http://127.0.0.1:8080 in your browser to test\n",
  sep = ""
)
# kv_store_server() runs the server in blocking mode, so `s` is only assigned
# once the server has stopped. `s$stop()` would apply only if
# kv_store_server() were switched to the non-blocking startServer() variant.
s <- kv_store_server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment