Skip to content

Instantly share code, notes, and snippets.

@rrichardson
Last active August 29, 2015 14:11
Show Gist options
  • Save rrichardson/49deb9220c604b15bc22 to your computer and use it in GitHub Desktop.
Save rrichardson/49deb9220c604b15bc22 to your computer and use it in GitHub Desktop.
fn run<F> (endpoints: Vec<String>,
batch_size: uint,
query: String,
init_strings: Option<Vec<String>>,
f: F) -> Sender<Control<T>>
where F : Send + Fn(&Prepared, &T) -> Statement
{
let (tx1, rx1) : (Sender<Control<T>>, Receiver<Control<T>>) = channel();
spawn(|:| {
let mut cw =
match Cluster::create(endpoints.connect(",")).connect() {
Ok(sess) => { let (tx1, rx1) = channel();
let mut future = sess.prepare(query);
future.wait();
if future.error_code().is_error() {
return panic!(format!("Error preparing Cassandra query: {}", future.error_code()))
} else {
CassWriter { session: sess,
batch_size: batch_size,
data_rx: rx1,
prepared: future.get_prepared()}
}
},
Err(e) => { panic!(format!("Error creating Cassandra session: {}", e)) }
};
loop {
let mut count = 0;
let batch = &mut Batch::new(CASS_BATCH_TYPE_LOGGED);
loop {
let mut flush = false;
match cw.data_rx.recv() {
Control::Data(params) => {batch.add_statement(f(&cw.prepared, &params));},
Control::Flush => flush = true,
Control::Exit => return,
_ => error!("Invalid Cassandra command supplied")
}
if flush || count >= cw.batch_size {
let future = cw.session.execute_batch(batch);
future.wait();
if future.error_code().is_error() {
//self.data_tx.send(Control::Error(future.error_code().error_string())); this is subscriber-only for now
error!("Error receiver from cassandra after batch insert: {}", future.error_message());
} else {
//self.data_tx.send(Control::Result(future.get_result().row_count()));
}
break;
}
}
}
});
tx1.clone();
}
home/rick/Projects/reactive-rs/src/cass_writer.rs:41:15: 83:10 error: cannot infer an appropriate lifetime due to conflicting requirements
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| {
/home/rick/Projects/reactive-rs/src/cass_writer.rs:42 let mut cw =
/home/rick/Projects/reactive-rs/src/cass_writer.rs:43 match Cluster::create(endpoints.connect(",")).connect() {
/home/rick/Projects/reactive-rs/src/cass_writer.rs:44 Ok(sess) => { let (tx1, rx1) = channel();
/home/rick/Projects/reactive-rs/src/cass_writer.rs:45 let mut future = sess.prepare(query);
/home/rick/Projects/reactive-rs/src/cass_writer.rs:46 future.wait();
...
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:9: 41:14 note: first, the lifetime cannot outlive the expression at 41:8...
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| {
^~~~~
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:9: 41:14 note: ...so that the declared lifetime parameter bounds are satisfied
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| {
^~~~~
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:9: 83:11 note: but, the lifetime must be valid for the call at 41:8...
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| {
/home/rick/Projects/reactive-rs/src/cass_writer.rs:42 let mut cw =
/home/rick/Projects/reactive-rs/src/cass_writer.rs:43 match Cluster::create(endpoints.connect(",")).connect() {
/home/rick/Projects/reactive-rs/src/cass_writer.rs:44 Ok(sess) => { let (tx1, rx1) = channel();
/home/rick/Projects/reactive-rs/src/cass_writer.rs:45 let mut future = sess.prepare(query);
/home/rick/Projects/reactive-rs/src/cass_writer.rs:46 future.wait();
...
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41:15: 83:10 note: ...so that argument is valid for the call
/home/rick/Projects/reactive-rs/src/cass_writer.rs:41 spawn(|:| {
/home/rick/Projects/reactive-rs/src/cass_writer.rs:42 let mut cw =
/home/rick/Projects/reactive-rs/src/cass_writer.rs:43 match Cluster::create(endpoints.connect(",")).connect() {
/home/rick/Projects/reactive-rs/src/cass_writer.rs:44 Ok(sess) => { let (tx1, rx1) = channel();
/home/rick/Projects/reactive-rs/src/cass_writer.rs:45 let mut future = sess.prepare(query);
/home/rick/Projects/reactive-rs/src/cass_writer.rs:46 future.wait();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment