import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct
def stats_per_column(dataset, cols, stats):
    """
    :param dataset: Hive-backed DataFrame
    :param cols: columns to summarize
    :param stats: the statistics to include in the summary
    :return: {column: {statistic: value}} for the requested statistics
    """
    # describe() yields one row per statistic; key the rows by their
    # 'summary' label rather than relying on row order.
    described = {row['summary']: row
                 for row in dataset.select(cols).describe().collect()}
    result = {}
    for c in cols:
        all_stats = {stat: described[stat][c]
                     for stat in ('count', 'mean', 'stddev', 'min', 'max')}
        result[c] = {k: v for k, v in all_stats.items() if k in stats}
    return result
def number_of_rows_above_mean_per_column(dataset, cols):
    """
    :param dataset: Hive-backed DataFrame
    :param cols: column names
    :return: {column: number of rows above that column's mean}
    """
    described = {row['summary']: row
                 for row in dataset.select(cols).describe().collect()}
    result = {}
    for c in cols:
        # describe() reports values as strings; the mean is None for
        # non-numeric columns, which have no rows above it.
        mean = described['mean'][c]
        result[c] = dataset.filter(dataset[c] > float(mean)).count() if mean is not None else 0
    return result
def number_of_unique_values_per_column(dataset, cols):
    """
    :param dataset: Hive-backed DataFrame
    :param cols: table columns
    :return: {column: number of distinct values in that column}
    """
    # Compute every distinct count in a single aggregation pass.
    unique_values_row = dataset.agg(*(countDistinct(col(c)).alias(c) for c in cols)).first()
    return {c: unique_values_row[c] for c in cols}
database = 'foodmart'
table = 'store'
cols = ['store_id', 'store_type', 'region_id', 'store_name', 'store_number',
        'store_street_address', 'store_city', 'store_state', 'store_postal_code',
        'store_country', 'store_manager', 'store_phone', 'store_fax',
        'first_opened_date', 'last_remodel_date', 'store_sqft', 'grocery_sqft',
        'frozen_sqft', 'meat_sqft', 'coffee_bar', 'video_store', 'salad_bar',
        'prepared_food', 'florist']
stats = ['count', 'mean', 'stddev', 'min', 'max']
output_file = '/home/raj_ops/00488c66-c305-4c7f-be64-24cda9458ba2.json'

# HiveContext is deprecated since Spark 2.0; a Hive-enabled SparkSession
# runs the query directly.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
dataset = spark.sql("select * from {0}.{1}".format(database, table))
result = stats_per_column(dataset, cols, stats)
if 'countDistinct' in stats:
    for c, value in number_of_unique_values_per_column(dataset, cols).items():
        result[c]['countDistinct'] = value
if 'countAboveMean' in stats:
    for c, value in number_of_rows_above_mean_per_column(dataset, cols).items():
        result[c]['countAboveMean'] = value

with open(output_file, 'w') as f:
    json.dump(result, f)
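
# Usage sketch (assumptions: this script is saved as stats_summary.py -- a
# hypothetical filename -- on a host where Spark can reach a Hive metastore
# containing the foodmart.store table):
#
#   spark-submit stats_summary.py
#
# The output JSON maps each column name to its statistics, in the shape
# {"store_sqft": {"count": ..., "mean": ..., "stddev": ..., "min": ..., "max": ...}, ...}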