Skip to content

Instantly share code, notes, and snippets.

@mexus
Created July 28, 2018 20:18
Show Gist options
  • Save mexus/6841c1a46cbb6c1f9d9b9de1f29d72bf to your computer and use it in GitHub Desktop.
Save mexus/6841c1a46cbb6c1f9d9b9de1f29d72bf to your computer and use it in GitHub Desktop.
extern crate futures;
extern crate reql;
extern crate reql_types;
extern crate tokio;
extern crate tokio_core;
#[macro_use]
extern crate serde_json;
extern crate tokio_tungstenite;
extern crate url;
// extern crate futures_retry;
extern crate tokio_retry;
extern crate tokio_timer;
use futures::{Future, Sink, Stream};
use reql::{Client, Config, Connection, Run};
use reql_types::ServerStatus;
use serde_json::Value;
use tokio_core::reactor::Core;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
use url::Url;
// use futures_retry::{RetryPolicy, StreamRetryExt};
use std::time::{Duration, Instant};
use tokio_retry::strategy::FibonacciBackoff;
use tokio_retry::Retry;
use tokio_timer::Interval;
const DB: &'static str = "gdax";
const TBL: &'static str = "BTC_USD";
fn transform_numbers(json: &mut Value) {
if json["type"] == "l2update" {
json["changes"]
.as_array_mut()
.unwrap()
.iter_mut()
.for_each(|v| {
(1..=2).for_each(|i| v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap()));
});
} else if json["type"] == "snapshot" {
json["asks"]
.as_array_mut()
.unwrap()
.iter_mut()
.for_each(|v| {
(0..=1).for_each(|i| {
v[i] = json!(v[i].as_str().unwrap().parse::<f64>().unwrap());
});
});
} else if json["type"] == "ticker" {
[
"best_ask",
"best_bid",
"high_24h",
"last_size",
"low_24h",
"open_24h",
"price",
"volume_24h",
"volume_30d",
].into_iter()
.for_each(|i| {
json.get_mut(i).map(|x| {
*x = json!(x.as_str().unwrap().parse::<f64>().unwrap());
});
});
}
}
fn init_db() -> reql::Result<(Client, Connection)> {
let r = Client::new();
let conn = r.connect(Config::default())?;
// create db
r.db_create(DB)
.run::<ServerStatus>(conn)?
.map(|_| println!("db created"))
.map_err(|err| println!("{:?}", err))
.wait()
.next();
// create table
r.db(DB)
.table_create(TBL)
.run::<ServerStatus>(conn)?
.map(|_| println!("table created"))
.map_err(|err| println!("{:?}", err))
.wait()
.next();
Ok((r, conn))
}
fn get_first(stream: impl Stream) -> impl Future<Item = (), Error = ()> {
stream.into_future().map(|(_maybe_first_item, _rest_stream)| ()).map_err(|_| ())
}
fn init_ws(r: &Client, &conn: &Connection, url: &str) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let subscribe = json!({
"type": "subscribe",
"product_ids": [ "BTC-USD" ],
"channels": [ "ticker", "level2" ]
});
let client = || {
connect_async(Url::parse(url).unwrap(), handle.remote().clone())
// .retry(|_| RetryPolicy::Repeat)
.map_err(|e| eprintln!("connect_async: {}", e))
.and_then(|(ws_stream, _)| {
println!("Connected to {}", url);
let (sink, stream) = ws_stream.split();
let stream = stream.map_err(|e| eprintln!("ws_stream error: {}", e));
sink.send(Message::Text(subscribe.to_string()))
.map_err(|e| eprintln!("sink send: {}", e))
.and_then(|_| {
println!("subscribe sent: {}", subscribe);
let timer = Interval::new(Instant::now(), Duration::from_secs(1)).for_each(|_| {
println!("ttt");
Ok(())
});
let q_insert = r.db(DB).table(TBL.to_owned());
let flow = stream.for_each(move |msg| {
let mut json: Value = serde_json::from_str(msg.to_text().unwrap()).unwrap();
transform_numbers(&mut json);
get_first(q_insert.insert(json).run::<ServerStatus>(conn).unwrap())
});
let f = flow.select2(timer);
f.map(|_| ()).map_err(|_| ())
})
})
};
let retry_strategy = FibonacciBackoff::from_millis(1);
let future = Retry::spawn(retry_strategy, client).then(|_| Ok::<_, ()>(()));
core.run(future).unwrap();
}
fn main() {
let (r, conn) = init_db().unwrap();
let url = "wss://ws-feed.gdax.com";
// let url = "wss://ws-feed-public.sandbox.gdax.com";
// let url = "ws://localhost:5999/ws-feed-public.sandbox.gdax.com";
init_ws(&r, &conn, url);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment