Advertisement
Guest User

Untitled

a guest
Apr 24th, 2019
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.65 KB | None | 0 0
  1. import vaex
  2. import numpy as np
  3. import dask.dataframe as dd
  4. import dask
  5. import dask.distributed
  6. import json
  7. import os
  8. import time
  9. import argparse
  10. import multiprocessing
  11.  
  12. default_filename = 'string_benchmark.hdf5'
  13.  
  14. parser = argparse.ArgumentParser('bench.py')
  15. parser.add_argument('--number', "-n", dest="n", type=float, default=7,
  16. help="log number of rows to use")
  17. parser.add_argument('--partitions', type=int, default=multiprocessing.cpu_count() * 2,
  18. help="number of partitions to split (default: 2x number cores)")
  19. parser.add_argument('--npandas', dest="npandas", type=float, default=7,
  20. help="number of rows to use for pandas")
  21. parser.add_argument('--filter', dest="filter", default=None,
  22. help="filter for benchmark")
  23. parser.add_argument('--filename', default=default_filename,
  24. help='filename to use for benchmark export/reading')
  25. parser.add_argument('--backend', default='vaex',
  26. help='The backend to test {vaex, dask, pandas}')
  27. args = parser.parse_args()
  28.  
  29.  
  30. timings = {}
  31.  
  32.  
  33. def mytimeit(expr, N, scope):
  34. times = []
  35. for i in range(N):
  36. t0 = time.time()
  37. eval(expr, scope)
  38. times.append(time.time() - t0)
  39. if args.backend == 'dask':
  40. # Give time for dask's GC to run
  41. time.sleep(1.0)
  42. return times
  43.  
  44.  
  45. def vaex_nop(df):
  46. df.nop()
  47.  
  48.  
  49. def dask_nop(df):
  50. # We use `persist` here instead of `compute`. It is uncommon to call
  51. # `compute` on large dataframes in dask, since that will pull the large
  52. # results back to the client process (a potentially expensive process).
  53. # Rather we call `persist` to do all the operations but leave the data on
  54. # the workers. I believe this is a more fair comparison to vaex's `nop`
  55. dask.distributed.wait(df.persist())
  56.  
  57.  
  58. def pandas_nop(df):
  59. pass
  60.  
  61.  
  62. if __name__ == '__main__':
  63. if not os.path.exists(args.filename):
  64. s = np.arange(0, int(10**args.n)).astype(str)
  65. df_vaex = vaex.from_arrays(x=s, s=s)
  66. print("Writing file")
  67. df_vaex.export(args.filename, progress=True, shuffle=True)
  68. del df_vaex
  69.  
  70. df_vaex = vaex.open(args.filename)
  71. if args.backend == 'vaex':
  72. df = df_vaex
  73. df.executor.buffer_size = len(df) // args.partitions
  74. scope = {'df': df, 'nop': vaex_nop}
  75. elif args.backend == 'dask':
  76. # Start a local cluster with 1 thread per process (nprocesses = ncores
  77. # by default)
  78. dask.distributed.Client(threads_per_worker=1)
  79. df_pandas = df_vaex.to_pandas_df()
  80. # Load the data on the cluster already, to be fair in comparison to vaex
  81. df = dd.from_pandas(df_pandas, npartitions=args.partitions).persist()
  82. del df_pandas
  83. scope = {'df': df, 'nop': dask_nop}
  84. elif args.backend == 'pandas':
  85. df = df_vaex.to_pandas_df()
  86. scope = {'df': df, 'nop': pandas_nop}
  87. else:
  88. raise ValueError("Unknown backend %s" % args.backend)
  89. del df_vaex
  90.  
  91. def test(name, expr):
  92. if args.filter and args.filter not in name:
  93. return
  94. print(name)
  95. results = mytimeit('nop(%s)' % expr, 5, scope=scope)
  96. t = min(results) / (10 ** args.n)
  97. timings[name] = t
  98.  
  99. print("Benchmarking %s" % args.backend)
  100. test('capitalize', 'df.s.str.capitalize()')
  101. test('cat', 'df.s.str.cat(df.s)')
  102. test('contains', 'df.s.str.contains("9", regex=False)')
  103. test('contains(regex)', 'df.s.str.contains("9", regex=True)')
  104. test('count', 'df.s.str.count("9")')
  105. test('endswith', 'df.s.str.endswith("9")')
  106. test('find', 'df.s.str.find("4")')
  107. test('get', 'df.s.str.get(1)')
  108. test('split+join', 'df.s.str.split(".").str.join("-")')
  109. test('len', 'df.s.str.len()')
  110. test('ljust', 'df.s.str.ljust(10)')
  111. test('lower', 'df.s.str.lower()')
  112. test('lstrip', 'df.s.str.lstrip("9")')
  113. test('match', 'df.s.str.match("1.*")')
  114. test('pad', 'df.s.str.pad(10)')
  115. test('repeat', 'df.s.str.repeat(2)')
  116. test('replace(default)', 'df.s.str.replace("123", "321")')
  117. test('replace(no regex)', 'df.s.str.replace("123", "321", regex=False)')
  118. test('replace(regex)', 'df.s.str.replace("1?[45]4", "1004", regex=True)')
  119. test('rfind', 'df.s.str.rfind("4")')
  120. test('rjust', 'df.s.str.rjust(10)')
  121. test('rstrip', 'df.s.str.rstrip("9")')
  122. test('slice', 'df.s.str.slice(1, 3)')
  123. test('split', 'df.s.str.split(".")')
  124. test('startswith', 'df.s.str.startswith("9")')
  125. test('strip', 'df.s.str.strip("0")') # issues?
  126. test('title', 'df.s.str.title()')
  127. test('upper', 'df.s.str.upper()')
  128. test('zfill', 'df.s.str.zfill(10)')
  129.  
  130. fn = "%s.json" % args.backend
  131. with open(fn, "w") as f:
  132. json.dump(timings, f)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement