"""
Experimental multi-threaded web server created by Massimo Di Pierro.
For lack of a better name we'll call it Sneaky.
License: GPL2
This code would not have been possible without the CherryPy wsgiserver,
a great example of a Python web server.
- This code implements WSGI
- This code is API compatible with cherrypy
- It consists of fewer than 260 lines of code
- It is multi-threaded
- The number of threads changes dynamically between a min and a max
- It can handle chunking (request and response) [to be tested]
- It supports SSL via the CherryPy SSL adapters
You can find an example of usage at the bottom of this file.
Here are some tests and comparisons performed by Garrett Smith:
RPS = requests per second
Time = average time in milliseconds to serve each request (the tables below report RPS only)
Benchmark = `ab -t 10 -c <number of concurrent requests> -r http://localhost`
100 Concurrent Requests
-----------------------
===============
App Server RPS
===============
Fapws 7174
Landshark 4479
PHP-5 4191
modwsgi 3651
Tomcat 6 3554
Tornado 2641
Sneaky WSGI(*) 2372
CherryPy WSGI 2102
Phusion 1873
Jetty 6 937
Django WSGI 785
WEBrick 43
===============
1,000 Concurrent Requests
-------------------------
===============
App Server RPS
===============
Fapws 5359
Landshark 4477
modwsgi 3449
PHP 5 3062
Tomcat 6 3014
Tornado 2452
Sneaky WSGI(*) 2415
CherryPy WSGI 2126
Phusion 1585
Jetty 6 1095
Django WSGI 953
WEBrick 50
===============
10,000 Concurrent Requests
--------------------------
===============
App Server RPS
===============
Fapws 5213
Landshark 4239
Tomcat 6 2369
Tornado 2265
PHP 5 2239
Sneaky WSGI (*) 2225
modwsgi 2115
CherryPy WSGI 1731
Phusion 1247
Jetty 6 794
Django WSGI 890
WEBrick 84
===============
20,000 Concurrent Requests
--------------------------
===============
App Server RPS
===============
Fapws 4788
Landshark 2936
Tornado 2153
Sneaky WSGI(*) 2130
PHP 5 1728
modwsgi 1374
Tomcat 6 1362
CherryPy WSGI 1294
Phusion 961
Django WSGI 790
Jetty 6 616
WEBrick 63
===============
"""
import copy
import errno
import logging
import os
import re
import signal
import socket
import sys
import threading
import time
import traceback
from io import StringIO
from queue import Queue
from urllib.parse import unquote
# Request line: method, request URI and protocol (all as bytes).
# NOTE(review): the optional "scheme://host" group sits *before* the method,
# not inside the URI -- looks unintended; confirm before relying on it.
regex_head = re.compile(rb'^((http|https|HTTP|HTTPS)\://[^/]+)?(?P<method>\w+)\s+(?P<uri>\S+)\s+(?P<protocol>\S+)')
# Header line "Key: value"; whitespace around key and value (and the
# trailing CRLF) is excluded from the captured groups.
regex_header = re.compile(rb'\s*(?P<key>.*?)\s*\:\s*(?P<value>.*?)\s*$')
# Leading word characters of a chunk-size line (the hex size of a chunk).
regex_chunk = re.compile(rb'^(?P<size>\w+)')
BUF_SIZE = 10000  # buffer size passed to socket.makefile() for each connection
SERVER_NAME = 'Sneaky'
ACTUAL_SERVER_PROTOCOL = 'HTTP/1.1' # should this be determined from request?
def formatdateRFC822(timestamp=None):
    """Return an RFC 822 / HTTP date such as ``Thu, 01 Jan 1970 00:00:00 GMT``.

    :timestamp seconds since the epoch; None (default) means "now".

    Day and month names come from fixed tables rather than strftime's
    %a/%b, so the result does not depend on the process locale.
    """
    t = time.gmtime(timestamp)  # gmtime(None) uses the current time
    weekday = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")[t.tm_wday]
    month = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
             "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")[t.tm_mon - 1]
    return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (
        weekday, t.tm_mday, month, t.tm_year, t.tm_hour, t.tm_min, t.tm_sec)
class ChunkedReader:
    """File-like reader that decodes a chunked request body (RFC 7230 4.1).

    *stream* must be a binary file-like object (e.g. a reader obtained from
    ``socket.makefile('rb')``) positioned at the first chunk-size line.
    read()/readline() return bytes with the chunk framing removed. After
    the terminating zero-size chunk, any trailer lines are consumed and
    every further read returns b''. A malformed chunk-size line raises
    ValueError.
    """
    def __init__(self, stream):
        self.stream = stream
        self.buffer = b''  # not-yet-returned bytes of the current chunk
        self.eof = False  # True once the last chunk has been consumed

    def __chunk_read(self):
        """Load the next chunk into self.buffer if the current one is used up."""
        if self.buffer or self.eof:
            return
        size_line = self.stream.readline()
        # "chunk-size [; extension] CRLF": only the hex size matters here
        size_field = size_line.split(b';', 1)[0].strip()
        if not size_field:
            self.eof = True  # stream ended (or blank size line): no more data
            return
        size = int(size_field, 16)
        if size == 0:
            # last-chunk: skip optional trailer headers up to the blank line
            while True:
                trailer = self.stream.readline()
                if not trailer or trailer in (b'\r\n', b'\n'):
                    break
            self.eof = True
            return
        self.buffer = self.stream.read(size)
        self.stream.readline()  # the CRLF that terminates the chunk data

    def read(self, size=-1):
        """Return up to *size* decoded bytes; all remaining if size < 0 or None."""
        unbounded = size is None or size < 0
        parts = []
        while unbounded or size > 0:
            self.__chunk_read()
            if not self.buffer:
                break
            take = len(self.buffer) if unbounded else min(size, len(self.buffer))
            parts.append(self.buffer[:take])
            self.buffer = self.buffer[take:]
            if not unbounded:
                size -= take
        return b''.join(parts)

    def readline(self):
        """Return the next line (including b'\\n'), or b'' at end of body."""
        parts = []
        while True:
            self.__chunk_read()
            if not self.buffer:
                break
            newline = self.buffer.find(b'\n')
            take = len(self.buffer) if newline < 0 else newline + 1
            parts.append(self.buffer[:take])
            self.buffer = self.buffer[take:]
            if newline >= 0:
                break
        return b''.join(parts)

    def readlines(self):
        """Yield every remaining line of the body."""
        while True:
            line = self.readline()
            if not line:
                return
            yield line

    def __iter__(self):
        return self.readlines()
def errors_numbers(errnames):
    """Map errno names to the set of their numeric codes.

    Any name the errno module lacks on this platform (for example the
    WSA* Winsock codes on POSIX systems) is skipped without error.
    """
    return {getattr(errno, name) for name in errnames if hasattr(errno, name)}
# errno codes for socket errors that mean the peer is gone or the socket
# is unusable; the server treats these as a vanished client instead of
# re-raising them.
socket_errors_to_ignore = errors_numbers([
    "EPIPE", "EBADF", "ENOTSOCK", "ETIMEDOUT", "ECONNREFUSED",
    "ECONNRESET", "ECONNABORTED", "ENETRESET", "EHOSTDOWN", "EHOSTUNREACH",
    # Winsock spellings of the same conditions
    "WSAEBADF", "WSAENOTSOCK", "WSAETIMEDOUT", "WSAECONNREFUSED",
    "WSAECONNRESET", "WSAECONNABORTED", "WSAENETRESET",
])
class Worker(threading.Thread):
    """A thread of the server's worker pool.

    All workers share the class-level state below; Sneaky.__init__ sets
    wsgi_apps, server_name, min_threads, max_threads and timeout on the
    class. A ``(None, None)`` item on the queue tells the worker that
    takes it to exit.
    """
    queue = Queue()  # queue of accepted connections: (socket, address)
    threads = set()  # set of live Worker instances
    wsgi_apps = []  # [the_wsgi_app]; only wsgi_apps[0] is served
    server_name = SERVER_NAME
    min_threads = 10
    max_threads = 10  # None is treated as "same as min_threads"
    timeout = None  # socket timeout in seconds (None blocks forever)
    # per-request state, reset at the start of every request
    status = None
    headers = None
    headers_sent = False

    def run(self):
        """Take connections from the queue and serve them until told to exit.

        After each connection the pool may grow or shrink
        (see resize_thread_pool()).
        """
        while True:
            (client_socket, client_address) = self.queue.get()
            if not client_socket:
                return self.die()
            try:
                self._serve_connection(client_socket, client_address)
            except Exception:
                # never let one bad connection kill the worker thread
                logging.warning(traceback.format_exc())
            self.resize_thread_pool()

    def _serve_connection(self, client_socket, client_address):
        """Serve every request sent on one client connection, then close it."""
        try:
            if hasattr(client_socket, 'settimeout'):
                client_socket.settimeout(self.timeout)
            # One buffered reader for the whole connection: a new makefile()
            # per request would drop pipelined bytes already buffered.
            wsgi_file = client_socket.makefile('rb', BUF_SIZE)
            try:
                while self._serve_request(client_socket, client_address, wsgi_file):
                    pass
            finally:
                wsgi_file.close()
        finally:
            client_socket.close()

    def _serve_request(self, client_socket, client_address, wsgi_file):
        """Handle one request; return True if the connection stays open."""
        self.status = self.headers = None
        self.headers_sent = False
        try:
            environ = self.build_environ(wsgi_file, client_address)
            if environ is None:
                return False  # client closed the connection
            data_items = self.wsgi_apps[0](environ, self.start_response)
            try:
                return not self.respond(client_socket, environ, data_items)
            finally:
                # PEP 3333: the server must call close() on the app's result
                if hasattr(data_items, 'close'):
                    data_items.close()
        except socket.timeout:
            return False  # idle keep-alive connection or stalled client
        except Exception as e:
            if isinstance(e, socket.error) and e.errno in socket_errors_to_ignore:
                return False  # client went away
            logging.warning(traceback.format_exc())
            if not self.headers_sent:
                self.try_error_response(client_socket)
            return False

    def die(self):
        """Remove this worker from the pool; called by run() before it exits."""
        self.threads.discard(self)
        return

    def build_environ(self, wsgi_file, client_address):
        """Read one request head from *wsgi_file* and return its WSGI environ.

        Returns None if the client closed the connection before sending a
        request line; raises ValueError if the request line is malformed.
        Wire bytes are decoded as ISO-8859-1, the encoding PEP 3333 uses
        for native strings. The body is left unread in wsgi.input.
        """
        first_line = wsgi_file.readline()
        # RFC 7230 3.5: ignore empty line(s) received before the request line
        while first_line in (b'\r\n', b'\n'):
            first_line = wsgi_file.readline()
        if not first_line:
            return None
        match = regex_head.match(first_line)
        if not match:
            raise ValueError('malformed request line: %r' % first_line)
        request_method = match.group('method').decode('iso-8859-1')
        uri = match.group('uri').decode('iso-8859-1')
        request_protocol = match.group('protocol').decode('iso-8859-1')
        (path_info, _, query_string) = uri.partition('?')
        environ = {'wsgi.version': (1, 0),
                   'wsgi.input': wsgi_file,
                   'wsgi.url_encoding': 'utf-8',
                   'wsgi.url_scheme': 'http',
                   'wsgi.errors': sys.stderr,
                   'wsgi.multithread': True,
                   'wsgi.multiprocess': False,
                   'wsgi.run_once': False,
                   'ACTUAL_SERVER_PROTOCOL': ACTUAL_SERVER_PROTOCOL,
                   'SERVER_PROTOCOL': request_protocol,
                   'SERVER_SOFTWARE': self.server_name,
                   'CLIENT_ADDR': client_address[0],
                   'CLIENT_PORT': client_address[1],
                   'REMOTE_ADDR': client_address[0],
                   'REMOTE_PORT': str(client_address[1]),
                   # PEP 3333: PATH_INFO is percent-decoded, bytes -> latin-1
                   'PATH_INFO': unquote(path_info, encoding='iso-8859-1'),
                   'REQUEST_URI': uri,
                   'REQUEST_METHOD': request_method,
                   'SCRIPT_NAME': '',
                   'QUERY_STRING': query_string}
        for line in wsgi_file:
            if line in (b'\r\n', b'\n'):
                break  # blank line ends the header block
            match = regex_header.match(line)
            if not match or not match.group('key'):
                continue
            key = match.group('key').decode('iso-8859-1').upper().replace('-', '_')
            value = match.group('value').decode('iso-8859-1')
            environ['HTTP_' + key] = value
            if key in ('CONTENT_LENGTH', 'CONTENT_TYPE'):
                # CGI-style keys expected without the HTTP_ prefix
                environ[key] = value
            if key == 'TRANSFER_ENCODING' and value.lower().endswith('chunked'):
                environ['wsgi.input'] = ChunkedReader(wsgi_file)
        return environ

    def start_response(self, status, headers, exc_info=None):
        """WSGI ``start_response`` callable passed to the application.

        Records *status* and *headers* for respond(). As PEP 3333 requires,
        a call with *exc_info* after the headers were sent re-raises the
        application's exception.
        NOTE: the legacy write() callable is not supported (returns None).
        """
        if exc_info:
            try:
                if self.headers_sent:
                    raise exc_info[1].with_traceback(exc_info[2])
            finally:
                exc_info = None  # avoid a traceback reference cycle
        self.status = status
        self.headers = headers

    @staticmethod
    def _to_bytes(data):
        """Return a body item as bytes; str items are UTF-8 encoded."""
        if isinstance(data, str):
            return data.encode('utf8')
        if isinstance(data, (bytes, bytearray, memoryview)):
            return bytes(data)
        raise TypeError('WSGI body items must be bytes, got %r' % type(data))

    @staticmethod
    def _send_chunk(client_socket, chunk, chunked):
        """Send one body chunk, with chunked transfer framing if requested."""
        if not chunk:
            return  # in chunked framing an empty chunk would end the body
        if chunked:
            client_socket.sendall(b'%x\r\n%b\r\n' % (len(chunk), chunk))
        else:
            client_socket.sendall(chunk)

    def respond(self, client_socket, environ, data_items):
        """Send the application's response; return True to close the connection.

        List/tuple bodies get a Content-Length; other iterables are streamed.
        Keep-alive is honoured only when the body length can be delimited
        (Content-Length or chunked); otherwise the connection is closed.
        """
        if isinstance(data_items, (list, tuple)):
            chunks = [self._to_bytes(item) for item in data_items]
            body_length = sum(len(chunk) for chunk in chunks)
            body = iter(chunks)
        else:
            body_length = None
            body = (self._to_bytes(item) for item in data_items)
        # Pull the first non-empty item before reading status/headers: PEP 3333
        # lets the app call start_response() during its first iteration.
        first_chunk = b''
        for first_chunk in body:
            if first_chunk:
                break
        if self.status is None:
            raise RuntimeError('WSGI application did not call start_response()')
        headers = list(self.headers)  # do not mutate the application's list
        header_dict = dict((k.lower(), str(v).strip()) for (k, v) in headers)
        if 'date' not in header_dict:
            headers.append(('Date', formatdateRFC822()))
        if 'server' not in header_dict:
            headers.append(('Server', self.server_name))
        chunked = header_dict.get('transfer-encoding', '')[:7].lower() == 'chunked'
        has_length = 'content-length' in header_dict
        if not has_length and not chunked and body_length is not None:
            headers.append(('Content-Length', body_length))
            has_length = True
        if 'connection' in header_dict:
            connection = header_dict['connection']
        else:
            connection = environ.get('HTTP_CONNECTION', 'close')
            if not (has_length or chunked):
                # the client can only detect the end of the body when we close
                connection = 'close'
            headers.append(('Connection', connection))
        close = connection.lower() != 'keep-alive' or not (has_length or chunked)
        serialized_headers = ''.join('%s: %s\r\n' % (k, v) for (k, v) in headers)
        head = "HTTP/1.1 %s\r\n%s\r\n" % (self.status, serialized_headers)
        self.headers_sent = True
        try:
            client_socket.sendall(bytes(head, 'utf8'))
            self._send_chunk(client_socket, first_chunk, chunked)
            for chunk in body:
                self._send_chunk(client_socket, chunk, chunked)
            if chunked:
                client_socket.sendall(b'0\r\n\r\n')  # last-chunk + end of message
        except socket.error as e:
            if e.errno not in socket_errors_to_ignore:
                raise
            return True  # client went away: close the connection
        return close

    def try_error_response(self, client_socket,
                           status = "500 INTERNAL SERVER ERROR"):
        """Best-effort empty error response, sent when handling a request fails.

        Send errors are ignored: the client may already be gone.
        """
        if isinstance(status, str):
            status = status.encode('iso-8859-1', 'replace')
        try:
            client_socket.sendall(
                b"HTTP/1.0 %b\r\nContent-Length: 0\r\nContent-Type: text/plain\r\n\r\n" % status)
        except socket.error:
            pass

    def resize_thread_pool(self):
        """Grow or shrink the pool, staying within [min_threads, max_threads].

        If connections are waiting, start up to min_threads new workers
        (never exceeding max_threads); if the queue is empty, ask up to
        min_threads workers (never going below min_threads) to exit.
        A max_threads of None means a fixed pool of min_threads.
        """
        max_threads = self.min_threads if self.max_threads is None else self.max_threads
        if max_threads <= self.min_threads:
            return
        queue_empty = Worker.queue.empty()
        live = len(Worker.threads)
        if queue_empty and live > self.min_threads:
            for _ in range(min(self.min_threads, live - self.min_threads)):
                Worker.queue.put((None, None))
        elif not queue_empty and live < max_threads:
            for _ in range(min(self.min_threads, max_threads - live)):
                new_worker = Worker()
                Worker.threads.add(new_worker)
                new_worker.start()
class Sneaky:
    """The web server: accepts connections and hands them to Worker threads."""
    def __init__(self, bind_addr, wsgi_app,
                 numthreads = 10,
                 server_name = SERVER_NAME,
                 max_threads = None,
                 request_queue_size = None,
                 timeout = 10,
                 shutdown_timeout = 5):
        """
        Example::
            s = Sneaky('127.0.0.1:8000',test_wsgi_app,100)
            s.start()
        :bind_addr can be ('127.0.0.1',8000) or '127.0.0.1:8000'
        :wsgi_app is a generic WSGI application
        :numthreads is the min number of threads (10 by default)
        :server_name ("Sneaky" by default)
        :max_threads is the max number of threads, or None (default) for a
            fixed pool of numthreads threads; should be a multiple of numthreads
        :request_queue_size listen() backlog; if None (default) the first of
            1024, 128, 5, 1 that listen() accepts is used
        :timeout on socket IO in seconds (10 secs default)
        :shutdown_timeout in seconds (5 secs default)
        """
        if isinstance(bind_addr, str):
            # split on the last ':' so only the port is separated
            bind_addr = bind_addr.rsplit(':', 1)
        self.address = bind_addr[0]
        self.port = bind_addr[1]
        self.request_queue_size = request_queue_size
        self.shutdown_timeout = shutdown_timeout
        self.ssl_interface = None
        self.socket = None  # listening socket, created by start()
        Worker.wsgi_apps.append(wsgi_app)
        Worker.server_name = server_name
        Worker.min_threads = numthreads
        # Worker compares these numerically; None means "no growth"
        Worker.max_threads = numthreads if max_threads is None else max_threads
        Worker.timeout = timeout
        Worker.threads.update(Worker() for _ in range(numthreads))

    def set_listen_queue_size(self):
        """Call listen() with the configured backlog, or the first one that works."""
        if self.request_queue_size:
            self.socket.listen(self.request_queue_size)
        else:
            for request_queue_size in [1024, 128, 5, 1]:
                try:
                    self.socket.listen(request_queue_size)
                    break
                except (socket.error, ValueError, OverflowError):
                    pass

    def start(self):
        """Bind, start the workers and accept connections until stopped.

        Workers are started only after a successful bind, so a bind failure
        can exit the process (sys.exit(1)) without leaving threads blocked.
        Returns after stop() on KeyboardInterrupt or an unexpected error.
        """
        print ('Experimental "Sneaky" WSGI web server. Starting...')
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        except (socket.error, AttributeError):
            logging.error("Unable to set SO_REUSEADDR")
        try:
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except (socket.error, AttributeError):
            logging.error("Unable to set TCP_NODELAY")
        try:
            self.socket.bind((self.address, int(self.port)))
        except Exception as e:
            logging.error("Port taken by other process. Unable to bind (%s)", e)
            self.socket.close()
            sys.exit(1)
        self.set_listen_queue_size()
        if self.ssl_interface:
            self.socket = self.ssl_interface(self.socket)
        for thread in list(Worker.threads):
            if thread.ident is None:  # not started yet
                thread.start()
        try:
            while True:
                try:
                    (client_socket, client_address) = self.socket.accept()
                    Worker.queue.put((client_socket, client_address))
                except KeyboardInterrupt:
                    return self.stop()
                except Exception:
                    if self.socket.fileno() < 0:
                        return  # listening socket was closed by stop()
                    logging.warning(traceback.format_exc())
                    continue
        except Exception:
            logging.warning(traceback.format_exc())
            return self.stop()

    def kill(self, status, frame):
        """SIGALRM handler: forcefully terminate when shutdown takes too long."""
        logging.error('forcefully killing server')
        sys.exit(1)

    def stop(self):
        """Gracefully quit: close the listener, then ask every worker to exit.

        Where SIGALRM is available (and this runs in the main thread),
        kill() fires if the workers are not gone after shutdown_timeout
        seconds.
        """
        try:
            signal.signal(signal.SIGALRM, self.kill)
            signal.alarm(self.shutdown_timeout)
        except (AttributeError, ValueError, TypeError, OSError):
            pass  # no SIGALRM on this platform, or not in the main thread
        listener = getattr(self, 'socket', None)
        if listener is not None:
            try:
                listener.close()  # stop accepting new connections
            except socket.error:
                pass
        for _ in range(len(Worker.threads)):
            Worker.queue.put((None, None))
        while Worker.threads:
            time.sleep(0.1)
        try:
            signal.alarm(0)
        except AttributeError:
            pass
def test_wsgi_app(environ, start_response):
"""just a test app"""
status = '200 OK'
response_headers = [('Content-type','text/plain')]
start_response(status, response_headers)
return ['hello world!\n']
if __name__ == '__main__':
    if '-debug' in sys.argv[1:]:
        logging.basicConfig(level = logging.INFO)
    # the first non-empty argument not starting with '-' is the ip:port;
    # an empty argument would otherwise make a[0] raise IndexError
    positional = [a for a in sys.argv[1:] if a and not a.startswith('-')]
    address = (positional + ['127.0.0.1:8000'])[0]
    print ('serving from: '+address)
    server = Sneaky(address, # the ip:port
                    test_wsgi_app, # the WSGI application
                    numthreads = 100, # min number of threads
                    max_threads = 100 # max number of threads
                    )
    server.start()