Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// Reasons a client's packet-receive loop has to stop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PacketReceiverError {
    /// The connection to the client is gone (e.g. a socket read returned zero bytes).
    ClientDisconnect,
}

impl std::fmt::Display for PacketReceiverError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PacketReceiverError::ClientDisconnect => f.write_str("client disconnected"),
        }
    }
}

impl std::error::Error for PacketReceiverError {}
- struct PacketReceiver {
- socket: Arc<async_std::net::TcpStream>,
- cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
- recv_buffer: Vec<u8>,
- incoming_data: Vec<u8>,
- }
- impl PacketReceiver {
- fn new(socket: Arc<async_std::net::TcpStream>, cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>) -> PacketReceiver {
- PacketReceiver {
- socket: socket,
- cipher: cipher,
- recv_buffer: Vec::new(),
- incoming_data: Vec::new(),
- }
- }
- async fn fill_recv_buffer(&mut self) -> Result<(), PacketReceiverError>{
- info!("fill buf start");
- let mut data = [0u8; 0x8000];
- info!("a");
- let mut socket = &*self.socket;
- let len = socket.read(&mut data).await.unwrap();
- info!("len {:?}", len);
- if len == 0 {
- return Err(PacketReceiverError::ClientDisconnect);
- }
- self.recv_buffer.extend_from_slice(&mut data[..len]);
- info!("c");
- let mut dec_buf = {
- let mut cipher = self.cipher.lock().await;
- info!("c2");
- let block_chunk_len = self.recv_buffer.len() / cipher.block_size() * cipher.block_size();
- info!("d");
- let buf = self.recv_buffer.drain(..block_chunk_len).collect();
- info!("e");
- cipher.decrypt(&buf).unwrap()
- };
- self.incoming_data.append(&mut dec_buf);
- info!("f");
- info!("fill buf end");
- Ok(())
- }
- async fn recv_pkts<R: RecvServerPacket + Send + std::fmt::Debug>(&mut self) -> Result<Vec<R>, PacketReceiverError> {
- self.fill_recv_buffer().await?;
- let mut result = Vec::new();
- loop {
- if self.incoming_data.len() < 2 {
- break;
- }
- let pkt_size = u16::from_le_bytes([self.incoming_data[0], self.incoming_data[1]]) as usize;
- let mut pkt_len = pkt_size;
- while pkt_len % self.cipher.lock().await.block_size() != 0 {
- pkt_len += 1;
- }
- if pkt_len > self.incoming_data.len() {
- break;
- }
- let pkt_data = self.incoming_data.drain(..pkt_len).collect::<Vec<_>>();
- log::trace!("[recv buf] {:?}", pkt_data);
- let pkt = match R::from_bytes(&pkt_data[..pkt_size]) {
- Ok(p) => p,
- Err(err) => {
- warn!("error RecvServerPacket::from_bytes: {:?}", err);
- continue
- },
- };
- log::trace!("[recv from ] {:?}", pkt);
- result.push(pkt);
- }
- Ok(result)
- }
- }
- async fn send_pkt<S: SendServerPacket + Send + std::fmt::Debug>(socket: Arc<async_std::net::TcpStream>, cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>, pkt: S) {
- log::trace!("[send] {:?}", pkt);
- let buf = pkt.as_bytes();
- if buf.len() < 1024*2 {
- log::trace!("[send: buf] {:?}", buf);
- }
- else {
- log::trace!("[send: buf] [...large buffer...]");
- }
- let cbuf = cipher.lock().await.encrypt(&buf).unwrap();
- let mut ssock = &*socket;
- ssock.write_all(&cbuf).await;
- }
- enum AsyncClientAction<S, R> {
- NewClient(ClientId, async_std::sync::Sender<S>),
- Packet(ClientId, R),
- Disconnect(ClientId),
- }
- enum AsyncServerStateAction<S> {
- Cipher(Box<dyn PSOCipher + Send + Sync>, Box<dyn PSOCipher + Send + Sync>),
- Packet(S),
- Disconnect,
- }
- async fn server_state_loop<STATE, S, R, E>(mut state: STATE,
- server_state_receiver: async_std::sync::Receiver<AsyncClientAction<AsyncServerStateAction<S>, R>>) where
- STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
- S: SendServerPacket + std::fmt::Debug + Send + 'static,
- R: RecvServerPacket + std::fmt::Debug + Send + 'static,
- E: std::fmt::Debug + Send,
- {
- async_std::task::spawn(async move {
- let mut clients = HashMap::new();
- loop {
- let action = server_state_receiver.recv().await.unwrap();
- match action {
- AsyncClientAction::NewClient(client_id, sender) => {
- clients.insert(client_id, sender.clone());
- for action in state.on_connect(client_id) {
- match action {
- OnConnect::Cipher((inc, outc)) => {
- sender.send(AsyncServerStateAction::Cipher(inc, outc)).await;
- },
- OnConnect::Packet(pkt) => {
- sender.send(AsyncServerStateAction::Packet(pkt)).await;
- }
- }
- }
- },
- AsyncClientAction::Packet(client_id, pkt) => {
- let k = state.handle(client_id, &pkt);
- let pkts = k.unwrap().collect::<Vec<_>>();
- for (client_id, pkt) in pkts {
- let client = clients.get_mut(&client_id).unwrap();
- client.send(AsyncServerStateAction::Packet(pkt)).await;
- }
- },
- AsyncClientAction::Disconnect(client_id) => {
- let pkts = state.on_disconnect(client_id);
- for (client_id, pkt) in pkts {
- let client = clients.get_mut(&client_id).unwrap();
- client.send(AsyncServerStateAction::Packet(pkt)).await;
- }
- let client = clients.remove(&client_id).unwrap();
- client.send(AsyncServerStateAction::Disconnect).await;
- }
- }
- }
- });
- }
- async fn client_recv_loop<S, R>(client_id: ClientId,
- socket: Arc<async_std::net::TcpStream>,
- cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
- server_sender: async_std::sync::Sender<AsyncClientAction<AsyncServerStateAction<S>, R>>,
- client_sender: async_std::sync::Sender<AsyncServerStateAction<S>>) where
- S: SendServerPacket + std::fmt::Debug + Send + 'static,
- R: RecvServerPacket + std::fmt::Debug + Send + 'static,
- {
- async_std::task::spawn(async move {
- server_sender.send(AsyncClientAction::NewClient(client_id, client_sender)).await;
- let mut pkt_receiver = PacketReceiver::new(socket, cipher);
- loop {
- match pkt_receiver.recv_pkts().await {
- Ok(pkts) => {
- for pkt in pkts {
- trace!("[recv from {:?}] {:?}", client_id, pkt);
- server_sender.send(AsyncClientAction::Packet(client_id, pkt)).await;
- }
- },
- Err(err) => {
- match err {
- PacketReceiverError::ClientDisconnect => {
- server_sender.send(AsyncClientAction::Disconnect(client_id));
- break;
- }
- }
- }
- }
- }
- });
- }
- async fn client_send_loop<S>(client_id: ClientId,
- socket: Arc<async_std::net::TcpStream>,
- cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
- cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
- client_receiver: async_std::sync::Receiver<AsyncServerStateAction<S>>,
- ) where
- S: SendServerPacket + std::fmt::Debug + Send + 'static,
- {
- async_std::task::spawn(async move {
- loop {
- let action = client_receiver.recv().await.unwrap();
- match action {
- AsyncServerStateAction::Cipher(inc, outc) => {
- *cipher_in.lock().await = inc;
- *cipher_out.lock().await = outc;
- }
- AsyncServerStateAction::Packet(pkt) => {
- trace!("[send to {:?}] {:?}", client_id, pkt);
- send_pkt(socket.clone(), cipher_out.clone(), pkt).await
- },
- AsyncServerStateAction::Disconnect => {
- break;
- }
- };
- }
- });
- }
- pub async fn mainloop_async<STATE, S, R, E>(mut state: STATE, port: u16) where
- STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + Clone + 'static,
- S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static,
- R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static,
- E: std::fmt::Debug + Send,
- {
- let listener = async_std::task::spawn(async move {
- let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
- let mut id = 1;
- let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
- server_state_loop(state, server_state_receiver).await;
- loop {
- let (sock, addr) = listener.accept().await.unwrap();
- let client_id = crate::common::serverstate::ClientId(id);
- id += 1;
- info!("new client {:?} {:?} {:?}", client_id, sock, addr);
- let (client_sender, client_receiver) = async_std::sync::channel(64);
- let socket = Arc::new(sock);
- let cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
- let cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
- client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender).await;
- client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver).await;
- }
- });
- listener.await
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement