Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import boto3
- import time
- import json
- import psycopg2
- import os
- from time import sleep
# DMS client created once at import time so warm invocations reuse it.
client = boto3.client("dms")

# Every setting below is required: a missing environment variable raises
# KeyError when the module is imported.

# DMS task and the partitioned table it replicates.
parent_table, parent_schema, arn_task = (
    os.environ[key] for key in ("parent_table", "parent_schema", "arn_task")
)
pk_column, range_column, rs_table_name = (
    os.environ[key] for key in ("pk_column", "range_column", "rs_table_name")
)

# Source PostgreSQL credentials.
pg_user, pg_host, pg_pass = (
    os.environ[key] for key in ("pg_db_user", "pg_db_server", "pg_db_pass")
)

# Target Redshift credentials.
rs_user, rs_host, rs_pass = (
    os.environ[key] for key in ("rs_db_user", "rs_db_server", "rs_db_pass")
)

# Notification settings.
sns_topic_arn, lambda_name = (
    os.environ[key] for key in ("sns_topic_arn", "lambda_name")
)
def waiting_for(status, progress_message, timeout=None, poll_interval=3):
    """Block until the DMS task ``arn_task`` reports one of the wanted statuses.

    Polls ``client.describe_replication_tasks`` for the module-level
    ``arn_task``. Between polls it sleeps ``poll_interval`` seconds and then
    prints ``progress_message``.

    Args:
        status: Target status string, or a tuple/list/set of acceptable
            status strings (e.g. ``("starting", "running")``).
        progress_message: Text printed after each unsuccessful poll.
        timeout: Optional maximum number of seconds to wait. ``None`` (the
            default) waits without a limit.
        poll_interval: Seconds to sleep between polls.

    Returns:
        The status string that ended the wait.

    Raises:
        RuntimeError: if the task reports ``failed`` while ``failed`` is not
            one of the wanted statuses, or if ``timeout`` elapses first.
    """
    wanted = {status} if isinstance(status, str) else set(status)
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = client.describe_replication_tasks(
            Filters=[{"Name": "replication-task-arn", "Values": [arn_task]}]
        )
        current = response["ReplicationTasks"][0]["Status"]
        if current in wanted:
            return current
        # Without this check a failed task makes the loop spin until the
        # Lambda itself times out, with no indication of why.
        if current == "failed":
            raise RuntimeError(
                "Replication task {0} is 'failed' while waiting for {1}".format(
                    arn_task, sorted(wanted)
                )
            )
        if deadline is not None and time.monotonic() >= deadline:
            raise RuntimeError(
                "Timed out after {0}s waiting for task {1} to reach {2}; "
                "last status: {3}".format(timeout, arn_task, sorted(wanted), current)
            )
        sleep(poll_interval)
        print(progress_message)
def send_email(email_msg):
    """Publish ``email_msg`` to the SNS topic named by ``sns_topic_arn``.

    A fresh SNS client is created on every call.
    """
    boto3.client("sns").publish(Message=email_msg, TopicArn=sns_topic_arn)
def exit_lambda(err_msg):
    """Log ``err_msg``, send it by SNS, then abort the invocation with an error.

    The original ``sys.exit()`` exited with status 0, which reports success on
    a failure path, and inside AWS Lambda it terminates the runtime process
    instead of returning an error. Raising an exception makes the invocation
    fail with ``err_msg`` as its error message.

    Raises:
        RuntimeError: always, carrying ``err_msg``.
    """
    print(err_msg)
    send_email(err_msg)
    raise RuntimeError(err_msg)
def _describe_task():
    """Return the ``ReplicationTasks`` entry describing ``arn_task``."""
    response = client.describe_replication_tasks(
        Filters=[{"Name": "replication-task-arn", "Values": [arn_task]}]
    )
    return response["ReplicationTasks"][0]


def _check_partition_synced(old_table_name):
    """Call ``exit_lambda`` unless PostgreSQL and Redshift distinct-PK counts match.

    Counts distinct ``pk_column`` values in ``parent_schema.old_table_name`` on
    PostgreSQL and in ``parent_schema.rs_table_name`` on Redshift. For
    partitions whose name does not contain "hist", the Redshift count is
    limited to the ``range_column`` window encoded by the last two
    ``_``-separated parts of the partition name, parsed as DDMMYYYY.

    Both connections are closed before returning, including when
    ``exit_lambda`` aborts.
    """
    from contextlib import closing

    # Schema/table/column names come from environment variables and the DMS
    # table mapping. They cannot be bound as query parameters, so they are
    # interpolated. Plain values (the dates) are bound by psycopg2.
    pg_query = "SELECT count(distinct {0}), max({1}), min({1}) from {2}.{3}".format(
        pk_column, range_column, parent_schema, old_table_name
    )
    if "hist" not in old_table_name:
        parts = old_table_name.split("_")
        if len(parts) < 2:
            exit_lambda(
                "Error! Cannot read a date range from partition name: {0}".format(
                    old_table_name
                )
            )
        start_date, end_date = parts[-2], parts[-1]
        rs_query = (
            "SELECT count(distinct {0}) from {1}.{2} "
            "where {3} >= to_date(%s,'DDMMYYYY') and {3} < to_date(%s,'DDMMYYYY')"
        ).format(pk_column, parent_schema, rs_table_name, range_column)
        rs_params = (start_date, end_date)
        print("{0} -- params: {1}".format(rs_query, rs_params))
    else:
        rs_query = "SELECT count(distinct {0}) from {1}.{2}".format(
            pk_column, parent_schema, rs_table_name
        )
        rs_params = None

    # Keyword arguments let psycopg2 escape credential values properly.
    with closing(
        psycopg2.connect(dbname="db", user=pg_user, host=pg_host, password=pg_pass)
    ) as conn_pg:
        with conn_pg.cursor() as cur_pg:
            cur_pg.execute(pg_query)
            result_pg = cur_pg.fetchone()

    with closing(
        psycopg2.connect(
            dbname="db", user=rs_user, host=rs_host, password=rs_pass, port="5439"
        )
    ) as conn_rs:
        with conn_rs.cursor() as cur_rs:
            cur_rs.execute(rs_query, rs_params)
            result_rs = cur_rs.fetchone()

    if result_rs[0] != result_pg[0]:
        exit_lambda("Error! Previous partition are not full synched. Row count in source {0}: {1}, Row count in target: {2}".format(old_table_name, result_pg[0], result_rs[0]))


def _next_partition_name(old_table_name):
    """Return the partition to replicate next, or None if none is found.

    Uses the ``new_table_name`` environment variable when it is set. Otherwise
    it calls ``admin.get_next_partition(parent_table, old_table_name,
    parent_schema)`` on PostgreSQL.
    """
    from contextlib import closing

    new_table_name = os.environ.get("new_table_name")
    if new_table_name is not None:
        return new_table_name

    print("Getting new_table_name from db")
    # NOTE(review): no commit is issued, so any writes get_next_partition may
    # perform are discarded when the connection closes. Confirm this is intended.
    with closing(
        psycopg2.connect(dbname="db", user=pg_user, host=pg_host, password=pg_pass)
    ) as conn_tb_pg:
        with conn_tb_pg.cursor() as cur_tb_pg:
            cur_tb_pg.execute(
                "SELECT admin.get_next_partition(%s,%s,%s)",
                (parent_table, old_table_name, parent_schema),
            )
            return cur_tb_pg.fetchone()[0]


def lambda_handler(event, context):
    """Point the DMS task ``arn_task`` at the next partition and reload it.

    Steps:
      1. Read the task's table mapping. The ``table-name`` of rule 0 is the
         partition currently being replicated.
      2. Check that this partition is fully synced to Redshift
         (distinct primary-key counts must match).
      3. Find the next partition (env override or ``admin.get_next_partition``).
         Abort if none is found or if it equals the current one.
      4. Stop the task if it is running, rewrite the table mapping to the new
         partition, and restart it with ``reload-target``.
      5. Send a success notification through SNS.

    Failed checks go through ``exit_lambda``, which notifies and aborts.
    ``event`` and ``context`` are not used.
    """
    print('Start Task')
    print(arn_task)
    task = _describe_task()
    table_mappings = task['TableMappings']
    old_table_name = json.loads(table_mappings)['rules'][0]['object-locator']['table-name']

    _check_partition_synced(old_table_name)

    new_table_name = _next_partition_name(old_table_name)
    if (new_table_name == old_table_name) or (new_table_name is None):
        exit_lambda("Error! The current partition id still active! Table names: {0}, {1} ".format(new_table_name, old_table_name))

    # Status comes from the describe call at the start of this invocation.
    task_status = task["Status"]
    print('replication current status is: {0}'.format(task_status))
    if task_status == "running":
        print("Stoping Replication...")
        client.stop_replication_task(ReplicationTaskArn=arn_task)
        waiting_for('stopped', 'stopping...')
        print('Task stopped')

    print("Replacing active partition: {0}, with: {1}".format(old_table_name, new_table_name))
    # Plain text replacement: every occurrence of the old name in the mapping
    # JSON is rewritten, not only rule 0.
    remap = table_mappings.replace(old_table_name, new_table_name)
    client.modify_replication_task(ReplicationTaskArn=arn_task, TableMappings=remap)
    # NOTE(review): this only finishes if the task settles in 'stopped' after
    # the modify. A task that was never started may settle elsewhere. Confirm.
    waiting_for('stopped', 'Modifing...')

    print("Starting replication...")
    client.start_replication_task(
        ReplicationTaskArn=arn_task,
        StartReplicationTaskType='reload-target',
    )
    # NOTE(review): if the task moves past 'starting' to 'running' between two
    # polls, this wait never matches. Consider also accepting 'running'.
    waiting_for('starting', 'Starting...')
    send_email("Lambda {0} Finished Seccessfuly!".format(lambda_name))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement