Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate sync;
- extern crate collections;
- use std::comm::Chan;
- use std::mem::size_of_val;
- use std::io::{ChanWriter,PortReader};
- use collections::hashmap::HashMap;
- use std::task::try;
- use sync::RWArc;
- use std::rand::random;
- use std::io::timer::sleep;
// Single-byte record used as the value type of the `arc` map in
// `MultiplexStream`.
// NOTE(review): looks like a stand-in for the `~Chan<~[u8]>` value type that
// appears in the commented-out `arc` field declaration — confirm before
// building on it.
- struct b {
// The only field; `b::new` sets it to 1.
- i: u8
- }
// Constructor for the `b` record.
- impl b {
// Returns a `b` whose `i` field is 1.
- fn new() -> b { b{i: 1} }
- }
// State shared by the tasks that multiplex several numbered ports over one
// downstream channel of byte vectors.
- struct MultiplexStream {
// Earlier declaration of the port table, kept for reference: it mapped each
// port number to a channel of byte vectors.
- // arc: ~RWArc<HashMap<u32, ~Chan<~[u8]>>>,
// Port table behind a reader/writer lock, keyed by `u32` port number. `open`
// inserts an entry for every port number it accepts; `new` only reads it.
- arc: ~RWArc<HashMap<u32, b>>,
// Sending half of the pair passed to `new`; `open` wraps it in a `ChanWriter`
// to emit outgoing messages.
- downstream_chan: Chan<~[u8]>
- }
// Constructor (`new`) and port registration (`open`) for `MultiplexStream`.
- impl MultiplexStream {
// Builds a boxed `MultiplexStream` with an empty port table and spawns a task
// that reads incoming messages from the downstream `Port`.
//
// `downstream`: the receiving and sending halves of the underlying link. The
// `Chan` half is stored in the returned struct; the `Port` half is used only
// by the spawned task.
//
// Returns the new stream.
- fn new(downstream: (Port<~[u8]>, Chan<~[u8]>)) -> ~MultiplexStream {
- let (downstream_port, downstream_chan) = downstream;
- let mux = ~MultiplexStream {
- arc: ~RWArc::new(HashMap::new()),
- downstream_chan: downstream_chan
- };
- // begin
// Handle on the port table for the spawned task.
// NOTE(review): `arc` is referenced both inside the inner `proc` passed to
// `try` and again after `try` returns; if `proc` captures by value the second
// use needs its own clone — confirm.
- let arc = mux.arc.clone();
- spawn(proc() {
// The read loop runs under `try`; the code after the `try` call is the
// cleanup path described in the comment below.
- try(proc() {
- loop {
// NOTE(review): `downstream_port` is handed to `PortReader::new` by value on
// every iteration; this probably wants to be constructed once, before the
// loop — confirm.
- let mut reader = PortReader::new(downstream_port);
// A message is read as a little-endian u32 port number followed by everything
// up to the reader's end-of-stream. Either `unwrap` fails the task on a read
// error.
// NOTE(review): confirm `read_to_end` stops at a message boundary rather than
// only when the channel closes.
- let port_num = reader.read_le_u32().unwrap();
- let data = reader.read_to_end().unwrap();
// Look the port number up under the read lock. Both match arms are currently
// empty: the forwarding code is commented out, so `data` is dropped.
- arc.read(|open_ports| {
- match open_ports.find(&port_num) {
- Some(intermediate) => {
- /* let success = intermediate.try_send(data.clone());
- if !success {
- arc.write(|open_ports| {
- open_ports.remove(&port_num);
- })
- }*/
- },
- None => {}
- };
- });
- }
// NOTE(review): the `loop` above contains no `break`, so this `return` looks
// unreachable.
- return ();
- });
- // downstream was probably closed => cleanup
// The closure body below holds only comments, so the write lock is taken but
// the table is left unchanged.
- arc.write(|open_ports| {
- //let iter = open_ports.move_iter();
- //TODO: takeall
- /*
- for c in iter {
- // TODO: does this really do what I expect it to do?
- // Is this really necessary? As soon as we 'take' the
- // channel and we reach the next iteration, the channel
- // should be closed.
- c.drop()
- }
- */
- })
- });
- // end
- return mux;
- }
- /*
- fn is_port_open(self, port_num: u32) -> bool {
- let arc = self.arc.clone();
- do arc.read |open_ports| {
- let res = open_ports.contains_key(portr_num);
- return res;
- }
- }
- */
// Registers `port_num` in the port table and spawns a task that forwards
// data received from the caller to the downstream channel, writing the port
// number (little-endian u32) ahead of each payload.
//
// Takes `self` by value.
// `port_num`: the port number to register.
//
// Returns `Err(())` when `port_num` is already in the table; otherwise
// `Ok((upstream_port, upstream_chan))`, where `upstream_chan` feeds the
// forwarding task.
- fn open(self, port_num: u32) -> Result<(Port<~b>, Chan<~[u8]>), ()> {
- let arc = self.arc.clone();
- // let (upstream_port, intermediate_chan): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
// NOTE(review): `intermediate_chan` is not used anywhere in this function and
// the pair is typed over `~b` rather than bytes, so nothing visible here ever
// sends on `upstream_port` — looks unfinished.
- let (upstream_port, intermediate_chan): (Port<~b>, Chan<~b>) = Chan::new();
- let (intermediate_port, upstream_chan): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
- let upstream = (upstream_port, upstream_chan);
// Check-and-insert happens inside one write-lock closure, which yields true
// when the port number was already present.
- let port_is_already_open = arc.write(|open_ports| {
- //let res = open_ports.find_or_insert(port_num, intermediate_chan);
- //let port_is_already_open = (res == intermediate_chan);
- if open_ports.contains_key(&port_num) {
- return true;
- }
- else {
- open_ports.insert(port_num, b::new());
- return false;
- }
- });
- if port_is_already_open {
- return Err(());
- };
- spawn(proc() {
- //do try {
- loop {
- let data = intermediate_port.recv();
// NOTE(review): `self.downstream_chan` is passed to `ChanWriter::new` by
// value on every iteration; this probably wants to be constructed once,
// before the loop — confirm.
- let mut writer = ChanWriter::new(self.downstream_chan);
// The results of the three writer calls below are not inspected.
- writer.write_le_u32(port_num);
- writer.write(data);
- writer.flush();
- }
- //}
- // upstream was probably closed => cleanup
// NOTE(review): with the `try` wrapper commented out and no `break` in the
// loop above, this removal looks unreachable — confirm.
- arc.write(|open_ports| {
- open_ports.remove(&port_num);
- })
- });
- return Ok(upstream);
- }
- }
// Wires two `MultiplexStream`s back to back, opens port 1 on each, and has one
// task per side send a single byte and then wait for a reply.
// The assertions on the received values are commented out, so no received
// value is checked.
- #[test]
- fn test_multichannel() {
- let (base_port1, base_chan1): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
- let (base_port2, base_chan2): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
// Cross-wiring: each mux receives on its own pair's port and sends on the
// other pair's channel, so what mux1 sends arrives at mux2 and vice versa.
- let mux1 = MultiplexStream::new((base_port1, base_chan2));
- let mux2 = MultiplexStream::new((base_port2, base_chan1));
- // a MultiplexStream is UDP-like: It is not guaranteed that the remote
- // host really receives the packet
- // ...so we must ensure that both sides began listening before sending
- // any data!
- let (port1, chan1) = mux1.open(1).unwrap();
- let (port2, chan2) = mux2.open(1).unwrap();
// Side 1: sends the byte 1, then blocks on its receive port.
- spawn(proc() {
- let msg = ~[1 as u8];
- chan1.send(msg);
- let buf1 = port1.recv();
- //assert!(buf1[0] == 2)
- });
// Side 2: sends the byte 2, then blocks on its receive port.
// NOTE(review): nothing here waits for either spawned task before the test
// function returns — confirm whether that is intended.
- spawn(proc() {
- let msg = ~[2 as u8];
- chan2.send(msg);
- let buf2 = port2.recv();
- //assert!(buf2[0] == 1)
- });
- }
- /*
- #[test]
- fn test_multichannel_quick_check() {
- let base_stream1: (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
- let base_stream2: (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
- do spawn {
- let (port1, _) = base_stream1;
- let (_, chan2) = base_stream2;
- let data = port1.recv();
- chan2.send(data);
- }
- do spawn {
- let (_, chan1) = base_stream1;
- let (port2, _) = base_stream2;
- let data = port2.recv();
- chan1.send(data);
- }
- let mux1 = MultiplexStream::new(base_stream1);
- let mux2 = MultiplexStream::new(base_stream2);
- // a MultiplexStream is UDP-like: It is not guaranteed that the remote
- // host really receives the packet
- let mux1 = MultiplexStream::new(base_stream1);
- let mux2 = MultiplexStream::new(base_stream2);
- for mux in (mux1, mux2) {
- do spawn {
- let open_ports = [];
- loop {
- let action: u8 = random();
- match action {
- 0 if !open_ports.is_empty() => {
- // send random data over a random channel
- // the port is at least locally open, maybe not on remote
- let len = (random::<u32>() % 4096)+1;
- let data = do range(len).iter().map { random::<u8>() };
- let port = open_ports.ind_sample();
- port.send(data);
- }
- 1 => {
- // try to open a random, non-open port
- for retry in range(100) {
- let num = random::<u32>();
- if !open_ports.any(|port| port.num == num) {
- open_ports.append_one(mux.open(num));
- break;
- }
- }
- }
- 2 if !open_ports.is_empty() => {
- // close a random, open port
- let port = open_ports.ind_sample();
- open_ports.remove(&port);
- port.close();
- }
- 3 => {
- // try to receive data from a random, open port
- let port = open_ports.ind_sample();
- let data = port.recv();
- }
- }
- sleep(random::<u8>() % 100); // in millisec
- }
- }
- }
- }
- */
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement