Skip to content

Instantly share code, notes, and snippets.

@mexus
Created July 28, 2018 19:39
Show Gist options
  • Save mexus/8769408dc4dbefb8a40330f9a561b39f to your computer and use it in GitHub Desktop.
Save mexus/8769408dc4dbefb8a40330f9a561b39f to your computer and use it in GitHub Desktop.
// Builds a future that connects to the WebSocket at `url`, sends the
// `subscribe` message, and then stores every received JSON message in the
// `DB`/`TBL` table while a one-second ticker prints a heartbeat.
//
// The returned future has `Item = ()` and `Error = ()`; every error is logged
// to stderr before being discarded. Reconnecting/retrying is not handled here.
let client = || {
    // NOTE(review): `Url::parse(..).unwrap()` panics on a malformed `url` —
    // confirm `url` is a trusted, known-good value.
    connect_async(Url::parse(url).unwrap(), handle.remote().clone())
        .map_err(|e| eprintln!("connect_async: {}", e))
        .and_then(|(ws_stream, _)| {
            println!("Connected to {}", url);
            let (sink, stream) = ws_stream.split();
            sink.send(Message::Text(subscribe.to_string()))
                .map_err(|e| eprintln!("sink send: {}", e))
                .and_then(|_| {
                    println!("subscribe sent: {}", subscribe);

                    // Heartbeat: prints once per second for as long as the future is polled.
                    let timer = Interval::new(Instant::now(), Duration::from_secs(1))
                        .for_each(|_| {
                            println!("ttt");
                            Ok(())
                        })
                        .map_err(|e| eprintln!("timer: {}", e));

                    let q_insert = r.db(DB).table(TBL.to_owned());
                    let flow = stream
                        .for_each(move |msg| {
                            // A frame that is not UTF-8 text or not valid JSON (for example a
                            // control frame with an empty payload) is logged and skipped
                            // instead of panicking and tearing down the whole consumer.
                            let parsed = msg
                                .to_text()
                                .map_err(|e| e.to_string())
                                .and_then(|text| {
                                    serde_json::from_str::<Value>(text).map_err(|e| e.to_string())
                                });
                            match parsed {
                                Ok(mut json) => {
                                    transform_numbers(&mut json);
                                    // NOTE(review): `.wait()` blocks the current thread until the
                                    // first response item arrives; consider spawning the query
                                    // on the event loop instead.
                                    match q_insert.insert(json).run::<ServerStatus>(conn) {
                                        Ok(response) => {
                                            if let Some(Err(e)) = response.wait().next() {
                                                eprintln!("db insert: {:?}", e);
                                            }
                                        }
                                        Err(e) => eprintln!("db run: {:?}", e),
                                    }
                                }
                                Err(e) => eprintln!("skipping message: {}", e),
                            }
                            Ok(())
                        })
                        .map_err(|e| eprintln!("ws stream: {}", e));

                    // Finish as soon as either the message flow or the ticker finishes;
                    // both have already reported their own errors above.
                    flow.select(timer).map(|_| ()).map_err(|_| ())
                })
        })
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment