Guest User

Untitled

a guest
Sep 27th, 2025
52
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 8.74 KB | None | 0 0
  1. // io trait and tcp stream code
  2. use futures::FutureExt;
  3.  
  4. pub struct Executor;
  5.  
  6. impl<F> hyper::rt::Executor<F> for Executor
  7. where
  8.     F: Future + Send + 'static,
  9.    F::Output: Send + 'static,
  10. {
  11.     fn execute(&self, fut: F) {
  12.         tokio_uring::spawn(fut);
  13.     }
  14. }
  15.  
  16. pin_project_lite::pin_project! {
  17. pub struct IoUring<T> {
  18.         #[pin]
  19.         val: T
  20.     }
  21. }
  22.  
  23. impl<T> IoUring<T> {
  24.     pub fn new(val: T) -> Self {
  25.         Self { val }
  26.     }
  27.     pub fn as_ref(&self) -> &T {
  28.         &self.val
  29.     }
  30.     pub fn as_mut_ref(&mut self) -> &mut T {
  31.         &mut self.val
  32.     }
  33.     pub fn into_owned(self) -> T {
  34.         self.val
  35.     }
  36. }
  37.  
  38. impl<T> hyper::rt::Read for IoUring<T>
  39. where
  40.     T: tokio::io::AsyncRead,
  41. {
  42.     fn poll_read(
  43.         self: std::pin::Pin<&mut Self>,
  44.         cx: &mut std::task::Context<'_>,
  45.        mut buf: hyper::rt::ReadBufCursor<'_>,
  46.     ) -> std::task::Poll<Result<(), std::io::Error>> {
  47.         let n = unsafe {
  48.             let mut buf = tokio::io::ReadBuf::uninit(buf.as_mut());
  49.             match tokio::io::AsyncRead::poll_read(self.project().val, cx, &mut buf) {
  50.                 std::task::Poll::Ready(Ok(())) => buf.filled().len(),
  51.                 other => return other,
  52.             }
  53.         };
  54.         unsafe {
  55.             buf.advance(n);
  56.         }
  57.         std::task::Poll::Ready(Ok(()))
  58.     }
  59. }
  60.  
  61. impl<T> hyper::rt::Write for IoUring<T>
  62. where
  63.     T: tokio::io::AsyncWrite,
  64. {
  65.     fn poll_write(
  66.         self: std::pin::Pin<&mut Self>,
  67.         cx: &mut std::task::Context<'_>,
  68.        buf: &[u8],
  69.    ) -> std::task::Poll<Result<usize, std::io::Error>> {
  70.        tokio::io::AsyncWrite::poll_write(self.project().val, cx, buf)
  71.    }
  72.    fn poll_flush(
  73.        self: std::pin::Pin<&mut Self>,
  74.        cx: &mut std::task::Context<'_>,
  75.     ) -> std::task::Poll<Result<(), std::io::Error>> {
  76.         std::task::Poll::Ready(Ok(()))
  77.     }
  78.     fn poll_shutdown(
  79.         self: std::pin::Pin<&mut Self>,
  80.         cx: &mut std::task::Context<'_>,
  81.    ) -> std::task::Poll<Result<(), std::io::Error>> {
  82.        tokio::io::AsyncWrite::poll_shutdown(self.project().val, cx)
  83.    }
  84.    fn is_write_vectored(&self) -> bool {
  85.        tokio::io::AsyncWrite::is_write_vectored(&self.val)
  86.    }
  87.    fn poll_write_vectored(
  88.        self: std::pin::Pin<&mut Self>,
  89.        cx: &mut std::task::Context<'_>,
  90.         bufs: &[std::io::IoSlice<'_>],
  91.    ) -> std::task::Poll<Result<usize, std::io::Error>> {
  92.        tokio::io::AsyncWrite::poll_write_vectored(self.project().val, cx, bufs)
  93.    }
  94. }
  95.  
  96. impl<T> tokio::io::AsyncRead for IoUring<T>
  97. where
  98.    T: hyper::rt::Read,
  99. {
  100.    fn poll_read(
  101.        self: std::pin::Pin<&mut Self>,
  102.        cx: &mut std::task::Context<'_>,
  103.         buf: &mut tokio::io::ReadBuf<'_>,
  104.    ) -> std::task::Poll<std::io::Result<()>> {
  105.        let filled = buf.filled().len();
  106.        let sub_filled = unsafe {
  107.            let mut buf = hyper::rt::ReadBuf::uninit(buf.unfilled_mut());
  108.            match hyper::rt::Read::poll_read(self.project().val, cx, buf.unfilled()) {
  109.                std::task::Poll::Ready(Ok(())) => buf.filled().len(),
  110.                other => return other,
  111.            }
  112.        };
  113.  
  114.        let n_filled = filled + sub_filled;
  115.        let n_init = sub_filled;
  116.        unsafe {
  117.            buf.assume_init(n_init);
  118.            buf.set_filled(n_filled);
  119.        }
  120.        std::task::Poll::Ready(Ok(()))
  121.    }
  122. }
  123.  
  124. impl<T> tokio::io::AsyncWrite for IoUring<T>
  125. where
  126.    T: hyper::rt::Write,
  127. {
  128.    fn poll_write(
  129.        self: std::pin::Pin<&mut Self>,
  130.        cx: &mut std::task::Context<'_>,
  131.         buf: &[u8],
  132.     ) -> std::task::Poll<Result<usize, std::io::Error>> {
  133.         hyper::rt::Write::poll_write(self.project().val, cx, buf)
  134.     }
  135.     fn poll_flush(
  136.         self: std::pin::Pin<&mut Self>,
  137.         cx: &mut std::task::Context<'_>,
  138.    ) -> std::task::Poll<Result<(), std::io::Error>> {
  139.        std::task::Poll::Ready(Ok(()))
  140.    }
  141.    fn poll_shutdown(
  142.        self: std::pin::Pin<&mut Self>,
  143.        cx: &mut std::task::Context<'_>,
  144.     ) -> std::task::Poll<Result<(), std::io::Error>> {
  145.         hyper::rt::Write::poll_shutdown(self.project().val, cx)
  146.     }
  147.     fn is_write_vectored(&self) -> bool {
  148.         hyper::rt::Write::is_write_vectored(&self.val)
  149.     }
  150.     fn poll_write_vectored(
  151.         self: std::pin::Pin<&mut Self>,
  152.         cx: &mut std::task::Context<'_>,
  153.        bufs: &[std::io::IoSlice<'_>],
  154.     ) -> std::task::Poll<Result<usize, std::io::Error>> {
  155.         hyper::rt::Write::poll_write_vectored(self.project().val, cx, bufs)
  156.     }
  157. }
  158.  
  159. pub struct TcpStream {
  160.     val: tokio_uring::net::TcpStream,
  161. }
  162.  
  163. impl TcpStream {
  164.     pub fn new(val: tokio_uring::net::TcpStream) -> Self {
  165.         Self { val }
  166.     }
  167.     pub fn as_ref(&self) -> &tokio_uring::net::TcpStream {
  168.         &self.val
  169.     }
  170.     pub fn as_mut_ref(&mut self) -> &mut tokio_uring::net::TcpStream {
  171.         &mut self.val
  172.     }
  173. }
  174.  
  175. use bytes::BufMut;
  176.  
  177. impl tokio::io::AsyncRead for TcpStream {
  178.     fn poll_read(
  179.         mut self: std::pin::Pin<&mut Self>,
  180.         cx: &mut std::task::Context<'_>,
  181.        buf: &mut tokio::io::ReadBuf<'_>,
  182.     ) -> std::task::Poll<std::io::Result<()>> {
  183.         let stream = self.as_mut_ref();
  184.         let buf_unfilled = unsafe { buf.unfilled_mut() };
  185.         let buf_len = buf_unfilled.len();
  186.         let buf = vec![0u8; buf_len];
  187.         let res = stream.read(buf);
  188.         let mut res = Box::pin(res);
  189.  
  190.         let res = match res.poll_unpin(cx) {
  191.             std::task::Poll::Ready((Ok(n), buf)) => Ok((n, buf)),
  192.             std::task::Poll::Ready((Err(e), _)) => return std::task::Poll::Ready(Err(e)),
  193.             std::task::Poll::Pending => {
  194.                 return std::task::Poll::Pending;
  195.             }
  196.         };
  197.         std::task::Poll::Ready(res.map(|val| unsafe {
  198.             // buf.assume_init(n);
  199.             let (n, mut buf_back) = val;
  200.             buf_back.advance_mut(n);
  201.         }))
  202.     }
  203. }
  204.  
  205. impl tokio::io::AsyncWrite for TcpStream {
  206.     fn poll_write(
  207.         mut self: std::pin::Pin<&mut Self>,
  208.         cx: &mut std::task::Context<'_>,
  209.        buf: &[u8],
  210.    ) -> std::task::Poll<Result<usize, std::io::Error>> {
  211.        let stream = self.as_mut_ref();
  212.        let buf = vec![0u8; 4096];
  213.        let mut res = stream.write(buf).submit();
  214.        let n = match res.poll_unpin(cx) {
  215.            std::task::Poll::Ready((Ok(n), _)) => n,
  216.            std::task::Poll::Ready((Err(e), _)) => return std::task::Poll::Ready(Err(e)),
  217.            std::task::Poll::Pending => return std::task::Poll::Pending,
  218.        };
  219.        std::task::Poll::Ready(Ok(n))
  220.    }
  221.    fn poll_flush(
  222.        self: std::pin::Pin<&mut Self>,
  223.        cx: &mut std::task::Context<'_>,
  224.     ) -> std::task::Poll<Result<(), std::io::Error>> {
  225.         std::task::Poll::Ready(Ok(()))
  226.     }
  227.     fn poll_shutdown(
  228.         mut self: std::pin::Pin<&mut Self>,
  229.         cx: &mut std::task::Context<'_>,
  230.    ) -> std::task::Poll<Result<(), std::io::Error>> {
  231.        let stream = self.as_mut_ref();
  232.        stream.shutdown(std::net::Shutdown::Both);
  233.  
  234.        std::task::Poll::Ready(Ok(()))
  235.    }
  236. }
  237.  
  238.  
  239. // server code
  240. fn io_uring() {
  241.    let pool = tokio_uring::buf::fixed::FixedBufPool::new(
  242.        iter::repeat_with(|| Vec::with_capacity(8192)).take(1000),
  243.    );
  244.  
  245.    println!("server is listening");
  246.  
  247.    tokio_uring::start(async move {
  248.        pool.register().unwrap();
  249.  
  250.        let address = std::net::SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 8080));
  251.        let socket = crate::system::socket::create_socket(address).unwrap();
  252.        let listener = tokio_uring::net::TcpListener::from_std(socket.into());
  253.  
  254.        let mut http = hyper::server::conn::http1::Builder::new();
  255.        http.pipeline_flush(true);
  256.        http.auto_date_header(false);
  257.        http.max_buf_size(8192);
  258.        http.ignore_invalid_headers(true);
  259.  
  260.        let service = hyper::service::service_fn(crate::router::router);
  261.  
  262.        tokio_uring::spawn(async move {
  263.            loop {
  264.                let (stream, _) = match listener.accept().await {
  265.                    Ok(val) => val,
  266.                    Err(_) => continue,
  267.                };
  268.                let http = http.clone();
  269.                tokio_uring::spawn(async move {
  270.                    let io = crate::system::io_uring::IoUring::new(
  271.                        crate::system::io_uring::TcpStream::new(stream),
  272.                    );
  273.                    if let Err(_) = http.serve_connection(io, service).await {}
  274.                });
  275.            }
  276.        })
  277.        .await
  278.        .unwrap();
  279.    });
  280. }
  281.  
  282. pub fn run_io_uring(threads: usize) -> std::io::Result<()> {
  283.    for _ in 1..threads {
  284.        std::thread::spawn(|| io_uring());
  285.    }
  286.  
  287.    io_uring();
  288.  
  289.    Ok(())
  290. }
Advertisement
Add Comment
Please, Sign In to add comment