Advertisement
Guest User

Untitled

a guest
Apr 12th, 2019
142
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.57 KB | None | 0 0
  1. import sys
  2. import boto3
  3. import time
  4. import json
  5. import psycopg2
  6. import os
  7. from time import sleep
  8.  
  9.  
  10. client = boto3.client('dms')
  11.  
  12. # set variables
  13. parent_table = os.environ["parent_table"]
  14. parent_schema = os.environ["parent_schema"]
  15. arn_task = os.environ["arn_task"]
  16. #interval=os.environ["interval"]
  17. pk_column= os.environ["pk_column"]
  18. range_column= os.environ["range_column"]
  19. rs_table_name=os.environ["rs_table_name"]
  20.  
  21. # DB parameters
  22. pg_user=os.environ["pg_db_user"]
  23. pg_host=os.environ["pg_db_server"]
  24. pg_pass=os.environ["pg_db_pass"]
  25. rs_user=os.environ["rs_db_user"]
  26. rs_host=os.environ["rs_db_server"]
  27. rs_pass=os.environ["rs_db_pass"]
  28. sns_topic_arn=os.environ["sns_topic_arn"]
  29. lambda_name=os.environ["lambda_name"]
  30.  
  31. def waiting_for(status, progress_message):
  32. while True:
  33. response = client.describe_replication_tasks(
  34. Filters=[
  35. {
  36. 'Name' : 'replication-task-arn',
  37. 'Values' : [
  38. arn_task
  39. ]
  40. }
  41. ])
  42. if response["ReplicationTasks"][0]["Status"] == status:
  43. break
  44. sleep(3)
  45. print(progress_message)
  46.  
  47. def send_email(email_msg):
  48. # Create an SNS client
  49. sas_client = boto3.client("sns")
  50.  
  51. # Publish a message.
  52. sas_client.publish(Message=email_msg, TopicArn=sns_topic_arn)
  53.  
  54. def exit_lambda(err_msg):
  55. print(err_msg)
  56. send_email(err_msg)
  57. sys.exit()
  58.  
  59. def lambda_handler(event, context):
  60.  
  61. print('Start Task')
  62. print(arn_task)
  63.  
  64. response_desc = client.describe_replication_tasks(
  65. Filters=[
  66. {
  67. 'Name' : 'replication-task-arn',
  68. 'Values' : [
  69. arn_task
  70. ]
  71. }
  72. ])
  73.  
  74. old_table_name = json.loads(response_desc['ReplicationTasks'][0]['TableMappings'])['rules'][0]['object-locator']['table-name']
  75.  
  76. conn_pg = psycopg2.connect("dbname='db' user='{0}' host='{1}' password='{2}'".format(pg_user,pg_host,pg_pass))
  77. conn_rs = psycopg2.connect("dbname='db' user='{0}' host='{1}' password='{2}' port='5439'".format(rs_user,rs_host,rs_pass))
  78.  
  79. cur_pg = conn_pg.cursor()
  80. cur_rs = conn_rs.cursor()
  81.  
  82. # Check if old partition are fully synced
  83. cur_pg.execute('SELECT count(distinct {0}), max({1}), min({1}) from {2}.{3}'.format(pk_column,range_column,parent_schema,old_table_name))
  84.  
  85. if "hist" not in old_table_name:
  86. sp = old_table_name.split("_")
  87. cnt = old_table_name.count("_")
  88. start_date = sp[cnt-1]
  89. end_date = sp[cnt]
  90. print("SELECT count(distinct {0}) from {1}.{2} where {3} >= to_date('{4}','DDMMYYYY') and {3} < to_date('{5}','DDMMYYYY')".format(pk_column, parent_schema, rs_table_name,range_column,start_date,end_date))
  91. cur_rs.execute("SELECT count(distinct {0}) from {1}.{2} where {3} >= to_date('{4}','DDMMYYYY') and {3} < to_date('{5}','DDMMYYYY')".format(pk_column, parent_schema, rs_table_name,range_column,start_date,end_date))
  92. else:
  93. cur_rs.execute('SELECT count(distinct {0}) from {1}.{2}'.format(pk_column, parent_schema, rs_table_name))
  94.  
  95. result_pg = cur_pg.fetchone()
  96. result_rs = cur_rs.fetchone()
  97.  
  98. if result_rs[0] != result_pg[0]:
  99. exit_lambda("Error! Previous partition are not full synched. Row count in source {0}: {1}, Row count in target: {2}".format(old_table_name,result_pg[0],result_rs[0]))
  100.  
  101. new_table_name = os.environ.get("new_table_name")
  102. if new_table_name is None:
  103. print("Getting new_table_name from db")
  104. # Get new active partition name
  105. conn_tb_pg = psycopg2.connect("dbname='db' user='{0}' host='{1}' password='{2}'".format(pg_user,pg_host,pg_pass))
  106. cur_tb_pg = conn_tb_pg.cursor()
  107. command = "SELECT admin.get_next_partition('{0}','{1}','{2}')".format(parent_table,old_table_name,parent_schema)
  108. # print(command)
  109. cur_tb_pg.execute(command)
  110. result_tb_pg = cur_tb_pg.fetchone()
  111.  
  112. new_table_name = result_tb_pg[0]
  113.  
  114. conn_tb_pg.close()
  115. cur_tb_pg.close()
  116.  
  117. if (new_table_name == old_table_name) or (new_table_name is None):
  118. exit_lambda("Error! The current partition id still active! Table names: {0}, {1} ".format(new_table_name,old_table_name))
  119.  
  120. task_status = response_desc["ReplicationTasks"][0]["Status"]
  121. print('replication current status is: {0}'.format(task_status))
  122. if response_desc["ReplicationTasks"][0]["Status"] == "running":
  123. # Stopping Replication
  124. print("Stoping Replication...")
  125. response_stop = client.stop_replication_task(
  126. ReplicationTaskArn=arn_task
  127. )
  128.  
  129. waiting_for('stopped', 'stopping...')
  130. print('Task stopped')
  131.  
  132. print("Replacing active partition: {0}, with: {1}".format(old_table_name,new_table_name))
  133. remap = response_desc['ReplicationTasks'][0]['TableMappings']
  134. remap = remap.replace(old_table_name,new_table_name)
  135.  
  136. response_mod = client.modify_replication_task(
  137. ReplicationTaskArn=arn_task,
  138. TableMappings=remap,
  139. )
  140. waiting_for('stopped','Modifing...')
  141.  
  142. # Restarting Replication task
  143. print("Starting replication...")
  144. response_start = client.start_replication_task(
  145. ReplicationTaskArn=arn_task,
  146. StartReplicationTaskType='reload-target'
  147. )
  148.  
  149. waiting_for('starting', 'Starting...')
  150. send_email("Lambda {0} Finished Seccessfuly!".format(lambda_name))
  151.  
  152. #close cursors
  153. cur_pg.close()
  154. conn_pg.close()
  155.  
  156. cur_rs.close()
  157. conn_rs.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement