Skip to content

Instantly share code, notes, and snippets.

@mgild
Created June 5, 2025 13:26
Show Gist options
  • Save mgild/ec71bda606e8fbbd81e57b45a1b4971c to your computer and use it in GitHub Desktop.
Save mgild/ec71bda606e8fbbd81e57b45a1b4971c to your computer and use it in GitHub Desktop.
use tokio_tungstenite::{connect_async, tungstenite::Message};
use futures_util::{StreamExt, SinkExt};
use serde::{Deserialize, Serialize};
/// One price feed entry as deserialized from an incoming WebSocket message.
///
/// Both price blocks are optional, so a payload that omits either one still
/// deserializes successfully.
#[derive(Debug, Deserialize)]
struct PythPriceFeed {
    /// Identifier of the feed this update belongs to.
    id: String,
    /// Latest price block, when present in the payload.
    price: Option<PriceData>,
    /// Exponential-moving-average price block, when present in the payload.
    ema_price: Option<PriceData>,
}
/// A single price observation in the raw, unscaled form carried on the wire.
///
/// `price` and `conf` arrive as strings; the real-world value is obtained by
/// parsing them as numbers and multiplying by `10^expo`.
#[derive(Debug, Deserialize)]
struct PriceData {
    /// Unscaled price, encoded as a decimal string.
    price: String,
    /// Unscaled uncertainty around `price` (displayed as the `±` margin),
    /// encoded as a decimal string and scaled by the same `expo`.
    conf: String,
    /// Base-10 exponent applied to both `price` and `conf`.
    expo: i32,
    /// Time the price was published.
    /// NOTE(review): presumably Unix seconds — confirm against the feed API.
    publish_time: u64,
}
/// Subscription request that is serialized to JSON and sent over the
/// WebSocket to start receiving price feed updates.
#[derive(Debug, Serialize)]
struct SubscriptionMessage {
    /// Message kind; emitted under the JSON key `type` because `type` is a
    /// reserved word in Rust and cannot be used as a field name.
    #[serde(rename = "type")]
    msg_type: String,
    /// Name of the update stream being requested.
    /// NOTE(review): confirm the server actually reads this field; the
    /// subscribe request may only need `type` and `ids`.
    method: String,
    /// Hex-encoded identifiers of the price feeds to subscribe to.
    ids: Vec<String>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Use the Hermes WebSocket endpoint for real-time price feeds
let url = "wss://hermes.pyth.network/ws";
let (ws_stream, _) = connect_async(url).await?;
let (mut write, mut read) = ws_stream.split();
println!("Connected to Pyth Network WebSocket stream.");
// Subscribe to specific price feeds (example: SOL/USD, BTC/USD, ETH/USD)
let subscription = SubscriptionMessage {
msg_type: "subscribe".to_string(),
method: "price_feed_update".to_string(),
ids: vec![
"0xef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d".to_string(), // SOL/USD
],
};
let subscription_msg = serde_json::to_string(&subscription)?;
write.send(Message::Text(subscription_msg.into())).await?;
println!("Sent subscription message for SOL/USD, BTC/USD, ETH/USD price feeds");
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
match serde_json::from_str::<serde_json::Value>(&text) {
Ok(json) => {
println!("Received message: {}", text);
if let Some(method) = json.get("method") {
if method == "price_feed_update" {
if let Some(params) = json.get("params") {
if let Ok(price_feed) = serde_json::from_value::<PythPriceFeed>(params.clone()) {
print_price_update(&price_feed);
}
}
}
}
}
Err(e) => {
eprintln!("Failed to parse JSON: {}", e);
println!("Raw message: {}", text);
}
}
}
Ok(Message::Binary(data)) => {
println!("Received binary data: {} bytes", data.len());
}
Ok(Message::Ping(data)) => {
write.send(Message::Pong(data)).await?;
}
Ok(Message::Close(_)) => {
println!("WebSocket connection closed");
break;
}
Err(e) => {
eprintln!("WebSocket error: {:?}", e);
break;
}
_ => {}
}
}
Ok(())
}
/// Prints a human-readable summary of one price feed update to stdout.
///
/// Raw `price` / `conf` strings are parsed as `f64` and multiplied by
/// `10^expo`; a string that fails to parse is treated as `0.0`. Sections for
/// the spot price and the EMA price are emitted only when present.
fn print_price_update(price_feed: &PythPriceFeed) {
    // Turns an unscaled decimal string plus its exponent into a float.
    let scaled = |raw: &str, expo: i32| raw.parse::<f64>().unwrap_or(0.0) * 10f64.powi(expo);

    println!("=== Price Feed Update ===");
    println!("Feed ID: {}", price_feed.id);

    if let Some(spot) = price_feed.price.as_ref() {
        let value = scaled(spot.price.as_str(), spot.expo);
        let margin = scaled(spot.conf.as_str(), spot.expo);
        println!("Price: ${:.4} ± ${:.4}", value, margin);
        println!("Publish Time: {}", spot.publish_time);
    }

    if let Some(ema) = price_feed.ema_price.as_ref() {
        println!("EMA Price: ${:.4}", scaled(ema.price.as_str(), ema.expo));
    }

    println!("========================");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment