Want more features on Pastebin? Sign Up, it's FREE!
Guest

illume

By: a guest on Sep 19th, 2009  |  syntax: None  |  size: 16.02 KB  |  views: 41  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. """
  2. Experimental multi-threaded web server created by Massimo Di Pierro.
  3. For lack of a better name, we'll call it Sneaky.
  4. License: GPL2
  5.  
  6. This code would not have been possible without the CherryPy wsgiserver,
  7. a great example of a Python web server.
  8.  
  9. - This code implements WSGI
  10. - This code is API-compatible with CherryPy
  11. - It consists of fewer than 260 lines of code
  12. - It is multi-threaded
  13. - The number of threads changes dynamically between a min and a max
  14. - It can handle chunked transfer encoding (request and response) [to be tested]
  15. - It supports SSL via the CherryPy SSL adapters
  16.  
  17. You can find an example of usage at the bottom of this file.
  18.  
  19. Here are some tests and comparisons performed by Garrett Smith:
  20.  
  21. RPS = requests per second
  22. Time = average time in milliseconds to serve each request
  23. Benchmark = `ab -t 10 -c <number of concurrent requests> -r http://localhost/`
  24.  
  25. 100 Concurrent Requests
  26. -----------------------
  27. ===============
  28. App Server         RPS  
  29. ===============
  30. Fapws             7174          
  31. Landshark         4479          
  32. PHP-5             4191  
  33. modwsgi           3651        
  34. Tomcat 6          3554          
  35. Tornado           2641          
  36. Sneaky WSGI(*)    2372          
  37. CherryPy WSGI     2102          
  38. Phusion           1873
  39. Jetty 6            937            
  40. Django WSGI        785        
  41. WEBrick             43        
  42. ===============
  43.  
  44. 1,000 Concurrent Requests
  45. -------------------------
  46. ===============
  47. App Server         RPS  
  48. ===============
  49. Fapws             5359      
  50. Landshark         4477
  51. modwsgi           3449  
  52. PHP 5             3062      
  53. Tomcat 6          3014  
  54. Tornado           2452  
  55. Sneaky WSGI(*)    2415
  56. CherryPy WSGI     2126
  57. Phusion           1585  
  58. Jetty 6           1095      
  59. Django WSGI        953
  60. WEBrick             50    
  61. ===============
  62.  
  63. 10,000 Concurrent Requests
  64. --------------------------
  65. ===============
  66. App Server         RPS  
  67. ===============
  68. Fapws             5213      
  69. Landshark         4239  
  70. Tomcat 6          2369  
  71. Tornado           2265  
  72. PHP 5             2239    
  73. Sneaky WSGI (*)   2225
  74. modwsgi           2115
  75. CherryPy WSGI     1731
  76. Phusion           1247
  77. Jetty 6            794      
  78. Django WSGI        890
  79. WEBrick             84    
  80. ===============
  81.  
  82. 20,000 Concurrent Requests
  83. --------------------------
  84. ===============
  85. App Server         RPS  
  86. ===============
  87. Fapws             4788      
  88. Landshark         2936    
  89. Tornado           2153    
  90. Sneaky WSGI(*)    2130
  91. PHP 5             1728  
  92. modwsgi           1374    
  93. Tomcat 6          1362    
  94. CherryPy WSGI     1294
  95. Phusion            961
  96. Django WSGI        790  
  97. Jetty 6            616      
  98. WEBrick             63    
  99. ===============
  100.  
  101. """
  102.  
  103. import os
  104. import threading
  105. import socket
  106. import logging
  107. import sys
  108. import re
  109. import errno
  110. import signal
  111. import time
  112. import traceback
  113. import copy
  114.  
  115. from cStringIO import StringIO
  116. from Queue import Queue
  117. regex_head = re.compile('^((http|https|HTTP|HTTPS)\://[^/]+)?(?P<method>\w+)\s+(?P<uri>\S+)\s+(?P<protocol>\S+)')
  118. regex_header = re.compile('\s*(?P<key>.*?)\s*\:\s*(?P<value>.*?)\s*$')
  119. regex_chunk = re.compile('^(?P<size>\w+)')
  120.  
  121. BUF_SIZE = 10000
  122. SERVER_NAME = 'Sneaky'
  123. ACTUAL_SERVER_PROTOCOL = 'HTTP/1.1' # should this be determined from request?
  124.  
  125. def formatdateRFC822():
  126.     t=time.gmtime(time.time())
  127.     w=("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")[t[6]]    
  128.     return w+time.strftime(", %d %b %Y %H:%M%S GMT",t)
  129.  
  130. class ChunkedReader:
  131.     """ class used to read chunked input """
  132.     def __init__(self,stream):
  133.         self.stream = stream        
  134.         self.buffer = None
  135.  
  136.     def __chunk_read(self):
  137.         if not self.buffer or self.buffer.tell() == self.buffer_size:
  138.             self.buffer_size = \
  139.                 int(regex_chunk.match(self.stream.readline()).group('size'),16)
  140.             if self.buffer_size:
  141.                 self.buffer = StringIO(self.stream.read(self.buffer_size))
  142.  
  143.     def read(self,size):
  144.         data = ''
  145.         while size:
  146.             self.__chunk_read()
  147.             if not self.buffer_size:
  148.                 break
  149.             read_size = min(size,self.buffer_size)
  150.             data += self.buffer.read(read_size)
  151.             size -= read_size
  152.         return data
  153.  
  154.     def readline(self):
  155.         data = ''
  156.         for c in self.read(1):
  157.             if not c:
  158.                 break
  159.             data += c
  160.             if c == '\n':
  161.                 break
  162.         return data
  163.  
  164.     def readlines(self):
  165.         yield self.readline()
  166.      
  167. def errors_numbers(errnames):
  168.     """utility to build a list of socket errors"""
  169.     return set([getattr(errno, k) for k in errnames if hasattr(errno,k)])
  170.  
  171. socket_errors_to_ignore = errors_numbers((
  172.     "EPIPE",
  173.     "EBADF", "WSAEBADF",
  174.     "ENOTSOCK", "WSAENOTSOCK",
  175.     "ETIMEDOUT", "WSAETIMEDOUT",
  176.     "ECONNREFUSED", "WSAECONNREFUSED",
  177.     "ECONNRESET", "WSAECONNRESET",
  178.     "ECONNABORTED", "WSAECONNABORTED",
  179.     "ENETRESET", "WSAENETRESET",
  180.     "EHOSTDOWN", "EHOSTUNREACH",
  181.     ))
  182.  
  183. class Worker(threading.Thread):
  184.     """class representing a worker node"""
  185.     queue = Queue()               # queue of requests to process (socket,address)
  186.     threads = set()               # set of threads (instances or Worker class
  187.     wsgi_apps = []                # [the_wsgi_app]
  188.     server_name = SERVER_NAME
  189.     min_threads = 10
  190.     max_threads = 10
  191.  
  192.     def run(self):  
  193.         """runs the thread:
  194.         - pick a request from queue
  195.         - parse input
  196.         - run wsgi_app
  197.         - send response
  198.         - resize set of threads
  199.         """
  200.         while True:
  201.             (client_socket,client_address) = self.queue.get()
  202.             if not client_socket:
  203.                 return self.die()
  204.             if hasattr(client_socket,'settimeout'):
  205.                 client_socket.settimeout(self.timeout)            
  206.             while True:
  207.                 wsgi_file = client_socket.makefile('rb',BUF_SIZE)
  208.                 try:
  209.                     environ = self.build_environ(wsgi_file,client_address)
  210.                     data_items = self.wsgi_apps[0](environ,self.start_response)                    
  211.                     if self.respond(client_socket, environ, data_items):
  212.                         break
  213.                 except:
  214.                     logging.warn(str(traceback.format_exc()))
  215.                     self.try_error_response(client_socket)
  216.                     break            
  217.             wsgi_file.close()
  218.             client_socket.close()
  219.             self.resize_thread_pool()
  220.  
  221.     def die(self):
  222.         """kills this thread, must be called by run()"""
  223.         self.threads.remove(self)
  224.         return
  225.            
  226.     def build_environ(self,wsgi_file,client_address):
  227.         """parse request and build WSGI environ"""
  228.         first_line = wsgi_file.readline()
  229.         match = regex_head.match(first_line)        
  230.         request_method = match.group('method')
  231.         uri = match.group('uri')
  232.         request_protocol = match.group('protocol')
  233.         k = uri.find('?')
  234.         if k<0:
  235.             k = len(uri)
  236.         (path_info,query_string) = (uri[:k],uri[k+1:])
  237.         environ = {'wsgi.version': (1,0),
  238.                    'wsgi.input': wsgi_file,
  239.                    'wsgi.url_encoding': 'utf-8',
  240.                    'wsgi.url_scheme': 'http',
  241.                    'wsgi.errors': sys.stderr,
  242.                    'ACTUAL_SERVER_PROTOCOL': ACTUAL_SERVER_PROTOCOL,
  243.                    'CLIENT_ADDR': client_address[0],                
  244.                    'CLIENT_PORT': client_address[1],
  245.                    'PATH_INFO': path_info,
  246.                    'REQUEST_URI': uri,
  247.                    'REQUEST_METHOD':request_method,
  248.                    'PATH_INFO': path_info,
  249.                    'SCRIPT_NAME': '',                
  250.                    'QUERY_STRING': query_string}
  251.         for line in wsgi_file:
  252.             if line == '\r\n':
  253.                 break
  254.             match = regex_header.match(line)
  255.             if not match:
  256.                 continue
  257.             key = match.group('key').upper().replace('-','_')
  258.             if isinstance(key,unicode):
  259.                 key = key.encode('ISO-8859-1')
  260.             value = match.group('value')
  261.             try:
  262.                 value = value.decode('ISO-8859-1').encode('utf-8')
  263.             except:
  264.                 pass
  265.             environ['HTTP_'+key] = value
  266.             if key == 'CONTENT_LENGTH':
  267.                 environ[key]=value
  268.             if key == 'CONTENT_TYPE':
  269.                 envione[key]=value
  270.             if key == 'TRANSFER_ENCODING' and value[:7].lower() == 'chunked':
  271.                 environ['wsgi.input'] = ChunkedReader(wsgi_file)
  272.         return environ
  273.  
  274.     def start_response(self,status,headers):
  275.         """to be passed as second argument to wsgi_app"""
  276.         self.status = status
  277.         self.headers = headers
  278.  
  279.     def respond(self,client_socket,environ,data_items):
  280.         """called after wsgi_app successfully retruns"""
  281.         headers = self.headers
  282.         header_dict = dict([(x.lower(),y.strip()) for (x,y) in headers])
  283.         if not 'date' in header_dict:
  284.             headers.append(('Date',formatdateRFC822()))
  285.         if not 'server' in header_dict:
  286.             headers.append(('Server',self.server_name))
  287.         chunked = header_dict.get('transfer-encoding','')[:7].lower() == 'chunked'
  288.         if not 'content-length' in header_dict and not chunked:
  289.             if isinstance(data_items,list) and len(data_items) == 1:
  290.                 headers.append(('Content-Length',len(data_items[0])))
  291.         connection = environ.get('HTTP_CONNECTION','close')
  292.         headers.append(('Connection',connection))        
  293.         serialized_headers = \
  294.             ''.join(['%s: %s\r\n' % (k,v) for (k,v) in headers])
  295.         data = "HTTP/1.1 %s\r\n%s\r\n" % (self.status, serialized_headers)
  296.         client_socket.sendall(data)
  297.         for data in data_items:                  
  298.             try:
  299.                 if chunked:
  300.                     client_socket.sendall('%x\r\n%s\r\n' % (len(data),data))
  301.                 else:
  302.                     client_socket.sendall(data)
  303.             except socket.error, e:
  304.                 if e.args[0] not in socket_errors_to_ignore:
  305.                     raise
  306.         if chunked:
  307.             client_socket.sendall('0\r\n')
  308.         return connection.lower() != 'keep-alive'
  309.  
  310.     def try_error_response(self,client_socket,
  311.                            status = "500 INSERTNAL SERVER ERROR"):        
  312.         """called if thread fails"""
  313.         try:
  314.             client_socket.sendall(
  315.                     "HTTP/1.0 %s\r\nContent-Length: 0\r\nContent-Type: text/plain\r\n\r\n" % status)
  316.         except: pass
  317.  
  318.     def resize_thread_pool(self):
  319.         """created new Worker(s) or kills some Worker(s)"""
  320.         if self.max_threads>self.min_threads:
  321.             qe = Worker.queue.empty()
  322.             ql = len(Worker.threads)
  323.             if qe and ql>self.min_threads:
  324.                 for k in range(self.min_threads):
  325.                     Worker.queue.put((None,None))
  326.             elif not qe and ql<self.max_threads:
  327.                 for k in range(self.min_threads):
  328.                     new_worker = Worker()
  329.                     Worker.threads.add(new_worker)
  330.                     new_worker.start()
  331.                    
  332. class Sneaky:
  333.     """the actual web server"""
  334.     def __init__(self,  bind_addr, wsgi_app,
  335.                  numthreads = 10,
  336.                  server_name = SERVER_NAME,
  337.                  max_threads = None,
  338.                  request_queue_size = None,
  339.                  timeout = 10,
  340.                  shutdown_timeout = 5):
  341.         """
  342.         Example::
  343.        
  344.         s = Sneaky('127.0.0.1:8000',test_wsgi_app,100)
  345.         s.start()
  346.  
  347.         :bind_addr can be ('127.0.0.1',8000) or '127.0.0.1:8000'
  348.         :wsgi_app is a generic WSGI application
  349.         :numthreads is the min number of threads (10 by default)
  350.         :server_name ("Skeaky" by default)
  351.         :max_threads is the max number of threads or None (default)
  352.                      should be a multiple of numthreads
  353.         :request_queue_size if set to None (default) adjusts automatically
  354.         :timeout on socket IO in seconds (10 secs default)
  355.         :shotdown_timeout in seconds (5 secs default)
  356.         """
  357.         if isinstance(bind_addr,str):
  358.             bind_addr = bind_addr.split(':')
  359.         self.address = bind_addr[0]
  360.         self.port = bind_addr[1]
  361.         self.request_queue_size = request_queue_size
  362.         self.shutdown_timeout = shutdown_timeout
  363.         self.ssl_interface = None
  364.  
  365.         Worker.wsgi_apps.append(wsgi_app)
  366.         Worker.server_name = server_name
  367.         Worker.min_threads = numthreads
  368.         Worker.max_threads = max_threads
  369.         Worker.timeout = timeout
  370.         Worker.threads.update([Worker() for k in range(numthreads)])
  371.  
  372.     def set_listen_queue_size(self):
  373.         """tries a listen argument that works"""
  374.         if self.request_queue_size:
  375.             self.socket.listen(self.request_queue_size)
  376.         else:
  377.             for request_queue_size in [1024,128,5,1]:
  378.                 try:
  379.                     self.socket.listen(request_queue_size)
  380.                     break
  381.                 except:
  382.                     pass
  383.  
  384.     def start(self):
  385.         """starts the server"""
  386.         print 'Experimental "Sneaky" WSGI web server. Starting...'
  387.         for thread in Worker.threads:
  388.             thread.start()
  389.         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  390.         if not self.socket:
  391.             raise IOException # unable to connect        
  392.         try:
  393.             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  394.         except:
  395.             logging.error("Unable to set SO_REUSEADDR")
  396.         try:
  397.             self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  398.         except:
  399.             logging.error("Unable to set TCP_NODELAY")
  400.         try:
  401.             self.socket.bind((self.address,int(self.port)))
  402.         except:
  403.             logging.error("Port taken by other process. Unable to bind")
  404.             sys.exit(1)
  405.         self.set_listen_queue_size()
  406.         if self.ssl_interface:
  407.             self.socket = self.ssl_interface(self.socket)
  408.         try:
  409.             while True:
  410.                 try:
  411.                     (client_socket,client_address) = self.socket.accept()
  412.                     Worker.queue.put((client_socket,client_address))
  413.                 except KeyboardInterrupt:
  414.                     return self.stop()
  415.                 except Exception:
  416.                     logging.warn(str(traceback.format_exc()))
  417.                     continue
  418.         except Exception:
  419.             logging.warn(str(traceback.format_exc()))
  420.             return self.stop()
  421.  
  422.     def kill(self,status,frame):
  423.         """kills the server"""
  424.         logging.error('forcefully killing server')
  425.         sys.exit(1)
  426.  
  427.     def stop(self):
  428.         """tries to gracefully quit the server"""
  429.         try:
  430.             signal.signal(signal.SIGALRM,self.kill)
  431.             signal.alarm(self.shutdown_timeout)
  432.         except:
  433.             pass
  434.         threads = copy.copy(Worker.threads)
  435.         for thread in threads:
  436.             Worker.queue.put((None,None))
  437.         while Worker.threads: time.sleep(1)
  438.         try:
  439.             signal.alarm(0)
  440.         except:
  441.             pass
  442.  
  443. def test_wsgi_app(environ, start_response):
  444.     """just a test app"""
  445.     status = '200 OK'
  446.     response_headers = [('Content-type','text/plain')]
  447.     start_response(status, response_headers)
  448.     return ['hello world!\n']
  449.  
  450. if __name__ == '__main__':
  451.  
  452.     if '-debug' in sys.argv[1:]:
  453.         logging.basicConfig(level = logging.INFO)
  454.  
  455.     address = ([a for a in sys.argv[1:] if a[0]!='-'] + ['127.0.0.1:8000'])[0]
  456.  
  457.     print 'serving from: '+address
  458.  
  459.     server = Sneaky(address, # the ip:port
  460.                     test_wsgi_app,  # the SWGI application
  461.                     numthreads = 100, # min number of threads
  462.                     max_threads = 100 # max number of threads
  463.                     )
  464.     server.start()
clone this paste RAW Paste Data