Skip to content

Instantly share code, notes, and snippets.

@mexus
Created August 1, 2018 20:21
Show Gist options
  • Save mexus/c5fdaa639ac6e9d7082e4a0e331b4d3b to your computer and use it in GitHub Desktop.
Save mexus/c5fdaa639ac6e9d7082e4a0e331b4d3b to your computer and use it in GitHub Desktop.
extern crate futures;
extern crate reql;
extern crate reql_types;
extern crate tokio;
extern crate tokio_core;
#[macro_use]
extern crate serde_json;
extern crate futures_retry;
extern crate tokio_timer;
extern crate tokio_tungstenite;
extern crate url;
#[macro_use]
extern crate failure;
use futures::{Future, Sink, Stream};
use futures_retry::{FutureRetry, RetryPolicy};
use reql::{Client, Config, Connection, Run};
use reql_types::ServerStatus;
use serde_json::Value;
use std::cell::RefCell;
use std::rc::Rc;
use std::time::{Duration, Instant};
use tokio_core::reactor::Core;
use tokio_timer::Interval;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use url::Url;
const DB: &'static str = "gdax";
const TBL: &'static str = "BTC_USD";
#[derive(Debug, Fail)]
enum FHError {
#[fail(display = "connect")]
Connect(#[cause] tokio_tungstenite::tungstenite::Error),
#[fail(display = "send")]
Send(#[cause] tokio_tungstenite::tungstenite::Error),
#[fail(display = "read")]
Read(#[cause] tokio_tungstenite::tungstenite::Error),
#[fail(display = "db")]
Timer(#[cause] tokio_timer::Error),
#[fail(display = "select")]
DB(#[cause] reql::errors::Error),
}
fn transform_numbers(json: &mut Value) {
if json["type"] == "l2update" {
json["changes"]
.as_array_mut()
.unwrap()
.iter_mut()
.for_each(|v| {
(1..=2).for_each(|i| v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap()));
});
} else if json["type"] == "snapshot" {
json["asks"]
.as_array_mut()
.unwrap()
.iter_mut()
.for_each(|v| {
(0..=1).for_each(|i| {
v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap());
});
});
} else if json["type"] == "ticker" {
[
"best_ask",
"best_bid",
"high_24h",
"last_size",
"low_24h",
"open_24h",
"price",
"volume_24h",
"volume_30d",
]
.into_iter()
.for_each(|i| {
json.get_mut(i).map(|x| {
*x = json!(x.as_str().unwrap().parse::<f64>().unwrap());
});
});
}
}
fn init_db() -> reql::Result<(Client, Connection)> {
let r = Client::new();
let conn = r.connect(Config::default())?;
// create db
r.db_create(DB)
.run::<ServerStatus>(conn)?
.map(|_| println!("db created"))
.map_err(|err| println!("{:?}", err))
.wait()
.next();
// create table
r.db(DB)
.table_create(TBL)
.run::<ServerStatus>(conn)?
.map(|_| println!("table created"))
.map_err(|err| println!("{:?}", err))
.wait()
.next();
Ok((r, conn))
}
fn init_ws(r: &Client, &conn: &Connection, url: &str) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let subscribe = json!({
"type": "subscribe",
"product_ids": [ "BTC-USD" ],
"channels": [ "ticker", "level2" ]
});
let client = || {
connect_async(Url::parse(url).unwrap(), handle.remote().clone())
.map_err(FHError::Connect)
.and_then(|(ws_stream, _)| {
println!("Connected to {}", url);
let (sink, stream) = ws_stream.split();
sink.send(Message::Text(subscribe.to_string()))
.map_err(FHError::Send)
.and_then(|sink| {
println!("subscribe sent: {}", subscribe);
let q_insert = r.db(DB).table(TBL);
let counter = Rc::new(RefCell::new(0));
let counter_timer = counter.clone();
let timer_counter = Interval::new(Instant::now(), Duration::from_secs(1))
.map_err(FHError::Timer)
.for_each(move |_| {
let mut _counter = counter_timer.borrow_mut();
println!("count: {}", _counter);
*_counter = 0;
Ok(())
});
let timer_ping = Interval::new(Instant::now(), Duration::from_secs(2))
.and_then(|_| Ok(Message::Ping(vec![1])))
.map_err(FHError::Timer);
let timer_ping = sink.sink_map_err(FHError::Send).send_all(timer_ping).map(|_| ());
let flow = stream.map_err(FHError::Read).for_each(move |msg| {
let mut json: Value =
serde_json::from_str(msg.to_text().unwrap()).unwrap();
transform_numbers(&mut json);
let r = q_insert.insert(json).run::<ServerStatus>(conn).unwrap();
*counter.borrow_mut() += 1;
r.into_future().map(|_| ()).map_err(|(e, _)| FHError::DB(e))
});
let f = timer_counter.select(flow).map(|_| ()).map_err(|(a, _)| a);
f.select(timer_ping).map(|_| ()).map_err(|(a, _)| a)
})
})
};
let handle_error = |e: FHError| {
println!("Error: {:?}", e);
match e {
FHError::Read(tokio_tungstenite::tungstenite::Error::ConnectionClosed(..)) => {
RetryPolicy::Repeat
}
FHError::Read(tokio_tungstenite::tungstenite::Error::Protocol(ref err))
if err == "Connection reset without closing handshake" =>
{
RetryPolicy::Repeat
}
_ => RetryPolicy::ForwardError(e),
}
};
let future = FutureRetry::new(client, handle_error);
core.run(future).unwrap();
}
fn main() {
let (r, conn) = init_db().unwrap();
let url = "wss://ws-feed.gdax.com";
// let url = "wss://ws-feed-public.sandbox.gdax.com";
// let url = "ws://localhost:5999/ws-feed-public.sandbox.gdax.com";
init_ws(&r, &conn, url);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment