Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # A simple script to convert traffic csv to parquet file
import argparse
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col, udf
# Import data types
from pyspark.sql.types import *
def convert_csv_to_parquet(spark_context, custom_schema, csv_file, parquet_file, model=None):
    """Convert a tab-separated traffic CSV file into a Parquet file.

    Parameters
    ----------
    spark_context : SparkSession
        Active Spark session used for reading and writing.
    custom_schema : StructType
        Schema of the source CSV file (see get_traffic_schema()).
    csv_file : str
        Path to the source tab-separated CSV file.
    parquet_file : str
        Destination path for the Parquet output.
    model : dict, optional
        Mapping of column name -> default value. Columns present in the
        CSV have their nulls filled with this default; columns missing
        from the CSV are added as constant columns. Defaults to the
        module-level ``traffic_model`` loaded from model.json for
        backward compatibility.
    """
    if model is None:
        # Fall back to the global loaded in __main__ from model.json.
        model = traffic_model

    df = spark_context.read.csv(csv_file, sep="\t", schema=custom_schema)
    df_columns = df.columns

    # For columns that exist in both the CSV and the model, replace
    # null values with the model's default.
    fill_values = {name: model[name] for name in df_columns if name in model}
    df = df.na.fill(fill_values)

    # Model entries with no matching CSV column become new constant columns.
    for name in model:
        if name not in df_columns:
            df = df.withColumn(name, lit(model[name]))

    # TODO(review): the original code intended three derived attributes
    # (dayOfWeek 1=Monday..7=Sunday, unique_browser, visitor_time in
    # yyyy-MM-dd'T'HH:mm:ss'Z' format) but the only attempt —
    # df.withColumn('unique_browser') — was invalid: withColumn requires
    # a column expression as its second argument and would raise
    # TypeError at runtime, so it has been removed until implemented.

    df.show(5)  # quick visual sanity check of the first rows
    df.write.parquet(parquet_file)
def get_command_line_opts(argv=None):
    """Parse command-line options for the CSV-to-Parquet conversion.

    Parameters
    ----------
    argv : list[str], optional
        Argument list to parse; defaults to ``sys.argv[1:]`` when None,
        preserving the original call-with-no-arguments behavior.

    Returns
    -------
    list
        ``[source_csv, target_parquet]`` — target_parquet may be None
        since the option is not required.
    """
    p = argparse.ArgumentParser(description='Specify the source traffic CSV and Parquet')
    p.add_argument('-s', '--source_csv', help='source traffic csv file', required=True, type=str)
    # NOTE(review): the original help text ("coma separated country codes")
    # was a copy-paste error from another script; this option is the
    # destination parquet path.
    p.add_argument('-t', '--target_parquet', help='destination parquet file path', required=False)
    args = p.parse_args(argv)
    return [args.source_csv, args.target_parquet]
def get_traffic_schema():
    """Build and return the StructType schema of the input traffic CSV.

    Each entry below is (column name, Spark type, nullable).
    """
    columns = [
        ('publisher_id', IntegerType(), True),
        ('prev_publisher_id', IntegerType(), True),
        ('publication_id', IntegerType(), True),
        ('prev_publication_id', IntegerType(), True),
        ('section_id', IntegerType(), True),
        ('network_id', IntegerType(), True),
        ('cat1_id', IntegerType(), True),
        ('cat2_id', IntegerType(), True),
        ('cat3_id', IntegerType(), True),
        ('visitor_id', StringType(), True),
        ('visit_id', StringType(), True),
        ('xvisit_id', StringType(), True),
        ('country', IntegerType(), True),
        ('state', StringType(), True),
        ('duration', IntegerType(), True),
        ('xduration', IntegerType(), True),
        ('new_visitor', IntegerType(), True),
        ('is_mobile', IntegerType(), True),
        ('mobile_vendor', StringType(), True),
        ('mobile_handset', StringType(), True),
        ('browser_vendor', StringType(), True),
        ('browser_version', StringType(), True),
        ('os', StringType(), True),
        ('platform', StringType(), True),
        ('connection', StringType(), True),
        ('page_domain', StringType(), True),
        ('ref_domain', StringType(), True),
        ('page_url', StringType(), True),
        ('ad_slot_id', StringType(), True),
        ('hour', IntegerType(), True),
        # The two fields below are the only non-nullable columns.
        ('plugin_partner', StringType(), False),
        ('postal_code', StringType(), False),
        ('ip', StringType(), True),
        ('agent', StringType(), True),
        ('referrer', StringType(), True),
        ('timestamp', IntegerType(), True),
        ('timezone', StringType(), True),
        ('is_panel', IntegerType(), True),
        ('time_index', IntegerType(), True),
        ('region', IntegerType(), True),
    ]
    return StructType([StructField(name, dtype, nullable) for name, dtype, nullable in columns])
if __name__ == "__main__":
    # Parse CLI options: source CSV path and (optional) parquet destination.
    source_csv, target_parquet = get_command_line_opts()

    # Load the column-defaults model; convert_csv_to_parquet reads this
    # module-level name to fill/add columns.
    with open("model.json") as fh:
        traffic_model = json.load(fh)

    session = SparkSession.builder.appName("Python spark Traffic CSV to Parquet").getOrCreate()
    try:
        convert_csv_to_parquet(session, get_traffic_schema(), source_csv, target_parquet)
    finally:
        session.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement