Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import boto3
- import json
- import ast
- import sys
- from collections import defaultdict, Counter
- from datetime import timedelta
- from datetime import datetime
- import time
# --- Configuration -----------------------------------------------------------
region = 'us-east-1'            # AWS region for both DynamoDB and CloudWatch clients
skipList = ['AtomicCounters']   # table names excluded from aggregation
cwNameSpace = 'ddbAggregates'   # custom CloudWatch namespace the aggregates are published to
timeDelta = 5                   # look-back window (minutes) for consumed-capacity queries
period = 60                     # CloudWatch statistics period (seconds); also divides Sum into per-second units
debug = False                   # set True to print each stage's intermediate results
def Debug ( functionName, things ):
    """Print a banner with functionName and then each item in things.

    No-op unless the module-level `debug` flag is True.
    """
    if debug:
        # Single-argument print(...) behaves identically under Python 2's
        # print statement and Python 3's print function, so this stays
        # compatible with both.
        print("\n########################## %s" % functionName)
        for thing in things:
            print(thing)
def GetTableNames ():
    """Return a sorted list of DynamoDB table names in the region, minus skipList.

    Uses the module-level ddbClient. Paginates list_tables, which returns at
    most 100 names per call, so accounts with many tables are fully covered.
    """
    allTableNames = []
    response = ddbClient.list_tables()
    allTableNames.extend(response['TableNames'])
    while 'LastEvaluatedTableName' in response:
        response = ddbClient.list_tables(
            ExclusiveStartTableName=response['LastEvaluatedTableName'])
        allTableNames.extend(response['TableNames'])
    # No need for the old ast.literal_eval(json.dumps(...)) round-trip: the
    # u'' prefix it stripped was purely cosmetic (Python 2 repr of unicode).
    tableNames = sorted(set(allTableNames) - set(skipList))
    Debug ( sys._getframe().f_code.co_name, tableNames )
    return tableNames
def GetDdbObjects ( tableNames ):
    """Build one metrics record per table and per global secondary index.

    Table names are expected to follow env-stack-tableName-uid; names without
    a stack segment are skipped entirely (matching the original behavior of
    the bare-except/False-sentinel version).
    Uses the module-level ddbResource. Returns a list of dicts from InitDdbObj.
    """
    ddbObjects = []
    for tableName in tableNames:
        parts = tableName.split("-")  # env-stack-tableName-uid
        if len(parts) < 2:
            # Name doesn't carry a stack segment; nothing to aggregate under.
            continue
        stackName = parts[1]
        ddbObjects.append(InitDdbObj(stackName, tableName, None, 'table'))
        table = ddbResource.Table(tableName)
        gsIndexes = table.global_secondary_indexes
        if gsIndexes is not None:
            for gsIndex in gsIndexes:
                # IndexName is used directly; the old json/ast round-trip only
                # changed its repr, not its value.
                ddbObjects.append(
                    InitDdbObj(stackName, gsIndex['IndexName'], tableName, 'gsi'))
    Debug ( sys._getframe().f_code.co_name, ddbObjects )
    return ddbObjects
def InitDdbObj ( stackName, objName, parentTable, objType):
    """Create a fresh metrics record for a table or GSI.

    All capacity counters start at zero; 'parentTable' is only present for
    GSI records (when parentTable is truthy).
    """
    record = {
        'stackName': stackName,
        'name': objName,
        'type': objType,
        'provisionedWriteCapacityUnits': 0,
        'provisionedReadCapacityUnits': 0,
        'consumedReadCapacityUnits': 0,
        'consumedWriteCapacityUnits': 0,
        'numberOfDecreasesToday': 0,
    }
    if parentTable:
        record['parentTable'] = parentTable
    return record
def _CopyProvisionedThroughput ( ddbObj, throughput ):
    """Copy positive capacity numbers from a ProvisionedThroughput dict into ddbObj.

    Zero values are left at the record's default of 0 (e.g. on-demand tables).
    """
    if throughput['WriteCapacityUnits'] > 0:
        ddbObj['provisionedWriteCapacityUnits'] = throughput['WriteCapacityUnits']
    if throughput['ReadCapacityUnits'] > 0:
        ddbObj['provisionedReadCapacityUnits'] = throughput['ReadCapacityUnits']
    if throughput['NumberOfDecreasesToday'] > 0:
        ddbObj['numberOfDecreasesToday'] = throughput['NumberOfDecreasesToday']

def GetProvisionedMetrics ( ddbObjects ):
    """Fill each record's provisioned capacity fields from DynamoDB.

    Tables read their own provisioned_throughput; GSI records look their
    index up on the parent table. Mutates the records in place and returns
    the same list. Uses the module-level ddbResource.
    """
    for ddbObj in ddbObjects:
        if ddbObj['type'] == 'table':
            throughput = ddbResource.Table(ddbObj['name']).provisioned_throughput
            _CopyProvisionedThroughput(ddbObj, throughput)
        elif ddbObj['type'] == 'gsi':
            table = ddbResource.Table(ddbObj['parentTable'])
            for gsIndex in table.global_secondary_indexes:
                if ddbObj['name'] == gsIndex['IndexName']:
                    _CopyProvisionedThroughput(ddbObj, gsIndex['ProvisionedThroughput'])
    Debug ( sys._getframe().f_code.co_name, ddbObjects )
    return ddbObjects
def GetConsumedMetrics ( ddbObjects ):
    """Fill each record's consumed read/write capacity from CloudWatch.

    BUGFIX: the parameter was previously named `ddbOjbects` (typo) while the
    loop iterated `ddbObjects`, which only worked because it accidentally
    resolved to the module-level global of that name. The parameter is now
    iterated directly.

    Sum over the look-back window is divided by `period` to yield average
    units/second. Mutates records in place and returns the same list.
    """
    debugObjects = []
    for ddbObj in ddbObjects:
        if ddbObj['type'] == 'table':
            dimensions = [{'Name': 'TableName', 'Value': ddbObj['name']}]
        elif ddbObj['type'] == 'gsi':
            dimensions = [
                {'Name': 'GlobalSecondaryIndexName', 'Value': ddbObj['name']},
                {'Name': 'TableName', 'Value': ddbObj['parentTable']}]
        else:
            # Previously an unknown type silently reused the previous
            # iteration's dimensions (or raised NameError on the first one).
            continue
        consumedRead = GetConsumedMetricBuddy('ConsumedReadCapacityUnits', dimensions)
        if consumedRead['Datapoints']:
            ddbObj['consumedReadCapacityUnits'] = int(
                round(consumedRead['Datapoints'][0]['Sum'] / period, 0))
        consumedWrite = GetConsumedMetricBuddy('ConsumedWriteCapacityUnits', dimensions)
        if consumedWrite['Datapoints']:
            ddbObj['consumedWriteCapacityUnits'] = int(
                round(consumedWrite['Datapoints'][0]['Sum'] / period, 0))
        # Every record is logged exactly once (replaces the addedToDebug flag).
        debugObjects.append(ddbObj)
    Debug ( sys._getframe().f_code.co_name, debugObjects )
    return ddbObjects
def GetConsumedMetricBuddy ( metricName, dimensions ):
    """Fetch Sum statistics for one AWS/DynamoDB metric over the look-back window.

    Returns the raw get_metric_statistics response. Uses the module-level
    cwClient plus the timeDelta/period configuration.
    """
    # Capture a single timestamp so StartTime and EndTime describe the same
    # window; the old code called utcnow() twice, skewing them slightly.
    now = datetime.utcnow()
    return cwClient.get_metric_statistics(
        Namespace='AWS/DynamoDB',
        MetricName=metricName,
        Dimensions=dimensions,
        StartTime=now - timedelta(minutes=timeDelta),
        EndTime=now,
        Period=period,
        Statistics=['Sum'],
        Unit='Count')
def AggregateMetrics( dataset, group_by_key, sum_value_keys ):
    """Sum the sum_value_keys of every record sharing a group_by_key value.

    Returns (group_value, Counter) pairs, one per distinct group.
    """
    totals = defaultdict(Counter)
    for record in dataset:
        totals[record[group_by_key]].update(
            {key: record[key] for key in sum_value_keys})
    Debug ( sys._getframe().f_code.co_name, totals.items() )
    return totals.items()
def PostAggregatedMetrics ( aggregatedMetrics ):
    """Publish per-stack aggregated capacity metrics to the custom namespace.

    aggregatedMetrics is an iterable of (stackName, counts) pairs as returned
    by AggregateMetrics. Uses the module-level cwClient and cwNameSpace.
    """
    metricNames = [
        'provisionedWriteCapacityUnits',
        'provisionedReadCapacityUnits',
        'consumedReadCapacityUnits',
        'consumedWriteCapacityUnits',
    ]
    for stackName, counts in aggregatedMetrics:
        dimensions = [{'Name': 'stackName', 'Value': stackName}]
        metricData = [
            {'MetricName': name, 'Dimensions': dimensions,
             'Value': counts[name], 'Unit': 'Count'}
            for name in metricNames]
        cwClient.put_metric_data(Namespace=cwNameSpace, MetricData=metricData)
def PostDecreasesTodayMetrics ( ddbObjects ):
    """Publish NumberOfDecreasesToday for each table/GSI record individually.

    Deliberately separate from PostAggregatedMetrics: these metrics are
    posted object-by-object (dimensioned by stackName and Name), not summed
    per stack. BUGFIX: parameter renamed from the misspelled `ddbOjbects`
    (callers pass positionally, so the rename is backward compatible).
    Uses the module-level cwClient and cwNameSpace.
    """
    for ddbObj in ddbObjects:
        dimensions = [
            {'Name': 'stackName', 'Value': ddbObj['stackName']},
            {'Name': 'Name', 'Value': ddbObj['name']}]
        metricData = [{'MetricName': 'NumberOfDecreasesToday',
                       'Dimensions': dimensions,
                       'Value': ddbObj['numberOfDecreasesToday'],
                       'Unit': 'Count'}]
        cwClient.put_metric_data(Namespace=cwNameSpace, MetricData=metricData)
if __name__ == "__main__":
    # Clients/resources are created at module scope on purpose: the worker
    # functions above reference them as globals.
    ddbClient = boto3.client('dynamodb', region_name=region)
    ddbResource = boto3.resource('dynamodb', region_name=region)
    cwClient = boto3.client('cloudwatch', region_name=region)
    tableNames = GetTableNames()
    ddbObjects = GetDdbObjects(tableNames)
    ddbObjects = GetProvisionedMetrics(ddbObjects)
    # (Fixed the `ddbOjbects` typo that was threaded through the rest of
    # the pipeline.)
    ddbObjects = GetConsumedMetrics(ddbObjects)
    aggregatedMetrics = AggregateMetrics(
        ddbObjects, 'stackName',
        ['provisionedWriteCapacityUnits', 'provisionedReadCapacityUnits',
         'consumedReadCapacityUnits', 'consumedWriteCapacityUnits'])
    PostAggregatedMetrics(aggregatedMetrics)
    PostDecreasesTodayMetrics(ddbObjects)
    # To run continuously instead of via cron, wrap the body above in
    # `while True:` with a `time.sleep(55)` between iterations.
- #sudo apt-get update
- #sudo apt-get install -y python-pip
- #sudo pip install boto3
- #wget --user=userName --password=pw urlToRaw
- #sudo chmod +x dynamoMetricAggregator.py
- #export VISUAL=nano; crontab -e
- #* * * * * /home/ubuntu/dynamoMetricAggregator.py
- #tail -F /var/log/syslog (cron logs here on ubuntu)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement