Advertisement
Guest User

Untitled

a guest
Jul 24th, 2017
66
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.08 KB | None | 0 0
  1. import json
  2. import socket
  3. import socketserver
  4. import threading
  5. import time
  6. from queue import Queue, Empty
  7. import signal
  8.  
  9.  
  10. class ThreadingTcpServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
  11. pass
  12.  
  13.  
  14. class FooServerConnection(object):
  15. """A class for making a tcp connection to a foo server"""
  16.  
  17. def __init__(self, ip_address, port):
  18. self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  19. self._sock.connect((ip_address, port))
  20.  
  21. def send(self, data):
  22. self._sock.send(data)
  23.  
  24. def recv(self, buffer_size=1024):
  25. return self._sock.recv(buffer_size)
  26.  
  27.  
  28. def read_message(sock, term_char='\n', buffer_size=1024, timeout=1):
  29. """Read a full message from a socket, terminated by term_char"""
  30. # Add a timeout in case a client or sends an incorrectly formatted message or stops midway through sending
  31. timeout = time.time() + timeout
  32. mess = "".encode()
  33. term_char = term_char.encode()
  34. while (not mess.endswith(term_char)) and (time.time() < timeout):
  35. mess += sock.recv(buffer_size)
  36. return mess
  37.  
  38.  
  39. class FooClientRequestHandler(socketserver.BaseRequestHandler):
  40. """Proxy class which forwards messages between a foo server and a foo client"""
  41.  
  42. proxy = None # Must be set to a Proxy object before class can be used
  43. # TODO find a neater way to have the request handler send message to the proxy for storing stats
  44.  
  45. def handle(self):
  46. # print("New connection : ", id(self))
  47. # Open connection to server
  48. fooServer = FooServerConnection(self.proxy.foo_server_ip_address, self.proxy.foo_server_port)
  49. # Repeat Until no message
  50. while True:
  51. # Read Message
  52. message = read_message(self.request)
  53. if not message:
  54. # print("No Message")
  55. break
  56. self.proxy.store_message(message)
  57. # Forward Message to Server
  58. fooServer.send(message)
  59. # Read Response
  60. response = read_message(fooServer)
  61. if not response:
  62. # print("No Response")
  63. break
  64. self.proxy.store_message(response)
  65. # Return Response
  66. self.request.sendall(response)
  67. return
  68.  
  69.  
  70. def get_from_queue(q):
  71. """Get all items in a Queue and return them as a list"""
  72. r = []
  73. assert type(q) == Queue
  74. while q.not_empty:
  75. try:
  76. r.append(q.get(block=False))
  77. except Empty:
  78. break
  79. return r
  80.  
  81.  
  82. class Proxy(object):
  83.  
  84. def __init__(self, ip_address='127.0.0.1', client_port=8002, server_port=8001):
  85. self.foo_server_ip_address = ip_address
  86. self.foo_server_port = server_port
  87. FooClientRequestHandler.proxy = self
  88. self._sockserver = ThreadingTcpServer((ip_address, client_port), FooClientRequestHandler)
  89. self._serverthread = None
  90. self._statsthread = None
  91. self._shutdown = threading.Event()
  92.  
  93. # Queues for sharing messages between the connection threads and the stats thread
  94. self._reqs = Queue()
  95. self._acks = Queue()
  96. self._naks = Queue()
  97.  
  98. self._stats_lock = threading.Lock()
  99.  
  100. self._total_reqs = 0
  101. self._total_acks = 0
  102. self._total_naks = 0
  103. self._reqs_10s = []
  104. self._acks_10s = []
  105. self._naks_10s = []
  106. self._reqs_1s = []
  107. self._acks_naks_1s = []
  108.  
  109.  
  110.  
  111. def store_message(self, message):
  112. """Add a message to the appropriate Queue"""
  113. t = time.time()
  114. message = message.decode()
  115. if message.startswith("REQ"):
  116. self._reqs.put(t)
  117. self._total_reqs += 1
  118. elif message.startswith("ACK"):
  119. self._acks.put(t)
  120. self._total_acks += 1
  121. elif message.startswith("NAK"):
  122. self._naks.put(t)
  123. self._total_naks += 1
  124.  
  125. def process_stats(self):
  126. """Read in and count all the messages which have been processed"""
  127. while not self._shutdown.is_set():
  128. with self._stats_lock:
  129. self._reqs_10s += get_from_queue(self._reqs)
  130. self._acks_10s += get_from_queue(self._acks)
  131. self._naks_10s += get_from_queue(self._naks)
  132. t = time.time()
  133. self._reqs_10s = [r for r in self._reqs_10s if r > t - 10]
  134. self._acks_10s = [r for r in self._acks_10s if r > t - 10]
  135. self._naks_10s = [r for r in self._naks_10s if r > t - 10]
  136. self._reqs_1s = [r for r in self._reqs_10s if r > t - 1]
  137. self._acks_naks_1s = [a for a in self._acks_10s if a > t - 1] + [n for n in self._naks_10s if n > t - 1]
  138.  
  139.  
  140. def print_stats(self, *args):
  141. """Print the stats to STDOUT"""
  142. with self._stats_lock:
  143. print(json.dumps(
  144. {
  145. "msg_tot": self._total_reqs + self._total_acks + self._total_naks,
  146. "msg_reqs": self._total_reqs,
  147. "msg_ack": self._total_acks,
  148. "msg_nak": self._total_naks,
  149. "request_rate_1s": len(self._reqs_1s),
  150. "request_rate_10s": len(self._reqs_10s)/10,
  151. "response_rate_10s": (len(self._acks_10s) + len(self._naks_10s))/10,
  152. "response_rate_1s": len(self._acks_naks_1s)
  153. },
  154. indent=4,
  155. sort_keys=True
  156. ))
  157.  
  158.  
  159. def start(self):
  160. """Start the proxy server listening for client connections"""
  161. print("Starting Foo Proxy")
  162. self._statsthread = threading.Thread(target=self.process_stats)
  163. self._statsthread.start()
  164. signal.signal(signal.SIGUSR2, self.print_stats)
  165. self._sockserver.serve_forever()
  166.  
  167.  
  168. def stop(self):
  169. """Stop the server from listening for client connections"""
  170. print("Stopping Foo Proxy")
  171. self._sockserver.server_close()
  172.  
  173. def __del__(self):
  174. try:
  175. self.stop()
  176. except:
  177. pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement