Created
July 28, 2018 20:18
-
-
Save mexus/6841c1a46cbb6c1f9d9b9de1f29d72bf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
extern crate reql; | |
extern crate reql_types; | |
extern crate tokio; | |
extern crate tokio_core; | |
#[macro_use] | |
extern crate serde_json; | |
extern crate tokio_tungstenite; | |
extern crate url; | |
// extern crate futures_retry; | |
extern crate tokio_retry; | |
extern crate tokio_timer; | |
use futures::{Future, Sink, Stream}; | |
use reql::{Client, Config, Connection, Run}; | |
use reql_types::ServerStatus; | |
use serde_json::Value; | |
use tokio_core::reactor::Core; | |
use tokio_tungstenite::connect_async; | |
use tokio_tungstenite::tungstenite::Message; | |
use url::Url; | |
// use futures_retry::{RetryPolicy, StreamRetryExt}; | |
use std::time::{Duration, Instant}; | |
use tokio_retry::strategy::FibonacciBackoff; | |
use tokio_retry::Retry; | |
use tokio_timer::Interval; | |
/// Name of the RethinkDB database that receives the feed data.
const DB: &str = "gdax";
/// Name of the table, inside [`DB`], that every feed message is inserted into.
const TBL: &str = "BTC_USD";
fn transform_numbers(json: &mut Value) { | |
if json["type"] == "l2update" { | |
json["changes"] | |
.as_array_mut() | |
.unwrap() | |
.iter_mut() | |
.for_each(|v| { | |
(1..=2).for_each(|i| v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap())); | |
}); | |
} else if json["type"] == "snapshot" { | |
json["asks"] | |
.as_array_mut() | |
.unwrap() | |
.iter_mut() | |
.for_each(|v| { | |
(0..=1).for_each(|i| { | |
v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap()); | |
}); | |
}); | |
} else if json["type"] == "ticker" { | |
[ | |
"best_ask", | |
"best_bid", | |
"high_24h", | |
"last_size", | |
"low_24h", | |
"open_24h", | |
"price", | |
"volume_24h", | |
"volume_30d", | |
].into_iter() | |
.for_each(|i| { | |
json.get_mut(i).map(|x| { | |
*x = json!(x.as_str().unwrap().parse::<f64>().unwrap()); | |
}); | |
}); | |
} | |
} | |
fn init_db() -> reql::Result<(Client, Connection)> { | |
let r = Client::new(); | |
let conn = r.connect(Config::default())?; | |
// create db | |
r.db_create(DB) | |
.run::<ServerStatus>(conn)? | |
.map(|_| println!("db created")) | |
.map_err(|err| println!("{:?}", err)) | |
.wait() | |
.next(); | |
// create table | |
r.db(DB) | |
.table_create(TBL) | |
.run::<ServerStatus>(conn)? | |
.map(|_| println!("table created")) | |
.map_err(|err| println!("{:?}", err)) | |
.wait() | |
.next(); | |
Ok((r, conn)) | |
} | |
fn get_first(stream: impl Stream) -> impl Future<Item = (), Error = ()> { | |
stream.into_future().map(|(_maybe_first_item, _rest_stream)| ()).map_err(|_| ()) | |
} | |
fn init_ws(r: &Client, &conn: &Connection, url: &str) { | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let subscribe = json!({ | |
"type": "subscribe", | |
"product_ids": [ "BTC-USD" ], | |
"channels": [ "ticker", "level2" ] | |
}); | |
let client = || { | |
connect_async(Url::parse(url).unwrap(), handle.remote().clone()) | |
// .retry(|_| RetryPolicy::Repeat) | |
.map_err(|e| eprintln!("connect_async: {}", e)) | |
.and_then(|(ws_stream, _)| { | |
println!("Connected to {}", url); | |
let (sink, stream) = ws_stream.split(); | |
let stream = stream.map_err(|e| eprintln!("ws_stream error: {}", e)); | |
sink.send(Message::Text(subscribe.to_string())) | |
.map_err(|e| eprintln!("sink send: {}", e)) | |
.and_then(|_| { | |
println!("subscribe sent: {}", subscribe); | |
let timer = Interval::new(Instant::now(), Duration::from_secs(1)).for_each(|_| { | |
println!("ttt"); | |
Ok(()) | |
}); | |
let q_insert = r.db(DB).table(TBL.to_owned()); | |
let flow = stream.for_each(move |msg| { | |
let mut json: Value = serde_json::from_str(msg.to_text().unwrap()).unwrap(); | |
transform_numbers(&mut json); | |
get_first(q_insert.insert(json).run::<ServerStatus>(conn).unwrap()) | |
}); | |
let f = flow.select2(timer); | |
f.map(|_| ()).map_err(|_| ()) | |
}) | |
}) | |
}; | |
let retry_strategy = FibonacciBackoff::from_millis(1); | |
let future = Retry::spawn(retry_strategy, client).then(|_| Ok::<_, ()>(())); | |
core.run(future).unwrap(); | |
} | |
fn main() { | |
let (r, conn) = init_db().unwrap(); | |
let url = "wss://ws-feed.gdax.com"; | |
// let url = "wss://ws-feed-public.sandbox.gdax.com"; | |
// let url = "ws://localhost:5999/ws-feed-public.sandbox.gdax.com"; | |
init_ws(&r, &conn, url); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment