from glob import glob
import multiprocessing
import time
import os


###################################################################
# Setup
# -----------------------------------------------------------------
# We are going to test different ways we might improve the speed of
# downloading layers. These are Docker layers for dumping into
# Singularity images. The Singularity back-end client has a Python
# API to handle these bits.
#
###################################################################


# Let's clear the cache between each run
# cd libexec/python --> ipython

from sutils import get_cache
cache = get_cache('docker')
# Function to clean the cache
def clean_cache(cache=None):
    if cache is None:
        cache = get_cache('docker')
    files = glob("%s/*" % cache)
    for filey in files:
        os.remove(filey)

from docker.api import DockerApiConnection
client = DockerApiConnection(image="ubuntu:latest")
images = client.get_images()

# We will give the multiprocessor a function to download a layer
def download_layer(client, image_id, cache_base, prefix):
    targz = client.get_layer(image_id=image_id,
                             download_folder=cache_base,
                             prefix=prefix)
    client.update_token()  # refresh the API token between layer pulls
    return targz
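
# Usage sketch (an addition, not in the original paste): pull a single
# layer to sanity-check the client before timing anything. The serial
# test below cleans the cache first, so this does not skew the runs.
single_layer = download_layer(client=client,
                              image_id=images[0],
                              cache_base=cache,
                              prefix="[1/%s] Download" % len(images))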


# Function to download all of the layers in serial
def test_serial_download():
    clean_cache()
    layers = []
    for ii in range(len(images)):
        image_id = images[ii]
        targz = "%s/%s.tar.gz" % (cache, image_id)
        prefix = "[%s/%s] Download" % ((ii + 1), len(images))
        if not os.path.exists(targz):
            targz = download_layer(image_id=image_id,
                                   client=client,
                                   cache_base=cache,
                                   prefix=prefix)
        layers.append(targz)
    return layers

###################################################################
# Serial
# -----------------------------------------------------------------
# A serial download is the base case - one process, one thread,
# one after the other like tic tacs out of a box.
###################################################################


serial_times = []
for ii in range(10):
    print("STARTING SERIAL TIMING: ITERATION %s" % ii)
    start_time = time.time()
    layers = test_serial_download()
    end_time = time.time()
    final_time = end_time - start_time
    print("Serial download time:", final_time)
    serial_times.append(final_time)

# E.g.:
# Serial download time: 42.460509061813354
serial_times
# [40.7736291885376,
#  40.60954689979553,
#  40.79848909378052,
#  39.37337779998779,
#  39.85921502113342,
#  40.12239909172058,
#  40.35327935218811,
#  40.116194009780884,
#  40.140629053115845,
#  39.861605644226074]
# Just to note, this is easily 7-9 seconds faster than what I observed
# on my apartment connection.

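# Summary sketch (an addition, not in the original paste): reduce the
# ten serial timings to a mean and spread with the standard library.
import statistics

print("serial: mean %.2fs, stdev %.2fs"
      % (statistics.mean(serial_times), statistics.stdev(serial_times)))
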
###################################################################
# Multiprocessing
###################################################################

# Define the number of workers to be 2*CPU
NUM_WORKERS = multiprocessing.cpu_count() * 2

# Dump our per-layer keyword arguments into a list of dicts
inputs = []
for ii in range(len(images)):
    image_id = images[ii]
    targz = "%s/%s.tar.gz" % (cache, image_id)
    prefix = "[%s/%s] Download" % ((ii + 1), len(images))
    kwargs = {'image_id': image_id,
              'prefix': prefix,
              'client': client,
              'cache_base': cache}
    inputs.append(kwargs)


# pool.map passes a single argument to the worker, so unpack kwargs here
def multi_run_wrapper(args):
    return download_layer(**args)


# Time the pool-based download; clear the cache each iteration so the
# comparison with the serial test is fair
multi_times = []
for ii in range(10):
    print("STARTING MULTI TIMING: ITERATION %s" % ii)
    clean_cache()
    start_time = time.time()
    pool = multiprocessing.Pool(processes=NUM_WORKERS)
    results = pool.map(multi_run_wrapper, inputs)
    end_time = time.time()
    pool.close()
    pool.join()
    final_time = end_time - start_time
    print("Multiprocessing time:", final_time)
    multi_times.append(final_time)

# [38.58448839187622,
#  36.93542289733887,
#  36.940842390060425,
#  36.46030259132385,
#  36.53814649581909,
#  36.838619232177734,
#  38.17390704154968,
#  36.28799319267273,
#  35.855493783950806,
#  39.255677461624146]
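
# Comparison sketch (an addition): assumes both timing loops above have
# been run in this session.
serial_mean = sum(serial_times) / len(serial_times)
multi_mean = sum(multi_times) / len(multi_times)
print("serial mean: %.2fs" % serial_mean)
print("multi mean:  %.2fs" % multi_mean)
print("speedup:     %.2fx" % (serial_mean / multi_mean))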


# Now test varying the number of workers
times = dict()
for workers in range(1, 30):
    clean_cache()
    start_time = time.time()
    pool = multiprocessing.Pool(processes=workers)
    results = pool.map(multi_run_wrapper, inputs)
    end_time = time.time()
    pool.close()
    pool.join()
    final_time = end_time - start_time
    print("Multiprocessing time (%s workers):" % workers, final_time)
    times[workers] = final_time
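
# Follow-up sketch (an addition): report the fastest worker count from
# the sweep above.
best = min(times, key=times.get)
print("Fastest: %s workers at %.2fs" % (best, times[best]))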