Created
July 30, 2018 20:42
-
-
Save mexus/ca5257550be5e0a5b9a6772cd81a9410 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
extern crate reql; | |
extern crate reql_types; | |
extern crate tokio; | |
extern crate tokio_core; | |
#[macro_use] | |
extern crate serde_json; | |
extern crate and_then2; | |
extern crate futures_retry; | |
extern crate tokio_timer; | |
extern crate tokio_tungstenite; | |
extern crate url; | |
use and_then2::FutureExt; | |
use futures::{Future, IntoFuture, Sink, Stream}; | |
use futures_retry::{FutureRetry, RetryPolicy}; | |
use reql::{Client, Config, Connection, Run}; | |
use reql_types::ServerStatus; | |
use serde_json::Value; | |
use std::time::{Duration, Instant}; | |
use tokio_core::reactor::Core; | |
use tokio_timer::Interval; | |
use tokio_tungstenite::connect_async; | |
use tokio_tungstenite::tungstenite::Message; | |
use url::Url; | |
const DB: &'static str = "gdax"; | |
const TBL: &'static str = "BTC_USD"; | |
fn transform_numbers(json: &mut Value) { | |
if json["type"] == "l2update" { | |
json["changes"] | |
.as_array_mut() | |
.unwrap() | |
.iter_mut() | |
.for_each(|v| { | |
(1..=2).for_each(|i| v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap())); | |
}); | |
} else if json["type"] == "snapshot" { | |
json["asks"] | |
.as_array_mut() | |
.unwrap() | |
.iter_mut() | |
.for_each(|v| { | |
(0..=1).for_each(|i| { | |
v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap()); | |
}); | |
}); | |
} else if json["type"] == "ticker" { | |
[ | |
"best_ask", | |
"best_bid", | |
"high_24h", | |
"last_size", | |
"low_24h", | |
"open_24h", | |
"price", | |
"volume_24h", | |
"volume_30d", | |
].into_iter() | |
.for_each(|i| { | |
json.get_mut(i).map(|x| { | |
*x = json!(x.as_str().unwrap().parse::<f64>().unwrap()); | |
}); | |
}); | |
} | |
} | |
fn init_db() -> reql::Result<(Client, Connection)> { | |
let r = Client::new(); | |
let conn = r.connect(Config::default())?; | |
// create db | |
r.db_create(DB) | |
.run::<ServerStatus>(conn)? | |
.map(|_| println!("db created")) | |
.map_err(|err| println!("{:?}", err)) | |
.wait() | |
.next(); | |
// create table | |
r.db(DB) | |
.table_create(TBL) | |
.run::<ServerStatus>(conn)? | |
.map(|_| println!("table created")) | |
.map_err(|err| println!("{:?}", err)) | |
.wait() | |
.next(); | |
Ok((r, conn)) | |
} | |
fn for_each2<S, F, R>(s: S, mut f: F) -> impl Future<Item = Result<(), R::Error>, Error = S::Error> | |
where | |
S: Stream, | |
F: FnMut(S::Item) -> R, | |
R: IntoFuture, | |
{ | |
enum TempError<A, B> { | |
TopLevel(A), | |
BottomLevel(B), | |
} | |
s.map_err(TempError::TopLevel) | |
.for_each(move |s_item: S::Item| { | |
f(s_item) | |
.into_future() | |
.map(|_| ()) | |
.map_err(TempError::BottomLevel) | |
}) | |
.then(|res| match res { | |
Ok(()) => Ok(Ok(())), | |
Err(TempError::TopLevel(upper)) => Err(upper), | |
Err(TempError::BottomLevel(lower)) => Ok(Err(lower)), | |
}) | |
} | |
fn init_ws(r: &Client, &conn: &Connection, url: &str) { | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let subscribe = json!({ | |
"type": "subscribe", | |
"product_ids": [ "BTC-USD" ], | |
"channels": [ "ticker", "level2" ] | |
}); | |
let client = || { | |
connect_async(Url::parse(url).unwrap(), handle.remote().clone()).and_then2( | |
|(ws_stream, _)| { | |
println!("Connected to {}", url); | |
let (sink, stream) = ws_stream.split(); | |
let stream = stream.map_err(|e| println!("{}", e)); | |
sink.send(Message::Text(subscribe.to_string())) | |
.and_then2(|_| { | |
println!("subscribe sent: {}", subscribe); | |
let timer = | |
Interval::new(Instant::now(), Duration::from_secs(1)).for_each(|_| { | |
println!("ttt"); | |
Ok(()) | |
}); | |
let q_insert = r.db(DB).table(TBL.to_owned()); | |
let flow = for_each2(stream, move |msg| { | |
let mut json: Value = | |
serde_json::from_str(msg.to_text().unwrap()).unwrap(); | |
transform_numbers(&mut json); | |
let r = q_insert.insert(json).run::<ServerStatus>(conn).unwrap(); | |
r.into_future() | |
}); | |
flow.select2(timer) | |
}) | |
}, | |
) | |
}; | |
let handle_error = |_e| { | |
RetryPolicy::Repeat::<()> | |
// match _e { | |
// Err(_) => RetryPolicy::Repeat, | |
// _ => RetryPolicy::ForwardError(e), | |
// } | |
}; | |
let future = FutureRetry::new(client, handle_error); | |
core.run(future).unwrap(); | |
} | |
fn main() { | |
let (r, conn) = init_db().unwrap(); | |
let url = "wss://ws-feed.gdax.com"; | |
// let url = "wss://ws-feed-public.sandbox.gdax.com"; | |
// let url = "ws://localhost:5999/ws-feed-public.sandbox.gdax.com"; | |
init_ws(&r, &conn, url); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment