Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # python3
- import json
- import socket
- import sys
- from email.parser import Parser
- from functools import lru_cache
- from urllib.parse import parse_qs, urlparse
- MAX_LINE = 64 * 1024
- MAX_HEADERS = 100
- class MyHTTPServer:
- def __init__(self, _host, _port, server_name):
- self._host = host
- self._port = port
- self._server_name = server_name
- self._users = {}
- def serve_forever(self):
- serv_sock = socket.socket(
- socket.AF_INET,
- socket.SOCK_STREAM,
- proto=0)
- try:
- serv_sock.bind((self._host, self._port))
- serv_sock.listen()
- while True:
- conn, _ = serv_sock.accept()
- try:
- self.serve_client(conn)
- except Exception as e:
- print('Client serving failed', e)
- finally:
- serv_sock.close()
- def parse_request(self, conn,):
- rfile = conn.makefile('rb')
- method, target, ver = self.parse_request_line(rfile)
- headers = self.parse_headers(rfile)
- _host = headers.get('Host')
- if not _host:
- raise HTTPError(400, 'Bad request',
- 'Host header is missing')
- if _host not in (self._server_name,
- f'{self._server_name}:{self._port}'):
- raise HTTPError(404, 'Not found')
- return Request(method, target, ver, headers, rfile)
- def serve_client(self, conn):
- try:
- req = self.parse_request(conn)
- resp = self.handle_request(req)
- self.send_response(conn, resp)
- except ConnectionResetError:
- conn = None
- except Exception as e:
- self.send_error(conn, e)
- if conn:
- req.rfile.close()
- conn.close()
- @staticmethod
- def parse_request_line(rfile):
- raw = rfile.readline(MAX_LINE + 1)
- if len(raw) > MAX_LINE:
- raise HTTPError(400, 'Bad request',
- 'Request line is too long')
- req_line = str(raw, 'iso-8859-1')
- words = req_line.split()
- if len(words) != 3:
- raise HTTPError(400, 'Bad request',
- 'Malformed request line')
- method, target, ver = words
- if ver != 'HTTP/1.1':
- raise HTTPError(505, 'HTTP Version Not Supported')
- return method, target, ver
- @staticmethod
- def parse_headers(rfile):
- headers = []
- while True:
- line = rfile.readline(MAX_LINE + 1)
- if len(line) > MAX_LINE:
- raise HTTPError(494, 'Request header too large')
- if line in (b'\r\n', b'\n', b''):
- break
- headers.append(line)
- if len(headers) > MAX_HEADERS:
- raise HTTPError(494, 'Too many headers')
- sheaders = b''.join(headers).decode('iso-8859-1')
- return Parser().parsestr(sheaders)
- def handle_request(self, req):
- if req.path == '/users' and req.method == 'POST':
- return self.handle_post_users(req)
- if req.path == '/users' and req.method == 'GET':
- return self.handle_get_users(req)
- if req.path.startswith('/users/'):
- user_id = req.path[len('/users/'):]
- if user_id.isdigit():
- return self.handle_get_user(req, user_id)
- raise Exception('Not found')
- @staticmethod
- def send_response(conn, resp):
- wfile = conn.makefile('wb')
- status_line = f'HTTP/1.1 {resp.status} {resp.reason}\r\n'
- wfile.write(status_line.encode('iso-8859-1'))
- if resp.headers:
- for (key, value) in resp.headers:
- header_line = f'{key}: {value}\r\n'
- wfile.write(header_line.encode('iso-8859-1'))
- wfile.write(b'\r\n')
- if resp.body:
- wfile.write(resp.body)
- wfile.flush()
- wfile.close()
- def send_error(self, conn, err):
- # noinspection PyBroadException
- try:
- status = err.status
- reason = err.reason
- body = (err.body or err.reason).encode('utf-8')
- except:
- status = 500
- reason = b'Internal Server Error'
- body = (err.args[0]).encode('utf-8')
- resp = Response(status, reason,
- [('Content-Length', len(body))],
- body)
- self.send_response(conn, resp)
- def handle_post_users(self, req):
- user_id = len(self._users) + 1
- self._users[user_id] = {'id': user_id,
- 'name': req.query['name'][0],
- 'age': req.query['age'][0]}
- return Response(204, 'Created')
- def handle_get_users(self, req):
- accept = req.headers.get('Accept')
- if 'text/html' in accept:
- contentType = 'text/html; charset=utf-8'
- body = '<html><head></head><body>'
- body += f'<div>Пользователи ({len ( self._users )})</div>'
- body += '<ul>'
- for u in self._users.values():
- body += f'<li>#{u["id"]} {u["name"]}, {u["age"]}</li>'
- body += '</ul>'
- body += '</body></html>'
- elif 'application/json' in accept:
- contentType = 'application/json; charset=utf-8'
- body = json.dumps(self._users)
- else:
- # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406
- return Response(406, 'Not Acceptable')
- body = body.encode('utf-8')
- headers = [('Content-Type', contentType),
- ('Content-Length', len(body))]
- return Response(200, 'OK', headers, body)
- def handle_get_user(self, req, user_id):
- user = self._users.get(int(user_id))
- if not user:
- raise HTTPError(404, 'Not found')
- accept = req.headers.get('Accept')
- if 'text/html' in accept:
- contentType = 'text/html; charset=utf-8'
- body = '<html><head></head><body>'
- body += f'#{user["id"]} {user["name"]}, {user["age"]}'
- body += '</body></html>'
- elif 'application/json' in accept:
- contentType = 'application/json; charset=utf-8'
- body = json.dumps(user)
- else:
- # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406
- return Response(406, 'Not Acceptable')
- body = body.encode('utf-8')
- headers = [('Content-Type', contentType),
- ('Content-Length', len(body))]
- return Response(200, 'OK', headers, body)
- class Request:
- def __init__(self, method, target, version, headers, rfile):
- self.method = method
- self.target = target
- self.version = version
- self.headers = headers
- self.rfile = rfile
- @property
- def path(self):
- return self.url.path
- @property
- @lru_cache(maxsize=None)
- def query(self):
- return parse_qs(self.url.query)
- @property
- @lru_cache(maxsize=None)
- def url(self):
- return urlparse(self.target)
- def body(self):
- size = self.headers.get('Content-Length')
- if not size:
- return None
- return self.rfile.read(size)
- class Response:
- def __init__(self, status, reason, headers=None, body=None):
- self.status = status
- self.reason = reason
- self.headers = headers
- self.body = body
- class HTTPError (Exception):
- def __init__(self, status, reason, body=None):
- super()
- self.status = status
- self.reason = reason
- self.body = body
- if __name__ == '__main__':
- host = '127.0.0.1'
- port = 8080
- name = 'example.local'
- serv = MyHTTPServer(host, port, name)
- try:
- serv.serve_forever()
- except KeyboardInterrupt:
- pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement