Skip to content

Instantly share code, notes, and snippets.

@larry0x
Created October 27, 2024 17:31
Show Gist options
  • Save larry0x/fdb283629d944790d557a27f6db6e806 to your computer and use it in GitHub Desktop.
Save larry0x/fdb283629d944790d557a27f6db6e806 to your computer and use it in GitHub Desktop.
websocket demo
[package]
name = "websocket-demo"
version = "0.0.0"
edition = "2021"
[dependencies]
anyhow = "1"
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.24"
tracing = "0.1"
tracing-subscriber = "0.3"
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>WebSocket Demo</title>
</head>
<body>
<div style="margin-bottom: 20px;">
<button id="queryBalancesButton">query balances</button>
</div>
<div style="margin-bottom: 20px;">
<form>
<label for="from">from:</label>
<input type="text" id="from" name="from" required>
<label for="to">to:</label>
<input type="text" id="to" name="to" required>
<label for="amount">amount:</label>
<input type="number" id="amount" name="amount" required>
<button type="submit" id="sendTransactionButton">send transaction</button>
</form>
</div>
<div style="margin-bottom: 20px;">
<code id="messageData"></code>
</div>
<script>
// Open the WebSocket connection to the demo server as soon as the page loads.
const ws = new WebSocket('ws://127.0.0.1:1234');
// Cache the DOM elements the handlers below read from or write to.
const queryBalancesButton = document.getElementById('queryBalancesButton');
const sendTransactionButton = document.getElementById('sendTransactionButton');
const fromInput = document.getElementById('from');
const toInput = document.getElementById('to');
const amountInput = document.getElementById('amount');
const messageData = document.getElementById('messageData');
// Request counter; incremented before each request and sent as its `index`.
let index = 0;
// Log once the connection has been established.
ws.onopen = () => {
console.log('connected to websocket server');
};
// Display the most recent server message as pretty-printed JSON.
ws.onmessage = (message) => {
  console.log('received message:', message);
  const data = JSON.parse(message.data);
  // Use textContent rather than innerHTML: the payload contains user-chosen
  // account names, which must never be interpreted as markup.
  messageData.textContent = JSON.stringify(data, null, 2);
};
// Send a "balances" query, tagged with the next request index.
queryBalancesButton.addEventListener('click', (event) => {
  // Take the event as an explicit parameter instead of relying on the
  // deprecated global `window.event`.
  event.preventDefault();
  index += 1;
  const req = { index, data: { balances: {} } };
  console.log('sending request:', req);
  ws.send(JSON.stringify(req));
});
// Send a "transfer" request built from the form fields, tagged with the next
// request index. The default form submission is suppressed so the page does
// not reload.
sendTransactionButton.addEventListener('click', (event) => {
  event.preventDefault();
  index += 1;
  const transfer = {
    from: fromInput.value,
    to: toInput.value,
    amount: Number(amountInput.value),
  };
  const req = { index, data: { transfer } };
  console.log('sending request:', req);
  ws.send(JSON.stringify(req));
});
</script>
</body>
</html>
use {
anyhow::anyhow,
futures::{SinkExt, StreamExt},
serde::{Deserialize, Serialize},
std::{collections::BTreeMap, sync::Arc, time::Duration},
tokio::{
net::{TcpListener, TcpStream},
sync::RwLock,
time::sleep,
},
tokio_tungstenite::accept_async,
tracing::{info, Level},
};
const TCP_LISTEN_ADDR: &str = "127.0.0.1:1234";
/// A toy stand-in for a blockchain: a ledger mapping user names to balances.
#[derive(Debug)]
struct App {
    // name => balance
    pub balances: BTreeMap<String, u128>,
}

impl App {
    /// Builds an app from `(name, balance)` pairs.
    ///
    /// Names may be anything convertible into a `String`. If the same name
    /// appears more than once, the last balance given for it wins.
    pub fn new<I, N>(initial_balances: I) -> Self
    where
        I: IntoIterator<Item = (N, u128)>,
        N: Into<String>,
    {
        let mut balances = BTreeMap::new();
        for (name, balance) in initial_balances {
            balances.insert(name.into(), balance);
        }
        Self { balances }
    }
}
/// Envelope pairing a message body with a client-chosen correlation index.
///
/// Used in both directions: `handle_client` parses incoming messages as
/// `Payload<Request>` and replies with a payload carrying the same `index`.
#[derive(Serialize, Deserialize, Debug)]
struct Payload<T> {
// Correlation number; copied unchanged from a request into its response.
pub index: u32,
// The message body (a request, or the outcome of handling one).
pub data: T,
}
/// Mimicking requests sent to a blockchain node. Can either be a transaction or
/// a query.
///
/// Variant names are (de)serialized in snake_case, so the JSON keys are
/// `transfer` and `balances`.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
enum Request {
/// Make a transfer.
Transfer {
// Name of the account that is debited.
from: String,
// Name of the account that is credited.
to: String,
// Number of units to move from `from` to `to`.
amount: u128,
},
/// Query the balances of all users.
Balances {},
}
/// Mimicking the response the node sends back to the client.
///
/// Variant names are serialized in snake_case (`transfer`, `balances`).
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
enum Response {
// Returned by `handle_request` after a transfer has been applied.
Transfer {},
// A copy of the full name => balance map at the time of the query.
Balances(BTreeMap<String, u128>),
}
// TODO: gracefully handle the errors instead of unwrapping in this function.
async fn handle_client(app: Arc<RwLock<App>>, stream: TcpStream) {
let (write, mut read) = accept_async(stream).await.unwrap().split();
// Put the write half in a Arc so it can be shared across tasks.
let write = Arc::new(RwLock::new(write));
while let Some(req_raw) = read.next().await {
// Deserialize the request.
// TODO: handle websocket close message
let req_str = req_raw.unwrap().into_text().unwrap();
let req = serde_json::from_str::<Payload<Request>>(&req_str).unwrap();
info!(req = req_str, "Received request");
// Make clones of app and write stream, because `tokio::spawn` needs to
// make ownership.
let app_clone = Arc::clone(&app);
let write_clone = Arc::clone(&write);
// Spawn a task to handle each request concurrently.
tokio::spawn(async move {
// Handle the request, generate a response.
// Stringify the error so that it can be sent over websocket.
let res = handle_request(app_clone, req.data)
.await
.map_err(|err| err.to_string());
let payload = Payload {
index: req.index,
data: res,
};
let payload_str = serde_json::to_string(&payload).unwrap();
info!(payload = &payload_str, "Prepared response");
write_clone
.write()
.await
.send(payload_str.into())
.await
.unwrap();
});
}
}
/// Executes a single request against the shared app state.
///
/// - `Request::Transfer` sleeps for 10 seconds, then takes the write lock and
///   moves `amount` from `from` to `to`. Accounts not present in the ledger are
///   treated as having a balance of zero.
/// - `Request::Balances` takes the read lock and returns a copy of all
///   balances.
///
/// # Errors
///
/// A transfer fails, without modifying any balance, if the sender's balance is
/// smaller than `amount` or if the recipient's balance would overflow `u128`.
async fn handle_request(app: Arc<RwLock<App>>, req: Request) -> anyhow::Result<Response> {
    let res = match req {
        Request::Transfer { from, to, amount } => {
            // Sleep for 10 seconds to simulate a slow transaction.
            // Why don't we get the write lock on app before sleeping? Because
            // this is how Grug app works: during `FinalizeBlock` it only takes
            // a read lock. Only in `Commit` it takes a write lock. The 10 second
            // sleep mimics the `FinalizeBlock`, so we don't take the write lock.
            sleep(Duration::from_secs(10)).await;

            // Get write lock on the app.
            let mut app = app.write().await;

            let from_before = app.balances.get(&from).copied().unwrap_or(0);
            let from_after = from_before.checked_sub(amount).ok_or_else(|| {
                anyhow!("sender has insufficient balance: {from_before} < {amount}")
            })?;

            // A self-transfer must leave the balance untouched. Both balances
            // are read before either is written, so applying the credit on top
            // of the stale pre-debit balance would overwrite the debit and mint
            // `amount` out of thin air.
            if from != to {
                let to_before = app.balances.get(&to).copied().unwrap_or(0);
                let to_after = to_before.checked_add(amount).ok_or_else(|| {
                    anyhow!("recipient has too big balance: {to_before} + {amount} > u128::MAX")
                })?;

                app.balances.insert(from, from_after);
                app.balances.insert(to, to_after);
            }

            Response::Transfer {}
        }
        Request::Balances {} => {
            // Get read lock on the app.
            let app = app.read().await;
            Response::Balances(app.balances.clone())
        }
    };

    Ok(res)
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
let app = Arc::new(RwLock::new(App::new([
("alice", 100_000_000),
("bob", 100_000_000),
])));
info!("Created app");
let listener = TcpListener::bind(TCP_LISTEN_ADDR).await.unwrap();
info!(addr = TCP_LISTEN_ADDR, "TCP listening");
while let Ok((stream, addr)) = listener.accept().await {
info!(addr = addr.to_string(), "New TCP connection");
tokio::spawn(tokio::spawn(handle_client(Arc::clone(&app), stream)));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment