Advertisement
Guest User

illume

a guest
Sep 19th, 2009
104
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 15.99 KB | None | 0 0
  1. """
  2. Experimental multi-threaded web server created by Massimo Di Pierro
  3. For lack of a better name we'll call it Sneaky.
  4. License: GPL2
  5.  
  6. This code would not have been possible without CherryPy wsgiserver,
  7. a great example of a Python web server.
  8.  
  9. - This code implements WSGI
  10. - This code is API compatible with cherrypy
  11. - It consists of less than 260 lines of code
  12. - It is multi-threaded
  13. - The number of threads changes dynamically between a min and a max
  14. - Can handle chunking (request and response) [to be tested]
  15. - supports SSL via the Cherrypy ssl adaptors
  16.  
  17. You can find an example of usage at the bottom of this file.
  18.  
  19. here are some tests and comparisons performed by Garrett Smith
  20.  
  21. RPS = requests per second
  22. Time = average time in milliseconds to serve each request
  23. Benchmark = `ab -t 10 -c <number of concurrent requests> -r http://localhost`
  24.  
  25. 100 Concurrent Requests
  26. -----------------------
  27. ===============
  28. App Server RPS
  29. ==============
  30. Fapws 7174
  31. Landshark 4479
  32. PHP-5 4191
  33. modwsgi 3651
  34. Tomcat 6 3554
  35. Tornado 2641
  36. Sneaky WSGI(*) 2372
  37. CherryPy WSGI 2102
  38. Phusion 1873
  39. Jetty 6 937
  40. Django WSGI 785
  41. WEBrick 43
  42. ===============
  43.  
  44. 1,000 Concurrent Requests
  45. -------------------------
  46. ===============
  47. App Server RPS
  48. ===============
  49. Fapws 5359
  50. Landshark 4477
  51. modwsgi 3449
  52. PHP 5 3062
  53. Tomcat 6 3014
  54. Tornado 2452
  55. Sneaky WSGI(*) 2415
  56. CherryPy WSGI 2126
  57. Phusion 1585
  58. Jetty 6 1095
  59. Django WSGI 953
  60. WEBrick 50
  61. ===============
  62.  
  63. 10,000 Concurrent Requests
  64. --------------------------
  65. ===============
  66. App Server RPS
  67. ===============
  68. Fapws 5213
  69. Landshark 4239
  70. Tomcat 6 2369
  71. Tornado 2265
  72. PHP 5 2239
  73. Sneaky WSGI (*) 2225
  74. modwsgi 2115
  75. CherryPy WSGI 1731
  76. Phusion 1247
  77. Jetty 6 794
  78. Django WSGI 890
  79. WEBrick 84
  80. ===============
  81.  
  82. 20,000 Concurrent Requests
  83. --------------------------
  84. ===============
  85. App Server RPS
  86. ===============
  87. Fapws 4788
  88. Landshark 2936
  89. Tornado 2153
  90. Sneaky WSGI(*) 2130
  91. PHP 5 1728
  92. modwsgi 1374
  93. Tomcat 6 1362
  94. CherryPy WSGI 1294
  95. Phusion 961
  96. Django WSGI 790
  97. Jetty 6 616
  98. WEBrick 63
  99. ===============
  100.  
  101. """
  102.  
  103. import os
  104. import threading
  105. import socket
  106. import logging
  107. import sys
  108. import re
  109. import errno
  110. import signal
  111. import time
  112. import traceback
  113. import copy
  114.  
  115. from io import StringIO
  116. from queue import Queue
  117. regex_head = re.compile(b'^((http|https|HTTP|HTTPS)\://[^/]+)?(?P<method>\w+)\s+(?P<uri>\S+)\s+(?P<protocol>\S+)')
  118. regex_header = re.compile(b'\s*(?P<key>.*?)\s*\:\s*(?P<value>.*?)\s*$')
  119. regex_chunk = re.compile(b'^(?P<size>\w+)')
  120.  
  121. BUF_SIZE = 10000
  122. SERVER_NAME = 'Sneaky'
  123. ACTUAL_SERVER_PROTOCOL = 'HTTP/1.1' # should this be determined from request?
  124.  
  125. def formatdateRFC822():
  126. t=time.gmtime(time.time())
  127. w=("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")[t[6]]
  128. return w+time.strftime(", %d %b %Y %H:%M%S GMT",t)
  129.  
  130. class ChunkedReader:
  131. """ class used to read chunked input """
  132. def __init__(self,stream):
  133. self.stream = stream
  134. self.buffer = None
  135.  
  136. def __chunk_read(self):
  137. if not self.buffer or self.buffer.tell() == self.buffer_size:
  138. self.buffer_size = \
  139. int(regex_chunk.match(self.stream.readline()).group('size'),16)
  140. if self.buffer_size:
  141. self.buffer = StringIO(self.stream.read(self.buffer_size))
  142.  
  143. def read(self,size):
  144. data = ''
  145. while size:
  146. self.__chunk_read()
  147. if not self.buffer_size:
  148. break
  149. read_size = min(size,self.buffer_size)
  150. data += self.buffer.read(read_size)
  151. size -= read_size
  152. return data
  153.  
  154. def readline(self):
  155. data = ''
  156. for c in self.read(1):
  157. if not c:
  158. break
  159. data += c
  160. if c == '\n':
  161. break
  162. return data
  163.  
  164. def readlines(self):
  165. yield self.readline()
  166.  
  167. def errors_numbers(errnames):
  168. """utility to build a list of socket errors"""
  169. return set([getattr(errno, k) for k in errnames if hasattr(errno,k)])
  170.  
  171. socket_errors_to_ignore = errors_numbers((
  172. "EPIPE",
  173. "EBADF", "WSAEBADF",
  174. "ENOTSOCK", "WSAENOTSOCK",
  175. "ETIMEDOUT", "WSAETIMEDOUT",
  176. "ECONNREFUSED", "WSAECONNREFUSED",
  177. "ECONNRESET", "WSAECONNRESET",
  178. "ECONNABORTED", "WSAECONNABORTED",
  179. "ENETRESET", "WSAENETRESET",
  180. "EHOSTDOWN", "EHOSTUNREACH",
  181. ))
  182.  
  183. class Worker(threading.Thread):
  184. """class representing a worker node"""
  185. queue = Queue() # queue of requests to process (socket,address)
  186. threads = set() # set of threads (instances or Worker class
  187. wsgi_apps = [] # [the_wsgi_app]
  188. server_name = SERVER_NAME
  189. min_threads = 10
  190. max_threads = 10
  191.  
  192. def run(self):
  193. """runs the thread:
  194. - pick a request from queue
  195. - parse input
  196. - run wsgi_app
  197. - send response
  198. - resize set of threads
  199. """
  200. while True:
  201. (client_socket,client_address) = self.queue.get()
  202. if not client_socket:
  203. return self.die()
  204. if hasattr(client_socket,'settimeout'):
  205. client_socket.settimeout(self.timeout)
  206. while True:
  207. wsgi_file = client_socket.makefile('rb',BUF_SIZE)
  208. try:
  209. environ = self.build_environ(wsgi_file,client_address)
  210. data_items = self.wsgi_apps[0](environ,self.start_response)
  211. if self.respond(client_socket, environ, data_items):
  212. break
  213. except:
  214. logging.warn(str(traceback.format_exc()))
  215. self.try_error_response(client_socket)
  216. break
  217. wsgi_file.close()
  218. client_socket.close()
  219. self.resize_thread_pool()
  220.  
  221. def die(self):
  222. """kills this thread, must be called by run()"""
  223. self.threads.remove(self)
  224. return
  225.  
  226. def build_environ(self,wsgi_file,client_address):
  227. """parse request and build WSGI environ"""
  228. first_line = wsgi_file.readline()
  229. match = regex_head.match(first_line)
  230. request_method = match.group('method')
  231. uri = str(match.group('uri'))
  232. request_protocol = match.group('protocol')
  233. k = uri.find('?')
  234. if k<0:
  235. k = len(uri)
  236. (path_info,query_string) = (uri[:k],uri[k+1:])
  237. environ = {'wsgi.version': (1,0),
  238. 'wsgi.input': wsgi_file,
  239. 'wsgi.url_encoding': 'utf-8',
  240. 'wsgi.url_scheme': 'http',
  241. 'wsgi.errors': sys.stderr,
  242. 'ACTUAL_SERVER_PROTOCOL': ACTUAL_SERVER_PROTOCOL,
  243. 'CLIENT_ADDR': client_address[0],
  244. 'CLIENT_PORT': client_address[1],
  245. 'PATH_INFO': path_info,
  246. 'REQUEST_URI': uri,
  247. 'REQUEST_METHOD':request_method,
  248. 'PATH_INFO': path_info,
  249. 'SCRIPT_NAME': '',
  250. 'QUERY_STRING': query_string}
  251. for line in wsgi_file:
  252. if line == b'\r\n':
  253. break
  254. match = regex_header.match(line)
  255. if not match:
  256. continue
  257. key = str(match.group('key')).upper().replace('-','_')
  258. value = str(match.group('value'))
  259. try:
  260. value = value.decode('ISO-8859-1').encode('utf-8')
  261. except:
  262. pass
  263. environ['HTTP_'+key] = value
  264. if key == 'CONTENT_LENGTH':
  265. environ[key]=value
  266. if key == 'CONTENT_TYPE':
  267. envione[key]=value
  268. if key == 'TRANSFER_ENCODING' and value[:7].lower() == 'chunked':
  269. environ['wsgi.input'] = ChunkedReader(wsgi_file)
  270. return environ
  271.  
  272. def start_response(self,status,headers):
  273. """to be passed as second argument to wsgi_app"""
  274. self.status = status
  275. self.headers = headers
  276.  
  277. def respond(self,client_socket,environ,data_items):
  278. """called after wsgi_app successfully retruns"""
  279. headers = self.headers
  280. header_dict = dict([(x.lower(),y.strip()) for (x,y) in headers])
  281. if not 'date' in header_dict:
  282. headers.append(('Date',formatdateRFC822()))
  283. if not 'server' in header_dict:
  284. headers.append(('Server',self.server_name))
  285. chunked = header_dict.get('transfer-encoding','')[:7].lower() == 'chunked'
  286. if not 'content-length' in header_dict and not chunked:
  287. if isinstance(data_items,list) and len(data_items) == 1:
  288. headers.append(('Content-Length',len(bytes(data_items[0],'utf8'))))
  289. connection = environ.get('HTTP_CONNECTION','close')
  290. headers.append(('Connection',connection))
  291. serialized_headers = \
  292. ''.join(['%s: %s\r\n' % (k,v) for (k,v) in headers])
  293. data = "HTTP/1.1 %s\r\n%s\r\n" % (self.status, serialized_headers)
  294. client_socket.sendall(bytes(data,'utf8'))
  295. for data in data_items:
  296. try:
  297. if chunked:
  298. client_socket.sendall(bytes('%x\r\n%s\r\n' % (len(data),data),'utf8'))
  299. else:
  300. client_socket.sendall(bytes(data,'utf8'))
  301. except socket.error as e:
  302. if e.args[0] not in socket_errors_to_ignore:
  303. raise
  304. if chunked:
  305. client_socket.sendall(b'0\r\n')
  306. return connection.lower() != 'keep-alive'
  307.  
  308. def try_error_response(self,client_socket,
  309. status = "500 INSERTNAL SERVER ERROR"):
  310. """called if thread fails"""
  311. try:
  312. client_socket.sendall(
  313. b"HTTP/1.0 %s\r\nContent-Length: 0\r\nContent-Type: text/plain\r\n\r\n" % status)
  314. except: pass
  315.  
  316. def resize_thread_pool(self):
  317. """created new Worker(s) or kills some Worker(s)"""
  318. if self.max_threads>self.min_threads:
  319. qe = Worker.queue.empty()
  320. ql = len(Worker.threads)
  321. if qe and ql>self.min_threads:
  322. for k in range(self.min_threads):
  323. Worker.queue.put((None,None))
  324. elif not qe and ql<self.max_threads:
  325. for k in range(self.min_threads):
  326. new_worker = Worker()
  327. Worker.threads.add(new_worker)
  328. new_worker.start()
  329.  
  330. class Sneaky:
  331. """the actual web server"""
  332. def __init__(self, bind_addr, wsgi_app,
  333. numthreads = 10,
  334. server_name = SERVER_NAME,
  335. max_threads = None,
  336. request_queue_size = None,
  337. timeout = 10,
  338. shutdown_timeout = 5):
  339. """
  340. Example::
  341.  
  342. s = Sneaky('127.0.0.1:8000',test_wsgi_app,100)
  343. s.start()
  344.  
  345. :bind_addr can be ('127.0.0.1',8000) or '127.0.0.1:8000'
  346. :wsgi_app is a generic WSGI application
  347. :numthreads is the min number of threads (10 by default)
  348. :server_name ("Skeaky" by default)
  349. :max_threads is the max number of threads or None (default)
  350. should be a multiple of numthreads
  351. :request_queue_size if set to None (default) adjusts automatically
  352. :timeout on socket IO in seconds (10 secs default)
  353. :shotdown_timeout in seconds (5 secs default)
  354. """
  355. if isinstance(bind_addr,str):
  356. bind_addr = bind_addr.split(':')
  357. self.address = bind_addr[0]
  358. self.port = bind_addr[1]
  359. self.request_queue_size = request_queue_size
  360. self.shutdown_timeout = shutdown_timeout
  361. self.ssl_interface = None
  362.  
  363. Worker.wsgi_apps.append(wsgi_app)
  364. Worker.server_name = server_name
  365. Worker.min_threads = numthreads
  366. Worker.max_threads = max_threads
  367. Worker.timeout = timeout
  368. Worker.threads.update([Worker() for k in range(numthreads)])
  369.  
  370. def set_listen_queue_size(self):
  371. """tries a listen argument that works"""
  372. if self.request_queue_size:
  373. self.socket.listen(self.request_queue_size)
  374. else:
  375. for request_queue_size in [1024,128,5,1]:
  376. try:
  377. self.socket.listen(request_queue_size)
  378. break
  379. except:
  380. pass
  381.  
  382. def start(self):
  383. """starts the server"""
  384. print ('Experimental "Sneaky" WSGI web server. Starting...')
  385. for thread in Worker.threads:
  386. thread.start()
  387. self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  388. if not self.socket:
  389. raise IOException # unable to connect
  390. try:
  391. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  392. except:
  393. logging.error("Unable to set SO_REUSEADDR")
  394. try:
  395. self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  396. except:
  397. logging.error("Unable to set TCP_NODELAY")
  398. try:
  399. self.socket.bind((self.address,int(self.port)))
  400. except:
  401. logging.error("Port taken by other process. Unable to bind")
  402. sys.exit(1)
  403. self.set_listen_queue_size()
  404. if self.ssl_interface:
  405. self.socket = self.ssl_interface(self.socket)
  406. try:
  407. while True:
  408. try:
  409. (client_socket,client_address) = self.socket.accept()
  410. Worker.queue.put((client_socket,client_address))
  411. except KeyboardInterrupt:
  412. return self.stop()
  413. except Exception:
  414. logging.warn(str(traceback.format_exc()))
  415. continue
  416. except Exception:
  417. logging.warn(str(traceback.format_exc()))
  418. return self.stop()
  419.  
  420. def kill(self,status,frame):
  421. """kills the server"""
  422. logging.error('forcefully killing server')
  423. sys.exit(1)
  424.  
  425. def stop(self):
  426. """tries to gracefully quit the server"""
  427. try:
  428. signal.signal(signal.SIGALRM,self.kill)
  429. signal.alarm(self.shutdown_timeout)
  430. except:
  431. pass
  432. threads = copy.copy(Worker.threads)
  433. for thread in threads:
  434. Worker.queue.put((None,None))
  435. while Worker.threads: time.sleep(1)
  436. try:
  437. signal.alarm(0)
  438. except:
  439. pass
  440.  
  441. def test_wsgi_app(environ, start_response):
  442. """just a test app"""
  443. status = '200 OK'
  444. response_headers = [('Content-type','text/plain')]
  445. start_response(status, response_headers)
  446. return ['hello world!\n']
  447.  
  448. if __name__ == '__main__':
  449.  
  450. if '-debug' in sys.argv[1:]:
  451. logging.basicConfig(level = logging.INFO)
  452.  
  453. address = ([a for a in sys.argv[1:] if a[0]!='-'] + ['127.0.0.1:8000'])[0]
  454.  
  455. print ('serving from: '+address)
  456.  
  457. server = Sneaky(address, # the ip:port
  458. test_wsgi_app, # the SWGI application
  459. numthreads = 100, # min number of threads
  460. max_threads = 100 # max number of threads
  461. )
  462. server.start()
  463.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement