Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import copy
- import json
- import re
- import time
- import boto3
- from botocore.vendored import requests
- SUCCESS = "SUCCESS"
- FAILED = "FAILED"
- FAILED_PHYSICAL_RESOURCE_ID = "FAILED_PHYSICAL_RESOURCE_ID"
- class AddOrUpdateTargetArnsError(Exception):
- def __init__(self):
- self.message = 'Target arns are not allowed to be changed/added.'
- super().__init__(self.message)
- class FailedVpcLinkError(Exception):
- def __init__(self, status_message):
- self.message = f'statusMessages: {status_message}'
- super().__init__(self.message)
- def lambda_handler(event, context):
- try:
- _lambda_handler(event, context)
- except Exception as e:
- send(
- event,
- context,
- response_status=FAILED,
- # Do not fail on delete to avoid rollback failure
- response_data=None,
- physical_resource_id=event.get('PhysicalResourceId', FAILED_PHYSICAL_RESOURCE_ID),
- reason=e
- )
- # Must raise, otherwise the Lambda will be marked as successful, and the exception
- # will not be logged to CloudWatch logs.
- raise
- def _lambda_handler(event, context):
- print("Received event: ")
- print(event)
- resource_type = event['ResourceType']
- if resource_type != "Custom::ApiGatewayVpcLink":
- raise ValueError(f'Unexpected resource_type: {resource_type}')
- request_type = event['RequestType']
- wait_for = event.get('WaitFor', None)
- resource_properties = event['ResourceProperties']
- physical_resource_id = event.get('PhysicalResourceId', None)
- apigateway = boto3.client('apigateway')
- if wait_for:
- handle_self_invocation(
- wait_for=wait_for,
- physical_resource_id=physical_resource_id,
- event=event,
- context=context,
- )
- else:
- if request_type == 'Create':
- kwargs = dict(
- name=resource_properties['Name'],
- targetArns=resource_properties['TargetArns'],
- description=resource_properties.get('Description', None)
- )
- response = apigateway.create_vpc_link(**kwargs)
- event_copy = copy.deepcopy(event)
- event_copy['WaitFor'] = 'CreateComplete'
- event_copy['PhysicalResourceId'] = response['id']
- print('Reinvoking function because VPC link creation is asynchronous')
- relaunch_lambda(event=event_copy, context=context)
- return
- elif request_type == 'Update':
- old_resource_properties = event['OldResourceProperties']
- current_target_arns = apigateway.get_vpc_link(
- vpcLinkId=physical_resource_id,
- )['targetArns']
- # must compare current_target_arns to resource_properties['TargetArns'], to protect against
- # UPDATE created by UPDATE_FAILED. In that particular case, current_target_arns will be the same as
- # resource_properties['TargetArns'] but different than old_resource_properties['TargetArns']
- if set(current_target_arns) != set(resource_properties['TargetArns']) and
- set(resource_properties['TargetArns']) != set(old_resource_properties['TargetArns']):
- raise AddOrUpdateTargetArnsError()
- patch_operations = []
- if resource_properties['Name'] != old_resource_properties['Name']:
- patch_operations.append(dict(
- op='replace',
- path='/name',
- value=resource_properties['Name'],
- ))
- if 'Description' in resource_properties and 'Description' in old_resource_properties:
- if resource_properties['Description'] != old_resource_properties['Description']:
- patch_operations.append(dict(
- op='replace',
- path='/description',
- value=resource_properties['Description'],
- ))
- elif 'Description' in resource_properties and 'Description' not in old_resource_properties:
- patch_operations.append(dict(
- op='replace',
- path='/description',
- value=resource_properties['Description'],
- ))
- elif 'Description' not in resource_properties and 'Description' in old_resource_properties:
- patch_operations.append(dict(
- op='replace',
- path='/description',
- value=None,
- ))
- apigateway.update_vpc_link(
- vpcLinkId=physical_resource_id,
- patchOperations=patch_operations,
- )
- elif request_type == 'Delete':
- delete = True
- if physical_resource_id == FAILED_PHYSICAL_RESOURCE_ID:
- delete = False
- print('Custom resource was never properly created, skipping deletion.')
- stack_name = re.match("arn:aws:cloudformation:.+:stack/(?P<stack_name>.+)/.+", event['StackId']).group('stack_name')
- if stack_name in physical_resource_id:
- delete = False
- print(f'Skipping deletion, because VPC link was not created properly. Heuristic: stack name ({stack_name}) found in physical resource ID ({physical_resource_id})')
- logical_resource_id = event['LogicalResourceId']
- if logical_resource_id in physical_resource_id:
- delete = False
- print(f'Skipping deletion, because VPC link was not created properly. Heuristic: logical resource ID ({logical_resource_id}) found in physical resource ID ({physical_resource_id})')
- if delete:
- apigateway.delete_vpc_link(
- vpcLinkId=physical_resource_id
- )
- event_copy = copy.deepcopy(event)
- event_copy['WaitFor'] = 'DeleteComplete'
- print('Reinvoking function because VPC link deletion is asynchronous')
- relaunch_lambda(event=event_copy, context=context)
- return
- else:
- print(f'Request type is {request_type}, doing nothing.')
- send(
- event,
- context,
- response_status=SUCCESS,
- response_data=None,
- physical_resource_id=physical_resource_id,
- )
- def handle_self_invocation(wait_for, physical_resource_id, event, context):
- apigateway = boto3.client('apigateway')
- if wait_for == 'CreateComplete':
- print('Waiting for creation of VPC link: {vpc_link_id}'.format(vpc_link_id=physical_resource_id))
- response = apigateway.get_vpc_link(
- vpcLinkId=physical_resource_id,
- )
- status = response['status']
- print('Status of VPC link {vpc_link_id} is {status}'.format(vpc_link_id=physical_resource_id, status=status))
- if status == 'AVAILABLE':
- send(
- event,
- context,
- response_status=SUCCESS,
- response_data=None,
- physical_resource_id=physical_resource_id,
- )
- elif status == 'FAILED':
- raise FailedVpcLinkError(status_message=response['statusMessage'])
- elif status == 'PENDING':
- # Sleeping here to avoid polluting CloudWatch Logs by reinvoking the Lambda too quickly
- time.sleep(30)
- relaunch_lambda(event, context)
- else:
- print('Unexpected status, doing nothing')
- elif wait_for == 'DeleteComplete':
- print('Waiting for deletion of VPC link: {vpc_link_id}'.format(vpc_link_id=physical_resource_id))
- try:
- response = apigateway.get_vpc_link(
- vpcLinkId=physical_resource_id,
- )
- except apigateway.exceptions.NotFoundException:
- print('VPC link {vpc_link_id} deleted successfully'.format(vpc_link_id=physical_resource_id))
- send(
- event,
- context,
- response_status=SUCCESS,
- response_data=None,
- physical_resource_id=physical_resource_id,
- )
- else:
- status = response['status']
- assert status == 'DELETING', f'status is {status}'
- # Sleeping here to avoid polluting CloudWatch Logs by reinvoking the Lambda too quickly
- time.sleep(10)
- relaunch_lambda(event, context)
- else:
- raise ValueError(f'Unexpected WaitFor: {wait_for}')
- def relaunch_lambda(event, context):
- boto3.client("lambda").invoke(
- FunctionName=context.function_name,
- InvocationType='Event',
- Payload=json.dumps(event),
- )
- def send(event, context, response_status, response_data, physical_resource_id, reason=None):
- response_url = event['ResponseURL']
- response_body = {
- 'Status': response_status,
- 'Reason': str(reason) if reason else 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
- 'PhysicalResourceId': physical_resource_id,
- 'StackId': event['StackId'],
- 'RequestId': event['RequestId'],
- 'LogicalResourceId': event['LogicalResourceId'],
- 'Data': response_data,
- }
- json_response_body = json.dumps(response_body)
- headers = {
- 'content-type': '',
- 'content-length': str(len(json_response_body))
- }
- try:
- requests.put(
- response_url,
- data=json_response_body,
- headers=headers
- )
- except Exception as e:
- print("send(..) failed executing requests.put(..): " + str(e))
Add Comment
Please, Sign In to add comment