Pastebin is 300% more awesome when you are logged in. Sign Up, it's FREE!
Guest

multiclient to stdout/in proxy

By: a guest on Apr 6th, 2012  |  syntax: Python  |  size: 5.06 KB  |  hits: 50  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
This paste has a previous version, view the difference. Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. import threading, socket, Queue, sys, pickle
  2. import SocketServer
  3.  
  4. comm = None
  5. uid = 0
  6.  
  7. def log(location, msg):
  8.     sys.stderr.write("LOG::thread %s::%s::%s\n" %(threading.current_thread().name, location, msg.encode("string_escape")))
  9.  
  10. class communicator():
  11.    
  12.     def __init__(self):
  13.         self.client_q = Queue.Queue()
  14.         self.stdin_q = {}
  15.         self.stdin_q_lock = threading.Lock()
  16.  
  17.     def stdin_q_put(self, client, msg):
  18.         log("stdin_q_put", "acquiring stdin_q lock")
  19.         self.stdin_q_lock.acquire()
  20.         log("stdin_q_put", "got stdin_q lock")
  21.         if not self.stdin_q.has_key(client):
  22.             log("stdin_q_put", "stdin queue for client %s not found, creating" % client)
  23.             self.stdin_q[client] = Queue.Queue()
  24.         self.stdin_q_lock.release()
  25.         log("stdin_q_put", "released stdin_q lock")
  26.         self.stdin_q[client].put(msg)
  27.         log("stdin_q_put", "sent message '%s' on stdin_q %s" %(msg, client))
  28.  
  29.     def stdin_q_get(self, client):
  30.         log("stdin_q_get", "acquiring stdin_q lock")
  31.         self.stdin_q_lock.acquire()
  32.         log("stdin_q_get", "got stdin_q lock")
  33.         if not self.stdin_q.has_key(client):
  34.             log("stdin_q_get", "stdin queue for client %s not found, creating" % client)
  35.             self.stdin_q[client] = Queue.Queue()
  36.         self.stdin_q_lock.release()
  37.         log("stdin_q_get", "released stdin_q lock")
  38.         log("stdin_q_get", "waiting for message on stdin queue %s" % client)
  39.         msg = self.stdin_q[client].get()
  40.         log("stdin_q_get", "got message %s" % msg)
  41.         return msg
  42.            
  43.  
  44.     def send_msg(self, client, msg):
  45.         self.client_q.put((client, msg))
  46.  
  47.     def recv_msg(self, client):
  48.         return self.stdin_q_get(client)
  49.  
  50.     def serve_client(self):
  51.         log("communicator serve client", "starting serve_client")
  52.         while True:
  53.             client, msg = self.client_q.get()
  54.             log("communicator serve client", "got data: cid='%s', msg='%s'" %(client, msg))
  55.             sys.stdout.write(pickle.dumps((client, msg)).encode("string_escape") + "\n")
  56.             log("communicator serve client", "wrote to stdout")
  57.  
  58.     def serve_stdin(self):
  59.         log("communicator serve stdin", "starting serve_stdin")
  60.         while True:
  61.             line = sys.stdin.readline()
  62.             client, msg = pickle.loads(line[0:-1].decode("string_escape"))
  63.             log("communicator serve stdin", "got data: cid='%s', msg='%s'" %(client, msg))
  64.             self.stdin_q_put(client, msg)
  65.             log("communicator serve stdin", "wrote to stdin_q")
  66.  
  67.            
  68.  
  69.  
  70. class tcp_server():
  71.  
  72.  
  73.     def __init__(self, host, port):
  74.         self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  75.         self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  76.         self.s.bind((host, port))
  77.         self.s.listen(5)
  78.  
  79.     def send_stream(self, cid, s):
  80.         log("send_stream", "send stream started")
  81.         while True:
  82.             msg = s.recv(1024)
  83.             log("send_stream", "got message from client: '%s'" %msg)
  84.             comm.send_msg(cid, msg)
  85.  
  86.     def recv_stream(self, cid, s):
  87.         log("recv_stream", "recv stream started")
  88.         while True:
  89.             msg = comm.recv_msg(cid)
  90.             log("recv_stream", "got message '%s'" % msg)
  91.             s.sendall(msg)
  92.             log("recv_stream", "sent message to client %s" % cid)
  93.            
  94.            
  95.  
  96.     def serve_forever(self):
  97.         while True:
  98.             clientsocket, address = self.s.accept()
  99.             global uid
  100.             cid = uid
  101.             uid += 1
  102.             threading.current_thread().name = "client creation thread %s" %cid
  103.             log("incoming connection", "got socket '%s' from address '%s'" %(clientsocket, address))
  104.             t1 = threading.Thread(target = self.send_stream, args = (cid, clientsocket))
  105.             t1.name = "tcp_send_stream %s" % cid
  106.             t1.start()
  107.             log("incoming connection", "send_stream thread started")
  108.             t2 = threading.Thread(target = self.recv_stream, args = (cid, clientsocket))
  109.             t2.name = "tcp_recv_stream %s" % cid
  110.             t2.start()
  111.             log("incoming connettion", "recv_stream started")
  112.  
  113.  
  114. if __name__ == "__main__":
  115.  
  116.     log("init", "program launched")
  117.  
  118.     if len(sys.argv) < 2:
  119.         log("ERR", "Usage: python singleconn.py port [host]")
  120.         exit()
  121.  
  122.     PORT = int(sys.argv[1])
  123.     if len(sys.argv) > 2:
  124.         HOST = sys.argv[2]
  125.     else:
  126.         HOST = "localhost"
  127.     log("init", "PORT: %s, HOST: %s" %(PORT, HOST))
  128.     svr = tcp_server(HOST, PORT)
  129.  
  130.     log("init", "TCP server initalised")
  131.  
  132.     comm = communicator()
  133.     log("init", "communicator intitalised")
  134.     t1 = threading.Thread(target = comm.serve_client)
  135.     t1.name = "communicator_serve_client"
  136.     t1.start()
  137.     log("init", "communicator serve_client started")
  138.     t2 = threading.Thread(target = comm.serve_stdin)
  139.     t2.name = "communicator_serve_stdin"
  140.     t2.start()
  141.     log("init", "about to serve TCP server")
  142.     svr.serve_forever()