Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # This code adds a new year of RWS data to the old dataset.
- # --- IMPORT ---
- import psycopg2
- # --- FUNCTIONS ---
- # VKL_NUMMER <--- ID for rows
- # Function to print some messages for easy debug
def run_query(query):
    """Execute *query* on the module-level cursor, echoing it first when DEBUG is on."""
    if DEBUG:
        print(query)
    cur.execute(query)
- # Function to make a copy of the table to edit in
def copy_table(table_name):
    """Replace any existing "<table_name>_copy" with a fresh copy of table_name.

    The copy is the working table, so the raw imported data stays untouched.
    """
    sql = """
    DROP TABLE IF EXISTS "{0}_copy";
    CREATE TABLE "{0}_copy" AS
    SELECT * FROM "{0}"
    """.format(table_name)
    run_query(sql)
- # Function to add a column to a table
def add_column(table_name, column_name, datatype):
    """Append a new column of the given SQL datatype to table_name."""
    sql = """
    ALTER TABLE "{0}"
    ADD COLUMN "{1}" {2};
    """.format(table_name, column_name, datatype)
    run_query(sql)
- # Function that converts _ID or _CODE into _OMS
- # 0= data table (2016 for example) - table_to_convert
- # 1= the reference table
- # 2=item name, iterable (_ID or _CODE)
- # 3=item name (but the _OMS version, needs function turn_id_or_code_into_oms)
- #HARDCODED: VKL_NUMMER (PTJ_ID for partij, SOR_ID for slachtoffer, PTJ_ID for voertuig)
- #IDEA slachtoffer en voertuig: since the original data is here, maybe I should segment it in years and then put it on Ixiwa, solves the conversion problem and such
def run_queries(table_to_convert, from_table, item_name, item_oms):
    """Convert one _ID/_CODE column of `table_to_convert` into its _OMS description.

    Builds a temp table that joins the codes in `table_to_convert` against the
    reference table `from_table`, copies the looked-up descriptions into a
    freshly added VARCHAR column, then drops the temp table.

    Parameters:
        table_to_convert: data table being enriched (e.g. "partij_2016.csv_copy")
        from_table:       reference table holding code -> description rows
        item_name:        source column name (ends in _ID or _CODE)
        item_oms:         target description column name (the _OMS/_NAAM variant)

    Relies on the module-level `link_id` (row identifier, e.g. PTJ_ID/VKL_NUMMER)
    and the helpers add_column(), run_query() and turn_id_or_code_into_oms().
    """
    # Columns of these families are numbered (e.g. BZD_ID_IF1, AGT_ID_2) and all
    # map onto one shared reference table, so they need special handling.
    special_columns = ['BZD_', 'AGT_', 'BWG_', 'TDT_']
    if any(special_item in item_name for special_item in special_columns):
        print(item_name, ' is in special columns: TRUE')
        # e.g. BZD_ID_IF1 -> BZD_IF1_OMS: drop "_ID", append "_OMS"
        newcolname = item_name.replace("_ID", "") + "_OMS"
        # BUG FIX: this previously called turn_id_or_code_into_oms(item), silently
        # reading the module-level loop variable instead of this function's argument.
        newname = turn_id_or_code_into_oms(item_name)
        add_column(table_to_convert, newcolname, "VARCHAR")
        grab_oms_query = """
        DROP TABLE IF EXISTS "{3}_temp";
        CREATE TABLE "{3}_temp" AS
        SELECT "{0}"."{4}", "{0}"."{5}", oms_table."{3}"
        FROM "{0}"
        LEFT JOIN "{1}" as oms_table
        ON CAST("{0}"."{2}" AS VARCHAR) = CAST(oms_table."{3}" AS VARCHAR);
        ALTER TABLE "{3}_temp" RENAME COLUMN "{3}" TO "{5}";
        """.format(table_to_convert, from_table, item_name, newname, link_id, newcolname)
        fill_oms_column_query = """
        UPDATE "{0}"
        SET "{1}" = "{1}_temp"."{1}"
        FROM "{1}_temp"
        WHERE "{0}"."{2}" = "{1}_temp"."{2}"
        """.format(table_to_convert, newname, link_id)
        drop_temp_table_query = """
        DROP TABLE IF EXISTS "{0}";
        """.format(newname + "_temp")
        # NOTE(review): this branch is work in progress - the queries are built but
        # deliberately not executed yet. Before enabling, reconcile the naming:
        # fill_oms_column_query targets column "{newname}" while the column added
        # above (and the rename in grab_oms_query) uses "{newcolname}".
        #run_query(grab_oms_query)
        #run_query(fill_oms_column_query)
        #run_query(drop_temp_table_query)
        #break
    else:
        print(item_name, ' is NOT in special columns: FALSE')
        add_column(table_to_convert, item_oms, "VARCHAR")
        # Temp table keeps (link_id, code, description) per row of the data table.
        grab_oms_query = """
        DROP TABLE IF EXISTS "{3}_temp";
        CREATE TABLE "{3}_temp" AS
        SELECT "{0}"."{4}", "{0}"."{2}", oms_table."{3}"
        FROM "{0}"
        LEFT JOIN "{1}" as oms_table
        ON CAST("{0}"."{2}" AS VARCHAR) = CAST(oms_table."{2}" AS VARCHAR);
        """.format(table_to_convert, from_table, item_name, item_oms, link_id)
        fill_oms_column_query = """
        UPDATE "{0}"
        SET "{1}" = "{1}_temp"."{1}"
        FROM "{1}_temp"
        WHERE "{0}"."{2}" = "{1}_temp"."{2}"
        """.format(table_to_convert, item_oms, link_id)
        drop_temp_table_query = """
        DROP TABLE IF EXISTS "{0}";
        """.format(item_oms + "_temp")
        run_query(grab_oms_query)
        run_query(fill_oms_column_query)
        run_query(drop_temp_table_query)
        #break
- # changes extension _ID and _CODE into _OMS (for easy lookup in reference table)
def turn_id_or_code_into_oms(name):
    """Map an _ID/_CODE column name onto the matching description column name.

    DAG_CODE and PVE_CODE use the _NAAM suffix; the numbered BZD/AGT/BWG/TDT
    id columns (e.g. BZD_ID_IF1) all collapse onto one shared '<prefix>_OMS'
    column; every other name simply swaps its _ID/_CODE suffix for _OMS.
    """
    if "_CODE" in name:
        suffix = "_NAAM" if name in ("DAG_CODE", "PVE_CODE") else "_OMS"
        return name.replace("_CODE", suffix)
    # Numbered id families share a single description column per prefix.
    for prefix in ("BZD", "AGT", "BWG", "TDT"):
        if prefix + "_ID_" in name:
            return prefix + "_OMS"
    return name.replace("_ID", "_OMS")
- # Concat column names for create_union_tables_query
def concat_column_names(column_names):
    """Join column names into a comma-separated list of double-quoted SQL identifiers.

    e.g. ['A', 'B'] -> '"A", "B"'. Used to build the explicit column list for
    the union query. Unlike the old manual loop, this is also safe for an
    empty list (returns '' instead of raising IndexError).
    """
    return ", ".join('"{0}"'.format(name) for name in column_names)
- # create union query
def create_union_tables_query(tables):
    """Build one big UNION query stacking the original copy with every yearly table.

    Uses the module-level `all_columns` and `input_for_script`. Columns are
    listed explicitly (rather than *) so each SELECT emits them in the same
    order. Prints the assembled query and returns it.
    """
    cols = concat_column_names(all_columns)
    query = 'DROP TABLE IF EXISTS _all_data; CREATE TABLE _all_data AS SELECT {0} FROM "{1}" '.format(
        cols, input_for_script[0] + '_copy')
    for yearly_table in tables:
        query += 'UNION SELECT {0} FROM "{1}" '.format(cols, yearly_table)
    print(query)
    return query
- # --- QUERIES ---
- #space for queries, will restructure it later
- # Query for the loop, selects the correct reference table based on the iteration
- get_table_name_query = """
- SELECT table_name from _rws_references WHERE column_name = '{0}'
- """
- # --- SETUP ---
- # for printing some messages in the console
- DEBUG = True
- #HARDCODED
- input_for_script = ('partij.csv', 'partij')
- if input_for_script[1] == 'partij':
- link_id = 'PTJ_ID'
- elif input_for_script[1] == 'ongeval':
- link_id = 'VKL_NUMMER'
- # The only things that need to be filled in to run the script is the variables 'tables' and 'input_for_script':
- # List of tables that have ID/CODE to convert to OMS ["ongevallen_2016.csv", "ongevallen_2017.csv"]
- tables = ["partij_2016.csv"]
- tables_copy = []
- for table in tables:
- tables_copy.append(table + '_copy')
- print('tables copy: ', tables_copy)
- # dbname, user, password and year, old data filename, category, empty string for later
- # Connect to an existing database
- conn = psycopg2.connect("dbname=rws_2011_2017 user=postgres password=Prandall19s!")
- # Open a cursor to perform database operations
- cur = conn.cursor()
- copy_table(input_for_script[0])
- #IDEA old table TIJDSTIP = (H)HMM, new table it's UUR = (H)H
- #ADD COLUMN "TIJDSTIP_MODULO" integer
- #SET "TIJDSTIP_MODULO" = (("TIJDSTIP"%100)/60.0+"TIJDSTIP"/100);
- # --- EXECUTION ---
- # Requirements: 1. all tables are located in the same schema (old years, new year to add, reference tables)
- # 2. the table will only join the attributes from the old table
- # 3. the names of the reference tables end in '.txt.csv' */
- # Creates settings table. input it takes:
- # the year (to add),
- # the name of the old table (containing the previous years)
- # and category (partij/ongeval/slachtoffer/voertuig/etcetera)
- cur.execute("""
- DROP TABLE IF EXISTS _rws_settings;
- CREATE TABLE _rws_settings (
- old_data text,
- category text);
- """)
- #year_to_add INT,
- cur.execute("""
- INSERT INTO _rws_settings
- (old_data, category)
- VALUES
- (%s, %s);
- """,
- input_for_script)
- # Creates lookup table _rws_datatypes, which states the column names and datatypes of old_data from _rws_settings
- cur.execute("""
- DROP TABLE IF EXISTS _rws_datatypes;
- CREATE TABLE _rws_datatypes AS
- SELECT column_name, data_type FROM INFORMATION_SCHEMA.columns
- WHERE table_name = '{0}';
- """.format(input_for_script[0] + '_copy'))
- if input_for_script[0] == 'ongeval.csv':
- cur.execute("""INSERT INTO "_rws_datatypes" VALUES ('DAGTYPE', 'varchar');""")
- # Creates lookup table _rws_references: It takes the columns ending in '_ID' or '_CODE' and the name of the reference table.
- # With this, it'll be possible to look up whether columns in the new table (year to add) have to be transformed into '_OMS'.
- cur.execute("""
- DROP TABLE IF EXISTS _rws_references;
- CREATE TABLE _rws_references AS
- SELECT column_name, table_name FROM INFORMATION_SCHEMA.columns
- WHERE table_name LIKE '%.txt.csv'
- AND (column_name LIKE '%\_ID' OR column_name LIKE '%\_CODE' OR column_name LIKE '%_ID_%');
- """)
- # Loop through all the tables and convertable items, find all reference tables and start function run_queries
- # Runs the function to copy the table mentioned in the variable tables (list)
for table in tables:
    # Work on "<table>_copy" so the raw imported table stays untouched.
    copy_table(table)
    # Selects all columns from the new table that end in '_ID' or '_CODE'
    # (or contain '_ID_'), and materialises them in _rws_columns_to_convert.
    # NOTE(review): table names are interpolated with .format() - acceptable
    # only because `tables` is hardcoded above, never user input.
    cur.execute("""
    DROP TABLE IF EXISTS _rws_columns_to_convert;
    CREATE TABLE _rws_columns_to_convert AS
    SELECT column_name FROM INFORMATION_SCHEMA.columns
    WHERE table_name LIKE (SELECT '{0}' FROM _rws_settings)
    AND (column_name LIKE '%\_ID' OR column_name LIKE '%\_CODE' OR column_name LIKE '%_ID_%');
    """.format(table))
    # Pull that list into Python: which columns need a reference-table conversion.
    cur.execute("""
    SELECT _rws_columns_to_convert."column_name" FROM _rws_columns_to_convert;
    """)
    list_to_convert = cur.fetchall()
    # fetchall() yields 1-tuples; unwrap to plain strings
    list_to_convert = [i[0] for i in list_to_convert]
    # HARDCODED list of column names that do end in _ID or _CODE, but are plain
    # identifiers without a reference table (checked manually).
    # Everything including and after AGT_ID_1 needs conversion:
    # 'AGT_ID_1', 'AGT_ID_2', 'BWG_ID_1', 'BWG_ID_2', 'TDT_ID_1', 'TDT_ID_2', 'TDT_ID_3'
    columns_not_to_convert = ['JTE_ID', 'WVK_ID', 'GME_ID', 'PVE_CODE', 'WSE_ID', 'PTJ_ID']
    # Remove the excluded names from the iteration list only (the table in
    # pgAdmin is left as-is).
    for name in columns_not_to_convert:
        if name in list_to_convert:
            list_to_convert.remove(name)
    print(list_to_convert)
    table_copy = table + "_copy"
    for item in list_to_convert:
        print("PROCESSING: " + table + " - item: " + item)
        # The BZD/AGT/BWG/TDT families are numbered (e.g. BZD_ID_IF1) but share
        # one reference table, registered under the bare '<prefix>_ID' name.
        # NOTE: cur.execute() always returns None; the first assignment is
        # immediately overwritten by fetchone() and exists only as a leftover.
        if 'BZD_' in item:
            item_id = 'BZD_ID'
            from_table = cur.execute(get_table_name_query.format(item_id))
            from_table = cur.fetchone()[0] #ERROR: Nonetype object is not subscriptable (when no reference row matches)
        elif 'AGT_' in item:
            item_id = 'AGT_ID'
            from_table = cur.execute(get_table_name_query.format(item_id))
            from_table = cur.fetchone()[0] #ERROR: Nonetype object is not subscriptable (when no reference row matches)
        elif 'BWG_' in item:
            item_id = 'BWG_ID'
            from_table = cur.execute(get_table_name_query.format(item_id))
            from_table = cur.fetchone()[0] #ERROR: Nonetype object is not subscriptable (when no reference row matches)
        elif 'TDT_' in item:
            item_id = 'TDT_ID'
            from_table = cur.execute(get_table_name_query.format(item_id))
            from_table = cur.fetchone()[0] #ERROR: Nonetype object is not subscriptable (when no reference row matches)
        else:
            cur.execute(get_table_name_query.format(item))
            from_table = cur.fetchone()[0]
        # Convert this column's codes into descriptions on the working copy.
        run_queries(table_copy, from_table, item, turn_id_or_code_into_oms(item))
    #HARDCODED Adding dagtype to older data (will have to rewrite for Ixiwa, but this'll do for now)
    if input_for_script[0] == "ongeval.csv":
        print('input for script is ongeval.csv')
        cur.execute("""ALTER TABLE "ongeval.csv_copy" ADD COLUMN "DAGTYPE" varchar;""")
        cur.execute("""UPDATE "ongeval.csv_copy"
        SET "DAGTYPE" = 'MA-VR' WHERE ("DAG_CODE" = 'MA' OR "DAG_CODE" = 'DI' OR "DAG_CODE" = 'WO' OR "DAG_CODE" = 'DO' OR "DAG_CODE" = 'VR');
        UPDATE "ongeval.csv_copy"
        SET "DAGTYPE" = 'ZA-ZO' WHERE ("DAG_CODE" = 'ZA' OR "DAG_CODE" = 'ZO');""")
    # Get list with column names from the new year's datafile.
    # NOTE: the assignment from cur.execute() is dead (returns None); the real
    # result arrives via fetchall() just below.
    columns_in_new_data = cur.execute("""
    SELECT column_name FROM INFORMATION_SCHEMA.columns
    WHERE table_name = '{0}';
    """.format(table_copy))
    columns_in_new_data = cur.fetchall()
    # unwrap 1-tuples to plain strings
    columns_in_new_data = [i[0] for i in columns_in_new_data]
    # Get list with column names from the original (old years) datafile.
    cur.execute("""
    SELECT _rws_datatypes."column_name" FROM _rws_datatypes;
    """)
    columns_for_union = cur.fetchall()
    # unwrap 1-tuples to plain strings
    columns_for_union = [i[0] for i in columns_for_union]
    print('old: ', columns_for_union)
    print('2016:', columns_in_new_data)
    for column_name in columns_for_union:
        # If the column exists in the original datafile but not in the new one:
        # add it (with the original's datatype) so the later UNION lines up.
        if column_name not in columns_in_new_data:
            datatype_column = cur.execute("""SELECT data_type FROM _rws_datatypes WHERE column_name = '{0}'""".format(column_name))
            datatype_column = cur.fetchone()
            datatype_column = datatype_column[0]
            print(datatype_column)
            cur.execute("""
            ALTER TABLE "{0}"
            ADD COLUMN "{1}" {2};
            """.format(table_copy, column_name, datatype_column))
    # If a column exists in the new datafile but not in the original: drop it.
    for column_name in columns_in_new_data:
        if column_name not in columns_for_union:
            cur.execute("""
            ALTER TABLE "{0}"
            DROP COLUMN "{1}";
            """.format(table_copy, column_name))
    # Get all column names of the original dataset (also used by the union query).
    cur.execute("""SELECT column_name FROM _rws_datatypes""")
    all_columns = cur.fetchall()
    all_columns = [i[0] for i in all_columns]
    # Check per column whether old and new dataset agree on the datatype.
    for column_name in all_columns:
        wanted_datatype = cur.execute("""SELECT data_type FROM _rws_datatypes WHERE column_name = '{0}'""".format(column_name))
        wanted_datatype = cur.fetchone()
        wanted_datatype = wanted_datatype[0]
        print("column name: ", column_name, "datatype: ", wanted_datatype)
        current_datatype = cur.execute("""SELECT data_type FROM INFORMATION_SCHEMA.columns WHERE table_name = '{0}' AND column_name = '{1}';""".format(table_copy, column_name))
        current_datatype = cur.fetchone()
        current_datatype = current_datatype[0]
        print('current: ', current_datatype, 'wanted: ', wanted_datatype)
        # Varchar column with the wrong type: blank out empty strings to NULL
        # first (otherwise the cast below would fail), then cast to the wanted type.
        if current_datatype != wanted_datatype and current_datatype == 'character varying':
            cur.execute("""UPDATE "{0}" SET "{1}" = NULL WHERE "{1}" = '';""".format(table_copy, column_name))
            cur.execute("""
            ALTER TABLE "{0}"
            ALTER COLUMN "{1}" TYPE {2} USING "{1}"::{2};
            """.format(table_copy, column_name, wanted_datatype))
        # Non-varchar column with the wrong type: cast directly, no NULL cleanup.
        elif current_datatype != wanted_datatype and current_datatype != 'character varying':
            cur.execute("""
            ALTER TABLE "{0}"
            ALTER COLUMN "{1}" TYPE {2} USING "{1}"::{2};
            """.format(table_copy, column_name, wanted_datatype))
#ADD TIJDSTIP_MODULE (for nice graph on zonnestand)
#cur.execute("""UPDATE "{0}" SET "TIJDSTIP_MODULO" = ({"TIJDSTIP"}%100)/60.0+{"TIJDSTIP"}/100;""".format(table))
#Union goes wrong because it sees a column with Null timestamps as varchar (even though it doesn't show..)
#If you add all column names manually instead of putting a star (*), it's all good
#Run union query
#cur.execute("""{0}""".format(create_union_tables_query(tables_copy)))
- # --- TO DO
- #-MELDDATAPK, VERVDATAPK, GEBDAT, DATOVERL to_date
- #- """({ongeval.TIJDSTIP}%100)/60.0+{ongeval.TIJDSTIP}/100""" DOES NOT WORK! OLD TABLE uses TIJDSTIP (H)HMM, new uses UUR (H)H
- # New script: create linking tables
- # make it work with partij
- # make it work with voertuig
- # make it work with slachtoffer
- # create index? aliases?
- #rewrite for use with Mozart
- # TO DO in new scripts
- #-ADD DAGTYPE TO YEAR < 2016
- # new scripts - restructure for better readability
- # 1. functions and queries
- # 2. find and convert all txt to csv (reference tables) (input: main folder and year(s))
- # 3. prepare datasets rws specific (coordinate system mainly) (input: main folder and year(s)) <-- maybe put this in 5.
- # 4. import all data found in the folders to pgadmin (input: main folder, old data table(s) and year(s))
- # 5. add new year's data ongeval/partij/voertuig/slachtoffer (also: convert coordinates, add DAGTYPE to original dataset)
- # 6. make linking tables
# --- FINISH UP RUNNING THE SCRIPT
# Make the changes to the database persistent (nothing above is committed until here)
conn.commit()
# Close communication with the database
cur.close()
conn.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement