Created
October 27, 2024 17:31
-
-
Save larry0x/fdb283629d944790d557a27f6db6e806 to your computer and use it in GitHub Desktop.
websocket demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "websocket-demo" | |
version = "0.0.0" | |
edition = "2021" | |
[dependencies] | |
anyhow = "1" | |
futures = "0.3" | |
serde = { version = "1", features = ["derive"] } | |
serde_json = "1" | |
tokio = { version = "1", features = ["full"] } | |
tokio-tungstenite = "0.24" | |
tracing = "0.1" | |
tracing-subscriber = "0.3" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
<title>WebSocket Demo</title> | |
</head> | |
<body> | |
<div style="margin-bottom: 20px;"> | |
<button id="queryBalancesButton">query balances</button> | |
</div> | |
<div style="margin-bottom: 20px;"> | |
<form> | |
<label for="from">from:</label> | |
<input type="text" id="from" name="from" required> | |
<label for="to">to:</label> | |
<input type="text" id="to" name="to" required> | |
<label for="amount">amount:</label> | |
<input type="number" id="amount" name="amount" required> | |
<button type="submit" id="sendTransactionButton">send transaction</button> | |
</form> | |
</div> | |
<div style="margin-bottom: 20px;"> | |
<code id="messageData"></code> | |
</div> | |
<script> | |
const ws = new WebSocket('ws://127.0.0.1:1234'); | |
const queryBalancesButton = document.getElementById('queryBalancesButton'); | |
const sendTransactionButton = document.getElementById('sendTransactionButton'); | |
const fromInput = document.getElementById('from'); | |
const toInput = document.getElementById('to'); | |
const amountInput = document.getElementById('amount'); | |
const messageData = document.getElementById('messageData'); | |
let index = 0; | |
ws.onopen = () => { | |
console.log('connected to websocket server'); | |
}; | |
ws.onmessage = (message) => { | |
console.log('received message:', message); | |
const data = JSON.parse(message.data); | |
messageData.innerHTML = JSON.stringify(data, null, 2); | |
}; | |
queryBalancesButton.addEventListener('click', () => { | |
event.preventDefault(); | |
index += 1; | |
const req = { index, data: { balances: {} } }; | |
console.log('sending request:', req); | |
ws.send(JSON.stringify(req)); | |
}); | |
sendTransactionButton.addEventListener('click', (event) => { | |
event.preventDefault(); | |
index += 1; | |
const from = fromInput.value; | |
const to = toInput.value; | |
const amount = Number(amountInput.value); | |
const req = { index, data: { transfer: { from, to, amount } } }; | |
console.log('sending request:', req); | |
ws.send(JSON.stringify(req)); | |
}); | |
</script> | |
</body> | |
</html> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use { | |
anyhow::anyhow, | |
futures::{SinkExt, StreamExt}, | |
serde::{Deserialize, Serialize}, | |
std::{collections::BTreeMap, sync::Arc, time::Duration}, | |
tokio::{ | |
net::{TcpListener, TcpStream}, | |
sync::RwLock, | |
time::sleep, | |
}, | |
tokio_tungstenite::accept_async, | |
tracing::{info, Level}, | |
}; | |
const TCP_LISTEN_ADDR: &str = "127.0.0.1:1234"; | |
/// Mimicking a blockchain. Stores balances of users. | |
#[derive(Debug)] | |
struct App { | |
// name => balance | |
pub balances: BTreeMap<String, u128>, | |
} | |
impl App { | |
pub fn new<I, N>(initial_balances: I) -> Self | |
where | |
I: IntoIterator<Item = (N, u128)>, | |
N: Into<String>, | |
{ | |
let balances = initial_balances | |
.into_iter() | |
.map(|(n, b)| (n.into(), b)) | |
.collect(); | |
App { balances } | |
} | |
} | |
#[derive(Serialize, Deserialize, Debug)] | |
struct Payload<T> { | |
pub index: u32, | |
pub data: T, | |
} | |
/// Mimicking requests sent to a blockchain node. Can either be a transaction or | |
/// a query. | |
#[derive(Deserialize, Debug)] | |
#[serde(rename_all = "snake_case")] | |
enum Request { | |
/// Make a transfer. | |
Transfer { | |
from: String, | |
to: String, | |
amount: u128, | |
}, | |
/// Query the balances of all users. | |
Balances {}, | |
} | |
/// Mimicking the response the node sends back to the client. | |
#[derive(Serialize, Debug)] | |
#[serde(rename_all = "snake_case")] | |
enum Response { | |
Transfer {}, | |
Balances(BTreeMap<String, u128>), | |
} | |
// TODO: gracefully handle the errors instead of unwrapping in this function. | |
async fn handle_client(app: Arc<RwLock<App>>, stream: TcpStream) { | |
let (write, mut read) = accept_async(stream).await.unwrap().split(); | |
// Put the write half in a Arc so it can be shared across tasks. | |
let write = Arc::new(RwLock::new(write)); | |
while let Some(req_raw) = read.next().await { | |
// Deserialize the request. | |
// TODO: handle websocket close message | |
let req_str = req_raw.unwrap().into_text().unwrap(); | |
let req = serde_json::from_str::<Payload<Request>>(&req_str).unwrap(); | |
info!(req = req_str, "Received request"); | |
// Make clones of app and write stream, because `tokio::spawn` needs to | |
// make ownership. | |
let app_clone = Arc::clone(&app); | |
let write_clone = Arc::clone(&write); | |
// Spawn a task to handle each request concurrently. | |
tokio::spawn(async move { | |
// Handle the request, generate a response. | |
// Stringify the error so that it can be sent over websocket. | |
let res = handle_request(app_clone, req.data) | |
.await | |
.map_err(|err| err.to_string()); | |
let payload = Payload { | |
index: req.index, | |
data: res, | |
}; | |
let payload_str = serde_json::to_string(&payload).unwrap(); | |
info!(payload = &payload_str, "Prepared response"); | |
write_clone | |
.write() | |
.await | |
.send(payload_str.into()) | |
.await | |
.unwrap(); | |
}); | |
} | |
} | |
async fn handle_request(app: Arc<RwLock<App>>, req: Request) -> anyhow::Result<Response> { | |
let res = match req { | |
Request::Transfer { from, to, amount } => { | |
// Sleep for 10 seconds to simulate a slow transaction. | |
// Why don't we get the write lock on app before sleeping? Because | |
// this is how Grug app works: during `FinalizeBlock` it only takes | |
// a read lock. Only in `Commit` it takes a write lock. The 10 second | |
// sleep mimics the `FinalizeBlock`, so we don't take the write lock. | |
sleep(Duration::from_secs(10)).await; | |
// Get write lock on the app. | |
let mut app = app.write().await; | |
let from_before = app.balances.get(&from).unwrap_or(&0); | |
let from_after = from_before.checked_sub(amount).ok_or_else(|| { | |
anyhow!("sender has insufficient balance: {from_before} < {amount}") | |
})?; | |
let to_before = app.balances.get(&to).unwrap_or(&0); | |
let to_after = to_before.checked_add(amount).ok_or_else(|| { | |
anyhow!("recipient has too big balance: {to_before} + {amount} > u128::MAX") | |
})?; | |
app.balances.insert(from, from_after); | |
app.balances.insert(to, to_after); | |
Response::Transfer {} | |
} | |
Request::Balances {} => { | |
// Get read lock on the app. | |
let app = app.read().await; | |
Response::Balances(app.balances.clone()) | |
} | |
}; | |
Ok(res) | |
} | |
#[tokio::main] | |
async fn main() { | |
tracing_subscriber::fmt().with_max_level(Level::INFO).init(); | |
let app = Arc::new(RwLock::new(App::new([ | |
("alice", 100_000_000), | |
("bob", 100_000_000), | |
]))); | |
info!("Created app"); | |
let listener = TcpListener::bind(TCP_LISTEN_ADDR).await.unwrap(); | |
info!(addr = TCP_LISTEN_ADDR, "TCP listening"); | |
while let Ok((stream, addr)) = listener.accept().await { | |
info!(addr = addr.to_string(), "New TCP connection"); | |
tokio::spawn(tokio::spawn(handle_client(Arc::clone(&app), stream))); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment