Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #! /usr/bin/env python
- # coding: utf-8
- import asyncore, socket, time, struct, logging, threading
- #USE_POLL = True
- USE_POLL = False
- ASYNCORE_LOOP_TIMEOUT = 0.1
- SERVER_ADDRESS = 'localhost'
- SERVER_PORT = 1234
- clients_pool = {}
- class ClientConnection(asyncore.dispatcher):
- def __init__(self, thread_id, cb_onread, cb_1hz, cb_onclosed):
- self.thread_id = thread_id
- asyncore.dispatcher.__init__(self)
- self.cb_onread = cb_onread
- self.cb_1hz = cb_1hz
- self.cb_onclosed = cb_onclosed
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024*1024)
- self.connect((SERVER_ADDRESS, SERVER_PORT))
- self.buffer_tx = ''
- self.buffer_rx = ''
- self.read_len = 0
- self.read_timeout = 0
- self.last_periodical_service_at = time.time()
- if not thread_id in clients_pool:
- clients_pool.update({thread_id:{}})
- clients_pool[thread_id].update({self: self})
- def destroy(self):
- self.close()
- try:
- del clients_pool[self.thread_id][self]
- except:
- pass
- def begin_read(self, len, timeout):
- self.read_len = len
- self.read_timeout = timeout
- def begin_write(self, data):
- self.buffer_tx += data
- self.handle_write()
- def _invoke_onread_safely(self, arg):
- try:
- self.cb_onread(arg)
- except Exception, ex:
- logging.error('ClientConnection: onread callback exception: ' + repr(ex))
- def _try_onread(self):
- if (self.read_len > 0) and (len(self.buffer_rx) >= self.read_len) and (self.read_len > 0):
- buf = self.buffer_rx[:self.read_len]
- self.buffer_rx = self.buffer_rx[self.read_len:]
- self.read_len = 0
- self._invoke_onread_safely(buf)
- def handle_1hz(self):
- if self.read_len > 0:
- if self.read_timeout == 0:
- if self.read_len > len(self.buffer_rx):
- self.read_len = len(self.buffer_rx)
- buf = self.buffer_rx[:self.read_len]
- self.buffer_rx = self.buffer_rx[self.read_len:]
- self.read_len = 0
- self.buffer_rx = ''
- self._invoke_onread_safely(buf)
- else:
- self.read_timeout -= 1
- try:
- self.cb_1hz()
- except Exception, ex:
- logging.error('ClientConnection: 1hz callback exception: ' + repr(ex))
- def handle_service_timeout(self):
- self._try_onread()
- now = time.time()
- if now - self.last_periodical_service_at >= 1:
- self.last_periodical_service_at = now
- self.handle_1hz()
- def handle_connect(self):
- pass
- def handle_close(self):
- logging.error('ClientConnection: connection closed')
- self.close()
- global clients_pool
- if len(clients_pool) == 1 and len(clients_pool[self.thread_id]) == 1:
- clients_pool = None
- exit(1)
- self.cb_onclosed()
- def handle_read(self):
- buf = self.recv(1024)
- self.buffer_rx += buf
- self.handle_service_timeout()
- def writable(self):
- self.handle_service_timeout()
- return (len(self.buffer_tx) > 0)
- def handle_write(self):
- sent = self.send(self.buffer_tx)
- self.buffer_tx = self.buffer_tx[sent:]
- # -----------------------------------------------------------------------------
- def spin_once(thread_id):
- if not thread_id in clients_pool:
- time.sleep(ASYNCORE_LOOP_TIMEOUT)
- return
- try:
- asyncore.loop(timeout=ASYNCORE_LOOP_TIMEOUT, use_poll=USE_POLL, map=clients_pool[thread_id], count=1)
- except socket.error, ex:
- logging.warning('ClientConnection: socket error: "' + str(ex) + '", ignoring...')
- def spin(thread_id):
- logging.info('ClientConnection: asyncore thread "%s" with %i connections'%(str(thread_id), len(clients_pool[thread_id])))
- while True:
- try:
- asyncore.loop(timeout=ASYNCORE_LOOP_TIMEOUT, use_poll=USE_POLL, map=clients_pool[thread_id])
- except socket.error, ex:
- logging.warning('ClientConnection: socket error: "' + str(ex) + '", ignoring...')
- def spin_all():
- threads = [threading.Thread(target=spin, args=(invokearg,)) for invokearg in clients_pool.keys()]
- map(lambda self: self.start(), threads)
- map(lambda self: self.join(), threads)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement