Guest User

Untitled

a guest
Jan 2nd, 2018
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.87 KB | None | 0 0
  1. #!/usr/bin/python3.6
  2.  
  3. """Sync PostgreSQL."""
  4.  
  5. import psycopg2
  6. import sys
  7. from psycopg2.extras import RealDictCursor, execute_values
  8.  
  9. """
  10. Usage:
  11. sync.py
  12. Options:
  13. -f Force import tables without asking for user input per table
  14. """
  15.  
  16. """
  17. Configure the info for connecting to both databases
  18. """
  19. SOURCE_USERNAME = ''
  20. SOURCE_PASSWD = ''
  21. SOURCE_URL = ''
  22. SOURCE_PORT = 5432
  23. SOURCE_DB = ''
  24.  
  25. DEST_USERNAME = ''
  26. DEST_PASSWD = ''
  27. DEST_URL = ''
  28. DEST_PORT = 5432
  29. DEST_DB = ''
  30.  
  31. """
  32. Add the tables to import
  33. if no schema_name is provided it defaults to 'public'
  34. """
  35. list_of_tables = [
  36. 'schema_name.table_name'
  37. ]
  38.  
  39.  
  40. def format_drop_create_query(table_name, columns, table_schema='public'):
  41. """format_drop_create_query."""
  42. """
  43. We create the DROP query portion
  44. """
  45. q_drop = 'DROP TABLE IF EXISTS "{}"."{}";'.format(table_schema, table_name)
  46.  
  47. """
  48. For each column we get the:
  49. - column name
  50. - column type
  51. - column character maximum length for varchar values
  52. - indication if it allows NUll or not
  53. and we add the column part to the CREATE query
  54. """
  55. columns_string = ""
  56. i = 1
  57. for c_data in columns:
  58. if c_data['data_type'] == 'character varying':
  59. data_type = 'varchar({})'.format(
  60. c_data['character_maximum_length'])
  61. else:
  62. data_type = c_data['data_type']
  63.  
  64. is_nullable_str = ''
  65. if c_data['is_nullable'] == 'NO':
  66. is_nullable_str = 'NOT NULL'
  67.  
  68. column_string = '"{}" {} {}'.format(
  69. c_data['column_name'], data_type, is_nullable_str)
  70. if i < len(columns):
  71. column_string += ','
  72. column_string += '\n'
  73.  
  74. columns_string += column_string
  75. i += 1
  76.  
  77. q_create_body = '''CREATE TABLE "{}"."{}" (
  78. {}
  79. );
  80. '''.format(table_schema, table_name, columns_string)
  81.  
  82. """
  83. We combine the two parts and return it
  84. """
  85. return q_drop + '\n' + q_create_body
  86.  
  87. if __name__ == '__main__':
  88. try:
  89. """
  90. We check if the user set the -f arg or not and set the correct
  91. value to the 'prompt' flag
  92. """
  93. prompt = True
  94. if len(sys.argv) > 1 and sys.argv[1] == '-f':
  95. prompt = False
  96.  
  97. """
  98. This is the query that we will run to get
  99. the data from the INFORMATION_SCHEMA
  100. """
  101. get_table_struct_query = '''
  102. SELECT
  103. table_schema,
  104. column_name,
  105. data_type,
  106. character_maximum_length,
  107. is_nullable
  108. FROM
  109. INFORMATION_SCHEMA.COLUMNS
  110. WHERE
  111. table_name = '{}'
  112. ORDER BY ordinal_position
  113. '''
  114.  
  115. """
  116. Create connection to the source database
  117. and initialize a cursor
  118. """
  119. src = psycopg2.connect(
  120. database=SOURCE_DB,
  121. user=SOURCE_USERNAME,
  122. password=SOURCE_PASSWD,
  123. host=SOURCE_URL,
  124. port=SOURCE_PORT,
  125. cursor_factory=RealDictCursor
  126. )
  127. src_cur = src.cursor()
  128.  
  129. """
  130. Create connection to the destination database
  131. and initialize a cursor
  132. """
  133. dest = psycopg2.connect(
  134. database=DEST_DB,
  135. user=DEST_USERNAME,
  136. password=DEST_PASSWD,
  137. host=DEST_URL,
  138. port=DEST_PORT,
  139. cursor_factory=RealDictCursor
  140. )
  141. dest_cur = dest.cursor()
  142.  
  143. print('================================\n')
  144.  
  145. """
  146. Iterate the list of tables we want to import
  147. """
  148. for table_name in list_of_tables:
  149. print('Importing {} \nfrom: [SOURCE] {} \nto: [DEST] {}\n'.format(
  150. table_name, src.dsn, dest.dsn))
  151.  
  152. """
  153. Check if the -f flag was provided
  154. If not we ask the user to verify the import process for each table
  155. """
  156. if prompt:
  157. user_resp = input('Do you want to continue? [N/y] ')
  158. if user_resp != 'y' and user_resp != 'Y':
  159. print('Skiping table {}'.format(table_name))
  160. continue
  161.  
  162. print('Starting import...\n')
  163.  
  164. print('Getting table data from SOURCE...', '\n')
  165.  
  166. """
  167. Separate the table name and the schema name from the
  168. combined table name in the list.
  169. If the name did not contain a schema we default to `public`
  170. """
  171. table_name_info = table_name.split('.')
  172. if len(table_name_info) == 2:
  173. table_schema = table_name_info[0]
  174. table_name = table_name_info[1]
  175. else:
  176. table_schema = 'public'
  177.  
  178. """
  179. Execute the query in the source database that will
  180. get all the data of the table
  181. """
  182. q_get = 'SELECT * FROM "{}"."{}";'.format(table_schema, table_name)
  183. src_cur.execute(q_get)
  184. table_data = src_cur.fetchall()
  185.  
  186. """
  187. Execute the query in the source database that will get the
  188. column data of the table from the INFORMATION_SCHEMA
  189. """
  190. print('Create table to DEST...', '\n')
  191. src_cur.execute(get_table_struct_query.format(table_name))
  192.  
  193. """
  194. Use the reponse from the previous query and
  195. execute the DROP...CREATE... query to the destination database
  196. """
  197. dest_cur.execute(format_drop_create_query(
  198. table_name, columns=src_cur.fetchall()))
  199.  
  200. if dest.notices:
  201. print(dest.notices, '\n')
  202.  
  203. """
  204. After the table is created to the destination database
  205. we execute the query that inserts tha data.
  206. """
  207. print('Insert data to DEST...', '\n')
  208. column_names = ",".join(list(table_data[0].keys()))
  209. q_insert = 'INSERT INTO "{}"."{}" ({}) VALUES %s'.format(
  210. 'public', table_name, column_names)
  211. execute_values(
  212. dest_cur,
  213. q_insert,
  214. argslist=[list(v.values()) for v in table_data],
  215. template=None,
  216. page_size=100
  217. )
  218.  
  219. """
  220. We commit everything and inform the user
  221. """
  222. dest.commit()
  223. print('...finished import\n')
  224.  
  225. print('================================\n')
  226.  
  227. """
  228. We close the connections to the databases
  229. """
  230. src.close()
  231. dest.close()
  232.  
  233. except psycopg2.Error as e:
  234. """
  235. In case of error we rollback all actions to the destination database
  236. and close all connections
  237. """
  238. dest.rollback()
  239. src.close()
  240. dest.close()
  241. print('An error occured, ALL actions have been rollbacked.', '\n')
  242. print(e, '\n')
  243. print('================================\n')
Add Comment
Please, Sign In to add comment