Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import boto3
- import json
- import logging
- import os
# Module-wide logger: the root logger, with its threshold set to INFO.
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger()
def lambda_handler(event, context):
    """Entry point for the Secrets Manager rotation Lambda.

    Validates that the secret and the requested version are in a state where
    rotation may proceed, then dispatches to the function implementing the
    requested rotation step.

    Args:
        event (dict): Lambda dictionary of event parameters. These keys must include the following:
            - SecretId: The secret ARN or identifier
            - ClientRequestToken: The ClientRequestToken of the secret version
            - Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)
        context (LambdaContext): The Lambda runtime information (unused)

    Returns:
        None. Returns early, without running any step, when the version is
        already staged as AWSCURRENT.

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist
        ValueError: If the secret is not enabled for rotation, the version is
            not staged as AWSPENDING, or the step name is unknown
        KeyError: If the event parameters do not contain the expected keys, or
            the SECRETS_MANAGER_ENDPOINT environment variable is not set
    """
    arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']

    # Setup the client
    service_client = boto3.client('secretsmanager', endpoint_url=os.environ['SECRETS_MANAGER_ENDPOINT'])

    # Make sure the version is staged correctly
    metadata = service_client.describe_secret(SecretId=arn)
    # RotationEnabled may be missing from the response (the API documents a
    # null value for secrets never configured for rotation), so treat
    # "absent" the same as "disabled" instead of failing with a KeyError.
    if not metadata.get('RotationEnabled'):
        message = "Secret %s is not enabled for rotation" % arn
        logger.error(message)
        raise ValueError(message)

    versions = metadata['VersionIdsToStages']
    if token not in versions:
        message = "Secret version %s has no stage for rotation of secret %s." % (token, arn)
        logger.error(message)
        raise ValueError(message)

    stages = versions[token]
    if "AWSCURRENT" in stages:
        # Nothing left to do: this version has already been promoted.
        logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
        return
    if "AWSPENDING" not in stages:
        message = "Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)
        logger.error(message)
        raise ValueError(message)

    if step == "createSecret":
        create_secret(service_client, arn, token)
    elif step == "setSecret":
        set_secret(service_client, arn, token)
    elif step == "testSecret":
        test_secret(service_client, arn, token)
    elif step == "finishSecret":
        finish_secret(service_client, arn, token)
    else:
        raise ValueError("Invalid step parameter")
def create_secret(service_client, arn, token):
    """Create the AWSPENDING secret version backed by a fresh IAM access key.

    If an AWSPENDING version can already be retrieved, nothing is changed.
    Otherwise a new access key is created for the IAM user named in the
    AWSCURRENT secret (field 'smtpUser'), the key and its derived SMTP
    password are written into a copy of the current secret fields, and the
    result is stored under the given token with stage AWSPENDING.

    When the user already holds more than one access key, one is deleted
    first. The key recorded in AWSCURRENT ('smtpAccessKeyId') is never the
    one chosen for deletion while another key exists, so consumers of the
    current secret keep working until the rotation finishes. This also makes
    a retry safe after a previous attempt created a key but failed to store it.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist
        KeyError: If the current secret lacks a required field
    """
    # Make sure the current secret exists
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")

    # Now try to get the pending version; only if that fails, build a new one
    try:
        get_secret_dict(service_client, arn, "AWSPENDING")
        logger.info("createSecret: Successfully retrieved secret for %s." % arn)
    except service_client.exceptions.ResourceNotFoundException:
        iam_user = current_dict['smtpUser']
        iam = boto3.client('iam')

        # Free a slot before creating a key when the user has multiple keys
        logger.debug("Retrieving keys for user %s" % iam_user)
        keys = iam.list_access_keys(UserName=iam_user)['AccessKeyMetadata']
        logger.debug("Found %s key(s)" % len(keys))
        if len(keys) > 1:
            doomed = _pick_key_to_delete(keys, current_dict['smtpAccessKeyId'])
            key_id_to_delete = doomed['AccessKeyId']
            key_date_to_delete = doomed['CreateDate']
            iam.delete_access_key(UserName=iam_user, AccessKeyId=key_id_to_delete)
            logger.info("Deleted old key %s from %s" % (key_id_to_delete, key_date_to_delete))

        # Generate a new access key
        logger.info("createSecret: Creating new access key for %s." % iam_user)
        new_key = iam.create_access_key(UserName=iam_user)['AccessKey']
        current_dict['smtpAccessKeyId'] = new_key['AccessKeyId']
        current_dict['smtpSecretKey'] = new_key['SecretAccessKey']
        current_dict['smtpUsername'] = new_key['AccessKeyId']
        current_dict['smtpPassword'] = generate_ses_password(new_key['SecretAccessKey'])

        # Put the secret
        service_client.put_secret_value(
            SecretId=arn,
            ClientRequestToken=token,
            SecretString=json.dumps(current_dict),
            VersionStages=['AWSPENDING'],
        )
        logger.info("createSecret: Successfully put secret for ARN %s and version %s." % (arn, token))


def _pick_key_to_delete(keys, in_use_key_id):
    """Return the oldest access key entry that is not the one still in use.

    Falls back to the oldest key overall if every entry matches in_use_key_id.
    On equal creation dates the earliest entry in the list wins.
    """
    candidates = [key for key in keys if key['AccessKeyId'] != in_use_key_id] or keys
    return min(candidates, key=lambda key: key['CreateDate'])
def generate_ses_password(secret_key, charset='utf-8'):
    """Derive an SMTP password from an IAM secret access key.

    The password is the Base64 encoding of a one-byte version marker (0x02)
    followed by the HMAC-SHA256 of the fixed message 'SendRawEmail', keyed
    with the secret access key.

    NOTE(review): this is the version-2 derivation; current Amazon SES
    documentation describes a region-scoped version-4 derivation. Confirm
    the target SMTP endpoint still accepts passwords produced here.

    Args:
        secret_key (string): The IAM secret access key
        charset (string): Encoding used to turn secret_key into bytes. It is
            applied to the key only; the protocol constants and the Base64
            output do not depend on it.

    Returns:
        string: The derived SMTP password (44 Base64 characters)
    """
    import base64  # required to encode the computed key
    import hashlib  # required to create a SHA256 hash
    import hmac  # required to compute the HMAC key

    # These values are used when calculating the SMTP password and must not
    # change. They are kept as bytes so a non-ASCII-compatible charset cannot
    # alter them (e.g. 'utf-16' would prepend a byte-order mark).
    message = b'SendRawEmail'
    version = b'\x02'

    # Compute an HMAC-SHA256 signature keyed with the AWS secret access key.
    signature = hmac.new(secret_key.encode(charset), message, hashlib.sha256).digest()

    # Prepend the version byte, Base64-encode, and return as text. Base64
    # output is pure ASCII, so decode it as such regardless of charset.
    return base64.b64encode(version + signature).decode('ascii')
def set_secret(service_client, arn, token):
    """Set the secret (intentionally a no-op for access keys).

    In a generic rotation this step pushes the AWSPENDING value into the
    service that owns the credential, e.g. changing a database user's
    password to the pending value.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version
    """
    # Not applicable for access keys: they are created and set in a single
    # operation, so there is nothing left to apply in this step.
    return None
def test_secret(service_client, arn, token):
    """Test the secret

    Checks that the access key recorded in the AWSPENDING secret version
    ('smtpAccessKeyId') is among the access keys listed for the IAM user
    named in that secret ('smtpUser').

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ValueError: If the pending access key is not found on the user
    """
    pending = get_secret_dict(service_client, arn, "AWSPENDING")
    user_name = pending['smtpUser']
    logger.info("Checking that the pending key is valid for user %s" % user_name)

    iam_client = boto3.client('iam')
    logger.debug("Retrieving keys for user %s" % user_name)
    listed_keys = iam_client.list_access_keys(UserName=user_name)['AccessKeyMetadata']
    logger.debug("Found %s key(s)" % len(listed_keys))

    wanted_key_id = pending['smtpAccessKeyId']
    # Guard clause: fail unless some listed key carries the pending key id.
    if not any(entry["AccessKeyId"] == wanted_key_id for entry in listed_keys):
        raise ValueError("Unable to find pending access key on user %s" % user_name)
    logger.info("Pending access key %s was found on the user" % wanted_key_id)
def finish_secret(service_client, arn, token):
    """Finish the secret

    Finalizes the rotation by moving the AWSCURRENT stage to the version
    identified by token, then notifies consumers through publish_update. If
    that version is already AWSCURRENT, only the notification is sent.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn does not exist
    """
    # First describe the secret to find which version is current right now
    metadata = service_client.describe_secret(SecretId=arn)
    current_version = None
    for version, stages in metadata["VersionIdsToStages"].items():
        if "AWSCURRENT" in stages:
            if version == token:
                # The correct version is already marked as current, return
                logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn))
                publish_update(service_client, arn)
                return
            current_version = version
            break

    # Finalize by staging the secret version current. Only name a version to
    # take the label away from when one was actually found; None is not a
    # usable version id.
    stage_move = {"SecretId": arn, "VersionStage": "AWSCURRENT", "MoveToVersionId": token}
    if current_version is not None:
        stage_move["RemoveFromVersionId"] = current_version
    service_client.update_secret_version_stage(**stage_move)

    # Report the version that was promoted (token), not the loop variable,
    # which holds the previous version or may be unbound.
    logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (token, arn))
    publish_update(service_client, arn)
def publish_update(service_client, arn):
    """Notify consumers over SNS that the secret has been updated.

    Reads the topic ARN from the optional 'snsTopic' field of the AWSCURRENT
    secret. When the field is missing or empty, a warning is logged and no
    message is published.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
    """
    logger.info("Notifying consumers that the secret has updated")
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")

    # 'snsTopic' is not a required secret field, so look it up with .get();
    # indexing would raise KeyError before the "no topic" branch could run.
    topic_arn = current_dict.get('snsTopic')
    if not topic_arn:
        # Logger.warn is a deprecated alias; use Logger.warning.
        logger.warning("No topic ARN provided; unable to notify")
        return

    sns = boto3.client('sns')
    result = sns.publish(TopicArn=topic_arn, Message='Updated %s' % arn)
    logger.info("Notification sent as message %s" % result['MessageId'])
def get_secret_dict(service_client, arn, stage, token=None):
    """Fetch a secret version and return its JSON payload as a dictionary.

    The version is selected by stage; when a token is supplied the lookup is
    additionally pinned to that version id. The parsed payload must contain
    the fields 'smtpUser', 'smtpAccessKeyId' and 'smtpSecretKey'.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        stage (string): The stage identifying the secret version
        token (string): The ClientRequestToken associated with the secret version, or None if no validation is desired

    Returns:
        SecretDictionary: Secret dictionary

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist
        ValueError: If the secret is not valid JSON
        KeyError: If a required field is missing from the secret JSON
    """
    # Only pin the lookup to a version id when a token is passed in
    lookup = {"SecretId": arn, "VersionStage": stage}
    if token:
        lookup["VersionId"] = token
    response = service_client.get_secret_value(**lookup)

    parsed = json.loads(response['SecretString'])

    # Report the first required field (in declaration order) that is absent
    absent = next(
        (name for name in ('smtpUser', 'smtpAccessKeyId', 'smtpSecretKey') if name not in parsed),
        None,
    )
    if absent is not None:
        raise KeyError("%s key is missing from secret JSON" % absent)

    return parsed
Add Comment
Please, Sign In to add comment