import fileinput
import ipaddress
import sys

import boto3
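# Usage: <script> <app_name> <acl_default_action> <rule_action>
# Fail early with a usage line instead of an IndexError traceback. The check runs
# before the boto3 client is built so a bad invocation never touches credentials.
if len(sys.argv) != 4:
    sys.exit('usage: {} <app_name> <acl_action> <rule_action>'.format(sys.argv[0]))

client = boto3.client('waf')

app_name = sys.argv[1]
# Both action strings are forwarded verbatim to the WAF API (as the WebACL's
# DefaultAction.Type and the activated rule's Action.Type). Nothing in this script
# validates them or the combination; an invalid value surfaces as an API error.
acl_action = sys.argv[2]
rule_action = sys.argv[3]

# Filled in below with the IPSet / Rule / WebACL summaries found (or created) by name.
ip_set = ''
rule = ''
acl = ''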
def get_token():
    """Fetch a fresh WAF change token; every mutating call in this file needs one."""
    response = client.get_change_token()
    return response['ChangeToken']
def create_ip_set(app):
    """Create an empty IPSet named after *app* and return the raw API response.

    The 'WitheList' spelling is part of the resource name that the lookup code
    matches on, so it is kept as is.
    """
    ip_set_name = app + '-WAF-WitheList-IPs'
    return client.create_ip_set(Name=ip_set_name, ChangeToken=get_token())
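def update_ip_set(ip_set_id, cidr='200.46.145.2/32'):
    """Insert one IPv4 network into the IPSet identified by *ip_set_id*.

    Args:
        ip_set_id: 'IPSetId' of an existing IPSet.
        cidr: IPv4 network in CIDR notation. Defaults to the single address the
            script previously hard-coded, so existing callers are unaffected.

    Returns:
        The raw ``update_ip_set`` response.

    Raises:
        ValueError: if *cidr* is not a valid IPv4 network (including host bits set
            under a shorter mask). Checked locally so a typo never consumes a
            change token or reaches the API.
    """
    # Strict parsing: '10.0.0.1/24' is rejected rather than silently normalised.
    network = ipaddress.IPv4Network(cidr)
    return client.update_ip_set(
        IPSetId=ip_set_id,
        ChangeToken=get_token(),
        Updates=[
            {
                'Action': 'INSERT',
                'IPSetDescriptor': {
                    'Type': 'IPV4',
                    # Canonical form, e.g. a bare address becomes 'a.b.c.d/32'.
                    'Value': str(network),
                },
            },
        ],
    )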
def create_rule(app):
    """Create an empty Rule (no predicates yet) named after *app*.

    Returns the raw ``create_rule`` response; the caller reads ``['Rule']``.
    """
    rule_name = app + '-WAF-WitheList-Rule'
    metric_name = app + 'WAFWitheListRuleMetric'
    return client.create_rule(
        Name=rule_name,
        MetricName=metric_name,
        ChangeToken=get_token(),
    )
def update_rule(rule_id, ip_set_id):
    """Attach an IPMatch predicate for *ip_set_id* to the Rule *rule_id*.

    Negated is False, so the rule matches requests whose source address is in
    the set. Returns the raw ``update_rule`` response.
    """
    predicate = {
        'Negated': False,
        'Type': 'IPMatch',
        'DataId': ip_set_id,
    }
    return client.update_rule(
        RuleId=rule_id,
        ChangeToken=get_token(),
        Updates=[{'Action': 'INSERT', 'Predicate': predicate}],
    )
def create_acl(app, action):
    """Create a WebACL named after *app* whose default action is *action*.

    *action* is forwarded verbatim as DefaultAction.Type and is not validated
    here. Returns the raw ``create_web_acl`` response.
    """
    acl_name = app + '-WAF-WitheList-ACL'
    metric_name = app + 'WAFWitheListACLMetric'
    default_action = {'Type': action}
    return client.create_web_acl(
        Name=acl_name,
        MetricName=metric_name,
        DefaultAction=default_action,
        ChangeToken=get_token(),
    )
def update_acl(rule_id, acl_id, acl_act, rule_act):
    """Activate Rule *rule_id* on WebACL *acl_id* and set the ACL's default action.

    Note the argument order: rule id first, then the ACL id. Both action strings
    are forwarded verbatim and unvalidated: *rule_act* becomes the activated
    rule's action, *acl_act* the ACL's DefaultAction.Type. Whether the result is
    an allow-list depends on that pair: listed addresses get *rule_act*, all
    other traffic gets *acl_act*. Returns the raw ``update_web_acl`` response.
    """
    activated_rule = {
        'Priority': 1,
        'RuleId': rule_id,
        'Action': {'Type': rule_act},
    }
    return client.update_web_acl(
        WebACLId=acl_id,
        ChangeToken=get_token(),
        Updates=[{'Action': 'INSERT', 'ActivatedRule': activated_rule}],
        DefaultAction={'Type': acl_act},
    )
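def replace_acl_id(acl_id, path='parameters.json'):
    """Replace every ``ACL_ID_REPLACE`` placeholder in a template file with *acl_id*.

    The file is rewritten in place. *path* defaults to ``parameters.json`` in the
    current working directory, which is what the script has always targeted;
    callers may now pass an explicit path instead.

    Args:
        acl_id: WebACL id substituted for the placeholder.
        path: template file to rewrite. A file with no placeholder is left
            unchanged in content.

    Raises:
        OSError: if the file cannot be read or written (it is not created).
    """
    # Plain read/replace/write instead of fileinput(inplace=True): no backup and
    # rename dance, and the original file's inode and permissions are kept.
    # JSON is UTF-8 by specification, so the encoding is fixed rather than
    # depending on the locale.
    with open(path, encoding='utf-8') as template:
        content = template.read()
    with open(path, 'w', encoding='utf-8') as template:
        template.write(content.replace('ACL_ID_REPLACE', acl_id))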
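def _find_by_name(items, name):
    """Return the summary dict in *items* whose 'Name' equals *name*, else None.

    If several entries share the name the last one wins, matching the previous
    loop-and-overwrite behaviour.
    """
    match = None
    for item in items:
        if item['Name'] == name:
            match = item
    return match


# Only the first page (Limit=100) of each list call is examined; a resource that
# exists beyond that page would not be found and a duplicate would be created.
ip_sets_list = client.list_ip_sets(Limit=100)
ip_set = _find_by_name(ip_sets_list['IPSets'], app_name + '-WAF-WitheList-IPs')
if not ip_set:
    print('IPSet Not found, creating')
    ip_set = create_ip_set(app_name)['IPSet']
else:
    print('Updating IPSet')
# Both branches previously inserted the same descriptor; done once here. A repeat
# INSERT of an address already in the set is left for the API to accept or reject.
update_ip_set(ip_set['IPSetId'])

rules_list = client.list_rules(Limit=100)
rule = _find_by_name(rules_list['Rules'], app_name + '-WAF-WitheList-Rule')
if not rule:
    print('Rule Not found, creating')
    rule = create_rule(app_name)['Rule']
else:
    print('Updating Rule')
update_rule(rule['RuleId'], ip_set['IPSetId'])

acls_list = client.list_web_acls(Limit=100)
acl = _find_by_name(acls_list['WebACLs'], app_name + '-WAF-WitheList-ACL')
if not acl:
    print('WebACL Not found, creating')
    # Created with a permissive default; update_acl() below replaces it with
    # acl_action in the same run.
    acl = create_acl(app_name, 'ALLOW')['WebACL']
else:
    print('Updating WebACL')
print('ACL ID: ' + acl['WebACLId'])
# Fix: the create branch used to pass ip_set['IPSetId'] as an extra positional
# argument, which raised TypeError because update_acl takes four arguments.
# NOTE(review): nothing checks the acl_action/rule_action pair. With
# acl_action='ALLOW' the "WhiteList" rule does not restrict anything, because
# unlisted traffic is allowed by default; an allow-list needs a blocking default
# with the rule allowing the listed addresses.
update_acl(rule['RuleId'], acl['WebACLId'], acl_action, rule_action)
replace_acl_id(acl['WebACLId'])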