Advertisement
Guest User

Untitled

a guest
Jun 28th, 2017
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.22 KB | None | 0 0
  1. # A simple script to convert traffic csv to parquet file
  2.  
import argparse
import json

from pyspark.sql import SparkSession
# Import data types
from pyspark.sql.types import *
from pyspark.sql.functions import when, lit, col, udf
  8.  
  9. def convert_csv_to_parquet(spark_context, custom_schema, csv_file, parquet_file):
  10. """
  11. spark_context: spark instance
  12. custom_schema: the schema of the source csv file
  13. csv_file: the source csv_file
  14. parquet_file: destination parquet file
  15.  
  16. converts the csv file to parquet file
  17. """
  18. df = spark_context.read.csv(csv_file, sep="\t", schema=custom_schema)
  19.  
  20. df_columns = df.columns
  21.  
  22. # read the model and parse through each column
  23. # if the row in model is present in df_columns then replace the default values
  24. # if it is not present means a new column needs to be added,
  25. # add it and assign a default value
  26. # for column in df_columns:
  27.  
  28. update_columns = {}
  29. for update_column in df_columns:
  30. if update_column in traffic_model:
  31. update_columns[update_column] = traffic_model[update_column]
  32.  
  33. df = df.na.fill(update_columns)
  34. for update_column in traffic_model:
  35. if update_column not in df_columns:
  36. df = df.withColumn(update_column, lit(traffic_model[update_column]))
  37.  
  38. # add three more attributes
  39. # 1 dayOFWeek : 1 Monday ... 7 Sunday
  40. # 2 unique_browser on formulae
  41. # 3 visitor_time format: yyyy-MM-dd’T’HH:mm:ss’Z’
  42. df.withColumn('unique_browser')
  43. df.show(5)
  44. df.write.parquet(parquet_file)
  45.  
  46. def get_command_line_opts():
  47. p = argparse.ArgumentParser(description='Specify the source traffic CSV and Parquett')
  48. p.add_argument('-s', '--source_csv', help='source traffic csv file', required=True, type=str)
  49. p.add_argument('-t', '--target_parquet', help='coma separated country codes <c1,c2>', required=False)
  50. args = p.parse_args()
  51.  
  52. return [args.source_csv, args.target_parquet]
  53.  
  54.  
  55. def get_traffic_schema():
  56. """
  57. Defines the schema of the input csv file
  58. """
  59. fields = [
  60. StructField('publisher_id', IntegerType(), True),
  61. StructField('prev_publisher_id', IntegerType(), True),
  62. StructField('publication_id', IntegerType(), True),
  63. StructField('prev_publication_id', IntegerType(), True),
  64. StructField('section_id', IntegerType(), True),
  65. StructField('network_id', IntegerType(), True),
  66. StructField('cat1_id', IntegerType(), True),
  67. StructField('cat2_id', IntegerType(), True),
  68. StructField('cat3_id', IntegerType(), True),
  69. StructField('visitor_id', StringType(), True),
  70. StructField('visit_id', StringType(), True),
  71. StructField('xvisit_id', StringType(), True),
  72. StructField('country', IntegerType(), True),
  73. StructField('state', StringType(), True),
  74. StructField('duration', IntegerType(), True),
  75. StructField('xduration', IntegerType(), True),
  76. StructField('new_visitor', IntegerType(), True),
  77. StructField('is_mobile', IntegerType(), True),
  78. StructField('mobile_vendor', StringType(), True),
  79. StructField('mobile_handset', StringType(), True),
  80. StructField('browser_vendor', StringType(), True),
  81. StructField('browser_version', StringType(), True),
  82. StructField('os', StringType(), True),
  83. StructField('platform', StringType(), True),
  84. StructField('connection', StringType(), True),
  85. StructField('page_domain', StringType(), True),
  86. StructField('ref_domain', StringType(), True),
  87. StructField('page_url', StringType(), True),
  88. StructField('ad_slot_id', StringType(), True),
  89. StructField('hour', IntegerType(), True),
  90. StructField('plugin_partner', StringType(), False),
  91. StructField('postal_code', StringType(), False),
  92. StructField('ip', StringType(), True),
  93. StructField('agent', StringType(), True),
  94. StructField('referrer', StringType(), True),
  95. StructField('timestamp', IntegerType(), True),
  96. StructField('timezone', StringType(), True),
  97. StructField('is_panel', IntegerType(), True),
  98. StructField('time_index', IntegerType(), True),
  99. StructField('region', IntegerType(), True)
  100. ]
  101.  
  102. return StructType(fields)
  103.  
  104.  
  105. if __name__ == "__main__":
  106. source_csv, target_parquet = get_command_line_opts()
  107.  
  108. with open("model.json") as json_data_file:
  109. traffic_model = json.load(json_data_file)
  110.  
  111. spark = SparkSession \
  112. .builder \
  113. .appName("Python spark Traffic CSV to Parquet") \
  114. .getOrCreate()
  115.  
  116. convert_csv_to_parquet(spark, get_traffic_schema(), source_csv, target_parquet)
  117.  
  118. spark.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement