Advertisement
Guest User

Untitled

a guest
Jan 22nd, 2020
146
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 10.02 KB | None | 0 0
  1. enum PacketReceiverError {
  2.     ClientDisconnect,
  3. }
  4.  
  5. struct PacketReceiver {
  6.     socket: Arc<async_std::net::TcpStream>,
  7.     cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
  8.     recv_buffer: Vec<u8>,
  9.     incoming_data: Vec<u8>,
  10. }
  11.  
  12. impl PacketReceiver {
  13.     fn new(socket: Arc<async_std::net::TcpStream>, cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>) -> PacketReceiver {
  14.         PacketReceiver {
  15.             socket: socket,
  16.             cipher: cipher,
  17.             recv_buffer: Vec::new(),
  18.             incoming_data: Vec::new(),
  19.         }
  20.     }
  21.  
  22.     async fn fill_recv_buffer(&mut self) -> Result<(), PacketReceiverError>{
  23.         info!("fill buf start");
  24.         let mut data = [0u8; 0x8000];
  25.  
  26.         info!("a");
  27.         let mut socket = &*self.socket;
  28.         let len = socket.read(&mut data).await.unwrap();
  29.         info!("len {:?}", len);
  30.         if len == 0 {
  31.             return Err(PacketReceiverError::ClientDisconnect);
  32.         }
  33.  
  34.         self.recv_buffer.extend_from_slice(&mut data[..len]);
  35.  
  36.         info!("c");
  37.         let mut dec_buf = {
  38.             let mut cipher = self.cipher.lock().await;
  39.             info!("c2");
  40.             let block_chunk_len = self.recv_buffer.len() / cipher.block_size() * cipher.block_size();
  41.             info!("d");
  42.             let buf = self.recv_buffer.drain(..block_chunk_len).collect();
  43.             info!("e");
  44.             cipher.decrypt(&buf).unwrap()
  45.         };
  46.         self.incoming_data.append(&mut dec_buf);
  47.         info!("f");
  48.         info!("fill buf end");
  49.  
  50.         Ok(())
  51.     }
  52.  
  53.     async fn recv_pkts<R: RecvServerPacket + Send + std::fmt::Debug>(&mut self) -> Result<Vec<R>, PacketReceiverError> {
  54.         self.fill_recv_buffer().await?;
  55.  
  56.         let mut result = Vec::new();
  57.         loop {
  58.             if self.incoming_data.len() < 2 {
  59.                 break;
  60.             }
  61.             let pkt_size = u16::from_le_bytes([self.incoming_data[0], self.incoming_data[1]]) as usize;
  62.             let mut pkt_len = pkt_size;
  63.             while pkt_len % self.cipher.lock().await.block_size() != 0 {
  64.                 pkt_len += 1;
  65.             }
  66.  
  67.             if pkt_len > self.incoming_data.len() {
  68.                 break;
  69.             }
  70.  
  71.             let pkt_data = self.incoming_data.drain(..pkt_len).collect::<Vec<_>>();
  72.  
  73.             log::trace!("[recv buf] {:?}", pkt_data);
  74.             let pkt = match R::from_bytes(&pkt_data[..pkt_size]) {
  75.                 Ok(p) => p,
  76.                 Err(err) => {
  77.                     warn!("error RecvServerPacket::from_bytes: {:?}", err);
  78.                     continue
  79.                 },
  80.             };
  81.  
  82.             log::trace!("[recv from ] {:?}", pkt);
  83.             result.push(pkt);
  84.         }
  85.  
  86.         Ok(result)
  87.     }
  88.  
  89.  
  90. }
  91.  
  92. async fn send_pkt<S: SendServerPacket + Send + std::fmt::Debug>(socket: Arc<async_std::net::TcpStream>, cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>, pkt: S) {
  93.     log::trace!("[send] {:?}", pkt);
  94.     let buf = pkt.as_bytes();
  95.     if buf.len() < 1024*2 {
  96.         log::trace!("[send: buf] {:?}", buf);
  97.     }
  98.     else {
  99.         log::trace!("[send: buf] [...large buffer...]");
  100.     }
  101.     let cbuf = cipher.lock().await.encrypt(&buf).unwrap();
  102.     let mut ssock = &*socket;
  103.     ssock.write_all(&cbuf).await;
  104.  
  105. }
  106.  
  107.  
  108. enum AsyncClientAction<S, R> {
  109.     NewClient(ClientId, async_std::sync::Sender<S>),
  110.     Packet(ClientId, R),
  111.     Disconnect(ClientId),
  112. }
  113.  
  114.  
  115. enum AsyncServerStateAction<S> {
  116.     Cipher(Box<dyn PSOCipher + Send + Sync>, Box<dyn PSOCipher + Send + Sync>),
  117.     Packet(S),
  118.     Disconnect,
  119. }
  120.  
  121.  
  122.  
  123. async fn server_state_loop<STATE, S, R, E>(mut state: STATE,
  124.                                            server_state_receiver: async_std::sync::Receiver<AsyncClientAction<AsyncServerStateAction<S>, R>>) where
  125.     STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
  126.    S: SendServerPacket + std::fmt::Debug + Send + 'static,
  127.     R: RecvServerPacket + std::fmt::Debug + Send + 'static,
  128.    E: std::fmt::Debug + Send,
  129. {
  130.    async_std::task::spawn(async move {
  131.        let mut clients = HashMap::new();
  132.  
  133.        loop {
  134.            let action = server_state_receiver.recv().await.unwrap();
  135.  
  136.            match action {
  137.                AsyncClientAction::NewClient(client_id, sender) => {
  138.                    clients.insert(client_id, sender.clone());
  139.                    for action in state.on_connect(client_id) {
  140.                        match action {
  141.                            OnConnect::Cipher((inc, outc)) => {
  142.                                sender.send(AsyncServerStateAction::Cipher(inc, outc)).await;
  143.                            },
  144.                            OnConnect::Packet(pkt) => {
  145.                                sender.send(AsyncServerStateAction::Packet(pkt)).await;
  146.                            }
  147.                        }
  148.                    }
  149.                },
  150.                AsyncClientAction::Packet(client_id, pkt) => {
  151.                    let k = state.handle(client_id, &pkt);
  152.                    let pkts = k.unwrap().collect::<Vec<_>>();
  153.                    for (client_id, pkt) in pkts {
  154.                        let client = clients.get_mut(&client_id).unwrap();
  155.                        client.send(AsyncServerStateAction::Packet(pkt)).await;
  156.                    }
  157.                },
  158.                AsyncClientAction::Disconnect(client_id) => {
  159.                    let pkts = state.on_disconnect(client_id);
  160.                    for (client_id, pkt) in pkts {
  161.                        let client = clients.get_mut(&client_id).unwrap();
  162.                        client.send(AsyncServerStateAction::Packet(pkt)).await;
  163.                    }
  164.  
  165.                    let client = clients.remove(&client_id).unwrap();
  166.                    client.send(AsyncServerStateAction::Disconnect).await;
  167.                }
  168.            }
  169.        }
  170.    });
  171. }
  172.  
  173. async fn client_recv_loop<S, R>(client_id: ClientId,
  174.                                socket: Arc<async_std::net::TcpStream>,
  175.                                cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
  176.                                server_sender: async_std::sync::Sender<AsyncClientAction<AsyncServerStateAction<S>, R>>,
  177.                                client_sender: async_std::sync::Sender<AsyncServerStateAction<S>>) where
  178.    S: SendServerPacket + std::fmt::Debug + Send + 'static,
  179.     R: RecvServerPacket + std::fmt::Debug + Send + 'static,
  180. {
  181.    async_std::task::spawn(async move {
  182.        server_sender.send(AsyncClientAction::NewClient(client_id, client_sender)).await;
  183.        let mut pkt_receiver = PacketReceiver::new(socket, cipher);
  184.  
  185.        loop {
  186.            match pkt_receiver.recv_pkts().await {
  187.                Ok(pkts) => {
  188.                    for pkt in pkts {
  189.                        trace!("[recv from {:?}] {:?}", client_id, pkt);
  190.                        server_sender.send(AsyncClientAction::Packet(client_id, pkt)).await;
  191.                    }
  192.                },
  193.                Err(err) => {
  194.                    match err {
  195.                        PacketReceiverError::ClientDisconnect => {
  196.                            server_sender.send(AsyncClientAction::Disconnect(client_id));
  197.                            break;
  198.                        }
  199.                    }
  200.                }
  201.            }
  202.        }
  203.    });
  204. }
  205.  
  206. async fn client_send_loop<S>(client_id: ClientId,
  207.                                 socket: Arc<async_std::net::TcpStream>,
  208.                                 cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
  209.                                 cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
  210.                                 client_receiver: async_std::sync::Receiver<AsyncServerStateAction<S>>,
  211.  
  212. ) where
  213.    S: SendServerPacket + std::fmt::Debug + Send + 'static,
  214. {
  215.     async_std::task::spawn(async move {
  216.         loop {
  217.             let action = client_receiver.recv().await.unwrap();
  218.             match action {
  219.                 AsyncServerStateAction::Cipher(inc, outc) => {
  220.                     *cipher_in.lock().await = inc;
  221.                     *cipher_out.lock().await = outc;
  222.                 }
  223.                 AsyncServerStateAction::Packet(pkt) => {
  224.                     trace!("[send to {:?}] {:?}", client_id, pkt);
  225.                     send_pkt(socket.clone(), cipher_out.clone(), pkt).await
  226.                 },
  227.                 AsyncServerStateAction::Disconnect => {
  228.                     break;
  229.                 }
  230.             };
  231.         }
  232.     });
  233. }
  234.  
  235.  
  236.  
  237. pub async fn mainloop_async<STATE, S, R, E>(mut state: STATE, port: u16) where
  238.     STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + Clone + 'static,
  239.    S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static,
  240.     R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static,
  241.    E: std::fmt::Debug + Send,
  242. {
  243.  
  244.    let listener = async_std::task::spawn(async move {
  245.        let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
  246.        let mut id = 1;
  247.  
  248.        let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
  249.  
  250.        server_state_loop(state, server_state_receiver).await;
  251.  
  252.        loop {
  253.            let (sock, addr) = listener.accept().await.unwrap();
  254.            let client_id = crate::common::serverstate::ClientId(id);
  255.            id += 1;
  256.  
  257.            info!("new client {:?} {:?} {:?}", client_id, sock, addr);
  258.  
  259.            let (client_sender, client_receiver) = async_std::sync::channel(64);
  260.            let socket = Arc::new(sock);
  261.            let cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
  262.            let cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
  263.  
  264.            client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender).await;
  265.            client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver).await;
  266.        }
  267.    });
  268.  
  269.    listener.await
  270. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement