Last active
August 24, 2018 04:38
-
-
Save ian-p-cooke/145f14fdafc5100a1fd4f2ea29098435 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[derive(Message)] | |
struct Batch { | |
key: String, | |
messages: Vec<String> | |
} | |
struct RedisListTailer { | |
redis: Addr<Redis>, | |
key: String, | |
current_index: isize, | |
batch_size: isize, | |
sink: Recipient<Batch>, | |
} | |
impl RedisListTailer { | |
pub fn from_index(redis: Addr<Redis>, key: String, start_index: isize, batch_size: isize, sink: Recipient<Batch>) -> RedisListTailer { | |
assert!(start_index >= 0); | |
assert!(batch_size > 0); | |
RedisListTailer { redis, key, current_index: start_index, batch_size, sink } | |
} | |
pub fn from_beginning(redis: Addr<Redis>, key: String, batch_size: isize, sink: Recipient<Batch>) -> RedisListTailer { | |
RedisListTailer::from_index(redis, key, 0, batch_size, sink) | |
} | |
} | |
#[derive(Message)] | |
struct RunOnce; | |
impl Actor for RedisListTailer { | |
type Context = Context<Self>; | |
fn started(&mut self, ctx: &mut Self::Context) { | |
ctx.notify_later(RunOnce, Duration::from_secs(0)); | |
} | |
} | |
impl Handler<RunOnce> for RedisListTailer { | |
type Result = (); | |
fn handle(&mut self, _msg: RunOnce, ctx: &mut Context<Self>) -> Self::Result { | |
let req = self.redis.send(Lrange { key: self.key.clone(), start_index: self.current_index, stop_index: self.current_index + self.batch_size - 1 }); | |
ctx.spawn(req.into_actor(self).then(move |r, act, ctx| { | |
match r { | |
Ok(res) => { | |
match res { | |
Ok(messages) => { | |
let batch_len = messages.len(); | |
if batch_len == 0 { | |
ctx.notify_later(RunOnce, Duration::from_secs(1)); | |
} else { | |
match act.sink.do_send(Batch { key: act.key.clone(), messages }) { | |
Ok(_) => { | |
act.current_index += batch_len as isize; | |
ctx.notify(RunOnce); | |
} | |
Err(e) => { | |
println!("sink error when sending batch: {}", e); | |
ctx.stop(); | |
} | |
} | |
} | |
}, | |
Err(e) => { | |
println!("redis error during lrange: {}", e); | |
ctx.stop(); | |
} | |
} | |
} | |
Err(e) => { | |
println!("mailbox error during lrange: {}", e); | |
ctx.stop(); | |
} | |
} | |
actix::fut::ok(()) | |
})); | |
() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
the most recent revision shows what things look like after using into_actor. much nicer.