Advertisement
Guest User

Untitled

a guest
May 22nd, 2019
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.82 KB | None | 0 0
  1. import logging
  2. import sys
  3. import json
  4. from argparse import ArgumentParser
  5. from pyspark.sql.functions import col, countDistinct
  6. from pyspark import SparkContext, SparkConf
  7. from pyspark.sql import SparkSession, HiveContext
  8.  
  9.  
  10. def stats_per_column(dataset, cols, stats):
  11. """
  12. :param dataset: hive Dataset
  13. :param cols: columns to do summary on
  14. :param stats: the statistics included in the summary
  15. :return: the summary wanted
  16. """
  17.  
  18. describe = dataset.select(cols).describe().collect()
  19.  
  20. result = {}
  21. for col in cols:
  22. all_stats = {
  23. 'count': describe[0][col],
  24. 'mean': describe[1][col],
  25. 'stddev': describe[2][col],
  26. 'min': describe[3][col],
  27. 'max': describe[4][col]
  28. }
  29. result[col] = {k:v for k, v in all_stats.iteritems() if k in stats}
  30.  
  31. return result
  32.  
  33.  
  34. def number_of_rows_above_mean_per_column(dataset, cols):
  35. """
  36. :param df: hive dataset
  37. :param cols: columns names
  38. :return: number of rows above the columns mean
  39. """
  40.  
  41. all_stats = dataset.select(cols).describe().collect()
  42. result = {}
  43.  
  44. for col in cols:
  45. mean = all_stats[1][col]
  46. result[col] = dataset.filter(dataset[col] > mean).count()
  47.  
  48. return result
  49.  
  50.  
  51. def number_of_unique_values_per_column(dataset, cols):
  52. """
  53. :param dataset: hive dataset
  54. :param cols: table columns
  55. :return: the number of uniques value in each column of the dataset
  56. """
  57.  
  58. unique_values_row = dataset.agg(*(countDistinct(col(c)).alias(c) for c in cols)).take(1)[0]
  59. return {col:unique_values_row[col] for col in cols}
  60.  
  61.  
  62. database = 'foodmart'
  63. table = 'store'
  64. cols = ['store_id', 'store_type', 'region_id', 'store_name', 'store_number', 'store_street_address', 'store_city', 'store_state', 'store_postal_code', 'store_country', 'store_manager', 'store_phone', 'store_fax', 'first_opened_date', 'last_remodel_date', 'store_sqft', 'grocery_sqft', 'frozen_sqft', 'meat_sqft', 'coffee_bar', 'video_store', 'salad_bar', 'prepared_food', 'florist']
  65. stats = ['count', 'mean', 'stddev', 'min', 'max']
  66. output_file = '/home/raj_ops/00488c66-c305-4c7f-be64-24cda9458ba2.json'
  67.  
  68. sparkSession = SparkSession.builder.config(conf=SparkConf()).enableHiveSupport().getOrCreate()
  69. hiveContext = HiveContext(sparkSession)
  70. dataset = hiveContext.sql("select * from {0}.{1}".format(database, table))
  71.  
  72. result = stats_per_column(dataset, cols, stats)
  73.  
  74. if 'countDistinct' in stats:
  75. for col, value in number_of_unique_values_per_column(dataset, cols).iteritems():
  76. result[col]['countDistinct'] = value
  77.  
  78. if 'countAboveMean' in stats:
  79. for col, value in number_of_rows_above_mean_per_column(dataset, cols).iteritems():
  80. result[col]['countAboveMean'] = value
  81.  
  82. with open(output_file, 'w') as f:
  83. json.dump(result, f)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement