Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import configparser
- import os
- import csv
- import dateutil.parser
- import sys
- import psycopg2
- import time
class TemporalRisk:
    """Pipeline that loads trip points from a file, processes them in PostgreSQL
    and writes per-row query results to an output file.

    All settings come from ``config.cfg`` (read relative to the current working
    directory), using the sections ``FILE CONFIG`` and ``DB CONFIG``.
    """

    def __init__(self):
        """Load ``config.cfg`` and cache the working directory and delimiter."""
        self.config = configparser.ConfigParser()
        self.config.read('config.cfg')
        self.directory = self.config.get('FILE CONFIG', 'Directory')
        self.file_delimiter = self.config.get('FILE CONFIG', 'Delimiter')

    def read_data(self):
        """Read the configured input file into a list of point dictionaries.

        Each non-empty row must hold, in order: account id, trip id, latitude
        and longitude. Blank rows are skipped.

        Returns:
            list[dict]: one ``{'account_id', 'trip_id', 'lat', 'lon'}`` dict
            per row, with ids converted to ``int`` and coordinates to ``float``.

        Raises:
            SystemExit: with status 1 (after printing a message) when the
                input file does not exist.
            ValueError: when a field cannot be converted to a number.
        """
        input_file_name = self.config.get('FILE CONFIG', 'input_file')
        input_path = os.path.join(self.directory, input_file_name)
        try:
            # newline='' is what the csv module expects for its input files.
            input_file = open(input_path, 'rt', encoding="utf8", newline='')
        except FileNotFoundError:
            print("Specified files is not present in the given location")
            sys.exit(1)
        # The context manager guarantees the handle is closed even if a row
        # fails to parse.
        with input_file:
            reader = csv.reader(input_file, delimiter=self.file_delimiter)
            return [
                {
                    'account_id': int(row[0]),
                    'trip_id': int(row[1]),
                    'lat': float(row[2]),
                    'lon': float(row[3]),
                }
                for row in reader
                if row  # csv.reader yields [] for blank lines
            ]

    def _connect(self):
        """Open a database connection from the ``DB CONFIG`` section.

        Keyword arguments are used instead of a hand-formatted DSN string so
        that values containing spaces or quotes (typically the password) are
        passed through intact. Connection errors propagate to the caller.
        """
        return psycopg2.connect(
            dbname=self.config.get('DB CONFIG', 'DBName'),
            user=self.config.get('DB CONFIG', 'User'),
            host=self.config.get('DB CONFIG', 'Host'),
            password=self.config.get('DB CONFIG', 'Password'),
            port=self.config.get('DB CONFIG', 'Port'),
        )

    def get_connection(self):
        """Return a new database connection, or exit if it cannot be opened.

        Raises:
            SystemExit: with status 1 (after printing the error) when
                psycopg2 reports an ``OperationalError``.
        """
        try:
            return self._connect()
        except psycopg2.OperationalError as e:
            print("Error in establishing connection\n%s" % e)
            sys.exit(1)

    def process_data(self, conn, data):
        """Load ``data`` into the database and return the computed waypoints.

        Args:
            conn: an open psycopg2 connection. It is always closed before this
                method returns, including when a statement fails.
            data: iterable of dicts as produced by :meth:`read_data`.

        Returns:
            list: all rows of ``public.input_waypoints``.
        """
        try:
            cursor = conn.cursor()
            # Create temporary table
            cursor.execute("select tn_create_temp_trip_table()")
            # Insert data to temporary table
            cursor.executemany(
                "insert into trip_locations values(%(account_id)s, %(trip_id)s, %(lat)s, %(lon)s)",
                list(data),
            )
            # Neither statement below contains a placeholder, so no parameters
            # are passed: psycopg2 rejects surplus arguments with
            # "not all arguments converted".
            cursor.execute("""SELECT * from
public.make_linestring();""")
            cursor.execute("""SELECT * from public.input_waypoints;""")
            # Getting the results
            return cursor.fetchall()
        finally:
            conn.close()

    def write_data(self, result):
        """Query the grid sum for every row of ``result`` and write a CSV report.

        For each row, ``row[1]`` is sent to the database as a geometry value and
        the sum of ``dn_2017`` over intersecting ``grid_centroid_us_2017``
        entries is fetched. The wall-clock start/end time of that query and its
        duration are recorded alongside the result.

        Args:
            result: iterable of rows; ``row[0]`` and ``row[1]`` are written to
                the output and ``row[1]`` is used as the query geometry.

        The output file is the ``output_file`` entry of ``FILE CONFIG`` inside
        the configured directory. A fresh connection is opened for the queries
        and closed when the method finishes or fails.
        """
        output_path = os.path.join(
            self.directory, self.config.get('FILE CONFIG', 'output_file'))
        # The geometry is bound as a parameter rather than formatted into the
        # SQL text, so quoting is handled by the driver.
        query = (
            "select sum(dn_2017) from grid_centroid_us_2017 "
            "where ST_Intersects(geom, ST_Buffer(ST_Transform("
            "ST_SetSRID(%s::geometry, 4326),102003),100));"
        )
        conn = self._connect()
        try:
            with open(output_path, 'w', encoding="utf8", newline='') as outfile:
                outfile_writer = csv.writer(outfile, delimiter=self.file_delimiter)
                # NOTE(review): the header names five columns but every data
                # row below has six values (row[1] has no header of its own).
                # Left unchanged because the intended layout is not clear.
                outfile_writer.writerow(
                    ["Account id", "route_risk_score", "start_time", "end_time", "time_taken"])
                cursor = conn.cursor()
                # Session-level setting: issuing it once covers every query
                # run on this connection.
                cursor.execute("""SET work_mem TO '256MB';""")
                for row in result:
                    start_time = time.time()
                    cursor.execute(query, (row[1],))
                    scores = cursor.fetchall()
                    end = time.time()
                    time_taken = end - start_time
                    for score in scores:
                        outfile_writer.writerow(
                            [row[0], row[1], score[0], start_time, end, time_taken])
        finally:
            conn.close()
def main():
    """Run the full pipeline: read the input, process it in the database,
    then write the report, printing a progress line before each stage."""
    risk = TemporalRisk()

    print("Reading data....")
    points = risk.read_data()

    print("Processing data....")
    connection = risk.get_connection()
    waypoints = risk.process_data(connection, points)

    print("Writing data...")
    risk.write_data(waypoints)

    print("Completed.....")


if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment