from glob import glob
import multiprocessing
import time
import os
###################################################################
# Setup
# -----------------------------------------------------------------
# We are going to test different ways that we might improve the
# speed of downloading layers. These are Docker layers that get
# dumped into Singularity images. The Singularity back end client
# has a Python API to handle these bits.
#
###################################################################
# Let's clear the cache between each test
# cd libexec/python --> ipython
from sutils import get_cache
cache = get_cache('docker')

# Function to clean the cache
def clean_cache(cache=None):
    if cache is None:
        cache = get_cache('docker')
    files = glob("%s/*" % cache)
    for filey in files:
        os.remove(filey)
from docker.api import DockerApiConnection
client = DockerApiConnection(image="ubuntu:latest")
images = client.get_images()

# We will give the multiprocessor a function to download a layer
def download_layer(client, image_id, cache_base, prefix):
    targz = client.get_layer(image_id=image_id,
                             download_folder=cache_base,
                             prefix=prefix)
    client.update_token()
    return targz
# Function to download layers one after the other
def test_serial_download():
    clean_cache()
    layers = []
    for ii in range(len(images)):
        image_id = images[ii]
        targz = "%s/%s.tar.gz" % (cache, image_id)
        prefix = "[%s/%s] Download" % ((ii + 1), len(images))
        if not os.path.exists(targz):
            targz = download_layer(image_id=image_id,
                                   client=client,
                                   cache_base=cache,
                                   prefix=prefix)
        layers.append(targz)
    return layers
###################################################################
# Serial
# -----------------------------------------------------------------
# A serial download is the base case - one process, one thread,
# one after the other like tic tacs out of a box.
###################################################################
serial_times = []
for ii in range(0, 10):
    print("STARTING SERIAL TIMING: ITERATION %s" % (ii))
    start_time = time.time()
    layers = test_serial_download()
    end_time = time.time()
    final_time = end_time - start_time
    print("Serial download time:", final_time)
    serial_times.append(final_time)
# E.g.:
# Serial download time: 42.460509061813354

serial_times
# [40.7736291885376,
#  40.60954689979553,
#  40.79848909378052,
#  39.37337779998779,
#  39.85921502113342,
#  40.12239909172058,
#  40.35327935218811,
#  40.116194009780884,
#  40.140629053115845,
#  39.861605644226074]

# Just to note, this is easily 7-9 seconds faster than what I
# observed on my apartment connection.
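# A quick sanity check on those timings, using only the standard
# library (a minimal sketch over the serial_times list collected above):
import statistics
print("Serial mean: %.2fs, stdev: %.2fs" % (
    statistics.mean(serial_times), statistics.stdev(serial_times)))
# With the numbers above this works out to roughly 40.2s +/- 0.45s.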
###################################################################
# Multiprocessing
###################################################################
# Define number of workers to be 2*CPU
NUM_WORKERS = multiprocessing.cpu_count() * 2

# Let's just dump our inputs (one set of kwargs per layer) into a list
inputs = []
for ii in range(len(images)):
    image_id = images[ii]
    targz = "%s/%s.tar.gz" % (cache, image_id)
    prefix = "[%s/%s] Download" % ((ii + 1), len(images))
    kwargs = {'image_id': image_id, 'prefix': prefix,
              'client': client, 'cache_base': cache}
    inputs.append(kwargs)

# pool.map passes a single argument, so unpack the kwargs in a wrapper
def multi_run_wrapper(args):
    return download_layer(**args)

clean_cache()
# Add the calls to the task queue and time ten runs
multi_times = []
for ii in range(0, 10):
    print("STARTING MULTI TIMING: ITERATION %s" % (ii))
    start_time = time.time()
    pool = multiprocessing.Pool(processes=NUM_WORKERS)
    results = pool.map(multi_run_wrapper, inputs)
    end_time = time.time()
    final_time = end_time - start_time
    print("Multiprocessing time:", final_time)
    multi_times.append(final_time)
    # Close the pool so worker processes don't pile up across iterations
    pool.close()
    pool.join()
multi_times
# [38.58448839187622,
#  36.93542289733887,
#  36.940842390060425,
#  36.46030259132385,
#  36.53814649581909,
#  36.838619232177734,
#  38.17390704154968,
#  36.28799319267273,
#  35.855493783950806,
#  39.255677461624146]
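# Putting the two side by side (a minimal sketch; assumes the
# serial_times and multi_times lists collected above):
serial_mean = sum(serial_times) / len(serial_times)
multi_mean = sum(multi_times) / len(multi_times)
print("Serial mean: %.2fs, multi mean: %.2fs, speedup: %.2fx" % (
    serial_mean, multi_mean, serial_mean / multi_mean))
# With the numbers above that is roughly 40.2s vs 37.2s, about a
# 1.08x speedup - the downloads are mostly network-bound, so extra
# processes can only help so much.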
# Now test varying the number of workers
times = dict()
for workers in range(1, 30):
    start_time = time.time()
    pool = multiprocessing.Pool(processes=workers)
    results = pool.map(multi_run_wrapper, inputs)
    end_time = time.time()
    final_time = end_time - start_time
    print("Multiprocessing time (%s workers):" % workers, final_time)
    times[workers] = final_time
    pool.close()
    pool.join()
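# Pull out the fastest worker count from the sweep (a minimal
# sketch over the times dict built above):
best = min(times, key=times.get)
print("Fastest run: %s workers (%.2fs)" % (best, times[best]))
# Expect the curve to flatten quickly - past a handful of workers
# the connection, not the CPU, should be the bottleneck.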