#
# Example completion interface for async IO
#

import collections
import errno
import os
import select
import socket
import sys
from select import POLLIN, POLLOUT, POLLHUP

POLLINOUT = POLLIN | POLLOUT

# Conditions poll() may report even though they were never requested.
_POLLFAIL = POLLHUP | select.POLLERR | select.POLLNVAL


class AsyncIO:
    """Queue socket operations and complete them when poll() says so.

    Every request is identified by an arbitrary *key* chosen by the caller.
    ``wait()`` returns ``(key, success, value)`` triples for the operations
    that completed: *value* is the operation's result when *success* is
    true, otherwise the exception it raised.
    """

    def __init__(self):
        # Maps POLLIN / POLLOUT -> {fd: [(key, callback), ...]}.
        self._cache = {POLLIN: {}, POLLOUT: {}}
        self._poller = select.poll()

    def empty(self):
        """Return True when no operation is queued."""
        return not any(self._cache.values())

    def recv(self, key, sock, nbytes, flags=0):
        """Queue ``sock.recv(nbytes, flags)`` for when *sock* is readable."""
        def _recv():
            return sock.recv(nbytes, flags)
        self._register(key, sock.fileno(), POLLIN, _recv)

    def send(self, key, sock, buf, flags=0):
        """Queue ``sock.send(buf, flags)`` for when *sock* is writable."""
        def _send():
            return sock.send(buf, flags)
        self._register(key, sock.fileno(), POLLOUT, _send)

    def accept(self, key, sock):
        """Queue ``sock.accept()`` for when *sock* is readable."""
        def _accept():
            return sock.accept()
        self._register(key, sock.fileno(), POLLIN, _accept)

    def connect(self, key, sock, addr):
        """Start connecting the non-blocking *sock* to *addr*.

        Returns ``(key, True, None)`` if the connection was established
        immediately; in that case nothing is queued.  Returns None if the
        connection is in progress, in which case the outcome is reported
        by a later ``wait()``.  Any OSError other than EINPROGRESS raised
        by ``sock.connect()`` propagates to the caller.
        """
        assert sock.gettimeout() == 0
        try:
            sock.connect(addr)
        except OSError as e:
            if e.errno != errno.EINPROGRESS:
                raise

            def _connect():
                # Once the socket is writable, SO_ERROR holds the outcome
                # of the connection attempt.
                err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
                if err != 0:
                    raise OSError(err, "failed to connect to socket")
                return
            self._register(key, sock.fileno(), POLLOUT, _connect)
        else:
            return (key, True, None)

    def wait(self, timeout=None):
        """Poll once and run the operations that have become ready.

        *timeout* is in seconds; None blocks until something is ready.
        Returns a list of ``(key, success, value)`` triples.
        Raises ValueError if *timeout* is negative.
        """
        if timeout is None:
            ms = -1
        elif timeout < 0:
            raise ValueError('negative timeout')
        else:
            ms = int(timeout*1000 + 0.5)
        results = []
        for fd, flags in self._poller.poll(ms):
            # On POLLHUP / POLLERR / POLLNVAL run whatever is queued for
            # the descriptor: the callback will surface the OSError.
            # Ignoring them would leave the fd registered and make poll()
            # return it again immediately, forever.
            failed = flags & _POLLFAIL
            if flags & POLLIN or (failed and fd in self._cache[POLLIN]):
                results.append(self._handle_fd(fd, POLLIN))
            if flags & POLLOUT or (failed and fd in self._cache[POLLOUT]):
                results.append(self._handle_fd(fd, POLLOUT))
        return results

    def _handle_fd(self, fd, bit):
        """Run the oldest operation queued on *fd* for direction *bit*."""
        queue = self._cache[bit]
        waiting = queue[fd]
        key, callback = waiting.pop(0)
        try:
            res = callback()
        except Exception as e:
            success, value = False, e
        else:
            success, value = True, res
        if not waiting:
            # Nothing left for this direction: stop polling for it, but
            # keep the other direction if it still has queued work.
            del queue[fd]
            otherbit = POLLINOUT & ~bit
            if fd in self._cache[otherbit]:
                self._poller.modify(fd, otherbit)
            else:
                self._poller.unregister(fd)
        return key, success, value

    def _register(self, key, fd, bit, callback):
        """Queue *callback* on *fd* for *bit*, updating the poll mask."""
        queue = self._cache[bit]
        if fd not in queue:
            if fd not in self._cache[POLLINOUT & ~bit]:
                self._poller.register(fd, bit)
            else:
                self._poller.modify(fd, POLLINOUT)
            queue[fd] = []
        queue[fd].append((key, callback))


#
# Dumb reactor implementation
#

class Reactor:
    """Drive generator-based tasks from AsyncIO completions."""

    def __init__(self):
        self._asyncio = AsyncIO()
        # Futures that have a result and whose subscribers must be resumed.
        self._pending = collections.deque()

    def run(self):
        """Run until no future is pending and no IO is queued."""
        pending = self._pending
        while True:
            while pending:
                pending.extend(pending.popleft()._getnewpending())
            if self._asyncio.empty():
                break
            for future, success, value in self._asyncio.wait(1):
                future.set(success, value)
                pending.append(future)


reactor = Reactor()


class Future:
    """Result holder; it is "done" once the ``value`` slot has been set."""

    __slots__ = ('success', 'value', '_subscribers')

    def __init__(self):
        self._subscribers = []

    def set(self, success, value):
        """Store the outcome: a result, or an exception if not *success*."""
        self.success = success
        self.value = value

    def result(self):
        """Return the result, or raise the stored exception.

        Raises ValueError('not ready') if no outcome has been set yet.
        """
        if not hasattr(self, 'value'):
            raise ValueError('not ready')
        elif self.success:
            return self.value
        else:
            raise self.value

    def done(self):
        """Return True once an outcome has been set."""
        return hasattr(self, 'value')

    def _getnewpending(self):
        """Resume every subscribed task; yield futures already done.

        Each task is sent the value (or has the exception thrown into it)
        and is then subscribed to the future it yields next.  If that
        future already has an outcome it is yielded so that the reactor
        schedules it right away.
        """
        subscribers = self._subscribers
        while subscribers:
            task = subscribers.pop(0)
            try:
                if self.success:
                    new_future = task.send(self.value)
                else:
                    new_future = task.throw(self.value)
            except StopIteration:
                pass
            except Exception:
                sys.excepthook(*sys.exc_info())
            else:
                new_future._subscribers.append(task)
                if new_future.done():
                    yield new_future


def spawn(task):
    """Start the generator *task* and return a Future for its result.

    The task is run up to its first yielded future.  The returned future
    is set, and queued on the reactor, when the task returns or raises.
    """
    future = Future()

    def inner():
        try:
            value = yield from task
            future.set(True, value)
        except Exception as e:
            future.set(False, e)
            sys.excepthook(*sys.exc_info())
        finally:
            reactor._pending.append(future)

    it = inner()
    try:
        f = next(it)
    except StopIteration:
        pass
    else:
        f._subscribers.append(it)
        if f.done():
            # The first future is already complete, so nothing else would
            # ever schedule it: queue it on the reactor ourselves.
            reactor._pending.append(f)
    return future


def recv(sock, nbytes, flags=0):
    """Task helper: receive up to *nbytes* from *sock*."""
    f = Future()
    reactor._asyncio.recv(f, sock, nbytes, flags)
    yield f
    return f.result()


def send(sock, buf, flags=0):
    """Task helper: send *buf* on *sock*; returns the number of bytes sent."""
    f = Future()
    reactor._asyncio.send(f, sock, buf, flags)
    yield f
    return f.result()


def accept(sock):
    """Task helper: accept a connection on the listening *sock*."""
    f = Future()
    reactor._asyncio.accept(f, sock)
    yield f
    return f.result()


def connect(sock, address):
    """Task helper: connect the non-blocking *sock* to *address*."""
    f = Future()
    completed = reactor._asyncio.connect(f, sock, address)
    if completed is not None:
        # Connected immediately: nothing was queued with the poller, so
        # waiting on the future would never return.
        _, success, value = completed
        f.set(success, value)
    else:
        yield f
    return f.result()


def run():
    """Run the module-level reactor until it has nothing left to do."""
    reactor.run()


#
# Example using reactor
#

def example():
    """Demo task: upper-casing servers exercised by batches of clients."""
    SERVER_THREADS = 40
    REPEAT = 100

    words = [b"fee", b"fie", b"foe", b"fum"] * 10

    # create a listening socket
    listener = socket.socket()
    listener.bind(('localhost', 0))
    listener.listen(100)
    listener.settimeout(0)
    address = listener.getsockname()

    # convert messages from clients to uppercase and send back
    def server(i):
        count = 0
        while True:
            conn, addr = yield from accept(listener)
            with conn:
                print("server%d: accepted connection from %r" % (i, addr))
                res = yield from recv(conn, 10)
                print("server: read %r" % res)
                res2 = yield from send(conn, res.upper())
                print("server%d: wrote %r" % (i, res2))
                if res == b"stop":
                    return count
                count += 1

    # ask server to convert message to upper case
    def client(num, msg):
        with socket.socket() as s:
            s.settimeout(0)
            res = yield from connect(s, address)
            print("client%d: connected" % num)
            res = yield from send(s, msg)
            print("client%d: wrote %r" % (num, res))
            res = yield from recv(s, 10)
            print("client%d: read %r" % (num, res))

    # start some servers
    servers = [spawn(server(i)) for i in range(SERVER_THREADS)]

    for i in range(REPEAT):
        # start a few clients
        clients = [spawn(client(i, w)) for i, w in enumerate(words)]

        # wait for all clients to finish; yield each future explicitly
        # rather than delegating to the list, whose iterator cannot
        # accept a sent value
        for client_future in clients:
            yield client_future

    # tell servers to stop
    for _ in servers:
        spawn(client(999, b"stop"))

    # wait for servers to stop
    for i, server_future in enumerate(servers):
        count = yield server_future
        print('server%d serviced %d clients' % (i, count))


if __name__ == '__main__':
    spawn(example())
    run()