Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- '''
- Created on Apr 25, 2013
- I'd use Twisted, zeromq or at least asyncore of course
- but bare sockets were required.
- command line: python server.py [--serve port|--test server_ip port|--debug|--help]
- --serve launches a server that serves on "port"
- --test runs MAX_CLIENTS that connect to a running server at "server_ip" on "port"
- --debug turns on verbose output
- I use a thread for serving and a thread for output messages from server
- because serving and console output may have different speeds.
- I decided to work with python threading module because multiprocessing module somehow
- doesn't work on my windows machine (processes fail to communicate via Pipes, spent whole day trying to fix that).
- Otherwise multiprocessing module is preferable
- because it allows to avoid python global interpreter lock.
- '''
- import sys, socket, select, io, random, struct, time
- import Queue
- import threading as parallel
- MAX_CONN = 5
- MAX_BUF_SIZE = 128
- MAX_CLIENTS = 10
- #HOST = '10.89.62.178'
- #PORT = 17171
- DEBUG = '--debug' in sys.argv
- ORDER_LITTLE_ENDIAN = '<'
- ORDER_BIG_ENDIAN = '>'
- ORDER_NETWORK = '!'
- ORDER = ORDER_NETWORK
- NOT_FOUND = -1
- MAX_PAYLOAD_OUTPUT_SIZE = 10
- PROTOCOL_VERSION = 0x7D
- def trace(lk, msg):
- with lk:
- print "TRACE:", msg
- def debug(lk, msg):
- if not DEBUG:
- return
- with lk:
- print "DEBUG:", msg
- def fmt_bytes(bytes):
- return ":".join("%02x" % ord(c) for c in bytes)
- def unpack(lk, bytes):
- N = len(bytes)
- fmt = "%cBHL" % ORDER
- begin = 0
- while begin + 7 < N:
- payload_pos = begin + 7
- header = bytes[begin:payload_pos]
- v, t, c = struct.unpack(fmt, header)
- if DEBUG:
- assert v == PROTOCOL_VERSION
- msg_end_pos = bytes.find('\0', payload_pos)
- if msg_end_pos == NOT_FOUND:
- msg_end_pos = N
- payload = fmt_bytes(bytes[payload_pos: msg_end_pos])
- trace(lk, "v:%02x, msgtype:%04d, client:%08x, payload:%s" % (v, t, c, payload))
- begin = msg_end_pos + 1
- def pack(version, msg_type, client_id, payload):
- fmt = "%cBHL%dsc" % (ORDER, len(payload))
- return struct.pack(fmt, version, msg_type, client_id, payload,'\0')
- def output_thread(stop_signal, lk, Q):
- while True:
- if stop_signal.is_set():
- debug(lk, "output: received stop signal, exit.")
- break
- try:
- msg = Q.get(block = True, timeout = 0.25)
- unpack(lk, msg)
- except Queue.Empty:
- pass
- def read_all(s):
- data = []
- buf = s.recv(MAX_BUF_SIZE)
- if len(buf) > 0:
- data.extend(buf)
- closed = False
- else:
- closed = True
- while len(buf) == MAX_BUF_SIZE:
- buf = s.recv(MAX_BUF_SIZE)
- if len(buf) > 0:
- data.extend(buf)
- else:
- closed = True
- return ''.join(data), closed
- def serve(host, port, stop_signal, lk, Q):
- server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server.setblocking(0)
- server.bind((host, port))
- server.listen(MAX_CONN)
- rlist = set([server])
- wlist = set()
- #while not stop_signal.wait() and rlist:
- while rlist:
- if stop_signal.is_set():
- trace(lk, "server: received stop signal, exit.")
- break
- debug(lk, "server: listening...")
- readables, writables, xlist = select.select(rlist, wlist, rlist, 1)
- for s in readables:
- if s is server:
- conn, addr = s.accept()
- debug(lk, "server: new conn from %s" % str(addr))
- conn.setblocking(0)
- rlist.add(conn)
- else:
- #data = s.recv(MAX_BUF_SIZE)
- try:
- data,closed = read_all(s)
- if len(data) > 0:
- Q.put_nowait(data)
- if closed:
- debug(lk, "server: closing %s after reading no data" % str(conn))
- if s in wlist:
- wlist.remove(s)
- rlist.remove(s)
- s.close()
- except:
- debug(lk, "server: closing %s on error" % str(conn))
- if s in wlist:
- wlist.remove(s)
- if s in rlist:
- rlist.remove(s)
- s.close()
- if rlist or wlist:
- for s in rlist:
- s.close()
- for s in wlist:
- s.close()
- def test_client(client_id, host, port, stop_signal, lk):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- debug(lk, 'client %d: connecting to %s:%d' % (client_id, host, port) )
- s.connect((host,port))
- for payload_size in range(1, 4 * MAX_BUF_SIZE):
- if stop_signal.is_set():
- debug(lk, "cliend %d: received stop signal, exit" % client_id)
- break
- payload = ''.join([chr(random.randint(1,255)) for i in range(payload_size)])
- msg_type = payload_size
- bytes = pack(PROTOCOL_VERSION, msg_type, client_id, payload)
- #debug(lk, 'client %d: sending message "%s", bytes: %s' % (client_id, payload, fmt_bytes(bytes)))
- try:
- s.send(bytes)
- except:
- debug(lk, "exception on send()")
- break
- s.close()
- def main():
- if '--help' in sys.argv:
- print 'usage: %s [--serve port|--test host port|--debug|--help]' % sys.argv[0]
- return
- #parallel.freeze_support()
- #manager = parallel.Manager()
- lk = parallel.Lock()
- threads = []
- try:
- p = None
- stop_signal = parallel.Event()
- Q = Queue.Queue()
- output = parallel.Thread(target = output_thread, args = (stop_signal, lk, Q))
- output.daemon = True
- output.start()
- threads.append(output)
- if "--serve" in sys.argv:
- port = int(sys.argv[-1])
- myaddr = str(socket.gethostbyname(socket.getfqdn()))
- print "Serving at %s:%d; Ctrl-C to stop server" % (myaddr, port)
- srv = parallel.Thread(target = serve, args = ('', port, stop_signal, lk, Q))
- srv.start()
- threads.append(srv)
- if "--test" in sys.argv:
- host = sys.argv[-2]
- port = int(sys.argv[-1])
- for i in range(1,MAX_CLIENTS + 1):
- p = parallel.Thread(target = test_client, args = (i, host, port, stop_signal, lk))
- threads.append(p)
- [t.start() for t in threads if not t.is_alive()]
- while len(threads) > 0:
- threads[0].join(1)
- except (KeyboardInterrupt, SystemExit): # Ctrl-C
- stop_signal.set()
- [t.join() for t in threads]
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment