# load_gtfs_data.py
# Load a GTFS (General Transit Feed Specification) zip feed into a
# PostgreSQL/PostGIS database. (Pastebin scrape header removed.)
- import os,sys
- import zipfile
- import psycopg2
- import psycopg2.extensions
- import datetime
- import time
- psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
- psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
- from psycopg2.extras import execute_batch
- import unicodecsv
- import pprint
- import json
- import StringIO
# Per-table load switches: set one to False to skip (re)loading that GTFS
# table on the next run — useful while iterating on a single loader section,
# since each section TRUNCATEs and refills its whole table.
LOAD_STOPS = True
LOAD_CALENDAR = True
LOAD_CALENDAR_DATES = True
LOAD_AGENCY = True
LOAD_ROUTES = True
LOAD_TRIPS = True
LOAD_STOPTIMES = True
- def loadGtfsFileToDb(filename):
- assert os.path.exists(filename)
- stt = time.time()
- zf = zipfile.ZipFile(filename)
- conn = psycopg2.connect(host='localhost',port=5432,dbname='joukkoliikenne',user='xxxyyy',password='yyyxxx')
- curs = conn.cursor()
- feed_id = 'matka'
- if LOAD_STOPS:
- print 'STOPS loading'
- st = time.time()
- sf = zf.open('stops.txt')
- csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
- header = None
- insert_vals = []
- for l in csvr:
- if not header:
- header = l
- continue
- ld = dict(zip(header,l))
- ld['stop_lat'],ld['stop_lon'] = map(float,(ld['stop_lat'],ld['stop_lon']))
- svals = {'feed_id':feed_id}
- for sk in ('stop_id','stop_code','stop_name','stop_desc','stop_lat','stop_lon','zone_id','stop_url','location_type','parent_station','stop_timezone','wheelchair_boarding'):
- svals[sk] = ld[sk] if sk in ld else None
- insert_vals.append(svals)
- sf.close()
- curs.execute('TRUNCATE stops')
- print 'STOPS executemany',len(insert_vals),'stops'
- execute_batch(curs,'INSERT INTO stops (feed_id,stop_id,stop_code,stop_name,stop_desc,stop_lat,stop_lon,zone_id,stop_url,location_type,parent_station,stop_timezone,wheelchair_boarding,pos) VALUES (%(feed_id)s,%(stop_id)s,%(stop_code)s,%(stop_name)s,%(stop_desc)s,%(stop_lat)s,%(stop_lon)s,%(zone_id)s,%(stop_url)s,%(location_type)s,%(parent_station)s,%(stop_timezone)s,%(wheelchair_boarding)s,ST_SetSRID(ST_MakePoint(%(stop_lat)s,%(stop_lon)s),4326))',insert_vals)
- conn.commit()
- print 'STOPS done',time.time()-st,len(insert_vals)
- if LOAD_CALENDAR:
- print 'CALENDAR loading'
- st = time.time()
- sf = zf.open('calendar.txt')
- csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
- header = None
- insert_vals = []
- for l in csvr:
- if not header:
- header = l
- continue
- ld = dict(zip(header,l))
- #lat,lon = map(float,(ld['stop_lat'],ld['stop_lon']))
- insert_vals.append((feed_id,ld['service_id'],
- ld['monday'] == '1',ld['tuesday'] == '1',ld['wednesday'] == '1',
- ld['thursday'] == '1',ld['friday'] == '1',ld['saturday'] == '1',ld['sunday'] == '1',
- datetime.datetime.strptime(ld['start_date'],'%Y%m%d').date(),
- datetime.datetime.strptime(ld['end_date'],'%Y%m%d').date()
- ))
- sf.close()
- curs.execute('TRUNCATE calendar')
- execute_batch(curs,'INSERT INTO calendar (feed_id,service_id,mon,tue,wed,thu,fri,sat,sun,start_date,end_date) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',insert_vals)
- conn.commit()
- print 'CALENDAR done',time.time()-st,len(insert_vals)
- if LOAD_CALENDAR_DATES:
- print 'CALENDAR DATES loading'
- st = time.time()
- sf = zf.open('calendar_dates.txt')
- csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
- header = None
- insert_vals = []
- for l in csvr:
- if not header:
- header = l
- continue
- ld = dict(zip(header,l))
- #lat,lon = map(float,(ld['stop_lat'],ld['stop_lon']))
- insert_vals.append((feed_id,ld['service_id'],
- datetime.datetime.strptime(ld['date'],'%Y%m%d').date(),
- ld['exception_type'] == '1'
- ))
- sf.close()
- curs.execute('TRUNCATE calendar_dates')
- execute_batch(curs,'INSERT INTO calendar_dates (feed_id,service_id,date,isrunning) VALUES (%s,%s,%s,%s)',insert_vals)
- conn.commit()
- print 'CALENDAR DATES done',time.time()-st,len(insert_vals)
- if LOAD_AGENCY:
- print 'AGENCY loading'
- st = time.time()
- sf = zf.open('agency.txt')
- csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
- header = None
- insert_vals = []
- agency_keys = ('agency_id','agency_name','agency_url','agency_timezone','agency_lang','agency_phone','agency_fare_url','agency_email')
- for l in csvr:
- if not header:
- header = l
- continue
- ld = dict(zip(header,l))
- #lat,lon = map(float,(ld['stop_lat'],ld['stop_lon']))
- avals = {'feed_id':feed_id}
- for ak in agency_keys:
- avals[ak] = ld[ak] if ak in ld else None
- insert_vals.append(avals)
- sf.close()
- curs.execute('TRUNCATE agency')
- execute_batch(curs,'INSERT INTO agency (feed_id,agency_id,agency_name,agency_url,agency_timezone,agency_lang,agency_phone,agency_fare_url,agency_email) VALUES (%(feed_id)s,%(agency_id)s,%(agency_name)s,%(agency_url)s,%(agency_timezone)s,%(agency_lang)s,%(agency_phone)s,%(agency_fare_url)s,%(agency_email)s)',insert_vals)
- conn.commit()
- print 'AGENCY done',time.time()-st,len(insert_vals)
- if LOAD_ROUTES:
- route_fks = ('route_id','agency_id','route_short_name','route_long_name','route_desc','route_type','route_url','route_color','route_text_color')
- print 'ROUTES loading'
- st = time.time()
- sf = zf.open('routes.txt')
- csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
- header = None
- numi = 0
- totalr = 0
- sio = StringIO.StringIO()
- csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
- curs.execute('TRUNCATE routes')
- for l in csvr:
- if not header:
- header = l
- continue
- ld = dict(zip(header,l))
- rowv = [feed_id,]
- for fk in route_fks:
- rowv.append(ld[fk] if fk in ld else '')
- csvw.writerow(rowv)
- numi+=1
- totalr+=1
- if numi > 5000:
- sio.flush()
- print sio.tell(),'bytes in stringio',numi,'items'
- sio.seek(0)
- curs.copy_from(sio, 'routes', sep="\t",null="")
- numi = 0
- sio.close()
- sio = StringIO.StringIO()
- csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
- print sio.tell(),'bytes in stringio',numi,'items'
- sio.seek(0)
- curs.copy_from(sio, 'routes', sep="\t",null="")
- sio.close()
- sio = None
- conn.commit()
- print 'ROUTES done',time.time()-st,totalr
- if LOAD_TRIPS:
- print 'TRIPS loading'
- trip_fks = ('route_id','service_id','trip_id','trip_headsign','trip_short_name','direction_id','block_id','shape_id','wheelchair_accessible','bikes_allowed')
- st = time.time()
- sf = zf.open('trips.txt')
- csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
- header = None
- numi = 0
- totalr = 0
- sio = StringIO.StringIO()
- csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
- curs.execute('TRUNCATE trips')
- for l in csvr:
- if not header:
- header = l
- continue
- ld = dict(zip(header,l))
- rowv = [feed_id,]
- for fk in trip_fks:
- rowv.append(ld[fk] if fk in ld else '')
- rowv.append(ld['trip_id'].split('_',1)[1] if ld['trip_id'].find('_') != -1 else '')
- rowv.append(ld['trip_id'].split('_',1)[0] if ld['trip_id'].find('_') != -1 else '')
- csvw.writerow(rowv)
- numi+=1
- totalr+=1
- if numi > 100000:
- sio.flush()
- print sio.tell(),'bytes in stringio',numi,'items'
- sio.seek(0)
- curs.copy_from(sio, 'trips', sep="\t",null="")
- numi = 0
- sio.close()
- sio = StringIO.StringIO()
- csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
- print sio.tell(),'bytes in stringio',numi,'items'
- sio.seek(0)
- curs.copy_from(sio, 'trips', sep="\t",null="")
- sio.close()
- sio = None
- conn.commit()
- print 'TRIPS done',time.time()-st,totalr
- if LOAD_STOPTIMES:
- print 'STOPTIMES loading'
- stoptimes_fks = ('trip_id','arrival_time','departure_time','stop_id','stop_sequence','stop_headsign','pickup_type','drop_off_type','shape_dist_traveled','timepoint')
- st = time.time()
- sf = zf.open('stop_times.txt')
- csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
- header = None
- numi = 0
- totalr = 0
- sio = StringIO.StringIO()
- csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
- curs.execute('TRUNCATE stop_times')
- for l in csvr:
- if not header:
- header = l
- continue
- ld = dict(zip(header,l))
- rowv = [feed_id,]
- for fk in stoptimes_fks:
- if fk.endswith('_time'):
- val = int(datetime.timedelta(**dict(zip(('hours','minutes','seconds'),map(int,ld[fk].split(':'))))).total_seconds()) if ld[fk] != '' else ''
- else:
- val = ld[fk] if fk in ld else ''
- rowv.append(val)
- csvw.writerow(rowv)
- numi+=1
- totalr+=1
- if numi > 1000000:
- sio.flush()
- print sio.tell(),'bytes in stringio',numi,'items'
- sio.seek(0)
- curs.copy_from(sio, 'stop_times', sep="\t",null="")
- numi = 0
- sio.close()
- sio = StringIO.StringIO()
- csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
- print sio.tell(),'bytes in stringio',numi,'items'
- sio.seek(0)
- curs.copy_from(sio, 'stop_times', sep="\t",null="")
- sio.close()
- sio = None
- conn.commit()
- print 'STOPTIMES done',time.time()-st,totalr
- zf.close()
- conn.set_isolation_level(0)
- st = time.time()
- if LOAD_STOPS: curs.execute('VACUUM ANALYSE stops')
- if LOAD_CALENDAR: curs.execute('VACUUM ANALYSE calendar')
- if LOAD_CALENDAR_DATES: curs.execute('VACUUM ANALYSE calendar_dates')
- if LOAD_ROUTES: curs.execute('VACUUM ANALYSE routes')
- if LOAD_TRIPS: curs.execute('VACUUM ANALYSE trips')
- if LOAD_STOPTIMES: curs.execute('VACUUM ANALYSE stop_times')
- if LOAD_AGENCY: curs.execute('VACUUM ANALYSE agency')
- print 'Vacuum done',time.time()-st
- curs.close()
- conn.close()
- print 'total',time.time()-stt
- if __name__ == '__main__':
- if len(sys.argv) != 2:
- print 'USAGE: python load_gtfs_data.py <gtfs-file>'
- sys.exit(1)
- loadGtfsFileToDb(sys.argv[1])
- '''
- DROP TABLE stops;
- CREATE TABLE stops (
- feed_id text,
- stop_id text PRIMARY KEY,
- stop_code text,
- stop_name text,
- stop_desc text,
- stop_lat double precision,
- stop_lon double precision,
- zone_id text,
- stop_url text,
- location_Type integer,
- parent_station text,
- stop_timezone text,
- wheelchair_boarding integer,
- pos Geometry(Point,4326)
- );
- ALTER TABLE stops OWNER TO joukkoliikenne;
- CREATE INDEX "stops_feed_id_idx" ON stops (feed_id);
- CREATE INDEX "stops_id_idx" ON stops USING gist (stop_id gist_trgm_ops);
- CREATE INDEX "stops_nimi_idx" ON stops USING gist (stop_name gist_trgm_ops);
- CREATE INDEX "stops_pos_gix" ON stops USING gist (pos);
- DROP TABLE routes;
- CREATE TABLE routes (
- feed_id text,
- route_id text PRIMARY KEY,
- agency_id text,
- route_short_name text,
- route_long_name text,
- route_desc text,
- route_type integer,
- route_url text,
- route_color text,
- route_text_color text
- );
- ALTER TABLE routes OWNER TO joukkoliikenne;
- CREATE INDEX "routes_feed_id_idx" ON routes (feed_id);
- CREATE INDEX "routes_agency_id_idx" ON routes (agency_id);
- CREATE INDEX "routes_short_name_idx" ON routes USING gist (route_short_name gist_trgm_ops);
- CREATE INDEX "routes_long_name_idx" ON routes USING gist (route_long_name gist_trgm_ops);
- DROP TABLE trips;
- CREATE TABLE trips (
- feed_id text,
- route_id text,
- service_id text,
- trip_id text PRIMARY KEY,
- trip_headsign text,
- trip_short_name text,
- direction_id integer,
- block_id text,
- shape_id text,
- wheelchair_accessible integer,
- bikes_allowed integer,
- trip_source text,
- trip_source_id text
- );
- ALTER TABLE trips OWNER TO joukkoliikenne;
- CREATE INDEX "trips_feed_id_idx" ON trips (feed_id);
- CREATE INDEX "trips_route_id_idx" ON trips (route_id);
- CREATE INDEX "trips_service_id_idx" ON trips (service_id);
- CREATE INDEX "trips_headsign_idx" ON trips USING gist (trip_headsign gist_trgm_ops);
- CREATE INDEX "trips_trip_short_name_idx" ON trips USING gist (trip_short_name gist_trgm_ops);
- CREATE INDEX "trips_source_idx" ON trips (trip_source);
- CREATE INDEX "trips_source_id_idx" ON trips (trip_source_id);
- DROP TABLE stop_times;
- CREATE TABLE stop_times (
- feed_id text,
- trip_id text,
- arrival_time integer,
- departure_time integer,
- stop_id text,
- stop_sequence integer,
- stop_headsign text,
- pickup_type integer,
- drop_off_type integer,
- shape_dist_traveled real,
- timepoint integer
- );
- ALTER TABLE stop_times OWNER TO joukkoliikenne;
- CREATE INDEX "stop_times_feed_id_idx" ON stop_times (feed_id);
- CREATE INDEX "stop_times_trip_id_idx" ON stop_times (trip_id);
- CREATE INDEX "stop_times_stop_id_idx" ON stop_times (stop_id);
- CREATE INDEX "stop_times_arrival_time_idx" ON stop_times (arrival_time);
- CREATE INDEX "stop_times_departure_time_idx" ON stop_times (departure_time);
- DROP TABLE agency;
- CREATE TABLE agency (
- feed_id text,
- agency_id text PRIMARY KEY,
- agency_name text,
- agency_url text,
- agency_timezone text,
- agency_lang text,
- agency_phone text,
- agency_fare_url text,
- agency_email text
- );
- ALTER TABLE agency OWNER TO joukkoliikenne;
- DROP TABLE calendar;
- CREATE TABLE calendar (
- feed_id text,
- service_id text PRIMARY KEY,
- mon boolean,
- tue boolean,
- wed boolean,
- thu boolean,
- fri boolean,
- sat boolean,
- sun boolean,
- start_date date,
- end_date date
- );
- ALTER TABLE calendar OWNER TO joukkoliikenne;
- DROP TABLE calendar_dates;
- CREATE TABLE calendar_dates (
- feed_id text,
- service_id text,
- date date,
- isrunning boolean,
- PRIMARY KEY (service_id,date)
- );
- ALTER TABLE calendar_dates OWNER TO joukkoliikenne;
- '''
# (Pastebin scrape footer removed.)