Want more features on Pastebin? Sign Up, it's FREE!
Guest

asyncore

By: a guest on Dec 8th, 2011  |  syntax: Python  |  size: 3.92 KB  |  views: 28  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. #! /usr/bin/env python
  2. # coding: utf-8
  3.  
  4. import asyncore, socket, time, struct, logging, threading
  5.  
  6. #USE_POLL = True
  7. USE_POLL = False
  8. ASYNCORE_LOOP_TIMEOUT = 0.1
  9.  
  10.  
  11. SERVER_ADDRESS = 'localhost'
  12. SERVER_PORT = 1234
  13.  
  14. clients_pool = {}
  15.  
  16.  
  17. class ClientConnection(asyncore.dispatcher):
  18.         def __init__(self, thread_id, cb_onread, cb_1hz, cb_onclosed):
  19.                 self.thread_id = thread_id
  20.                 asyncore.dispatcher.__init__(self)
  21.                 self.cb_onread = cb_onread
  22.                 self.cb_1hz = cb_1hz
  23.                 self.cb_onclosed = cb_onclosed
  24.                 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
  25.                 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024*1024)
  26.                 self.connect((SERVER_ADDRESS, SERVER_PORT))
  27.                 self.buffer_tx = ''
  28.                 self.buffer_rx = ''
  29.                 self.read_len = 0
  30.                 self.read_timeout = 0
  31.                 self.last_periodical_service_at = time.time()
  32.                 if not thread_id in clients_pool:
  33.                         clients_pool.update({thread_id:{}})
  34.                 clients_pool[thread_id].update({self: self})
  35.        
  36.         def destroy(self):
  37.                 self.close()
  38.                 try:
  39.                         del clients_pool[self.thread_id][self]
  40.                 except:
  41.                         pass
  42.        
  43.         def begin_read(self, len, timeout):
  44.                 self.read_len = len
  45.                 self.read_timeout = timeout
  46.        
  47.         def begin_write(self, data):
  48.                 self.buffer_tx += data
  49.                 self.handle_write()
  50.        
  51.         def _invoke_onread_safely(self, arg):
  52.                 try:
  53.                         self.cb_onread(arg)
  54.                 except Exception, ex:
  55.                         logging.error('ClientConnection: onread callback exception: ' + repr(ex))
  56.        
  57.         def _try_onread(self):
  58.                 if (self.read_len > 0) and (len(self.buffer_rx) >= self.read_len) and (self.read_len > 0):
  59.                         buf = self.buffer_rx[:self.read_len]
  60.                         self.buffer_rx = self.buffer_rx[self.read_len:]
  61.                         self.read_len = 0
  62.                         self._invoke_onread_safely(buf)
  63.        
  64.         def handle_1hz(self):
  65.                 if self.read_len > 0:
  66.                         if self.read_timeout == 0:
  67.                                 if self.read_len > len(self.buffer_rx):
  68.                                         self.read_len = len(self.buffer_rx)
  69.                                 buf = self.buffer_rx[:self.read_len]
  70.                                 self.buffer_rx = self.buffer_rx[self.read_len:]
  71.                                 self.read_len = 0
  72.                                 self.buffer_rx = ''
  73.                                 self._invoke_onread_safely(buf)
  74.                         else:
  75.                                 self.read_timeout -= 1
  76.                 try:
  77.                         self.cb_1hz()
  78.                 except Exception, ex:
  79.                         logging.error('ClientConnection: 1hz callback exception: ' + repr(ex))
  80.        
  81.         def handle_service_timeout(self):
  82.                 self._try_onread()
  83.                 now = time.time()
  84.                 if now - self.last_periodical_service_at >= 1:
  85.                         self.last_periodical_service_at = now
  86.                         self.handle_1hz()
  87.        
  88.         def handle_connect(self):
  89.                 pass
  90.  
  91.         def handle_close(self):
  92.                 logging.error('ClientConnection: connection closed')
  93.                 self.close()
  94.                 global clients_pool
  95.                 if len(clients_pool) == 1 and len(clients_pool[self.thread_id]) == 1:
  96.                         clients_pool = None
  97.                         exit(1)
  98.                 self.cb_onclosed()
  99.  
  100.         def handle_read(self):
  101.                 buf = self.recv(1024)
  102.                 self.buffer_rx += buf
  103.                 self.handle_service_timeout()
  104.  
  105.         def writable(self):
  106.                 self.handle_service_timeout()
  107.                 return (len(self.buffer_tx) > 0)
  108.  
  109.         def handle_write(self):
  110.                 sent = self.send(self.buffer_tx)
  111.                 self.buffer_tx = self.buffer_tx[sent:]
  112.  
  113. # -----------------------------------------------------------------------------
  114.  
  115. def spin_once(thread_id):
  116.         if not thread_id in clients_pool:
  117.                 time.sleep(ASYNCORE_LOOP_TIMEOUT)
  118.                 return
  119.         try:
  120.                 asyncore.loop(timeout=ASYNCORE_LOOP_TIMEOUT, use_poll=USE_POLL, map=clients_pool[thread_id], count=1)
  121.         except socket.error, ex:
  122.                 logging.warning('ClientConnection: socket error: "' + str(ex) + '", ignoring...')
  123.  
  124. def spin(thread_id):
  125.         logging.info('ClientConnection: asyncore thread "%s" with %i connections'%(str(thread_id), len(clients_pool[thread_id])))
  126.         while True:
  127.                 try:
  128.                         asyncore.loop(timeout=ASYNCORE_LOOP_TIMEOUT, use_poll=USE_POLL, map=clients_pool[thread_id])
  129.                 except socket.error, ex:
  130.                         logging.warning('ClientConnection: socket error: "' + str(ex) + '", ignoring...')
  131.  
  132. def spin_all():
  133.         threads = [threading.Thread(target=spin, args=(invokearg,)) for invokearg in clients_pool.keys()]
  134.         map(lambda self: self.start(), threads)
  135.         map(lambda self: self.join(), threads)
  136.  
  137.  
  138.  
clone this paste RAW Paste Data