Skip to content

Instantly share code, notes, and snippets.

@mexus
Created July 30, 2018 20:42
Show Gist options
  • Save mexus/ca5257550be5e0a5b9a6772cd81a9410 to your computer and use it in GitHub Desktop.
Save mexus/ca5257550be5e0a5b9a6772cd81a9410 to your computer and use it in GitHub Desktop.
extern crate futures;
extern crate reql;
extern crate reql_types;
extern crate tokio;
extern crate tokio_core;
#[macro_use]
extern crate serde_json;
extern crate and_then2;
extern crate futures_retry;
extern crate tokio_timer;
extern crate tokio_tungstenite;
extern crate url;
use and_then2::FutureExt;
use futures::{Future, IntoFuture, Sink, Stream};
use futures_retry::{FutureRetry, RetryPolicy};
use reql::{Client, Config, Connection, Run};
use reql_types::ServerStatus;
use serde_json::Value;
use std::time::{Duration, Instant};
use tokio_core::reactor::Core;
use tokio_timer::Interval;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use url::Url;
const DB: &'static str = "gdax";
const TBL: &'static str = "BTC_USD";
fn transform_numbers(json: &mut Value) {
if json["type"] == "l2update" {
json["changes"]
.as_array_mut()
.unwrap()
.iter_mut()
.for_each(|v| {
(1..=2).for_each(|i| v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap()));
});
} else if json["type"] == "snapshot" {
json["asks"]
.as_array_mut()
.unwrap()
.iter_mut()
.for_each(|v| {
(0..=1).for_each(|i| {
v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap());
});
});
} else if json["type"] == "ticker" {
[
"best_ask",
"best_bid",
"high_24h",
"last_size",
"low_24h",
"open_24h",
"price",
"volume_24h",
"volume_30d",
].into_iter()
.for_each(|i| {
json.get_mut(i).map(|x| {
*x = json!(x.as_str().unwrap().parse::<f64>().unwrap());
});
});
}
}
fn init_db() -> reql::Result<(Client, Connection)> {
let r = Client::new();
let conn = r.connect(Config::default())?;
// create db
r.db_create(DB)
.run::<ServerStatus>(conn)?
.map(|_| println!("db created"))
.map_err(|err| println!("{:?}", err))
.wait()
.next();
// create table
r.db(DB)
.table_create(TBL)
.run::<ServerStatus>(conn)?
.map(|_| println!("table created"))
.map_err(|err| println!("{:?}", err))
.wait()
.next();
Ok((r, conn))
}
fn for_each2<S, F, R>(s: S, mut f: F) -> impl Future<Item = Result<(), R::Error>, Error = S::Error>
where
S: Stream,
F: FnMut(S::Item) -> R,
R: IntoFuture,
{
enum TempError<A, B> {
TopLevel(A),
BottomLevel(B),
}
s.map_err(TempError::TopLevel)
.for_each(move |s_item: S::Item| {
f(s_item)
.into_future()
.map(|_| ())
.map_err(TempError::BottomLevel)
})
.then(|res| match res {
Ok(()) => Ok(Ok(())),
Err(TempError::TopLevel(upper)) => Err(upper),
Err(TempError::BottomLevel(lower)) => Ok(Err(lower)),
})
}
fn init_ws(r: &Client, &conn: &Connection, url: &str) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let subscribe = json!({
"type": "subscribe",
"product_ids": [ "BTC-USD" ],
"channels": [ "ticker", "level2" ]
});
let client = || {
connect_async(Url::parse(url).unwrap(), handle.remote().clone()).and_then2(
|(ws_stream, _)| {
println!("Connected to {}", url);
let (sink, stream) = ws_stream.split();
let stream = stream.map_err(|e| println!("{}", e));
sink.send(Message::Text(subscribe.to_string()))
.and_then2(|_| {
println!("subscribe sent: {}", subscribe);
let timer =
Interval::new(Instant::now(), Duration::from_secs(1)).for_each(|_| {
println!("ttt");
Ok(())
});
let q_insert = r.db(DB).table(TBL.to_owned());
let flow = for_each2(stream, move |msg| {
let mut json: Value =
serde_json::from_str(msg.to_text().unwrap()).unwrap();
transform_numbers(&mut json);
let r = q_insert.insert(json).run::<ServerStatus>(conn).unwrap();
r.into_future()
});
flow.select2(timer)
})
},
)
};
let handle_error = |_e| {
RetryPolicy::Repeat::<()>
// match _e {
// Err(_) => RetryPolicy::Repeat,
// _ => RetryPolicy::ForwardError(e),
// }
};
let future = FutureRetry::new(client, handle_error);
core.run(future).unwrap();
}
fn main() {
let (r, conn) = init_db().unwrap();
let url = "wss://ws-feed.gdax.com";
// let url = "wss://ws-feed-public.sandbox.gdax.com";
// let url = "ws://localhost:5999/ws-feed-public.sandbox.gdax.com";
init_ws(&r, &conn, url);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment