Last active
October 7, 2015 11:18
-
-
Save txus/db972a583cf37fc9a21a to your computer and use it in GitHub Desktop.
Pipe together standard Rust [sync] channels and cool comm::mpmc channels!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate comm; | |
use std::sync::mpsc::{SyncSender, Receiver, SendError, RecvError}; | |
use comm::{Error as CommError}; | |
use comm::mpmc::bounded::Channel; | |
use std::thread; | |
use std::marker::Sized; | |
pub enum Xor<A, B> { | |
Left(A), | |
Right(B) | |
} | |
pub type StdChannelError<T> = Xor<RecvError, SendError<T>>; | |
pub enum PipeError<T> { | |
StdRecvError(RecvError), | |
StdSendError(SendError<T>), | |
Comm(CommError) | |
} | |
pub trait BlockingSender<T>: Sized | |
where T: Send { | |
fn blocking_send(&self, msg: T) -> Result<(), PipeError<T>>; | |
} | |
pub trait BlockingReceiver<T>: Sized | |
where T: Send { | |
fn blocking_receive(&self) -> Result<T, PipeError<T>>; | |
} | |
impl<'a, T> BlockingSender<T> for Channel<'a, T> | |
where T: Send { | |
fn blocking_send(&self, msg: T) -> Result<(), PipeError<T>> { | |
self.send_sync(msg).map_err(|(_, err)| PipeError::Comm(err)) | |
} | |
} | |
impl<'a, T> BlockingReceiver<T> for Channel<'a, T> | |
where T: Send { | |
fn blocking_receive(&self) -> Result<T, PipeError<T>> { | |
self.recv_sync().map_err(PipeError::Comm) | |
} | |
} | |
impl<T: Send> BlockingSender<T> for SyncSender<T> { | |
fn blocking_send(&self, msg: T) -> Result<(), PipeError<T>> { | |
self.send(msg).map_err(PipeError::StdSendError) | |
} | |
} | |
impl<T: Send> BlockingReceiver<T> for Receiver<T> { | |
fn blocking_receive(&self) -> Result<T, PipeError<T>> { | |
self.recv().map_err(PipeError::StdRecvError) | |
} | |
} | |
pub fn pipe<A, B, T, U, F>(from: A, to: B, f: F) -> thread::JoinHandle<()> | |
where T: 'static + Send, | |
U: 'static + Send, | |
F: Fn(T) -> U + 'static + Send, | |
A: BlockingReceiver<T> + 'static + Send, | |
B: BlockingSender<U> + 'static + Send | |
{ | |
thread::spawn(move || { | |
loop { | |
from.blocking_receive() | |
.map(&f).map_err(Xor::Left) | |
.and_then(|msg| to.blocking_send(msg).map_err(Xor::Right)); | |
} | |
}) | |
} | |
#[cfg(test)] | |
mod test { | |
use super::*; | |
use std::sync::mpsc::{SyncSender, Receiver}; | |
use std::sync::mpsc::sync_channel; | |
use comm::mpmc::bounded::Channel; | |
use std::thread; | |
#[test] | |
fn it_pipes_together_two_std_channels() { | |
let (tx1, rx1): (SyncSender<i32>, Receiver<i32>) = sync_channel(1); | |
let (tx2, rx2): (SyncSender<String>, Receiver<String>) = sync_channel(1); | |
pipe(rx1, tx2, |n: i32| n.to_string()); | |
thread::spawn(move || { | |
let _ = tx1.send(42).unwrap(); | |
}); | |
assert_eq!("42", rx2.recv().unwrap()); | |
} | |
#[test] | |
fn it_pipes_together_two_comm_channels() { | |
let chan1: Channel<i32> = Channel::new(1); | |
let chan2: Channel<String> = Channel::new(1); | |
let tx = chan1.clone(); | |
let rx = chan2.clone(); | |
pipe(chan1, chan2, |n: i32| n.to_string()); | |
thread::spawn(move || { | |
let _ = tx.send_sync(42).unwrap(); | |
}); | |
assert_eq!("42", rx.recv_sync().unwrap()); | |
} | |
#[test] | |
fn it_pipes_together_an_std_and_a_comm_channel() { | |
let (tx1, rx1): (SyncSender<i32>, Receiver<i32>) = sync_channel(1); | |
let chan2: Channel<String> = Channel::new(1); | |
let rx = chan2.clone(); | |
pipe(rx1, chan2, |n: i32| n.to_string()); | |
thread::spawn(move || { | |
let _ = tx1.send(42).unwrap(); | |
}); | |
assert_eq!("42", rx.recv_sync().unwrap()); | |
} | |
#[test] | |
fn it_pipes_together_a_comm_and_an_std_channel() { | |
let chan1: Channel<i32> = Channel::new(1); | |
let (tx2, rx2): (SyncSender<String>, Receiver<String>) = sync_channel(1); | |
let tx = chan1.clone(); | |
pipe(chan1, tx2, |n: i32| n.to_string()); | |
thread::spawn(move || { | |
let _ = tx.send_sync(42).unwrap(); | |
}); | |
assert_eq!("42", rx2.recv().unwrap()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment