""" Experimetal multi-threaded web server created by Massimo Di Pierro For lack of a better we'll call it Sneaky. License: GPL2 This code would have been possible without CherryPy wsgiserver, a great example of Python web server. - This code implements WSGI - This code is API compatible with cherrypy - It consists of less than 260 lines of code - It is multi-threaded - The number of threads changes dynamically between a min and a max - Can handle chunking (request and response) [to be tested] - supports SSL via the Cherrypy ssl adaptors You can find an example of usage at the bottom of this file. here are some tests and comparisons performed by Garrett Smith RPS = requests per second Time = average time in milliseconds to server each request Benchmark = `ab -t 10 -c -r http://localhost` 100 Concurrent Requests ----------------------- =============== App Server RPS ============== Fapws 7174 Landshark 4479 PHP-5 4191 modwsgi 3651 Tomcat 6 3554 Tornado 2641 Sneaky WSGI(*) 2372 CherryPy WSGI 2102 Phusion 1873 Jetty 6 937 Django WSGI 785 WEBrick 43 =============== 1,000 Concurrent Requests ------------------------- =============== App Server RPS =============== Fapws 5359 Landshark 4477 modwsgi 3449 PHP 5 3062 Tomcat 6 3014 Tornado 2452 Sneaky WSGI(*) 2415 CherryPy WSGI 2126 Phusion 1585 Jetty 6 1095 Django WSGI 953 WEBrick 50 =============== 10,000 Concurrent Requests -------------------------- =============== App Server RPS =============== Fapws 5213 Landshark 4239 Tomcat 6 2369 Tornado 2265 PHP 5 2239 Sneaky WSGI (*) 2225 modwsgi 2115 CherryPy WSGI 1731 Phusion 1247 Jetty 6 794 Django WSGI 890 WEBrick 84 =============== 20,000 Concurrent Requests -------------------------- =============== App Server RPS =============== Fapws 4788 Landshark 2936 Tornado 2153 Sneaky WSGI(*) 2130 PHP 5 1728 modwsgi 1374 Tomcat 6 1362 CherryPy WSGI 1294 Phusion 961 Django WSGI 790 Jetty 6 616 WEBrick 63 =============== """ import os import threading import socket import logging import sys import re import errno import signal import time import traceback import copy from cStringIO import StringIO from Queue import Queue regex_head = re.compile('^((http|https|HTTP|HTTPS)\://[^/]+)?(?P\w+)\s+(?P\S+)\s+(?P\S+)') regex_header = re.compile('\s*(?P.*?)\s*\:\s*(?P.*?)\s*$') regex_chunk = re.compile('^(?P\w+)') BUF_SIZE = 10000 SERVER_NAME = 'Sneaky' ACTUAL_SERVER_PROTOCOL = 'HTTP/1.1' # should this be determined from request? def formatdateRFC822(): t=time.gmtime(time.time()) w=("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")[t[6]] return w+time.strftime(", %d %b %Y %H:%M%S GMT",t) class ChunkedReader: """ class used to read chunked input """ def __init__(self,stream): self.stream = stream self.buffer = None def __chunk_read(self): if not self.buffer or self.buffer.tell() == self.buffer_size: self.buffer_size = \ int(regex_chunk.match(self.stream.readline()).group('size'),16) if self.buffer_size: self.buffer = StringIO(self.stream.read(self.buffer_size)) def read(self,size): data = '' while size: self.__chunk_read() if not self.buffer_size: break read_size = min(size,self.buffer_size) data += self.buffer.read(read_size) size -= read_size return data def readline(self): data = '' for c in self.read(1): if not c: break data += c if c == '\n': break return data def readlines(self): yield self.readline() def errors_numbers(errnames): """utility to build a list of socket errors""" return set([getattr(errno, k) for k in errnames if hasattr(errno,k)]) socket_errors_to_ignore = errors_numbers(( "EPIPE", "EBADF", "WSAEBADF", "ENOTSOCK", "WSAENOTSOCK", "ETIMEDOUT", "WSAETIMEDOUT", "ECONNREFUSED", "WSAECONNREFUSED", "ECONNRESET", "WSAECONNRESET", "ECONNABORTED", "WSAECONNABORTED", "ENETRESET", "WSAENETRESET", "EHOSTDOWN", "EHOSTUNREACH", )) class Worker(threading.Thread): """class representing a worker node""" queue = Queue() # queue of requests to process (socket,address) threads = set() # set of threads (instances or Worker class wsgi_apps = [] # [the_wsgi_app] server_name = SERVER_NAME min_threads = 10 max_threads = 10 def run(self): """runs the thread: - pick a request from queue - parse input - run wsgi_app - send response - resize set of threads """ while True: (client_socket,client_address) = self.queue.get() if not client_socket: return self.die() if hasattr(client_socket,'settimeout'): client_socket.settimeout(self.timeout) while True: wsgi_file = client_socket.makefile('rb',BUF_SIZE) try: environ = self.build_environ(wsgi_file,client_address) data_items = self.wsgi_apps[0](environ,self.start_response) if self.respond(client_socket, environ, data_items): break except: logging.warn(str(traceback.format_exc())) self.try_error_response(client_socket) break wsgi_file.close() client_socket.close() self.resize_thread_pool() def die(self): """kills this thread, must be called by run()""" self.threads.remove(self) return def build_environ(self,wsgi_file,client_address): """parse request and build WSGI environ""" first_line = wsgi_file.readline() match = regex_head.match(first_line) request_method = match.group('method') uri = match.group('uri') request_protocol = match.group('protocol') k = uri.find('?') if k<0: k = len(uri) (path_info,query_string) = (uri[:k],uri[k+1:]) environ = {'wsgi.version': (1,0), 'wsgi.input': wsgi_file, 'wsgi.url_encoding': 'utf-8', 'wsgi.url_scheme': 'http', 'wsgi.errors': sys.stderr, 'ACTUAL_SERVER_PROTOCOL': ACTUAL_SERVER_PROTOCOL, 'CLIENT_ADDR': client_address[0], 'CLIENT_PORT': client_address[1], 'PATH_INFO': path_info, 'REQUEST_URI': uri, 'REQUEST_METHOD':request_method, 'PATH_INFO': path_info, 'SCRIPT_NAME': '', 'QUERY_STRING': query_string} for line in wsgi_file: if line == '\r\n': break match = regex_header.match(line) if not match: continue key = match.group('key').upper().replace('-','_') if isinstance(key,unicode): key = key.encode('ISO-8859-1') value = match.group('value') try: value = value.decode('ISO-8859-1').encode('utf-8') except: pass environ['HTTP_'+key] = value if key == 'CONTENT_LENGTH': environ[key]=value if key == 'CONTENT_TYPE': envione[key]=value if key == 'TRANSFER_ENCODING' and value[:7].lower() == 'chunked': environ['wsgi.input'] = ChunkedReader(wsgi_file) return environ def start_response(self,status,headers): """to be passed as second argument to wsgi_app""" self.status = status self.headers = headers def respond(self,client_socket,environ,data_items): """called after wsgi_app successfully retruns""" headers = self.headers header_dict = dict([(x.lower(),y.strip()) for (x,y) in headers]) if not 'date' in header_dict: headers.append(('Date',formatdateRFC822())) if not 'server' in header_dict: headers.append(('Server',self.server_name)) chunked = header_dict.get('transfer-encoding','')[:7].lower() == 'chunked' if not 'content-length' in header_dict and not chunked: if isinstance(data_items,list) and len(data_items) == 1: headers.append(('Content-Length',len(data_items[0]))) connection = environ.get('HTTP_CONNECTION','close') headers.append(('Connection',connection)) serialized_headers = \ ''.join(['%s: %s\r\n' % (k,v) for (k,v) in headers]) data = "HTTP/1.1 %s\r\n%s\r\n" % (self.status, serialized_headers) client_socket.sendall(data) for data in data_items: try: if chunked: client_socket.sendall('%x\r\n%s\r\n' % (len(data),data)) else: client_socket.sendall(data) except socket.error, e: if e.args[0] not in socket_errors_to_ignore: raise if chunked: client_socket.sendall('0\r\n') return connection.lower() != 'keep-alive' def try_error_response(self,client_socket, status = "500 INSERTNAL SERVER ERROR"): """called if thread fails""" try: client_socket.sendall( "HTTP/1.0 %s\r\nContent-Length: 0\r\nContent-Type: text/plain\r\n\r\n" % status) except: pass def resize_thread_pool(self): """created new Worker(s) or kills some Worker(s)""" if self.max_threads>self.min_threads: qe = Worker.queue.empty() ql = len(Worker.threads) if qe and ql>self.min_threads: for k in range(self.min_threads): Worker.queue.put((None,None)) elif not qe and ql