import psycopg2

# --- SETUP ---
# Set to True to print the executed queries and other progress messages to the console
DEBUG = True

# The only things that need to be filled in to run the script (plus the `tables` list
# further down, which will be made configurable later):
#   - dbname, user, password (in the connection string below)
#   - year to add, old data table name, category, empty string (filled in later)
input_for_script = (2016, 'ongeval.csv', 'ongevallen', '')

# Connect to an existing database
conn = psycopg2.connect("dbname=rws_2011_2017 user=postgres password=Prandall19s!")
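# Note: the credentials are hard-coded here for convenience; they could also be read from
# the environment instead, e.g. (illustrative only):
#   import os
#   conn = psycopg2.connect(dbname="rws_2011_2017", user="postgres",
#                           password=os.environ["PGPASSWORD"])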
  15.  
  16. # --- EXECUTION ---
  17. # This code adds a new year of RWS data to the old dataset.
  18. # Requirements:            1. all tables are located in the same schema (old years, new year to add, reference tables)
  19. #              2. the table will only join the attributes from the old table
  20. #              3. the names of the reference tables end in '.txt.csv' */
  21.  
  22. # Open a cursor to perform database operations
  23. cur = conn.cursor()
  24.  
  25. # Creates settings table. input it takes:
  26. #   the year (to add),
  27. #   the name of the old table (containing the previous years)
  28. #   and category (partij/ongeval/slachtoffer/voertuig/etcetera)
  29. cur.execute("""
  30. DROP TABLE IF EXISTS _rws_settings;
  31. CREATE TABLE _rws_settings (
  32.     year_to_add     INT,
  33.     old_data        text,
  34.     category        text,
  35.     year_filename   text);
  36. """)
  37.  
  38. cur.execute("""
  39. INSERT INTO _rws_settings
  40.     (year_to_add, old_data, category, year_filename)
  41. VALUES
  42.     (%s, %s, %s, %s);
  43.    """,
  44.     input_for_script)
  45.  
  46. cur.execute("""
  47. UPDATE _rws_settings
  48. SET year_filename = CONCAT(_rws_settings.category, '_', _rws_settings.year_to_add, '.csv');
  49. """)

# Create lookup table _rws_datatypes, which holds the column names and datatypes of old_data from _rws_settings
cur.execute("""
DROP TABLE IF EXISTS _rws_datatypes;
CREATE TABLE _rws_datatypes AS
SELECT column_name, data_type FROM INFORMATION_SCHEMA.columns
WHERE table_name = (SELECT old_data FROM _rws_settings);
""")

# Create lookup table _rws_references: it holds the columns ending in '_ID' or '_CODE' and the name of the reference table they appear in.
# With this, it is possible to look up whether columns in the new table (year to add) have to be transformed into '_OMS'.
cur.execute("""
DROP TABLE IF EXISTS _rws_references;
CREATE TABLE _rws_references AS
SELECT column_name, table_name FROM INFORMATION_SCHEMA.columns
WHERE table_name LIKE '%.txt.csv'
AND (column_name LIKE '%\_ID' OR column_name LIKE '%\_CODE' OR column_name LIKE '%\_ID\_%');
""")

# Select all columns from the new table that end in '_ID' or '_CODE' (or contain '_ID_') and store them in a table
cur.execute("""
DROP TABLE IF EXISTS _rws_columns_to_convert;
CREATE TABLE _rws_columns_to_convert AS
SELECT column_name FROM INFORMATION_SCHEMA.columns
WHERE table_name LIKE (SELECT _rws_settings.year_filename FROM _rws_settings)
AND (column_name LIKE '%\_ID' OR column_name LIKE '%\_CODE' OR column_name LIKE '%\_ID\_%');
""")

# Fetch the list of columns that need to be converted with a reference table
cur.execute("""
SELECT _rws_columns_to_convert."column_name" FROM _rws_columns_to_convert;
""")
list_to_convert = cur.fetchall()
# Unpack the single-element tuples that fetchall() returns
list_to_convert = [i[0] for i in list_to_convert]
# Column names that do end in _ID or _CODE but should not be converted (checked manually)
columns_not_to_convert = ['JTE_ID', 'WVK_ID', 'GME_ID', 'PVE_CODE', 'WSE_ID']

# Remove those names from the conversion list that will be iterated over
# (only from the Python list, not from the table in the database)
for name in columns_not_to_convert:
    if name in list_to_convert:
        list_to_convert.remove(name)
print(list_to_convert)
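# Illustrative example of what the list might contain (actual contents depend on the new table):
# e.g. ['BZD_ID_IF1', 'DAG_CODE', ...]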

# Query template for the loop below: looks up the reference table for a given column name
get_table_name_query = """
SELECT table_name FROM _rws_references WHERE column_name = '{0}'
"""

# List of new-year tables that have _ID/_CODE columns to convert to _OMS,
# e.g. ["ongevallen_2016.csv", "ongevallen_2017.csv"]
tables = ["ongevallen_2016.csv"]

# Create a temporary data table and fill it with the old data.
# (The old table name is stored as text in _rws_settings, so fetch it first
# and then interpolate it into the query.)
cur.execute("SELECT old_data FROM _rws_settings;")
old_data_table = cur.fetchone()[0]
cur.execute("""
DROP TABLE IF EXISTS _rws_all_temp;
CREATE TABLE _rws_all_temp AS SELECT * FROM "{0}";
""".format(old_data_table))

# VKL_NUMMER is used as the row identifier when joining the temp OMS tables back onto the data
# Helper that optionally prints the query before executing it (for easy debugging)
def run_query(query):
    if DEBUG:
        print(query)
    cur.execute(query)

# Make a copy of a table to edit in
def copy_table(table_name):
    copy_table_query = """
    DROP TABLE IF EXISTS "{0}_copy";
    CREATE TABLE "{0}_copy" AS
    SELECT * FROM "{0}";
    """.format(table_name)

    run_query(copy_table_query)

# Add a column to a table
def add_column(table_name, column_name, datatype):
    add_column_query = """
    ALTER TABLE "{0}"
    ADD COLUMN "{1}" {2};
    """.format(table_name, column_name, datatype)

    run_query(add_column_query)

# Copy every table mentioned in the `tables` list
for table in tables:
    copy_table(table)

# Function that converts an _ID or _CODE column into its _OMS (description) column
#   table_to_convert: the data table (e.g. the 2016 copy)
#   from_table:       the reference table
#   item_name:        the column to convert (_ID or _CODE)
#   item_oms:         the same name in its _OMS form (from turn_id_or_code_into_oms)
def run_queries(table_to_convert, from_table, item_name, item_oms):

    if 'BZD_' in item_name:
        # BZD columns (e.g. BZD_ID_IF1) all join on the reference column BZD_ID
        newname = item_name.replace("_ID", "") + "_OMS"

        add_column(table_to_convert, newname, "VARCHAR")

        grab_oms_query = """
            DROP TABLE IF EXISTS "{3}_temp";
            CREATE TABLE "{3}_temp" AS
            SELECT "{0}"."VKL_NUMMER", "{0}"."{2}", oms_table."BZD_ID"
            FROM "{0}"
            LEFT JOIN "{1}" AS oms_table
            ON CAST("{0}"."{2}" AS VARCHAR) = CAST(oms_table."BZD_ID" AS VARCHAR);
            ALTER TABLE "{3}_temp" RENAME COLUMN "BZD_ID" TO "{3}";
        """.format(table_to_convert, from_table, item_name, newname)

        fill_oms_column_query = """
            UPDATE "{0}"
            SET "{1}" = "{1}_temp"."{1}"
            FROM "{1}_temp"
            WHERE "{0}"."VKL_NUMMER" = "{1}_temp"."VKL_NUMMER"
        """.format(table_to_convert, newname)

        drop_temp_table_query = """
            DROP TABLE IF EXISTS "{0}";
        """.format(newname + "_temp")
    else:
        add_column(table_to_convert, item_oms, "VARCHAR")

        grab_oms_query = """
            DROP TABLE IF EXISTS "{3}_temp";
            CREATE TABLE "{3}_temp" AS
            SELECT "{0}"."VKL_NUMMER", "{0}"."{2}", oms_table."{3}"
            FROM "{0}"
            LEFT JOIN "{1}" AS oms_table
            ON CAST("{0}"."{2}" AS VARCHAR) = CAST(oms_table."{2}" AS VARCHAR);
        """.format(table_to_convert, from_table, item_name, item_oms)

        fill_oms_column_query = """
            UPDATE "{0}"
            SET "{1}" = "{1}_temp"."{1}"
            FROM "{1}_temp"
            WHERE "{0}"."VKL_NUMMER" = "{1}_temp"."VKL_NUMMER"
        """.format(table_to_convert, item_oms)

        drop_temp_table_query = """
            DROP TABLE IF EXISTS "{0}";
        """.format(item_oms + "_temp")

    run_query(grab_oms_query)
    run_query(fill_oms_column_query)
    run_query(drop_temp_table_query)


# Changes the suffix _ID or _CODE into _OMS (for easy lookup in the reference table)
def turn_id_or_code_into_oms(name):
    if "_CODE" in name:
        if name == "DAG_CODE" or name == "PVE_CODE":
            return name.replace("_CODE", "_NAAM")
        else:
            return name.replace("_CODE", "_OMS")
    # BZD_ID_IF1 -> BZD_IF1_OMS (same convention as the BZD branch in run_queries)
    elif name.startswith("BZD_ID_"):
        return name.replace("_ID", "") + "_OMS"
    else:
        return name.replace("_ID", "_OMS")
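
# Illustrative mappings (column names are examples only):
#   'XXX_ID'     -> 'XXX_OMS'
#   'DAG_CODE'   -> 'DAG_NAAM'
#   'BZD_ID_IF1' -> 'BZD_IF1_OMS'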

# Get the list of column names from the original datafile
cur.execute("""
SELECT _rws_datatypes."column_name" FROM _rws_datatypes;
""")
columns_for_union = cur.fetchall()
# Unpack the single-element tuples that fetchall() returns
columns_for_union = [i[0] for i in columns_for_union]

# Loop through all the tables and convertable items, find the reference table for each and call run_queries
for table in tables:
    table_copy = table + "_copy"
    for item in list_to_convert:
        print("PROCESSING: " + table + " - item: " + item)
        if 'BZD_' in item:
            # All BZD_* columns share the reference column BZD_ID
            cur.execute(get_table_name_query.format('BZD_ID'))
        else:
            cur.execute(get_table_name_query.format(item))
        reference_row = cur.fetchone()
        if reference_row is None:
            # No reference table found for this column; skip it instead of crashing
            print("NO REFERENCE TABLE FOUND FOR: " + item)
            continue
        from_table = reference_row[0]

        run_queries(table_copy, from_table, item, turn_id_or_code_into_oms(item))

    # Get the list of column names from the new year's datafile
    cur.execute("""
    SELECT column_name FROM INFORMATION_SCHEMA.columns
    WHERE table_name = '{0}';
    """.format(table_copy))
    columns_in_new_data = cur.fetchall()
    # Unpack the single-element tuples that fetchall() returns
    columns_in_new_data = [i[0] for i in columns_in_new_data]
    print(table_copy + ':', columns_in_new_data)

    # If a column exists in the original datafile but not in the new datafile: add it with the correct datatype
    for column_name in columns_for_union:
        if column_name not in columns_in_new_data:
            cur.execute("""SELECT data_type FROM _rws_datatypes WHERE column_name = '{0}'""".format(column_name))
            datatype_column = cur.fetchone()[0]
            print(datatype_column)
            cur.execute("""
            ALTER TABLE "{0}"
            ADD COLUMN "{1}" {2};
            """.format(table_copy, column_name, datatype_column))

    # If a column exists in the new datafile but not in the original datafile: drop it
    for column_name in columns_in_new_data:
        if column_name not in columns_for_union:
            cur.execute("""
            ALTER TABLE "{0}"
            DROP COLUMN "{1}";
            """.format(table_copy, column_name))

    # Get all column names of the original datafile
    cur.execute("""SELECT column_name FROM _rws_datatypes""")
    all_columns = cur.fetchall()
    all_columns = [i[0] for i in all_columns]

    # Check whether each column has the same datatype in the old and the new dataset
    for column_name in all_columns:
        cur.execute("""SELECT data_type FROM _rws_datatypes WHERE column_name = '{0}'""".format(column_name))
        wanted_datatype = cur.fetchone()[0]
        print("column name: ", column_name, "datatype: ", wanted_datatype)
        cur.execute("""SELECT data_type FROM INFORMATION_SCHEMA.columns WHERE table_name = '{0}' AND column_name = '{1}';""".format(table_copy, column_name))
        current_datatype = cur.fetchone()[0]
        print('current: ', current_datatype, 'wanted: ', wanted_datatype)

        # Change a varchar column to the correct datatype; empty strings are set to NULL first so the cast succeeds
        if current_datatype != wanted_datatype and current_datatype == 'character varying':
            cur.execute("""UPDATE "{0}" SET "{1}" = NULL WHERE "{1}" = '';""".format(table_copy, column_name))
            cur.execute("""
            ALTER TABLE "{0}"
            ALTER COLUMN "{1}" TYPE {2} USING "{1}"::{2};
            """.format(table_copy, column_name, wanted_datatype))
        # Change a non-varchar column to the correct datatype - no NULLs need to be set
        elif current_datatype != wanted_datatype and current_datatype != 'character varying':
            cur.execute("""
            ALTER TABLE "{0}"
            ALTER COLUMN "{1}" TYPE {2} USING "{1}"::{2};
            """.format(table_copy, column_name, wanted_datatype))

    # Note: a UNION with SELECT * goes wrong because a column that only contains NULL timestamps
    # is treated as varchar; listing all column names explicitly avoids this.
    # Union the old and the new table; this still needs to be extended to handle multiple years at once.
    def concat_column_names(column_names):
        # Build a comma-separated list of double-quoted column names
        return ", ".join('"' + name + '"' for name in column_names)

    union_query = ("""DROP TABLE IF EXISTS _all_data; CREATE TABLE _all_data AS SELECT {0} FROM "{1}" UNION SELECT {0} FROM "{2}";""".format(concat_column_names(all_columns), input_for_script[1], table_copy))
    cur.execute(union_query)


# --- TO DO
# build in BZD_ID_IF1 -> BZD_IF1_OMS
# union old and new data: make sure that it can add multiple years (see the sketch below this list)
# create linking tables
# make it work with partij
# make it work with voertuig
# make it work with slachtoffer
# create index? aliases?
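
# A minimal sketch (not used in the current run) of how the union step above could be
# extended to several year tables at once. It reuses concat_column_names and assumes each
# year's copy is named like "<category>_<year>.csv_copy", as produced by copy_table().
def union_all_years(old_table, year_copies, column_names):
    column_list = concat_column_names(column_names)
    selects = ['SELECT {0} FROM "{1}"'.format(column_list, old_table)]
    for copy_name in year_copies:
        selects.append('SELECT {0} FROM "{1}"'.format(column_list, copy_name))
    return ("DROP TABLE IF EXISTS _all_data; CREATE TABLE _all_data AS "
            + " UNION ".join(selects) + ";")

# Example call (hypothetical table names):
# cur.execute(union_all_years('ongeval.csv',
#                             ['ongevallen_2016.csv_copy', 'ongevallen_2017.csv_copy'],
#                             all_columns))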

# new scripts - restructure for better readability
# 1. functions and queries
# 2. find and convert all txt to csv (reference tables) (input: main folder and year(s))
# 3. prepare datasets, RWS-specific (mainly the coordinate system) (input: main folder and year(s))
# 4. import all data found in the folders into PostgreSQL (input: main folder, old data table(s) and year(s))
# 5. add new year's data ongeval/partij/voertuig/slachtoffer
# 6. make linking tables


# --- FINISH UP RUNNING THE SCRIPT
# Make the changes to the database persistent
conn.commit()

# Close communication with the database
cur.close()
conn.close()