pexea12

Oulu

Sep 14th, 2021 (edited)
121
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.55 KB | None | 0 0
  1. import boto3
  2. import subprocess
  3. import json
  4.  
  5. client = boto3.client("dynamodb")
  6.  
  7. customer_ids = [
  8.     "34460669"
  9. ]
  10.  
  11. prefixes = [
  12.     "cashflow-amount"
  13. ]
  14.  
  15. def fetch_timeseries(customer_id, prefix):
  16.     exclusive_start_key = None
  17.     items = []
  18.  
  19.     while True:
  20.         kwargs = {
  21.             "TableName": "tcb-ps5-data-uat1b-customer-features-ts",
  22.             "KeyConditionExpression": "customer_code = :customer_code and begins_with(timeseries_key, :timeserie_prefix)",
  23.             "ExpressionAttributeValues": {
  24.                 ":customer_code": { "S": customer_id },
  25.                 ":timeserie_prefix": { "S": customer_id + "__" + prefix },
  26.             },
  27.         }
  28.  
  29.         if exclusive_start_key:
  30.             kwargs["ExclusiveStartKey"] = exclusive_start_key
  31.  
  32.         response = client.query(**kwargs)
  33.  
  34.         exclusive_start_key = response.get("LastEvaluatedKey", None)
  35.         for item in response["Items"]:
  36.             items.append({
  37.                 "customer_code": item["customer_code"]["S"],
  38.                 "timeseries_key": item["timeseries_key"]["S"],
  39.             })
  40.  
  41.         if not exclusive_start_key:
  42.             break
  43.  
  44.     return items
  45.  
  46.  
  47. def parse_data(data):
  48.     lines = data.split("\n")
  49.     output = { "history": {} }
  50.     for line in lines:
  51.         if line.startswith("TimeSeriesKey"):
  52.             item = line.split(": ")
  53.             output["time_series_key"] = item[1]
  54.         elif line.startswith("Customer Code"):
  55.             item = line.split(": ")
  56.             output["customer_code"] = item[1]
  57.         elif line.startswith("Dimensions") or line.startswith("Dimemsions"):
  58.             item = line.split(": ")
  59.             output["dimensions"] = item[1]
  60.         elif line[:4].isnumeric():
  61.             item = line.split(" = ")
  62.             output["history"][item[0]] = item[1]
  63.     return output
  64.  
  65.  
  66. def decrypt_data(item):
  67.     process = subprocess.Popen(
  68.         [
  69.             "./timeseries-cli.sh",
  70.             "read-partition",
  71.             item["timeseries_key"],
  72.         ],
  73.         stdout=subprocess.PIPE,
  74.         stderr=subprocess.PIPE,
  75.     )
  76.  
  77.     stdout, stderr = process.communicate()
  78.     return parse_data(stdout.decode("utf-8"))
  79.  
  80.  
  81.  
  82. result = []
  83. for customer_id in customer_ids:
  84.     for prefix in prefixes:
  85.         items = fetch_timeseries(customer_id, prefix)
  86.         for item in items:
  87.             result.append(decrypt_data(item))
  88.  
  89. with open("result.json", "w") as f:
  90.     json.dump(result, f)
  91.  
  92. print(json.dumps(result[:1], indent=2))
  93. print("Count", len(result))
  94.  
  95.  
  96. # aws s3 cp result.json s3://atasync1
Advertisement
Add Comment
Please, Sign In to add comment