Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import boto3
- import datadog
- from dictdiffer import diff
- import deepdiff
- import json
- import pyjq
- import os
- session = boto3.Session(
- aws_access_key_id=AWS_ACCESS_KEY_ID,
- aws_secret_access_key=AWS_SECRET_ACCESS_KEY
- )
- s3_client = session.client('s3')
- ecr_client = session.client('ecr')
- bucket_name = os.environ['S3_BUCKET_NAME']
- registry_id = os.environ['ECR_REGISTRY_ID']
- def json_dumps(reponse):
- return json.dumps(reponse, indent=4, sort_keys=True, default=str)
- def get_latest_image(repository):
- images= ecr_client.describe_images(repositoryName=repository, maxResults=1000)
- images_sorted = sorted(images['imageDetails'], key = lambda i: i['imagePushedAt'], reverse=True)
- image_digest = images_sorted[0].get('imageDigest')
- return image_digest
- def notify_datadog(repository, differences):
- print('Notifying Datadog...')
- datadog_api_key = environ.get('DATADOG_API_KEY')
- if env == 'prod':
- notification_mentions = '@slack-sunco-infra-notifs'
- else:
- notification_mentions = '@slack-datadog_integration_test'
- payload = {
- 'title': 'New vulnerabilies in ECR images',
- 'text': f'{notification_mentions} New vulnerability on last image in repository {repository}.\n'
- f'Here are the new vulnerabilites:{differences}',
- 'priority': 'normal',
- 'alert_type': 'info'
- }
- datadog.initialize(api_key=datadog_api_key)
- event = datadog.api.Event.create(**payload)
- print(event)
- # Get the descriptions of all repositories in ECR
- repositories_description = json_dumps(ecr_client.describe_repositories())
- # Get a list of all the repositories names
- repositories = pyjq.all('.repositories[] | .repositoryName', json.loads(repositories_description))
- # Retreive scan findings reports for every repositories with a latest image scan findings
- for repository in repositories:
- image_digest = get_latest_image(repository)
- try:
- scan_description = ecr_client.describe_image_scan_findings(
- registryId=registry_id,
- repositoryName=repository,
- imageId={
- 'imageDigest': image_digest
- }
- )
- # Continue to next iteration if an image as no scan yet
- # TODO: If no scans, scan the image
- except:
- continue
- # Upload scan report to S3 bucket with only findings data
- file_name = f"{repository}_scan_report.json"
- scan_json = json_dumps(scan_description)
- scan_findings = pyjq.all('.imageScanFindings | .findings[]', json.loads(scan_json))
- s3_client.put_object(Bucket=bucket_name, Body=json.dumps(scan_findings), Key=file_name)
- # Get last two versions of the scan (newly uploaded and last one)
- versions = json_dumps(s3_client.list_object_versions(Bucket=bucket_name, Prefix=file_name, Delimiter="2"))
- v1 = pyjq.first('.["Versions"][0]["VersionId"]', json.loads(versions))
- v2 = pyjq.first('.["Versions"][1]["VersionId"]', json.loads(versions))
- # Download scan report from version 1 and version 2
- scan_v1 = s3_client.get_object(Bucket=bucket_name, Key=file_name, VersionId=v1)
- scan_v2 = s3_client.get_object(Bucket=bucket_name, Key=file_name, VersionId=v2)
- print(json.loads(scan_v1['Body'].read()))
- # Compare the both file versions to trigger alert
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement