Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import vaex
- import numpy as np
- import dask.dataframe as dd
- import dask
- import dask.distributed
- import json
- import os
- import time
- import argparse
- import multiprocessing
# File used for the exported benchmark dataset (created on first run).
default_filename = 'string_benchmark.hdf5'

# Command-line interface for the benchmark runner.
parser = argparse.ArgumentParser('bench.py')
parser.add_argument('--number', "-n", dest="n", type=float, default=7,
                    help="log number of rows to use")
parser.add_argument('--partitions', type=int, default=multiprocessing.cpu_count() * 2,
                    help="number of partitions to split (default: 2x number cores)")
parser.add_argument('--npandas', dest="npandas", type=float, default=7,
                    help="number of rows to use for pandas")
parser.add_argument('--filter', dest="filter", default=None,
                    help="filter for benchmark")
parser.add_argument('--filename', default=default_filename,
                    help='filename to use for benchmark export/reading')
parser.add_argument('--backend', default='vaex',
                    help='The backend to test {vaex, dask, pandas}')
args = parser.parse_args()

# name -> seconds-per-row results, dumped to JSON at the end of the run.
timings = {}
def mytimeit(expr, N, scope):
    """Evaluate ``expr`` ``N`` times and return the list of wall-clock
    durations (seconds), one per run.

    ``expr`` is a Python expression string executed with ``eval`` against
    ``scope`` as its global namespace. The caller aggregates (e.g. takes the
    minimum of) the returned timings.
    """
    times = []
    for _ in range(N):
        # perf_counter is monotonic and high-resolution; time.time() (used
        # previously) can jump backwards on system clock adjustments, which
        # would corrupt a benchmark measurement.
        t0 = time.perf_counter()
        eval(expr, scope)
        times.append(time.perf_counter() - t0)
        if args.backend == 'dask':
            # Give time for dask's GC to run between iterations so cleanup
            # work is not billed to the next measurement.
            time.sleep(1.0)
    return times
def vaex_nop(df):
    """Force evaluation of a vaex expression pipeline without materializing
    results, by invoking the dataframe's ``nop`` method."""
    df.nop()
def dask_nop(df):
    """Force evaluation of a dask dataframe, leaving results on the workers.

    ``persist`` is used rather than ``compute``: calling ``compute`` on a
    large dataframe would pull the full result back into the client process
    (potentially expensive), whereas ``persist`` runs all the operations but
    keeps the data distributed — a fairer analogue of vaex's ``nop``.
    """
    dask.distributed.wait(df.persist())
def pandas_nop(df):
    """No-op for the pandas backend: pandas evaluates eagerly, so by the time
    this is called the work has already been done."""
if __name__ == '__main__':
    # Create the benchmark dataset on first run; later runs reuse the file.
    if not os.path.exists(args.filename):
        s = np.arange(0, int(10**args.n)).astype(str)
        df_vaex = vaex.from_arrays(x=s, s=s)
        print("Writing file")
        df_vaex.export(args.filename, progress=True, shuffle=True)
        del df_vaex

    df_vaex = vaex.open(args.filename)

    # Build the backend-specific dataframe and the eval() scope used by the
    # benchmark expressions. Each backend pairs its dataframe with the
    # matching nop() that forces evaluation.
    if args.backend == 'vaex':
        df = df_vaex
        df.executor.buffer_size = len(df) // args.partitions
        scope = {'df': df, 'nop': vaex_nop}
    elif args.backend == 'dask':
        # Start a local cluster with 1 thread per process (nprocesses = ncores
        # by default).
        dask.distributed.Client(threads_per_worker=1)
        df_pandas = df_vaex.to_pandas_df()
        # Load the data on the cluster already, to be fair in comparison to vaex.
        df = dd.from_pandas(df_pandas, npartitions=args.partitions).persist()
        del df_pandas
        scope = {'df': df, 'nop': dask_nop}
    elif args.backend == 'pandas':
        # NOTE(review): --npandas is parsed but not referenced here; the
        # pandas backend uses the same 10**args.n rows as the others — confirm
        # whether npandas was meant to limit this conversion.
        df = df_vaex.to_pandas_df()
        scope = {'df': df, 'nop': pandas_nop}
    else:
        raise ValueError("Unknown backend %s" % args.backend)
    del df_vaex

    def test(name, expr):
        """Benchmark one expression: best of 5 runs, normalized per row."""
        if args.filter and args.filter not in name:
            return
        print(name)
        results = mytimeit('nop(%s)' % expr, 5, scope=scope)
        t = min(results) / (10 ** args.n)
        timings[name] = t

    print("Benchmarking %s" % args.backend)

    # (label, expression) pairs, run in order. Labels become JSON keys.
    benchmark_cases = [
        ('capitalize', 'df.s.str.capitalize()'),
        ('cat', 'df.s.str.cat(df.s)'),
        ('contains', 'df.s.str.contains("9", regex=False)'),
        ('contains(regex)', 'df.s.str.contains("9", regex=True)'),
        ('count', 'df.s.str.count("9")'),
        ('endswith', 'df.s.str.endswith("9")'),
        ('find', 'df.s.str.find("4")'),
        ('get', 'df.s.str.get(1)'),
        ('split+join', 'df.s.str.split(".").str.join("-")'),
        ('len', 'df.s.str.len()'),
        ('ljust', 'df.s.str.ljust(10)'),
        ('lower', 'df.s.str.lower()'),
        ('lstrip', 'df.s.str.lstrip("9")'),
        ('match', 'df.s.str.match("1.*")'),
        ('pad', 'df.s.str.pad(10)'),
        ('repeat', 'df.s.str.repeat(2)'),
        ('replace(default)', 'df.s.str.replace("123", "321")'),
        ('replace(no regex)', 'df.s.str.replace("123", "321", regex=False)'),
        ('replace(regex)', 'df.s.str.replace("1?[45]4", "1004", regex=True)'),
        ('rfind', 'df.s.str.rfind("4")'),
        ('rjust', 'df.s.str.rjust(10)'),
        ('rstrip', 'df.s.str.rstrip("9")'),
        ('slice', 'df.s.str.slice(1, 3)'),
        ('split', 'df.s.str.split(".")'),
        ('startswith', 'df.s.str.startswith("9")'),
        ('strip', 'df.s.str.strip("0")'),  # issues?
        ('title', 'df.s.str.title()'),
        ('upper', 'df.s.str.upper()'),
        ('zfill', 'df.s.str.zfill(10)'),
    ]
    for case_name, case_expr in benchmark_cases:
        test(case_name, case_expr)

    # Persist per-row timings so backends can be compared across runs.
    fn = "%s.json" % args.backend
    with open(fn, "w") as f:
        json.dump(timings, f)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement