Last active
August 29, 2015 14:11
-
-
Save rrichardson/49deb9220c604b15bc22 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fn run<F> (endpoints: Vec<String>, | |
batch_size: uint, | |
query: String, | |
init_strings: Option<Vec<String>>, | |
f: F) -> Sender<Control<T>> | |
where F : Send + Fn(&Prepared, &T) -> Statement | |
{ | |
let (tx1, rx1) : (Sender<Control<T>>, Receiver<Control<T>>) = channel(); | |
spawn(|:| { | |
let mut cw = | |
match Cluster::create(endpoints.connect(",")).connect() { | |
Ok(sess) => { let (tx1, rx1) = channel(); | |
let mut future = sess.prepare(query); | |
future.wait(); | |
if future.error_code().is_error() { | |
return panic!(format!("Error preparing Cassandra query: {}", future.error_code())) | |
} else { | |
CassWriter { session: sess, | |
batch_size: batch_size, | |
data_rx: rx1, | |
prepared: future.get_prepared()} | |
} | |
}, | |
Err(e) => { panic!(format!("Error creating Cassandra session: {}", e)) } | |
}; | |
loop { | |
let mut count = 0; | |
let batch = &mut Batch::new(CASS_BATCH_TYPE_LOGGED); | |
loop { | |
let mut flush = false; | |
match cw.data_rx.recv() { | |
Control::Data(params) => {batch.add_statement(f(&cw.prepared, ¶ms));}, | |
Control::Flush => flush = true, | |
Control::Exit => return, | |
_ => error!("Invalid Cassandra command supplied") | |
} | |
if flush || count >= cw.batch_size { | |
let future = cw.session.execute_batch(batch); | |
future.wait(); | |
if future.error_code().is_error() { | |
//self.data_tx.send(Control::Error(future.error_code().error_string())); this is subscriber-only for now | |
error!("Error receiver from cassandra after batch insert: {}", future.error_message()); | |
} else { | |
//self.data_tx.send(Control::Result(future.get_result().row_count())); | |
} | |
break; | |
} | |
} | |
} | |
}); | |
tx1.clone(); | |
} | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:15: 83:10 error: cannot infer an appropriate lifetime due to conflicting requirements | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| { | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:42 let mut cw = | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:43 match Cluster::create(endpoints.connect(",")).connect() { | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:44 Ok(sess) => { let (tx1, rx1) = channel(); | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:45 let mut future = sess.prepare(query); | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:46 future.wait(); | |
... | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:9: 41:14 note: first, the lifetime cannot outlive the expression at 41:8... | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| { | |
^~~~~ | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:9: 41:14 note: ...so that the declared lifetime parameter bounds are satisfied | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| { | |
^~~~~ | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:9: 83:11 note: but, the lifetime must be valid for the call at 41:8... | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| { | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:42 let mut cw = | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:43 match Cluster::create(endpoints.connect(",")).connect() { | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:44 Ok(sess) => { let (tx1, rx1) = channel(); | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:45 let mut future = sess.prepare(query); | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:46 future.wait(); | |
... | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:15: 83:10 note: ...so that argument is valid for the call | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| { | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:42 let mut cw = | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:43 match Cluster::create(endpoints.connect(",")).connect() { | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:44 Ok(sess) => { let (tx1, rx1) = channel(); | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:45 let mut future = sess.prepare(query); | |
/home/rick/Projects/reactive-rs/src/cass_writer.rs:46 future.wait(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment