# "Inclusive range" - both start and stop are inclusive, which is more intuitive
# for some applications.
def irange(start, stop):
    return list(range(start, stop + 1))
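# Illustrative note (not part of the original module): irange(2, 5) evaluates to
# [2, 3, 4, 5], whereas the built-in range(2, 5) stops at 4.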

import bisect
import csv
import logging
import os
import platform
import shutil
import subprocess
import sys
from base64 import encodestring
from json import loads as loadjson
from os.path import join as pathjoin
from typing import Mapping, Any, List

import psycopg2  # library for interacting directly with PostgreSQL for mapit

from sqlutil import Querier

if sys.version_info[0] < 3:
    from urllib import urlencode
    # noinspection PyUnresolvedReferences
    from urllib2 import urlopen, HTTPError, Request as HttpRequest
else:
    from urllib.parse import urlencode
    from urllib.request import urlopen, Request as HttpRequest
    from urllib.error import HTTPError

class DefaultPecasSettings:
    """
    This object enforces passing a pecas_settings object around, so that functions don't accidentally revert to the
    default and ignore the user-specified settings file. Even though the default pecas_settings is not supported, the
    use of ps=_ps at the end of every applicable function is too ingrained to change for now.
    """

    def __getattr__(self, attr):
        raise TypeError(
            "Default value for PECAS settings is not supported; "
            "explicitly pass in a settings object")


_ps = DefaultPecasSettings()

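# Illustrative usage sketch (not part of the original module; the settings module
# name is hypothetical). The intent of DefaultPecasSettings is that callers import
# their own settings module and pass it explicitly, e.g.:
#
#     import pecas_settings as ps
#     set_up_logging()
#     load_aa_totals(ps=ps)
#
# Calling load_aa_totals() without ps= would touch an attribute of _ps and raise
# TypeError, which is the intended guard.
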
def set_up_logging():
    # Configure basic logger that writes to file
    logging.basicConfig(filename='run.log',
                        level=logging.INFO,
                        format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                        datefmt='%y-%m-%d %H:%M:%S',
                        filemode='a'
                        )

    # Configure a second logger for screen usage
    console = logging.StreamHandler()  # Create console logger
    console.setLevel(logging.INFO)  # Set the info level
    formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')  # Create a format
    console.setFormatter(formatter)  # Set the format
    logging.getLogger('').addHandler(console)  # Add this to the root logger

def run_shell_script(fname, ps=_ps):
    """
    Runs a bash shell script in a platform-independent way.
    """
    if sys.platform == "win32":
        subprocess.check_call([ps.winShCommand, "-l", fname])
    else:
        subprocess.check_call(fname, shell=True)

def execute_postgresql_query(query, database, port, host, user, ps=_ps):
    return subprocess.call(
        [ps.pgpath + "psql", "-c", query, "--dbname=" + database, "--port=" + str(port), "--host=" + host,
         "--username=" + user])


def execute_sqlserver_query(query, database, user, password, ps=_ps):
    return subprocess.call(
        [ps.sqlpath + "sqlcmd", "-S", ps.sd_host, "-U", user, "-P", password, "-d", database, "-Q", query])

def check_is_postgres(ps=_ps):
    # Error if not using postgres so that we can use psycopg. If we need to
    # use a different database system, functions that call this need to have
    # an alternate version supplied.
    if ps.sql_system != ps.postgres:
        raise ValueError("This function only works on postgres databases")

def connect_to_mapit(ps=_ps):
    return psycopg2.connect(
        database=ps.mapit_database,
        host=ps.mapit_host,
        port=ps.mapit_port,
        user=ps.mapit_user,
        password=ps.mapit_password)


def connect_to_sd(ps=_ps):
    return psycopg2.connect(
        database=ps.sd_database,
        host=ps.sd_host,
        port=ps.sd_port,
        user=ps.sd_user,
        password=ps.sd_password)


def mapit_querier(ps=_ps):
    return Querier(lambda: connect_to_mapit(ps=ps), sch=ps.mapit_schema)


def sd_querier(ps=_ps):
    return Querier(lambda: connect_to_sd(ps=ps), sch=ps.sd_schema)

def build_class_path_in_path(path, *args):
    fullargs = [path] + [pathjoin(path, arg) for arg in args]
    return build_class_path(*fullargs)


# Append paths to the classpath using the right separator for Windows or Unix
def build_class_path(*arg):
    string = ""
    if platform.system() == "Windows" or platform.system() == "Microsoft":
        separator = ";"
    else:
        separator = ":"
    for a in arg:
        string = str(string) + str(a) + separator
    return string
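# Illustrative note (not part of the original module; the jar name is made up):
# on Linux, build_class_path("AllYears/Code", "aa.jar") returns "AllYears/Code:aa.jar:"
# (note the trailing separator); on Windows the ";" separator is used instead.
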
def get_native_path():
    string = ""
    if platform.system() == "Windows" or platform.system() == "Microsoft":
        separator = ";"
        string = separator.join(["AllYears/Code/hdf5/win64", ])
    elif platform.system() == "Darwin":
        separator = ":"
        string = separator.join(["AllYears/Code/hdf5/macos", ])
    elif platform.system() == "Linux":
        separator = ":"
        string = separator.join(["AllYears/Code/hdf5/linux64", ])
    return string

def vm_properties(properties: Mapping[str, Any]) -> List[str]:
    """
    Returns a list of command line arguments to set the specified system properties on a Java VM subprocess call
    """
    return ["-D{}={}".format(name, value) for name, value in properties.items()]
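# Illustrative note (not part of the original module; the property name is made up):
# vm_properties({"file.encoding": "UTF-8"}) returns ["-Dfile.encoding=UTF-8"], ready
# to splice into a java command line ahead of the main class.
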
# Move a file to a new location, deleting any old file there.
def move_replace(arg1, arg2):
    try:
        os.remove(arg2)
    except OSError as detail:
        if detail.errno != 2:
            raise

    os.rename(arg1, arg2)

# Grab the skim year to use given a list of years with new skims
def get_skim_year(year, skimyears):
    i = bisect.bisect_left(skimyears, year) - 1
    if i < 0:
        i = 0
    logging.info("Using skims from year " + str(skimyears[i]) + " for year " + str(year))
    return skimyears[i]
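# Illustrative note (not part of the original module; the years are made up):
# with skimyears = [2015, 2020, 2025] (sorted ascending), get_skim_year(2023, skimyears)
# returns 2020, and get_skim_year(2014, skimyears) is clamped to the first entry, 2015.
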
# Function to calculate the base year ratio between the prices that AA is producing and the prices that SD has been
# calibrated to
def calculate_aa_to_sd_price_correction(ps=_ps):
    fin = open(ps.scendir + "/" + str(ps.baseyear) + "/ExchangeResults.csv", "rU")
    exchange_results = csv.reader(fin)
    er_header = next(exchange_results)
    fin2 = open(ps.scendir + "/" + str(ps.baseyear) + "/ExchangeResultsTargets.csv", "rU")
    er_targets = csv.reader(fin2)
    targets_header = next(er_targets)
    fout = open(ps.scendir + "/AllYears/Outputs/AAtoSDPriceCorrections.csv", "w", newline="")
    out_writer = csv.writer(fout)
    out_writer.writerow(("Commodity", "LUZ", "PriceCorrection"))
    target_prices = {}
    for row in er_targets:
        commodity = row[targets_header.index("Commodity")]
        zone = row[targets_header.index("ZoneNumber")]
        key = (commodity, zone)
        price = row[targets_header.index("TargetPrice")]
        target_prices[key] = price
    for row in exchange_results:
        commodity = row[er_header.index("Commodity")]
        zone = row[er_header.index("ZoneNumber")]
        key = (commodity, zone)
        if key in target_prices:
            aa_price = row[er_header.index("Price")]
            target_price = target_prices[key]
            price_correction = float(target_price) / float(aa_price)
            out_writer.writerow((commodity, zone, price_correction))
    fin.close()
    fin2.close()
    fout.close()


# Function to multiply the AA prices by the previously calculated correction ratio.
def apply_aa_to_sd_price_correction(year, ps=_ps):
    fin = open(ps.scendir + "/" + str(year) + "/ExchangeResults.csv", "rU")
    exchange_results = csv.reader(fin)
    fin2 = open(ps.scendir + "/AllYears/Outputs/AAtoSDPriceCorrections.csv", "rU")
    corrections = csv.reader(fin2)
    fout = open(ps.scendir + "/" + str(year) + "/SDPrices.csv", "w", newline="")
    out_writer = csv.writer(fout)
    er_header = next(exchange_results)
    corr_header = next(corrections)
    out_writer.writerow(er_header)
    price_corrections = {}
    for row in corrections:
        commodity = row[corr_header.index("Commodity")]
        zone = row[corr_header.index("LUZ")]
        key = (commodity, zone)
        price_correction = row[corr_header.index("PriceCorrection")]
        price_corrections[key] = price_correction
    for row in exchange_results:
        commodity = row[er_header.index("Commodity")]
        zone = row[er_header.index("ZoneNumber")]
        key = (commodity, zone)
        if key in price_corrections:
            price = float(row[er_header.index("Price")])
            newprice = price * float(price_corrections[key])
            max_price = ps.maximum_sd_prices.get(commodity, float("inf"))
            if newprice > max_price:
                newprice = max_price
            row[er_header.index("Price")] = newprice
        out_writer.writerow(row)
    fin.close()
    fin2.close()
    fout.close()

    logging.info("Replacing ExchangeResults with SD Corrected Version")

    # for now copy corrected prices to ExchangeResults.csv, but want to change SD code to read SDPrices.csv
    move_replace(ps.scendir + "/" + str(year) + "/ExchangeResults.csv",
                 ps.scendir + "/" + str(year) + "/AAExchangeResults.csv")
    shutil.copyfile(ps.scendir + "/" + str(year) + "/SDPrices.csv",
                    ps.scendir + "/" + str(year) + "/ExchangeResults.csv")

def getMapitHTTPSAuthHeader(ps=_ps):
    if not ps.mapit_httpsuser:
        return None

    authstr = '{}:{}'.format(ps.mapit_httpsuser, ps.mapit_httpspass)
    return 'Basic {}'.format(encodestring(authstr.encode('ascii'))[:-1].decode("utf-8"))

def makeMapitAPICall(apiname, ps=_ps, retries=None, **api_params):
    HTTP_UNAUTHORIZED = 401
    HTTP_INTERNALERROR = 500

    timeout_retries = retries or 3

    headers = {
        'User-Agent': 'PECAS-Routines-Script',
        'Accept': 'application/json'
    }

    url = '/'.join([ps.mapit_httpspath, apiname, ''])

    api_params['accept'] = 'application/json'
    params = urlencode(api_params, True).encode('utf-8')
    req = HttpRequest(url, params, headers)

    logging.info("Calling MapIt API {!r}".format(apiname))

    ok = False
    auth_retry = False
    while not ok:
        try:
            resp = urlopen(req).read()
            ok = True
        except HTTPError as e:
            if (getattr(e, 'code', None) == HTTP_UNAUTHORIZED) and not auth_retry:
                authline = e.headers['www-authenticate']
                if ':' in authline:
                    authline = authline.split(':')[1]
                scheme = authline.split()[0]
                if scheme.lower() != 'basic':
                    logging.error("API Lookup: Trying to authenticate HTTPS but expecting {!r}.".format(authline))
                    raise

                req.add_header("Authorization", getMapitHTTPSAuthHeader(ps=ps))
                auth_retry = True
            elif (getattr(e, 'code', None) == HTTP_INTERNALERROR) and timeout_retries:
                # just do it again
                timeout_retries -= 1
            else:
                try:
                    request_data = req.get_data()
                except AttributeError:
                    request_data = req.data

                request_fields = request_data.split('&')
                request_data_print = '&'.join([d for d in request_fields if not d.startswith('file_contents')])

                logging.error("\n".join([
                    "API Lookup HTTPError {} ({}):".format(getattr(e, 'code', 0), getattr(e, 'msg', '')),
                    " URL:\n {}".format(req.get_full_url()),
                    " Data:\n {}".format(request_data_print),
                    " Headers:",
                    "\n".join([" {}:{}".format(k, v) for (k, v) in req.header_items()])
                ]))
                raise

    return loadjson(resp)

def showAPIError(context_action, errmsg):
    logging.error("Unable to {} MapIt server: API returned {!r}".format(context_action, errmsg or 'fail'))

def load_outputs_for_year(year, sequence, excl=None, ps=_ps):
    operation = "loading data sequence {} for year:{}".format(sequence, year)
    logging.info(operation.capitalize())

    excluded_files = [fn if fn.endswith('.csv') else (fn + '.csv') for fn in (excl or [])]

    folder_path = '/'.join([ps.scendir, str(year)])
    if not getattr(ps, 'mapit_httpspath', None):
        logging.error(
            "Field 'mapit_httpspath' required in pecas_settings but was not found. "
            "Cannot load outputs to MapIt server."
        )
        return

    # The upload starts to intermittently fail (with a difficult-to-diagnose
    # HTTP 500 error) if the HTTP POST data string is larger than 128MB (this
    # size is actually dependent on various server parameters, but in practice,
    # this is where we're at right now). That generally translates into about
    # 1.6 million rows of CSV data - but I'll make the cutoff 100,000 just to
    # be on the safe side. Anything larger than that, and I want to chunk the
    # file into blocks smaller than that cutoff. The API at the MapIt server
    # end is written such that it can just be re-called repeatedly with any
    # additional data.
    # If you start seeing HTTP 500 errors in the uploadFileContents API that
    # you can't pin down, and haven't (obviously) changed anything else, and
    # other API calls continue to work, this may be the culprit. Try making
    # chunksz smaller, and see if that helps. :-)
    chunksz = 100000

    resp = makeMapitAPICall('getOutputFilesByLoadSeq', ps=ps, load_seq=sequence)
    if 'fail' in resp:
        showAPIError(context_action="read file list from", errmsg=resp['fail'])
        return

    for row in resp['success']:
        # list of rows where each row is a dict with fields 'file_id' and 'csv_file_name'

        csv_file_name = row['csv_file_name']

        if csv_file_name in excluded_files:
            print("Skipping file {}".format(csv_file_name))
            continue

        logging.info("Loading file {}".format(csv_file_name))
        filename = '/'.join([folder_path, csv_file_name])
        try:
            with open(filename, 'r') as csv_file:
                file_contents = csv_file.readlines()
        except IOError:
            logging.error("NOTICE: No such file or directory: " + filename)
        else:
            counter = 0
            all_table_name = None

            totalsz = len(file_contents)
            for i in range(1, totalsz, chunksz):
                resp = makeMapitAPICall(
                    apiname='uploadFileContents',
                    ps=ps,
                    file_id=row['file_id'],
                    scenario=ps.scenario,
                    year_run=year,
                    file_contents=''.join(file_contents[i:i + chunksz]),
                    skip_update=(i + chunksz < totalsz)  # Only run update_loaded_scenarios() on the last chunk
                )
                if 'fail' in resp:
                    showAPIError(context_action="upload file contents to", errmsg=resp['fail'])
                    return

                counter += resp['success']['records']
                all_table_name = resp['success']['tblname']

            logging.info("{} record(s) added to {}".format(counter, all_table_name))

    logging.info("Finished " + operation)
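# Illustrative note on the chunking above (not part of the original module; the file
# size is made up): with chunksz = 100000, a CSV with a header plus 250,000 data rows
# (totalsz = 250001) is uploaded in three uploadFileContents calls covering rows
# 1-100000, 100001-200000 and 200001-250000; skip_update is False only on the last
# call, so update_loaded_scenarios() runs exactly once.
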
def loadOutputsForYear_(year, sequence, excl=None, ps=_ps):
    """In line for deprecation; has been replaced by (what is now designated) load_outputs_for_year()"""

    logging.info("Loading data sequence " + str(sequence) + " for year:" + str(year))

    excluded_files = [fn if fn.endswith('.csv') else (fn + '.csv') for fn in (excl or [])]

    folder_path = os.path.join(ps.scendir, str(year))
    folder_path = folder_path.replace('\\', '/')

    conn = connect_to_mapit(ps=ps)
    with conn, conn.cursor() as cur:
        cur.execute('SELECT * FROM %s.aa_output_files where load_sequence = %d;' % (
            ps.mapit_schema, sequence))  # get the list of aa output files we are interested in
        filesrows = cur.fetchall()
        colnames = [x[0] for x in cur.description]

        for row in filesrows:  # for each output file
            csv_file_name = row[colnames.index("csv_file_name")]
            if csv_file_name in excluded_files:
                print("Skipping file {}".format(csv_file_name))
                continue

            print("Loading file " + csv_file_name)
            all_table_name = row[colnames.index("all_table_name")]
            temp_table_name = row[colnames.index("temp_table_name")]

            if "partitioned" in colnames:
                partition = row[colnames.index("partitioned")]
            else:
                partition = False
            # print(colnames)
            # print(str(all_table_name) + " partitioned status is " + str(partition))
            cur.execute('TRUNCATE %s.%s' % (ps.mapit_schema, temp_table_name))  # empty the temporary table
            csv_file = os.path.join(folder_path, csv_file_name)
            try:
                f = open(csv_file, 'r')  # open the AA output file
                f.readline()  # skip the first line for header
                cur.copy_from(f, "%s.%s" % (ps.mapit_schema, temp_table_name), sep=',',
                              null='')  # use the psycopg2 fast copy command to copy the data into the temporary table

                cur.execute('SELECT count(*) FROM %s.%s;' % (ps.mapit_schema, temp_table_name))
                counter = cur.fetchone()[0]

                # now insert the records from the temporary table into the full table which contains data for each
                # year/scenario. This needs to happen before the scenario runs sqlcmd ="SELECT * from
                # %s.%s__drop_partition('%s')" %(ps.mapit_schema, all_table_name, ps.scenario);
                if partition:
                    print("Trying to partition " + all_table_name)
                    sqlcmd = "SELECT * from %s.%s__create_partition('%s')" % (
                        ps.mapit_schema, all_table_name, ps.scenario)
                    cur.execute(sqlcmd)
                sqlcmd = (
                    "SELECT column_name from INFORMATION_SCHEMA.COLUMNS "
                    "where TABLE_NAME='%s' AND TABLE_SCHEMA = '%s';" % (
                        temp_table_name, ps.mapit_schema)
                )
                cur.execute(sqlcmd)
                temptablecolnames = cur.fetchall()
                cols = str("")
                first = True
                for colrow in temptablecolnames:
                    if not first:
                        cols = cols + ","
                    first = False
                    cols = cols + '"' + str(colrow[0]) + '"'
                sqlcmd = "INSERT INTO %s.%s (scenario, year_run, %s) SELECT '%s', %d, %s FROM %s.%s;" % (
                    ps.mapit_schema, all_table_name, cols, ps.scenario, year, cols, ps.mapit_schema, temp_table_name)
                # print(sqlcmd)
                cur.execute(sqlcmd)

                logging.info("%d record(s) added to %s" % (counter, all_table_name))
            except IOError:  # if file does not exist. ex. ActivityLocations2_6k.csv not created every year.
                logging.error("NOTICE: No such file or directory: " + csv_file)

        cur.execute('SET search_path = %s ' % ps.mapit_schema)
        cur.callproc('output.update_loaded_scenarios', [ps.scenario, year, year])
    logging.info("Finished loading data sequence " + str(sequence) + " for year:" + str(year))


def clear_upload(year, ps=_ps):
    conn = connect_to_mapit(ps=ps)
    with conn, conn.cursor() as cur:
        query = (
            "set search_path={}; select "
            "clean_up_tables_for_scenario_and_year("
            "%(scen)s, %(year)s)").format(ps.mapit_schema)
        cur.execute(query, {"scen": ps.scenario, "year": year})


# Function to adjust the Floorspace by the (previously calculated) correction delta.
def write_floorspace_i(year, ps=_ps):
    # can parameterize column names from FloorspaceI in pecas_settings, but this is rarely used
    if hasattr(ps, 'fl_itaz'):
        fl_itaz = ps.fl_itaz
        fl_icommodity = ps.fl_icommodity
        fl_iquantity = ps.fl_iquantity
    else:
        fl_itaz = "TAZ"
        fl_icommodity = "Commodity"
        fl_iquantity = "Quantity"

    # Read Floorspace O
    floor_o_in = open(ps.scendir + str(year) + "/FloorspaceO.csv", "r")
    floor_o_in_file = csv.reader(floor_o_in)
    header = next(floor_o_in_file)
    floor_o_dict = {}
    for row in floor_o_in_file:
        key = (row[header.index(fl_itaz)], row[header.index(fl_icommodity)])
        if key in floor_o_dict:
            logging.warning("Line duplicated in FloorspaceO file: %s", key)
            floor_o_dict[key] = floor_o_dict[key] + float(row[header.index(fl_iquantity)])
        else:
            floor_o_dict[key] = float(row[header.index(fl_iquantity)])
    floor_o_in.close()

    has_c = False
    try:
        floor_c_in = open(ps.scendir + str(year) + "/FloorspaceCalc.csv", "r")
        has_c = True
    except IOError:
        logging.info("NOTICE: FloorspaceCalc not found, using FloorspaceDelta file.")

    if has_c:
        # Read floorspace Calc
        # noinspection PyUnboundLocalVariable
        floor_c_in_file = csv.reader(floor_c_in)
        header = next(floor_c_in_file)
        floor_c_dict = {}
        for row in floor_c_in_file:
            key = (row[header.index(fl_itaz)], row[header.index(fl_icommodity)])
            if key in floor_c_dict:
                logging.warning("Line duplicated in FloorspaceCalc file: %s", key)
                floor_c_dict[key] = floor_c_dict[key] + float(row[header.index(fl_iquantity)])
            else:
                floor_c_dict[key] = float(row[header.index(fl_iquantity)])
        floor_c_in.close()

        # Write floorspace Delta
        floor_d_out = open(ps.scendir + str(year) + "/FloorspaceDelta.csv", "w", newline="")
        floor_d_out_file = csv.writer(floor_d_out)
        header = [fl_itaz, fl_icommodity, fl_iquantity]
        floor_d_out_file.writerow(header)
        key_list = list(floor_c_dict.keys())
        key_list.sort()
        for key in key_list:
            if key in floor_o_dict:
                delta = floor_c_dict[key] - floor_o_dict[key]
            else:
                delta = floor_c_dict[key]
            out_row = list(key)
            out_row.append(delta)
            floor_d_out_file.writerow(out_row)

        # Add in ODict values not in CDict; set delta to -ve of ODict value
        key_list = list(floor_o_dict.keys())
        for key in key_list:
            if key in floor_c_dict:
                pass
            else:
                delta = -1 * floor_o_dict[key]
                out_row = list(key)
                out_row.append(delta)
                floor_d_out_file.writerow(out_row)
        floor_d_out.close()
    else:
        # Copy from previous year
        shutil.copyfile(ps.scendir + "/" + str(year - 1) + "/FloorspaceDelta.csv",
                        ps.scendir + "/" + str(year) + "/FloorspaceDelta.csv")

    # Read floorspace Delta
    floor_d_in = open(ps.scendir + str(year) + "/FloorspaceDelta.csv", "r")
    floor_d_in_file = csv.reader(floor_d_in)
    header = next(floor_d_in_file)
    floor_d_dict = {}
    for row in floor_d_in_file:
        key = (row[header.index(fl_itaz)], row[header.index(fl_icommodity)])
        if key in floor_d_dict:
            logging.warning("Line duplicated in FloorspaceDelta file: %s", key)
            floor_d_dict[key] = floor_d_dict[key] + float(row[header.index(fl_iquantity)])
        else:
            floor_d_dict[key] = float(row[header.index(fl_iquantity)])
    floor_d_in.close()

    # Write floorspace I
    if has_c:
        # copy FloorspaceCalc as FloorspaceI
        shutil.copyfile(ps.scendir + "/" + str(year) + "/FloorspaceCalc.csv",
                        ps.scendir + "/" + str(year) + "/FloorspaceI.csv")
    else:
        floor_i_out = open(ps.scendir + str(year) + "/FloorspaceI.csv", "w", newline="")
        floor_i_out_file = csv.writer(floor_i_out)
        header = [fl_itaz, fl_icommodity, fl_iquantity]
        floor_i_out_file.writerow(header)
        key_list = list(floor_d_dict.keys())
        key_list.sort()
        for key in key_list:
            if key in floor_o_dict:
                net = floor_d_dict[key] + floor_o_dict[key]
                if net < 0:
                    logging.debug("Negative value for floorspace in %s", key)
                    net = 0
            else:
                # print "WARNING: Key", key, "in floorspaceO, but not in floorspaceDelta."
                net = floor_d_dict[key]
                if net < 0:
                    logging.debug("Negative value for floorspace in %s", key)
                    net = 0
            out_row = list(key)
            out_row.append(net)
            floor_i_out_file.writerow(out_row)

        # Add in ODict values not in DDict; set net to ODict value
        key_list = list(floor_o_dict.keys())
        for key in key_list:
            if key in floor_d_dict:
                pass
            else:
                net = floor_o_dict[key]
                if net < 0:
                    logging.debug("Negative value for floorspace in %s", key)
                    net = 0
                out_row = list(key)
                out_row.append(net)
                floor_i_out_file.writerow(out_row)
        floor_i_out.close()


# Function to calculate and update total imports and exports in ActivityTotalsI in future years (base year has to be
# set before running.)

def update_importers_or_exporters(year, activities_to_exclude, activities_to_update, update_using_mor_u, ps=_ps):
    sqlstr = "update input.activity_totals \n\
        set total_amount=(case when '" + update_using_mor_u + "'='U' then -im.amount else im.amount end) from \n\
        -- select mu.activity, im.amount from \n\
        ( \n\
        -- inner query is amount made (or used in the case of updating imports) internally last year \n\
        select commodity,moru, sum(amount) as amount from output.all_makeuse \n\
        where scenario='" + ps.scenario + "' and year_run=" + str(
        year - 1) + " and moru='" + update_using_mor_u + "' and activity not like '" + activities_to_exclude + "' \n\
        group by commodity, moru \n\
        ) im, output.all_makeuse mu \n\
        where mu.scenario='" + ps.scenario + "' and \n\
        mu.year_run=" + str(year - 1) + " and \n\
        -- next two lines find the exporting activity that used (or importing commodity that made) \n\
        -- the commodity we measured in the inner query \n\
        mu.activity like '" + activities_to_update + "' and \n\
        mu.commodity=im.commodity \n\
        -- next three lines specify the values we are updating, i.e. the appropriate activity/scenario/year combination \n\
        and mu.activity=input.activity_totals.activity \n\
        and input.activity_totals.year_run=" + str(year) + " \n\
        and input.activity_totals.scenario='" + ps.scenario + "';"
    logging.debug("Updating " + activities_to_update + " with \n" + sqlstr)
    if ps.sql_system == ps.postgres:
        retcode = execute_postgresql_query(sqlstr, ps.mapit_database, ps.mapit_port, ps.mapit_host, ps.pguser, ps=ps)
    elif ps.sql_system == ps.sqlserver:
        retcode = execute_sqlserver_query(sqlstr, ps.mapit_database, ps.sd_user, ps.sd_password, ps=ps)
    else:
        logging.error("Invalid database system: " + ps.sql_system)
        raise ValueError
    log_results_from_external_program("Updated amounts for " + activities_to_update + " in " + str(year),
                                      "Failed in updating " + activities_to_update, (retcode,))


def update_importers(year, ps=_ps):
    update_importers_or_exporters(year, ps.exporter_string, ps.importer_string, 'U', ps=ps)


def update_exporters(year, ps=_ps):
    update_importers_or_exporters(year, ps.importer_string, ps.exporter_string, 'M', ps=ps)


def load_distances(skim_file_name, skimyear, ps=_ps):
    query = "truncate table " + ps.sd_schema + ".distances"
    if ps.sql_system == ps.postgres:
        retcode = execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
    elif ps.sql_system == ps.sqlserver:
        retcode = execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
    else:
        logging.error("Invalid database system: " + ps.sql_system)
        raise ValueError
    log_results_from_external_program("Deleted old distance data from database",
                                      "Problem deleting old distance data from database", (retcode,))
    skim_file = open(ps.scendir + str(skimyear) + "/" + skim_file_name + ".csv", "rU")
    skim_file_reader = csv.reader(skim_file)
    header = next(skim_file_reader)
    query = "insert into " + ps.sd_schema + ".distances (origin_luz, destination_luz, distance) values "
    first = True
    counter = 0
    for row in skim_file_reader:
        if not first:
            query += ","
        first = False
        origin = row[header.index("Origin")]
        destination = row[header.index("Destination")]
        distance = float(row[header.index(ps.distance_column)])
        if distance == 0:
            distance = 1E99
        query += "(" + str(origin) + "," + str(destination) + "," + str(distance) + ")"
        counter = counter + 1
        if counter >= 500:
            query += ";"
            # print query
            if ps.sql_system == ps.postgres:
                execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
            elif ps.sql_system == ps.sqlserver:
                execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
            else:
                logging.error("Invalid database system: " + ps.sql_system)
                raise ValueError
            query = "insert into " + ps.sd_schema + ".distances (origin_luz, destination_luz, distance) values "
            first = True
            counter = 0
    if not first:
        query += ";"
        # print query
        if ps.sql_system == ps.postgres:
            execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
        elif ps.sql_system == ps.sqlserver:
            execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
        else:
            logging.error("Invalid database system: " + ps.sql_system)
            raise ValueError


def load_exchange_results(year, ps=_ps):
    query = "truncate table " + ps.sd_schema + ".exchange_results"
    if ps.sql_system == ps.postgres:
        retcode = execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
    elif ps.sql_system == ps.sqlserver:
        retcode = execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
    else:
        logging.error("Invalid database system: " + ps.sql_system)
        raise ValueError
    log_results_from_external_program(None, "Problem deleting old exchange results from database", (retcode,))
    exresults = open(ps.scendir + str(year) + "/ExchangeResults.csv", "r")
    exresults_reader = csv.reader(exresults)
    header = next(exresults_reader)
    # TODO add year column in all_exchange_results, make exchange_results a view for the current year
    query = "insert into " + ps.sd_schema + ".exchange_results (commodity, luz, price, internal_bought) values "
    first = True
    counter = 0
    for row in exresults_reader:
        if not first:
            query += ","
        first = False
        commodity = row[header.index("Commodity")]
        luz = row[header.index("ZoneNumber")]
        price = row[header.index("Price")]
        internal_bought = row[header.index("InternalBought")]
        query += "('" + commodity + "'," + luz + "," + price + "," + internal_bought + ")"
        counter = counter + 1
        if counter >= 250:
            query += ";"
            # print query
            if ps.sql_system == ps.postgres:
                execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
            elif ps.sql_system == ps.sqlserver:
                execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
            else:
                logging.error("Invalid database system: " + ps.sql_system)
                raise ValueError
            query = ("insert into " + ps.sd_schema +
                     ".exchange_results (commodity, luz, price, internal_bought) values ")
            first = True
            counter = 0
    if not first:
        query += ";"
        # print query
        if ps.sql_system == ps.postgres:
            execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
        elif ps.sql_system == ps.sqlserver:
            execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
        else:
            logging.error("Invalid database system: " + ps.sql_system)
            raise ValueError


def smooth_prices(ps=_ps):
    logging.info("Applying price smoothing")
    query = "update " + ps.sd_schema + ".exchange_results\n\
        set price=new_price from\n\
        ( select commodity, origin_luz, sum(internal_bought) as total_bought, \
        sum(internal_bought * price * power(distance, " + str(ps.gravity_exponent) + \
        ")) / sum(internal_bought * power (distance, " + str(ps.gravity_exponent) + ")) as new_price\n\
        from " + ps.sd_schema + ".exchange_results\n\
        inner join " + ps.sd_schema + ".space_to_commodity\n\
        on exchange_results.commodity=space_to_commodity.aa_commodity\n\
        inner join " + ps.sd_schema + ".distances\n\
        on exchange_results.luz=distances.destination_luz\n\
        group by commodity, origin_luz\n\
        ) new_prices\n\
        where exchange_results.commodity=new_prices.commodity\n\
        and exchange_results.luz=new_prices.origin_luz\n\
        and total_bought > 0"
    if ps.sql_system == ps.postgres:
        execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
    elif ps.sql_system == ps.sqlserver:
        execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
    else:
        logging.error("Invalid database system: " + ps.sql_system)
        raise ValueError


def write_smoothed_prices(year, pgcmd, ps=_ps):
    # TODO use psycopg2 for this
    if ps.sql_system == ps.sqlserver:
        logging.error("Writing smoothed prices from database not supported yet on SQL Server")
        raise ValueError
    sqlstr = "\\copy (SELECT commodity as \"Commodity\", luz as \"ZoneNumber\"\n\
        , price as \"Price\", internal_bought as \"InternalBought\"\n\
        FROM " + ps.sd_schema + ".exchange_results) to '" + ps.scendir + str(
        year) + "/ExchangeResultsSmoothed.csv' csv header"
    retcode = subprocess.check_call(
        [pgcmd, "-c", sqlstr, "--host=" + ps.sd_host, "--port=" + str(ps.sd_port), "--dbname=" + ps.sd_database,
         "--username=" + ps.sd_user])
    log_results_from_external_program(None, "Problem writing smoothed prices in year " + str(year), (retcode,))


def apply_site_spec(year, ps=_ps):
    logging.info("Applying site spec")
    query = ("update {sch}.parcels p\n"
             "set year_built = s.year_effective, \n"
             "space_type_id = s.space_type_id,\n"
             "space_quantity = s.space_quantity,\n"
             "land_area = s.land_area,\n"
             "is_derelict = false,\n"
             "is_brownfield = false\n"
             "from {sch}.sitespec_parcels s\n"
             "where p.pecas_parcel_num = s.pecas_parcel_num\n"
             "and s.update_parcel = true\n"
             "and s.year_effective = " + str(year)).format(
        sch=ps.sd_schema, yr=year)
    if ps.sql_system == ps.postgres:
        execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
    elif ps.sql_system == ps.sqlserver:
        execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
    else:
        logging.error("Invalid database system: " + ps.sql_system)
        raise ValueError


def update_space_limits(year, ps=_ps):
    logging.info("Updating space maximums")
    check_is_postgres(ps=ps)

    with sd_querier(ps=ps).transaction(parcels=ps.parcels_table, yr=year) as tr:
        tr.query(
            "delete from {sch}.space_taz_limits\n"
            "where year_effective >= %(yr)s and manual = false")

        tr.query(
            "insert into {sch}.space_taz_limits\n"
            "select gen.taz_group_id, gen.taz_limit_group_id,\n"
            "gen.min_quantity * "
            "(1 + gen.percent_annual_change_to_min / 100) ^ "
            "(%(yr)s - gen.year_effective),\n"
            "case when gen.max_quantity is null then\n"
            "greatest(sub.total_quantity * "
            "(1 + gen.percent_annual_change_to_max / 100), "
            "sub.total_quantity + gen.min_increment)\n"
            "else\n"
            "gen.max_quantity * "
            "(1 + gen.percent_annual_change_to_max / 100) ^ "
            "(%(yr)s - gen.year_effective)\n"
            "end,\n"
            "%(yr)s, false\n"
            "from\n"
            "(select distinct on (taz_group_id, taz_limit_group_id) gen.*\n"
            "from {sch}.space_taz_limits_generator gen\n"
            "left join {sch}.space_taz_limits lim\n"
            "on lim.taz_group_id = gen.taz_group_id\n"
            "and lim.taz_limit_group_id = gen.taz_limit_group_id\n"
            "and lim.year_effective <= %(yr)s\n"
            "and lim.year_effective >= gen.year_effective\n"
            "and lim.manual\n"
            "where gen.year_effective <= %(yr)s\n"
            "and lim.taz_group_id is null\n"
            "order by gen.taz_group_id, gen.taz_limit_group_id, gen.year_effective desc\n"
            ") gen\n"
            "left join\n"
            "(select tg.taz_group_id, tlst.taz_limit_group_id, "
            "sum(p.space_quantity) as total_quantity\n"
            "from {sch}.parcels p\n"
            "join {sch}.tazs_by_taz_group tg on tg.taz_number = p.taz\n"
            "join {sch}.taz_limit_space_types tlst "
            "on tlst.space_type_id = p.space_type_id\n"
            "group by tg.taz_group_id, tlst.taz_limit_group_id) sub\n"
            "on sub.taz_group_id = gen.taz_group_id\n"
            "and sub.taz_limit_group_id = gen.taz_limit_group_id\n"
        )


def write_activity_totals(year, pgcmd, dbyear=None, ps=_ps):
    if dbyear is None:
        dbyear = year
    # TODO use psycopg2 for this
    if ps.sql_system == ps.sqlserver:
        logging.error("Updating ActivityTotalsI from database not supported yet on SQL Server")
        raise ValueError
    sqlstr = "\\copy (select activity as \"Activity\", total_amount as \"TotalAmount\" from input.activity_totals \
        where year_run=" + str(dbyear) + " and scenario='" + ps.scenario + "') to '" + ps.scendir + str(
        year) + "/ActivityTotalsI.csv' csv header"
    retcode = subprocess.check_call([pgcmd, "-c", sqlstr, "--host=" + ps.mapit_host, "--port=" + str(ps.mapit_port),
                                     "--dbname=" + ps.mapit_database, "--username=" + ps.mapit_user])
    log_results_from_external_program(None, "Problem updating ActivityTotalsI in year " + str(year), (retcode,))


def load_development_events(year, ps=_ps):
    logging.info("Loading development events for year:" + str(year))
    if ps.sql_system == ps.postgres:
        folder_path = os.path.join(ps.scendir, str(year))
        folder_path = folder_path.replace('\\', '/')
        csv_file = os.path.join(folder_path, "developmentEvents.csv")
        conn = connect_to_sd(ps=ps)
        with conn, conn.cursor() as cur:
            cur.execute('TRUNCATE %s.%s' % (ps.sd_schema, "development_events"))  # empty the development_events table
            f = open(csv_file, 'r')  # open the development events file
            f.readline()  # skip the first line for header
            cur.copy_from(f, "%s.%s" % (ps.sd_schema, "development_events"), sep=',',
                          null='')  # use the psycopg2 fast copy command to copy the data into the table
            cur.execute('SELECT count(*) FROM %s.%s;' % (ps.sd_schema, "development_events"))
            counter = cur.fetchone()[0]
            logging.info("Loaded %s development events from file %s" % (counter, str(csv_file)))
    elif ps.sql_system == ps.sqlserver:
        # TODO Add logging with pyodbc or _mssql
        folder_path = os.path.join(ps.DEVEVENTPATH)
        folder_path = folder_path.replace('/', '\\')
        csv_file = os.path.join(folder_path, "developmentEvents" + str(year) + ".csv")
        # csv_file = os.path.join(folder_path, "developmentEvents.csv")
        query = 'TRUNCATE TABLE %s.%s' % (ps.sd_schema, "development_events")
        execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
        f = open(csv_file, 'r')  # open the development events file
        f.readline()  # skip the first line for header
        query = ("BULK INSERT %s.%s FROM " % (ps.sd_schema, "development_events") +
                 "'" + csv_file + "' WITH (FIELDTERMINATOR = ',',ROWTERMINATOR = '0x0a', FIRSTROW = 2)")
        execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
        logging.info("Loaded development events for year:" + str(year))
    else:
        logging.error("Invalid database system: " + ps.sql_system)
        raise ValueError


def load_aa_totals(ps=_ps):
    logging.info("Loading Activity Totals for scenario:" + ps.scenario)
    folder_path = os.path.join(ps.scendir, 'AllYears/Inputs')
    folder_path = folder_path.replace('\\', '/')
    conn = connect_to_mapit(ps=ps)
    with conn, conn.cursor() as cur:
        cur.execute('TRUNCATE %s.%s' % ('input', "activity_totals_temp"))  # empty the temporary table
        csv_file = os.path.join(folder_path, "All_ActivityTotalsI.csv")
        f = open(csv_file, 'r')  # open the AA input file
        f.readline()  # skip the first line for header
        cur.copy_from(f, "%s.%s" % ('input', "activity_totals_temp"), sep=',',
                      null='')  # use the psycopg2 fast copy command to copy the data into the temporary table
        cur.execute("DELETE FROM input.activity_totals WHERE scenario= '%s';" % ps.scenario)
        cur.execute(
            "INSERT INTO input.activity_totals "
            "(select year_run, '%s', activity, total_amount from input.activity_totals_temp);" % (
                ps.scenario))
        cur.execute('SELECT count(*) FROM %s.%s;' % ('input', "activity_totals_temp"))
        counter = cur.fetchone()[0]
        logging.info("Loaded %s activity totals from file %s" % (counter, str(csv_file)))
        if counter == 0:
            logging.fatal("No entries loaded from file " + str(csv_file))
            raise RuntimeError


def load_tm_totals(ps=_ps):
    logging.info("Loading Travel Model Totals for scenario:" + ps.scenario)
    folder_path = os.path.join(ps.scendir, 'AllYears/Inputs')
    folder_path = folder_path.replace('\\', '/')
    conn = connect_to_mapit(ps=ps)
    cur = conn.cursor()
    cur.execute('TRUNCATE %s.%s' % ('input', "travel_model_totals_temp"))  # empty the temporary table
    csv_file = os.path.join(folder_path, "TravelModelTotalsI.csv")
    f = open(csv_file, 'r')  # open the AA input file
    f.readline()  # skip the first line for header
    cur.copy_from(f, "%s.%s" % ('input', "travel_model_totals_temp"), sep=',',
                  null='')  # use the psycopg2 fast copy command to copy the data into the temporary table
    cur.execute("DELETE FROM input.travel_model_totals WHERE scenario= '%s';" % ps.scenario)
    cur.execute(
        "INSERT INTO input.travel_model_totals "
        "(select '%s', year_run, tdm_code, tdm_total_amount from input.travel_model_totals_temp );" % (
            ps.scenario))
    cur.execute('SELECT count(*) FROM %s.%s;' % ('input', "travel_model_totals_temp"))
    counter = cur.fetchone()[0]
    logging.info("Loaded %s travel model totals from file %s" % (counter, str(csv_file)))
    conn.commit()
    conn.close()


def load_am_totals(ps=_ps):
    logging.info("Loading Activity Model Totals for scenario:" + ps.scenario)
    folder_path = os.path.join(ps.scendir, 'AllYears/Inputs')
    folder_path = folder_path.replace('\\', '/')
    conn = connect_to_mapit(ps=ps)
    with conn, conn.cursor() as cur:
        cur.execute('TRUNCATE %s.%s' % ('input', "activity_model_totals_temp"))  # empty the temporary table
        csv_file = os.path.join(folder_path, "ABModelTotalsI.csv")
        f = open(csv_file, 'r')  # open the AA input file
        f.readline()  # skip the first line for header
        cur.copy_from(f, "%s.%s" % ('input', "activity_model_totals_temp"), sep=',',
                      null='')  # use the psycopg2 fast copy command to copy the data into the temporary table
        cur.execute("DELETE FROM input.abm_totals WHERE scenario= '%s';" % ps.scenario)
        cur.execute(
            "INSERT INTO input.abm_totals "
            "(select '%s', year_run, abm_code, abm_total_amount from input.activity_model_totals_temp );" % (
                ps.scenario))
        cur.execute('SELECT count(*) FROM %s.%s;' % ('input', "activity_model_totals_temp"))
        counter = cur.fetchone()[0]
        logging.info("Loaded %s activity model totals from file %s" % (counter, str(csv_file)))


def write_abm_land_use(year, dbyear=None, ps=_ps):
    if dbyear is None:
        dbyear = year
    folder_path = os.path.join(ps.scendir, str(year))
    folder_path = folder_path.replace('\\', '/')
    csv_file_name = 'abm_land_use.csv'
    csv_file = os.path.join(folder_path, csv_file_name)

    conn = connect_to_mapit(ps=ps)
    with conn, conn.cursor() as cur:
        query = "delete from output.abm_se_taz10_table where year_run=" + str(
            dbyear) + " and scenario='" + ps.scenario + "';"
        cur.execute(query)
        query = "insert into output.abm_se_taz10_table select * from output.abm_se_taz10 where year_run=" + str(
            dbyear) + " and scenario='" + ps.scenario + "';"
        cur.execute(query)
        query = "copy (select * from output.abm_se_taz10_table where year_run=" + str(
            dbyear) + " and scenario='" + ps.scenario + "') to STDOUT DELIMITER ',' CSV HEADER;"
        with open(csv_file, 'w') as f:
            cur.copy_expert(query, f)


def write_labor_make_use(year, dbyear=None, ps=_ps):
    if dbyear is None:
        dbyear = year
    folder_path = os.path.join(ps.scendir, str(year))
    folder_path = folder_path.replace('\\', '/')
    csv_file_name = 'lmau.csv'
    csv_file = os.path.join(folder_path, csv_file_name)

    conn = connect_to_mapit(ps=ps)
    with conn, conn.cursor() as cur:
        query = "copy (select * from output.labor_make_and_use where year_run=" + str(
            dbyear) + " and scenario='" + ps.scenario + "') to STDOUT DELIMITER ',' CSV HEADER;"
        with open(csv_file, 'w') as f:
            cur.copy_expert(query, f)


def replay_development_events_for_year(year, ps=_ps):
    load_development_events(year, ps=ps)
    logging.info("Replaying development events for " + str(year))
    if ps.apply_sitespec:
        apply_site_spec(year, ps=ps)
    replay_development_events(ps=ps)
    insert_development_events_into_history(year, ps=ps)


def replay_development_events(ps=_ps):
    logging.info("Replaying development events")
    try:
        with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
            tr.query(
                '''UPDATE {sch}.parcels parcels SET
                space_quantity = development_events.new_space_quantity,
                space_type_id = development_events.new_space_type_id,
                year_built = development_events.new_year_built,
                land_area = development_events.land_area,
                is_derelict = development_events.new_is_derelict,
                is_brownfield = development_events.new_is_brownfield
                FROM {sch}.development_events
                WHERE parcels.pecas_parcel_num = development_events.original_pecas_parcel_num
                AND (development_events.event_type = 'C' OR
                development_events.event_type = 'R' OR
                development_events.event_type = 'D' OR
                development_events.event_type = 'A' OR
                development_events.event_type = 'L' OR
                development_events.event_type = 'US'
                );''')

            tr.query(
                '''INSERT INTO {sch}.parcels
                SELECT parcel_id,
                new_pecas_parcel_num,
                new_year_built,
                taz,
                new_space_type_id,
                new_space_quantity,
                land_area,
                available_services,
                new_is_derelict,
                new_is_brownfield
                FROM {sch}.development_events
                WHERE
                development_events.event_type = 'CS'
                OR development_events.event_type = 'AS'
                OR development_events.event_type = 'RS'
                OR development_events.event_type = 'DS'
                OR development_events.event_type = 'LS'
                ; ''')

            tr.query(
                '''INSERT INTO {sch}.parcel_cost_xref
                SELECT development_events.new_pecas_parcel_num, parcel_cost_xref.cost_schedule_id,
                parcel_cost_xref.year_effective
                FROM {sch}.parcel_cost_xref, {sch}.development_events
                WHERE parcel_cost_xref.pecas_parcel_num=development_events.original_pecas_parcel_num
                AND (
                event_type = 'CS'
                OR event_type = 'AS'
                OR event_type = 'RS'
                OR event_type = 'DS'
                OR event_type = 'LS'
                ); ''')

            tr.query(
                '''INSERT INTO {sch}.parcel_fee_xref
                SELECT de.new_pecas_parcel_num, xref.fee_schedule_id, xref.year_effective
                FROM {sch}.parcel_fee_xref xref, {sch}.development_events de
                WHERE xref.pecas_parcel_num=de.original_pecas_parcel_num
                AND (
                event_type = 'CS'
                OR event_type = 'AS'
                OR event_type = 'RS'
                OR event_type = 'DS'
                OR event_type = 'LS'
                ); ''')

            tr.query(
                '''INSERT INTO {sch}.parcel_zoning_xref
                SELECT de.new_pecas_parcel_num, xref.zoning_rules_code, xref.year_effective
                FROM {sch}.parcel_zoning_xref xref, {sch}.development_events de
                WHERE xref.pecas_parcel_num=de.original_pecas_parcel_num
                AND (
                event_type = 'CS'
                OR event_type = 'AS'
                OR event_type = 'RS'
                OR event_type = 'DS'
                OR event_type = 'LS'
                ); ''')

            tr.query(
                '''INSERT INTO {sch}.local_effect_distances
                SELECT de.new_pecas_parcel_num,
                dist.local_effect_id,
                dist.local_effect_distance,
                dist.year_effective
                FROM {sch}.local_effect_distances dist, {sch}.development_events de
                WHERE
                dist.pecas_parcel_num=de.original_pecas_parcel_num
                AND (
                event_type = 'CS'
                OR event_type = 'AS'
                OR event_type = 'RS'
                OR event_type = 'DS'
                OR event_type = 'LS'
                );''')
        logging.info("Replayed development events into parcels table")
    except Exception as e:
        logging.error("Problem replaying development events into parcel table: {}".format(e))


def insert_development_events_into_history(year, ps=_ps):
    logging.info("Start Inserting Development Events to History: " + str(year))
    if year == ps.baseyear:
        if ps.sql_system == ps.postgres:
            query = ("SET search_path=" + ps.sd_schema +
                     "; TRUNCATE development_events_history; INSERT INTO development_events_history (select " + str(
                        year) + " as year_run,* from development_events);")
            retcode = execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
            logging.info("Completed Inserting Development Events to History: " + str(year))
        elif ps.sql_system == ps.sqlserver:
            query = "TRUNCATE table development_events_history; \
                INSERT INTO development_events_history \
                (year_run, \
                event_type, parcel_id, original_pecas_parcel_num, new_pecas_parcel_num, available_services, \
                old_space_type_id, new_space_type_id, old_space_quantity, new_space_quantity, \
                old_year_built, new_year_built, land_area,\
                old_is_derelict, new_is_derelict, \
                old_is_brownfield, new_is_brownfield, \
                zoning_rules_code, taz) (select " + str(year) + " as year_run,* from development_events);"
            retcode = execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
            logging.info("Completed Inserting Development Events to History: " + str(year))
        else:
            logging.error("Invalid database system: " + ps.sql_system)
            raise ValueError
    else:
        if ps.sql_system == ps.postgres:
            query = "SET search_path=" + ps.sd_schema + "; INSERT INTO development_events_history (select " + str(
                year) + " as year_run,* from development_events);"
            retcode = execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
            logging.info("Completed Inserting Development Events to History: " + str(year))
        elif ps.sql_system == ps.sqlserver:
            query = "INSERT INTO development_events_history \
                (year_run, \
                event_type, parcel_id, original_pecas_parcel_num, new_pecas_parcel_num, available_services, \
                old_space_type_id, new_space_type_id, old_space_quantity, new_space_quantity, \
                old_year_built, new_year_built, land_area,\
                old_is_derelict, new_is_derelict, \
                old_is_brownfield, new_is_brownfield, \
                zoning_rules_code, taz) (select " + str(year) + " as year_run,* from development_events);"
            retcode = execute_sqlserver_query(query, ps.sd_database, ps.sd_user, ps.sd_password, ps=ps)
            logging.info("Completed Inserting Development Events to History: " + str(year))
        else:
            logging.error("Invalid database system: " + ps.sql_system)
            raise ValueError
    log_results_from_external_program(None, "Problem updating development events history for " + str(year), (retcode,))


def write_floorspace_summary_from_parcel_file(sd_output_year, ps=_ps):
    # TODO psycopg2 version
    if ps.sql_system == ps.postgres:
        sqlstr = "\\copy (select * from " + ps.sd_schema + ".floorspacei_view) to '" + ps.scendir + str(
            sd_output_year) + "/FloorspaceI.csv' csv header"
        print(sqlstr)
        retcode = execute_postgresql_query(sqlstr, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
        log_results_from_external_program("Wrote FloorspaceI from Parcel File for use in year " + str(sd_output_year),
                                          "Problem writing FloorspaceI from Parcel File for use in year " + str(
                                              sd_output_year), (retcode,))
    elif ps.sql_system == ps.sqlserver:
        folder_path = os.path.join(ps.EXACTPATH, str(sd_output_year))
        file_path = os.path.join(folder_path, 'FloorspaceI.csv')
        file_path = file_path.replace('/', '\\')
        sqlstr = ("select 'TAZ' as taz, 'Commodity' as commodity, 'Quantity' as quantity "
                  "union all select cast(TAZ as nvarchar), cast(Commodity as nvarchar(50)), cast(Quantity as nvarchar) "
                  "from " + ps.sd_database + "." + ps.sd_schema + ".floorspacei_view")
        subprocess.call(["bcp", sqlstr, "queryout", file_path, "-S", ps.sd_host, "-T", "-c", "-t,"])
    else:
        logging.error("Invalid database system: " + ps.sql_system)
        raise ValueError


def copy_floorspace_summary(sd_output_year, ps=_ps):
    shutil.copyfile(ps.scendir + "/" + str(sd_output_year) + "/FloorspaceI.csv",
                    ps.scendir + "/" + str(sd_output_year) + "/FloorspaceSD.csv")
    shutil.copyfile(ps.scendir + "/" + str(sd_output_year) + "/FloorspaceI.csv",
                    ps.scendir + "/" + str(sd_output_year) + "/FloorspaceO.csv")


def reset_parcel_database(ps=_ps):
    # delete the parcels file and reload from backup
    logging.info("Resetting parcel database in database {} and schema {}".format(
        ps.sd_database, ps.sd_schema))
    try:
        with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
            logging.info("Deleting local effects for pseudoparcels")
            tr.query(
                "DELETE from {sch}.local_effect_distances "
                "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
            logging.info("Deleting costs for pseudoparcels")
            tr.query(
                "DELETE from {sch}.parcel_cost_xref "
                "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
            logging.info("Deleting fees for pseudoparcels")
            tr.query(
                "DELETE from {sch}.parcel_fee_xref "
                "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
            logging.info("Deleting zoning for pseudoparcels")
            tr.query(
                "DELETE from {sch}.parcel_zoning_xref "
                "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
            logging.info("Deleting pseudoparcels")
            tr.query(
                "DELETE from {sch}.parcels "
                "WHERE pecas_parcel_num > (SELECT MAX(pecas_parcel_num) FROM {sch}.parcels_backup);")
            # tr.query("VACUUM FULL {sch}.parcels;")
            logging.info("Restoring parcels to base year state")
            tr.query("UPDATE {sch}.parcels parcels SET "
                     "year_built = backup.year_built, "
                     "space_type_id = backup.space_type_id, "
                     "space_quantity = backup.space_quantity, "
                     "land_area = backup.land_area, "
                     "available_services_code = backup.available_services_code, "
                     "is_derelict = backup.is_derelict, "
                     "is_brownfield = backup.is_brownfield "
                     "FROM {sch}.parcels_backup backup "
                     "WHERE parcels.pecas_parcel_num = backup.pecas_parcel_num;")
        logging.info("Finished resetting parcels")
    except Exception as e:
        logging.error("Problem resetting parcels: {}".format(e))


  1288. def prepare_gale_shapley_outputs(year):
  1289. """
  1290. Prepares the outputs from the Gale-Shapley algorithm in SD to be uploaded to Mapit
  1291. """
  1292. os.environ["CURYEAR"] = str(year)
  1293. run_shell_script("prepare_gs_tables.sh")
  1294.  
  1295.  
  1296. def snapshot_parcels(year, ps=_ps):
  1297. # Snapshot the parcel database so we can look at intermediate parcel snapshots.
  1298. querier = sd_querier(ps=ps)
  1299.  
  1300. try:
  1301. try:
  1302. with querier.transaction(yr=year) as tr:
  1303. tr.query("drop table if exists {sch}.parcels_{yr};")
  1304. tr.query("create table {sch}.parcels_{yr} "
  1305. " as select * from {sch}.parcels;")
  1306. except psycopg2.ProgrammingError:
  1307. with querier.transaction(yr=year) as tr:
  1308. # Try truncate-and-insert instead of drop-and-create
  1309. tr.query("truncate table {sch}.parcels_{yr};")
  1310. tr.query("insert into {sch}.parcels_{yr} "
  1311. " select * from {sch}.parcels;")
  1312. logging.info("Backed up the parcel database for {}".format(year))
  1313. except Exception as e:
  1314. # Don't kill the run if this parcel snapshot doesn't work for some reason
  1315. logging.error("Parcel backup failed: {}".format(e))
  1316.  
  1317.  
  1318. class ExternalProgramError(Exception):
  1319. def __init__(self, value):
  1320. self.value = value
  1321.  
  1322. def __str__(self):
  1323. return repr(self.value)
  1324.  
  1325.  
  1326. def log_results_from_external_program(okmsg, notokmsg, results_array):
  1327. ok = True
  1328. for result in results_array:
  1329. if result != 0:
  1330. ok = False
  1331. if ok:
  1332. if okmsg is not None:
  1333. logging.info(okmsg)
  1334. else:
  1335. logging.error(notokmsg + ", return codes " + str(results_array))
  1336. raise ExternalProgramError(notokmsg)
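
# A minimal usage sketch (illustrative only, not called by the run scripts): pair an
# external command's return code with log_results_from_external_program so that any
# non-zero exit code is logged and raised as ExternalProgramError. The command below
# is a placeholder, not one of the real PECAS steps.
def _example_check_external_call():
    retcode = subprocess.call(["echo", "hello"])
    log_results_from_external_program(
        "Example command finished", "Example command failed", (retcode,))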
  1337.  
  1338.  
  1339. def prepare_travel_model_inputs(year, scenario, ps=_ps):
  1340. logging.info("Executing query: select input.generate_tm_inputs(" + str(year) + ",'" + scenario + "');")
  1341. retcode = execute_postgresql_query("select input.generate_tm_inputs(" + str(year) + ",'" + scenario + "');",
  1342. ps.mapit_database, ps.mapit_port, ps.mapit_host, ps.mapit_user, ps=ps)
  1343. log_results_from_external_program(None, "Problem preparing table of travel model inputs in database", (retcode,))
  1344.  
  1345. 
  1346. 
  1349. def updateParcelZoningXrefAccordingToadaptivePhasing(ps=_ps):
  1350. logging.info("Updating parcel zoning xref for adaptive phasing from " + str(ps.baseyear) + " to " + str(ps.endyear) + " in schema " + ps.sd_schema)
  1353. schema = ps.sd_schema
  1354. # Start from the base year.
  1355. currentYear = ps.baseyear
  1356. # Maps each plan_id to [next developable phase_number, the plan's initial release year].
  1357. planAndCurrentPhase = {}
  1358.  
  1359.  
  1360. with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
  1361. # Find the smallest phase_number in each plan, with its initial release year and threshold, and store it in planAndCurrentPhase.
  1362. smallestPhaseNumber = tr.query("select {sch}.phasing_plan_xref.plan_id,min(phase_number),initial_release_year, release_threshold from {sch}.phasing_plan_xref INNER JOIN {sch}.phasing_plans ON {sch}.phasing_plans.plan_id= {sch}.phasing_plan_xref.plan_id group by ({sch}.phasing_plan_xref.plan_id,initial_release_year, release_threshold)\n")
  1363. # For testing purposes, work in a clone of the parcel_zoning_xref table.
  1364. # In the future this may operate directly on parcel_zoning_xref.
  1365. query = "DROP table if exists "+schema+".parcel_zoning_xref_fortest"
  1366. execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
  1367. query = "CREATE TABLE "+schema+".parcel_zoning_xref_fortest \
  1368. (pecas_parcel_num bigint NOT NULL,zoning_rules_code integer,year_effective integer NOT NULL, \
  1369. CONSTRAINT pz_pkey_fortest PRIMARY KEY (pecas_parcel_num,year_effective));"
  1370. execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
  1371.  
  1372. ####tr.query("delete from {sch}.parcel_zoning_xref_fortest")
  1373. ####tr.query("insert into {sch}.parcel_zoning_xref_fortest SELECT * FROM {sch}.parcel_zoning_xref")
  1374.  
  1375. #initialize planAndCurrentPhase
  1376. for i in smallestPhaseNumber:
  1377. planAndCurrentPhase[i[0]] = [i[1],i[2]]
  1378.  
  1379.  
  1380. # Initialize year_effective, by plan_id, for all parcels in each plan's first phase.
  1381. query= "insert into "+schema+".parcel_zoning_xref_fortest \
  1382. SELECT ppx.pecas_parcel_num,9999999,pp.initial_release_year \
  1383. FROM "+schema+".phasing_plans as pp, "+schema+".phasing_plan_xref as ppx,"+schema+".parcel_zoning_xref as pzx \
  1384. WHERE ppx.pecas_parcel_num = pzx.pecas_parcel_num \
  1385. and ppx.plan_id = pp.plan_id \
  1386. and (ppx.plan_id,ppx.phase_number) in (select "+schema+".phasing_plan_xref.plan_id,min(phase_number) from "+schema+".phasing_plan_xref INNER JOIN "+schema+".phasing_plans ON "+schema+".phasing_plans.plan_id= "+schema+".phasing_plan_xref.plan_id group by ("+schema+".phasing_plan_xref.plan_id))"
  1387. 
  1388. # ON CONFLICT (pecas_parcel_num, year_effective) DO NOTHING;
  1393. execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
  1394.  
  1395.  
  1396.  
  1397. # Update zoning_rules_code for the base year (e.g. 2011).
  1398. query= "UPDATE "+schema+".parcel_zoning_xref_fortest SET zoning_rules_code = ppx.zoning_rules_code \
  1399. FROM "+schema+".phasing_plans as pp, "+schema+".phasing_plan_xref as ppx \
  1400. WHERE ppx.pecas_parcel_num = "+schema+".parcel_zoning_xref_fortest.pecas_parcel_num and ppx.plan_id = pp.plan_id and year_effective ="+str(currentYear)+" and (ppx.plan_id,ppx.phase_number) in (select "+schema+".phasing_plan_xref.plan_id,min(phase_number) from "+schema+".phasing_plan_xref INNER JOIN "+schema+".phasing_plans ON "+schema+".phasing_plans.plan_id= "+schema+".phasing_plan_xref.plan_id group by ("+schema+".phasing_plan_xref.plan_id))"
  1401.  
  1402.  
  1403. execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
  1404.  
  1405.  
  1406. while currentYear <= ps.endyear:
  1407. newDict = {}
  1408. for plan, [phase, year] in planAndCurrentPhase.items():
  1409.  
  1410. logging.info("Plan " + str(plan) + ": next phase " + str(phase) + ", release year " + str(year))
  1413. if phase is False:  # all phases in this plan have already been released
  1414. newDict[plan] = [phase, year]  # may be removable
  1415. continue
  1416. elif year == currentYear:
  1417. startANewPlan(plan,ps)
  1418. newDict[plan] = [phase,year]
  1419. elif year < currentYear:  # the plan has already started
  1420. if calculateCurrentThreshold(currentYear, phase, plan, ps):
  1421. nextPhase, releaseYearForNextPhase = findNextPhase(currentYear, phase, plan, ps)
  1422. if nextPhase is False:  # no further phases in this plan
  1423. newDict[plan] = [False,year]
  1424. else:
  1425. newDict[plan] = [nextPhase,year]
  1426. updateNextPhaseReleaseYear(releaseYearForNextPhase,nextPhase,plan,ps)
  1427. else:
  1428. newDict[plan] = [phase,year]
  1429. else:
  1430. newDict[plan] = [phase,year]
  1431.  
  1433. planAndCurrentPhase = newDict
  1434. currentYear += 1
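
# Illustrative sketch of the loop state above (plan ids and years are hypothetical):
# planAndCurrentPhase maps plan_id -> [next developable phase_number, plan's initial release year].
#   start of run:            {101: [1, 2012], 102: [1, 2015]}
#   plan 101 passes its release threshold, so findNextPhase advances the phase while
#   the stored year stays at the plan's initial release year:
#                            {101: [2, 2012], 102: [1, 2015]}
#   when a plan has no further phases the phase entry becomes False:
#                            {101: [False, 2012], 102: [1, 2015]}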
  1435. 
  1436. 
  1437. # UPDATE {sch}.parcel_zoning_xref_fortest as pzxf
  1438. # SET pzxf.year_effective = pp.initial_release_year
  1439. # FROM {sch}.phasing_plans as pp, {sch}.pecas_plan_xref as ppx
  1440. # WHERE ppx.pecas_parcel_num = pzxf.pecas_parcel_num and ppx.plan_id = pp.plan_id and ppx.phase_number = (select min(ppx.phase_number) from ppx where ppx.plan_id = pp.plan_id)
  1441. 
  1442. 
  1443. # Calculate the developed-land threshold for a phase in the current year.
  1450. def calculateCurrentThreshold(year,phaseNumber,planId,ps=_ps):
  1451. schema = ps.sd_schema
  1452. undeveloped_zoning_rules_code = '9999999'
  1453. with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
  1454. #select all developable zoning_rules_code
  1455. #tr.query("select distinct i111.zoning_permissions.zoning_rules_code "
  1456. # "from i111.parcel_zoning_xref_fortest "
  1457. # "inner join i111.zoning_permissions "
  1458. # "on i111.parcel_zoning_xref_fortest.zoning_rules_code = i111.zoning_permissions.zoning_rules_code")
  1459.  
  1460.  
  1461. #select summation of land_area for specific phase
  1462. totalDevelopableArea = tr.query("select sum(ps.land_area) \
  1463. from "+schema+".parcel_zoning_xref as pzx \
  1464. inner join "+schema+".parcels_backup as ps \
  1465. on ps.pecas_parcel_num = pzx.pecas_parcel_num \
  1466. inner join "+schema+".phasing_plan_xref as ppx \
  1467. on ps.pecas_parcel_num = ppx.pecas_parcel_num \
  1468. where ppx.phase_number = "+str(phaseNumber)+" \
  1469. and ppx.zoning_rules_code in (select distinct "+schema+".zoning_permissions.zoning_rules_code from "+schema+".zoning_permissions)")[0][0]
  1470.  
  1471.  
  1472.  
  1473. #select total developed area for specific phase and specific year
  1474. currentDevelopedArea = tr.query("select sum(ps.land_area) \
  1475. from "+schema+".parcel_zoning_xref_fortest as pzx \
  1476. inner join "+schema+".parcels as ps \
  1477. on ps.pecas_parcel_num = pzx.pecas_parcel_num \
  1478. inner join "+schema+".phasing_plan_xref as ppx \
  1479. on ps.pecas_parcel_num = ppx.pecas_parcel_num \
  1480. where ppx.phase_number = "+str(phaseNumber)+" \
  1481. and pzx.zoning_rules_code !="+str(undeveloped_zoning_rules_code)+" \
  1482. and pzx.year_effective<="+str(year)+" ")[0][0]
  1483.  
  1484.  
  1485. logging.info('Total developable area: ' + str(totalDevelopableArea))
  1486. logging.info('Currently developed area: ' + str(currentDevelopedArea))
  1487. if currentDevelopedArea is None:
  1488. currentDevelopedArea = 0
  1489. currentThreshold = currentDevelopedArea / totalDevelopableArea
  1490.  
  1491. maxThreshold = tr.query("select release_threshold from {sch}.phasing_plans where plan_id = "+str(planId)+" " )[0][0]
  1492.  
  1493. logging.info('Current threshold: ' + str(currentThreshold))
  1494.  
  1495. # True once the developed share of this phase exceeds the plan's release_threshold.
  1496. return currentThreshold > maxThreshold
  1500.  
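# Worked example of the release rule above (hypothetical numbers): if the parcels in a
# phase cover 100 units of developable land_area, and parcels covering 55 units already
# carry a non-holding zoning code (not 9999999) by the current year, then
#     currentThreshold = 55 / 100 = 0.55
# and with a release_threshold of 0.5 the function returns True (0.55 > 0.5), so the
# caller releases the plan's next phase.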
  1501.  
  1502.  
  1503. def findNextPhase(year,phaseNumber,planId,ps=_ps):
  1504. schema = ps.sd_schema
  1505. with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
  1506. nextPhase = tr.query("select distinct phase_number "
  1507. "from {sch}.phasing_plan_xref "
  1508. "where plan_id="+str(planId)+" "
  1509. "and phase_number>"+str(phaseNumber)+" "
  1510. "order by phase_number "
  1511. "limit 1")
  1512. if len(nextPhase) == 1:
  1513. nextPhase = nextPhase[0][0]
  1514. # The next phase becomes eligible for release in the following year.
  1515. nextYear = year + 1
  1516. return nextPhase, nextYear
  1521. else:
  1522. return False,False
  1523.  
  1524.  
  1525. def updateNextPhaseReleaseYear(releaseYear,phaseNumber,planId,ps=_ps):
  1526. schema = ps.sd_schema
  1527. logging.info("Releasing phase " + str(phaseNumber) + " of plan " + str(planId) + " with year_effective " + str(releaseYear))
  1528. with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
  1529. tr.query("insert into {sch}.parcel_zoning_xref_fortest (pecas_parcel_num,zoning_rules_code,year_effective)"
  1530. "select ppx.pecas_parcel_num,ppx.zoning_rules_code,"+str(releaseYear)+" "
  1531. "from {sch}.parcel_zoning_xref as pzx,{sch}.phasing_plan_xref as ppx "
  1532. "where pzx.pecas_parcel_num = ppx.pecas_parcel_num "
  1533. "and ppx.phase_number="+str(phaseNumber)+" "
  1534. "and ppx.plan_id="+str(planId)+"")
  1535. return
  1536.  
  1537.  
  1538. def startANewPlan(planId,ps=_ps):
  1539. schema = ps.sd_schema
  1540. with sd_querier(ps=ps).transaction(parcels=ps.parcels_table) as tr:
  1541. query="UPDATE "+schema+".parcel_zoning_xref_fortest \
  1542. SET zoning_rules_code=ppx.zoning_rules_code \
  1543. from "+schema+".phasing_plan_xref as ppx,"+schema+".phasing_plans as pp \
  1544. WHERE ppx.pecas_parcel_num = "+schema+".parcel_zoning_xref_fortest.pecas_parcel_num \
  1545. and ppx.plan_id = pp.plan_id and ppx.plan_id="+str(planId)+";"
  1546.  
  1547. execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
  1548. query= "insert into "+schema+".parcel_zoning_xref_fortest \
  1549. SELECT ppx.pecas_parcel_num,ppx.zoning_rules_code,pp.initial_release_year \
  1550. FROM "+schema+".phasing_plans as pp, "+schema+".phasing_plan_xref as ppx,"+schema+".parcel_zoning_xref as pzx \
  1551. WHERE ppx.pecas_parcel_num = pzx.pecas_parcel_num \
  1552. and ppx.plan_id = pp.plan_id \
  1553. and ppx.plan_id = "+str(planId)
  1554.  
  1555. execute_postgresql_query(query, ps.sd_database, ps.sd_port, ps.sd_host, ps.sd_user, ps=ps)
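

# Illustrative invocation sketch (the settings module name is an assumption; use the
# scenario's actual pecas_settings-style module, which must define baseyear, endyear,
# sd_schema, parcels_table and the sd_* database connection attributes used above):
#
#     import pecas_settings as ps
#     set_up_logging()
#     updateParcelZoningXrefAccordingToadaptivePhasing(ps=ps)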