Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import boto3
- import subprocess
- import json
# DynamoDB client shared by every query in this script.
client = boto3.client("dynamodb")

# Customer codes to export.
customer_ids = ["34460669"]

# Prefixes appended to "<customer_id>__" when matching timeseries keys.
prefixes = ["cashflow-amount"]
def fetch_timeseries(customer_id, prefix):
    """Collect the key attributes of a customer's matching timeseries rows.

    Queries the DynamoDB table for items whose ``customer_code`` equals
    *customer_id* and whose ``timeseries_key`` begins with
    ``customer_id + "__" + prefix``, following ``LastEvaluatedKey`` until the
    service stops returning one.

    Returns:
        A list of ``{"customer_code": ..., "timeseries_key": ...}`` dicts
        holding the string ("S") value of each attribute.
    """
    # The query itself never changes between pages; only the start key does.
    query_args = {
        "TableName": "tcb-ps5-data-uat1b-customer-features-ts",
        "KeyConditionExpression": "customer_code = :customer_code and begins_with(timeseries_key, :timeserie_prefix)",
        "ExpressionAttributeValues": {
            ":customer_code": {"S": customer_id},
            ":timeserie_prefix": {"S": customer_id + "__" + prefix},
        },
    }

    rows = []
    page = client.query(**query_args)
    while True:
        next_key = page.get("LastEvaluatedKey", None)
        rows.extend(
            {
                "customer_code": record["customer_code"]["S"],
                "timeseries_key": record["timeseries_key"]["S"],
            }
            for record in page["Items"]
        )
        if not next_key:
            return rows
        # More results remain: resume from where the previous page stopped.
        page = client.query(**query_args, ExclusiveStartKey=next_key)
def parse_data(data):
    """Parse the text output of the timeseries CLI into a dict.

    Recognised lines:

    * ``TimeSeriesKey: <value>``   -> ``output["time_series_key"]``
    * ``Customer Code: <value>``   -> ``output["customer_code"]``
    * ``Dimensions: <value>`` (or the misspelt ``Dimemsions``)
      -> ``output["dimensions"]``
    * lines whose first four characters are numeric, shaped
      ``<key> = <value>``          -> ``output["history"][<key>]``

    Values are split on the FIRST separator only, so a value that itself
    contains ``": "`` or ``" = "`` is kept whole. Lines that match a prefix
    but lack the separator are skipped instead of raising ``IndexError``.
    A trailing carriage return (CRLF output) is stripped from each line.

    Args:
        data: The decoded CLI output.

    Returns:
        A dict that always has a ``"history"`` dict and, when the matching
        lines are present, the header keys listed above. All values are
        strings.
    """
    output = {"history": {}}
    for raw_line in data.split("\n"):
        # Tolerate Windows line endings without changing LF-only behaviour.
        line = raw_line.rstrip("\r")

        if line.startswith("TimeSeriesKey"):
            field = "time_series_key"
        elif line.startswith("Customer Code"):
            field = "customer_code"
        elif line.startswith(("Dimensions", "Dimemsions")):
            # "Dimemsions" is accepted deliberately: the original parser
            # matched this misspelling as well.
            field = "dimensions"
        else:
            field = None

        if field is not None:
            _, separator, value = line.partition(": ")
            if separator:
                output[field] = value
        elif line[:4].isnumeric():
            key, separator, value = line.partition(" = ")
            if separator:
                output["history"][key] = value
    return output
def decrypt_data(item):
    """Read one timeseries partition through the CLI and parse its output.

    Runs ``./timeseries-cli.sh read-partition <timeseries_key>`` (resolved
    relative to the current working directory) and feeds its UTF-8 decoded
    standard output to ``parse_data``.

    If the CLI exits with a non-zero status, a warning containing the key,
    the exit status and the CLI's stderr is written to this process's
    stderr. The parsed stdout is still returned, so a failing partition does
    not abort the whole export but is no longer silent.

    Args:
        item: A dict with a ``"timeseries_key"`` entry.

    Returns:
        The dict produced by ``parse_data`` for the CLI's standard output.
    """
    import sys  # local import keeps this block self-contained

    completed = subprocess.run(
        [
            "./timeseries-cli.sh",
            "read-partition",
            item["timeseries_key"],
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if completed.returncode != 0:
        # Surface the failure on stderr so stdout stays reserved for results.
        print(
            "timeseries-cli.sh read-partition {} exited with status {}: {}".format(
                item["timeseries_key"],
                completed.returncode,
                completed.stderr.decode("utf-8", errors="replace").strip(),
            ),
            file=sys.stderr,
        )
    return parse_data(completed.stdout.decode("utf-8"))
# Export every matching partition for each (customer, prefix) pair.
result = []
for customer_id in customer_ids:
    for prefix in prefixes:
        result.extend(
            decrypt_data(entry)
            for entry in fetch_timeseries(customer_id, prefix)
        )

# Persist the full export, then show the first record and the total count.
with open("result.json", "w") as f:
    json.dump(result, f)

print(json.dumps(result[:1], indent=2))
print("Count", len(result))
- # aws s3 cp result.json s3://atasync1
Advertisement
Add Comment
Please, Sign In to add comment