runnig

select.select server

Apr 26th, 2013
204
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.91 KB | None | 0 0
  1. '''
  2. Created on Apr 25, 2013
  3.  
  4. I'd use Twisted, zeromq or at least asyncore of course
  5. but bare sockets were required.
  6.  
  7. command line: python server.py [--serve port|--test server_ip port|--debug|--help]
  8.  
  9. --serve launches a server that serves on "port"
  10. --test runs MAX_CLIENTS that connect to a running server at "server_ip" on "port"
  11. --debug turns on verbose output
  12.  
  13. I use a thread for serving and a thread for output messages from server
  14. because serving and console output may have different speeds.
  15.  
  16. I decided to work with the Python threading module because the multiprocessing module somehow
  17. doesn't work on my Windows machine (processes fail to communicate via Pipes; I spent a whole day trying to fix that).
  18. Otherwise the multiprocessing module would be preferable
  19. because it avoids the Python global interpreter lock.
  20. '''
  21.  
  22. import sys, socket, select, io, random, struct, time
  23. import Queue
  24. import threading as parallel
  25.  
  26. MAX_CONN = 5
  27. MAX_BUF_SIZE = 128
  28. MAX_CLIENTS = 10
  29.  
  30. #HOST = '10.89.62.178'
  31. #PORT = 17171
  32.  
  33. DEBUG = '--debug' in sys.argv
  34.  
  35. ORDER_LITTLE_ENDIAN = '<'
  36. ORDER_BIG_ENDIAN = '>'
  37. ORDER_NETWORK = '!'
  38. ORDER = ORDER_NETWORK
  39.  
  40. NOT_FOUND = -1
  41.  
  42. MAX_PAYLOAD_OUTPUT_SIZE = 10
  43.  
  44. PROTOCOL_VERSION = 0x7D
  45.  
  46. def trace(lk, msg):
  47.     with lk:
  48.         print "TRACE:", msg
  49.  
  50. def debug(lk, msg):
  51.     if not DEBUG:
  52.         return
  53.     with lk:
  54.         print "DEBUG:", msg
  55.  
  56. def fmt_bytes(bytes):
  57.     return ":".join("%02x" % ord(c) for c in bytes)
  58.  
  59. def unpack(lk, bytes):
  60.     N = len(bytes)
  61.  
  62.     fmt = "%cBHL" % ORDER
  63.  
  64.     begin = 0
  65.  
  66.     while begin + 7 < N:
  67.  
  68.         payload_pos = begin + 7
  69.         header = bytes[begin:payload_pos]
  70.         v, t, c = struct.unpack(fmt, header)
  71.  
  72.         if DEBUG:
  73.             assert v == PROTOCOL_VERSION
  74.  
  75.         msg_end_pos = bytes.find('\0', payload_pos)
  76.         if msg_end_pos == NOT_FOUND:
  77.             msg_end_pos = N
  78.  
  79.  
  80.         payload = fmt_bytes(bytes[payload_pos: msg_end_pos])
  81.  
  82.         trace(lk, "v:%02x, msgtype:%04d, client:%08x, payload:%s" % (v, t, c, payload))
  83.         begin = msg_end_pos + 1
  84.  
  85. def pack(version, msg_type, client_id, payload):
  86.     fmt = "%cBHL%dsc" % (ORDER, len(payload))
  87.     return struct.pack(fmt, version, msg_type, client_id, payload,'\0')
  88.  
  89. def output_thread(stop_signal, lk, Q):
  90.  
  91.     while True:
  92.         if stop_signal.is_set():
  93.             debug(lk, "output: received stop signal, exit.")
  94.             break
  95.  
  96.         try:
  97.             msg = Q.get(block = True, timeout = 0.25)
  98.             unpack(lk, msg)
  99.  
  100.         except Queue.Empty:
  101.             pass
  102.  
  103.  
  104. def read_all(s):
  105.     data = []
  106.     buf = s.recv(MAX_BUF_SIZE)
  107.  
  108.     if len(buf) > 0:
  109.         data.extend(buf)
  110.         closed = False
  111.     else:
  112.         closed = True
  113.  
  114.     while len(buf) == MAX_BUF_SIZE:
  115.         buf = s.recv(MAX_BUF_SIZE)
  116.         if len(buf) > 0:
  117.             data.extend(buf)
  118.         else:
  119.             closed = True
  120.  
  121.     return ''.join(data), closed
  122.  
  123.  
  124. def serve(host, port, stop_signal, lk, Q):
  125.     server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  126.     server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  127.     server.setblocking(0)
  128.     server.bind((host, port))
  129.     server.listen(MAX_CONN)
  130.     rlist = set([server])
  131.     wlist = set()
  132.  
  133.     #while not stop_signal.wait() and rlist:
  134.     while rlist:
  135.         if stop_signal.is_set():
  136.             trace(lk, "server: received stop signal, exit.")
  137.             break
  138.  
  139.         debug(lk, "server: listening...")
  140.  
  141.         readables, writables, xlist = select.select(rlist, wlist, rlist, 1)
  142.  
  143.         for s in readables:
  144.             if s is server:
  145.                 conn, addr = s.accept()
  146.                 debug(lk, "server: new conn from %s" % str(addr))
  147.                 conn.setblocking(0)
  148.                 rlist.add(conn)
  149.  
  150.             else:
  151.                 #data = s.recv(MAX_BUF_SIZE)
  152.                 try:
  153.                     data,closed = read_all(s)
  154.                     if len(data) > 0:
  155.                         Q.put_nowait(data)
  156.                     if closed:
  157.                         debug(lk, "server: closing %s after reading no data" % str(conn))
  158.                         if s in wlist:
  159.                             wlist.remove(s)
  160.                         rlist.remove(s)
  161.                         s.close()
  162.                 except:
  163.                     debug(lk, "server: closing %s on error" % str(conn))
  164.                     if s in wlist:
  165.                         wlist.remove(s)
  166.                     if s in rlist:
  167.                         rlist.remove(s)
  168.                     s.close()
  169.  
  170.     if rlist or wlist:
  171.         for s in rlist:
  172.             s.close()
  173.         for s in wlist:
  174.             s.close()
  175.  
  176.  
  177. def test_client(client_id, host, port, stop_signal, lk):
  178.  
  179.  
  180.     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  181.     debug(lk, 'client %d: connecting to %s:%d' % (client_id, host, port) )
  182.  
  183.  
  184.     s.connect((host,port))
  185.  
  186.     for payload_size in range(1, 4 * MAX_BUF_SIZE):
  187.  
  188.         if stop_signal.is_set():
  189.             debug(lk, "cliend %d: received stop signal, exit" % client_id)
  190.             break
  191.  
  192.         payload = ''.join([chr(random.randint(1,255)) for i in range(payload_size)])
  193.  
  194.         msg_type = payload_size
  195.         bytes = pack(PROTOCOL_VERSION, msg_type, client_id, payload)
  196.         #debug(lk, 'client %d: sending message "%s", bytes: %s' % (client_id, payload, fmt_bytes(bytes)))
  197.         try:
  198.             s.send(bytes)
  199.         except:
  200.             debug(lk, "exception on send()")
  201.             break
  202.  
  203.     s.close()
  204.  
  205. def main():
  206.     if '--help' in sys.argv:
  207.         print 'usage: %s [--serve port|--test host port|--debug|--help]' % sys.argv[0]
  208.         return
  209.  
  210.     #parallel.freeze_support()
  211.     #manager = parallel.Manager()
  212.     lk = parallel.Lock()
  213.  
  214.     threads = []
  215.  
  216.     try:
  217.         p = None
  218.         stop_signal = parallel.Event()
  219.  
  220.         Q = Queue.Queue()
  221.         output = parallel.Thread(target = output_thread, args = (stop_signal, lk, Q))
  222.         output.daemon = True
  223.         output.start()
  224.         threads.append(output)
  225.  
  226.         if "--serve" in sys.argv:
  227.             port = int(sys.argv[-1])
  228.             myaddr = str(socket.gethostbyname(socket.getfqdn()))
  229.             print "Serving at %s:%d; Ctrl-C to stop server" % (myaddr, port)
  230.             srv = parallel.Thread(target = serve, args = ('', port, stop_signal, lk, Q))
  231.             srv.start()
  232.             threads.append(srv)
  233.  
  234.         if "--test" in sys.argv:
  235.             host = sys.argv[-2]
  236.             port = int(sys.argv[-1])
  237.             for i in range(1,MAX_CLIENTS + 1):
  238.                 p = parallel.Thread(target = test_client, args = (i, host, port, stop_signal, lk))
  239.                 threads.append(p)
  240.  
  241.         [t.start() for t in threads if not t.is_alive()]
  242.  
  243.         while len(threads) > 0:
  244.             threads[0].join(1)
  245.  
  246.     except (KeyboardInterrupt, SystemExit): # Ctrl-C
  247.         stop_signal.set()
  248.         [t.join() for t in threads]
  249.  
  250. if __name__ == '__main__':
  251.     main()
Advertisement
Add Comment
Please, Sign In to add comment