Advertisement
Guest User

Untitled

a guest
Apr 4th, 2020
1,143
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.86 KB | None | 0 0
  1. import collections
  2. import errno
  3. import logging
  4. import re
  5. import select
  6. import socket
  7. import time
  8.  
  9.  
  10. class Scheduler:
  11.  
  12.     def __init__(self):
  13.         self.runnable = collections.deque()
  14.         self.current = None
  15.         self.readers = {}
  16.         self.writers = {}
  17.  
  18.     def run(self, task):
  19.         self.runnable.append(task)
  20.  
  21.     def loop(self):
  22.         while self.runnable or self.readers or self.writers:
  23.             self.loop1()
  24.  
  25.     def loop1(self):
  26. ##         print('loop1')
  27.         while self.runnable:
  28.             self.current = self.runnable.popleft()
  29.             try:
  30.                 next(self.current)
  31.             except StopIteration:
  32.                 self.current = None
  33.             except Exception:
  34.                 self.current = None
  35.                 logging.exception('Exception in task')
  36.             else:
  37.                 if self.current is not None:
  38.                     self.runnable.append(self.current)
  39.                     self.current = None
  40.         if self.readers or self.writers:
  41.             # TODO: Schedule timed calls as well.
  42.             # TODO: Use poll() or better.
  43.             t0 = time.time()
  44.             ready_r, ready_w, _ = select.select(self.readers, self.writers, [])
  45.             t1 = time.time()
  46. ##             print('select({}, {}) took {:.3f} secs to return {}, {}'
  47. ##                   .format(list(self.readers), list(self.writers),
  48. ##                           t1 - t0, ready_r, ready_w))
  49.             for fd in ready_r:
  50.                 self.unblock(self.readers.pop(fd))
  51.             for fd in ready_w:
  52.                 self.unblock(self.writers.pop(fd))
  53.  
  54.     def unblock(self, task):
  55.         assert task
  56.         self.runnable.append(task)
  57.  
  58.     def block(self, queue, fd):
  59.         assert isinstance(fd, int)
  60.         assert fd not in queue
  61.         assert self.current is not None
  62.         queue[fd] = self.current
  63.         self.current = None
  64.  
  65.     def block_r(self, fd):
  66.         self.block(self.readers, fd)
  67.  
  68.     def block_w(self, fd):
  69.         self.block(self.writers, fd)
  70.  
  71.  
  72. sched = Scheduler()
  73.  
  74.  
  75. class RawReader:
  76.  
  77.     def __init__(self, sock):
  78.         self.sock = sock
  79.  
  80.     def read(self, n):
  81.         """Read up to n bytes, blocking at most once."""
  82.         assert n >= 0, n
  83.         sched.block_r(self.sock.fileno())
  84.         yield
  85.         return self.sock.recv(n)
  86.  
  87.  
  88. class BufferedReader:
  89.  
  90.     def __init__(self, raw, limit=8192):
  91.         self.raw = raw
  92.         self.limit = limit
  93.         self.buffer = b''
  94.         self.eof = False
  95.  
  96.     def read(self, n):
  97.         """Read up to n bytes, blocking at most once."""
  98.         assert n >= 0, n
  99.         if not self.buffer and not self.eof:
  100.             yield from self.fillbuffer(max(n, self.limit))
  101.         return self.getfrombuffer(n)
  102.  
  103.     def readexactly(self, n):
  104.         """Read exactly n bytes, or until EOF."""
  105.         blocks = []
  106.         count = 0
  107.         while n > count:
  108.             block = yield from self.read(n - count)
  109.             blocks.append(block)
  110.             count += len(block)
  111.         return b''.join(blocks)
  112.  
  113.     def readline(self):
  114.         """Read up to newline or limit, whichever comes first."""
  115.         end = self.buffer.find(b'\n') + 1  # Point past newline, or 0.
  116.         while not end and not self.eof and len(self.buffer) < self.limit:
  117.             anchor = len(self.buffer)
  118.             yield from self.fillbuffer(self.limit)
  119.             end = self.buffer.find(b'\n', anchor) + 1
  120.         if not end:
  121.             end = len(self.buffer)
  122.         if end > self.limit:
  123.             end = self.limit
  124.         return self.getfrombuffer(end)
  125.  
  126.     def getfrombuffer(self, n):
  127.         """Read up to n bytes without blocking."""
  128.         if n >= len(self.buffer):
  129.             result, self.buffer = self.buffer, b''
  130.         else:
  131.             result, self.buffer = self.buffer[:n], self.buffer[n:]
  132.         return result
  133.  
  134.     def fillbuffer(self, n):
  135.         """Fill buffer with one (up to) n bytes from raw reader."""
  136.         assert not self.eof, 'fillbuffer called at eof'
  137.         data = yield from self.raw.read(n)
  138. ##        print('fillbuffer:', repr(data)[:100])
  139.         if data:
  140.             self.buffer += data
  141.         else:
  142.             self.eof = True
  143.  
  144.  
  145. def send(sock, data):
  146. ##     print('send:', repr(data))
  147.     while data:
  148.         sched.block_w(sock.fileno())
  149.         yield
  150.         n = sock.send(data)
  151.         assert 0 <= n <= len(data), (n, len(data))
  152.         if n == len(data):
  153.             break
  154.         data = data[n:]
  155.  
  156.  
  157. def newsocket():
  158.     sock = socket.socket()
  159.     sock.setblocking(False)
  160.     return sock
  161.  
  162.  
  163. def connect(sock, address):
  164. ##     print('connect:', address)
  165.     err = sock.connect_ex(address)
  166.     assert err == errno.EINPROGRESS, err
  167.     sched.block_w(sock.fileno())
  168.     yield
  169.     err = sock.connect_ex(address)
  170.     if err == errno.ECONNREFUSED:
  171.         raise IOError('Connection refused')
  172.     if err != errno.EISCONN:
  173.         raise IOError('Connect error %d: %s' % (err, errno.errorcode.get(err)))
  174.  
  175.  
  176. def urlfetch(host, port=80, method='GET', path='/',
  177.              body=None, hdrs=None, encoding='utf-8'):
  178.     t0 = time.time()
  179.     # Must pass in an IP address.  Later we'll call getaddrinfo()
  180.     # using a thread pool.  We'll also support IPv6.
  181.     assert re.match(r'(\d+)(\.\d+)(\.\d+)(\.\d+)\Z', host), repr(host)
  182.     sock = newsocket()
  183.     yield from connect(sock, (host, port))
  184.     yield from send(sock,
  185.                     method.encode(encoding) + b' ' +
  186.                     path.encode(encoding) + b' HTTP/1.0\r\n')
  187.     if hdrs:
  188.         kwds = dict(hdrs)
  189.     else:
  190.         kwds = {}
  191.     if body is not None:
  192.         kwds['content_length'] = len(body)
  193.     for header, value in kwds.items():
  194.         yield from send(sock,
  195.                         header.replace('_', '-').encode(encoding) + b': ' +
  196.                         value.encode(encoding) + b'\r\n')
  197.  
  198.     yield from send(sock, b'\r\n')
  199.     if body is not None:
  200.         yield from send(sock, body)
  201.     ##sock.shutdown(1)  # Close the writing end of the socket.
  202.  
  203.     # Read HTTP response line.
  204.     raw = RawReader(sock)
  205.     buf = BufferedReader(raw)
  206.     resp = yield from buf.readline()
  207. ##     print('resp =', repr(resp))
  208.     m = re.match(br'(?ix) http/(\d\.\d) \s+ (\d\d\d) \s+ ([^\r]*)\r?\n\Z', resp)
  209.     if not m:
  210.         sock.close()
  211.         raise IOError('No valid HTTP response: %r' % response)
  212.     http_version, status, message = m.groups()
  213.  
  214.     # Read HTTP headers.
  215.     headers = []
  216.     hdict = {}
  217.     while True:
  218.         line = yield from buf.readline()
  219.         if not line.strip():
  220.             break
  221.         m = re.match(br'([^\s:]+):\s*([^\r]*)\r?\n\Z', line)
  222.         if not m:
  223.             raise IOError('Invalid header: %r' % line)
  224.         header, value = m.groups()
  225.         headers.append((header, value))
  226.         hdict[header.decode(encoding).lower()] = value.decode(encoding)
  227.  
  228.     # Read response body.
  229.     content_length = hdict.get('content-length')
  230.     if content_length is not None:
  231.         size = int(content_length)  # TODO: Catch errors.
  232.         assert size >= 0, size
  233.     else:
  234.         size = 2**20  # Protective limit (1 MB).
  235.     data = yield from buf.readexactly(size)
  236.     sock.close()  # Can this block?
  237.     t1 = time.time()
  238. ##     print(http_version, status, message, headers, hdict, len(data))
  239.     print(host, port, path, status, len(data), '{:.3}'.format(t1-t0))
  240.  
  241.  
  242. def doit():
  243.     gen1 = urlfetch('127.0.0.1', 8080, path='/', hdrs={'host': 'localhost'})
  244.     gen2 = urlfetch('82.94.164.162', 80, path='/', hdrs={'host': 'python.org'})
  245.     sched.run(gen1)
  246.     sched.run(gen2)
  247.     for x in '123':
  248.         for y in '0123456789':
  249.             g = urlfetch('82.94.164.162', 80,
  250.                          path='/{}.{}'.format(x, y),
  251.                          hdrs={'host': 'python.org'})
  252.             sched.run(g)
  253.     sched.loop()
  254.  
  255.  
  256. def main():
  257.     doit()
  258.  
  259.  
  260. if __name__ == '__main__':
  261.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement