SHARE
TWEET

Ping_Server

a guest Mar 31st, 2012 615 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import ping, threading, time, socket, select, sys, struct, Queue
  2. import binascii, collections, math, random, logging
  3. import ping_reporter
  4.  
  5. log = ping_reporter.setup_log('PingServer')
  6.  
  7. class PingTimer(threading.Thread): # helper class for PingServer to manage timeouts
  8.         def __init__(self, event):
  9.                 self.queue = Queue.PriorityQueue()
  10.                 threading.Thread.__init__(self)
  11.                 self.running = False
  12.                 self.event = event
  13.  
  14.         def stop(self):
  15.                 log.debug('PingTimeout terminating')
  16.                 self.running = False
  17.                 self.event.set()
  18.  
  19.         def run(self):
  20.                 self.running = True
  21.                 log.debug('PingTimeout starting')
  22.                 while self.running:
  23.                         timeout = None
  24.                         self.event.clear()
  25.                         timeout = self.process()
  26.                         self.event.wait(timeout)
  27.                         #time.sleep(0.1)
  28.  
  29.         def process(self):
  30.                 while self.queue.qsize() > 0:
  31.                         try:                expire,event,callback,cb_args = item = self.queue.get_nowait()
  32.                         except Queue.Empty: break # our qsize check isn't guaranteed to prevent this
  33.                         if event.is_set():  continue # event was completed; ignore it
  34.                         if expire > time.time():
  35.                                 self.queue.put(item)
  36.                                 return expire - time.time()
  37.                         callback(*cb_args)
  38.                         event.set() # make sure no one executes it
  39.                 return None
  40.  
  41.         def add_callback(self, timeout, handler, args):
  42.                 event = threading.Event()
  43.                 item = (time.time()+timeout,event,handler,args)
  44.                 self.queue.put(item)
  45.                 self.event.set()
  46.                 return event
  47.  
  48.         def mult_callback(self, count, timeout, handler, args):
  49.                 events = []
  50.                 timeout += time.time() + 1
  51.                 for i in range(0,count): events.append(threading.Event())
  52.                 for i in range(0,count): self.queue.put((timeout,events[i],handler,args[i]))
  53.                 self.event.set()
  54.                 return events
  55.  
  56. class PingServer(threading.Thread):
  57.         def __init__(self, d_addr, block_size=1024, initial_timeout=2):
  58.                 self.block_size = block_size # default; use setup for exact
  59.                 self.server = d_addr,socket.gethostbyname(d_addr)
  60.                 self.running_timeout = initial_timeout
  61.                 threading.Thread.__init__(self)
  62.                 self.listeners = []
  63.                 self.debug = 0
  64.  
  65.                 # timeout events are queued and executed in a seperate thread
  66.                 self.timer_event = threading.Event()
  67.                 self.timer = PingTimer(self.timer_event)
  68.  
  69.  
  70.                 self.blocks = 0
  71.                 self.running = False
  72.                 self.socket = ping.build_socket()
  73.                 self.empty_block = self.null_block()
  74.                 self.queued_events = collections.defaultdict(collections.deque)
  75.        
  76.         def timeout(self):              return 2.0/5.0 # self.running_timeout
  77.         def safe_timeout(self): return 3 * self.timeout()
  78.  
  79.         def setup_timeout(self, ID=0):
  80.                 Time = time.time()
  81.                 Times = struct.pack('d',Time)
  82.                 if ID == 0: ID = random.getrandbits(32) # ID size in bits
  83.  
  84.                 ping.data_ping(self.socket,self.server[1],ID,Times)
  85.                 msg = ping.read_ping(self.socket,self.timeout())
  86.                 if not msg:                   raise Exception('PingServer::setup_timeout: no valid response from '+self.server[0])
  87.                 addr,rID,data = msg['address'],msg['ID'],msg['payload']
  88.                 log.debug("Addr=%s rID=%d Data=%d bytes"%(addr[0],rID,len(data)))
  89.                 if len(data) == 0:            raise Exception('PingServer::setup_timeout: null response from '+self.server[0])
  90.                 if rID != ID:                 raise Exception('PingServer::setup_timeout: invalid response id from '+self.server[0])
  91.                 if data != Times:             raise Exception('PingServer::setup_timeout: invalid response data from '+self.server[0])
  92.                 if addr[0] != self.server[1]: raise Exception('PingServer::setup_timeout: invalid response server from '+self.server[0])
  93.                 delay = time.time() - Time
  94.                 log.notice('echo delay: %.02fms'%(1000*delay))
  95.  
  96.         def setup_block(self, ID = 0):
  97.                 if ID == 0: ID = random.getrandbits(32) # ID size in bits
  98.                 Fill = chr(random.getrandbits(8)) # repeated data
  99.                 Filler = self.block_size * Fill
  100.  
  101.                 ping.data_ping(self.socket,self.server[1],ID,Filler)
  102.                 msg = ping.read_ping(self.socket,self.timeout())
  103.                 if not msg:                   raise Exception('PingServer::setup_block: no valid response from '+self.server[0])
  104.                 addr,rID,data = msg['address'],msg['ID'],msg['payload']
  105.                 log.debug("Addr=%s rID=%d Data=%d bytes"%(addr[0],rID,len(data)))
  106.                 if len(data) == 0:            raise Exception('PingServer::setup_block: null response from '+self.server[0])
  107.                 if rID != ID:                 raise Exception('PingServer::setup_block: invalid response id from '+self.server[0])
  108.                 if data != len(data)*Fill:    raise Exception('PingServer::setup_block: invalid response data from '+self.server[0])
  109.                 if addr[0] != self.server[1]: raise Exception('PingServer::setup_block: invalid response server from '+self.server[0])
  110.                 self.block_size = len(data)
  111.                 self.empty_block = self.null_block()
  112.                 log.notice('echo length: %d bytes'%self.block_size)
  113.                
  114.         def setup(self):
  115.                 log.trace('PingServer::setup: testing server "%s"'%self.server[0])
  116.                 ID = random.getrandbits(32)
  117.                 self.setup_timeout(ID)
  118.                 self.setup_block(ID)
  119.                 self.strip_counter = 0
  120.                 self.strip_interval = 97
  121.  
  122.         def stop(self):
  123.                 self.running = False
  124.                 log.info('PingServer terminating')
  125.                 self.timer.stop()
  126.  
  127.         def run(self):
  128.                 self.running = True
  129.                 log.notice('PingServer starting')
  130.                 self.timer.start()
  131.                 while self.running:
  132.                         start_blocks = self.blocks # updated asynchronously
  133.                         ready = select.select([self.socket], [], [], self.timeout())
  134.                         if ready[0] == []: # timeout
  135.                                 if start_blocks != 0 and self.blocks != 0:
  136.                                         log.error('%s timed out'%self.server[0])
  137.                         try:
  138.                                 msg = ping.recv_ping(self.socket,self.timeout())
  139.                                 if not msg: continue
  140.                         except:
  141.                                 continue
  142.                         addr,block_id,data = msg['address'],msg['ID'],msg['payload']
  143.                         if block_id == 0:
  144.                                 import binascii
  145.                                 raise Exception('received packet w/ ID 0 packet: '+binascii.hexlify(msg['raw']))
  146.                         self.process_block(addr[0],block_id,data)
  147.  
  148.         def process_block(self, addr, ID, data):
  149.                 if ID == 0: raise Exception('server responded with ID 0 packet')
  150.  
  151.                 while len(self.queued_events[ID]):
  152.                         handler,event,args = self.queued_events[ID].popleft()
  153.                         if event.is_set(): continue
  154.  
  155.                         if handler == self.write_block_timeout:
  156.                                 if self.debug: log.trace('%s (block %d) updated'%(self.server[0],ID))
  157.                                 data = args[1]
  158.                         elif handler == self.read_block_timeout:
  159.                                 if self.debug: log.trace('%s (block %d) read'%(self.server[0],ID))
  160.                                 callback,cb_args = args[1],args[2]
  161.                                 if len(data) > 0: callback(ID,data,*cb_args)
  162.                                 else:             callback(ID,self.null_block(),*cb_args)
  163.                         elif handler == self.delete_block_timeout:
  164.                                 if self.debug: log.trace('%s (block %d) deleted'%(self.server[0],ID))
  165.                                 data = ''
  166.                         event.set()
  167.  
  168.                 self.strip_counter += 1
  169.                 if len(data) and not self.strip_counter % self.strip_interval:
  170.                         data = data.rstrip('\0')
  171.                 if len(data) == 0:
  172.                         self.blocks = self.blocks - 1
  173.                         return
  174.                 if len(self.listeners): self.process_listeners(addr, ID, data)
  175.                 #log.trace('%s: sending %d bytes from block %d'%(self.server[0],len(data),ID))
  176.                 ping.send_ping(self.socket, addr, ID, data)
  177.  
  178.         def process_listeners(self, addr, ID, data):
  179.                 if not self.listeners: raise Exception('process_listeners invoked without valid listeners on ID=%d'%ID)
  180.                 self.listeners = [l for l in self.listeners if l[0] >= time.time()] # clean the listeners
  181.                 for x in self.listeners:
  182.                         expire,handler,cb_args = x
  183.                         handler(ID, addr, data, *cb_args)
  184.  
  185.         def add_listener(self, handler, timeout, args):
  186.                 log.debug('add_listener: timeout=%d handler=%s'%(timeout,handler))
  187.                 expire = time.time() + timeout
  188.                 self.listeners.append((expire,handler,args))
  189.  
  190.         def null_block(self, length=None):
  191.                 if length: return length * '\0'
  192.                 return self.block_size * '\0'
  193.                
  194.         def event_insert(self, ID, handler, args):
  195.                 event = self.timer.add_callback(self.timeout(), handler, args)
  196.                 self.queued_events[ID].append((handler,event,args))
  197.                 return event
  198.  
  199.         # read / write / delete a single block
  200.         def write_block(self, ID, data, blocking = False):
  201.                 # add a block to the queue (or delete if equivalent)
  202.                 #log.trace('PingServer::write_block: ID=%d bytes=%d blocking=%s'%(ID,len(data),blocking))
  203.                 if ID == 0: raise Exception('write_block: invalid block ID (0)')
  204.                 if data == self.null_block()[:len(data)]: return self.delete_block(ID,blocking)
  205.                 event = self.event_insert(ID,self.write_block_timeout,[ID,data[:self.block_size]])
  206.                 if blocking: event.wait()
  207.                 return event
  208.  
  209.         def write_blocks(self, IDs, datas, blocking = False):
  210.                 log.trace('PingServer::write_blocks: IDs=%d-%d blocking=%s'%(IDs[0],IDs[-1],blocking))
  211.  
  212.                 args = []
  213.                 handler = self.write_block_timeout
  214.                 for i in range(0,len(IDs)): args.append([IDs[i],datas[i][:self.block_size]])
  215.                 events = self.timer.mult_callback(len(args),self.timeout(), handler, args)
  216.                 for i in range(0,len(IDs)):     self.queued_events[IDs[i]].append((handler,events[i],args[i]))
  217.                 if blocking:
  218.                         for e in events: e.wait()
  219.                 return events
  220.  
  221.         def delete_block(self, ID, blocking = False):
  222.                 log.trace('PingServer::delete_block: ID=%d blocking=%s'%(ID,blocking))
  223.                 if ID == 0: raise Exception('delete_block: invalid block ID (0)')
  224.                 t = self.event_insert(ID,self.delete_block_timeout,[ID])
  225.                 if blocking: t.wait()
  226.                 return t
  227.  
  228.         def read_block(self, ID, callback, cb_args = [], blocking = False):
  229.                 log.trace('PingServer::read_block: ID=%d blocking=%s'%(ID,blocking))
  230.                 if ID == 0: raise Exception('read_block: invalid block ID (0)')
  231.                 t = self.event_insert(ID,self.read_block_timeout,[ID,callback,cb_args])
  232.                 if blocking: t.wait()
  233.                 return t
  234.  
  235.         def read_blocks(self, IDs, callback, cb_args, blocking = False):
  236.                 log.trace('PingServer::read_blocks: IDs=%d-%d blocking=%s'%(IDs[0],IDs[-1],blocking))
  237.  
  238.                 args = []
  239.                 handler = self.read_block_timeout
  240.                 for ID in IDs: args.append([ID,callback,cb_args])
  241.                 events = self.timer.mult_callback(len(args),self.timeout(), handler, args)
  242.                 for i in range(0,len(IDs)):     self.queued_events[IDs[i]].append((handler,events[i],args[i]))
  243.                 if blocking:
  244.                         for e in events: e.wait()
  245.                 return events
  246.  
  247.         def read_block_timeout(self, ID, callback, cb_args):
  248.                 log.debug('PingServer::read_block_timeout: ID=%d callback=%s'%(ID,callback.__name__))
  249.                 callback(ID,self.null_block(),*cb_args)
  250.  
  251.         def delete_block_timeout(self, ID):
  252.                 log.debug('PingServer::delete_block_timeout: ID=%d'%ID)
  253.                 # do nothing; we're marked invalid anyhow
  254.                 pass
  255.  
  256.         def write_block_timeout(self, ID, data):
  257.                 log.trace('PingServer::write_block_timeout: ID=%d bytes=%d'%(ID,len(data)))
  258.                 self.blocks = self.blocks + 1
  259.                 # force update queue (as if packet arrived)
  260.                 if ID == 0: raise Exception('write_block_timeout: ID == 0')
  261.                 self.process_block(self.server[1], ID, data)
  262.  
  263. def print_block(ID, data):
  264.         print '----- print block -----'
  265.         print 'block',ID,'bytes',len(data)
  266.         print data
  267.         print '----- print block -----'
  268.  
  269. def __live_blocks(ID, addr, data, datastore):
  270.         datastore[ID] = 1
  271.  
  272. def live_blocks(PServer, timeout=None):
  273.         store = {}
  274.         if not timeout: timeout = PServer.safe_timeout()
  275.         PServer.add_listener(__live_blocks,timeout,[store])
  276.         time.sleep(timeout)
  277.         return store
  278.                
  279. def used_blocks(blocks):
  280.         result,lookup = {},{}
  281.         for x in blocks:
  282.                 if x-1 in lookup:
  283.                         lookup[x] = lookup[x-1]
  284.                         result[lookup[x]] += 1
  285.                 else:
  286.                         lookup[x] = x
  287.                         result[x] = 1
  288.         return result
  289.  
  290.  
  291. def free_blocks(blocks):
  292.         result = {}
  293.         if 1 not in blocks:
  294.                 if not blocks:         result[1] = 0
  295.                 elif len(blocks) == 0: result[1] = 0
  296.                 else:                  result[1] = min(blocks.keys())-1
  297.         for x in blocks:
  298.                 if not x+1 in blocks: result[x+1] = 0
  299.                 if not x-1 in blocks:
  300.                         if not len(result): continue
  301.                         block = max(result.keys())
  302.                         result[block] = x-block
  303.         return result
  304.  
  305. if __name__ == "__main__":
  306.         ping_reporter.start_log(log,logging.DEBUG)
  307.         server = ping.select_server(log,1)
  308.  
  309.         from ping_reporter import humanize_bytes
  310.         try:
  311.                 PS = PingServer(server,4)
  312.                 PS.debug = 1
  313.                 PS.setup()
  314.                 PS.start()
  315.                 if 0:
  316.                         print 'traffic:',ping.ping_count,'pings ('+humanize_bytes(ping.ping_bandwidth)+')'
  317.                         PS.read_block(2,print_block)
  318.                         time.sleep(2)
  319.                         PS.write_block(2,'coconut')
  320.                         time.sleep(1)
  321.                         print 'traffic:',ping.ping_count,'pings ('+humanize_bytes(ping.ping_bandwidth)+')'
  322.  
  323.                         PS.write_block(1,'apples')
  324.                         PS.read_block(1,print_block)
  325.                         PS.read_block(1,print_block)
  326.                         time.sleep(2)
  327.                         print 'traffic:',ping.ping_count,'pings ('+humanize_bytes(ping.ping_bandwidth)+')'
  328.                        
  329.                         log.info('testing block metrics')
  330.                         blocks = live_blocks(PS)
  331.                         log.debug('blocks: %s'%blocks)
  332.                         log.debug('--used: %s'%used_blocks(blocks))
  333.                         log.debug('--free: %s'%free_blocks(blocks))
  334.                                
  335.                         PS.delete_block(1)
  336.                         time.sleep(2)
  337.                         print 'traffic:',ping.ping_count,'pings ('+humanize_bytes(ping.ping_bandwidth)+')'
  338.                         PS.write_block(1,'apples')
  339.                         time.sleep(2)
  340.                         PS.read_block(1,print_block)
  341.                         time.sleep(4)
  342.                         PS.read_block(1,print_block)
  343.                         time.sleep(1)
  344.                         PS.write_block(1,'bananas')
  345.                         time.sleep(1)
  346.                         PS.read_block(1,print_block)
  347.                         time.sleep(1)
  348.                         PS.read_block(1,print_block)
  349.                         PS.read_block(1,print_block)
  350.                         time.sleep(1)
  351.                         PS.delete_block(1)
  352.                         print 'traffic:',ping.ping_count,'pings ('+humanize_bytes(ping.ping_bandwidth)+')'
  353.                         while True:
  354.                                 time.sleep(1)
  355.  
  356.                 if 1:
  357.                         a = 2500
  358.                         time.sleep(2)
  359.                         log.debug('%s: writing %d blocks'%(time.time(),a))
  360.                         events = PS.write_blocks(range(1,a+1),['AAAA']*a,False)
  361.                         log.debug('%s: inserted writes'%(time.time()))
  362.  
  363.                         #import ping_reporter
  364.                         #PR = ping_reporter.PingReporter(log,PS,5)
  365.                         #PR.start()
  366.                         def __read_callback(ID, data, data_store):
  367.                                 data_store[ID] = data
  368.                         def test_block_read(PS,rData,count):
  369.                                 data = {}
  370.                                 blob = ''
  371.                                 log.debug('%s: reading %d blocks'%(time.time(),a))
  372.                                 PS.read_blocks(range(1,a+1), __read_callback, [data], True)
  373.                                 log.debug('%s: completed read'%(time.time()))
  374.  
  375.                                 for i in range(1,a+1): blob = blob + data[i]
  376.  
  377.                                 missing = 0
  378.                                 for i in range(1,a+1):
  379.                                         if data[i] == '\0'*len(data[i]):
  380.                                                 missing += 1
  381.  
  382.                                 if blob == rData:
  383.                                         log.trace('block read successfully')
  384.                                         return True
  385.                                 else:
  386.                                         log.error('block read failed: data corrupted')
  387.                                         log.error('%d of %d blocks missing'%(missing,count))
  388.                                         return False
  389.                         data = 'A'*4*a
  390.                         for i in range(12):
  391.                                 time.sleep(5)
  392.                                 if not test_block_read(PS,data,a): break
  393.                 print 'terminate'
  394.         except KeyboardInterrupt:
  395.                 print "Keyboard Interrupt"
  396.         except Exception:
  397.                 print 'General Exception'
  398.                 from traceback import print_exc
  399.                 print_exc()
  400.         finally:
  401.                 PS.stop()
  402.                 print 'traffic:',ping.ping_count,'pings ('+humanize_bytes(ping.ping_bandwidth)+')'
  403.                 sys.exit(1)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top