#
# Example completion interface for async IO
#
import os
import socket
import errno
import select
from select import POLLIN, POLLOUT, POLLHUP
# Event mask selecting both read (POLLIN) and write (POLLOUT) readiness.
POLLINOUT = POLLIN | POLLOUT
class AsyncIO:
    """Completion-style interface on top of ``select.poll()``.

    Callers queue an operation (recv/send/accept/connect) together with an
    opaque *key*.  ``wait()`` performs whichever queued operations have
    become possible and reports each one as a ``(key, success, value)``
    tuple.  Operations queued on the same descriptor and direction are
    served in FIFO order, at most one per direction per ``wait()`` call.
    """

    # Conditions poll() can report even though they were never requested.
    _ERROR_FLAGS = POLLHUP | select.POLLERR | select.POLLNVAL
    # Maps a direction bit to the opposite direction bit.
    _OPPOSITE = {POLLIN: POLLOUT, POLLOUT: POLLIN}

    def __init__(self):
        # direction bit -> {fd: [(key, callback), ...]}
        self._cache = {POLLIN: {}, POLLOUT: {}}
        self._poller = select.poll()

    def empty(self):
        """Return True when no operation is queued in either direction."""
        return not any(self._cache.values())

    def recv(self, key, sock, nbytes, flags=0):
        """Queue ``sock.recv(nbytes, flags)`` to run once *sock* is readable."""
        def _recv():
            return sock.recv(nbytes, flags)
        self._register(key, sock.fileno(), POLLIN, _recv)

    def send(self, key, sock, buf, flags=0):
        """Queue ``sock.send(buf, flags)`` to run once *sock* is writable."""
        def _send():
            return sock.send(buf, flags)
        self._register(key, sock.fileno(), POLLOUT, _send)

    def accept(self, key, sock):
        """Queue ``sock.accept()`` to run once *sock* is readable."""
        def _accept():
            return sock.accept()
        self._register(key, sock.fileno(), POLLIN, _accept)

    def connect(self, key, sock, addr):
        """Start connecting the non-blocking socket *sock* to *addr*.

        Returns ``(key, True, None)`` if the connection completed
        immediately; in that case nothing is queued and ``wait()`` will
        never report *key*.  Otherwise returns None and the outcome is
        reported by a later ``wait()`` once the socket becomes writable.

        Raises OSError if ``sock.connect()`` fails with anything other
        than EINPROGRESS.
        """
        # NOTE: this check disappears under ``python -O``; callers must
        # pass a socket whose timeout is 0.
        assert sock.gettimeout() == 0
        try:
            sock.connect(addr)
        except OSError as e:
            if e.errno != errno.EINPROGRESS:
                raise

            def _connect():
                # SO_ERROR holds the result of the asynchronous connect.
                err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
                if err != 0:
                    raise OSError(err, "failed to connect to socket")
            return self._register(key, sock.fileno(), POLLOUT, _connect)
        else:
            return (key, True, None)

    def wait(self, timeout=None):
        """Poll once and run the queued operations that are now possible.

        *timeout* is in seconds; None blocks until something is ready.
        Returns a list of ``(key, success, value)`` tuples, where *value*
        is the operation's result, or the exception it raised when
        *success* is False.

        Raises ValueError if *timeout* is negative.
        """
        if timeout is None:
            ms = -1
        elif timeout < 0:
            raise ValueError('negative timeout')
        else:
            ms = int(timeout*1000 + 0.5)
        readers = self._cache[POLLIN]
        writers = self._cache[POLLOUT]
        results = []
        for fd, flags in self._poller.poll(ms):
            # Error/hang-up conditions wake every queued direction: the
            # callback then surfaces the failure as an exception and the
            # descriptor gets unregistered.  Ignoring them would make
            # poll() return immediately forever for that descriptor.
            failed = flags & self._ERROR_FLAGS
            if (flags & POLLIN or failed) and fd in readers:
                results.append(self._handle_fd(fd, POLLIN))
            if (flags & POLLOUT or failed) and fd in writers:
                results.append(self._handle_fd(fd, POLLOUT))
        return results

    def _handle_fd(self, fd, bit):
        """Run the oldest operation queued for *fd* in direction *bit*.

        Returns ``(key, success, value)``.  When the queue for that
        direction becomes empty, the poller registration is narrowed to
        the other direction or removed entirely.
        """
        queue = self._cache[bit]
        waiters = queue[fd]
        key, callback = waiters.pop(0)
        try:
            res = callback()
        except Exception as e:
            success, value = False, e
        else:
            success, value = True, res
        if not waiters:
            del queue[fd]
            otherbit = self._OPPOSITE[bit]
            if fd in self._cache[otherbit]:
                self._poller.modify(fd, otherbit)
            else:
                self._poller.unregister(fd)
        return key, success, value

    def _register(self, key, fd, bit, callback):
        """Append ``(key, callback)`` to the queue for *fd* / *bit*.

        The poller registration is created, or widened to both
        directions, the first time *fd* is queued for *bit*.
        """
        queue = self._cache[bit]
        if fd not in queue:
            if fd not in self._cache[self._OPPOSITE[bit]]:
                self._poller.register(fd, bit)
            else:
                self._poller.modify(fd, POLLIN | POLLOUT)
            queue[fd] = []
        queue[fd].append((key, callback))
#
# Dumb reactor implementation
#
import sys
import collections
class Reactor:
    """Minimal scheduler that resumes tasks as their futures complete."""

    def __init__(self):
        self._asyncio = AsyncIO()
        # Completed futures whose subscribers still need to be resumed.
        self._pending = collections.deque()

    def run(self):
        """Run until no completed futures and no queued I/O remain."""
        ready = self._pending
        while True:
            # Resume everything waiting on completed futures; resuming a
            # task may hand back further futures that are already done.
            while ready:
                finished = ready.popleft()
                ready.extend(finished._getnewpending())
            if self._asyncio.empty():
                return
            # Wait up to one second for I/O and complete the futures
            # that were used as keys for the finished operations.
            for future, success, value in self._asyncio.wait(1):
                future.set(success, value)
                ready.append(future)
# Module-level Reactor instance used by spawn(), run() and the
# recv/send/accept/connect generator helpers below.
reactor = Reactor()
class Future:
    """Holds the eventual outcome of an operation plus the tasks awaiting it.

    ``success`` and ``value`` stay unset until ``set()`` is called; their
    presence is what marks the future as done.
    """

    __slots__ = ('success', 'value', '_subscribers')

    def __init__(self):
        # Generators to resume once this future has an outcome.
        self._subscribers = []

    def set(self, success, value):
        """Record the outcome: a result, or an exception if not *success*."""
        self.success, self.value = success, value

    def result(self):
        """Return the stored value, or raise the stored exception.

        Raises ValueError if no outcome has been set yet.
        """
        if not self.done():
            raise ValueError('not ready')
        if not self.success:
            raise self.value
        return self.value

    def done(self):
        """Return True once an outcome has been set."""
        return hasattr(self, 'value')

    def _getnewpending(self):
        """Resume every subscriber with this future's outcome.

        Each subscriber is subscribed to whatever object it yields next.
        Yields those next futures that are already done, so the caller
        can schedule them straight away.
        """
        waiting = self._subscribers
        while waiting:
            task = waiting.pop(0)
            try:
                resume = task.send if self.success else task.throw
                upcoming = resume(self.value)
            except StopIteration:
                # The task ran to completion.
                continue
            except Exception:
                sys.excepthook(*sys.exc_info())
                continue
            upcoming._subscribers.append(task)
            if upcoming.done():
                yield upcoming
def spawn(task):
    """Start running the generator *task* and return a Future for its outcome.

    The task is advanced to its first yielded future immediately.  The
    returned Future is set to the task's return value, or to the exception
    it raised (which is also reported through ``sys.excepthook``), and is
    then queued on the reactor so that anything waiting on it is resumed.
    """
    future = Future()

    def inner():
        try:
            value = yield from task
            future.set(True, value)
        except Exception as e:
            future.set(False, e)
            sys.excepthook(*sys.exc_info())
        finally:
            reactor._pending.append(future)

    it = inner()
    try:
        f = next(it)
    except StopIteration:
        # The task finished without ever waiting on anything.
        pass
    else:
        f._subscribers.append(it)
        if f.done():
            # The first future is already complete, so queue it for the
            # reactor to resume the task.  (This used to reference an
            # undefined ``self`` and raised NameError.)
            reactor._pending.append(f)
    return future
def recv(sock, nbytes, flags=0):
    """Generator helper: ``data = yield from recv(sock, nbytes)``.

    Queues ``sock.recv(nbytes, flags)`` with the shared reactor, yields the
    Future tracking it, and on resumption returns that Future's result
    (raising the stored exception if the operation failed).
    """
    pending = Future()
    reactor._asyncio.recv(pending, sock, nbytes, flags)
    yield pending
    return pending.result()
def send(sock, buf, flags=0):
    """Generator helper: ``sent = yield from send(sock, buf)``.

    Queues ``sock.send(buf, flags)`` with the shared reactor, yields the
    Future tracking it, and on resumption returns that Future's result
    (raising the stored exception if the operation failed).
    """
    pending = Future()
    reactor._asyncio.send(pending, sock, buf, flags)
    yield pending
    return pending.result()
def accept(sock):
    """Generator helper: ``conn, addr = yield from accept(sock)``.

    Queues ``sock.accept()`` with the shared reactor, yields the Future
    tracking it, and on resumption returns that Future's result (raising
    the stored exception if the operation failed).
    """
    pending = Future()
    reactor._asyncio.accept(pending, sock)
    yield pending
    return pending.result()
def connect(sock, address):
    """Generator helper: ``yield from connect(sock, address)``.

    Starts a non-blocking connect through the shared reactor and returns
    the connect Future's result (None on success), raising the stored
    exception if the connection failed.
    """
    f = Future()
    immediate = reactor._asyncio.connect(f, sock, address)
    if immediate is not None:
        # The connection completed synchronously, so nothing was queued
        # with the poller and the reactor will never complete ``f``.
        # Finish it here instead of yielding a future that would leave
        # the task waiting forever.
        _key, success, value = immediate
        f.set(success, value)
        return f.result()
    yield f
    return f.result()
def run():
    """Run the module-level reactor until it has no work left."""
    reactor.run()
#
# Example using reactor
#
def example():
    """Demo task: upper-casing echo servers exercised by many clients.

    Spawns SERVER_THREADS server tasks sharing one listening socket, then
    runs REPEAT rounds of clients that each send one word and read the
    reply.  Finally one "stop" message is sent per server and the number
    of clients each server handled is printed.  Intended to be started
    with ``spawn(example())`` followed by ``run()``.
    """
    SERVER_THREADS = 40
    REPEAT = 100
    words = [b"fee", b"fie", b"foe", b"fum"] * 10

    # create a listening socket; the with-block closes it once every
    # server has stopped instead of leaking the descriptor
    with socket.socket() as listener:
        listener.bind(('localhost', 0))
        listener.listen(100)
        listener.settimeout(0)
        address = listener.getsockname()

        # convert messages from clients to uppercase and send back
        def server(i):
            count = 0
            while True:
                conn, addr = yield from accept(listener)
                with conn:
                    print("server%d: accepted connection from %r" % (i, addr))
                    res = yield from recv(conn, 10)
                    print("server: read %r" % res)
                    res2 = yield from send(conn, res.upper())
                    print("server%d: wrote %r" % (i, res2))
                    if res == b"stop":
                        return count
                count += 1

        # ask server to convert message to upper case
        def client(num, msg):
            with socket.socket() as s:
                s.settimeout(0)
                res = yield from connect(s, address)
                print("client%d: connected" % num)
                res = yield from send(s, msg)
                print("client%d: wrote %r" % (num, res))
                res = yield from recv(s, 10)
                print("client%d: read %r" % (num, res))

        # start some servers
        servers = [spawn(server(i)) for i in range(SERVER_THREADS)]

        for i in range(REPEAT):
            # start a few clients
            clients = [spawn(client(i, w)) for i, w in enumerate(words)]

            # wait for all clients to finish: yields each client's
            # future in turn
            yield from clients

        # tell servers to stop
        for _ in servers:
            spawn(client(999, b"stop"))

        # wait for servers to stop
        for i, server_future in enumerate(servers):
            count = yield server_future
            print('server%d serviced %d clients' % (i, count))
if __name__ == '__main__':
    # Schedule the demo as the first task, then drive the reactor until
    # no work remains.
    spawn(example())
    run()