- # "Inclusive range" - both start and stop are inclusive, which is more intuitive
- # for some applications.
- def irange(start, stop):
- return list(range(start, stop + 1))
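- # e.g. irange(2, 5) == [2, 3, 4, 5], whereas range(2, 5) stops at 4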
- import bisect
- import csv
- import errno
- import logging
- import os
- import platform
- import shutil
- import subprocess
- import sys
- try:
-     from base64 import encodebytes as encodestring  # Python 3; encodestring was removed in 3.9
- except ImportError:
-     from base64 import encodestring  # Python 2
- from json import loads as loadjson
- from os.path import join as pathjoin
- from typing import Mapping, Any, List
- import psycopg2 # library for interacting directly with PostgreSQL for mapit
- from sqlutil import Querier
- if sys.version_info[0] < 3:
- from urllib import urlencode
- # noinspection PyUnresolvedReferences
- from urllib2 import urlopen, HTTPError, Request as HttpRequest
- else:
- from urllib.parse import urlencode
- from urllib.request import urlopen, Request as HttpRequest
- from urllib.error import HTTPError
- class DefaultPecasSettings:
- """
- This object enforces passing a pecas_settings object around, so that functions don't accidentally revert to the
- default and ignore the user-specified settings file. Even though the default pecas_settings is not supported, the
- use of ps=_ps at the end of every applicable function is too ingrained to change for now.
- """
- def __getattr__(self, attr):
- raise TypeError(
- "Default value for PECAS settings is not supported; "
- "explicitly pass in a settings object")
- _ps = DefaultPecasSettings()
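- # Example: any attribute access on the default sentinel fails immediately, e.g.
- # connect_to_mapit() with no ps raises TypeError, while connect_to_mapit(ps=my_settings)
- # (my_settings being whatever settings module the run script imports) works normally.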
- def set_up_logging():
- # Configure basic logger that writes to file
- logging.basicConfig(filename='run.log',
- level=logging.INFO,
- format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
- datefmt='%y-%m-%d %H:%M:%S',
- filemode='a'
- )
- # Configure a second logger for screen usage
- console = logging.StreamHandler() # Create console logger
- console.setLevel(logging.INFO) # Set the info level
- formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') # Create a format
- console.setFormatter(formatter) # Set the format
- logging.getLogger('').addHandler(console) # Add this to the root logger
- def run_shell_script(fname, ps=_ps):
- """
- Runs a bash shell script in a platform-independent way.
- """
- if sys.platform == "win32":
- subprocess.check_call([ps.winShCommand, "-l", fname])
- else:
- subprocess.check_call(fname, shell=True)
- def execute_postgresql_query(query, database, port, host, user, ps=_ps):
- return subprocess.call(
- [ps.pgpath + "psql", "-c", query, "--dbname=" + database, "--port=" + str(port), "--host=" + host,
- "--username=" + user])
- def execute_sqlserver_query(query, database, user, password, ps=_ps):
- return subprocess.call(
- [ps.sqlpath + "sqlcmd", "-S", ps.sd_host, "-U", user, "-P", password, "-d", database, "-Q", query])
- def check_is_postgres(ps=_ps):
- # Error if not using postgres so that we can use psycopg. If we need to
- # use a different database system, functions that call this need to have
- # an alternate version supplied.
- if ps.sql_system != ps.postgres:
- raise ValueError("This function only works on postgres databases")
- def connect_to_mapit(ps=_ps):
- return psycopg2.connect(
- database=ps.mapit_database,
- host=ps.mapit_host,
- port=ps.mapit_port,
- user=ps.mapit_user,
- password=ps.mapit_password)
- def connect_to_sd(ps=_ps):
- return psycopg2.connect(
- database=ps.sd_database,
- host=ps.sd_host,
- port=ps.sd_port,
- user=ps.sd_user,
- password=ps.sd_password)
- def mapit_querier(ps=_ps):
- return Querier(lambda: connect_to_mapit(ps=ps), sch=ps.mapit_schema)
- def sd_querier(ps=_ps):
- return Querier(lambda: connect_to_sd(ps=ps), sch=ps.sd_schema)
- def build_class_path_in_path(path, *args):
- fullargs = [path] + [pathjoin(path, arg) for arg in args]
- return build_class_path(*fullargs)
- # Append paths to the classpath using the right separator for Windows or Unix
- def build_class_path(*args):
-     if platform.system() == "Windows" or platform.system() == "Microsoft":
-         separator = ";"
-     else:
-         separator = ":"
-     # join every entry with the separator; the result keeps a trailing
-     # separator, matching the original behaviour
-     return "".join(str(a) + separator for a in args)
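- # e.g. on Linux, build_class_path("AllYears/Code", "example.jar") ==
- # "AllYears/Code:example.jar:" (note the trailing separator)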
- def get_native_path():
-     # directory containing the platform-specific HDF5 native libraries
-     if platform.system() == "Windows" or platform.system() == "Microsoft":
-         return "AllYears/Code/hdf5/win64"
-     elif platform.system() == "Darwin":
-         return "AllYears/Code/hdf5/macos"
-     elif platform.system() == "Linux":
-         return "AllYears/Code/hdf5/linux64"
-     return ""
- def vm_properties(properties: Mapping[str, Any]) -> List[str]:
- """
- Returns a list of command line arguments to set the specified system properties on a Java VM subprocess call
- """
- return ["-D{}={}".format(name, value) for name, value in properties.items()]
- # Move a file to a new location, deleting any old file there.
- def move_replace(arg1, arg2):
- try:
- os.remove(arg2)
- except OSError as detail:
- if detail.errno != errno.ENOENT:  # only ignore "file not found"
- raise
- os.rename(arg1, arg2)
- # Grab the skim year to use given a list of years with new skims
- def get_skim_year(year, skimyears):
- i = bisect.bisect_left(skimyears, year) - 1
- if i < 0:
- i = 0
- logging.info("Using skims from year " + str(skimyears[i]) + " for year " + str(year))
- return skimyears[i]
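- # e.g. with skimyears == [2011, 2015, 2020]: year 2017 -> 2015, year 2010 -> 2011.
- # Note that bisect_left makes an exact match resolve to the *previous* entry
- # (year 2015 -> 2011); if an exact match should use its own skims, this would
- # need bisect_right instead.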
- # Function to calculate the base year ratio between the prices that AA is producing and the prices that SD has been
- # calibrated to
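- # e.g. if AA produces a base-year price of 10 where SD was calibrated to 12, the
- # correction written out is 12/10 = 1.2; apply_aa_to_sd_price_correction below
- # then multiplies each future AA price by it (subject to ps.maximum_sd_prices caps).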
- def calculate_aa_to_sd_price_correction(ps=_ps):
- fin = open(ps.scendir + "/" + str(ps.baseyear) + "/ExchangeResults.csv", "r", newline="")
- exchange_results = csv.reader(fin)
- er_header = next(exchange_results)
- fin2 = open(ps.scendir + "/" + str(ps.baseyear) + "/ExchangeResultsTargets.csv", "r", newline="")
- er_targets = csv.reader(fin2)
- targets_header = next(er_targets)
- fout = open(ps.scendir + "/AllYears/Outputs/AAtoSDPriceCorrections.csv", "w", newline="")
- out_writer = csv.writer(fout)
- out_writer.writerow(("Commodity", "LUZ", "PriceCorrection"))
- target_prices = {}
- for row in er_targets:
- commodity = row[targets_header.index("Commodity")]
- zone = row[targets_header.index("ZoneNumber")]
- key = (commodity, zone)
- price = row[targets_header.index("TargetPrice")]
- target_prices[key] = price
- for row in exchange_results:
- commodity = row[er_header.index("Commodity")]
- zone = row[er_header.index("ZoneNumber")]
- key = (commodity, zone)
- if key in target_prices:
- aa_price = row[er_header.index("Price")]
- target_price = target_prices[key]
- price_correction = float(target_price) / float(aa_price)
- out_writer.writerow((commodity, zone, price_correction))
- fin.close()
- fin2.close()
- fout.close()
- # Function to multiply the AA prices by the previously calculated correction ratio.
- def apply_aa_to_sd_price_correction(year, ps=_ps):
- fin = open(ps.scendir + "/" + str(year) + "/ExchangeResults.csv", "r", newline="")
- exchange_results = csv.reader(fin)
- fin2 = open(ps.scendir + "/AllYears/Outputs/AAtoSDPriceCorrections.csv", "r", newline="")
- corrections = csv.reader(fin2)
- fout = open(ps.scendir + "/" + str(year) + "/SDPrices.csv", "w", newline="")
- out_writer = csv.writer(fout)
- er_header = next(exchange_results)
- corr_header = next(corrections)
- out_writer.writerow(er_header)
- price_corrections = {}
- for row in corrections:
- commodity = row[corr_header.index("Commodity")]
- zone = row[corr_header.index("LUZ")]
- key = (commodity, zone)
- price_correction = row[corr_header.index("PriceCorrection")]
- price_corrections[key] = price_correction
- for row in exchange_results:
- commodity = row[er_header.index("Commodity")]
- zone = row[er_header.index("ZoneNumber")]
- key = (commodity, zone)
- if key in price_corrections:
- price = float(row[er_header.index("Price")])
- newprice = price * float(price_corrections[key])
- max_price = ps.maximum_sd_prices.get(commodity, float("inf"))
- if newprice > max_price:
- newprice = max_price
- row[er_header.index("Price")] = newprice
- out_writer.writerow(row)
- fin.close()
- fin2.close()
- fout.close()
- logging.info("Replacing ExchangeResults with SD Corrected Version")
- # for now copy corrected prices to ExchangeResults.csv, but want to change SD code to read SDPrices.csv
- move_replace(ps.scendir + "/" + str(year) + "/ExchangeResults.csv",
- ps.scendir + "/" + str(year) + "/AAExchangeResults.csv")
- shutil.copyfile(ps.scendir + "/" + str(year) + "/SDPrices.csv",
- ps.scendir + "/" + str(year) + "/ExchangeResults.csv")
- def getMapitHTTPSAuthHeader(ps=_ps):
- if not ps.mapit_httpsuser:
- return None
- authstr = '{}:{}'.format(ps.mapit_httpsuser, ps.mapit_httpspass)
- return 'Basic {}'.format(encodestring(authstr.encode('ascii'))[:-1].decode("utf-8"))
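- # e.g. with mapit_httpsuser 'user' and mapit_httpspass 'pass' this returns
- # 'Basic dXNlcjpwYXNz' (base64 of 'user:pass', trailing newline stripped)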
- def makeMapitAPICall(apiname, ps=_ps, retries=None, **api_params):
- HTTP_UNAUTHORIZED = 401
- HTTP_INTERNALERROR = 500
- timeout_retries = retries or 3
- headers = {
- 'User-Agent' : 'PECAS-Routines-Script',
- 'Accept' : 'application/json'
- }
- url = '/'.join([ps.mapit_httpspath, apiname, ''])
- api_params['accept'] = 'application/json'
- params = urlencode(api_params, True).encode('utf-8')
- req = HttpRequest(url, params, headers)
- logging.info("Calling MapIt API {!r}".format(apiname))
- ok = False
- auth_retry = False
- while not ok:
- try:
- resp = urlopen(req).read()
- ok = True
- except HTTPError as e:
- if (getattr(e, 'code', None) == HTTP_UNAUTHORIZED) and not auth_retry:
- authline = e.headers['www-authenticate']
- if ':' in authline:
- authline = authline.split(':')[1]
- scheme = authline.split()[0]
- if scheme.lower() != 'basic':
- logging.error("API Lookup: Trying to authenticate HTTPS but expecting {!r}.".format(authline))
- raise
- req.add_header("Authorization", getMapitHTTPSAuthHeader(ps=ps))
- auth_retry = True
- elif (getattr(e, 'code', None) == HTTP_INTERNALERROR) and timeout_retries:
- # just do it again
- timeout_retries -= 1
- else:
- try:
- request_data = req.get_data()
- except AttributeError:
- request_data = req.data
- if isinstance(request_data, bytes):
- request_data = request_data.decode('utf-8')  # Python 3: request data is bytes, not str
- request_fields = request_data.split('&')
- request_data_print = '&'.join([d for d in request_fields if not d.startswith('file_contents')])
- logging.error("\n".join([
- "API Lookup HTTPError {} ({}):".format(getattr(e, 'code', 0), getattr(e, 'msg', '')),
- " URL:\n {}".format(req.get_full_url()),
- " Data:\n {}".format(request_data_print),
- " Headers:",
- "\n".join([" {}:{}".format(k, v) for (k, v) in req.header_items()])
- ]))
- raise
- return loadjson(resp)
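- # Typical usage (as in load_outputs_for_year below): responses carry either a
- # 'success' payload or a 'fail' message, e.g.
- #   resp = makeMapitAPICall('getOutputFilesByLoadSeq', ps=ps, load_seq=sequence)
- #   if 'fail' in resp:
- #       showAPIError(context_action="read file list from", errmsg=resp['fail'])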
- def showAPIError(context_action, errmsg):
- logging.error("Unable to {} MapIt server: API returned {!r}".format(context_action, errmsg or 'fail'))
- def load_outputs_for_year(year, sequence, excl=None, ps=_ps):
- operation = "loading data sequence {} for year:{}".format(sequence, year)
- logging.info(operation.capitalize())
- excluded_files = [fn if fn.endswith('.csv') else (fn + '.csv') for fn in (excl or [])]
- folder_path = '/'.join([ps.scendir, str(year)])
- if not getattr(ps, 'mapit_httpspath', None):
- logging.error(
- "Field 'mapit_httpspath' required in pecas_settings but was not found."
- "Cannot load outputs to MapIt server."
- )
- return
- # The upload starts to intermittently fail (with a difficult-to-diagnose
- # HTTP 500 error) if the HTTP POST data string is larger than 128MB (this
- # size is actually dependent on various server parameters, but in practice,
- # this is where we're at right now). That generally translates into about
- # 1.6 million rows of CSV data - but I'll make the cutoff 100,000 just to
- # be on the safe side. Anything larger than that, and I want to chunk the
- # file into blocks smaller than that cutoff. The API at the MapIt server
- # end is written such that it can just be re-called repeatedly with any
- # additional data.
- # If you start seeing HTTP 500 errors in the uploadFileContents API that
- # you can't pin down, and haven't (obviously) changed anything else, and
- # other API calls continue to work, this may be the culprit. Try making
- # chunksz smaller, and see if that helps. :-)
- chunksz = 100000
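- # e.g. a file of 1 header line + 250,000 data rows uploads as three calls covering
- # file_contents[1:100001], [100001:200001] and [200001:250001]; index 0 (the
- # header) is never sent, and only the last call triggers update_loaded_scenarios()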
- resp = makeMapitAPICall('getOutputFilesByLoadSeq', ps=ps, load_seq=sequence)
- if 'fail' in resp:
- showAPIError(context_action="read file list from", errmsg=resp['fail'])
- return
- for row in resp['success']:
- # list of rows where each row is a dict with fields 'file_id' and 'csv_file_name'
- csv_file_name = row['csv_file_name']
- if csv_file_name in excluded_files:
- print("Skipping file {}".format(csv_file_name))
- continue
- logging.info("Loading file {}".format(csv_file_name))
- filename = '/'.join([folder_path, csv_file_name])
- try:
- with open(filename, 'r') as csv_file:
- file_contents = csv_file.readlines()
- except IOError:
- logging.error("NOTICE: No such file or directory: " + filename)
- else:
- counter = 0
- all_table_name = None
- totalsz = len(file_contents)
- for i in range(1, totalsz, chunksz):
- resp = makeMapitAPICall(
- apiname = 'uploadFileContents',
- ps = ps,
- file_id = row['file_id'],
- scenario = ps.scenario,
- year_run = year,
- file_contents = ''.join(file_contents[i:i+chunksz]),
- skip_update = (i+chunksz < totalsz) # Only run update_loaded_scenarios() on the last chunk
- )
- if 'fail' in resp:
- showAPIError(context_action="upload file contents to", errmsg=resp['fail'])
- return
- counter += resp['success']['records']
- all_table_name = resp['success']['tblname']
- logging.info("{} record(s) added to {}".format(counter, all_table_name))
- logging.info("Finished " + operation)
- def loadOutputsForYear_(year, sequence, excl=None, ps=_ps):
- """In-line for deprecation; has been replaced by (what is now designated) load_outputs_for_year()"""
- logging.info("Loading data sequence "+str(sequence)+" for year:" + str(year))
- excluded_files = [fn if fn.endswith('.csv') else (fn + '.csv') for fn in (excl or [])]
- folder_path = os.path.join(ps.scendir, str(year))
- folder_path = folder_path.replace('\\', '/')
- conn = connect_to_mapit(ps=ps)
- with conn, conn.cursor() as cur:
- cur.execute('SELECT * FROM %s.aa_output_files where load_sequence = %d;' % (
- ps.mapit_schema, sequence)) # get the list of aa output files we are interested in
- filesrows = cur.fetchall()
- colnames = [x[0] for x in cur.description]
- for row in filesrows: # for each output file
- csv_file_name=row[colnames.index("csv_file_name")]
- if csv_file_name in excluded_files:
- print("Skipping file {}".format(csv_file_name))
- continue
- print("Loading file " + csv_file_name)
- all_table_name = row[colnames.index("all_table_name")]
- temp_table_name = row[colnames.index("temp_table_name")]
- if "partitioned" in colnames:
- partition = row[colnames.index("partitioned")]
- else:
- partition = False
- # print(colnames)
- # print(str(all_table_name) +" partitioned status is "+str(partition))
- cur.execute('TRUNCATE %s.%s' % (ps.mapit_schema, temp_table_name)) # empty the temporary table
- csv_file = os.path.join(folder_path, csv_file_name)
- try:
- f = open(csv_file, 'r') # open the AA output file
- f.readline() # skip the first line for header
- cur.copy_from(f, "%s.%s" % (ps.mapit_schema, temp_table_name), sep=',',
- null='') # use the psycopg2 fast copy command to copy the data into the temporary table
- cur.execute('SELECT count(*) FROM %s.%s;' % (ps.mapit_schema, temp_table_name))
- counter = cur.fetchone()[0]
- # now insert the records from the temporary table into the full table which contains data for each
- # year/scenario. This needs to happen before the scenario runs sqlcmd ="SELECT * from
- # %s.%s__drop_partition('%s')" %(ps.mapit_schema, all_table_name, ps.scenario);
- if partition:
- print("Trying to partition " + all_table_name)
- sqlcmd = "SELECT * from %s.%s__create_partition('%s')" % (
- ps.mapit_schema, all_table_name, ps.scenario)
- cur.execute(sqlcmd)
- sqlcmd = (
- "SELECT column_name from INFORMATION_SCHEMA.COLUMNS"
- "where TABLE_NAME='%s' AND TABLE_SCHEMA = '%s';" % (
- temp_table_name, ps.mapit_schema)
- )
- cur.execute(sqlcmd)
- temptablecolnames = cur.fetchall()
- cols = str("")
- first = True
- for colrow in temptablecolnames:
- if not first:
- cols = cols + ","
- first = False
- cols = cols + '"' + str(colrow[0]) + '"'
- sqlcmd = "INSERT INTO %s.%s (scenario, year_run, %s) SELECT '%s', %d, %s FROM %s.%s;" % (
- ps.mapit_schema, all_table_name, cols, ps.scenario, year, cols, ps.mapit_schema, temp_table_name)
- # print(sqlcmd)
- cur.execute(sqlcmd)
- logging.info("%d record(s) added to %s" % (counter, all_table_name))
- except IOError: # if file does not exist. ex. ActivityLocations2_6k.csv not created every year.
- logging.error("NOTICE: No such file or directory:" + csv_file)
- cur.execute('SET search_path = %s ' % ps.mapit_schema)
- cur.callproc('output.update_loaded_scenarios', [ps.scenario, year, year])
- logging.info("Finished loading data sequence " + str(sequence) + " for year:" + str(year))
- def clear_upload(year, ps=_ps):
- conn = connect_to_mapit(ps=ps)
- with conn, conn.cursor() as cur:
- query = (
- "set search_path={}; select "
- "clean_up_tables_for_scenario_and_year("
- "%(scen)s, %(year)s)").format(ps.mapit_schema)
- cur.execute(query, {"scen": ps.scenario, "year": year})
- # Function to adjust the Floorspace by the (previously calculated) correction delta.
- def write_floorspace_i(year, ps=_ps):
- # can parameterize column names from FloorspaceI in pecas_settings, but this is rarely used
- if hasattr(ps, 'fl_itaz'):
- # note: read the same snake_case attribute names that the hasattr check uses
- # (the original mixed fl_itaz and flItaz spellings)
- fl_itaz = ps.fl_itaz
- fl_icommodity = ps.fl_icommodity
- fl_iquantity = ps.fl_iquantity
- else:
- fl_itaz = "TAZ"
- fl_icommodity = "Commodity"
- fl_iquantity = "Quantity"
- # Read Floorspace O
- floor_o_in = open(ps.scendir + str(year) + "/FloorspaceO.csv", "r")
- floor_o_in_file = csv.reader(floor_o_in)
- header = next(floor_o_in_file)
- floor_o_dict = {}
- for row in floor_o_in_file:
- key = (row[header.index(fl_itaz)], row[header.index(fl_icommodity)])
- if key in floor_o_dict:
- logging.warning("Line duplicated in FloorspaceO file: %s", key)
- floor_o_dict[key] = floor_o_dict[key] + float(row[header.index(fl_iquantity)])
- else:
- floor_o_dict[key] = float(row[header.index(fl_iquantity)])
- floor_o_in.close()
- has_c = False
- try:
- floor_c_in = open(ps.scendir + str(year) + "/FloorspaceCalc.csv", "r")
- has_c = True
- except IOError:
- logging.info("NOTICE: FloorspaceCalc not found, using FloorspaceDelta file.")
- if has_c:
- # Read floorspace Calc
- # noinspection PyUnboundLocalVariable
- floor_c_in_file = csv.reader(floor_c_in)
- header = next(floor_c_in_file)
- floor_c_dict = {}
- for row in floor_c_in_file:
- key = (row[header.index(fl_itaz)], row[header.index(fl_icommodity)])
- if key in floor_c_dict:
- logging.warning("Line duplicated in FloorspaceCalc file: %s", key)
- floor_c_dict[key] = floor_c_dict[key] + float(row[header.index(fl_iquantity)])
- else:
- floor_c_dict[key] = float(row[header.index(fl_iquantity)])
- floor_c_in.close()
- # Write floorspace Delta
- floor_d_out = open(ps.scendir + str(year) + "/FloorspaceDelta.csv", "w", newline="")
- floor_d_out_file = csv.writer(floor_d_out)
- header = [fl_itaz, fl_icommodity, fl_iquantity]
- floor_d_out_file.writerow(header)
- key_list = list(floor_c_dict.keys())
- key_list.sort()
- for key in key_list:
- if key in floor_o_dict:
- delta = floor_c_dict[key] - floor_o_dict[key]
- else:
- delta = floor_c_dict[key]
- out_row = list(key)
- out_row.append(delta)
- floor_d_out_file.writerow(out_row)
- # Add in ODict values not in CDict; set delta to -ve of ODict value
- key_list = list(floor_o_dict.keys())
- for key in key_list:
- if key in floor_c_dict:
- pass
- else:
- delta = -1 * floor_o_dict[key]
- out_row = list(key)
- out_row.append(delta)
- floor_d_out_file.writerow(out_row)
- floor_d_out.close()
- else:
- # Copy from previous year
- shutil.copyfile(ps.scendir + "/" + str(year - 1) + "/FloorspaceDelta.csv",
- ps.scendir + "/" + str(year) + "/FloorspaceDelta.csv")
- # Read floorspace Delta
- floor_d_in = open(ps.scendir + str(year) + "/FloorspaceDelta.csv", "r")
- floor_d_in_file = csv.reader(floor_d_in)
- header = next(floor_d_in_file)
- floor_d_dict = {}
- for row in floor_d_in_file:
- key = (row[header.index(fl_itaz)], row[header.index(fl_icommodity)])
- if key in floor_d_dict:
- logging.warning("Line duplicated in FloorspaceDelta file: %s", key)
- floor_d_dict[key] = floor_d_dict[key] + float(row[header.index(fl_iquantity)])
- else:
- floor_d_dict[key] = float(row[header.index(fl_iquantity)])
- floor_d_in.close()
- # Write floorspace I
- if has_c:
- # copy FloorspaceCalc as FloorspaceI
- shutil.copyfile(ps.scendir + "/" + str(year) + "/FloorspaceCalc.csv",
- ps.scendir + "/" + str(year) + "/FloorspaceI.csv")
- else:
- floor_i_out = open(ps.scendir + str(year) + "/FloorspaceI.csv", "w", newline="")
- floor_i_out_file = csv.writer(floor_i_out)
- header = [fl_itaz, fl_icommodity, fl_iquantity]
- floor_i_out_file.writerow(header)
- key_list = list(floor_d_dict.keys())
- key_list.sort()
- for key in key_list:
- if key in floor_o_dict:
- net = floor_d_dict[key] + floor_o_dict[key]
- if net < 0:
- logging.debug("Negative value for floorspace in %s", key)
- net = 0
- else:
- # print "WARNING: Key", key, "in floorspaceO, but not in floorspaceDelta."
- net = floor_d_dict[key]
- if net < 0:
- logging.debug("Negative value for floorspace in %s", key)
- net = 0
- out_row = list(key)
- out_row.append(net)
- floor_i_out_file.writerow(out_row)
- # Add in ODict values not in DDict; set net to ODict value
- key_list = list(floor_o_dict.keys())
- for key in key_list:
- if key in floor_d_dict:
- pass
- else:
- net = floor_o_dict[key]
- if net < 0:
- logging.debug("Negative value for floorspace in %s", key)
- net = 0
- out_row = list(key)
- out_row.append(net)
- floor_i_out_file.writerow(out_row)
- floor_i_out.close()
- # Function to calculate and update total imports and exports in ActivityTotalsI in future years (the base year has
- # to be set before running).
- def update_importers_or_exporters(year, activities_to_exclude, activities_to_update, update_using_mor_u, ps=_ps):
- sqlstr = "update input.activity_totals \n\
- set total_amount=(case when '" + update_using_mor_u + "'='U' then -im.amount else im.amount end) from \n\
- -- select mu.activity, im.amount from \n\
- ( \n\
- -- inner query is amount made (or used in the case of updating imports) internally last year \n\
- select commodity,moru, sum(amount) as amount from output.all_makeuse \n\
- where scenario='" + ps.scenario + "' and year_run=" + str(
- year - 1) + " and moru='" + update_using_mor_u + "' and activity not like '" + activities_to_exclude + "' \n\
- group by commodity, moru \n\
- ) im, output.all_makeuse mu \n\
- where mu.scenario='" + ps.scenario + "' and \n\
- mu.year_run=" + str(year - 1) + " and \n\
- -- next two lines find the exporting activity that used (or importing commodity that made) \n\
- -- the commodity we measured in the inner query \n\
- mu.activity like '" + activities_to_update + "' and \n\
- mu.commodity=im.commodity \n\
- -- next three lines specify the values we are updating, i.e. the appropriate activity/scenario/year combination \n\
- and mu.activity=input.activity_totals.activity \n\
- and input.activity_totals.year_run=" + str(year) + " \n\
- and input.activity_totals.scenario='" + ps.scenario + "';"
- logging.debug("Updating " + activities_to_update + " with \n" + sqlstr)
- if ps.sql_system == ps.postgres:
- retcode = execute_postgresql_query(sqlstr, ps.mapit_database, ps.mapit_port, ps.mapit_host, ps.pguser, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- retcode = execute_sqlserver_query(sqlstr, ps.mapit_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- log_results_from_external_program("Updated amounts for " + activities_to_update + " in " + str(year),
- "Failed in updating " + activities_to_update, (retcode,))
- def update_importers(year, ps=_ps):
- update_importers_or_exporters(year, ps.exporter_string, ps.importer_string, 'U', ps=ps)
- def update_exporters(year, ps=_ps):
- update_importers_or_exporters(year, ps.importer_string, ps.exporter_string, 'M', ps=ps)
- def load_distances(skim_file_name, skimyear, ps=_ps):
- query = "truncate table " + ps.sd_schema + ".distances"
- if ps.sql_system == ps.postgres:
- retcode = execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- retcode = execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- log_results_from_external_program("Deleted old distance data from database",
- "Problem deleting old distance data from database", (retcode,))
- skim_file = open(ps.scendir + str(skimyear) + "/" + skim_file_name + ".csv", "r", newline="")
- skim_file_reader = csv.reader(skim_file)
- header = next(skim_file_reader)
- query = "insert into " + ps.sd_schema + ".distances (origin_luz, destination_luz, distance) values "
- first = True
- counter = 0
- for row in skim_file_reader:
- if not first:
- query += ","
- first = False
- origin = row[header.index("Origin")]
- destination = row[header.index("Destination")]
- distance = float(row[header.index(ps.distance_column)])
- if distance == 0:
- distance = 1E99
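- # a zero skim distance would give the gravity weight power(distance, exponent)
- # in smooth_prices a degenerate value (the exponent is presumably negative), so
- # substitute an effectively infinite distance, i.e. zero weight, instead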
- query += "(" + str(origin) + "," + str(destination) + "," + str(distance) + ")"
- counter = counter + 1
- if counter >= 500:
- query += ";"
- # print query
- if ps.sql_system == ps.postgres:
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- query = "insert into " + ps.sd_schema + ".distances (origin_luz, destination_luz, distance) values "
- first = True
- counter = 0
- if not first:
- query += ";"
- # print query
- if ps.sql_system == ps.postgres:
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- def load_exchange_results(year, ps=_ps):
- query = "truncate table " + ps.sd_schema + ".exchange_results"
- if ps.sql_system == ps.postgres:
- retcode = execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- retcode = execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- log_results_from_external_program(None, "Problem deleting old exchange results from database", (retcode,))
- exresults = open(ps.scendir + str(year) + "/ExchangeResults.csv", "r")
- exresults_reader = csv.reader(exresults)
- header = next(exresults_reader)
- # TODO add year column in all_exchange_results, make exchange_results a view for the current year
- query = "insert into " + ps.sd_schema + ".exchange_results (commodity, luz, price, internal_bought) values "
- first = True
- counter = 0
- for row in exresults_reader:
- if not first:
- query += ","
- first = False
- commodity = row[header.index("Commodity")]
- luz = row[header.index("ZoneNumber")]
- price = row[header.index("Price")]
- internal_bought = row[header.index("InternalBought")]
- query += "('" + commodity + "'," + luz + "," + price + "," + internal_bought + ")"
- counter = counter + 1
- if counter >= 250:
- query += ";"
- # print query
- if ps.sql_system == ps.postgres:
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- query = ("insert into " + ps.sd_schema +
- ".exchange_results (commodity, luz, price, internal_bought) values ")
- first = True
- counter = 0
- if not first:
- query += ";"
- # print query
- if ps.sql_system == ps.postgres:
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- def smooth_prices(ps=_ps):
- logging.info("Applying price smoothing")
- query = "update " + ps.sd_schema + ".exchange_results\n\
- set price=new_price from\n\
- ( select commodity, origin_luz, sum(internal_bought) as total_bought, \
- sum(internal_bought * price * power(distance, " + str(ps.gravity_exponent) + \
- ")) / sum(internal_bought * power (distance, " + str(ps.gravity_exponent) + ")) as new_price\n\
- from " + ps.sd_schema + ".exchange_results\n\
- inner join " + ps.sd_schema + ".space_to_commodity\n\
- on exchange_results.commodity=space_to_commodity.aa_commodity\n\
- inner join " + ps.sd_schema + ".distances\n\
- on exchange_results.luz=distances.destination_luz\n\
- group by commodity, origin_luz\n\
- ) new_prices\n\
- where exchange_results.commodity=new_prices.commodity\n\
- and exchange_results.luz=new_prices.origin_luz\n\
- and total_bought > 0"
- if ps.sql_system == ps.postgres:
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- def write_smoothed_prices(year, pgcmd, ps=_ps):
- # TODO use psycopg2 for this
- if ps.sql_system == ps.sqlserver:
- logging.error("Writing smoothed prices from database not supported yet on SQL Server")
- raise ValueError
- sqlstr = "\\copy (SELECT commodity as \"Commodity\", luz as \"ZoneNumber\"\n\
- , price as \"Price\", internal_bought as \"InternalBought\"\n\
- FROM " + ps.sd_schema + ".exchange_results) to '" + ps.scendir + str(
- year) + "/ExchangeResultsSmoothed.csv' csv header"
- retcode = subprocess.check_call(
- [pgcmd, "-c", sqlstr, "--host=" + ps.sd_host, "--port=" + str(ps.sd_port), "--dbname=" + ps.sd_database,
- "--username=" + ps.sd_user])
- log_results_from_external_program(None, "Problem updating ActivityTotalsI in year " + str(year), (retcode,))
- def apply_site_spec(year, ps=_ps):
- logging.info("Applying site spec")
- query = ("update {sch}.parcels p\n"
- "set year_built = s.year_effective, \n"
- "space_type_id = s.space_type_id,\n"
- "space_quantity = s.space_quantity,\n"
- "land_area = s.land_area,\n"
- "is_derelict = false,\n"
- "is_brownfield = false\n"
- "from {sch}.sitespec_parcels s\n"
- "where p.pecas_parcel_num = s.pecas_parcel_num\n"
- "and s.update_parcel = true\n"
- "and s.year_effective = " + str(year)).format(
- sch=ps.sd_schema, yr=year)
- if ps.sql_system == ps.postgres:
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- elif ps.sql_system == ps.sqlserver:
- execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- def update_space_limits(year, ps=_ps):
- logging.info("Updating space maximums")
- check_is_postgres(ps=ps)
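- # The generated limits grow geometrically from each generator row's year_effective:
- #   min = min_quantity * (1 + percent_annual_change_to_min/100) ^ (yr - year_effective)
- # and likewise for the max; when max_quantity is null, the max is instead derived
- # from the currently built total (grown one step, but at least total + min_increment).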
- with sd_querier(ps=ps).transaction(parcels=ps.parcels_table, yr=year) as tr:
- tr.query(
- "delete from {sch}.space_taz_limits\n"
- "where year_effective >= %(yr)s and manual = false")
- tr.query(
- "insert into {sch}.space_taz_limits\n"
- "select gen.taz_group_id, gen.taz_limit_group_id,\n"
- "gen.min_quantity * "
- "(1 + gen.percent_annual_change_to_min / 100) ^ "
- "(%(yr)s - gen.year_effective),\n"
- "case when gen.max_quantity is null then\n"
- "greatest(sub.total_quantity * "
- "(1 + gen.percent_annual_change_to_max / 100), "
- "sub.total_quantity + gen.min_increment)\n"
- "else\n"
- "gen.max_quantity * "
- "(1 + gen.percent_annual_change_to_max / 100) ^ "
- "(%(yr)s - gen.year_effective)"
- "end\n,"
- "%(yr)s, false\n"
- "from\n"
- "(select distinct on (taz_group_id, taz_limit_group_id) gen.*\n"
- "from {sch}.space_taz_limits_generator gen\n"
- "left join {sch}.space_taz_limits lim\n"
- "on lim.taz_group_id = gen.taz_group_id\n"
- "and lim.taz_limit_group_id = gen.taz_limit_group_id\n"
- "and lim.year_effective <= %(yr)s\n"
- "and lim.year_effective >= gen.year_effective\n"
- "and lim.manual\n"
- "where gen.year_effective <= %(yr)s\n"
- "and lim.taz_group_id is null\n"
- "order by gen.taz_group_id, gen.taz_limit_group_id, gen.year_effective desc\n"
- ") gen\n"
- "left join\n"
- "(select tg.taz_group_id, tlst.taz_limit_group_id, "
- "sum(p.space_quantity) as total_quantity\n"
- "from {sch}.parcels p\n"
- "join {sch}.tazs_by_taz_group tg on tg.taz_number = p.taz\n"
- "join {sch}.taz_limit_space_types tlst "
- "on tlst.space_type_id = p.space_type_id\n"
- "group by tg.taz_group_id, tlst.taz_limit_group_id) sub\n"
- "on sub.taz_group_id = gen.taz_group_id\n"
- "and sub.taz_limit_group_id = gen.taz_limit_group_id\n"
- )
- def write_activity_totals(year, pgcmd, dbyear=None, ps=_ps):
- if dbyear is None:
- dbyear = year
- # TODO use psycopg2 for this
- if ps.sql_system == ps.sqlserver:
- logging.error("Updating ActivityTotalsI from database not supported yet on SQL Server")
- raise ValueError
- sqlstr = "\\copy (select activity as \"Activity\", total_amount as \"TotalAmount\" from input.activity_totals \
- where year_run=" + str(dbyear) + " and scenario='" + ps.scenario + "') to '" + ps.scendir + str(
- year) + "/ActivityTotalsI.csv' csv header"
- retcode = subprocess.check_call([pgcmd, "-c", sqlstr, "--host=" + ps.mapit_host, "--port=" + str(ps.mapit_port),
- "--dbname=" + ps.mapit_database, "--username=" + ps.mapit_user])
- log_results_from_external_program(None, "Problem updating ActivityTotalsI in year " + str(year), (retcode,))
- def load_development_events(year, ps=_ps):
- logging.info("Loading development events for year:" + str(year))
- if ps.sql_system == ps.postgres:
- folder_path = os.path.join(ps.scendir, str(year))
- folder_path = folder_path.replace('\\', '/')
- csv_file = os.path.join(folder_path, "developmentEvents.csv")
- conn = connect_to_sd(ps=ps)
- with conn, conn.cursor() as cur:
- cur.execute('TRUNCATE %s.%s' % (ps.sd_schema, "development_events")) # empty the development_events table
- f = open(csv_file, 'r') # open the development events file
- f.readline() # skip the first line for header
- cur.copy_from(f, "%s.%s" % (ps.sd_schema, "development_events"), sep=',',
- null='') # use the psycopg2 fast copy command to load the data into development_events
- cur.execute('SELECT count(*) FROM %s.%s;' % (ps.sd_schema, "development_events"))
- counter = cur.fetchone()[0]
- logging.info("Loaded %s development events from file %s" % (counter, str(csv_file)))
- elif ps.sql_system == ps.sqlserver:
- # TODO Add logging with pyodbc or _mssql
- folder_path = os.path.join(ps.DEVEVENTPATH)
- folder_path = folder_path.replace('/', '\\')
- csv_file = os.path.join(folder_path, "developmentEvents" + str(year) + ".csv")
- # csv_file = os.path.join(folder_path, "developmentEvents.csv")
- query = 'TRUNCATE TABLE %s.%s' % (ps.sd_schema, "development_events")
- execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- # no need to open the file here: BULK INSERT reads it server-side, skipping the header via FIRSTROW = 2
- query = ("BULK INSERT %s.%s FROM " % (ps.sd_schema, "development_events") +
- "'" + csv_file + "' WITH (FIELDTERMINATOR = ',',ROWTERMINATOR = '0x0a', FIRSTROW = 2)")
- execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- logging.info("Loaded development events for year:" + str(year))
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- def load_aa_totals(ps=_ps):
- logging.info("Loading Activity Totals for scenario:" + ps.scenario)
- folder_path = os.path.join(ps.scendir, 'AllYears/Inputs')
- folder_path = folder_path.replace('\\', '/')
- conn = connect_to_mapit(ps=ps)
- with conn, conn.cursor() as cur:
- cur.execute('TRUNCATE %s.%s' % ('input', "activity_totals_temp")) # empty the temporary table
- csv_file = os.path.join(folder_path, "All_ActivityTotalsI.csv")
- f = open(csv_file, 'r') # open the AA input file
- f.readline() # skip the first line for header
- cur.copy_from(f, "%s.%s" % ('input', "activity_totals_temp"), sep=',',
- null='') # use the psycopg2 fast copy command to copy the data into the temporary table
- cur.execute("DELETE FROM input.activity_totals WHERE scenario= '%s';" % ps.scenario)
- cur.execute(
- "INSERT INTO input.activity_totals "
- "(select year_run, '%s', activity, total_amount from input.activity_totals_temp);" % (
- ps.scenario))
- cur.execute('SELECT count(*) FROM %s.%s;' % ('input', "activity_totals_temp"))
- counter = cur.fetchone()[0]
- logging.info("Loaded %s activity totals from file %s" % (counter, str(csv_file)))
- if counter == 0:
- logging.fatal("No entries loaded from file " + str(csv_file))
- raise RuntimeError
- def load_tm_totals(ps=_ps):
- logging.info("Loading Travel Model Totals for scenario:" + ps.scenario)
- folder_path = os.path.join(ps.scendir, 'AllYears/Inputs')
- folder_path = folder_path.replace('\\', '/')
- conn = connect_to_mapit(ps=ps)
- cur = conn.cursor()
- cur.execute('TRUNCATE %s.%s' % ('input', "travel_model_totals_temp")) # empty the temporary table
- csv_file = os.path.join(folder_path, "TravelModelTotalsI.csv")
- f = open(csv_file, 'r') # open the AA input file
- f.readline() # skip the first line for header
- cur.copy_from(f, "%s.%s" % ('input', "travel_model_totals_temp"), sep=',',
- null='') # use the psycopg2 fast copy command to copy the data into the temporary table
- cur.execute("DELETE FROM input.travel_model_totals WHERE scenario= '%s';" % ps.scenario)
- cur.execute(
- "INSERT INTO input.travel_model_totals "
- "(select '%s', year_run, tdm_code, tdm_total_amount from input.travel_model_totals_temp );" % (
- ps.scenario))
- cur.execute('SELECT count(*) FROM %s.%s;' % ('input', "travel_model_totals_temp"))
- counter = cur.fetchone()[0]
- logging.info("Loaded %s travel model totals from file %s" % (counter, str(csv_file)))
- conn.commit()
- conn.close()
- def load_am_totals(ps=_ps):
- logging.info("Loading Activity Model Totals for scenario:" + ps.scenario)
- folder_path = os.path.join(ps.scendir, 'AllYears/Inputs')
- folder_path = folder_path.replace('\\', '/')
- conn = connect_to_mapit(ps=ps)
- with conn, conn.cursor() as cur:
- cur.execute('TRUNCATE %s.%s' % ('input', "activity_model_totals_temp")) # empty the temporary table
- csv_file = os.path.join(folder_path, "ABModelTotalsI.csv")
- f = open(csv_file, 'r') # open the AA input file
- f.readline() # skip the first line for header
- cur.copy_from(f, "%s.%s" % ('input', "activity_model_totals_temp"), sep=',',
- null='') # use the psycopg2 fast copy command to copy the data into the temporary table
- cur.execute("DELETE FROM input.abm_totals WHERE scenario= '%s';" % ps.scenario)
- cur.execute(
- "INSERT INTO input.abm_totals "
- "(select '%s', year_run, abm_code, abm_total_amount from input.activity_model_totals_temp );" % (
- ps.scenario))
- cur.execute('SELECT count(*) FROM %s.%s;' % ('input', "activity_model_totals_temp"))
- counter = cur.fetchone()[0]
- logging.info("Loaded %s activity model totals from file %s" % (counter, str(csv_file)))
- def write_abm_land_use(year, dbyear=None, ps=_ps):
- if dbyear is None:
- dbyear = year
- folder_path = os.path.join(ps.scendir, str(year))
- folder_path = folder_path.replace('\\', '/')
- csv_file_name = 'abm_land_use.csv'
- csv_file = os.path.join(folder_path, csv_file_name)
- conn = connect_to_mapit(ps=ps)
- with conn, conn.cursor() as cur:
- query = "delete from output.abm_se_taz10_table where year_run=" + str(
- dbyear) + " and scenario='" + ps.scenario + "';"
- cur.execute(query)
- query = "insert into output.abm_se_taz10_table select * from output.abm_se_taz10 where year_run=" + str(
- dbyear) + " and scenario='" + ps.scenario + "';"
- cur.execute(query)
- query = "copy (select * from output.abm_se_taz10_table where year_run=" + str(
- dbyear) + " and scenario='" + ps.scenario + "') to STDOUT DELIMITER ',' CSV HEADER;"
- with open(csv_file, 'w') as f:
- cur.copy_expert(query, f)
- def write_labor_make_use(year, dbyear=None, ps=_ps):
- if dbyear is None:
- dbyear = year
- folder_path = os.path.join(ps.scendir, str(year))
- folder_path = folder_path.replace('\\', '/')
- csv_file_name = 'lmau.csv'
- csv_file = os.path.join(folder_path, csv_file_name)
- conn = connect_to_mapit(ps=ps)
- with conn, conn.cursor() as cur:
- query = "copy (select * from output.labor_make_and_use where year_run=" + str(
- dbyear) + " and scenario='" + ps.scenario + "') to STDOUT DELIMITER ',' CSV HEADER;"
- with open(csv_file, 'w') as f:
- cur.copy_expert(query, f)
- def replay_development_events_for_year(year, ps=_ps):
- load_development_events(year, ps=ps)
- logging.info("Replaying development events for " + str(year))
- if ps.apply_sitespec:
- apply_site_spec(year, ps=ps)
- replay_development_events(ps=ps)
- insert_development_events_into_history(year, ps=ps)
- def replay_development_events(ps=_ps):
- logging.info("Replaying development events")
- try:
- with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
- tr.query(
- '''UPDATE {sch}.parcels parcels SET
- space_quantity = development_events.new_space_quantity,
- space_type_id = development_events.new_space_type_id,
- year_built = development_events.new_year_built,
- land_area = development_events.land_area,
- is_derelict = development_events.new_is_derelict,
- is_brownfield = development_events.new_is_brownfield
- FROM {sch}.development_events
- WHERE parcels.pecas_parcel_num = development_events.original_pecas_parcel_num
- AND (development_events.event_type = 'C' OR
- development_events.event_type = 'R' OR
- development_events.event_type = 'D' OR
- development_events.event_type = 'A' OR
- development_events.event_type = 'L' OR
- development_events.event_type = 'US'
- );''')
- tr.query(
- '''INSERT INTO {sch}.parcels
- SELECT parcel_id,
- new_pecas_parcel_num,
- new_year_built,
- taz,
- new_space_type_id,
- new_space_quantity,
- land_area,
- available_services,
- new_is_derelict,
- new_is_brownfield
- FROM {sch}.development_events
- WHERE
- development_events.event_type = 'CS'
- OR development_events.event_type = 'AS'
- OR development_events.event_type = 'RS'
- OR development_events.event_type = 'DS'
- OR development_events.event_type = 'LS'
- ; ''')
- tr.query(
- '''INSERT INTO {sch}.parcel_cost_xref
- SELECT development_events.new_pecas_parcel_num, parcel_cost_xref.cost_schedule_id,
- parcel_cost_xref.year_effective
- FROM {sch}.parcel_cost_xref, {sch}.development_events
- WHERE parcel_cost_xref.pecas_parcel_num=development_events.original_pecas_parcel_num
- AND (
- event_type = 'CS'
- OR event_type = 'AS'
- OR event_type = 'RS'
- OR event_type = 'DS'
- OR event_type = 'LS'
- ); ''')
- tr.query(
- '''INSERT INTO {sch}.parcel_fee_xref
- SELECT de.new_pecas_parcel_num, xref.fee_schedule_id, xref.year_effective
- FROM {sch}.parcel_fee_xref xref, {sch}.development_events de
- WHERE xref.pecas_parcel_num=de.original_pecas_parcel_num
- AND (
- event_type = 'CS'
- OR event_type = 'AS'
- OR event_type = 'RS'
- OR event_type = 'DS'
- OR event_type = 'LS'
- ); ''')
- tr.query(
- '''INSERT INTO {sch}.parcel_zoning_xref
- SELECT de.new_pecas_parcel_num, xref.zoning_rules_code, xref.year_effective
- FROM {sch}.parcel_zoning_xref xref, {sch}.development_events de
- WHERE xref.pecas_parcel_num=de.original_pecas_parcel_num
- AND (
- event_type = 'CS'
- OR event_type = 'AS'
- OR event_type = 'RS'
- OR event_type = 'DS'
- OR event_type = 'LS'
- ); ''')
- tr.query(
- '''INSERT INTO {sch}.local_effect_distances
- SELECT de.new_pecas_parcel_num,
- dist.local_effect_id,
- dist.local_effect_distance,
- dist.year_effective
- FROM {sch}.local_effect_distances dist, {sch}.development_events de
- WHERE
- dist.pecas_parcel_num=de.original_pecas_parcel_num
- AND (
- event_type = 'CS'
- OR event_type = 'AS'
- OR event_type = 'RS'
- OR event_type = 'DS'
- OR event_type = 'LS'
- );''')
- logging.info("Replayed development events into parcels table")
- except Exception as e:
- logging.error("Problem replaying development events into parcel table: {}".format(e))
- def insert_development_events_into_history(year, ps=_ps):
- logging.info("Start Inserting Development Events to History" + str(year))
- if year == ps.baseyear:
- if ps.sql_system == ps.postgres:
- query = ("SET search_path=" + ps.sd_schema +
- "; TRUNCATE development_events_history; INSERT INTO development_events_history (select " + str(
- year) + " as year_run,* from development_events);")
- retcode = execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- logging.info("Completed Inserting Development Events to History: " + str(year))
- elif ps.sql_system == ps.sqlserver:
- query = "TRUNCATE table development_events_history; \
- INSERT INTO development_events_history \
- (year_run, \
- event_type, parcel_id, original_pecas_parcel_num, new_pecas_parcel_num, available_services, \
- old_space_type_id, new_space_type_id, old_space_quantity, new_space_quantity, \
- old_year_built, new_year_built, land_area,\
- old_is_derelict, new_is_derelict, \
- old_is_brownfield, new_is_brownfield, \
- zoning_rules_code, taz) (select " + str(year) + " as year_run,* from development_events);"
- retcode = execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- logging.info("Completed Inserting Development Events to History: " + str(year))
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- else:
- if ps.sql_system == ps.postgres:
- query = "SET search_path=" + ps.sd_schema + "; INSERT INTO development_events_history (select " + str(
- year) + " as year_run,* from development_events);"
- retcode = execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- logging.info("Completed Inserting Development Events to History: " + str(year))
- elif ps.sql_system == ps.sqlserver:
- query = "INSERT INTO development_events_history \
- (year_run, \
- event_type, parcel_id, original_pecas_parcel_num, new_pecas_parcel_num, available_services, \
- old_space_type_id, new_space_type_id, old_space_quantity, new_space_quantity, \
- old_year_built, new_year_built, land_area,\
- old_is_derelict, new_is_derelict, \
- old_is_brownfield, new_is_brownfield, \
- zoning_rules_code, taz) (select " + str(year) + " as year_run,* from development_events);"
- retcode = execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
- logging.info("Completed Inserting Development Events to History: " + str(year))
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- log_results_from_external_program(None, "Problem updating development events history for " + str(year), (retcode,))
- def write_floorspace_summary_from_parcel_file(sd_output_year, ps=_ps):
- # TODO psycopg2 version
- if ps.sql_system == ps.postgres:
- sqlstr = "\\copy (select * from " + ps.sd_schema + ".floorspacei_view) to '" + ps.scendir + str(
- sd_output_year) + "/FloorspaceI.csv' csv header"
- print(sqlstr)
- retcode = execute_postgresql_query(sqlstr, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- log_results_from_external_program("Wrote FloorspaceI from Parcel File for use in year " + str(sd_output_year),
- "Problem writing FloorspaceI from Parcel File for use in year " + str(
- sd_output_year), (retcode,))
- elif ps.sql_system == ps.sqlserver:
- folder_path = os.path.join(ps.EXACTPATH, str(sd_output_year))
- file_path = os.path.join(folder_path, 'FloorspaceI.csv')
- file_path = file_path.replace('/', '\\')
- sqlstr = ("select 'TAZ' as taz, 'Commodity' as commodity, 'Quantity' as quantity "
- "union all select cast(TAZ as nvarchar), cast(Commodity as nvarchar(50)), cast(Quantity as nvarchar) "
- "from " + ps.sd_database + "." + ps.sd_schema + ".floorspacei_view")
- subprocess.call(["bcp", sqlstr, "queryout", file_path, "-S", ps.sd_host, "-T", "-c", "-t,"])
- else:
- logging.error("Invalid database system: " + ps.sql_system)
- raise ValueError
- def copy_floorspace_summary(sd_output_year, ps=_ps):
- shutil.copyfile(ps.scendir + "/" + str(sd_output_year) + "/FloorspaceI.csv",
- ps.scendir + "/" + str(sd_output_year) + "/FloorspaceSD.csv")
- shutil.copyfile(ps.scendir + "/" + str(sd_output_year) + "/FloorspaceI.csv",
- ps.scendir + "/" + str(sd_output_year) + "/FloorspaceO.csv")
- def reset_parcel_database(ps=_ps):
- # delete the parcels file and reload from backup
- logging.info("Resetting parcel database in database {} and schema {}".format(
- ps.sd_database, ps.sd_schema))
- try:
- with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
- logging.info("Deleting local effects for pseudoparcels")
- tr.query(
- "DELETE from {sch}.local_effect_distances "
- "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
- logging.info("Deleting costs for pseudoparcels")
- tr.query(
- "DELETE from {sch}.parcel_cost_xref "
- "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
- logging.info("Deleting fees for pseudoparcels")
- tr.query(
- "DELETE from {sch}.parcel_fee_xref "
- "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
- logging.info("Deleting zoning for pseudoparcels")
- tr.query(
- "DELETE from {sch}.parcel_zoning_xref "
- "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
- logging.info("Deleting pseudoparcels")
- tr.query(
- "DELETE from {sch}.parcels "
- "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
- # tr.query("VACUUM FULL {sch}.parcels;")
- logging.info("Restoring parcels to base year state")
- tr.query("UPDATE {sch}.parcels parcels SET "
- "year_built = backup.year_built, "
- "space_type_id = backup.space_type_id, "
- "space_quantity = backup.space_quantity, "
- "land_area = backup.land_area, "
- "available_services_code = backup.available_services_code, "
- "is_derelict = backup.is_derelict, "
- "is_brownfield = backup.is_brownfield "
- "FROM {sch}.parcels_backup backup "
- "WHERE parcels.pecas_parcel_num = backup.pecas_parcel_num;")
- logging.info("Finished resetting parcels")
- except Exception as e:
- logging.error("Problem resetting parcels: {}".format(e))
- def prepare_gale_shapley_outputs(year):
- """
- Prepares the outputs from the Gale-Shapley algorithm in SD to be uploaded to Mapit
- """
- os.environ["CURYEAR"] = str(year)
- run_shell_script("prepare_gs_tables.sh")
- def snapshot_parcels(year, ps=_ps):
- # Snapshot the parcel database so we can look at intermediate parcel snapshots.
- querier = sd_querier(ps=ps)
- try:
- try:
- with querier.transaction(yr=year) as tr:
- tr.query("drop table if exists {sch}.parcels_{yr};")
- tr.query("create table {sch}.parcels_{yr} "
- " as select * from {sch}.parcels;")
- except psycopg2.ProgrammingError:
- with querier.transaction(yr=year) as tr:
- # Try truncate-and-insert instead of drop-and-create
- tr.query("truncate table {sch}.parcels_{yr};")
- tr.query("insert into {sch}.parcels_{yr} "
- " select * from {sch}.parcels;")
- logging.info("Backed up the parcel database for {}".format(year))
- except Exception as e:
- # Don't kill the run if this parcel snapshot doesn't work for some reason
- logging.error("Parcel backup failed: {}".format(e))
- class ExternalProgramError(Exception):
- def __init__(self, value):
- self.value = value
- def __str__(self):
- return repr(self.value)
- def log_results_from_external_program(okmsg, notokmsg, results_array):
- ok = True
- for result in results_array:
- if result != 0:
- ok = False
- if ok:
- if okmsg is not None:
- logging.info(okmsg)
- else:
- logging.error(notokmsg + ", return codes " + str(results_array))
- raise ExternalProgramError(notokmsg)
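- # e.g. log_results_from_external_program("Copied files", "Copy failed", (0,)) just
- # logs "Copied files"; any nonzero return code logs notokmsg with the codes and
- # raises ExternalProgramError(notokmsg)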
- def prepare_travel_model_inputs(year, scenario, ps=_ps):
- logging.info("Executing query: select input.generate_tm_inputs(" + str(year) + ",'" + scenario + "');")
- retcode = execute_postgresql_query("select input.generate_tm_inputs(" + str(year) + ",'" + scenario + "');",
- ps.mapit_database, ps.mapit_port, ps.mapit_host, ps.mapit_user, ps=ps)
- log_results_from_external_program(None, "Problem preparing table of travel model inputs in database", (retcode,))
- def updateParcelZoningXrefAccordingToAdaptivePhasing(ps=_ps):
- logging.debug("baseyear=%s endyear=%s sd_schema=%s", ps.baseyear, ps.endyear, ps.sd_schema)
- schema = ps.sd_schema
- # start from the base year
- currentYear = ps.baseyear
- # this dict provides information about each plan's next developable phase
- planAndCurrentPhase = {}
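- # shape: {plan_id: [next developable phase_number (False once all phases are
- #                   released), release year for that phase]}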
- with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
- #find the smallest phase_number in each plan, store it into planAndCurrentPhase.
- smallestPhaseNumber = tr.query("select {sch}.phasing_plan_xref.plan_id,min(phase_number),initial_release_year, release_threshold from {sch}.phasing_plan_xref INNER JOIN {sch}.phasing_plans ON {sch}.phasing_plans.plan_id= {sch}.phasing_plan_xref.plan_id group by ({sch}.phasing_plan_xref.plan_id,initial_release_year, release_threshold)\n")
- # for testing purposes, work on a clone of the parcel_zoning_xref table;
- # in future this may operate directly on parcel_zoning_xref
- query = "DROP table if exists "+schema+".parcel_zoning_xref_fortest"
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- query = "CREATE TABLE "+schema+".parcel_zoning_xref_fortest \
- (pecas_parcel_num bigint NOT NULL,zoning_rules_code integer,year_effective integer NOT NULL, \
- CONSTRAINT pz_pkey_fortest PRIMARY KEY (pecas_parcel_num,year_effective));"
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- ####tr.query("delete from {sch}.parcel_zoning_xref_fortest")
- ####tr.query("insert into {sch}.parcel_zoning_xref_fortest SELECT * FROM {sch}.parcel_zoning_xref")
- #initialize planAndCurrentPhase
- for i in smallestPhaseNumber:
- planAndCurrentPhase[i[0]] = [i[1],i[2]]
- #update/initialize year_effective based on plan_id for all the parcels in the first phase
- query= "insert into "+schema+".parcel_zoning_xref_fortest \
- SELECT ppx.pecas_parcel_num,9999999,pp.initial_release_year \
- FROM "+schema+".phasing_plans as pp, "+schema+".phasing_plan_xref as ppx,"+schema+".parcel_zoning_xref as pzx \
- WHERE ppx.pecas_parcel_num = pzx.pecas_parcel_num \
- and ppx.plan_id = pp.plan_id \
- and (ppx.plan_id,ppx.phase_number) in (select "+schema+".phasing_plan_xref.plan_id,min(phase_number) from "+schema+".phasing_plan_xref INNER JOIN "+schema+".phasing_plans ON "+schema+".phasing_plans.plan_id= "+schema+".phasing_plan_xref.plan_id group by ("+schema+".phasing_plan_xref.plan_id))"
- #ON CONFLICT (pecas_parcel_num,year_effective) DO NOTHING;'
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
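- # (9999999 above is the placeholder zoning_rules_code meaning "not yet released";
- # compare undeveloped_zoning_rules_code in calculateCurrentThreshold below)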
- # update zoning_rules_code for the base year (e.g. 2011)
- query= "UPDATE "+schema+".parcel_zoning_xref_fortest SET zoning_rules_code = ppx.zoning_rules_code \
- FROM "+schema+".phasing_plans as pp, "+schema+".phasing_plan_xref as ppx \
- WHERE ppx.pecas_parcel_num = "+schema+".parcel_zoning_xref_fortest.pecas_parcel_num and ppx.plan_id = pp.plan_id and year_effective ="+str(currentYear)+" and (ppx.plan_id,ppx.phase_number) in (select "+schema+".phasing_plan_xref.plan_id,min(phase_number) from "+schema+".phasing_plan_xref INNER JOIN "+schema+".phasing_plans ON "+schema+".phasing_plans.plan_id= "+schema+".phasing_plan_xref.plan_id group by ("+schema+".phasing_plan_xref.plan_id))"
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- while currentYear<=ps.endyear:
- newDict = {}
- for plan,[phase,year] in planAndCurrentPhase.items():
- print("plan",plan)
- print("phase",phase)
- print("year",year)
- if phase == False:#All phase has been developed
- newDict[plan] = [phase,year]###may be able to delete
- continue
- elif year==currentYear:
- startANewPlan(plan,ps)
- newDict[plan] = [phase,year]
- elif year<currentYear:#the plan has already start to process
- if(calculateCurrentThreshold(currentYear,phase,plan,ps)):
- nextPhase,releaseYearForNextPhase = findNextPhase(currentYear,phase,plan,ps)
- if nextPhase==False:
- newDict[plan] = [False,year]
- else:
- newDict[plan] = [nextPhase,year]
- updateNextPhaseReleaseYear(releaseYearForNextPhase,nextPhase,plan,ps)
- else:
- newDict[plan] = [phase,year]
- else:
- newDict[plan] = [phase,year]
- continue
- planAndCurrentPhase = { k: v for k, v in newDict.items() }
- currentYear+=1
- #calculate threshold for phase for current year
- #UPDATE {sch}.parcel_zoning_xref_fortest as pzxf
- #SET pzxf.year_effective = pp.initial_release_year
- #FROM {sch}.phasing_plans as pp, {sch}.pecas_plan_xref as ppx
- #WHERE ppx.pecas_parcel_num = pzxf.pecas_parcel_num and ppx.plan_id = pp.plan_id and ppx.phase_number = (select min(ppx.phase_number) from ppx where ppx.plan_id = pp.plan_id)
- def calculateCurrentThreshold(year,phaseNumber,planId,ps=_ps):
- schema = ps.sd_schema
- undeveloped_zoning_rules_code = '9999999'
- with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
- #select all developable zoning_rules_code
- #tr.query("select distinct i111.zoning_permissions.zoning_rules_code "
- # "from i111.parcel_zoning_xref_fortest "
- # "inner join i111.zoning_permissions "
- # "on i111.parcel_zoning_xref_fortest.zoning_rules_code = i111.zoning_permissions.zoning_rules_code")
- #select summation of land_area for specific phase
- totalDevelopableArea = tr.query("select sum(ps.land_area) \
- from "+schema+".parcel_zoning_xref as pzx \
- inner join "+schema+".parcels_backup as ps \
- on ps.pecas_parcel_num = pzx.pecas_parcel_num \
- inner join "+schema+".phasing_plan_xref as ppx \
- on ps.pecas_parcel_num = ppx.pecas_parcel_num \
- where ppx.phase_number = "+str(phaseNumber)+" \
- and ppx.zoning_rules_code in (select distinct "+schema+".zoning_permissions.zoning_rules_code from "+schema+".zoning_permissions)")[0][0]
- #select total developed area for specific phase and specific year
- currentDevelopedArea = tr.query("select sum(ps.land_area) \
- from "+schema+".parcel_zoning_xref_fortest as pzx \
- inner join "+schema+".parcels as ps \
- on ps.pecas_parcel_num = pzx.pecas_parcel_num \
- inner join "+schema+".phasing_plan_xref as ppx \
- on ps.pecas_parcel_num = ppx.pecas_parcel_num \
- where ppx.phase_number = "+str(phaseNumber)+" \
- and pzx.zoning_rules_code !="+str(undeveloped_zoning_rules_code)+" \
- and pzx.year_effective<="+str(year)+" ")[0][0]
- logging.debug('Total developable area: %s', totalDevelopableArea)
- logging.debug('Currently developed area: %s', currentDevelopedArea)
- if currentDevelopedArea is None:
- currentDevelopedArea = 0
- if not totalDevelopableArea:
- return False  # no developable area in this phase, so nothing can be released
- currentThreshold = currentDevelopedArea / totalDevelopableArea
- maxThreshold = tr.query("select release_threshold from {sch}.phasing_plans where plan_id = " + str(planId))[0][0]
- logging.debug('Current threshold: %s', currentThreshold)
- return currentThreshold > maxThreshold
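- # e.g. with release_threshold = 0.8, 850 of 1000 units of developable land area
- # already developed gives 0.85 > 0.8, so the plan's next phase is due for release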
- def findNextPhase(year,phaseNumber,planId,ps=_ps):
- schema = ps.sd_schema
- with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
- nextPhase = tr.query("select distinct phase_number "
- "from {sch}.phasing_plan_xref "
- "where plan_id="+str(planId)+" "
- "and phase_number>"+str(phaseNumber)+" "
- "order by phase_number "
- "limit 1")
- if len(nextPhase) == 1:
- nextPhase = nextPhase[0][0]
- nextYear = year + 1
- return nextPhase, nextYear
- else:
- return False,False
- def updateNextPhaseReleaseYear(releaseYear,phaseNumber,planId,ps=_ps):
- schema = ps.sd_schema
- print(releaseYear,phaseNumber,planId)
- with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
- tr.query("insert into {sch}.parcel_zoning_xref_fortest (pecas_parcel_num,zoning_rules_code,year_effective)"
- "select ppx.pecas_parcel_num,ppx.zoning_rules_code,"+str(releaseYear)+" "
- "from {sch}.parcel_zoning_xref as pzx,{sch}.phasing_plan_xref as ppx "
- "where pzx.pecas_parcel_num = ppx.pecas_parcel_num "
- "and ppx.phase_number="+str(phaseNumber)+" "
- "and ppx.plan_id="+str(planId)+"")
- return
- def startANewPlan(planId,ps=_ps):
- schema = ps.sd_schema
- with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
- query="UPDATE "+schema+".parcel_zoning_xref_fortest \
- SET zoning_rules_code=ppx.zoning_rules_code \
- from "+schema+".phasing_plan_xref as ppx,"+schema+".phasing_plans as pp \
- WHERE ppx.pecas_parcel_num = "+schema+".parcel_zoning_xref_fortest.pecas_parcel_num \
- and ppx.plan_id = pp.plan_id and ppx.plan_id="+str(planId)+";"
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
- query= "insert into "+schema+".parcel_zoning_xref_fortest \
- SELECT ppx.pecas_parcel_num,ppx.zoning_rules_code,pp.initial_release_year \
- FROM "+schema+".phasing_plans as pp, "+schema+".phasing_plan_xref as ppx,"+schema+".parcel_zoning_xref as pzx \
- WHERE ppx.pecas_parcel_num = pzx.pecas_parcel_num \
- and ppx.plan_id = pp.plan_id \
- and pp.plan_id = " + str(planId)  # the original query was truncated here; restricting to this plan matches the UPDATE above
- execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)