Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # coding=utf-8
- # pylint: disable=C0103,E1133,W0621
- """This script adjusts the collation of a SQL Server database."""
- __author__ = "Fábio Matavelli <fabio.matavelli@amaro.com>"
- import argparse
- import os
- import logging
- from sqlalchemy import create_engine, MetaData, Table
- from sqlalchemy.engine import reflection
- from sqlalchemy.exc import InterfaceError, OperationalError, ProgrammingError, NoSuchTableError
- from sqlalchemy.sql.sqltypes import VARCHAR, NVARCHAR, CHAR, NCHAR
def alter_column(table, column, tp, length, collate, null):
    """Build the ``ALTER TABLE ... ALTER COLUMN`` statement for a new collation.

    Args:
        table: Table name; emitted as a bracket-delimited identifier.
        column: Column name; emitted as a bracket-delimited identifier.
        tp: Column type name, interpolated verbatim (e.g. ``"VARCHAR"``).
        length: Value placed between the parentheses after the type
            (a number or ``"max"``), interpolated verbatim.
        collate: Collation name, interpolated verbatim.
        null: Truthy to emit ``NULL``, falsy to emit ``NOT NULL``.

    Returns:
        The complete statement as a string, terminated by ``;``.
    """
    def quote(identifier):
        # A closing bracket inside a delimited identifier must be doubled,
        # otherwise it would terminate the identifier early.
        return "[{0}]".format("{0}".format(identifier).replace("]", "]]"))

    return ("ALTER TABLE {table} "
            "ALTER COLUMN {column} "
            "{type}({length}) "
            "COLLATE {collate} "
            "{null};").format(table=quote(table),
                              column=quote(column),
                              type=tp,
                              length=length,
                              collate=collate,
                              null="NULL" if null else "NOT NULL")
# Directory that contains this script; the FreeTDS files below live next to it.
current_dir = os.path.dirname(os.path.realpath(__file__))

# Obtain the logger through getLogger() instead of instantiating Logger
# directly, and give it its own stream handler.  Without a handler only
# WARNING and above would be shown (via logging's last-resort handler), so the
# script's INFO/DEBUG messages would never appear whatever --log-level says.
log = logging.getLogger("change_collate")
if not log.handlers:
    _handler = logging.StreamHandler()
    _handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    log.addHandler(_handler)

# Point these two environment variables at files beside the script.
# NOTE(review): presumably consumed by FreeTDS (through pymssql) as its dump
# log and configuration file - confirm against the FreeTDS documentation.
# os.path.join keeps the platform's own separator instead of a hard-coded "\\".
os.environ["TDSDUMP"] = os.path.join(current_dir, "freetds.log")
os.environ["FREETDSCONF"] = os.path.join(current_dir, "freetds.conf")

# Command-line interface of the script.
parser = argparse.ArgumentParser(description="Change collate of SQL Server database")
parser.add_argument("--host", help="Database hostname", required=True, metavar="<hostname>")
parser.add_argument("--username", help="Database username", metavar="<username>")
parser.add_argument("--password", help="Database password", metavar="<password>")
parser.add_argument("--db", help="Database name", metavar="<database>", required=True)
parser.add_argument("--old-collation", help="Old collation (to change from)", required=True,
                    metavar="<collation>")
parser.add_argument("--new-collation", help="New collation (to change for)", required=True,
                    metavar="<collation>")
parser.add_argument("--tables", help="Tables name", metavar="<table>",
                    nargs="+")
parser.add_argument("--views", help="Views name", metavar="<view>",
                    nargs="+")
parser.add_argument("--log-level", help="Log level",
                    choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"],
                    default="INFO")
def _named_indexes(table_class):
    """Collect the named indexes of a reflected table.

    Returns a pair ``(indexes_by_name, index_names_by_column)``: the first maps
    an index name to its index object, the second maps a column name to the
    list of index names that include that column.  Unnamed indexes are skipped.
    """
    indexes_by_name = {}
    index_names_by_column = {}
    for index in table_class.indexes:
        if index.name is None:
            continue
        indexes_by_name.setdefault(index.name, index)
        for column in index.columns:
            index_names_by_column.setdefault(column.name, []).append(index.name)
    return indexes_by_name, index_names_by_column


def _same_collation(left, right):
    """Compare two collation names case-insensitively; ``None`` counts as empty."""
    return (left or "").lower() == (right or "").lower()


if __name__ == "__main__":
    args = parser.parse_args()
    log.setLevel(args.log_level)

    # Build the optional "user[:password]@" part of the connection URL.
    # NOTE(review): the credentials are interpolated unescaped; a password
    # containing characters such as "@" or "/" would presumably corrupt the
    # URL - confirm and URL-quote if that can happen.
    auth = ""
    if args.username is not None:
        password = ":" + args.password if args.password is not None else ""
        auth = "{username}{password}@".format(username=args.username, password=password)
    engine = create_engine(
        r"mssql+pymssql://{auth}{hostname}/{database}?charset=UTF-8".format(
            hostname=args.host, auth=auth, database=args.db),
        echo=True, deprecate_large_types=True)

    try:
        insp = reflection.Inspector.from_engine(engine)
    except (InterfaceError, OperationalError) as err:
        # %r renders the URL with the password masked; the database name and
        # the error now fill the placeholders they are labelled with.
        log.error("Error trying to connect to database %s with DSN %r: %s",
                  args.db, engine.url, err)
        # Report the failure through a non-zero exit status.
        raise SystemExit(1)

    metadata = MetaData()

    # Views are only used to keep them out of the table list; nothing below
    # alters a view.  Passing --views without --tables therefore selects no
    # tables at all.
    views = args.views or insp.get_view_names()
    if args.tables:
        tables = [name for name in args.tables if name not in views]
    elif args.views:
        tables = []
    else:
        tables = [name for name in insp.get_table_names() if name not in views]

    for table in tables:
        log.info("Getting info of table %s", table)
        try:
            table_class = Table(table, metadata, autoload=True, autoload_with=engine)
        except NoSuchTableError:
            log.error("Table %s not found", table)
            continue

        log.info("Registering indexes for table %s", table)
        indexes_by_name, index_names_by_column = _named_indexes(table_class)

        log.info("Changing columns in table %s", table)
        # Only character columns that currently use the old collation (and are
        # not already on the new one) are touched, so --old-collation is
        # actually honoured.
        targets = [col for col in table_class.columns
                   if isinstance(col.type, (VARCHAR, NVARCHAR, CHAR, NCHAR))
                   and _same_collation(col.type.collation, args.old_collation)
                   and not _same_collation(col.type.collation, args.new_collation)]

        dropped = []  # names of indexes dropped so far, in drop order
        try:
            for column in targets:
                # Indexes covering the column are dropped before altering it.
                for index_name in index_names_by_column.get(column.name, []):
                    if index_name in dropped:
                        continue
                    log.info("Droping index %s", index_name)
                    indexes_by_name[index_name].drop(engine)
                    dropped.append(index_name)

                column.type.collation = args.new_collation
                query = alter_column(table=table, column=column.name,
                                     tp=column.type.__visit_name__, null=column.nullable,
                                     length=column.type.length or "max",
                                     collate=column.type.collation)
                log.info("Altering column %s", column)
                try:
                    engine.execute(query)
                except (ProgrammingError, OperationalError) as err:
                    log.warning("Error trying to change column %s collate. %s", column, err)
        finally:
            # Recreate whatever was dropped even if an unexpected error
            # interrupted the loop above, so the table is not left without
            # its indexes.
            if dropped:
                log.info("Recreating dropped indexes of table %s", table)
                for index_name in dropped:
                    log.info("Creating index %s", index_name)
                    indexes_by_name[index_name].create(engine)
Add Comment
Please, Sign In to add comment