Advertisement
Guest User

Untitled

a guest
Feb 23rd, 2014
60
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.93 KB | None | 0 0
  1. extern crate sync;
  2. extern crate collections;
  3.  
  4. use std::comm::Chan;
  5. use std::mem::size_of_val;
  6. use std::io::{ChanWriter,PortReader};
  7. use collections::hashmap::HashMap;
  8. use std::task::try;
  9. use sync::RWArc;
  10.  
  11. use std::rand::random;
  12. use std::io::timer::sleep;
  13.  
  14. struct b {
  15. i: u8
  16. }
  17.  
  18. impl b {
  19. fn new() -> b { b{i: 1} }
  20. }
  21.  
  22. struct MultiplexStream {
  23. // arc: ~RWArc<HashMap<u32, ~Chan<~[u8]>>>,
  24. arc: ~RWArc<HashMap<u32, b>>,
  25. downstream_chan: Chan<~[u8]>
  26. }
  27.  
  28. impl MultiplexStream {
  29. fn new(downstream: (Port<~[u8]>, Chan<~[u8]>)) -> ~MultiplexStream {
  30. let (downstream_port, downstream_chan) = downstream;
  31. let mux = ~MultiplexStream {
  32. arc: ~RWArc::new(HashMap::new()),
  33. downstream_chan: downstream_chan
  34. };
  35.  
  36. // begin
  37. let arc = mux.arc.clone();
  38. spawn(proc() {
  39. try(proc() {
  40. loop {
  41. let mut reader = PortReader::new(downstream_port);
  42. let port_num = reader.read_le_u32().unwrap();
  43. let data = reader.read_to_end().unwrap();
  44.  
  45. arc.read(|open_ports| {
  46. match open_ports.find(&port_num) {
  47. Some(intermediate) => {
  48. /* let success = intermediate.try_send(data.clone());
  49.  
  50. if !success {
  51. arc.write(|open_ports| {
  52. open_ports.remove(&port_num);
  53. })
  54. }*/
  55. },
  56. None => {}
  57. };
  58. });
  59. }
  60.  
  61. return ();
  62. });
  63.  
  64. // downstream was probably closed => cleanup
  65. arc.write(|open_ports| {
  66. //let iter = open_ports.move_iter();
  67. //TODO: takeall
  68.  
  69. /*
  70. for c in iter {
  71. // TODO: does this really do what I expect it to do?
  72. // Is this really neccessary? As soon as we 'take' the
  73. // channel and we reach the next iteration, the channel
  74. // should be closed.
  75. c.drop()
  76. }
  77. */
  78. })
  79. });
  80. // end
  81.  
  82. return mux;
  83. }
  84.  
  85.  
  86. /*
  87. fn is_port_open(self, port_num: u32) -> bool {
  88. let arc = self.arc.clone();
  89.  
  90.  
  91. do arc.read |open_ports| {
  92. let res = open_ports.contains_key(portr_num);
  93. return res;
  94. }
  95. }
  96. */
  97.  
  98. fn open(self, port_num: u32) -> Result<(Port<~b>, Chan<~[u8]>), ()> {
  99. let arc = self.arc.clone();
  100.  
  101. // let (upstream_port, intermediate_chan): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
  102. let (upstream_port, intermediate_chan): (Port<~b>, Chan<~b>) = Chan::new();
  103. let (intermediate_port, upstream_chan): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
  104. let upstream = (upstream_port, upstream_chan);
  105.  
  106. let port_is_already_open = arc.write(|open_ports| {
  107. //let res = open_ports.find_or_insert(port_num, intermediate_chan);
  108. //let port_is_already_open = (res == intermediate_chan);
  109.  
  110. if open_ports.contains_key(&port_num) {
  111. return true;
  112. }
  113. else {
  114. open_ports.insert(port_num, b::new());
  115. return false;
  116. }
  117. });
  118. if port_is_already_open {
  119. return Err(());
  120. };
  121.  
  122. spawn(proc() {
  123. //do try {
  124. loop {
  125. let data = intermediate_port.recv();
  126.  
  127. let mut writer = ChanWriter::new(self.downstream_chan);
  128. writer.write_le_u32(port_num);
  129. writer.write(data);
  130. writer.flush();
  131. }
  132. //}
  133.  
  134. // upstream was probably closed => cleanup
  135. arc.write(|open_ports| {
  136. open_ports.remove(&port_num);
  137. })
  138. });
  139.  
  140. return Ok(upstream);
  141. }
  142. }
  143.  
  144.  
  145. #[test]
  146. fn test_multichannel() {
  147. let (base_port1, base_chan1): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
  148. let (base_port2, base_chan2): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
  149.  
  150. let mux1 = MultiplexStream::new((base_port1, base_chan2));
  151. let mux2 = MultiplexStream::new((base_port2, base_chan1));
  152.  
  153. // a MultiplexStream is UDP-like: It is not guaranteed that the remote
  154. // host really receives the packet
  155.  
  156. // ...so we must ensure that both sides began listening before sending
  157. // any data!
  158. let (port1, chan1) = mux1.open(1).unwrap();
  159. let (port2, chan2) = mux2.open(1).unwrap();
  160.  
  161. spawn(proc() {
  162. let msg = ~[1 as u8];
  163. chan1.send(msg);
  164.  
  165. let buf1 = port1.recv();
  166. //assert!(buf1[0] == 2)
  167. });
  168.  
  169. spawn(proc() {
  170. let msg = ~[2 as u8];
  171. chan2.send(msg);
  172.  
  173. let buf2 = port2.recv();
  174. //assert!(buf2[0] == 1)
  175. });
  176. }
  177.  
  178. /*
  179.  
  180. #[test]
  181. fn test_multichannel_quick_check() {
  182. let base_stream1: (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
  183. let base_stream2: (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
  184.  
  185. do spawn {
  186. let (port1, _) = base_stream1;
  187. let (_, chan2) = base_stream2;
  188. let data = port1.recv();
  189. chan2.send(data);
  190. }
  191.  
  192. do spawn {
  193. let (_, chan1) = base_stream1;
  194. let (port2, _) = base_stream2;
  195. let data = port2.recv();
  196. chan1.send(data);
  197. }
  198.  
  199. let mux1 = MultiplexStream::new(base_stream1);
  200. let mux2 = MultiplexStream::new(base_stream2);
  201.  
  202. // a MultiplexStream is UDP-like: It is not guaranteed that the remote
  203. // host really receives the packet
  204.  
  205. let mux1 = MultiplexStream::new(base_stream1);
  206. let mux2 = MultiplexStream::new(base_stream2);
  207.  
  208. for mux in (mux1, mux2) {
  209. do spawn {
  210. let open_ports = [];
  211.  
  212. loop {
  213. let action: u8 = random();
  214. match action {
  215. 0 if !open_ports.is_empty() => {
  216. // send random data over a random channel
  217. // the port is at least locally open, maybe not on remote
  218. let len = (random::<u32>() % 4096)+1;
  219. let data = do range(len).iter().map { random::<u8>() };
  220.  
  221. let port = open_ports.ind_sample();
  222. port.send(data);
  223. }
  224. 1 => {
  225. // try to open a random, non-open port
  226. for retry in range(100) {
  227. let num = random::<u32>();
  228. if !open_ports.any(|port| port.num == num) {
  229. open_ports.append_one(mux.open(num));
  230. break;
  231. }
  232. }
  233. }
  234. 2 if !open_ports.is_empty() => {
  235. // close a random, open port
  236. let port = open_ports.ind_sample();
  237. open_ports.remove(&port);
  238. port.close();
  239. }
  240. 3 => {
  241. // try to receive data from a random, open port
  242. let port = open_ports.ind_sample();
  243. let data = port.recv();
  244. }
  245. }
  246. sleep(random::<u8>() % 100); // in millisec
  247. }
  248. }
  249. }
  250. }
  251.  
  252. */
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement