#! /usr/bin/env python
# coding: utf-8
import asyncore, socket, time, struct, logging, threading
#USE_POLL = True
USE_POLL = False
ASYNCORE_LOOP_TIMEOUT = 0.1
SERVER_ADDRESS = 'localhost'
SERVER_PORT = 1234
clients_pool = {}
class ClientConnection(asyncore.dispatcher):
def __init__(self, thread_id, cb_onread, cb_1hz, cb_onclosed):
self.thread_id = thread_id
asyncore.dispatcher.__init__(self)
self.cb_onread = cb_onread
self.cb_1hz = cb_1hz
self.cb_onclosed = cb_onclosed
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024*1024)
self.connect((SERVER_ADDRESS, SERVER_PORT))
self.buffer_tx = ''
self.buffer_rx = ''
self.read_len = 0
self.read_timeout = 0
self.last_periodical_service_at = time.time()
if not thread_id in clients_pool:
clients_pool.update({thread_id:{}})
clients_pool[thread_id].update({self: self})
def destroy(self):
self.close()
try:
del clients_pool[self.thread_id][self]
except:
pass
def begin_read(self, len, timeout):
self.read_len = len
self.read_timeout = timeout
def begin_write(self, data):
self.buffer_tx += data
self.handle_write()
def _invoke_onread_safely(self, arg):
try:
self.cb_onread(arg)
except Exception, ex:
logging.error('ClientConnection: onread callback exception: ' + repr(ex))
def _try_onread(self):
if (self.read_len > 0) and (len(self.buffer_rx) >= self.read_len) and (self.read_len > 0):
buf = self.buffer_rx[:self.read_len]
self.buffer_rx = self.buffer_rx[self.read_len:]
self.read_len = 0
self._invoke_onread_safely(buf)
def handle_1hz(self):
if self.read_len > 0:
if self.read_timeout == 0:
if self.read_len > len(self.buffer_rx):
self.read_len = len(self.buffer_rx)
buf = self.buffer_rx[:self.read_len]
self.buffer_rx = self.buffer_rx[self.read_len:]
self.read_len = 0
self.buffer_rx = ''
self._invoke_onread_safely(buf)
else:
self.read_timeout -= 1
try:
self.cb_1hz()
except Exception, ex:
logging.error('ClientConnection: 1hz callback exception: ' + repr(ex))
def handle_service_timeout(self):
self._try_onread()
now = time.time()
if now - self.last_periodical_service_at >= 1:
self.last_periodical_service_at = now
self.handle_1hz()
def handle_connect(self):
pass
def handle_close(self):
logging.error('ClientConnection: connection closed')
self.close()
global clients_pool
if len(clients_pool) == 1 and len(clients_pool[self.thread_id]) == 1:
clients_pool = None
exit(1)
self.cb_onclosed()
def handle_read(self):
buf = self.recv(1024)
self.buffer_rx += buf
self.handle_service_timeout()
def writable(self):
self.handle_service_timeout()
return (len(self.buffer_tx) > 0)
def handle_write(self):
sent = self.send(self.buffer_tx)
self.buffer_tx = self.buffer_tx[sent:]
# -----------------------------------------------------------------------------
def spin_once(thread_id):
if not thread_id in clients_pool:
time.sleep(ASYNCORE_LOOP_TIMEOUT)
return
try:
asyncore.loop(timeout=ASYNCORE_LOOP_TIMEOUT, use_poll=USE_POLL, map=clients_pool[thread_id], count=1)
except socket.error, ex:
logging.warning('ClientConnection: socket error: "' + str(ex) + '", ignoring...')
def spin(thread_id):
logging.info('ClientConnection: asyncore thread "%s" with %i connections'%(str(thread_id), len(clients_pool[thread_id])))
while True:
try:
asyncore.loop(timeout=ASYNCORE_LOOP_TIMEOUT, use_poll=USE_POLL, map=clients_pool[thread_id])
except socket.error, ex:
logging.warning('ClientConnection: socket error: "' + str(ex) + '", ignoring...')
def spin_all():
threads = [threading.Thread(target=spin, args=(invokearg,)) for invokearg in clients_pool.keys()]
map(lambda self: self.start(), threads)
map(lambda self: self.join(), threads)