Advertisement
Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- #! /usr/bin/env python
- """ A simple utility to export json data to csv """
- # pylint: disable=C0103
- import argparse
- from datetime import datetime, timedelta
- # import boto3
- from pyspark.sql import SparkSession
def run(spark, s3_client, start, end, no_header, bucket_in, bucket_out):
    """Export hour-partitioned JSON data from S3 to a tab-separated CSV.

    Reads JSON under ``topics/opc-m80/`` in *bucket_in*, keeps rows whose
    partition timestamp falls in the inclusive ``[start, end]`` range, and
    writes a single tab-separated CSV file into a timestamped folder in
    *bucket_out*.

    :param spark: an active SparkSession
    :param s3_client: a boto3 S3 client, used to probe for existing prefixes
    :param start: start date-hour as ``YYYYMMDDHH`` (e.g. ``2019081012``)
    :param end: end date-hour as ``YYYYMMDDHH`` (inclusive)
    :param no_header: when True, suppress the CSV header row
    :param bucket_in: bucket containing the JSON data
    :param bucket_out: bucket receiving the CSV export
    :raises ValueError: when no input folder exists in the requested range
    """
    DATE_TIME_PATTERN = '%Y%m%d%H'
    EXPORT_TIME_PATTERN = "%Y%m%d_%H%M%S"
    INPUT_PATTERN = "s3a://{}/topics/opc-m80/"
    PREFIX_PATTERN = "topics/opc-m80/year={}/month={}/day={}/"
    FOLDER_PATTERN = "s3a://{}/{}"
    OUTPUT_PATTERN = "s3a://{}/json-export-{}"

    start_date = datetime.strptime(start, DATE_TIME_PATTERN)
    end_date = datetime.strptime(end, DATE_TIME_PATTERN)

    # Iterate over calendar days, not elapsed 24-hour periods: the previous
    # (end_date - start_date).days silently dropped the final day whenever
    # the end hour was earlier than the start hour (e.g. 2019081023 to
    # 2019081101 yields delta.days == 0, so day 11 was never listed).
    num_days = (end_date.date() - start_date.date()).days
    folders = []
    for i in range(num_days + 1):
        current_day = start_date + timedelta(days=i)
        # NOTE(review): month/day are not zero-padded here (month=8, not
        # month=08) — confirm this matches the bucket's partition layout.
        prefix = PREFIX_PATTERN.format(current_day.year,
                                       current_day.month,
                                       current_day.day)
        # Only include prefixes that actually exist; spark.read fails on
        # missing paths.
        if is_valid_path(s3_client, bucket_in, prefix):
            folders.append(FOLDER_PATTERN.format(bucket_in, prefix))

    if not folders:
        # Fail fast with a clear message instead of an opaque Spark error
        # from reading an empty path list.
        raise ValueError(
            "no input data found in s3://{} for range {}..{}".format(
                bucket_in, start, end))

    df = spark \
        .read \
        .option("basePath", INPUT_PATTERN.format(bucket_in)) \
        .json(folders)

    # Collapse the year/month/day/hour partition columns into one
    # YYYYMMDDHH integer so it compares directly with start/end.
    df_date = df.withColumn("date",
                            df["year"] * 1000000 +
                            df["month"] * 10000 +
                            df["day"] * 100 +
                            df["hour"])
    filtered_df = df_date.filter(df_date['date'] >= int(start)) \
                         .filter(df_date['date'] <= int(end))

    # Repartition to 1 so the export lands as a single CSV part file.
    folder = datetime.now().strftime(EXPORT_TIME_PATTERN)
    filtered_df \
        .repartition(1) \
        .write \
        .option("header", str(not no_header).lower()) \
        .option("sep", "\t") \
        .format("csv") \
        .save(OUTPUT_PATTERN.format(bucket_out, folder))
def is_valid_path(s3_client, bucket_in, prefix):
    """Return the number of S3 objects under *prefix* (truthy when present).

    :param s3_client: a boto3 S3 client
    :param bucket_in: name of the bucket to probe
    :param prefix: key prefix to look for
    :return: the listing's ``KeyCount`` — 0 when the path does not exist
    """
    listing = s3_client.list_objects_v2(Bucket=bucket_in,
                                        Prefix=prefix,
                                        Delimiter='/')
    return listing['KeyCount']
if __name__ == '__main__':
    # CLI wiring is currently disabled while debugging S3 connectivity;
    # re-enable it together with the run(...) call below.
    # parser = argparse.ArgumentParser(description="A simple utility to export json data to csv.")
    # parser.add_argument("--start", help="start date YYYYMMDDHH, e.g.: 2019081012", required=True)
    # parser.add_argument("--end", help="end date YYYYMMDDHH, e.g.: 2019090218", required=True)
    # parser.add_argument("--no-header", help="suppress csv header", action="store_true")
    # parser.add_argument("--bucket-in", help="bucket with json data", required=True)
    # parser.add_argument("--bucket-out", help="bucket for storing csv", required=True)
    # args = parser.parse_args()
    # client = boto3.client('s3')
    session = SparkSession \
        .builder \
        .getOrCreate()
    # Smoke test: count the lines of a known S3 object to verify s3a access.
    # (The original built this exact RDD twice and discarded the first one;
    # that dead statement has been removed.)
    print(session.sparkContext.textFile("s3a://io.crichter.spark.code/s3_test.py").count())
    # run(session,
    #     client,
    #     args.start,
    #     args.end,
    #     args.no_header,
    #     args.bucket_in,
    #     args.bucket_out)
    session.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement