Advertisement
Guest User

Untitled

a guest
Oct 20th, 2012
134
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.63 KB | None | 0 0
  1. #
  2. # Example completion interface for async IO
  3. #
  4.  
  5. import os
  6. import socket
  7. import errno
  8. import select
  9.  
  10. from select import POLLIN, POLLOUT, POLLHUP
  11.  
  12. POLLINOUT = POLLIN | POLLOUT
  13.  
  14. class AsyncIO:
  15.     def __init__(self):
  16.         self._cache = {POLLIN:{}, POLLOUT:{}}
  17.         self._poller = select.poll()
  18.  
  19.     def empty(self):
  20.         return not any(self._cache.values())
  21.  
  22.     def recv(self, key, sock, nbytes, flags=0):
  23.         def _recv():
  24.             return sock.recv(nbytes, flags)
  25.         self._register(key, sock.fileno(), POLLIN, _recv)
  26.  
  27.     def send(self, key, sock, buf, flags=0):
  28.         def _send():
  29.             return sock.send(buf, flags)
  30.         self._register(key, sock.fileno(), POLLOUT, _send)
  31.  
  32.     def accept(self, key, sock):
  33.         def _accept():
  34.             return sock.accept()
  35.         self._register(key, sock.fileno(), POLLIN, _accept)
  36.  
  37.     def connect(self, key, sock, addr):
  38.         assert sock.gettimeout() == 0
  39.         try:
  40.             sock.connect(addr)
  41.         except OSError as e:
  42.             if e.errno != errno.EINPROGRESS:
  43.                 raise
  44.             def _connect():
  45.                 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  46.                 if err != 0:
  47.                     raise OSError(err, "failed to connect to socket")
  48.             return self._register(key, sock.fileno(), POLLOUT, _connect)
  49.         else:
  50.             return (key, True, None)
  51.  
  52.     def wait(self, timeout=None):
  53.         if timeout is None:
  54.             ms = -1
  55.         elif timeout < 0:
  56.             raise ValueError('negative timeout')
  57.         else:
  58.             ms = int(timeout*1000 + 0.5)
  59.         results = []
  60.         for fd, flags in self._poller.poll(ms):
  61.             if flags & POLLIN or (flags & POLLHUP and
  62.                                   fd in self._cache[POLLIN]):
  63.                 results.append(self._handle_fd(fd, POLLIN))
  64.             if flags & POLLOUT:
  65.                 results.append(self._handle_fd(fd, POLLOUT))
  66.             # XXX What about POLLERR, POLLNVAL or POLLPRI?
  67.         return results
  68.  
  69.     def _handle_fd(self, fd, bit):
  70.         queue = self._cache[bit]
  71.         L = queue[fd]
  72.         key, callback = L.pop(0)
  73.         try:
  74.             res = callback()
  75.         except Exception as e:
  76.             success, value = False, e
  77.         else:
  78.             success, value = True, res
  79.         if not L:
  80.             del queue[fd]
  81.             otherbit = POLLINOUT & ~bit
  82.             if fd in self._cache[otherbit]:
  83.                 self._poller.modify(fd, otherbit)
  84.             else:
  85.                 self._poller.unregister(fd)
  86.         return key, success, value
  87.  
  88.     def _register(self, key, fd, bit, callback):
  89.         queue = self._cache[bit]
  90.         if fd not in queue:
  91.             if fd not in self._cache[POLLINOUT & ~bit]:
  92.                 self._poller.register(fd, bit)
  93.             else:
  94.                 self._poller.modify(fd, POLLINOUT)
  95.             queue[fd] = []
  96.         queue[fd].append((key, callback))
  97.  
  98. #
  99. # Dumb reactor implementation
  100. #
  101.  
  102. import sys
  103. import collections
  104.  
  105. class Reactor:
  106.     def __init__(self):
  107.         self._asyncio = AsyncIO()
  108.         self._pending = collections.deque()
  109.  
  110.     def run(self):
  111.         pending = self._pending
  112.         while True:
  113.             while pending:
  114.                 pending.extend(pending.popleft()._getnewpending())
  115.             if self._asyncio.empty():
  116.                 break
  117.             for future, success, value in self._asyncio.wait(1):
  118.                 future.set(success, value)
  119.                 pending.append(future)
  120.  
  121. reactor = Reactor()
  122.  
  123. class Future:
  124.     __slots__ = ('success', 'value', '_subscribers')
  125.  
  126.     def __init__(self):
  127.         self._subscribers = []
  128.  
  129.     def set(self, success, value):
  130.         self.success = success
  131.         self.value = value
  132.  
  133.     def result(self):
  134.         if not hasattr(self, 'value'):
  135.             raise ValueError('not ready')
  136.         elif self.success:
  137.             return self.value
  138.         else:
  139.             raise self.value
  140.  
  141.     def done(self):
  142.         return hasattr(self, 'value')
  143.  
  144.     def _getnewpending(self):
  145.         subscribers = self._subscribers
  146.         while subscribers:
  147.             task = subscribers.pop(0)
  148.             try:
  149.                 if self.success:
  150.                     new_future = task.send(self.value)
  151.                 else:
  152.                     new_future = task.throw(self.value)
  153.             except StopIteration:
  154.                 pass
  155.             except Exception:
  156.                 sys.excepthook(*sys.exc_info())
  157.             else:
  158.                 new_future._subscribers.append(task)
  159.                 if new_future.done():
  160.                     yield new_future
  161.  
  162. def spawn(task):
  163.     future = Future()
  164.     def inner():
  165.         try:
  166.             value = yield from task
  167.             future.set(True, value)
  168.         except Exception as e:
  169.             future.set(False, e)
  170.             sys.excepthook(*sys.exc_info())
  171.         finally:
  172.             reactor._pending.append(future)
  173.     it = inner()
  174.     try:
  175.         f = next(it)
  176.     except StopIteration:
  177.         pass
  178.     else:
  179.         f._subscribers.append(it)
  180.         if hasattr(f, 'value'):
  181.             self._pending.append(f)
  182.     return future
  183.  
  184. def recv(sock, nbytes, flags=0):
  185.     f = Future()
  186.     reactor._asyncio.recv(f, sock, nbytes, flags)
  187.     yield f
  188.     return f.result()
  189.  
  190. def send(sock, buf, flags=0):
  191.     f = Future()
  192.     reactor._asyncio.send(f, sock, buf, flags)
  193.     yield f
  194.     return f.result()
  195.  
  196. def accept(sock):
  197.     f = Future()
  198.     reactor._asyncio.accept(f, sock)
  199.     yield f
  200.     return f.result()
  201.  
  202. def connect(sock, address):
  203.     f = Future()
  204.     reactor._asyncio.connect(f, sock, address)
  205.     yield f
  206.     return f.result()
  207.  
  208. def run():
  209.     reactor.run()
  210.  
  211. #
  212. # Example using reactor
  213. #
  214.  
  215. def example():
  216.     SERVER_THREADS = 40
  217.     REPEAT = 100
  218.     words = [b"fee", b"fie", b"foe", b"fum"] * 10
  219.  
  220.     # create a listening socket
  221.     l = socket.socket()
  222.     l.bind(('localhost', 0))
  223.     l.listen(100)
  224.     l.settimeout(0)
  225.     address = l.getsockname()
  226.  
  227.     # convert messages from clients to uppercase and send back
  228.     def server(i):
  229.         count = 0
  230.         while True:
  231.             conn, addr = yield from accept(l)
  232.             with conn:
  233.                 print("server%d: accepted connection from %r" % (i, addr))
  234.                 res = yield from recv(conn, 10)
  235.                 print("server: read %r" % res)
  236.                 res2 = yield from send(conn, res.upper())
  237.                 print("server%d: wrote %r" % (i, res2))
  238.                 if res == b"stop":
  239.                     return count
  240.                 count += 1
  241.  
  242.     # ask server to convert message to upper case
  243.     def client(num, msg):
  244.         with socket.socket() as s:
  245.             s.settimeout(0)
  246.             res = yield from connect(s, address)
  247.             print("client%d: connected" % num)
  248.             res = yield from send(s, msg)
  249.             print("client%d: wrote %r" % (num, res))
  250.             res = yield from recv(s, 10)
  251.             print("client%d: read %r" % (num, res))
  252.  
  253.     # start some servers
  254.     servers = [spawn(server(i)) for i in range(SERVER_THREADS)]
  255.  
  256.     for i in range(REPEAT):
  257.         # start a few clients
  258.         clients = [spawn(client(i, w)) for i, w in enumerate(words)]
  259.  
  260.         # wait for all clients to finish
  261.         yield from clients
  262.  
  263.     # tell servers to stop
  264.     for _ in servers:
  265.         spawn(client(999, b"stop"))
  266.  
  267.     # wait for servers to stop
  268.     for i, server in enumerate(servers):
  269.         count = yield server
  270.         print('server%d serviced %d clients' % (i, count))
  271.  
  272.  
  273. if __name__ == '__main__':
  274.     spawn(example())
  275.     run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement