Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3.6
- """Sync PostgreSQL."""
import sys

import psycopg2
from psycopg2 import sql
from psycopg2.extras import RealDictCursor, execute_values
# Usage:
#     sync.py [-f]
#
# Options:
#     -f  Force import tables without asking for user input per table

# Connection settings for the database the tables are read from (SOURCE).
SOURCE_USERNAME = ''
SOURCE_PASSWD = ''
SOURCE_URL = ''
SOURCE_PORT = 5432
SOURCE_DB = ''

# Connection settings for the database the tables are written to (DEST).
DEST_USERNAME = ''
DEST_PASSWD = ''
DEST_URL = ''
DEST_PORT = 5432
DEST_DB = ''

# Tables to import, one 'schema_name.table_name' entry each.
# An entry without a schema prefix is read from the 'public' schema.
list_of_tables = [
    'schema_name.table_name',
]
def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are doubled so the name cannot terminate the
    quoted identifier early.
    """
    return '"{}"'.format(str(name).replace('"', '""'))


def _format_column(c_data):
    """Render one column definition: quoted name, type, optional NOT NULL."""
    data_type = c_data['data_type']
    max_len = c_data.get('character_maximum_length')
    if data_type in ('character varying', 'character'):
        # A NULL maximum length (unbounded varchar) must yield a bare type
        # name; formatting it would produce the invalid "varchar(None)".
        base = 'varchar' if data_type == 'character varying' else 'character'
        data_type = base if max_len is None else '{}({})'.format(base, max_len)
    elif data_type == 'ARRAY' and c_data.get('udt_name'):
        # The generic 'ARRAY' label is not a usable type; when the caller
        # supplied udt_name (e.g. '_int4'), use that concrete type instead.
        data_type = c_data['udt_name']
    parts = [_quote_ident(c_data['column_name']), data_type]
    if c_data['is_nullable'] == 'NO':
        parts.append('NOT NULL')
    return ' '.join(parts)


def format_drop_create_query(table_name, columns, table_schema='public'):
    """Build a ``DROP TABLE IF EXISTS`` + ``CREATE TABLE`` script for a table.

    :param table_name: name of the table to (re)create.
    :param columns: sequence of mappings, one per column in table order, each
        with the keys ``column_name``, ``data_type`` and ``is_nullable``, plus
        ``character_maximum_length`` (used for character types) and an
        optional ``udt_name`` (used for ``ARRAY`` columns), i.e. rows shaped
        like ``INFORMATION_SCHEMA.COLUMNS``.
    :param table_schema: schema containing the table; defaults to 'public'.
    :return: the DROP statement and the CREATE statement separated by a
        newline, as a single string.
    """
    qualified = '{}.{}'.format(_quote_ident(table_schema),
                               _quote_ident(table_name))
    q_drop = 'DROP TABLE IF EXISTS {};'.format(qualified)
    column_defs = ',\n'.join(_format_column(c_data) for c_data in columns)
    q_create = 'CREATE TABLE {} (\n{}\n);\n'.format(qualified, column_defs)
    return q_drop + '\n' + q_create
# Column metadata for one table, read from INFORMATION_SCHEMA.COLUMNS.
# Filtering on table_schema as well as table_name stops same-named tables in
# other schemas from being merged into one column list; values are passed as
# query parameters instead of being formatted into the SQL text. udt_name is
# fetched so array columns can be given their concrete type.
_TABLE_STRUCT_QUERY = '''
    SELECT
        table_schema,
        column_name,
        data_type,
        udt_name,
        character_maximum_length,
        is_nullable
    FROM
        INFORMATION_SCHEMA.COLUMNS
    WHERE
        table_schema = %s
        AND table_name = %s
    ORDER BY ordinal_position
'''

# Schema every imported table is (re)created in on DEST, whatever schema it
# was read from on SOURCE. Kept in one place so CREATE and INSERT agree.
_DEST_SCHEMA = 'public'

_SEPARATOR = '================================\n'


def _split_table_name(qualified_name):
    """Return ``(schema, table)`` for a ``list_of_tables`` entry.

    ``'schema.table'`` is split on the dot; any other form is treated as a
    bare table name in the 'public' schema.
    """
    parts = qualified_name.split('.')
    if len(parts) == 2:
        return parts[0], parts[1]
    return 'public', qualified_name


def _connect(database, user, password, host, port):
    """Open a psycopg2 connection whose cursors return each row as a dict."""
    return psycopg2.connect(
        database=database,
        user=user,
        password=password,
        host=host,
        port=port,
        cursor_factory=RealDictCursor,
    )


def _confirm(table_label):
    """Ask the user to confirm one table; only 'y' or 'Y' counts as yes."""
    user_resp = input('Do you want to continue? [N/y] ')
    if user_resp in ('y', 'Y'):
        return True
    print('Skiping table {}'.format(table_label))
    return False


def _copy_table(src_cur, dest_cur, dest, table_schema, table_name):
    """Re-create one SOURCE table on DEST and copy all of its rows.

    Nothing is committed here; ``main`` commits once every table is copied.
    """
    print('Getting table data from SOURCE...', '\n')
    # psycopg2.sql quotes the identifiers instead of pasting them into text.
    src_cur.execute(sql.SQL('SELECT * FROM {}.{};').format(
        sql.Identifier(table_schema), sql.Identifier(table_name)))
    table_data = src_cur.fetchall()

    print('Create table to DEST...', '\n')
    src_cur.execute(_TABLE_STRUCT_QUERY, (table_schema, table_name))
    dest_cur.execute(format_drop_create_query(
        table_name, columns=src_cur.fetchall(), table_schema=_DEST_SCHEMA))
    if dest.notices:
        print(dest.notices, '\n')
        # Clear the printed notices so the next table does not repeat them.
        del dest.notices[:]

    if not table_data:
        # No rows: nothing to insert, and no row to take column names from
        # (table_data[0] would raise IndexError, bypassing the rollback).
        print('Source table is empty, nothing to insert.', '\n')
        return

    print('Insert data to DEST...', '\n')
    column_names = list(table_data[0].keys())
    # Column names are quoted just like in CREATE TABLE, so mixed-case or
    # reserved-word columns resolve to the same identifiers.
    q_insert = sql.SQL('INSERT INTO {}.{} ({}) VALUES %s').format(
        sql.Identifier(_DEST_SCHEMA),
        sql.Identifier(table_name),
        sql.SQL(',').join([sql.Identifier(name) for name in column_names]),
    )
    execute_values(
        dest_cur,
        q_insert.as_string(dest_cur),
        argslist=[[row[name] for name in column_names] for row in table_data],
        template=None,
        page_size=100,
    )


def main():
    """Copy every table in ``list_of_tables`` from SOURCE to DEST.

    Pass ``-f`` as the first command-line argument to skip the per-table
    confirmation prompt. All DEST changes happen in one transaction: they are
    committed after the last table and rolled back on a psycopg2 error.
    Both connections are always closed, whatever exception occurs.
    """
    prompt = not (len(sys.argv) > 1 and sys.argv[1] == '-f')
    # Pre-bind so the error/cleanup paths work even if a connect() fails.
    src = dest = None
    try:
        src = _connect(SOURCE_DB, SOURCE_USERNAME, SOURCE_PASSWD,
                       SOURCE_URL, SOURCE_PORT)
        src_cur = src.cursor()
        dest = _connect(DEST_DB, DEST_USERNAME, DEST_PASSWD,
                        DEST_URL, DEST_PORT)
        dest_cur = dest.cursor()
        print(_SEPARATOR)

        for qualified_name in list_of_tables:
            print('Importing {} \nfrom: [SOURCE] {} \nto: [DEST] {}\n'.format(
                qualified_name, src.dsn, dest.dsn))
            if prompt and not _confirm(qualified_name):
                continue
            print('Starting import...\n')
            table_schema, table_name = _split_table_name(qualified_name)
            _copy_table(src_cur, dest_cur, dest, table_schema, table_name)
            print('...finished import\n')
            print(_SEPARATOR)

        dest.commit()
    except psycopg2.Error as e:
        # `closed` is nonzero for a closed or broken connection, where a
        # rollback would itself fail.
        if dest is not None and not dest.closed:
            dest.rollback()
        print('An error occured, ALL actions have been rollbacked.', '\n')
        print(e, '\n')
        print(_SEPARATOR)
    finally:
        # Closing without commit discards any uncommitted DEST changes, so
        # non-database errors (e.g. Ctrl-C at the prompt) leave DEST intact.
        for conn in (src, dest):
            if conn is not None and not conn.closed:
                conn.close()


if __name__ == '__main__':
    main()
Add Comment
Please, Sign In to add comment