1. #! /usr/bin/env python
  2. # coding: utf-8
  3.  
  4. import asyncore, socket, time, struct, logging, threading
  5.  
# Passed as ``use_poll`` to asyncore.loop() by spin_once() and spin().
# NOTE(review): the per-thread maps in clients_pool are keyed by connection
# objects rather than file descriptors -- confirm that works with poll()
# before enabling it.
#USE_POLL = True
USE_POLL = False
# Seconds; used as the asyncore.loop() timeout and as the idle sleep in
# spin_once() when a thread has no registered connections.
ASYNCORE_LOOP_TIMEOUT = 0.1


# Endpoint every ClientConnection connects to.
SERVER_ADDRESS = 'localhost'
SERVER_PORT = 1234

# thread_id -> {connection: connection}; each inner dict is handed to
# asyncore.loop() as the socket map for that thread.
clients_pool = {}
  15.  
  16.  
  17. class ClientConnection(asyncore.dispatcher):
  18.     def __init__(self, thread_id, cb_onread, cb_1hz, cb_onclosed):
  19.         self.thread_id = thread_id
  20.         asyncore.dispatcher.__init__(self)
  21.         self.cb_onread = cb_onread
  22.         self.cb_1hz = cb_1hz
  23.         self.cb_onclosed = cb_onclosed
  24.         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
  25.         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024*1024)
  26.         self.connect((SERVER_ADDRESS, SERVER_PORT))
  27.         self.buffer_tx = ''
  28.         self.buffer_rx = ''
  29.         self.read_len = 0
  30.         self.read_timeout = 0
  31.         self.last_periodical_service_at = time.time()
  32.         if not thread_id in clients_pool:
  33.             clients_pool.update({thread_id:{}})
  34.         clients_pool[thread_id].update({self: self})
  35.    
  36.     def destroy(self):
  37.         self.close()
  38.         try:
  39.             del clients_pool[self.thread_id][self]
  40.         except:
  41.             pass
  42.    
  43.     def begin_read(self, len, timeout):
  44.         self.read_len = len
  45.         self.read_timeout = timeout
  46.    
  47.     def begin_write(self, data):
  48.         self.buffer_tx += data
  49.         self.handle_write()
  50.    
  51.     def _invoke_onread_safely(self, arg):
  52.         try:
  53.             self.cb_onread(arg)
  54.         except Exception, ex:
  55.             logging.error('ClientConnection: onread callback exception: ' + repr(ex))
  56.    
  57.     def _try_onread(self):
  58.         if (self.read_len > 0) and (len(self.buffer_rx) >= self.read_len) and (self.read_len > 0):
  59.             buf = self.buffer_rx[:self.read_len]
  60.             self.buffer_rx = self.buffer_rx[self.read_len:]
  61.             self.read_len = 0
  62.             self._invoke_onread_safely(buf)
  63.    
  64.     def handle_1hz(self):
  65.         if self.read_len > 0:
  66.             if self.read_timeout == 0:
  67.                 if self.read_len > len(self.buffer_rx):
  68.                     self.read_len = len(self.buffer_rx)
  69.                 buf = self.buffer_rx[:self.read_len]
  70.                 self.buffer_rx = self.buffer_rx[self.read_len:]
  71.                 self.read_len = 0
  72.                 self.buffer_rx = ''
  73.                 self._invoke_onread_safely(buf)
  74.             else:
  75.                 self.read_timeout -= 1
  76.         try:
  77.             self.cb_1hz()
  78.         except Exception, ex:
  79.             logging.error('ClientConnection: 1hz callback exception: ' + repr(ex))
  80.    
  81.     def handle_service_timeout(self):
  82.         self._try_onread()
  83.         now = time.time()
  84.         if now - self.last_periodical_service_at >= 1:
  85.             self.last_periodical_service_at = now
  86.             self.handle_1hz()
  87.    
  88.     def handle_connect(self):
  89.         pass
  90.  
  91.     def handle_close(self):
  92.         logging.error('ClientConnection: connection closed')
  93.         self.close()
  94.         global clients_pool
  95.         if len(clients_pool) == 1 and len(clients_pool[self.thread_id]) == 1:
  96.             clients_pool = None
  97.             exit(1)
  98.         self.cb_onclosed()
  99.  
  100.     def handle_read(self):
  101.         buf = self.recv(1024)
  102.         self.buffer_rx += buf
  103.         self.handle_service_timeout()
  104.  
  105.     def writable(self):
  106.         self.handle_service_timeout()
  107.         return (len(self.buffer_tx) > 0)
  108.  
  109.     def handle_write(self):
  110.         sent = self.send(self.buffer_tx)
  111.         self.buffer_tx = self.buffer_tx[sent:]
  112.  
  113. # -----------------------------------------------------------------------------
  114.  
  115. def spin_once(thread_id):
  116.     if not thread_id in clients_pool:
  117.         time.sleep(ASYNCORE_LOOP_TIMEOUT)
  118.         return
  119.     try:
  120.         asyncore.loop(timeout=ASYNCORE_LOOP_TIMEOUT, use_poll=USE_POLL, map=clients_pool[thread_id], count=1)
  121.     except socket.error, ex:
  122.         logging.warning('ClientConnection: socket error: "' + str(ex) + '", ignoring...')
  123.  
  124. def spin(thread_id):
  125.     logging.info('ClientConnection: asyncore thread "%s" with %i connections'%(str(thread_id), len(clients_pool[thread_id])))
  126.     while True:
  127.         try:
  128.             asyncore.loop(timeout=ASYNCORE_LOOP_TIMEOUT, use_poll=USE_POLL, map=clients_pool[thread_id])
  129.         except socket.error, ex:
  130.             logging.warning('ClientConnection: socket error: "' + str(ex) + '", ignoring...')
  131.  
  132. def spin_all():
  133.     threads = [threading.Thread(target=spin, args=(invokearg,)) for invokearg in clients_pool.keys()]
  134.     map(lambda self: self.start(), threads)
  135.     map(lambda self: self.join(), threads)
  136.  
  137.  
  138.