Created
February 23, 2014 22:15
-
-
Save manuels/9178144 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate sync; | |
extern crate collections; | |
use std::comm::Chan; | |
use std::mem::size_of_val; | |
use std::io::{ChanWriter,PortReader}; | |
use collections::hashmap::HashMap; | |
use std::task::try; | |
use sync::RWArc; | |
use std::rand::random; | |
use std::io::timer::sleep; | |
struct b { | |
i: u8 | |
} | |
impl b { | |
fn new() -> b { b{i: 1} } | |
} | |
struct MultiplexStream { | |
// arc: ~RWArc<HashMap<u32, ~Chan<~[u8]>>>, | |
arc: ~RWArc<HashMap<u32, b>>, | |
downstream_chan: Chan<~[u8]> | |
} | |
impl MultiplexStream { | |
fn new(downstream: (Port<~[u8]>, Chan<~[u8]>)) -> ~MultiplexStream { | |
let (downstream_port, downstream_chan) = downstream; | |
let mux = ~MultiplexStream { | |
arc: ~RWArc::new(HashMap::new()), | |
downstream_chan: downstream_chan | |
}; | |
// begin | |
let arc = mux.arc.clone(); | |
spawn(proc() { | |
try(proc() { | |
loop { | |
let mut reader = PortReader::new(downstream_port); | |
let port_num = reader.read_le_u32().unwrap(); | |
let data = reader.read_to_end().unwrap(); | |
arc.read(|open_ports| { | |
match open_ports.find(&port_num) { | |
Some(intermediate) => { | |
/* let success = intermediate.try_send(data.clone()); | |
if !success { | |
arc.write(|open_ports| { | |
open_ports.remove(&port_num); | |
}) | |
}*/ | |
}, | |
None => {} | |
}; | |
}); | |
} | |
return (); | |
}); | |
// downstream was probably closed => cleanup | |
arc.write(|open_ports| { | |
//let iter = open_ports.move_iter(); | |
//TODO: takeall | |
/* | |
for c in iter { | |
// TODO: does this really do what I expect it to do? | |
// Is this really neccessary? As soon as we 'take' the | |
// channel and we reach the next iteration, the channel | |
// should be closed. | |
c.drop() | |
} | |
*/ | |
}) | |
}); | |
// end | |
return mux; | |
} | |
/* | |
fn is_port_open(self, port_num: u32) -> bool { | |
let arc = self.arc.clone(); | |
do arc.read |open_ports| { | |
let res = open_ports.contains_key(portr_num); | |
return res; | |
} | |
} | |
*/ | |
fn open(self, port_num: u32) -> Result<(Port<~b>, Chan<~[u8]>), ()> { | |
let arc = self.arc.clone(); | |
// let (upstream_port, intermediate_chan): (Port<~[u8]>, Chan<~[u8]>) = Chan::new(); | |
let (upstream_port, intermediate_chan): (Port<~b>, Chan<~b>) = Chan::new(); | |
let (intermediate_port, upstream_chan): (Port<~[u8]>, Chan<~[u8]>) = Chan::new(); | |
let upstream = (upstream_port, upstream_chan); | |
let port_is_already_open = arc.write(|open_ports| { | |
//let res = open_ports.find_or_insert(port_num, intermediate_chan); | |
//let port_is_already_open = (res == intermediate_chan); | |
if open_ports.contains_key(&port_num) { | |
return true; | |
} | |
else { | |
open_ports.insert(port_num, b::new()); | |
return false; | |
} | |
}); | |
if port_is_already_open { | |
return Err(()); | |
}; | |
spawn(proc() { | |
//do try { | |
loop { | |
let data = intermediate_port.recv(); | |
let mut writer = ChanWriter::new(self.downstream_chan); | |
writer.write_le_u32(port_num); | |
writer.write(data); | |
writer.flush(); | |
} | |
//} | |
// upstream was probably closed => cleanup | |
arc.write(|open_ports| { | |
open_ports.remove(&port_num); | |
}) | |
}); | |
return Ok(upstream); | |
} | |
} | |
#[test] | |
fn test_multichannel() { | |
let (base_port1, base_chan1): (Port<~[u8]>, Chan<~[u8]>) = Chan::new(); | |
let (base_port2, base_chan2): (Port<~[u8]>, Chan<~[u8]>) = Chan::new(); | |
let mux1 = MultiplexStream::new((base_port1, base_chan2)); | |
let mux2 = MultiplexStream::new((base_port2, base_chan1)); | |
// a MultiplexStream is UDP-like: It is not guaranteed that the remote | |
// host really receives the packet | |
// ...so we must ensure that both sides began listening before sending | |
// any data! | |
let (port1, chan1) = mux1.open(1).unwrap(); | |
let (port2, chan2) = mux2.open(1).unwrap(); | |
spawn(proc() { | |
let msg = ~[1 as u8]; | |
chan1.send(msg); | |
let buf1 = port1.recv(); | |
//assert!(buf1[0] == 2) | |
}); | |
spawn(proc() { | |
let msg = ~[2 as u8]; | |
chan2.send(msg); | |
let buf2 = port2.recv(); | |
//assert!(buf2[0] == 1) | |
}); | |
} | |
/* | |
#[test] | |
fn test_multichannel_quick_check() { | |
let base_stream1: (Port<~[u8]>, Chan<~[u8]>) = Chan::new(); | |
let base_stream2: (Port<~[u8]>, Chan<~[u8]>) = Chan::new(); | |
do spawn { | |
let (port1, _) = base_stream1; | |
let (_, chan2) = base_stream2; | |
let data = port1.recv(); | |
chan2.send(data); | |
} | |
do spawn { | |
let (_, chan1) = base_stream1; | |
let (port2, _) = base_stream2; | |
let data = port2.recv(); | |
chan1.send(data); | |
} | |
let mux1 = MultiplexStream::new(base_stream1); | |
let mux2 = MultiplexStream::new(base_stream2); | |
// a MultiplexStream is UDP-like: It is not guaranteed that the remote | |
// host really receives the packet | |
let mux1 = MultiplexStream::new(base_stream1); | |
let mux2 = MultiplexStream::new(base_stream2); | |
for mux in (mux1, mux2) { | |
do spawn { | |
let open_ports = []; | |
loop { | |
let action: u8 = random(); | |
match action { | |
0 if !open_ports.is_empty() => { | |
// send random data over a random channel | |
// the port is at least locally open, maybe not on remote | |
let len = (random::<u32>() % 4096)+1; | |
let data = do range(len).iter().map { random::<u8>() }; | |
let port = open_ports.ind_sample(); | |
port.send(data); | |
} | |
1 => { | |
// try to open a random, non-open port | |
for retry in range(100) { | |
let num = random::<u32>(); | |
if !open_ports.any(|port| port.num == num) { | |
open_ports.append_one(mux.open(num)); | |
break; | |
} | |
} | |
} | |
2 if !open_ports.is_empty() => { | |
// close a random, open port | |
let port = open_ports.ind_sample(); | |
open_ports.remove(&port); | |
port.close(); | |
} | |
3 => { | |
// try to receive data from a random, open port | |
let port = open_ports.ind_sample(); | |
let data = port.recv(); | |
} | |
} | |
sleep(random::<u8>() % 100); // in millisec | |
} | |
} | |
} | |
} | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment