#!/usr/bin/env python

""" A simple utility to export json data to csv """
# pylint: disable=C0103
import argparse
from datetime import datetime, timedelta

# import boto3
from pyspark.sql import SparkSession


def run(spark, s3_client, start, end, no_header, bucket_in, bucket_out):
    """ Runs the json export """
    DATE_TIME_PATTERN = '%Y%m%d%H'
    EXPORT_TIME_PATTERN = "%Y%m%d_%H%M%S"

    INPUT_PATTERN = "s3a://{}/topics/opc-m80/"
    PREFIX_PATTERN = "topics/opc-m80/year={}/month={}/day={}/"
    FOLDER_PATTERN = "s3a://{}/{}"
    OUTPUT_PATTERN = "s3a://{}/json-export-{}"

    start_date = datetime.strptime(start, DATE_TIME_PATTERN)
    end_date = datetime.strptime(end, DATE_TIME_PATTERN)

    # Walk over calendar days rather than elapsed 24-hour periods, so the
    # end day's folder is still included when the range ends mid-day.
    delta = end_date.date() - start_date.date()

    folders = []
    for i in range(delta.days + 1):
        current_day = start_date + timedelta(days=i)
        # Partition values are assumed to be written without zero-padding
        # (month=8, not month=08), matching the numeric columns used below.
        prefix = PREFIX_PATTERN.format(current_day.year,
                                       current_day.month,
                                       current_day.day)

        if is_valid_path(s3_client, bucket_in, prefix):
            folder = FOLDER_PATTERN.format(bucket_in, prefix)
            folders.append(folder)

    # basePath makes Spark treat the year=/month=/day= directories as
    # partition columns instead of folding them into the file paths.
    df = spark \
        .read \
        .option("basePath", INPUT_PATTERN.format(bucket_in)) \
        .json(folders)

    # Collapse the partition columns into one sortable integer in the same
    # YYYYMMDDHH form as the CLI arguments,
    # e.g. year=2019, month=10, day=15, hour=7 -> 2019101507.
    df_date = df.withColumn("date",
                            df["year"] * 1000000 +
                            df["month"] * 10000 +
                            df["day"] * 100 +
                            df["hour"])

    filtered_df = df_date.filter(df_date['date'] >= int(start)) \
                         .filter(df_date['date'] <= int(end))

    # Timestamped output folder, one per export run.
    now = datetime.now()
    folder = now.strftime(EXPORT_TIME_PATTERN)

    # repartition(1) forces a single output file; convenient for small
    # exports, but it funnels all data through one task.
    filtered_df \
        .repartition(1) \
        .write \
        .option("header", str(not no_header).lower()) \
        .option("sep", "\t") \
        .format("csv") \
        .save(OUTPUT_PATTERN.format(bucket_out, folder))


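# A minimal call sketch for run() (the bucket names are hypothetical; this
# mirrors the wiring that is commented out under __main__ below):
#
#   spark = SparkSession.builder.getOrCreate()
#   client = boto3.client('s3')
#   run(spark, client,
#       start="2019081012", end="2019090218", no_header=False,
#       bucket_in="my-json-bucket", bucket_out="my-csv-bucket")

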
def is_valid_path(s3_client, bucket_in, prefix):
    """ Checks if a given path exists """
    response = s3_client.list_objects_v2(Bucket=bucket_in,
                                         Prefix=prefix,
                                         Delimiter='/')
    # KeyCount is present on every list_objects_v2 response; a value of 0
    # means nothing exists under the prefix.
    return response['KeyCount'] > 0


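# Note: list_objects_v2 returns at most 1000 keys per call, but since only
# existence is checked here, pagination is not needed. An illustrative
# response for an existing prefix (values are made up):
#
#   {'KeyCount': 24, 'IsTruncated': False,
#    'Prefix': 'topics/opc-m80/year=2019/month=10/day=15/', ...}

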
if __name__ == '__main__':

    # parser = argparse.ArgumentParser(description="A simple utility to export json data to csv.")
    # parser.add_argument("--start", help="start date YYYYMMDDHH, e.g.: 2019081012", required=True)
    # parser.add_argument("--end", help="end date YYYYMMDDHH, e.g.: 2019090218", required=True)
    # parser.add_argument("--no-header", help="suppress csv header", action="store_true")
    # parser.add_argument("--bucket-in", help="bucket with json data", required=True)
    # parser.add_argument("--bucket-out", help="bucket for storing csv", required=True)

    # args = parser.parse_args()

    # client = boto3.client('s3')

    session = SparkSession \
        .builder \
        .getOrCreate()

    # Read this script back from S3 and print its line count, apparently as
    # an s3a connectivity check while the export below is commented out.
    print(session.sparkContext.textFile("s3a://io.crichter.spark.code/s3_test.py").count())
    # run(session,
    #     client,
    #     args.start,
    #     args.end,
    #     args.no_header,
    #     args.bucket_in,
    #     args.bucket_out)

    session.stop()
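
# One way to submit this job (a sketch; the hadoop-aws version and bucket
# names are assumptions, and the argparse block above must be re-enabled
# for the CLI flags to take effect):
#
#   spark-submit \
#       --packages org.apache.hadoop:hadoop-aws:2.7.7 \
#       s3_test.py \
#       --start 2019081012 --end 2019090218 \
#       --bucket-in my-json-bucket --bucket-out my-csv-bucket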