Advertisement
Guest User

Ping_Disk

a guest
Mar 31st, 2012
619
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.74 KB | None | 0 0
  1. import ping, threading, time, socket, select, sys, struct, logging
  2. import binascii, threading, collections, math, random
  3. import ping, ping_server, ping_reporter
  4.  
# Module-wide logger obtained from ping_reporter under the name 'PingDisk';
# every PingDisk method and the self-test below log through it.
log = ping_reporter.setup_log('PingDisk')
  6.  
  7. class PingDisk():
  8.     def __init__(self, d_addr, block_size=1024, timeout=2):
  9.         #self.server = ping_server.PingServerInterface(d_addr,block_size,timeout)
  10.         self.server = ping_server.PingServer(d_addr,block_size,timeout)
  11.         self.server.setup()
  12.         self.server.start()
  13.  
  14.     def stop(self):
  15.         self.server.stop()
  16.  
  17.     def size(self):
  18.         return self.server.block_size * (1<<28)
  19.  
  20.     def block_size(self):
  21.         return self.server.block_size
  22.  
  23.     def region_size(self):
  24.         return max(2,4096/self.block_size())
  25.  
  26.     def read_block(self, ID, datastore, blocking=False):
  27.         event = self.server.read_block(ID, self.__read_callback, datastore, False)
  28.         if not blocking: return event
  29.         event.wait()
  30.  
  31.     def read_block_sync(self, ID):
  32.         data = {}
  33.         self.read_block(ID,[data],True)
  34.         return data[ID]
  35.  
  36.     def read_blocks(self, init_block, fini_block):
  37.         data = {}
  38.         events = []
  39.         result = ''
  40.         blocks = range(init_block,fini_block+1)
  41.         log.debug('PingDisk::read_blocks: blocks %d-%d'%(init_block,fini_block))
  42.         for x in blocks: events.append(self.read_block(x,[data]))
  43.         for x in events: x.wait()
  44.         for x in blocks: result = result + data[x]
  45.         return result
  46.  
  47.     def __read_callback(self, ID, data, data_store):
  48.         log.trace('PingDisk::read::callback: ID=%d bytes=%d'%(ID,len(data)))
  49.         if ID in data_store: log.error('PingDisk::read::overlap on %d'%ID)
  50.         data_store[ID] = data
  51.  
  52.     def read(self, index, length):
  53.         endex = index + length
  54.         init_index = (index % self.server.block_size)
  55.         fini_index = init_index + length
  56.         init_block = (index / self.server.block_size) + 1
  57.         fini_block = (endex / self.server.block_size) + 1
  58.  
  59.         if 0 == endex % self.server.block_size:
  60.             fini_block = max(init_block,fini_block-1)
  61.         data = self.read_blocks(init_block,fini_block)
  62.         return data[init_index:fini_index]
  63.  
  64.     def read_min(self, index, length):
  65.         length = max(length, self.server.block_size)
  66.         return self.read(index,length)
  67.  
  68.     def __block_merge(self, old_data, new_data, index = 0):
  69.         if index >= self.server.block_size: raise Exception('block_merge: invalid index ('+str(index)+')')
  70.         old_data = old_data[:self.server.block_size]
  71.         new_data = new_data[:self.server.block_size-index]
  72.         data = old_data[:index] + new_data + old_data[index+len(new_data):]
  73.         return data
  74.  
  75.     def write_block(self, ID, data, blocking=False):
  76.         #log.trace('PingDisk::write_block: ID=%d bytes=%d'%(ID,len(data)))
  77.         return self.server.write_block(ID,data,blocking)
  78.  
  79.     def write_blocks(self, index, data):
  80.         endex = index + len(data)
  81.         block_size = self.server.block_size
  82.         init_index = (index % self.server.block_size)
  83.         fini_index = (endex % self.server.block_size)
  84.         init_block = (index / self.server.block_size) + 1 # byte 0 is in block 1
  85.         fini_block = (endex / self.server.block_size) + 1
  86.         log.debug('PingDisk::write_blocks: blocks %d-%d'%(init_block,fini_block))
  87.  
  88.         events = []
  89.         if init_index == 0:
  90.             start_block = data[:block_size]
  91.         else:
  92.             start_block = self.read_block_sync(init_block)
  93.             start_block = self.__block_merge(start_block,data,init_index)
  94.         events.append(self.write_block(init_block,start_block))
  95.         if init_block == fini_block: return events
  96.  
  97.         data = data[self.server.block_size - init_index:]
  98.         for x in range(init_block+1,fini_block):
  99.             events.append(self.write_block(x,data[:block_size]))
  100.             data = data[block_size:]
  101.        
  102.         if fini_index != 0:
  103.             end_block = self.read_block_sync(fini_block)
  104.             end_block = self.__block_merge(end_block,data,0)
  105.             events.append(self.write_block(fini_block,end_block))
  106.         return events
  107.  
  108.     def write(self, index, data, blocking=True):
  109.         events = self.write_blocks(index,data)
  110.         if not blocking: return events
  111.         for x in events: x.wait()
  112.  
  113.     def delete_blocks(self, index, length):
  114.         endex = index + length
  115.         init_block = (index / self.server.block_size) + 1 # byte 0 is in block 1
  116.         fini_block = (endex / self.server.block_size) + 1
  117.         log.debug('PingDisk::delete_blocks: blocks %d-%d'%(init_block,fini_block))
  118.  
  119.         events = []
  120.         for x in range(init_block,fini_block+1):
  121.             events.append(self.server.delete_block(x))
  122.         return events
  123.  
  124.     # delete operates at block-level boundaries
  125.     def delete(self, index, length, blocking=False):
  126.         log.debug('PingDisk::delete: index=%d for %d bytes'%(index,length))
  127.         events = self.delete_blocks(index,length)
  128.         if not blocking: return events
  129.         for x in events: x.wait()
  130.  
  131.     def free_blocks(self, timeout=None):
  132.         blocks = ping_server.live_blocks(self.server,timeout)
  133.         log.debug('live_blocks: %s'%blocks)
  134.         return ping_server.free_blocks(blocks)
  135.  
  136.     def used_blocks(self, timeout=None):
  137.         blocks = ping_server.live_blocks(self.server,timeout)
  138.         log.debug('used_blocks: %s'%blocks)
  139.         if blocks: return ping_server.used_blocks(blocks)
  140.         return {}
  141.  
  142.     def get_block_region(self, blocks=1, timeout=None):
  143.         free = self.free_blocks(timeout)
  144.         log.debug('get_block_region: %d blocks'%(blocks))
  145.         log.debug('frees -> %s'%free)
  146.         if not free: return None
  147.         max_blockid = (1<<28)
  148.  
  149.         # 1) allocate an encompassing span of regions
  150.         top_node = max(free.keys())
  151.         reg_size = self.region_size() * int(1 + blocks/self.region_size())
  152.         top_node = reg_size * int(1 + top_node/reg_size)
  153.         if max_blockid - top_node > reg_size: return top_node
  154.  
  155.         # 2)try minimal sufficiently large region
  156.         region = [(v,k) for (k,v) in free.iteritems() if v >= blocks]
  157.         if region: return min(region)[1]
  158.         return None
  159.  
  160.     def timeout(self):              return self.server.timeout()
  161.     def safe_timeout(self):         return self.server.safe_timeout()
  162.     def byte_to_block(self, byte)return int(math.ceil(1.0*byte/self.block_size()))
  163.     def block_to_byte(self, block): return block * self.block_size()
  164.  
  165.     def get_region(self, bytes, timeout=None, target=None):
  166.         log.debug('get_region: %d bytes'%(bytes))
  167.         if not timeout: timeout = self.safe_timeout()
  168.         if target: return self.test_region(target,bytes,timeout)
  169.         block = self.byte_to_block(bytes)              # to blocks
  170.         region = self.get_block_region(block,timeout)  # <------>
  171.         if region: region = self.block_to_byte(region) # to bytes
  172.         log.debug('get_region: allocated region %d (%d bytes)'%(region,bytes))
  173.         return region
  174.  
  175.     def test_region(self, start, end, length, timeout=None):
  176.         if not timeout: timeout = self.safe_timeout()
  177.         log.debug('test_region: region=%d length=%d -> %d'%(start,end,length))
  178.         if length < end: return start # smaller block
  179.         collision = [start+end-1,start+length-1]
  180.         collision2 = [collision[0],collision[1]]
  181.         collision[0] = self.byte_to_block(collision[0])
  182.         collision[1] = self.byte_to_block(collision[1])
  183.         if collision[0] == collision[1]: return start # same block
  184.         used = self.used_blocks(timeout)
  185.         if not used: # 0 used blocks implies no root directory...
  186.             log.exception('test_region: used blocks returned nil')
  187.             raise Exception('test_region: used blocks returned nil')
  188.  
  189.         for x in used:
  190.             if x <= collision[0]: continue # used block before test region (no collision)
  191.             if x  > collision[1]: continue # used block begins after collision space
  192.             log.debug('test_region: collision at node %d'%x)
  193.             return False
  194.         return start
  195.        
  196.  
  197. if __name__ == "__main__":
  198.     Disk = None
  199.     try:
  200.         #ping_reporter.start_log(log,logging.DEBUG)
  201.         ping_reporter.enableAllLogs(logging.DEBUG)
  202.         server = ping.select_server(log)
  203.         Disk = PingDisk(server,4)
  204.         #ping.drop_privileges()
  205.  
  206.         if 0:
  207.             data = "1234567890123456789_123456789012345"
  208.             log.info('blind disk read')
  209.             rData = Disk.read(0,50)
  210.             log.info('1-length: 50 requested -> %d'%(len(rData)))
  211.             if rData != '\0'*50: log.exception('invalid rData: %s'%rData)
  212.             else: log.info('success')
  213.             time.sleep(5)
  214.  
  215.             log.info('writing %d bytes'%len(data))
  216.             Disk.write(0,data)
  217.             time.sleep(5)
  218.             rData = Disk.read(0,len(data))
  219.             log.info('2-length: %d vs %d'%(len(data),len(rData)))
  220.             if rData != data: log.exception('invalid rData: %s'%rData)
  221.             else: log.info('success')
  222.  
  223.             wData = 'abcdefghijk'
  224.             Disk.write(10,wData)
  225.             time.sleep(2)
  226.             rData = Disk.read(0,len(data))
  227.             data = data[0:10] + wData + data[10+len(wData):]
  228.             log.info('3-length: %d vs %d'%(len(data),len(rData)))
  229.             if rData != data: log.exception('invalid rData: %s'%rData)
  230.             else: log.info('success')
  231.  
  232.             time.sleep(2)
  233.             data = data[2:] + '\0\0'
  234.             rData = Disk.read(2,len(data))
  235.             log.info('4-length: %d vs %d'%(len(data),len(rData)))
  236.             if rData != data: log.exception('invalid rData: %s'%rData)
  237.             else: log.info('success')
  238.  
  239.             time.sleep(2)
  240.             data = data[0:10]
  241.             rData = Disk.read(2,10)
  242.             log.info('5-length: %d vs %d'%(len(data),len(rData)))
  243.             if rData != data: log.exception('invalid rData: %s'%rData)
  244.             else: log.info('success')
  245.  
  246.         if 1:
  247.             #free_node = Disk.get_region(1500)
  248.             #log.info('get_region = %s'%free_node)
  249.  
  250.             strA = 'A'
  251.             strB = 'B'*4096*3
  252.             Disk.write(0,strA,False)
  253.             Disk.write(5000,strB,False)
  254.  
  255.             time.sleep(1)
  256.             #log.info('region A [%d] and B [%d] written'%(len(strA),len(strB)))
  257.             readA = Disk.read(0,len(strA))
  258.             readB = Disk.read(5000,len(strB))
  259.             log.info('region A and B read')
  260.             if readA != strA: log.error('corruption in region A (%d bytes)'%len(readA))
  261.             else:             log.debug('region A read successfully')
  262.             if readB != strB: log.error('corruption in region B (%d bytes)'%len(readB))
  263.             else:             log.debug('region B read successfully')
  264.  
  265.         time.sleep(1)
  266.         print 'terminate'
  267.     except KeyboardInterrupt:
  268.         print "Keyboard Interrupt"
  269.     except Exception:
  270.         print 'General Exception'
  271.         from traceback import print_exc
  272.         print_exc()
  273.     finally:
  274.         if Disk: Disk.stop()
  275.         log.info('traffic: %d pings (%s)'
  276.                 %(ping.ping_count,ping_reporter.humanize_bytes(ping.ping_bandwidth)))
  277.         sys.exit(1)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement