Skip to content

Instantly share code, notes, and snippets.

@rrichardson
Last active August 29, 2015 14:13
Show Gist options
  • Save rrichardson/a48ffd8b242207ca6918 to your computer and use it in GitHub Desktop.
Save rrichardson/a48ffd8b242207ca6918 to your computer and use it in GitHub Desktop.
/// Receiving end of a stream: items are pushed into it one at a time via
/// `on_next`, with optional lifecycle callbacks around them.
///
/// Only `on_next` is required; the remaining callbacks have default
/// implementations that just emit a log line.
pub trait Subscriber {
    /// The type of item this subscriber accepts.
    type Input;

    /// Handles one item and reports the outcome as a `bool`.
    ///
    /// The implementation for `NetStreamer` in this file returns `true` when
    /// the item was handed off successfully and `false` otherwise.
    fn on_next(&mut self, t: Self::Input) -> bool;

    /// Subscription callback. The default implementation ignores its `usize`
    /// argument and only logs at debug level.
    // The parameter is spelled `_: usize` rather than a bare `usize`:
    // anonymous trait-method parameters are deprecated and rejected by the
    // 2018 edition onwards.
    fn on_subscribe(&mut self, _: usize) {
        debug!("on_subscribe called");
    }

    /// Error callback. The default implementation logs `err` at error level.
    fn on_error(&mut self, err: &str) {
        error!("on_error called: {:?}", err);
    }

    /// Completion callback. The default implementation ignores `force` and
    /// only logs at debug level.
    fn on_complete(&mut self, _force: bool) {
        debug!("on_complete called");
    }
}
pub trait Publisher<'a> {
type Output;
fn subscribe(&mut self, Box<Subscriber<Input=Self::Output> + 'a>);
fn next(&mut self) -> bool {
panic!("Unimplemented fn, presumably run() or next() is being attempted on a processor, not a publisher");
}
fn run(&mut self) {
loop {
if ! self.next() {
break
}
}
}
}
/// A stage that is both a `Subscriber` and a `Publisher<'a>`.
///
/// The trait declares no methods of its own; it only names the combination
/// of its two supertraits.
pub trait Processor<'a>: Subscriber + Publisher<'a> {}

/// Blanket implementation: every type that implements both `Subscriber` and
/// `Publisher<'a>` is a `Processor<'a>` without any further code.
impl<'a, P: Subscriber + Publisher<'a>> Processor<'a> for P {}
/// Pair of `usize` values passed through the stream types in this file.
///
/// `NetStreamer::on_next` binds the first field as `buf` and overwrites the
/// second with the stream's token before forwarding the value.
// Declared `pub` because the type appears in public API: it is the payload of
// the public `NetStream::drx` field and the `Subscriber::Input` of the public
// `NetStreamer`. A private type there is a private-in-public violation.
pub struct StreamBuf(pub usize, pub usize);
/// A Traversal style representation of a socket
///
/// Bundles a sender, a shared receiver of `StreamBuf` values and the token
/// associated with the stream. Cloning clones the sender and the token and
/// shares the receiver through its `Arc`.
#[derive(Clone)]
pub struct NetStream<'a> {
    /// Outbound channel; `NetStreamer::on_next` sends `StreamBuf`s through it.
    pub dtx: Sender,
    /// Receiver of `StreamBuf` values, wrapped in an `Arc` by `new`.
    pub drx: Arc<Receiver<StreamBuf>>,
    /// Token stored with the stream; `NetStreamer::on_next` stamps it into
    /// every `StreamBuf` it forwards.
    pub tok: Token,
    /// Zero-sized marker that ties `'a` to the struct. No other field uses
    /// the lifetime, and rustc rejects an unused lifetime parameter (E0392).
    _lifetime: ::std::marker::PhantomData<&'a ()>,
}

impl<'a> NetStream<'a> {
    /// Builds a `NetStream` from its parts.
    ///
    /// * `tok` - token stored in the `tok` field.
    /// * `drx` - receiver; it is moved into a fresh `Arc`.
    /// * `dtx` - sender; it is moved in as-is (it is already owned, so no
    ///   clone is needed).
    pub fn new(tok: Token, drx: Receiver<StreamBuf>, dtx: Sender) -> NetStream<'a> {
        NetStream {
            tok: tok,
            drx: Arc::new(drx),
            dtx: dtx,
            _lifetime: ::std::marker::PhantomData,
        }
    }
}
/// Couples a `NetStream` with an optional boxed subscriber of `StreamBuf`
/// items.
pub struct NetStreamer<'a> {
    /// The underlying stream; `on_next` uses its `dtx` sender and `tok` token.
    stream: NetStream<'a>,
    /// Boxed subscriber for `StreamBuf` items, or `None` when there is none.
    /// (Presumably installed through `Publisher::subscribe`; that impl is not
    /// visible here.)
    //
    // Alternative spelling of the same field that names the item type through
    // the `Publisher` associated type instead of `StreamBuf` directly:
    //subscriber: Option<Box<Subscriber<Input=<NetStreamer<'a> as Publisher<'a>>::Output> + 'a >>
    subscriber: Option<Box<Subscriber<Input = StreamBuf> + 'a>>,
}
/// `Subscriber` side of `NetStreamer`: every incoming `StreamBuf` is
/// re-stamped with the stream's token and pushed onto the stream's sender.
impl<'a> Subscriber for NetStreamer<'a> {
    type Input = StreamBuf;

    /// Forwards `incoming` through `self.stream.dtx`.
    ///
    /// The first field of `incoming` is kept; the second is discarded and
    /// replaced with `self.stream.tok`. Returns `true` when the send
    /// succeeds and `false` when it fails.
    fn on_next(&mut self, incoming: StreamBuf) -> bool {
        // TODO better handle queue failure, maybe put the returned buf
        // into a recovery queue
        let StreamBuf(buf, _) = incoming;
        // NOTE(review): `StreamBuf`'s second field is declared `usize` while
        // `tok` is a `Token` -- confirm the two types actually agree.
        let outgoing = StreamBuf(buf, self.stream.tok);
        self.stream.dtx.send(outgoing).is_ok()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment