Advertisement
misaalanshori

ahttpserver

Jun 29th, 2023 (edited)
862
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.94 KB | Source Code | 0 0
  1. try:
  2.     import asyncio
  3. except ImportError:
  4.     import uasyncio as asyncio
  5.  
  6. import gc
  7.  
  8.  
  9. loop = asyncio.get_event_loop()
  10.  
  11. def simpleHTML(text):
  12.     return f'''<html><head><title>{text}</title></head><body><h1>{text}</h1></body></html>'''
  13.  
  14. class HttpServer:
  15.     def __init__(self, host="0.0.0.0", port=80):
  16.         self.host = host
  17.         self.port = port
  18.         self.routes = {}
  19.         self.server = None
  20.  
  21.     def start(self):
  22.         loop.create_task(asyncio.start_server(self.__handle_request, self.host, self.port, 4096))
  23.         loop.run_forever()
  24.  
  25.     def add_route(self, method, path):
  26.         def decorator(callback):
  27.             self.routes[(method, path)] = callback
  28.             return callback
  29.         return decorator
  30.  
  31.     async def __handle_route(self, request):
  32.         print("Handling: ", request)
  33.         if (request["method"], request["path"]) in self.routes:
  34.             return await self.routes[(request["method"], request["path"])](request)
  35.         else:
  36.             return ["404", "text/html", simpleHTML("Not Found")]
  37.    
  38.     async def __process_header(self, reader):
  39.         header = (await reader.readline()).decode().split(" ")
  40.         path = header[1].split("?")
  41.         query = {}
  42.         if len(path) > 1:
  43.             query = dict(q.split("=") for q in path[1].split("&"))
  44.         return {"method": header[0], "path": path[0], "query": query}
  45.    
  46.     async def __handle_request(self, reader, writer):
  47.         gc.collect()
  48.         result = await self.__handle_route(await self.__process_header(reader))
  49.         writer.write(f"HTTP/1.0 {result[0]} Works\r\n")
  50.         writer.write(f"Content-Type: {result[1]}\r\n")
  51.         writer.write(f"Content-Length: {len(result[2])}\r\n")
  52.         writer.write(f"\r\n")
  53.         writer.write(result[2])
  54.         print("Sending: ", result[2])
  55.         await writer.drain()
  56.         await asyncio.sleep(0.5)
  57.         writer.close()
  58.         await writer.wait_closed()
  59.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement