Created
August 1, 2018 20:21
-
-
Save mexus/c5fdaa639ac6e9d7082e4a0e331b4d3b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
extern crate reql; | |
extern crate reql_types; | |
extern crate tokio; | |
extern crate tokio_core; | |
#[macro_use] | |
extern crate serde_json; | |
extern crate futures_retry; | |
extern crate tokio_timer; | |
extern crate tokio_tungstenite; | |
extern crate url; | |
#[macro_use] | |
extern crate failure; | |
use futures::{Future, Sink, Stream}; | |
use futures_retry::{FutureRetry, RetryPolicy}; | |
use reql::{Client, Config, Connection, Run}; | |
use reql_types::ServerStatus; | |
use serde_json::Value; | |
use std::cell::RefCell; | |
use std::rc::Rc; | |
use std::time::{Duration, Instant}; | |
use tokio_core::reactor::Core; | |
use tokio_timer::Interval; | |
use tokio_tungstenite::connect_async; | |
use tokio_tungstenite::tungstenite::Message; | |
use url::Url; | |
/// Name of the RethinkDB database that is created at start-up and written to.
const DB: &str = "gdax";
/// Name of the table inside `DB` that receives the feed messages.
const TBL: &str = "BTC_USD";
#[derive(Debug, Fail)] | |
enum FHError { | |
#[fail(display = "connect")] | |
Connect(#[cause] tokio_tungstenite::tungstenite::Error), | |
#[fail(display = "send")] | |
Send(#[cause] tokio_tungstenite::tungstenite::Error), | |
#[fail(display = "read")] | |
Read(#[cause] tokio_tungstenite::tungstenite::Error), | |
#[fail(display = "db")] | |
Timer(#[cause] tokio_timer::Error), | |
#[fail(display = "select")] | |
DB(#[cause] reql::errors::Error), | |
} | |
fn transform_numbers(json: &mut Value) { | |
if json["type"] == "l2update" { | |
json["changes"] | |
.as_array_mut() | |
.unwrap() | |
.iter_mut() | |
.for_each(|v| { | |
(1..=2).for_each(|i| v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap())); | |
}); | |
} else if json["type"] == "snapshot" { | |
json["asks"] | |
.as_array_mut() | |
.unwrap() | |
.iter_mut() | |
.for_each(|v| { | |
(0..=1).for_each(|i| { | |
v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap()); | |
}); | |
}); | |
} else if json["type"] == "ticker" { | |
[ | |
"best_ask", | |
"best_bid", | |
"high_24h", | |
"last_size", | |
"low_24h", | |
"open_24h", | |
"price", | |
"volume_24h", | |
"volume_30d", | |
] | |
.into_iter() | |
.for_each(|i| { | |
json.get_mut(i).map(|x| { | |
*x = json!(x.as_str().unwrap().parse::<f64>().unwrap()); | |
}); | |
}); | |
} | |
} | |
fn init_db() -> reql::Result<(Client, Connection)> { | |
let r = Client::new(); | |
let conn = r.connect(Config::default())?; | |
// create db | |
r.db_create(DB) | |
.run::<ServerStatus>(conn)? | |
.map(|_| println!("db created")) | |
.map_err(|err| println!("{:?}", err)) | |
.wait() | |
.next(); | |
// create table | |
r.db(DB) | |
.table_create(TBL) | |
.run::<ServerStatus>(conn)? | |
.map(|_| println!("table created")) | |
.map_err(|err| println!("{:?}", err)) | |
.wait() | |
.next(); | |
Ok((r, conn)) | |
} | |
fn init_ws(r: &Client, &conn: &Connection, url: &str) { | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let subscribe = json!({ | |
"type": "subscribe", | |
"product_ids": [ "BTC-USD" ], | |
"channels": [ "ticker", "level2" ] | |
}); | |
let client = || { | |
connect_async(Url::parse(url).unwrap(), handle.remote().clone()) | |
.map_err(FHError::Connect) | |
.and_then(|(ws_stream, _)| { | |
println!("Connected to {}", url); | |
let (sink, stream) = ws_stream.split(); | |
sink.send(Message::Text(subscribe.to_string())) | |
.map_err(FHError::Send) | |
.and_then(|sink| { | |
println!("subscribe sent: {}", subscribe); | |
let q_insert = r.db(DB).table(TBL); | |
let counter = Rc::new(RefCell::new(0)); | |
let counter_timer = counter.clone(); | |
let timer_counter = Interval::new(Instant::now(), Duration::from_secs(1)) | |
.map_err(FHError::Timer) | |
.for_each(move |_| { | |
let mut _counter = counter_timer.borrow_mut(); | |
println!("count: {}", _counter); | |
*_counter = 0; | |
Ok(()) | |
}); | |
let timer_ping = Interval::new(Instant::now(), Duration::from_secs(2)) | |
.and_then(|_| Ok(Message::Ping(vec![1]))) | |
.map_err(FHError::Timer); | |
let timer_ping = sink.sink_map_err(FHError::Send).send_all(timer_ping).map(|_| ()); | |
let flow = stream.map_err(FHError::Read).for_each(move |msg| { | |
let mut json: Value = | |
serde_json::from_str(msg.to_text().unwrap()).unwrap(); | |
transform_numbers(&mut json); | |
let r = q_insert.insert(json).run::<ServerStatus>(conn).unwrap(); | |
*counter.borrow_mut() += 1; | |
r.into_future().map(|_| ()).map_err(|(e, _)| FHError::DB(e)) | |
}); | |
let f = timer_counter.select(flow).map(|_| ()).map_err(|(a, _)| a); | |
f.select(timer_ping).map(|_| ()).map_err(|(a, _)| a) | |
}) | |
}) | |
}; | |
let handle_error = |e: FHError| { | |
println!("Error: {:?}", e); | |
match e { | |
FHError::Read(tokio_tungstenite::tungstenite::Error::ConnectionClosed(..)) => { | |
RetryPolicy::Repeat | |
} | |
FHError::Read(tokio_tungstenite::tungstenite::Error::Protocol(ref err)) | |
if err == "Connection reset without closing handshake" => | |
{ | |
RetryPolicy::Repeat | |
} | |
_ => RetryPolicy::ForwardError(e), | |
} | |
}; | |
let future = FutureRetry::new(client, handle_error); | |
core.run(future).unwrap(); | |
} | |
fn main() { | |
let (r, conn) = init_db().unwrap(); | |
let url = "wss://ws-feed.gdax.com"; | |
// let url = "wss://ws-feed-public.sandbox.gdax.com"; | |
// let url = "ws://localhost:5999/ws-feed-public.sandbox.gdax.com"; | |
init_ws(&r, &conn, url); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment