Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import csv
import datetime
import os
from timeit import default_timer as timer

import psycopg2
from psycopg2 import sql
# Connection details (placeholder credentials -- replace before use).
dbname = 'db'
user = 'user'
host = 'host:port'  # NOTE(review): psycopg2 takes host and port as separate args -- confirm
password = 'pass'
def get_tables_databases():
    """Return the names of all user base tables (system schemas excluded)."""
    with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "select table_name from INFORMATION_SCHEMA.TABLES "
                "where table_type='BASE TABLE' "
                "and table_schema not in ('pg_catalog', 'information_schema');"
            )
            return [table_name for (table_name,) in cur]
def get_column_table(table_name: str):
    """Return the column names of *table_name*.

    Args:
        table_name: table to inspect.

    Returns:
        List of column-name strings (empty if the table does not exist).
    """
    with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
        with conn.cursor() as cur:
            # Parameterized query: the original concatenated table_name into
            # the SQL string, which is an injection vector.
            cur.execute(
                'select column_name from INFORMATION_SCHEMA.COLUMNS '
                'where table_name = %s;',
                (table_name,),
            )
            return [column_name for (column_name,) in cur]
def get_column_details_table(table_name: str):
    """Return (column_name, data_type, character_maximum_length) tuples for *table_name*."""
    with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
        with conn.cursor() as cur:
            # Parameterized query: the original concatenated table_name into
            # the SQL string, which is an injection vector.
            cur.execute(
                'select column_name, data_type, character_maximum_length '
                'from INFORMATION_SCHEMA.COLUMNS where table_name = %s;',
                (table_name,),
            )
            return cur.fetchall()
def get_length_table(table_name: str):
    """Return the exact row count of *table_name* (performs a full scan)."""
    with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
        with conn.cursor() as cur:
            # sql.Identifier quotes/escapes the table name safely; the original
            # spliced it into the string, breaking on embedded quotes and
            # allowing SQL injection.
            cur.execute(sql.SQL('select count(*) from {};').format(sql.Identifier(table_name)))
            return cur.fetchone()[0]
def get_aprox_length_table(table_name: str):
    """Return the planner's row-count estimate for *table_name* (fast, approximate).

    NOTE(review): pg_class.relname is not schema-qualified, so duplicate table
    names across schemas are ambiguous -- confirm against the target database.
    """
    with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
        with conn.cursor() as cur:
            # Parameterized query: the original concatenated table_name into
            # the SQL string, which is an injection vector.
            cur.execute('select reltuples as estimate from pg_class where relname = %s;',
                        (table_name,))
            return cur.fetchone()[0]
def get_size_database():
    """Return the on-disk size of the configured database as a pretty string (e.g. '42 MB')."""
    with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
        with conn.cursor() as cur:
            # Parameterized query instead of splicing dbname into the SQL text.
            cur.execute(
                'select pg_size_pretty(pg_database_size(pg_database.datname)) as size '
                'from pg_database where pg_database.datname = %s;',
                (dbname,),
            )
            return cur.fetchone()[0]
def get_rows_table(table_name: str):
    """Return ALL rows of *table_name* as a list of tuples.

    NOTE(review): this materializes the whole table in memory; prefer the
    export_* functions (server-side cursors) for large tables.
    """
    with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
        with conn.cursor() as cur:
            # sql.Identifier quotes the table name safely; the original spliced
            # it into the string, an SQL-injection vector.
            cur.execute(sql.SQL('select * from {};').format(sql.Identifier(table_name)))
            return cur.fetchall()
def export_table_csv(table_name: str, file_path: str = None, with_title: bool = False):
    """Export *table_name* to <file_path>/<table_name>.csv.

    Args:
        table_name: table to export (also used as the file stem).
        file_path: target directory; defaults to the current directory.
        with_title: when True, write a header row with the column names.
    """
    file_name = os.path.join(file_path if file_path else '.', table_name + '.csv')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    # newline='' is required by the csv module so it controls line endings
    # itself; without it the original produced blank rows on Windows.
    with open(file_name, mode="w", encoding="UTF-8", newline='') as csvfile:
        csvwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)
        with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
            # optional header row
            if with_title:
                with conn.cursor() as cur:
                    # Parameterized query: avoids SQL injection via table_name.
                    cur.execute(
                        'select column_name from INFORMATION_SCHEMA.COLUMNS '
                        'where table_name = %s;',
                        (table_name,),
                    )
                    csvwriter.writerow([column_name for (column_name,) in cur])
            # Named (server-side) cursor streams rows instead of loading the
            # whole table into client memory.
            with conn.cursor('server-side-cursor') as cur:
                cur.itersize = 100000  # rows buffered on the client per round trip
                # sql.Identifier quotes the table name safely.
                cur.execute(sql.SQL('select * from {};').format(sql.Identifier(table_name)))
                for row in cur:
                    csvwriter.writerow(row)
def export_database_csv(file_path: str = None, with_title: bool = False):
    """Export every user table of the database to CSV files under *file_path*.

    Defaults the target directory to the database name when not given.
    """
    target_dir = file_path if file_path else dbname
    os.makedirs(target_dir, exist_ok=True)
    for current_table in get_tables_databases():
        export_table_csv(current_table, target_dir, with_title)
def export_table_copy(table_name: str, file_path: str = None, with_title: bool = False):
    """Export *table_name* via COPY to <file_path>/<table_name>.copy ('|'-separated).

    Args:
        table_name: table to export (also used as the file stem).
        file_path: target directory; defaults to the current directory.
        with_title: when True, write a '|'-separated header line first.
    """
    file_name = os.path.join(file_path if file_path else '.', table_name + '.copy')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    with open(file_name, mode="w", encoding="UTF-8") as copyfile:
        with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
            with conn.cursor() as cur:
                # optional header line
                if with_title:
                    # Parameterized query: avoids SQL injection via table_name.
                    cur.execute(
                        'select column_name from INFORMATION_SCHEMA.COLUMNS '
                        'where table_name = %s;',
                        (table_name,),
                    )
                    # join instead of the original quadratic '+=' loop
                    copyfile.write('|'.join(column_name for (column_name,) in cur) + '\n')
                # copy_to interpolates the table name directly; table_name must
                # come from a trusted source (e.g. get_tables_databases()).
                cur.copy_to(copyfile, table_name, sep="|")
def export_database_copy(file_path: str = None, with_title: bool = False):
    """Export every user table of the database via COPY files under *file_path*.

    Defaults the target directory to the database name when not given.
    """
    target_dir = file_path if file_path else dbname
    os.makedirs(target_dir, exist_ok=True)
    for current_table in get_tables_databases():
        export_table_copy(current_table, target_dir, with_title)
def export_table_sql(table_name: str, file_path: str = None):
    """Export *table_name* as a CREATE TABLE statement plus one INSERT per row.

    Writes <file_path>/<table_name>.sql. Type names are coarsely remapped:
    the varchar family -> varchar, money -> double precision, bytea -> BLOB.

    Args:
        table_name: table to export (also used as the file stem).
        file_path: target directory; defaults to the current directory.
    """
    file_name = os.path.join(file_path if file_path else '.', table_name + '.sql')
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    with open(file_name, mode="w", encoding="UTF-8") as sqlfile:
        with psycopg2.connect(dbname=dbname, user=user, host=host, password=password) as conn:
            # -- schema --
            with conn.cursor() as cur:
                # Parameterized query: avoids SQL injection via table_name.
                cur.execute(
                    'select column_name, data_type, character_maximum_length '
                    'from INFORMATION_SCHEMA.COLUMNS where table_name = %s;',
                    (table_name,),
                )
                columns = []
                for (cname, ctype, clen) in cur:
                    if ctype in {'character varying', 'varchar', 'character', 'char', 'text'}:
                        ctype = 'varchar'
                    elif ctype == 'money':
                        ctype = 'double precision'
                    elif ctype == 'bytea':
                        ctype = 'BLOB'
                    # BUGFIX: clen is an int; the original concatenated it to a
                    # str and raised TypeError for every sized varchar column.
                    clen = '(' + str(clen) + ')' if clen and ctype == 'varchar' else ''
                    columns.append(cname + ' ' + ctype + clen)
                # BUGFIX: identifiers take double quotes in SQL; the original
                # wrapped the table name in single quotes (invalid SQL).
                sqlfile.write('create table "' + table_name + '" (' + ', '.join(columns) + ');\n')
            # -- rows (streamed via a server-side cursor) --
            with conn.cursor('server-side-cursor') as cur:
                cur.itersize = 100000  # rows buffered on the client per round trip
                # sql.Identifier quotes the table name safely.
                cur.execute(sql.SQL('select * from {};').format(sql.Identifier(table_name)))
                for row in cur:
                    data = []
                    for value in row:
                        # BUGFIX: SQL NULL arrives in Python as None; the
                        # original compared against the string 'null' and so
                        # never emitted NULL.
                        if value is None:
                            data.append('NULL')
                        else:
                            # NOTE(review): repr() only yields valid SQL for
                            # numbers and plain strings -- dates/bytes would
                            # need proper literal quoting; confirm data types.
                            data.append(repr(value))
                    sqlfile.write('insert into ' + table_name + ' values (' + ', '.join(data) + ');\n')
def export_database_sql(file_path: str = None):
    """Export every user table of the database as .sql files under *file_path*.

    Defaults the target directory to the database name when not given.
    """
    target_dir = file_path if file_path else dbname
    os.makedirs(target_dir, exist_ok=True)
    for current_table in get_tables_databases():
        export_table_sql(current_table, target_dir)
- # test :
- print(get_tables_databases())
- print(get_size_database())
- print(get_column_table('a_table'))
- for col in get_column_details_table('a_table'):
- print(col)
- print(get_length_table('a_table'))
- print(get_aprox_length_table('a_table'))
- # export bench
- start = timer()
- export_table_sql('a_table')
- end = timer()
- print(end-start)
- # export bench
- start = timer()
- export_table_csv('a_table')
- end = timer()
- print(end-start)
- # export bench
- start = timer()
- export_table_copy('a_table')
- end = timer()
- print(end-start)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement