View difference between Paste ID: 7tDmeYXz and
SHOW: | | - or go back to the newest paste.
1-
1+
#
2
# Example completion interface for async IO
3
#
4
5
import os
6
import socket
7
import errno
8
import select
9
10
from select import POLLIN, POLLOUT, POLLHUP
11
12
POLLINOUT = POLLIN | POLLOUT
13
14
class AsyncIO:
15
    def __init__(self):
16
        self._cache = {POLLIN:{}, POLLOUT:{}}
17
        self._poller = select.poll()
18
19
    def empty(self):
20
        return not any(self._cache.values())
21
22
    def recv(self, key, sock, nbytes, flags=0):
23
        def _recv():
24
            return sock.recv(nbytes, flags)
25
        self._register(key, sock.fileno(), POLLIN, _recv)
26
27
    def send(self, key, sock, buf, flags=0):
28
        def _send():
29
            return sock.send(buf, flags)
30
        self._register(key, sock.fileno(), POLLOUT, _send)
31
32
    def accept(self, key, sock):
33
        def _accept():
34
            return sock.accept()
35
        self._register(key, sock.fileno(), POLLIN, _accept)
36
37
    def connect(self, key, sock, addr):
38
        assert sock.gettimeout() == 0
39
        try:
40
            sock.connect(addr)
41
        except OSError as e:
42
            if e.errno != errno.EINPROGRESS:
43
                raise
44
            def _connect():
45
                err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
46
                if err != 0:
47
                    raise OSError(err, "failed to connect to socket")
48
            return self._register(key, sock.fileno(), POLLOUT, _connect)
49
        else:
50
            return (key, True, None)
51
52
    def wait(self, timeout=None):
53
        if timeout is None:
54
            ms = -1
55
        elif timeout < 0:
56
            raise ValueError('negative timeout')
57
        else:
58
            ms = int(timeout*1000 + 0.5)
59
        results = []
60
        for fd, flags in self._poller.poll(ms):
61
            if flags & POLLIN or (flags & POLLHUP and
62
                                  fd in self._cache[POLLIN]):
63
                results.append(self._handle_fd(fd, POLLIN))
64
            if flags & POLLOUT:
65
                results.append(self._handle_fd(fd, POLLOUT))
66
            # XXX What about POLLERR, POLLNVAL or POLLPRI?
67
        return results
68
69
    def _handle_fd(self, fd, bit):
70
        queue = self._cache[bit]
71
        L = queue[fd]
72
        key, callback = L.pop(0)
73
        try:
74
            res = callback()
75
        except Exception as e:
76
            success, value = False, e
77
        else:
78
            success, value = True, res
79
        if not L:
80
            del queue[fd]
81
            otherbit = POLLINOUT & ~bit
82
            if fd in self._cache[otherbit]:
83
                self._poller.modify(fd, otherbit)
84
            else:
85
                self._poller.unregister(fd)
86
        return key, success, value
87
88
    def _register(self, key, fd, bit, callback):
89
        queue = self._cache[bit]
90
        if fd not in queue:
91
            if fd not in self._cache[POLLINOUT & ~bit]:
92
                self._poller.register(fd, bit)
93
            else:
94
                self._poller.modify(fd, POLLINOUT)
95
            queue[fd] = []
96
        queue[fd].append((key, callback))
97
98
#
99
# Dumb reactor implementation
100
#
101
102
import sys
103
import collections
104
105
class Reactor:
106
    def __init__(self):
107
        self._asyncio = AsyncIO()
108
        self._pending = collections.deque()
109
110
    def run(self):
111
        pending = self._pending
112
        while True:
113
            while pending:
114
                pending.extend(pending.popleft()._getnewpending())
115
            if self._asyncio.empty():
116
                break
117
            for future, success, value in self._asyncio.wait(1):
118
                future.set(success, value)
119
                pending.append(future)
120
121
reactor = Reactor()
122
123
class Future:
124
    __slots__ = ('success', 'value', '_subscribers')
125
126
    def __init__(self):
127
        self._subscribers = []
128
129
    def set(self, success, value):
130
        self.success = success
131
        self.value = value
132
133
    def result(self):
134
        if not hasattr(self, 'value'):
135
            raise ValueError('not ready')
136
        elif self.success:
137
            return self.value
138
        else:
139
            raise self.value
140
141
    def done(self):
142
        return hasattr(self, 'value')
143
144
    def _getnewpending(self):
145
        subscribers = self._subscribers
146
        while subscribers:
147
            task = subscribers.pop(0)
148
            try:
149
                if self.success:
150
                    new_future = task.send(self.value)
151
                else:
152
                    new_future = task.throw(self.value)
153
            except StopIteration:
154
                pass
155
            except Exception:
156
                sys.excepthook(*sys.exc_info())
157
            else:
158
                new_future._subscribers.append(task)
159
                if new_future.done():
160
                    yield new_future
161
162
def spawn(task):
163
    future = Future()
164
    def inner():
165
        try:
166
            value = yield from task
167
            future.set(True, value)
168
        except Exception as e:
169
            future.set(False, e)
170
            sys.excepthook(*sys.exc_info())
171
        finally:
172
            reactor._pending.append(future)
173
    it = inner()
174
    try:
175
        f = next(it)
176
    except StopIteration:
177
        pass
178
    else:
179
        f._subscribers.append(it)
180
        if hasattr(f, 'value'):
181
            self._pending.append(f)
182
    return future
183
184
def recv(sock, nbytes, flags=0):
185
    f = Future()
186
    reactor._asyncio.recv(f, sock, nbytes, flags)
187
    yield f
188
    return f.result()
189
190
def send(sock, buf, flags=0):
191
    f = Future()
192
    reactor._asyncio.send(f, sock, buf, flags)
193
    yield f
194
    return f.result()
195
196
def accept(sock):
197
    f = Future()
198
    reactor._asyncio.accept(f, sock)
199
    yield f
200
    return f.result()
201
202
def connect(sock, address):
203
    f = Future()
204
    reactor._asyncio.connect(f, sock, address)
205
    yield f
206
    return f.result()
207
208
def run():
209
    reactor.run()
210
211
#
212
# Example using reactor
213
#
214
215
def example():
216
    SERVER_THREADS = 40
217
    REPEAT = 100
218
    words = [b"fee", b"fie", b"foe", b"fum"] * 10
219
220
    # create a listening socket
221
    l = socket.socket()
222
    l.bind(('localhost', 0))
223
    l.listen(100)
224
    l.settimeout(0)
225
    address = l.getsockname()
226
227
    # convert messages from clients to uppercase and send back
228
    def server(i):
229
        count = 0
230
        while True:
231
            conn, addr = yield from accept(l)
232
            with conn:
233
                print("server%d: accepted connection from %r" % (i, addr))
234
                res = yield from recv(conn, 10)
235
                print("server: read %r" % res)
236
                res2 = yield from send(conn, res.upper())
237
                print("server%d: wrote %r" % (i, res2))
238
                if res == b"stop":
239
                    return count
240
                count += 1
241
242
    # ask server to convert message to upper case
243
    def client(num, msg):
244
        with socket.socket() as s:
245
            s.settimeout(0)
246
            res = yield from connect(s, address)
247
            print("client%d: connected" % num)
248
            res = yield from send(s, msg)
249
            print("client%d: wrote %r" % (num, res))
250
            res = yield from recv(s, 10)
251
            print("client%d: read %r" % (num, res))
252
253
    # start some servers
254
    servers = [spawn(server(i)) for i in range(SERVER_THREADS)]
255
256
    for i in range(REPEAT):
257
        # start a few clients
258
        clients = [spawn(client(i, w)) for i, w in enumerate(words)]
259
260
        # wait for all clients to finish
261
        yield from clients
262
263
    # tell servers to stop
264
    for _ in servers:
265
        spawn(client(999, b"stop"))
266
267
    # wait for servers to stop
268
    for i, server in enumerate(servers):
269
        count = yield server
270
        print('server%d serviced %d clients' % (i, count))
271
272
273
if __name__ == '__main__':
274
    spawn(example())
275
    run()