View difference between Paste ID: 1E45Exsy and
SHOW:
|
|
- or go back to the newest paste.
1 | - | |
1 | + | import threading, socket, Queue, sys, pickle |
2 | import SocketServer | |
3 | ||
4 | comm = None | |
5 | uid = 0 | |
6 | ||
7 | def log(location, msg): | |
8 | sys.stderr.write("LOG::thread %s::%s::%s\n" %(threading.current_thread().name, location, msg.encode("string_escape"))) | |
9 | ||
10 | class communicator(): | |
11 | ||
12 | def __init__(self): | |
13 | self.client_q = Queue.Queue() | |
14 | self.stdin_q = {} | |
15 | self.stdin_q_lock = threading.Lock() | |
16 | ||
17 | def stdin_q_put(self, client, msg): | |
18 | log("stdin_q_put", "acquiring stdin_q lock") | |
19 | self.stdin_q_lock.acquire() | |
20 | log("stdin_q_put", "got stdin_q lock") | |
21 | if not self.stdin_q.has_key(client): | |
22 | log("stdin_q_put", "stdin queue for client %s not found, creating" % client) | |
23 | self.stdin_q[client] = Queue.Queue() | |
24 | self.stdin_q_lock.release() | |
25 | log("stdin_q_put", "released stdin_q lock") | |
26 | self.stdin_q[client].put(msg) | |
27 | log("stdin_q_put", "sent message '%s' on stdin_q %s" %(msg, client)) | |
28 | ||
29 | def stdin_q_get(self, client): | |
30 | log("stdin_q_get", "acquiring stdin_q lock") | |
31 | self.stdin_q_lock.acquire() | |
32 | log("stdin_q_get", "got stdin_q lock") | |
33 | if not self.stdin_q.has_key(client): | |
34 | log("stdin_q_get", "stdin queue for client %s not found, creating" % client) | |
35 | self.stdin_q[client] = Queue.Queue() | |
36 | self.stdin_q_lock.release() | |
37 | log("stdin_q_get", "released stdin_q lock") | |
38 | log("stdin_q_get", "waiting for message on stdin queue %s" % client) | |
39 | msg = self.stdin_q[client].get() | |
40 | log("stdin_q_get", "got message %s" % msg) | |
41 | return msg | |
42 | ||
43 | ||
44 | def send_msg(self, client, msg): | |
45 | self.client_q.put((client, msg)) | |
46 | ||
47 | def recv_msg(self, client): | |
48 | return self.stdin_q_get(client) | |
49 | ||
50 | def serve_client(self): | |
51 | log("communicator serve client", "starting serve_client") | |
52 | while True: | |
53 | client, msg = self.client_q.get() | |
54 | log("communicator serve client", "got data: cid='%s', msg='%s'" %(client, msg)) | |
55 | sys.stdout.write(pickle.dumps((client, msg)).encode("string_escape") + "\n") | |
56 | log("communicator serve client", "wrote to stdout") | |
57 | ||
58 | def serve_stdin(self): | |
59 | log("communicator serve stdin", "starting serve_stdin") | |
60 | while True: | |
61 | line = sys.stdin.readline() | |
62 | client, msg = pickle.loads(line[0:-1].decode("string_escape")) | |
63 | log("communicator serve stdin", "got data: cid='%s', msg='%s'" %(client, msg)) | |
64 | self.stdin_q_put(client, msg) | |
65 | log("communicator serve stdin", "wrote to stdin_q") | |
66 | ||
67 | ||
68 | ||
69 | ||
70 | class tcp_server(): | |
71 | ||
72 | ||
73 | def __init__(self, host, port): | |
74 | self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
75 | self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
76 | self.s.bind((host, port)) | |
77 | self.s.listen(5) | |
78 | ||
79 | def send_stream(self, cid, s): | |
80 | log("send_stream", "send stream started") | |
81 | while True: | |
82 | msg = s.recv(1024) | |
83 | log("send_stream", "got message from client: '%s'" %msg) | |
84 | comm.send_msg(cid, msg) | |
85 | ||
86 | def recv_stream(self, cid, s): | |
87 | log("recv_stream", "recv stream started") | |
88 | while True: | |
89 | msg = comm.recv_msg(cid) | |
90 | log("recv_stream", "got message '%s'" % msg) | |
91 | s.sendall(msg) | |
92 | log("recv_stream", "sent message to client %s" % cid) | |
93 | ||
94 | ||
95 | ||
96 | def serve_forever(self): | |
97 | while True: | |
98 | clientsocket, address = self.s.accept() | |
99 | global uid | |
100 | cid = uid | |
101 | uid += 1 | |
102 | threading.current_thread().name = "client creation thread %s" %cid | |
103 | log("incoming connection", "got socket '%s' from address '%s'" %(clientsocket, address)) | |
104 | t1 = threading.Thread(target = self.send_stream, args = (cid, clientsocket)) | |
105 | t1.name = "tcp_send_stream %s" % cid | |
106 | t1.start() | |
107 | log("incoming connection", "send_stream thread started") | |
108 | t2 = threading.Thread(target = self.recv_stream, args = (cid, clientsocket)) | |
109 | t2.name = "tcp_recv_stream %s" % cid | |
110 | t2.start() | |
111 | log("incoming connettion", "recv_stream started") | |
112 | ||
113 | ||
114 | if __name__ == "__main__": | |
115 | ||
116 | log("init", "program launched") | |
117 | ||
118 | if len(sys.argv) < 2: | |
119 | log("ERR", "Usage: python singleconn.py port [host]") | |
120 | exit() | |
121 | ||
122 | PORT = int(sys.argv[1]) | |
123 | if len(sys.argv) > 2: | |
124 | HOST = sys.argv[2] | |
125 | else: | |
126 | HOST = "localhost" | |
127 | log("init", "PORT: %s, HOST: %s" %(PORT, HOST)) | |
128 | svr = tcp_server(HOST, PORT) | |
129 | ||
130 | log("init", "TCP server initalised") | |
131 | ||
132 | comm = communicator() | |
133 | log("init", "communicator intitalised") | |
134 | t1 = threading.Thread(target = comm.serve_client) | |
135 | t1.name = "communicator_serve_client" | |
136 | t1.start() | |
137 | log("init", "communicator serve_client started") | |
138 | t2 = threading.Thread(target = comm.serve_stdin) | |
139 | t2.name = "communicator_serve_stdin" | |
140 | t2.start() | |
141 | log("init", "about to serve TCP server") | |
142 | svr.serve_forever() |