Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #
- # Example completion interface for async IO
- #
- import os
- import socket
- import errno
- import select
- from select import POLLIN, POLLOUT, POLLHUP
- POLLINOUT = POLLIN | POLLOUT
- class AsyncIO:
- def __init__(self):
- self._cache = {POLLIN:{}, POLLOUT:{}}
- self._poller = select.poll()
- def empty(self):
- return not any(self._cache.values())
- def recv(self, key, sock, nbytes, flags=0):
- def _recv():
- return sock.recv(nbytes, flags)
- self._register(key, sock.fileno(), POLLIN, _recv)
- def send(self, key, sock, buf, flags=0):
- def _send():
- return sock.send(buf, flags)
- self._register(key, sock.fileno(), POLLOUT, _send)
- def accept(self, key, sock):
- def _accept():
- return sock.accept()
- self._register(key, sock.fileno(), POLLIN, _accept)
- def connect(self, key, sock, addr):
- assert sock.gettimeout() == 0
- try:
- sock.connect(addr)
- except OSError as e:
- if e.errno != errno.EINPROGRESS:
- raise
- def _connect():
- err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
- if err != 0:
- raise OSError(err, "failed to connect to socket")
- return self._register(key, sock.fileno(), POLLOUT, _connect)
- else:
- return (key, True, None)
- def wait(self, timeout=None):
- if timeout is None:
- ms = -1
- elif timeout < 0:
- raise ValueError('negative timeout')
- else:
- ms = int(timeout*1000 + 0.5)
- results = []
- for fd, flags in self._poller.poll(ms):
- if flags & POLLIN or (flags & POLLHUP and
- fd in self._cache[POLLIN]):
- results.append(self._handle_fd(fd, POLLIN))
- if flags & POLLOUT:
- results.append(self._handle_fd(fd, POLLOUT))
- # XXX What about POLLERR, POLLNVAL or POLLPRI?
- return results
- def _handle_fd(self, fd, bit):
- queue = self._cache[bit]
- L = queue[fd]
- key, callback = L.pop(0)
- try:
- res = callback()
- except Exception as e:
- success, value = False, e
- else:
- success, value = True, res
- if not L:
- del queue[fd]
- otherbit = POLLINOUT & ~bit
- if fd in self._cache[otherbit]:
- self._poller.modify(fd, otherbit)
- else:
- self._poller.unregister(fd)
- return key, success, value
- def _register(self, key, fd, bit, callback):
- queue = self._cache[bit]
- if fd not in queue:
- if fd not in self._cache[POLLINOUT & ~bit]:
- self._poller.register(fd, bit)
- else:
- self._poller.modify(fd, POLLINOUT)
- queue[fd] = []
- queue[fd].append((key, callback))
- #
- # Dumb reactor implementation
- #
- import sys
- import collections
- class Reactor:
- def __init__(self):
- self._asyncio = AsyncIO()
- self._pending = collections.deque()
- def run(self):
- pending = self._pending
- while True:
- while pending:
- pending.extend(pending.popleft()._getnewpending())
- if self._asyncio.empty():
- break
- for future, success, value in self._asyncio.wait(1):
- future.set(success, value)
- pending.append(future)
- reactor = Reactor()
- class Future:
- __slots__ = ('success', 'value', '_subscribers')
- def __init__(self):
- self._subscribers = []
- def set(self, success, value):
- self.success = success
- self.value = value
- def result(self):
- if not hasattr(self, 'value'):
- raise ValueError('not ready')
- elif self.success:
- return self.value
- else:
- raise self.value
- def done(self):
- return hasattr(self, 'value')
- def _getnewpending(self):
- subscribers = self._subscribers
- while subscribers:
- task = subscribers.pop(0)
- try:
- if self.success:
- new_future = task.send(self.value)
- else:
- new_future = task.throw(self.value)
- except StopIteration:
- pass
- except Exception:
- sys.excepthook(*sys.exc_info())
- else:
- new_future._subscribers.append(task)
- if new_future.done():
- yield new_future
- def spawn(task):
- future = Future()
- def inner():
- try:
- value = yield from task
- future.set(True, value)
- except Exception as e:
- future.set(False, e)
- sys.excepthook(*sys.exc_info())
- finally:
- reactor._pending.append(future)
- it = inner()
- try:
- f = next(it)
- except StopIteration:
- pass
- else:
- f._subscribers.append(it)
- if hasattr(f, 'value'):
- self._pending.append(f)
- return future
- def recv(sock, nbytes, flags=0):
- f = Future()
- reactor._asyncio.recv(f, sock, nbytes, flags)
- yield f
- return f.result()
- def send(sock, buf, flags=0):
- f = Future()
- reactor._asyncio.send(f, sock, buf, flags)
- yield f
- return f.result()
- def accept(sock):
- f = Future()
- reactor._asyncio.accept(f, sock)
- yield f
- return f.result()
- def connect(sock, address):
- f = Future()
- reactor._asyncio.connect(f, sock, address)
- yield f
- return f.result()
- def run():
- reactor.run()
- #
- # Example using reactor
- #
- def example():
- SERVER_THREADS = 40
- REPEAT = 100
- words = [b"fee", b"fie", b"foe", b"fum"] * 10
- # create a listening socket
- l = socket.socket()
- l.bind(('localhost', 0))
- l.listen(100)
- l.settimeout(0)
- address = l.getsockname()
- # convert messages from clients to uppercase and send back
- def server(i):
- count = 0
- while True:
- conn, addr = yield from accept(l)
- with conn:
- print("server%d: accepted connection from %r" % (i, addr))
- res = yield from recv(conn, 10)
- print("server: read %r" % res)
- res2 = yield from send(conn, res.upper())
- print("server%d: wrote %r" % (i, res2))
- if res == b"stop":
- return count
- count += 1
- # ask server to convert message to upper case
- def client(num, msg):
- with socket.socket() as s:
- s.settimeout(0)
- res = yield from connect(s, address)
- print("client%d: connected" % num)
- res = yield from send(s, msg)
- print("client%d: wrote %r" % (num, res))
- res = yield from recv(s, 10)
- print("client%d: read %r" % (num, res))
- # start some servers
- servers = [spawn(server(i)) for i in range(SERVER_THREADS)]
- for i in range(REPEAT):
- # start a few clients
- clients = [spawn(client(i, w)) for i, w in enumerate(words)]
- # wait for all clients to finish
- yield from clients
- # tell servers to stop
- for _ in servers:
- spawn(client(999, b"stop"))
- # wait for servers to stop
- for i, server in enumerate(servers):
- count = yield server
- print('server%d serviced %d clients' % (i, count))
- if __name__ == '__main__':
- spawn(example())
- run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement