Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // io trait and tcp stream code
- use futures::FutureExt;
- pub struct Executor;
- impl<F> hyper::rt::Executor<F> for Executor
- where
- F: Future + Send + 'static,
- F::Output: Send + 'static,
- {
- fn execute(&self, fut: F) {
- tokio_uring::spawn(fut);
- }
- }
- pin_project_lite::pin_project! {
- pub struct IoUring<T> {
- #[pin]
- val: T
- }
- }
- impl<T> IoUring<T> {
- pub fn new(val: T) -> Self {
- Self { val }
- }
- pub fn as_ref(&self) -> &T {
- &self.val
- }
- pub fn as_mut_ref(&mut self) -> &mut T {
- &mut self.val
- }
- pub fn into_owned(self) -> T {
- self.val
- }
- }
- impl<T> hyper::rt::Read for IoUring<T>
- where
- T: tokio::io::AsyncRead,
- {
- fn poll_read(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- mut buf: hyper::rt::ReadBufCursor<'_>,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- let n = unsafe {
- let mut buf = tokio::io::ReadBuf::uninit(buf.as_mut());
- match tokio::io::AsyncRead::poll_read(self.project().val, cx, &mut buf) {
- std::task::Poll::Ready(Ok(())) => buf.filled().len(),
- other => return other,
- }
- };
- unsafe {
- buf.advance(n);
- }
- std::task::Poll::Ready(Ok(()))
- }
- }
- impl<T> hyper::rt::Write for IoUring<T>
- where
- T: tokio::io::AsyncWrite,
- {
- fn poll_write(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- tokio::io::AsyncWrite::poll_write(self.project().val, cx, buf)
- }
- fn poll_flush(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- std::task::Poll::Ready(Ok(()))
- }
- fn poll_shutdown(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- tokio::io::AsyncWrite::poll_shutdown(self.project().val, cx)
- }
- fn is_write_vectored(&self) -> bool {
- tokio::io::AsyncWrite::is_write_vectored(&self.val)
- }
- fn poll_write_vectored(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- bufs: &[std::io::IoSlice<'_>],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- tokio::io::AsyncWrite::poll_write_vectored(self.project().val, cx, bufs)
- }
- }
- impl<T> tokio::io::AsyncRead for IoUring<T>
- where
- T: hyper::rt::Read,
- {
- fn poll_read(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &mut tokio::io::ReadBuf<'_>,
- ) -> std::task::Poll<std::io::Result<()>> {
- let filled = buf.filled().len();
- let sub_filled = unsafe {
- let mut buf = hyper::rt::ReadBuf::uninit(buf.unfilled_mut());
- match hyper::rt::Read::poll_read(self.project().val, cx, buf.unfilled()) {
- std::task::Poll::Ready(Ok(())) => buf.filled().len(),
- other => return other,
- }
- };
- let n_filled = filled + sub_filled;
- let n_init = sub_filled;
- unsafe {
- buf.assume_init(n_init);
- buf.set_filled(n_filled);
- }
- std::task::Poll::Ready(Ok(()))
- }
- }
- impl<T> tokio::io::AsyncWrite for IoUring<T>
- where
- T: hyper::rt::Write,
- {
- fn poll_write(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- hyper::rt::Write::poll_write(self.project().val, cx, buf)
- }
- fn poll_flush(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- std::task::Poll::Ready(Ok(()))
- }
- fn poll_shutdown(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- hyper::rt::Write::poll_shutdown(self.project().val, cx)
- }
- fn is_write_vectored(&self) -> bool {
- hyper::rt::Write::is_write_vectored(&self.val)
- }
- fn poll_write_vectored(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- bufs: &[std::io::IoSlice<'_>],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- hyper::rt::Write::poll_write_vectored(self.project().val, cx, bufs)
- }
- }
- pub struct TcpStream {
- val: tokio_uring::net::TcpStream,
- }
- impl TcpStream {
- pub fn new(val: tokio_uring::net::TcpStream) -> Self {
- Self { val }
- }
- pub fn as_ref(&self) -> &tokio_uring::net::TcpStream {
- &self.val
- }
- pub fn as_mut_ref(&mut self) -> &mut tokio_uring::net::TcpStream {
- &mut self.val
- }
- }
- use bytes::BufMut;
- impl tokio::io::AsyncRead for TcpStream {
- fn poll_read(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &mut tokio::io::ReadBuf<'_>,
- ) -> std::task::Poll<std::io::Result<()>> {
- let stream = self.as_mut_ref();
- let buf_unfilled = unsafe { buf.unfilled_mut() };
- let buf_len = buf_unfilled.len();
- let buf = vec![0u8; buf_len];
- let res = stream.read(buf);
- let mut res = Box::pin(res);
- let res = match res.poll_unpin(cx) {
- std::task::Poll::Ready((Ok(n), buf)) => Ok((n, buf)),
- std::task::Poll::Ready((Err(e), _)) => return std::task::Poll::Ready(Err(e)),
- std::task::Poll::Pending => {
- return std::task::Poll::Pending;
- }
- };
- std::task::Poll::Ready(res.map(|val| unsafe {
- // buf.assume_init(n);
- let (n, mut buf_back) = val;
- buf_back.advance_mut(n);
- }))
- }
- }
- impl tokio::io::AsyncWrite for TcpStream {
- fn poll_write(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- buf: &[u8],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- let stream = self.as_mut_ref();
- let buf = vec![0u8; 4096];
- let mut res = stream.write(buf).submit();
- let n = match res.poll_unpin(cx) {
- std::task::Poll::Ready((Ok(n), _)) => n,
- std::task::Poll::Ready((Err(e), _)) => return std::task::Poll::Ready(Err(e)),
- std::task::Poll::Pending => return std::task::Poll::Pending,
- };
- std::task::Poll::Ready(Ok(n))
- }
- fn poll_flush(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- std::task::Poll::Ready(Ok(()))
- }
- fn poll_shutdown(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- let stream = self.as_mut_ref();
- stream.shutdown(std::net::Shutdown::Both);
- std::task::Poll::Ready(Ok(()))
- }
- }
- // server code
- fn io_uring() {
- let pool = tokio_uring::buf::fixed::FixedBufPool::new(
- iter::repeat_with(|| Vec::with_capacity(8192)).take(1000),
- );
- println!("server is listening");
- tokio_uring::start(async move {
- pool.register().unwrap();
- let address = std::net::SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 8080));
- let socket = crate::system::socket::create_socket(address).unwrap();
- let listener = tokio_uring::net::TcpListener::from_std(socket.into());
- let mut http = hyper::server::conn::http1::Builder::new();
- http.pipeline_flush(true);
- http.auto_date_header(false);
- http.max_buf_size(8192);
- http.ignore_invalid_headers(true);
- let service = hyper::service::service_fn(crate::router::router);
- tokio_uring::spawn(async move {
- loop {
- let (stream, _) = match listener.accept().await {
- Ok(val) => val,
- Err(_) => continue,
- };
- let http = http.clone();
- tokio_uring::spawn(async move {
- let io = crate::system::io_uring::IoUring::new(
- crate::system::io_uring::TcpStream::new(stream),
- );
- if let Err(_) = http.serve_connection(io, service).await {}
- });
- }
- })
- .await
- .unwrap();
- });
- }
- pub fn run_io_uring(threads: usize) -> std::io::Result<()> {
- for _ in 1..threads {
- std::thread::spawn(|| io_uring());
- }
- io_uring();
- Ok(())
- }
Advertisement
Add Comment
Please, Sign In to add comment