Skip to content

Instantly share code, notes, and snippets.

@flatcapital
Created February 26, 2023 22:48
Show Gist options
  • Save flatcapital/1041f04b63aaed8cfb90bd483a5534c6 to your computer and use it in GitHub Desktop.
Save flatcapital/1041f04b63aaed8cfb90bd483a5534c6 to your computer and use it in GitHub Desktop.
Candle aggregator example
[package]
name = "demo"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# async
tokio = { version = "1.20.1", features = ["sync", "macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1.9", features = ["sync"] }
# barter
barter-data = "0.6.0"
barter-integration = "0.5.1"
# candles
trade_aggregation = "^7"
use barter_data::{
event::{MarketEvent},
exchange::{
binance::{spot::BinanceSpot},
// kraken::Kraken,
},
streams::Streams,
subscription::{
trade::{PublicTrades, PublicTrade},
},
};
use barter_integration::model::{InstrumentKind, Side};
use tokio_stream::StreamExt;
use trade_aggregation::{
candle_components::{Close, High, Low, Open},
*,
};
#[derive(Debug, Default, Clone, Candle)]
struct Candle {
open: Open,
high: High,
low: Low,
close: Close,
}
#[tokio::main]
async fn main() {
// Candle aggregator of 10 second candles
let time_rule = TimeRule::new(10, TimestampResolution::Millisecond);
let mut aggregator = GenericAggregator::<Candle, TimeRule, Trade>::new(time_rule);
let streams: Streams<MarketEvent<PublicTrade>> = Streams::builder_multi()
// Add PublicTrades Streams for various exchanges
.add(Streams::<PublicTrades>::builder()
.subscribe([
(BinanceSpot::default(), "btc", "usdt", InstrumentKind::Spot, PublicTrades),
]))
// .subscribe([
// (Kraken::default(), "xbt", "usd", InstrumentKind::Spot, PublicTrades),
// ]))
.init()
.await
.unwrap();
let mut joined_stream = streams.join_map().await;
while let Some((_exchange, trade)) = joined_stream.next().await {
let ts = trade.exchange_time.timestamp_millis();
let price = trade.kind.price;
let size = if trade.kind.side == Side::Buy {
trade.kind.amount
} else {
// sell is negative
0f64 - trade.kind.amount
};
// convert to Aggregator Trade
let t:Trade = Trade {
timestamp: ts,
price,
size,
};
if let Some(candle) = aggregator.update(&t) {
println!(
"candle created with open: {}, high: {}, low: {}, close: {}",
candle.open(),
candle.high(),
candle.low(),
candle.close()
);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment