Last active
August 29, 2015 14:13
-
-
Save rrichardson/a48ffd8b242207ca6918 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub trait Subscriber { | |
type Input; | |
fn on_next(&mut self, t: Self::Input) -> bool; | |
fn on_subscribe(&mut self, usize) { | |
debug!("on_subscribe called"); | |
} | |
fn on_error(&mut self, err: &str) { | |
error!("on_error called: {:?}", err); | |
} | |
fn on_complete(&mut self, force: bool) { | |
debug!("on_complete called"); | |
} | |
} | |
pub trait Publisher<'a> { | |
type Output; | |
fn subscribe(&mut self, Box<Subscriber<Input=Self::Output> + 'a>); | |
fn next(&mut self) -> bool { | |
panic!("Unimplemented fn, presumably run() or next() is being attempted on a processor, not a publisher"); | |
} | |
fn run(&mut self) { | |
loop { | |
if ! self.next() { | |
break | |
} | |
} | |
} | |
} | |
pub trait Processor<'a> : Subscriber + Publisher<'a> { } | |
impl<'a, P> Processor<'a> for P | |
where P : Subscriber + Publisher<'a> { } | |
/// Message passed over the stream channels: a pair of `usize` values.
///
/// `NetStreamer::on_next` keeps the first field (bound there as `buf`) and
/// replaces the second with the stream's `tok` before sending.
/// NOTE(review): presumably (buffer id, token) — confirm against the code
/// that produces and consumes these messages.
///
/// Declared `pub` because it appears in public interfaces in this file (a
/// `pub` field of `NetStream` and the `Input` associated type of a public
/// trait implementation); a private type is not allowed in that position.
pub struct StreamBuf(pub usize, pub usize);
/// A Traversal style representation of a socket | |
#[derive(Clone)] | |
pub struct NetStream<'a> { | |
pub dtx: Sender, | |
pub drx: Arc<Receiver<StreamBuf>>, | |
pub tok: Token, | |
} | |
impl<'a> NetStream<'a> | |
{ | |
pub fn new(tok: Token, | |
drx: Receiver<StreamBuf>, | |
dtx: Sender) -> NetStream<'a> { | |
NetStream { tok: tok, drx: Arc::new(drx), dtx: dtx.clone() } | |
} | |
} | |
pub struct NetStreamer<'a> | |
{ | |
stream: NetStream<'a>, | |
subscriber: Option<Box<Subscriber<Input=StreamBuf> + 'a >> | |
//subscriber: Option<Box<Subscriber<Input=<NetStreamer<'a> as Publisher<'a>>::Output> + 'a >> | |
} | |
impl<'a> Subscriber for NetStreamer<'a> | |
{ | |
type Input = StreamBuf; | |
fn on_next(&mut self, StreamBuf (buf, _) : StreamBuf) -> bool { | |
//TODO better handle queue failure, maybe put the returned buf | |
//isizeo a recovery queue | |
match self.stream.dtx.send(StreamBuf(buf, self.stream.tok)) { | |
Ok(()) => true, | |
Err(_) => false | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment