import json

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct


def stats_per_column(dataset, cols, stats):
    """
    :param dataset: Hive dataset
    :param cols: columns to summarise
    :param stats: the statistics to include in the summary
    :return: the requested summary, keyed by column name
    """

    # describe() returns one row per statistic; key the rows by their
    # 'summary' label rather than relying on row order.
    describe = {row['summary']: row for row in dataset.select(*cols).describe().collect()}

    result = {}
    for c in cols:
        all_stats = {
            'count': describe['count'][c],
            'mean': describe['mean'][c],
            'stddev': describe['stddev'][c],
            'min': describe['min'][c],
            'max': describe['max'][c]
        }
        # iteritems() is Python 2 only; items() works on both.
        result[c] = {k: v for k, v in all_stats.items() if k in stats}

    return result


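# A minimal usage sketch (not in the original paste; `spark` and the toy
# frame are assumptions for illustration). Note that describe() returns its
# values as strings:
#
#   toy = spark.createDataFrame([(1,), (2,), (9,)], ['x'])
#   stats_per_column(toy, ['x'], ['count', 'mean'])
#   # -> {'x': {'count': '3', 'mean': '4.0'}}

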
def number_of_rows_above_mean_per_column(dataset, cols):
    """
    :param dataset: Hive dataset
    :param cols: column names
    :return: number of rows above each column's mean
    """

    summary = {row['summary']: row for row in dataset.select(*cols).describe().collect()}
    result = {}

    for c in cols:
        # describe() reports means as strings, and only for numeric columns.
        mean = summary['mean'][c]
        if mean is not None:
            result[c] = dataset.filter(dataset[c] > float(mean)).count()

    return result


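# Hedged check (again, `spark` and the toy frame are illustrative
# assumptions): the mean of [1, 2, 9] is 4.0, and only one value exceeds it.
#
#   toy = spark.createDataFrame([(1,), (2,), (9,)], ['x'])
#   number_of_rows_above_mean_per_column(toy, ['x'])
#   # -> {'x': 1}

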
def number_of_unique_values_per_column(dataset, cols):
    """
    :param dataset: Hive dataset
    :param cols: table columns
    :return: the number of unique values in each column of the dataset
    """

    # Compute countDistinct for every column in a single aggregation pass.
    unique_values_row = dataset.agg(*(countDistinct(col(c)).alias(c) for c in cols)).first()
    return {c: unique_values_row[c] for c in cols}


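# Illustrative call on an assumed toy frame with duplicate values:
#
#   dup = spark.createDataFrame([('a',), ('a',), ('b',)], ['x'])
#   number_of_unique_values_per_column(dup, ['x'])
#   # -> {'x': 2}

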
database = 'foodmart'
table = 'store'
cols = ['store_id', 'store_type', 'region_id', 'store_name', 'store_number',
        'store_street_address', 'store_city', 'store_state', 'store_postal_code',
        'store_country', 'store_manager', 'store_phone', 'store_fax',
        'first_opened_date', 'last_remodel_date', 'store_sqft', 'grocery_sqft',
        'frozen_sqft', 'meat_sqft', 'coffee_bar', 'video_store', 'salad_bar',
        'prepared_food', 'florist']
stats = ['count', 'mean', 'stddev', 'min', 'max']
output_file = '/home/raj_ops/00488c66-c305-4c7f-be64-24cda9458ba2.json'

# HiveContext is deprecated in Spark 2.x; a Hive-enabled SparkSession runs the
# query directly.
sparkSession = SparkSession.builder.config(conf=SparkConf()).enableHiveSupport().getOrCreate()
dataset = sparkSession.sql("select * from {0}.{1}".format(database, table))

result = stats_per_column(dataset, cols, stats)

# The optional statistics below are only added when requested in `stats`.
if 'countDistinct' in stats:
    for c, value in number_of_unique_values_per_column(dataset, cols).items():
        result[c]['countDistinct'] = value

if 'countAboveMean' in stats:
    for c, value in number_of_rows_above_mean_per_column(dataset, cols).items():
        result[c]['countAboveMean'] = value

with open(output_file, 'w') as f:
    json.dump(result, f)
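
# The output file holds one JSON object per column, e.g. (values elided):
#   {"store_id": {"count": "...", "mean": "...", "stddev": "...",
#                 "min": "...", "max": "..."}, ...}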