SHARE
TWEET

Ping_Disk

a guest Mar 31st, 2012 358 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import ping, threading, time, socket, select, sys, struct, logging
  2. import binascii, threading, collections, math, random
  3. import ping, ping_server, ping_reporter
  4.  
  5. log = ping_reporter.setup_log('PingDisk')
  6.  
  7. class PingDisk():
  8.         def __init__(self, d_addr, block_size=1024, timeout=2):
  9.                 #self.server = ping_server.PingServerInterface(d_addr,block_size,timeout)
  10.                 self.server = ping_server.PingServer(d_addr,block_size,timeout)
  11.                 self.server.setup()
  12.                 self.server.start()
  13.  
  14.         def stop(self):
  15.                 self.server.stop()
  16.  
  17.         def size(self):
  18.                 return self.server.block_size * (1<<28)
  19.  
  20.         def block_size(self):
  21.                 return self.server.block_size
  22.  
  23.         def region_size(self):
  24.                 return max(2,4096/self.block_size())
  25.  
  26.         def read_block(self, ID, datastore, blocking=False):
  27.                 event = self.server.read_block(ID, self.__read_callback, datastore, False)
  28.                 if not blocking: return event
  29.                 event.wait()
  30.  
  31.         def read_block_sync(self, ID):
  32.                 data = {}
  33.                 self.read_block(ID,[data],True)
  34.                 return data[ID]
  35.  
  36.         def read_blocks(self, init_block, fini_block):
  37.                 data = {}
  38.                 events = []
  39.                 result = ''
  40.                 blocks = range(init_block,fini_block+1)
  41.                 log.debug('PingDisk::read_blocks: blocks %d-%d'%(init_block,fini_block))
  42.                 for x in blocks: events.append(self.read_block(x,[data]))
  43.                 for x in events: x.wait()
  44.                 for x in blocks: result = result + data[x]
  45.                 return result
  46.  
  47.         def __read_callback(self, ID, data, data_store):
  48.                 log.trace('PingDisk::read::callback: ID=%d bytes=%d'%(ID,len(data)))
  49.                 if ID in data_store: log.error('PingDisk::read::overlap on %d'%ID)
  50.                 data_store[ID] = data
  51.  
  52.         def read(self, index, length):
  53.                 endex = index + length
  54.                 init_index = (index % self.server.block_size)
  55.                 fini_index = init_index + length
  56.                 init_block = (index / self.server.block_size) + 1
  57.                 fini_block = (endex / self.server.block_size) + 1
  58.  
  59.                 if 0 == endex % self.server.block_size:
  60.                         fini_block = max(init_block,fini_block-1)
  61.                 data = self.read_blocks(init_block,fini_block)
  62.                 return data[init_index:fini_index]
  63.  
  64.         def read_min(self, index, length):
  65.                 length = max(length, self.server.block_size)
  66.                 return self.read(index,length)
  67.  
  68.         def __block_merge(self, old_data, new_data, index = 0):
  69.                 if index >= self.server.block_size: raise Exception('block_merge: invalid index ('+str(index)+')')
  70.                 old_data = old_data[:self.server.block_size]
  71.                 new_data = new_data[:self.server.block_size-index]
  72.                 data = old_data[:index] + new_data + old_data[index+len(new_data):]
  73.                 return data
  74.  
  75.         def write_block(self, ID, data, blocking=False):
  76.                 #log.trace('PingDisk::write_block: ID=%d bytes=%d'%(ID,len(data)))
  77.                 return self.server.write_block(ID,data,blocking)
  78.  
  79.         def write_blocks(self, index, data):
  80.                 endex = index + len(data)
  81.                 block_size = self.server.block_size
  82.                 init_index = (index % self.server.block_size)
  83.                 fini_index = (endex % self.server.block_size)
  84.                 init_block = (index / self.server.block_size) + 1 # byte 0 is in block 1
  85.                 fini_block = (endex / self.server.block_size) + 1
  86.                 log.debug('PingDisk::write_blocks: blocks %d-%d'%(init_block,fini_block))
  87.  
  88.                 events = []
  89.                 if init_index == 0:
  90.                         start_block = data[:block_size]
  91.                 else:
  92.                         start_block = self.read_block_sync(init_block)
  93.                         start_block = self.__block_merge(start_block,data,init_index)
  94.                 events.append(self.write_block(init_block,start_block))
  95.                 if init_block == fini_block: return events
  96.  
  97.                 data = data[self.server.block_size - init_index:]
  98.                 for x in range(init_block+1,fini_block):
  99.                         events.append(self.write_block(x,data[:block_size]))
  100.                         data = data[block_size:]
  101.                
  102.                 if fini_index != 0:
  103.                         end_block = self.read_block_sync(fini_block)
  104.                         end_block = self.__block_merge(end_block,data,0)
  105.                         events.append(self.write_block(fini_block,end_block))
  106.                 return events
  107.  
  108.         def write(self, index, data, blocking=True):
  109.                 events = self.write_blocks(index,data)
  110.                 if not blocking: return events
  111.                 for x in events: x.wait()
  112.  
  113.         def delete_blocks(self, index, length):
  114.                 endex = index + length
  115.                 init_block = (index / self.server.block_size) + 1 # byte 0 is in block 1
  116.                 fini_block = (endex / self.server.block_size) + 1
  117.                 log.debug('PingDisk::delete_blocks: blocks %d-%d'%(init_block,fini_block))
  118.  
  119.                 events = []
  120.                 for x in range(init_block,fini_block+1):
  121.                         events.append(self.server.delete_block(x))
  122.                 return events
  123.  
  124.         # delete operates at block-level boundaries
  125.         def delete(self, index, length, blocking=False):
  126.                 log.debug('PingDisk::delete: index=%d for %d bytes'%(index,length))
  127.                 events = self.delete_blocks(index,length)
  128.                 if not blocking: return events
  129.                 for x in events: x.wait()
  130.  
  131.         def free_blocks(self, timeout=None):
  132.                 blocks = ping_server.live_blocks(self.server,timeout)
  133.                 log.debug('live_blocks: %s'%blocks)
  134.                 return ping_server.free_blocks(blocks)
  135.  
  136.         def used_blocks(self, timeout=None):
  137.                 blocks = ping_server.live_blocks(self.server,timeout)
  138.                 log.debug('used_blocks: %s'%blocks)
  139.                 if blocks: return ping_server.used_blocks(blocks)
  140.                 return {}
  141.  
  142.         def get_block_region(self, blocks=1, timeout=None):
  143.                 free = self.free_blocks(timeout)
  144.                 log.debug('get_block_region: %d blocks'%(blocks))
  145.                 log.debug('frees -> %s'%free)
  146.                 if not free: return None
  147.                 max_blockid = (1<<28)
  148.  
  149.                 # 1) allocate an encompassing span of regions
  150.                 top_node = max(free.keys())
  151.                 reg_size = self.region_size() * int(1 + blocks/self.region_size())
  152.                 top_node = reg_size * int(1 + top_node/reg_size)
  153.                 if max_blockid - top_node > reg_size: return top_node
  154.  
  155.                 # 2)try minimal sufficiently large region
  156.                 region = [(v,k) for (k,v) in free.iteritems() if v >= blocks]
  157.                 if region: return min(region)[1]
  158.                 return None
  159.  
  160.         def timeout(self):                              return self.server.timeout()
  161.         def safe_timeout(self):                 return self.server.safe_timeout()
  162.         def byte_to_block(self, byte)return int(math.ceil(1.0*byte/self.block_size()))
  163.         def block_to_byte(self, block): return block * self.block_size()
  164.  
  165.         def get_region(self, bytes, timeout=None, target=None):
  166.                 log.debug('get_region: %d bytes'%(bytes))
  167.                 if not timeout: timeout = self.safe_timeout()
  168.                 if target: return self.test_region(target,bytes,timeout)
  169.                 block = self.byte_to_block(bytes)              # to blocks
  170.                 region = self.get_block_region(block,timeout)  # <------>
  171.                 if region: region = self.block_to_byte(region) # to bytes
  172.                 log.debug('get_region: allocated region %d (%d bytes)'%(region,bytes))
  173.                 return region
  174.  
  175.         def test_region(self, start, end, length, timeout=None):
  176.                 if not timeout: timeout = self.safe_timeout()
  177.                 log.debug('test_region: region=%d length=%d -> %d'%(start,end,length))
  178.                 if length < end: return start # smaller block
  179.                 collision = [start+end-1,start+length-1]
  180.                 collision2 = [collision[0],collision[1]]
  181.                 collision[0] = self.byte_to_block(collision[0])
  182.                 collision[1] = self.byte_to_block(collision[1])
  183.                 if collision[0] == collision[1]: return start # same block
  184.                 used = self.used_blocks(timeout)
  185.                 if not used: # 0 used blocks implies no root directory...
  186.                         log.exception('test_region: used blocks returned nil')
  187.                         raise Exception('test_region: used blocks returned nil')
  188.  
  189.                 for x in used:
  190.                         if x <= collision[0]: continue # used block before test region (no collision)
  191.                         if x  > collision[1]: continue # used block begins after collision space
  192.                         log.debug('test_region: collision at node %d'%x)
  193.                         return False
  194.                 return start
  195.                
  196.  
  197. if __name__ == "__main__":
  198.         Disk = None
  199.         try:
  200.                 #ping_reporter.start_log(log,logging.DEBUG)
  201.                 ping_reporter.enableAllLogs(logging.DEBUG)
  202.                 server = ping.select_server(log)
  203.                 Disk = PingDisk(server,4)
  204.                 #ping.drop_privileges()
  205.  
  206.                 if 0:
  207.                         data = "1234567890123456789_123456789012345"
  208.                         log.info('blind disk read')
  209.                         rData = Disk.read(0,50)
  210.                         log.info('1-length: 50 requested -> %d'%(len(rData)))
  211.                         if rData != '\0'*50: log.exception('invalid rData: %s'%rData)
  212.                         else: log.info('success')
  213.                         time.sleep(5)
  214.  
  215.                         log.info('writing %d bytes'%len(data))
  216.                         Disk.write(0,data)
  217.                         time.sleep(5)
  218.                         rData = Disk.read(0,len(data))
  219.                         log.info('2-length: %d vs %d'%(len(data),len(rData)))
  220.                         if rData != data: log.exception('invalid rData: %s'%rData)
  221.                         else: log.info('success')
  222.  
  223.                         wData = 'abcdefghijk'
  224.                         Disk.write(10,wData)
  225.                         time.sleep(2)
  226.                         rData = Disk.read(0,len(data))
  227.                         data = data[0:10] + wData + data[10+len(wData):]
  228.                         log.info('3-length: %d vs %d'%(len(data),len(rData)))
  229.                         if rData != data: log.exception('invalid rData: %s'%rData)
  230.                         else: log.info('success')
  231.  
  232.                         time.sleep(2)
  233.                         data = data[2:] + '\0\0'
  234.                         rData = Disk.read(2,len(data))
  235.                         log.info('4-length: %d vs %d'%(len(data),len(rData)))
  236.                         if rData != data: log.exception('invalid rData: %s'%rData)
  237.                         else: log.info('success')
  238.  
  239.                         time.sleep(2)
  240.                         data = data[0:10]
  241.                         rData = Disk.read(2,10)
  242.                         log.info('5-length: %d vs %d'%(len(data),len(rData)))
  243.                         if rData != data: log.exception('invalid rData: %s'%rData)
  244.                         else: log.info('success')
  245.  
  246.                 if 1:
  247.                         #free_node = Disk.get_region(1500)
  248.                         #log.info('get_region = %s'%free_node)
  249.  
  250.                         strA = 'A'
  251.                         strB = 'B'*4096*3
  252.                         Disk.write(0,strA,False)
  253.                         Disk.write(5000,strB,False)
  254.  
  255.                         time.sleep(1)
  256.                         #log.info('region A [%d] and B [%d] written'%(len(strA),len(strB)))
  257.                         readA = Disk.read(0,len(strA))
  258.                         readB = Disk.read(5000,len(strB))
  259.                         log.info('region A and B read')
  260.                         if readA != strA: log.error('corruption in region A (%d bytes)'%len(readA))
  261.                         else:             log.debug('region A read successfully')
  262.                         if readB != strB: log.error('corruption in region B (%d bytes)'%len(readB))
  263.                         else:             log.debug('region B read successfully')
  264.  
  265.                 time.sleep(1)
  266.                 print 'terminate'
  267.         except KeyboardInterrupt:
  268.                 print "Keyboard Interrupt"
  269.         except Exception:
  270.                 print 'General Exception'
  271.                 from traceback import print_exc
  272.                 print_exc()
  273.         finally:
  274.                 if Disk: Disk.stop()
  275.                 log.info('traffic: %d pings (%s)'
  276.                                 %(ping.ping_count,ping_reporter.humanize_bytes(ping.ping_bandwidth)))
  277.                 sys.exit(1)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top