import threading, socket, Queue, sys, pickle
import SocketServer
comm = None
uid = 0
def log(location, msg):
sys.stderr.write("LOG::thread %s::%s::%s\n" %(threading.current_thread().name, location, msg.encode("string_escape")))
class communicator():
def __init__(self):
self.client_q = Queue.Queue()
self.stdin_q = {}
self.stdin_q_lock = threading.Lock()
def stdin_q_put(self, client, msg):
log("stdin_q_put", "acquiring stdin_q lock")
self.stdin_q_lock.acquire()
log("stdin_q_put", "got stdin_q lock")
if not self.stdin_q.has_key(client):
log("stdin_q_put", "stdin queue for client %s not found, creating" % client)
self.stdin_q[client] = Queue.Queue()
self.stdin_q_lock.release()
log("stdin_q_put", "released stdin_q lock")
self.stdin_q[client].put(msg)
log("stdin_q_put", "sent message '%s' on stdin_q %s" %(msg, client))
def stdin_q_get(self, client):
log("stdin_q_get", "acquiring stdin_q lock")
self.stdin_q_lock.acquire()
log("stdin_q_get", "got stdin_q lock")
if not self.stdin_q.has_key(client):
log("stdin_q_get", "stdin queue for client %s not found, creating" % client)
self.stdin_q[client] = Queue.Queue()
self.stdin_q_lock.release()
log("stdin_q_get", "released stdin_q lock")
log("stdin_q_get", "waiting for message on stdin queue %s" % client)
msg = self.stdin_q[client].get()
log("stdin_q_get", "got message %s" % msg)
return msg
def send_msg(self, client, msg):
self.client_q.put((client, msg))
def recv_msg(self, client):
return self.stdin_q_get(client)
def serve_client(self):
log("communicator serve client", "starting serve_client")
while True:
client, msg = self.client_q.get()
log("communicator serve client", "got data: cid='%s', msg='%s'" %(client, msg))
sys.stdout.write(pickle.dumps((client, msg)).encode("string_escape") + "\n")
log("communicator serve client", "wrote to stdout")
def serve_stdin(self):
log("communicator serve stdin", "starting serve_stdin")
while True:
line = sys.stdin.readline()
client, msg = pickle.loads(line[0:-1].decode("string_escape"))
log("communicator serve stdin", "got data: cid='%s', msg='%s'" %(client, msg))
self.stdin_q_put(client, msg)
log("communicator serve stdin", "wrote to stdin_q")
class tcp_server():
def __init__(self, host, port):
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.s.bind((host, port))
self.s.listen(5)
def send_stream(self, cid, s):
log("send_stream", "send stream started")
while True:
msg = s.recv(1024)
log("send_stream", "got message from client: '%s'" %msg)
comm.send_msg(cid, msg)
def recv_stream(self, cid, s):
log("recv_stream", "recv stream started")
while True:
msg = comm.recv_msg(cid)
log("recv_stream", "got message '%s'" % msg)
s.sendall(msg)
log("recv_stream", "sent message to client %s" % cid)
def serve_forever(self):
while True:
clientsocket, address = self.s.accept()
global uid
cid = uid
uid += 1
threading.current_thread().name = "client creation thread %s" %cid
log("incoming connection", "got socket '%s' from address '%s'" %(clientsocket, address))
t1 = threading.Thread(target = self.send_stream, args = (cid, clientsocket))
t1.name = "tcp_send_stream %s" % cid
t1.start()
log("incoming connection", "send_stream thread started")
t2 = threading.Thread(target = self.recv_stream, args = (cid, clientsocket))
t2.name = "tcp_recv_stream %s" % cid
t2.start()
log("incoming connettion", "recv_stream started")
if __name__ == "__main__":
log("init", "program launched")
if len(sys.argv) < 2:
log("ERR", "Usage: python singleconn.py port [host]")
exit()
PORT = int(sys.argv[1])
if len(sys.argv) > 2:
HOST = sys.argv[2]
else:
HOST = "localhost"
log("init", "PORT: %s, HOST: %s" %(PORT, HOST))
svr = tcp_server(HOST, PORT)
log("init", "TCP server initalised")
comm = communicator()
log("init", "communicator intitalised")
t1 = threading.Thread(target = comm.serve_client)
t1.name = "communicator_serve_client"
t1.start()
log("init", "communicator serve_client started")
t2 = threading.Thread(target = comm.serve_stdin)
t2.name = "communicator_serve_stdin"
t2.start()
log("init", "about to serve TCP server")
svr.serve_forever()