Advertisement
Guest User

Ping_Server

a guest
Mar 31st, 2012
1,341
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 14.23 KB | None | 0 0
  1. import ping, threading, time, socket, select, sys, struct, Queue
  2. import binascii, collections, math, random, logging
  3. import ping_reporter
  4.  
  5. log = ping_reporter.setup_log('PingServer')
  6.  
  class PingTimer(threading.Thread):
      """Helper thread for PingServer that fires timeout callbacks.

      Pending timeouts live in a priority queue ordered by expiry time.  Each
      entry carries a ``threading.Event``; whoever completes the work first
      (the timer, or an outside party that sets the event) wins, and entries
      whose event is already set are silently discarded.

      Queue entries are ``(expire, sequence, event, callback, cb_args)``.  The
      sequence number breaks ties between entries with the same expiry time so
      that the queue never has to compare Event objects or callbacks, and so
      that entries with equal expiry fire in insertion order.
      """

      def __init__(self, event):
          """Create the timer.

          event -- ``threading.Event`` used to wake the timer thread whenever
                   a callback is added or the timer is stopped.
          """
          threading.Thread.__init__(self)
          self.queue = Queue.PriorityQueue()
          self.running = False
          self.event = event
          # Tie-breaker for queue entries; guarded by a lock because callbacks
          # can be registered from several threads.
          self._sequence = 0
          self._sequence_lock = threading.Lock()

      def _next_sequence(self):
          """Return a strictly increasing integer, unique per queue entry."""
          with self._sequence_lock:
              self._sequence += 1
              return self._sequence

      def stop(self):
          """Ask the timer loop to exit and wake it so it notices."""
          log.debug('PingTimeout terminating')
          self.running = False
          self.event.set()

      def run(self):
          """Thread body: fire expired callbacks, then sleep until the next one.

          The wake-up event is cleared *before* processing so that a callback
          added while processing is running still wakes the following wait().
          """
          self.running = True
          log.debug('PingTimeout starting')
          while self.running:
              self.event.clear()
              timeout = self.process()
              self.event.wait(timeout)

      def process(self):
          """Fire every expired callback.

          Returns the number of seconds until the earliest pending entry
          expires, or None when nothing is pending (wait indefinitely).
          """
          while self.queue.qsize() > 0:
              try:
                  item = self.queue.get_nowait()
              except Queue.Empty:
                  break  # the qsize check isn't guaranteed to prevent this
              expire, _sequence, event, callback, cb_args = item
              if event.is_set():
                  continue  # event was completed elsewhere; ignore it
              # Read the clock once so the returned wait is never negative.
              remaining = expire - time.time()
              if remaining > 0:
                  self.queue.put(item)
                  return remaining
              callback(*cb_args)
              event.set()  # make sure no one executes it again
          return None

      def add_callback(self, timeout, handler, args):
          """Schedule ``handler(*args)`` to run ``timeout`` seconds from now.

          Returns the Event tied to the entry; setting it before the timeout
          expires cancels the callback.
          """
          event = threading.Event()
          item = (time.time() + timeout, self._next_sequence(), event, handler, args)
          self.queue.put(item)
          self.event.set()
          return event

      def mult_callback(self, count, timeout, handler, args):
          """Schedule ``count`` callbacks that all share one expiry time.

          ``args[i]`` is the argument list for the i-th call of ``handler``.
          The shared expiry is ``timeout`` seconds from now plus one extra
          second.  Returns the list of Events, one per entry, in the same
          order as ``args``.
          """
          expire = timeout + time.time() + 1
          events = [threading.Event() for _ in range(count)]
          for index, event in enumerate(events):
              self.queue.put((expire, self._next_sequence(), event, handler, args[index]))
          self.event.set()
          return events
  55.  
  class PingServer(threading.Thread):
      """Thread that stores data blocks as payloads of echo packets.

      Every packet received from the socket is handed to process_block(),
      which applies any queued read/write/delete request for that block ID and
      then sends the (possibly updated) payload out again with
      ``ping.send_ping``.  Requests that see no packet for their ID before the
      timeout are completed by PingTimer through the ``*_timeout`` handlers.

      NOTE(review): the exact wire format and socket type come from the
      project's ``ping`` module, which is not visible here.
      """

      # Class-level defaults so process_block() works even when setup() was
      # never called; setup() resets them on the instance.
      strip_counter = 0
      strip_interval = 97

      def __init__(self, d_addr, block_size=1024, initial_timeout=2):
          """Resolve the server and create the socket and timer.

          d_addr          -- host name of the echo server.
          block_size      -- default payload size; setup() replaces it with
                             the size the server actually echoes.
          initial_timeout -- stored as ``running_timeout`` (timeout() does not
                             currently read it).
          """
          threading.Thread.__init__(self)
          self.block_size = block_size  # default; use setup for exact
          self.server = d_addr, socket.gethostbyname(d_addr)  # (name, ip)
          self.running_timeout = initial_timeout
          self.listeners = []
          self.debug = 0

          # timeout events are queued and executed in a separate thread
          self.timer_event = threading.Event()
          self.timer = PingTimer(self.timer_event)

          self.blocks = 0
          self.running = False
          self.socket = ping.build_socket()
          self.empty_block = self.null_block()
          # block ID -> deque of (handler, event, args) awaiting that block
          self.queued_events = collections.defaultdict(collections.deque)

      def timeout(self):
          """Seconds to wait for one echo (fixed; ignores running_timeout)."""
          return 2.0/5.0 # self.running_timeout

      def safe_timeout(self):
          """Three times timeout()."""
          return 3 * self.timeout()

      def _echo(self, where, ID, payload):
          """Send one probe and return ``(address, data)`` of the reply.

          ``where`` is the caller's name, used only to prefix error messages.
          Raises Exception when there is no reply, the reply is empty, or the
          reply carries a different ID.
          """
          ping.data_ping(self.socket, self.server[1], ID, payload)
          msg = ping.read_ping(self.socket, self.timeout())
          prefix = 'PingServer::%s: ' % where
          if not msg:
              raise Exception(prefix + 'no valid response from ' + self.server[0])
          addr, rID, data = msg['address'], msg['ID'], msg['payload']
          log.debug("Addr=%s rID=%d Data=%d bytes"%(addr[0],rID,len(data)))
          if len(data) == 0:
              raise Exception(prefix + 'null response from ' + self.server[0])
          if rID != ID:
              raise Exception(prefix + 'invalid response id from ' + self.server[0])
          return addr, data

      def setup_timeout(self, ID=0):
          """Probe the server with a timestamp payload and log the round trip.

          A random 32-bit ID is used when ``ID`` is 0.  Raises Exception when
          the reply is missing, empty, has the wrong ID, does not echo the
          payload exactly, or comes from a different address.
          """
          Time = time.time()
          Times = struct.pack('d', Time)
          if ID == 0:
              ID = random.getrandbits(32)  # ID size in bits

          addr, data = self._echo('setup_timeout', ID, Times)
          if data != Times:
              raise Exception('PingServer::setup_timeout: invalid response data from '+self.server[0])
          if addr[0] != self.server[1]:
              raise Exception('PingServer::setup_timeout: invalid response server from '+self.server[0])
          delay = time.time() - Time
          log.notice('echo delay: %.02fms'%(1000*delay))

      def setup_block(self, ID = 0):
          """Probe how many payload bytes the server echoes back.

          Sends ``block_size`` copies of one random byte; the length of the
          echoed payload becomes the new ``block_size``.  Raises Exception on
          the same conditions as setup_timeout(), except that the payload only
          has to consist of the fill byte (it may come back shorter).
          """
          if ID == 0:
              ID = random.getrandbits(32)  # ID size in bits
          Fill = chr(random.getrandbits(8))  # repeated data
          Filler = self.block_size * Fill

          addr, data = self._echo('setup_block', ID, Filler)
          if data != len(data)*Fill:
              raise Exception('PingServer::setup_block: invalid response data from '+self.server[0])
          if addr[0] != self.server[1]:
              raise Exception('PingServer::setup_block: invalid response server from '+self.server[0])
          self.block_size = len(data)
          self.empty_block = self.null_block()
          log.notice('echo length: %d bytes'%self.block_size)

      def setup(self):
          """Run both probes with one random ID and reset the strip counters."""
          log.trace('PingServer::setup: testing server "%s"'%self.server[0])
          ID = random.getrandbits(32)
          self.setup_timeout(ID)
          self.setup_block(ID)
          self.strip_counter = 0
          self.strip_interval = 97

      def stop(self):
          """Ask the receive loop and the timer thread to exit."""
          self.running = False
          log.info('PingServer terminating')
          self.timer.stop()

      def run(self):
          """Thread body: receive packets and feed them to process_block().

          Raises Exception when a packet with ID 0 arrives.
          """
          self.running = True
          log.notice('PingServer starting')
          self.timer.start()
          while self.running:
              start_blocks = self.blocks  # updated asynchronously
              ready = select.select([self.socket], [], [], self.timeout())
              if ready[0] == []:  # timeout
                  # Only complain when blocks were expected both before and
                  # after the wait.
                  if start_blocks != 0 and self.blocks != 0:
                      log.error('%s timed out'%self.server[0])
              try:
                  msg = ping.recv_ping(self.socket, self.timeout())
              except Exception:
                  # Best effort: a failed receive is skipped, but
                  # KeyboardInterrupt/SystemExit are no longer swallowed.
                  continue
              if not msg:
                  continue
              addr, block_id, data = msg['address'], msg['ID'], msg['payload']
              if block_id == 0:
                  raise Exception('received packet w/ ID 0 packet: '+binascii.hexlify(msg['raw']))
              self.process_block(addr[0], block_id, data)

      def process_block(self, addr, ID, data):
          """Apply queued requests for block ``ID`` and send the block out again.

          addr -- address the payload is sent to.
          ID   -- block ID (0 is rejected with an Exception).
          data -- payload received for the block.

          Queued requests are handled in FIFO order: a write replaces the
          payload, a read passes the payload (or a null block when it is
          empty) to its callback, a delete empties the payload.  An empty
          payload is not sent again and decrements ``blocks``.
          """
          if ID == 0: raise Exception('server responded with ID 0 packet')

          while len(self.queued_events[ID]):
              handler, event, args = self.queued_events[ID].popleft()
              if event.is_set(): continue  # already completed by the timer

              if handler == self.write_block_timeout:
                  if self.debug: log.trace('%s (block %d) updated'%(self.server[0],ID))
                  data = args[1]
              elif handler == self.read_block_timeout:
                  if self.debug: log.trace('%s (block %d) read'%(self.server[0],ID))
                  callback, cb_args = args[1], args[2]
                  if len(data) > 0: callback(ID, data, *cb_args)
                  else:             callback(ID, self.null_block(), *cb_args)
              elif handler == self.delete_block_timeout:
                  if self.debug: log.trace('%s (block %d) deleted'%(self.server[0],ID))
                  data = ''
              event.set()  # tells the timer not to run the timeout handler

          # Every strip_interval-th call, trailing NUL bytes are stripped, so
          # a block that has become all NULs ends up empty and is dropped.
          self.strip_counter += 1
          if len(data) and not self.strip_counter % self.strip_interval:
              data = data.rstrip('\0')
          if len(data) == 0:
              self.blocks = self.blocks - 1
              return
          if len(self.listeners): self.process_listeners(addr, ID, data)
          ping.send_ping(self.socket, addr, ID, data)

      def process_listeners(self, addr, ID, data):
          """Drop expired listeners, then call the rest with this block.

          Raises Exception when called with no listeners registered.
          """
          if not self.listeners: raise Exception('process_listeners invoked without valid listeners on ID=%d'%ID)
          self.listeners = [l for l in self.listeners if l[0] >= time.time()]  # clean the listeners
          for expire, handler, cb_args in self.listeners:
              handler(ID, addr, data, *cb_args)

      def add_listener(self, handler, timeout, args):
          """Register ``handler(ID, addr, data, *args)`` for ``timeout`` seconds."""
          log.debug('add_listener: timeout=%d handler=%s'%(timeout,handler))
          expire = time.time() + timeout
          self.listeners.append((expire, handler, args))

      def null_block(self, length=None):
          """Return ``length`` NUL bytes (``block_size`` when length is falsy)."""
          if length: return length * '\0'
          return self.block_size * '\0'

      def event_insert(self, ID, handler, args):
          """Queue a request for block ``ID`` and arm its timeout.

          Returns the Event that is set once the request has been handled,
          either by process_block() or by the timeout handler.
          """
          event = self.timer.add_callback(self.timeout(), handler, args)
          self.queued_events[ID].append((handler, event, args))
          return event

      # read / write / delete a single block
      def write_block(self, ID, data, blocking = False):
          """Queue a write of ``data`` (truncated to block_size) to block ``ID``.

          Data that consists only of NUL bytes (or is empty) is turned into a
          delete.  Returns the completion Event; waits on it when ``blocking``.
          Raises Exception for ID 0.
          """
          if ID == 0: raise Exception('write_block: invalid block ID (0)')
          if data == self.null_block()[:len(data)]: return self.delete_block(ID, blocking)
          event = self.event_insert(ID, self.write_block_timeout, [ID, data[:self.block_size]])
          if blocking: event.wait()
          return event

      def write_blocks(self, IDs, datas, blocking = False):
          """Queue writes for several blocks with one shared timeout.

          ``datas[i]`` (truncated to block_size) is written to ``IDs[i]``.
          Returns the list of completion Events; an empty ``IDs`` yields [].
          """
          if not IDs: return []
          log.trace('PingServer::write_blocks: IDs=%d-%d blocking=%s'%(IDs[0],IDs[-1],blocking))

          handler = self.write_block_timeout
          args = [[IDs[i], datas[i][:self.block_size]] for i in range(0, len(IDs))]
          events = self.timer.mult_callback(len(args), self.timeout(), handler, args)
          for i in range(0, len(IDs)):
              self.queued_events[IDs[i]].append((handler, events[i], args[i]))
          if blocking:
              for e in events: e.wait()
          return events

      def delete_block(self, ID, blocking = False):
          """Queue deletion of block ``ID``; returns the completion Event.

          Raises Exception for ID 0.
          """
          log.trace('PingServer::delete_block: ID=%d blocking=%s'%(ID,blocking))
          if ID == 0: raise Exception('delete_block: invalid block ID (0)')
          t = self.event_insert(ID, self.delete_block_timeout, [ID])
          if blocking: t.wait()
          return t

      def read_block(self, ID, callback, cb_args = (), blocking = False):
          """Queue a read of block ``ID``.

          ``callback(ID, data, *cb_args)`` is called with the block payload,
          or with a null block when the block is empty or the read times out.
          Returns the completion Event.  Raises Exception for ID 0.
          """
          log.trace('PingServer::read_block: ID=%d blocking=%s'%(ID,blocking))
          if ID == 0: raise Exception('read_block: invalid block ID (0)')
          t = self.event_insert(ID, self.read_block_timeout, [ID, callback, cb_args])
          if blocking: t.wait()
          return t

      def read_blocks(self, IDs, callback, cb_args, blocking = False):
          """Queue reads for several blocks with one shared timeout.

          The same ``callback`` and ``cb_args`` are used for every ID.
          Returns the list of completion Events; an empty ``IDs`` yields [].
          """
          if not IDs: return []
          log.trace('PingServer::read_blocks: IDs=%d-%d blocking=%s'%(IDs[0],IDs[-1],blocking))

          handler = self.read_block_timeout
          args = [[ID, callback, cb_args] for ID in IDs]
          events = self.timer.mult_callback(len(args), self.timeout(), handler, args)
          for i in range(0, len(IDs)):
              self.queued_events[IDs[i]].append((handler, events[i], args[i]))
          if blocking:
              for e in events: e.wait()
          return events

      def read_block_timeout(self, ID, callback, cb_args):
          """Timer handler: the block never arrived, so report a null block."""
          log.debug('PingServer::read_block_timeout: ID=%d callback=%s'%(ID,callback.__name__))
          callback(ID, self.null_block(), *cb_args)

      def delete_block_timeout(self, ID):
          """Timer handler: nothing to do; the request is marked done anyhow."""
          log.debug('PingServer::delete_block_timeout: ID=%d'%ID)

      def write_block_timeout(self, ID, data):
          """Timer handler: no packet arrived for ``ID``, so start the block.

          Counts a new block and runs process_block() as if a packet with this
          ID had arrived from the server.  Raises Exception for ID 0.
          """
          log.trace('PingServer::write_block_timeout: ID=%d bytes=%d'%(ID,len(data)))
          self.blocks = self.blocks + 1
          # force update queue (as if packet arrived)
          if ID == 0: raise Exception('write_block_timeout: ID == 0')
          self.process_block(self.server[1], ID, data)
  262.  
  def print_block(ID, data):
      """Read callback that dumps a block's ID, size and payload to stdout.

      Each line is printed as one pre-formatted string through the
      parenthesised form of print, which produces the same output under the
      Python 2 print statement and the Python 3 print function.
      """
      print('----- print block -----')
      print('block %s bytes %s' % (ID, len(data)))
      print(data)
      print('----- print block -----')
  268.  
  def __live_blocks(ID, addr, data, datastore):
      """Listener callback: mark block ``ID`` as seen in ``datastore``."""
      datastore[ID] = 1


  def live_blocks(PServer, timeout=None):
      """Collect the IDs of blocks seen by ``PServer`` during a time window.

      Registers a listener on the server, sleeps for ``timeout`` seconds
      (``PServer.safe_timeout()`` when no timeout is given) and returns a dict
      mapping every block ID reported in that window to 1.
      """
      window = timeout if timeout else PServer.safe_timeout()
      seen = {}
      PServer.add_listener(__live_blocks, window, [seen])
      time.sleep(window)
      return seen
  278.        
  def used_blocks(blocks):
      """Summarise used block IDs as runs of consecutive IDs.

      blocks -- iterable of integer block IDs (e.g. the dict returned by
                live_blocks()).

      Returns a dict mapping the first ID of each run to the run's length,
      e.g. IDs 1,2,3,7,8 give ``{1: 3, 7: 2}``.  The IDs are visited in sorted
      order, so the result does not depend on the iteration order of
      ``blocks``.
      """
      result = {}
      run_start = previous = None
      for x in sorted(blocks):
          if previous is not None and x - 1 == previous:
              result[run_start] += 1  # extends the current run
          else:
              run_start = x           # starts a new run
              result[x] = 1
          previous = x
      return result
  289.  
  290.  
  def free_blocks(blocks):
      """Summarise the gaps between used block IDs.

      blocks -- iterable of integer block IDs in use (IDs start at 1).

      Returns a dict mapping the first ID of each free run to its length.  The
      run after the highest used ID is open-ended and reported with length 0,
      e.g. used IDs 1,2,5,6 give ``{3: 2, 7: 0}`` and no used IDs give
      ``{1: 0}``.

      The IDs are swept once in sorted order, so isolated used blocks and
      arbitrarily ordered input are handled correctly.
      """
      result = {}
      cursor = 1  # lowest ID not yet known to be used or reported free
      for x in sorted(blocks):
          if x > cursor:
              result[cursor] = x - cursor  # gap between cursor and x
          cursor = x + 1
      result[cursor] = 0  # open-ended free space after the last used ID
      return result
  304.  
  if __name__ == "__main__":
      # Manual smoke test: writes blocks to a live echo server and reads them
      # back repeatedly.  Needs network access and the project's ping and
      # ping_reporter modules.
      ping_reporter.start_log(log,logging.DEBUG)
      server = ping.select_server(log,1)

      from ping_reporter import humanize_bytes

      def _print_traffic():
          # One pre-formatted string keeps the output identical under the
          # Python 2 print statement and the Python 3 print function.
          print('traffic: %s pings (%s)' % (ping.ping_count, humanize_bytes(ping.ping_bandwidth)))

      # Bound before the try so the finally clause cannot hit an unbound name
      # (and hide the real error) when the constructor itself fails.
      PS = None
      try:
          PS = PingServer(server,4)
          PS.debug = 1
          PS.setup()
          PS.start()
          if 0:
              # Disabled scenario: single-block read/write/delete walkthrough.
              _print_traffic()
              PS.read_block(2,print_block)
              time.sleep(2)
              PS.write_block(2,'coconut')
              time.sleep(1)
              _print_traffic()

              PS.write_block(1,'apples')
              PS.read_block(1,print_block)
              PS.read_block(1,print_block)
              time.sleep(2)
              _print_traffic()

              log.info('testing block metrics')
              blocks = live_blocks(PS)
              log.debug('blocks: %s'%blocks)
              log.debug('--used: %s'%used_blocks(blocks))
              log.debug('--free: %s'%free_blocks(blocks))

              PS.delete_block(1)
              time.sleep(2)
              _print_traffic()
              PS.write_block(1,'apples')
              time.sleep(2)
              PS.read_block(1,print_block)
              time.sleep(4)
              PS.read_block(1,print_block)
              time.sleep(1)
              PS.write_block(1,'bananas')
              time.sleep(1)
              PS.read_block(1,print_block)
              time.sleep(1)
              PS.read_block(1,print_block)
              PS.read_block(1,print_block)
              time.sleep(1)
              PS.delete_block(1)
              _print_traffic()
              while True:
                  time.sleep(1)

          if 1:
              # Active scenario: bulk write, then up to 12 verification reads.
              a = 2500
              time.sleep(2)
              log.debug('%s: writing %d blocks'%(time.time(),a))
              events = PS.write_blocks(range(1,a+1),['AAAA']*a,False)
              log.debug('%s: inserted writes'%(time.time()))

              def __read_callback(ID, data, data_store):
                  data_store[ID] = data

              def test_block_read(PS,rData,count):
                  """Read blocks 1..count and compare their concatenation to rData."""
                  data = {}
                  log.debug('%s: reading %d blocks'%(time.time(),count))
                  PS.read_blocks(range(1,count+1), __read_callback, [data], True)
                  log.debug('%s: completed read'%(time.time()))

                  blob = ''.join(data[i] for i in range(1,count+1))

                  # A block that came back as all NUL bytes counts as missing.
                  missing = sum(1 for i in range(1,count+1) if data[i] == '\0'*len(data[i]))

                  if blob == rData:
                      log.trace('block read successfully')
                      return True
                  log.error('block read failed: data corrupted')
                  log.error('%d of %d blocks missing'%(missing,count))
                  return False

              data = 'A'*4*a
              for i in range(12):
                  time.sleep(5)
                  if not test_block_read(PS,data,a): break
          print('terminate')
      except KeyboardInterrupt:
          print("Keyboard Interrupt")
      except Exception:
          print('General Exception')
          from traceback import print_exc
          print_exc()
      finally:
          if PS is not None:
              PS.stop()
          _print_traffic()
          # NOTE(review): exits with status 1 even after a clean run -- confirm
          # this is intended before changing it.
          sys.exit(1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement