Advertisement
Guest User

dataTransfer.py

a guest
Mar 20th, 2010
205
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.36 KB | None | 0 0
  1. #!/usr/bin/python -u
  2.  
  3. import os, sys
  4. import time
  5. import subprocess
  6. import threading
  7. import select
  8. from optparse import OptionParser
  9.  
# Global run flag: the main loop keeps launching worker batches while this is
# true. A worker is meant to clear it after reading "NO DATA" from its child;
# that only takes effect if the assigning function declares the name `global`.
gThreading = True
  11.  
  12. class ThreadManager(threading.Thread):
  13.  
  14.     def __init__(self, cmdline):
  15.         self.pool = []
  16.         self.running = []
  17.         self.parallel = 5
  18.         self.lock = threading.Lock()
  19.         self.cmdline = cmdline
  20.         threading.Thread.__init__(self)
  21.  
  22.     def clean_pool(self, pool, join=False):
  23.         for th in [x for x in pool if not x.isAlive()]:
  24.             if join: th.join()
  25.             pool.remove(th)
  26.             del th
  27.         return pool
  28.  
  29.     def spawnThreads(self, threadnum = 5):
  30.         self.pool = []
  31.         self.running = []
  32.         if threadnum: self.parallel = threadnum
  33.         self.addThreads()
  34.  
  35.     def addThreads(self):
  36.         for r in xrange(0, self.parallel):
  37.             self.pool.append(TrickleClass(self.cmdline))
  38.  
  39.     def run(self):
  40.         self.started = True
  41.         parallel = self.parallel
  42.         while len(self.running)+len(self.pool) > 0:
  43.             self.clean_pool(self.running, join=True)
  44.             n = min(max(parallel - len(self.running), 0), len(self.pool))
  45.             if n > 0:
  46.                 for th in self.pool[0:n]: th.start()
  47.                 self.running.extend(self.pool[0:n])
  48.                 del self.pool[0:n]
  49.             time.sleep(.01)
  50.         for th in self.running+self.pool: th.join()
  51.  
  52.     def get_completed_total(self):
  53.         sum = 0
  54.         for thread in self.running:
  55.             sum += thread.get_completed()
  56.         return sum
  57.  
  58.     def isRunning(self):
  59.         return len(self.running) + len(self.pool)
  60.  
  61.  
  62. class TrickleClass(threading.Thread):
  63.     def __init__(self, cmdline):
  64.         self.stdout = None
  65.         self.stderr = None
  66.         self.completed = 0
  67.         self.process = None
  68.         self.pid = 0
  69.         self.data = ""
  70.         self.cmdline = cmdline
  71.         self.lock = threading.RLock()
  72.         threading.Thread.__init__(self)
  73.  
  74.     def run(self):
  75.         try:
  76.             self.process = subprocess.Popen(self.cmdline, stdout=subprocess.PIPE, bufsize=1, shell=False)
  77.         except OSError:
  78.             print "ERROR"
  79.             return
  80.         self.pid = self.process.pid
  81.         while True:
  82.             if self.update_completed():
  83.                 break
  84.  
  85.     def update_completed(self):
  86.         self.lock.acquire()
  87.         try:
  88.             r = select.select([self.process.stdout.fileno()], [], [], 5)[0]
  89.         except:
  90.             r = False
  91.         if r:
  92.             d = os.read(r[0], 1)
  93.             self.data += d
  94.             lastline = self.data.split("\n")[-1]
  95.             if lastline == "NO DATA":
  96.                 gThreading = False
  97.                 return True
  98.             elif lastline == "DONE":
  99.                 return True
  100.             try:
  101.                 self.completed = int(self.data.split("\n")[-2])
  102.             except:
  103.                 self.completed = 0
  104.         self.lock.release()
  105.         return False
  106.            
  107.  
  108.     def get_completed(self):
  109.         return self.completed
  110.  
  111.  
  112. def format_time(seconds):
  113.     m, s = divmod(seconds, 60)
  114.     h, m = divmod(m, 60)
  115.     d, h = divmod(h, 24)
  116.     return "%dd %02d:%02d:%02d" %(d, h, m, s)
  117.  
  118.  
  119. if __name__ == "__main__":
  120.     parser = OptionParser()
  121.     parser.add_option('-n', '--threads', action='store', type="int", help='number of threads', default=10)
  122.     parser.add_option('-e', '--exit', action='store_true', default=False)
  123.     parser.add_option('--php', action='store', type="string", help="Path to php executable", default="php")
  124.     parser.add_option('--index', action='store', type="string", help="Path to index.php", default="")
  125.     parser.add_option('-u', '--update', action='store', type="int", help="Number of records completed between updates, default 10", default=10)
  126.     (options, args) = parser.parse_args()
  127.     if not options.index:
  128.         print "Error: --index must be supplied\n"
  129.         parser.print_help()
  130.         sys.exit()
  131.     t1 = time.time()
  132.     base_complete = 0
  133.     total = 0
  134.     cmdline = [options.php, options.index, "--uri=legacy/count"]
  135.     process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, bufsize=0, shell=False)
  136.     pid = process.pid
  137.     records = float(process.stdout.read())
  138.  
  139.     cmdline = [options.php, options.index, "--uri=/legacy"]
  140.     if options.exit:
  141.         sys.exit()
  142.     while gThreading:
  143.         threadManager = ThreadManager(cmdline)
  144.         threadManager.spawnThreads(options.threads)
  145.         threadManager.start()
  146.         group_total = 0
  147.         while threadManager.isAlive():
  148.             t = threadManager.get_completed_total()
  149.             if t > group_total:
  150.                 group_total = t
  151.                 total += 1
  152.                 if not total % options.update:
  153.                     pct_completed = total / records
  154.                     time_diff = time.time() - t1
  155.                     if time_diff > 0:
  156.                         seconds = (time_diff / pct_completed) - time_diff
  157.                         remaining = "Remaining: %s" %(format_time(seconds))
  158.                         elapsed = "Elasped Time: %s" %(format_time(time_diff))
  159.                         avg = "Avg: %.02f/s"  %(total / time_diff)
  160.                         sys.stdout.write("%s\r" %(" " * 120))
  161.                         sys.stdout.write("%s\t\tTotal Completed: %s\t\t%s\t\t%s\r" % (remaining, total, elapsed, avg))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement