Advertisement
Guest User

dynamoDbMetricAggregator

a guest
Apr 19th, 2016
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.10 KB | None | 0 0
  1. #!/usr/bin/env python
  2. import boto3
  3. import json
  4. import ast
  5. import sys
  6. from collections import defaultdict, Counter
  7. from datetime import timedelta
  8. from datetime import datetime
  9. import time
  10.  
  11. region = 'us-east-1'
  12. skipList = ['AtomicCounters']
  13. cwNameSpace = 'ddbAggregates'
  14. timeDelta = 5
  15. period = 60
  16. debug = False
  17.  
  18. def Debug ( functionName, things ):
  19.     if debug:
  20.         print "\n########################## %s" % functionName
  21.         for thing in things:
  22.             print thing
  23.  
  24. def GetTableNames ():
  25.     allTableNames = ast.literal_eval(json.dumps(ddbClient.list_tables()['TableNames'])) #removes the unicode 'u from what dynamo returns
  26.     allTableNames = set(allTableNames) - set(skipList)
  27.     Debug ( sys._getframe().f_code.co_name, allTableNames )
  28.     return sorted(set(allTableNames))
  29.  
  30. def GetDdbObjects ( tableNames ):
  31.     ddbObjects = []
  32.     for tableName in tableNames:
  33.         try:
  34.             stackName = tableName.split("-")[1] #env-stack-tableName-uid
  35.         except:
  36.             stackName = False
  37.         if stackName:
  38.             ddbObj = InitDdbObj ( stackName, tableName, None, 'table')
  39.             ddbObjects.append(ddbObj)
  40.             table = ddbResource.Table(tableName)
  41.             gsIndexes = table.global_secondary_indexes
  42.             if gsIndexes is not None:
  43.                 for gsIndex in gsIndexes:
  44.                     gsiName = ast.literal_eval(json.dumps(gsIndex['IndexName']))
  45.                     ddbObj = InitDdbObj ( stackName, gsiName, tableName, 'gsi')
  46.                     ddbObjects.append(ddbObj)
  47.     Debug ( sys._getframe().f_code.co_name, ddbObjects )
  48.     return ddbObjects
  49.  
  50. def InitDdbObj ( stackName, objName, parentTable, objType):
  51.     attribs = {}
  52.     attribs['stackName'] = stackName
  53.     attribs['name'] = objName
  54.     attribs['type'] = objType
  55.     attribs['provisionedWriteCapacityUnits'] = 0
  56.     attribs['provisionedReadCapacityUnits'] = 0
  57.     attribs['consumedReadCapacityUnits'] = 0
  58.     attribs['consumedWriteCapacityUnits'] = 0
  59.     attribs['numberOfDecreasesToday'] = 0
  60.     if parentTable:
  61.             attribs['parentTable'] = parentTable
  62.     return attribs
  63.  
  64. def GetProvisionedMetrics ( ddbObjects ):
  65.     for ddbObj in ddbObjects:
  66.         if ddbObj['type'] == 'table':
  67.             table = ddbResource.Table(ddbObj['name'])
  68.             if table.provisioned_throughput['WriteCapacityUnits'] > 0:
  69.                 ddbObj['provisionedWriteCapacityUnits'] = table.provisioned_throughput['WriteCapacityUnits']
  70.             if table.provisioned_throughput['ReadCapacityUnits'] > 0:
  71.                 ddbObj['provisionedReadCapacityUnits'] = table.provisioned_throughput['ReadCapacityUnits']
  72.             if table.provisioned_throughput['NumberOfDecreasesToday'] > 0:
  73.                 ddbObj['numberOfDecreasesToday'] = table.provisioned_throughput['NumberOfDecreasesToday']
  74.         if ddbObj['type'] == 'gsi':
  75.             table = ddbResource.Table(ddbObj['parentTable'])
  76.             gsIndexes = table.global_secondary_indexes
  77.             for gsIndex in gsIndexes:
  78.                 if ddbObj['name'] == gsIndex['IndexName']:
  79.                     if gsIndex['ProvisionedThroughput']['WriteCapacityUnits'] > 0:
  80.                         ddbObj['provisionedWriteCapacityUnits'] = gsIndex['ProvisionedThroughput']['WriteCapacityUnits']
  81.                     if gsIndex['ProvisionedThroughput']['ReadCapacityUnits'] > 0:
  82.                         ddbObj['provisionedReadCapacityUnits'] = gsIndex['ProvisionedThroughput']['ReadCapacityUnits']
  83.                     if gsIndex['ProvisionedThroughput']['NumberOfDecreasesToday'] > 0:
  84.                         ddbObj['numberOfDecreasesToday'] = gsIndex['ProvisionedThroughput']['NumberOfDecreasesToday']
  85.     Debug ( sys._getframe().f_code.co_name, ddbObjects )
  86.     return ddbObjects
  87.  
  88. def GetConsumedMetrics ( ddbOjbects ):
  89.     debugObjects = []
  90.     for ddbObj in ddbObjects:
  91.         addedToDebug = False
  92.         dimValue = ddbObj['name']
  93.         if ddbObj['type'] == 'table':
  94.             dimName = 'TableName'
  95.             dimensions = [{'Name': dimName,'Value': dimValue}]
  96.         if ddbObj['type'] == 'gsi':
  97.             dimName = 'GlobalSecondaryIndexName'
  98.             dimensions = [{'Name': dimName,'Value': dimValue}, {'Name': 'TableName', 'Value': ddbObj['parentTable']}]
  99.         consumedReadCapacityUnits = GetConsumedMetricBuddy( 'ConsumedReadCapacityUnits', dimensions)
  100.         if consumedReadCapacityUnits['Datapoints']:
  101.             ddbObj['consumedReadCapacityUnits'] = int(round(consumedReadCapacityUnits['Datapoints'][0]['Sum'] / period,0))
  102.             addedToDebug = True
  103.             debugObjects.append(ddbObj)
  104.         consumedWriteCapacityUnits = GetConsumedMetricBuddy( 'ConsumedWriteCapacityUnits', dimensions)
  105.         if consumedWriteCapacityUnits['Datapoints']:
  106.             ddbObj['consumedWriteCapacityUnits'] = int(round(consumedWriteCapacityUnits['Datapoints'][0]['Sum'] / period,0))
  107.             if addedToDebug != True:
  108.                 debugObjects.append(ddbObj)
  109.     Debug ( sys._getframe().f_code.co_name, debugObjects )
  110.     return ddbOjbects
  111.  
  112. def GetConsumedMetricBuddy ( metricName, dimensions ):
  113.     response = cwClient.get_metric_statistics(
  114.         Namespace='AWS/DynamoDB',
  115.         MetricName=metricName,
  116.         Dimensions=dimensions,
  117.         StartTime=datetime.utcnow() - timedelta(minutes=timeDelta),
  118.         EndTime=datetime.utcnow(),
  119.         Period=period,
  120.         Statistics=['Sum'],
  121.         Unit='Count')
  122.     return response
  123.  
  124. def AggregateMetrics( dataset, group_by_key, sum_value_keys ):
  125.     dic = defaultdict(Counter)
  126.     for item in dataset:
  127.         key = item[group_by_key]
  128.         vals = {k:item[k] for k in sum_value_keys}
  129.         dic[key].update(vals)
  130.     Debug ( sys._getframe().f_code.co_name, dic.items() )
  131.     return dic.items()
  132.  
  133. def PostAggregatedMetrics ( aggregatedMetrics ):
  134.     for metric in aggregatedMetrics:
  135.         dimensions = [{'Name': 'stackName', 'Value': metric[0]}]
  136.         metricData = [
  137.             {'MetricName': 'provisionedWriteCapacityUnits', 'Dimensions': dimensions, 'Value': metric[1]['provisionedWriteCapacityUnits'], 'Unit': 'Count'},
  138.             {'MetricName': 'provisionedReadCapacityUnits', 'Dimensions': dimensions, 'Value': metric[1]['provisionedReadCapacityUnits'], 'Unit': 'Count'},
  139.             {'MetricName': 'consumedReadCapacityUnits', 'Dimensions': dimensions, 'Value': metric[1]['consumedReadCapacityUnits'], 'Unit': 'Count'},
  140.             {'MetricName': 'consumedWriteCapacityUnits', 'Dimensions': dimensions,'Value': metric[1]['consumedWriteCapacityUnits'], 'Unit': 'Count'}]
  141.         cwClient.put_metric_data(Namespace=cwNameSpace,MetricData=metricData)
  142.  
  143. # not married with PostAggregatedMetrics, these metrics are obj by obj
  144. def PostDecreasesTodayMetrics ( ddbOjbects ):
  145.     for ddbObj in ddbOjbects:
  146.         dimensions = [{'Name': 'stackName', 'Value': ddbObj['stackName']}, {'Name': 'Name', 'Value': ddbObj['name']}]
  147.         metricData = [{'MetricName': 'NumberOfDecreasesToday', 'Dimensions': dimensions, 'Value': ddbObj['numberOfDecreasesToday'], 'Unit': 'Count'}]
  148.         cwClient.put_metric_data(Namespace=cwNameSpace,MetricData=metricData)
  149.  
  150.  
  151. if __name__ == "__main__":
  152.     #while True:
  153.     ddbClient = boto3.client('dynamodb', region_name=region)
  154.     tableNames = GetTableNames ()
  155.  
  156.     ddbResource = boto3.resource('dynamodb', region_name=region)
  157.     ddbObjects = GetDdbObjects ( tableNames )
  158.     ddbObjects = GetProvisionedMetrics ( ddbObjects )
  159.  
  160.     cwClient = boto3.client('cloudwatch',region_name=region)
  161.     ddbOjbects = GetConsumedMetrics ( ddbObjects )
  162.     aggregatedMetrics = AggregateMetrics( ddbOjbects, 'stackName', ['provisionedWriteCapacityUnits', 'provisionedReadCapacityUnits', 'consumedReadCapacityUnits', 'consumedWriteCapacityUnits'])
  163.     PostAggregatedMetrics ( aggregatedMetrics )
  164.     PostDecreasesTodayMetrics ( ddbOjbects )
  165.  
  166.     #time.sleep(55)
  167.  
  168. #sudo apt-get update
  169. #sudo apt-get install -y python-pip
  170. #sudo pip install boto3
  171. #wget --user=userName --password=pw urlToRaw
  172. #sudo chmod +x dynamoMetricAggregator.py
  173. #export VISUAL=nano; crontab -e
  174. #* * * * * /home/ubuntu/dynamoMetricAggregator.py
  175. #tail -F /var/log/syslog (cron logs here on ubuntu)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement