"""Bridge TCP clients to a line-oriented stdin/stdout protocol (Python 2).

Every accepted TCP connection is given an integer client id.  Data received
from a client is written to stdout as one line per message; each line is the
pickle of ``(client_id, data)`` encoded with the ``string_escape`` codec so
that it never contains a raw newline.  Lines read from stdin use the same
format and are routed back to the socket of the client id they name.
"""
import threading
import socket
import Queue
import sys
import pickle
import SocketServer

# The process-wide communicator instance; assigned by the __main__ section.
comm = None
# Next client id to hand out; only touched by tcp_server.serve_forever.
uid = 0


def log(location, msg):
    """Write one diagnostic line to stderr.

    location -- short tag naming the code path that is logging
    msg      -- byte string to log; non-printable bytes are escaped
    """
    sys.stderr.write("LOG::thread %s::%s::%s\n" % (
        threading.current_thread().name,
        location,
        msg.encode("string_escape"),
    ))


class communicator():
    """Moves messages between the TCP worker threads and stdin/stdout.

    client_q     -- queue of (client, msg) pairs waiting to be written to stdout
    stdin_q      -- dict mapping a client id to the queue of messages read
                    from stdin for that client
    stdin_q_lock -- guards lookups/creation of entries in stdin_q
    """

    def __init__(self):
        self.client_q = Queue.Queue()
        self.stdin_q = {}
        self.stdin_q_lock = threading.Lock()

    def _queue_for(self, client, location):
        """Return the stdin queue for `client`, creating it if needed.

        `location` is the log tag of the calling method.  The lock is held
        through a `with` block so it is released even if logging or queue
        creation raises.
        """
        log(location, "acquiring stdin_q lock")
        with self.stdin_q_lock:
            log(location, "got stdin_q lock")
            if client not in self.stdin_q:
                # (client,) keeps the format valid when client is a tuple.
                log(location,
                    "stdin queue for client %s not found, creating" % (client,))
                self.stdin_q[client] = Queue.Queue()
            client_queue = self.stdin_q[client]
        log(location, "released stdin_q lock")
        return client_queue

    def stdin_q_put(self, client, msg):
        """Queue `msg` (read from stdin) for delivery to `client`."""
        self._queue_for(client, "stdin_q_put").put(msg)
        log("stdin_q_put", "sent message '%s' on stdin_q %s" % (msg, client))

    def stdin_q_get(self, client):
        """Block until a message for `client` is available and return it."""
        client_queue = self._queue_for(client, "stdin_q_get")
        log("stdin_q_get",
            "waiting for message on stdin queue %s" % (client,))
        msg = client_queue.get()
        # (msg,) keeps the format valid when the unpickled msg is a tuple.
        log("stdin_q_get", "got message %s" % (msg,))
        return msg

    def send_msg(self, client, msg):
        """Queue `msg` from `client` to be written to stdout."""
        self.client_q.put((client, msg))

    def recv_msg(self, client):
        """Block until stdin delivers a message for `client`; return it."""
        return self.stdin_q_get(client)

    def serve_client(self):
        """Loop forever writing queued (client, msg) pairs to stdout."""
        log("communicator serve client", "starting serve_client")
        while True:
            client, msg = self.client_q.get()
            log("communicator serve client",
                "got data: cid='%s', msg='%s'" % (client, msg))
            sys.stdout.write(
                pickle.dumps((client, msg)).encode("string_escape") + "\n")
            # stdout is block-buffered when it is a pipe; without a flush the
            # reader would not see the record until the buffer fills.
            sys.stdout.flush()
            log("communicator serve client", "wrote to stdout")

    def serve_stdin(self):
        """Read records from stdin and route them to per-client queues.

        Returns when stdin reaches end-of-file.  Lines that cannot be decoded
        into a (client, msg) pair are logged and skipped.
        """
        log("communicator serve stdin", "starting serve_stdin")
        while True:
            line = sys.stdin.readline()
            if not line:
                # readline() returns "" only at EOF; nothing more will arrive.
                log("communicator serve stdin",
                    "stdin closed, stopping serve_stdin")
                return
            # Strip only a real newline so an unterminated final line does
            # not lose its last payload byte.
            if line.endswith("\n"):
                line = line[:-1]
            try:
                # SECURITY: pickle.loads executes arbitrary code for crafted
                # input.  Only connect stdin to a trusted producer.
                client, msg = pickle.loads(line.decode("string_escape"))
            except Exception as err:
                # Unpickling can raise nearly any exception type; this is the
                # thread's top-level loop, so log the bad record and carry on
                # instead of killing stdin handling for every client.
                log("communicator serve stdin",
                    "discarding malformed line: %s" % (err,))
                continue
            log("communicator serve stdin",
                "got data: cid='%s', msg='%s'" % (client, msg))
            self.stdin_q_put(client, msg)
            log("communicator serve stdin", "wrote to stdin_q")


class tcp_server():
    """Listening TCP socket that spawns two worker threads per connection."""

    def __init__(self, host, port):
        """Bind to (host, port) and start listening with a backlog of 5."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((host, port))
        self.s.listen(5)

    def send_stream(self, cid, s):
        """Forward data received on socket `s` to the communicator.

        Returns once the peer closes the connection or the socket fails.
        """
        log("send_stream", "send stream started")
        while True:
            try:
                msg = s.recv(1024)
            except socket.error as err:
                log("send_stream",
                    "recv failed for client %s: %s" % (cid, err))
                return
            log("send_stream", "got message from client: '%s'" % msg)
            comm.send_msg(cid, msg)
            if not msg:
                # recv() returns "" once the peer has closed.  The empty read
                # is still forwarded once (as before), but looping again
                # would spin forever and flood stdout with empty records.
                log("send_stream",
                    "client %s closed the connection, stopping" % cid)
                return

    def recv_stream(self, cid, s):
        """Deliver messages addressed to `cid` to socket `s`.

        Returns if the socket can no longer be written to.
        """
        log("recv_stream", "recv stream started")
        while True:
            msg = comm.recv_msg(cid)
            # (msg,) keeps the format valid when the unpickled msg is a tuple.
            log("recv_stream", "got message '%s'" % (msg,))
            try:
                s.sendall(msg)
            except socket.error as err:
                log("recv_stream",
                    "send to client %s failed: %s" % (cid, err))
                return
            log("recv_stream", "sent message to client %s" % cid)

    def serve_forever(self):
        """Accept connections forever, starting both workers for each one."""
        global uid
        while True:
            clientsocket, address = self.s.accept()
            cid = uid
            uid += 1
            threading.current_thread().name = "client creation thread %s" % cid
            log("incoming connection",
                "got socket '%s' from address '%s'" % (clientsocket, address))
            t1 = threading.Thread(target=self.send_stream,
                                  args=(cid, clientsocket))
            t1.name = "tcp_send_stream %s" % cid
            t1.start()
            log("incoming connection", "send_stream thread started")
            t2 = threading.Thread(target=self.recv_stream,
                                  args=(cid, clientsocket))
            t2.name = "tcp_recv_stream %s" % cid
            t2.start()
            log("incoming connettion", "recv_stream started")


if __name__ == "__main__":
    log("init", "program launched")
    if len(sys.argv) < 2:
        log("ERR", "Usage: python singleconn.py port [host]")
        # sys.exit (not the site-provided exit()) with a non-zero status so
        # callers can tell the launch failed.
        sys.exit(1)
    PORT = int(sys.argv[1])
    if len(sys.argv) > 2:
        HOST = sys.argv[2]
    else:
        HOST = "localhost"
    log("init", "PORT: %s, HOST: %s" % (PORT, HOST))
    svr = tcp_server(HOST, PORT)
    log("init", "TCP server initalised")
    comm = communicator()
    log("init", "communicator intitalised")
    t1 = threading.Thread(target=comm.serve_client)
    t1.name = "communicator_serve_client"
    t1.start()
    log("init", "communicator serve_client started")
    t2 = threading.Thread(target=comm.serve_stdin)
    t2.name = "communicator_serve_stdin"
    t2.start()
    log("init", "about to serve TCP server")
    svr.serve_forever()