Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- try:
- import asyncio
- except ImportError:
- import uasyncio as asyncio
- import gc
- loop = asyncio.get_event_loop()
- def simpleHTML(text):
- return f'''<html><head><title>{text}</title></head><body><h1>{text}</h1></body></html>'''
- class HttpServer:
- def __init__(self, host="0.0.0.0", port=80):
- self.host = host
- self.port = port
- self.routes = {}
- self.server = None
- def start(self):
- loop.create_task(asyncio.start_server(self.__handle_request, self.host, self.port, 4096))
- loop.run_forever()
- def add_route(self, method, path):
- def decorator(callback):
- self.routes[(method, path)] = callback
- return callback
- return decorator
- async def __handle_route(self, request):
- print("Handling: ", request)
- if (request["method"], request["path"]) in self.routes:
- return await self.routes[(request["method"], request["path"])](request)
- else:
- return ["404", "text/html", simpleHTML("Not Found")]
- async def __process_header(self, reader):
- header = (await reader.readline()).decode().split(" ")
- path = header[1].split("?")
- query = {}
- if len(path) > 1:
- query = dict(q.split("=") for q in path[1].split("&"))
- return {"method": header[0], "path": path[0], "query": query}
- async def __handle_request(self, reader, writer):
- gc.collect()
- result = await self.__handle_route(await self.__process_header(reader))
- writer.write(f"HTTP/1.0 {result[0]} Works\r\n")
- writer.write(f"Content-Type: {result[1]}\r\n")
- writer.write(f"Content-Length: {len(result[2])}\r\n")
- writer.write(f"\r\n")
- writer.write(result[2])
- print("Sending: ", result[2])
- await writer.drain()
- await asyncio.sleep(0.5)
- writer.close()
- await writer.wait_closed()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement