View difference between Paste ID: 1E45Exsy and
SHOW: | | - or go back to the newest paste.
1-
1+
import threading, socket, Queue, sys, pickle
2
import SocketServer
3
4
comm = None
5
uid = 0
6
7
def log(location, msg):
8
    sys.stderr.write("LOG::thread %s::%s::%s\n" %(threading.current_thread().name, location, msg.encode("string_escape")))
9
10
class communicator():
11
    
12
    def __init__(self):
13
        self.client_q = Queue.Queue()
14
        self.stdin_q = {} 
15
        self.stdin_q_lock = threading.Lock()
16
17
    def stdin_q_put(self, client, msg):
18
        log("stdin_q_put", "acquiring stdin_q lock")
19
        self.stdin_q_lock.acquire()
20
        log("stdin_q_put", "got stdin_q lock")
21
        if not self.stdin_q.has_key(client):
22
            log("stdin_q_put", "stdin queue for client %s not found, creating" % client)
23
            self.stdin_q[client] = Queue.Queue()
24
        self.stdin_q_lock.release()
25
        log("stdin_q_put", "released stdin_q lock")
26
        self.stdin_q[client].put(msg)
27
        log("stdin_q_put", "sent message '%s' on stdin_q %s" %(msg, client))
28
29
    def stdin_q_get(self, client):
30
        log("stdin_q_get", "acquiring stdin_q lock")
31
        self.stdin_q_lock.acquire()
32
        log("stdin_q_get", "got stdin_q lock")
33
        if not self.stdin_q.has_key(client):
34
            log("stdin_q_get", "stdin queue for client %s not found, creating" % client)
35
            self.stdin_q[client] = Queue.Queue()
36
        self.stdin_q_lock.release()
37
        log("stdin_q_get", "released stdin_q lock")
38
        log("stdin_q_get", "waiting for message on stdin queue %s" % client)
39
        msg = self.stdin_q[client].get()
40
        log("stdin_q_get", "got message %s" % msg)
41
        return msg
42
            
43
44
    def send_msg(self, client, msg):
45
        self.client_q.put((client, msg))
46
47
    def recv_msg(self, client):
48
        return self.stdin_q_get(client)
49
50
    def serve_client(self):
51
        log("communicator serve client", "starting serve_client")
52
        while True:
53
            client, msg = self.client_q.get()
54
            log("communicator serve client", "got data: cid='%s', msg='%s'" %(client, msg))
55
            sys.stdout.write(pickle.dumps((client, msg)).encode("string_escape") + "\n")
56
            log("communicator serve client", "wrote to stdout")
57
58
    def serve_stdin(self):
59
        log("communicator serve stdin", "starting serve_stdin")
60
        while True:
61
            line = sys.stdin.readline()
62
            client, msg = pickle.loads(line[0:-1].decode("string_escape"))
63
            log("communicator serve stdin", "got data: cid='%s', msg='%s'" %(client, msg))
64
            self.stdin_q_put(client, msg)
65
            log("communicator serve stdin", "wrote to stdin_q")
66
67
            
68
69
70
class tcp_server():
71
72
73
    def __init__(self, host, port):
74
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
75
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
76
        self.s.bind((host, port))
77
        self.s.listen(5)
78
79
    def send_stream(self, cid, s):
80
        log("send_stream", "send stream started")
81
        while True:
82
            msg = s.recv(1024)
83
            log("send_stream", "got message from client: '%s'" %msg)
84
            comm.send_msg(cid, msg)
85
86
    def recv_stream(self, cid, s):
87
        log("recv_stream", "recv stream started")
88
        while True:
89
            msg = comm.recv_msg(cid)
90
            log("recv_stream", "got message '%s'" % msg)
91
            s.sendall(msg)
92
            log("recv_stream", "sent message to client %s" % cid)
93
            
94
            
95
96
    def serve_forever(self):
97
        while True:
98
            clientsocket, address = self.s.accept()
99
            global uid
100
            cid = uid
101
            uid += 1
102
            threading.current_thread().name = "client creation thread %s" %cid
103
            log("incoming connection", "got socket '%s' from address '%s'" %(clientsocket, address))
104
            t1 = threading.Thread(target = self.send_stream, args = (cid, clientsocket))
105
            t1.name = "tcp_send_stream %s" % cid
106
            t1.start()
107
            log("incoming connection", "send_stream thread started")
108
            t2 = threading.Thread(target = self.recv_stream, args = (cid, clientsocket))
109
            t2.name = "tcp_recv_stream %s" % cid
110
            t2.start()
111
            log("incoming connettion", "recv_stream started")
112
113
114
if __name__ == "__main__":
115
116
    log("init", "program launched")
117
118
    if len(sys.argv) < 2:
119
        log("ERR", "Usage: python singleconn.py port [host]")
120
        exit()
121
122
    PORT = int(sys.argv[1])
123
    if len(sys.argv) > 2:
124
        HOST = sys.argv[2]
125
    else:
126
        HOST = "localhost"
127
    log("init", "PORT: %s, HOST: %s" %(PORT, HOST))
128
    svr = tcp_server(HOST, PORT)
129
130
    log("init", "TCP server initalised")
131
132
    comm = communicator()
133
    log("init", "communicator intitalised")
134
    t1 = threading.Thread(target = comm.serve_client)
135
    t1.name = "communicator_serve_client"
136
    t1.start()
137
    log("init", "communicator serve_client started")
138
    t2 = threading.Thread(target = comm.serve_stdin)
139
    t2.name = "communicator_serve_stdin"
140
    t2.start()
141
    log("init", "about to serve TCP server")
142
    svr.serve_forever()