Skip to content

Instantly share code, notes, and snippets.

@eduardonunesp
Created January 5, 2023 00:31
Show Gist options
  • Save eduardonunesp/1dc07589dc21c8875e7f004974d0135d to your computer and use it in GitHub Desktop.
Save eduardonunesp/1dc07589dc21c8875e7f004974d0135d to your computer and use it in GitHub Desktop.
WebSocket RUST
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio_tungstenite::{
connect_async,
tungstenite::{Message, Result},
};
use url::Url;
/// Websocket endpoint the program connects to (Coinbase Exchange feed host).
// `'static` is already implied for the type of a `const`, so it is omitted.
const WS_URL: &str = "wss://ws-feed.exchange.coinbase.com";
/// Subscription request that `main` serializes to JSON and sends as a text
/// frame right after the websocket connection is established.
#[derive(Serialize, Deserialize, Debug)]
struct SubReq {
/// Message kind; `main` sets it to `"subscribe"`. `type` is a Rust keyword,
/// so the field is called `ttype` and renamed to `type` in the JSON.
#[serde(rename = "type")]
ttype: String,
/// Product identifiers to subscribe to; `main` passes the `FROM-TO` string
/// produced by `TradePair::to_string` (e.g. `BTC-USD`).
product_ids: Vec<String>,
/// Names of the feed channels to subscribe to; `main` passes `"matches"`.
channels: Vec<String>,
}
/// Shape of the JSON messages read after the subscription is confirmed.
///
/// Every field is required: a message missing any of them fails to
/// deserialize. `main` only consumes `price` and `size`.
#[derive(Serialize, Deserialize, Debug)]
struct MatchRes {
/// Message kind, carried in the JSON key `type` (a Rust keyword, hence the rename).
#[serde(rename = "type")]
ttype: String,
trade_id: u64,
maker_order_id: String,
taker_order_id: String,
// NOTE(review): presumably "buy" / "sell" — confirm against the feed documentation.
side: String,
/// Traded amount as a decimal string; `main` parses it into `Trade::quantity`.
size: String,
/// Trade price as a decimal string; `main` parses it into `Trade::price`.
price: String,
product_id: String,
sequence: u64,
/// Timestamp kept as the raw string received; it is not parsed in this file.
time: String,
}
/// Minimal view of a feed message, used by `main` while it waits for the
/// subscription acknowledgement: only the `type` discriminator is read, and
/// `main` stops waiting once it equals `"subscriptions"`.
#[derive(Serialize, Deserialize)]
struct SubscriptionRes {
/// Message kind, carried in the JSON key `type` (a Rust keyword, hence the rename).
#[serde(rename = "type")]
ttype: String,
}
/// A pair of symbols such as `BTC` and `USD`.
///
/// Its textual form is `FROM-TO` (for example `BTC-USD`), available through
/// `to_string()` or any formatting macro.
#[derive(Debug, Clone)]
pub struct TradePair {
    /// First symbol, printed to the left of the dash.
    pub from: String,
    /// Second symbol, printed to the right of the dash.
    pub to: String,
}

impl TradePair {
    /// Creates a pair from its two symbols.
    pub fn new(from: String, to: String) -> Self {
        Self { from, to }
    }
}

/// Formats the pair as `FROM-TO`.
///
/// `Display` replaces the former inherent `to_string` method: callers can
/// still write `pair.to_string()` (provided by the blanket `ToString` impl),
/// and the pair can now also be passed directly to `format!`/`println!`.
impl std::fmt::Display for TradePair {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}-{}", self.from, self.to)
    }
}
/// A single executed trade for a given [`TradePair`].
///
/// Its textual form is `FROM-TO: QUANTITY @ PRICE`, available through
/// `to_string()` or any formatting macro.
#[derive(Debug, Clone)]
pub struct Trade {
    /// Pair the trade belongs to.
    pub trade_pair: TradePair,
    /// Price at which the trade happened.
    pub price: f64,
    /// Amount that was traded.
    pub quantity: f64,
}

impl Trade {
    /// Creates a trade from its pair, price and quantity.
    pub fn new(trade_pair: TradePair, price: f64, quantity: f64) -> Self {
        Self {
            trade_pair,
            price,
            quantity,
        }
    }
}

/// Formats the trade as `FROM-TO: QUANTITY @ PRICE`.
///
/// `Display` replaces the former inherent `to_string` method: callers can
/// still write `trade.to_string()` (provided by the blanket `ToString` impl),
/// and the trade can now also be passed directly to `format!`/`println!`.
impl std::fmt::Display for Trade {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "{}-{}: {} @ {}",
            self.trade_pair.from, self.trade_pair.to, self.quantity, self.price
        )
    }
}
/// Sending half of the tokio mpsc channel that carries [`Trade`] values.
type TX = mpsc::Sender<Trade>;
/// Receiving half of the tokio mpsc channel that carries [`Trade`] values.
type RX = mpsc::Receiver<Trade>;
/// Opens a bounded tokio mpsc channel for [`Trade`] values.
///
/// `buffer` is passed straight to `mpsc::channel` as the channel capacity.
/// NOTE: tokio documents that `mpsc::channel` panics when the capacity is 0.
fn create_trade_channel(buffer: usize) -> (TX, RX) {
    // The element type is inferred from the `(TX, RX)` return type.
    let (sender, receiver) = mpsc::channel(buffer);
    (sender, receiver)
}
/// Holds both halves of the bounded channel used to hand [`Trade`] values
/// from a producer task to a consumer.
#[derive(Debug)]
pub struct TradeProvider {
    /// Sending half; clone it (or move it) into the producer.
    pub tx: TX,
    /// Receiving half; the consumer reads trades from it.
    pub rx: RX,
}

impl TradeProvider {
    /// Number of trades the channel can hold before senders have to wait.
    const BUFFER: usize = 32;

    /// Creates a provider backed by a fresh channel of capacity 32.
    pub fn new() -> Self {
        let (tx, rx) = create_trade_channel(Self::BUFFER);
        Self { tx, rx }
    }
}

/// `Default` mirrors [`TradeProvider::new`], as expected for a public type
/// whose constructor takes no arguments.
impl Default for TradeProvider {
    fn default() -> Self {
        Self::new()
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut trade_provider = TradeProvider::new();
let url = Url::parse(WS_URL)?;
let (mut ws_stream, _) = connect_async(url).await?;
let pair = TradePair::new("BTC".into(), "USD".into());
let sub_req = SubReq {
ttype: "subscribe".to_string(),
product_ids: vec![pair.to_string()],
channels: vec!["matches".to_string()],
};
ws_stream
.send(Message::Text(serde_json::to_string(&sub_req)?))
.await?;
while let Some(msg) = ws_stream.next().await {
let msg = msg.unwrap();
let msg = serde_json::from_str::<SubscriptionRes>(&msg.to_string()).unwrap();
if msg.ttype == "subscriptions" {
break;
}
}
println!("Subscribed to {}", pair.to_string());
let tp_tx = trade_provider.tx.clone();
tokio::spawn(async move {
while let Some(msg) = ws_stream.next().await {
let msg = msg.unwrap();
let msg = serde_json::from_str::<MatchRes>(&msg.to_string()).unwrap();
let trade = Trade::new(
pair.clone(),
msg.price.parse().unwrap(),
msg.size.parse().unwrap(),
);
tp_tx.send(trade.clone()).await.unwrap();
}
});
// Just count til 10
const MAX: u32 = 10;
let mut count = 0;
loop {
tokio::select! {
result = trade_provider.rx.recv() => {
if let Some(trade) = result {
println!("{}", trade.to_string());
}
if count == MAX {
break;
}
count += 1;
}
}
}
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment