SHARE
TWEET

Untitled

a guest Aug 17th, 2019 78 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import functools
  2. import dask
  3. import dask.dataframe as dd
  4. import pandas as pd
  5.  
  6. pdf = pd.DataFrame({
  7.     'x': range(0, 100),
  8.     'y': range(0, 100),
  9.     'z': range(0, 100)
  10. })
  11.  
  12. ddf = dd.from_pandas(pdf, npartitions=8)
  13.  
  14. print('Number of partitions', ddf.npartitions)
  15.  
  16.  
  17. def compute_stats(row):
  18.     return {
  19.         'sum': row['x'] + row['y'] + row['z'],
  20.         'min': min(row),
  21.         'max': max(row)
  22.     }
  23.  
  24.  
  25. def accum_stats(stats_accum, stats):
  26.     return {
  27.         'sum': stats_accum['sum'] + stats['sum'],
  28.         'min': min(stats_accum['min'], stats['min']),
  29.         'max': max(stats_accum['max'], stats['max'])
  30.     }
  31.  
  32.  
  33. def compute_stats_partition(pdf):
  34.     pds = pdf.apply(compute_stats, axis=1)
  35.     return functools.reduce(accum_stats, pds)
  36.  
  37.  
  38. def merge_stats_series(pds):
  39.     return functools.reduce(accum_stats, pds)
  40.  
  41.  
  42. res = ddf.reduction(
  43.     compute_stats_partition,
  44.     merge_stats_series,
  45.     meta={
  46.         'sum': 'int64',
  47.         'min': 'int64',
  48.         'max': 'int64'
  49.     })
  50.  
  51. # singleton dataframe to list of delayed objects
  52. # where each row is a delayed object
  53. # and in this case we just want the first one
  54. delayed_dict = res.to_delayed()[0]
  55.  
  56. delayed_dict.visualize('graph.svg')
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top