Guest User

Untitled

a guest
Apr 13th, 2018
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 14.77 KB | None | 0 0
  1. import os,sys
  2. import zipfile
  3. import psycopg2
  4. import psycopg2.extensions
  5. import datetime
  6. import time
  7. psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
  8. psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
  9.  
  10. from psycopg2.extras import execute_batch
  11.  
  12. import unicodecsv
  13. import pprint
  14. import json
  15. import StringIO
  16.  
  17.  
  18. LOAD_STOPS = True
  19. LOAD_CALENDAR = True
  20. LOAD_CALENDAR_DATES = True
  21. LOAD_AGENCY = True
  22. LOAD_ROUTES = True
  23. LOAD_TRIPS = True
  24. LOAD_STOPTIMES = True
  25.  
  26.  
  27.  
  28. def loadGtfsFileToDb(filename):
  29. assert os.path.exists(filename)
  30. stt = time.time()
  31. zf = zipfile.ZipFile(filename)
  32.  
  33.  
  34. conn = psycopg2.connect(host='localhost',port=5432,dbname='joukkoliikenne',user='xxxyyy',password='yyyxxx')
  35. curs = conn.cursor()
  36.  
  37.  
  38. feed_id = 'matka'
  39.  
  40.  
  41. if LOAD_STOPS:
  42. print 'STOPS loading'
  43. st = time.time()
  44. sf = zf.open('stops.txt')
  45. csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
  46. header = None
  47. insert_vals = []
  48.  
  49. for l in csvr:
  50. if not header:
  51. header = l
  52. continue
  53. ld = dict(zip(header,l))
  54. ld['stop_lat'],ld['stop_lon'] = map(float,(ld['stop_lat'],ld['stop_lon']))
  55. svals = {'feed_id':feed_id}
  56. for sk in ('stop_id','stop_code','stop_name','stop_desc','stop_lat','stop_lon','zone_id','stop_url','location_type','parent_station','stop_timezone','wheelchair_boarding'):
  57. svals[sk] = ld[sk] if sk in ld else None
  58. insert_vals.append(svals)
  59. sf.close()
  60.  
  61. curs.execute('TRUNCATE stops')
  62. print 'STOPS executemany',len(insert_vals),'stops'
  63. execute_batch(curs,'INSERT INTO stops (feed_id,stop_id,stop_code,stop_name,stop_desc,stop_lat,stop_lon,zone_id,stop_url,location_type,parent_station,stop_timezone,wheelchair_boarding,pos) VALUES (%(feed_id)s,%(stop_id)s,%(stop_code)s,%(stop_name)s,%(stop_desc)s,%(stop_lat)s,%(stop_lon)s,%(zone_id)s,%(stop_url)s,%(location_type)s,%(parent_station)s,%(stop_timezone)s,%(wheelchair_boarding)s,ST_SetSRID(ST_MakePoint(%(stop_lat)s,%(stop_lon)s),4326))',insert_vals)
  64. conn.commit()
  65. print 'STOPS done',time.time()-st,len(insert_vals)
  66.  
  67.  
  68.  
  69.  
  70. if LOAD_CALENDAR:
  71. print 'CALENDAR loading'
  72. st = time.time()
  73. sf = zf.open('calendar.txt')
  74. csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
  75. header = None
  76. insert_vals = []
  77.  
  78. for l in csvr:
  79. if not header:
  80. header = l
  81. continue
  82. ld = dict(zip(header,l))
  83. #lat,lon = map(float,(ld['stop_lat'],ld['stop_lon']))
  84. insert_vals.append((feed_id,ld['service_id'],
  85. ld['monday'] == '1',ld['tuesday'] == '1',ld['wednesday'] == '1',
  86. ld['thursday'] == '1',ld['friday'] == '1',ld['saturday'] == '1',ld['sunday'] == '1',
  87. datetime.datetime.strptime(ld['start_date'],'%Y%m%d').date(),
  88. datetime.datetime.strptime(ld['end_date'],'%Y%m%d').date()
  89. ))
  90.  
  91.  
  92. sf.close()
  93.  
  94. curs.execute('TRUNCATE calendar')
  95. execute_batch(curs,'INSERT INTO calendar (feed_id,service_id,mon,tue,wed,thu,fri,sat,sun,start_date,end_date) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',insert_vals)
  96. conn.commit()
  97. print 'CALENDAR done',time.time()-st,len(insert_vals)
  98.  
  99.  
  100. if LOAD_CALENDAR_DATES:
  101. print 'CALENDAR DATES loading'
  102. st = time.time()
  103. sf = zf.open('calendar_dates.txt')
  104. csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
  105. header = None
  106. insert_vals = []
  107.  
  108. for l in csvr:
  109. if not header:
  110. header = l
  111. continue
  112. ld = dict(zip(header,l))
  113. #lat,lon = map(float,(ld['stop_lat'],ld['stop_lon']))
  114. insert_vals.append((feed_id,ld['service_id'],
  115. datetime.datetime.strptime(ld['date'],'%Y%m%d').date(),
  116. ld['exception_type'] == '1'
  117. ))
  118.  
  119.  
  120. sf.close()
  121.  
  122. curs.execute('TRUNCATE calendar_dates')
  123. execute_batch(curs,'INSERT INTO calendar_dates (feed_id,service_id,date,isrunning) VALUES (%s,%s,%s,%s)',insert_vals)
  124. conn.commit()
  125. print 'CALENDAR DATES done',time.time()-st,len(insert_vals)
  126.  
  127.  
  128. if LOAD_AGENCY:
  129. print 'AGENCY loading'
  130. st = time.time()
  131. sf = zf.open('agency.txt')
  132. csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
  133. header = None
  134. insert_vals = []
  135. agency_keys = ('agency_id','agency_name','agency_url','agency_timezone','agency_lang','agency_phone','agency_fare_url','agency_email')
  136. for l in csvr:
  137. if not header:
  138. header = l
  139. continue
  140. ld = dict(zip(header,l))
  141. #lat,lon = map(float,(ld['stop_lat'],ld['stop_lon']))
  142. avals = {'feed_id':feed_id}
  143.  
  144. for ak in agency_keys:
  145. avals[ak] = ld[ak] if ak in ld else None
  146. insert_vals.append(avals)
  147.  
  148.  
  149. sf.close()
  150.  
  151.  
  152.  
  153. curs.execute('TRUNCATE agency')
  154. execute_batch(curs,'INSERT INTO agency (feed_id,agency_id,agency_name,agency_url,agency_timezone,agency_lang,agency_phone,agency_fare_url,agency_email) VALUES (%(feed_id)s,%(agency_id)s,%(agency_name)s,%(agency_url)s,%(agency_timezone)s,%(agency_lang)s,%(agency_phone)s,%(agency_fare_url)s,%(agency_email)s)',insert_vals)
  155. conn.commit()
  156. print 'AGENCY done',time.time()-st,len(insert_vals)
  157.  
  158. if LOAD_ROUTES:
  159. route_fks = ('route_id','agency_id','route_short_name','route_long_name','route_desc','route_type','route_url','route_color','route_text_color')
  160.  
  161. print 'ROUTES loading'
  162. st = time.time()
  163. sf = zf.open('routes.txt')
  164. csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
  165. header = None
  166. numi = 0
  167. totalr = 0
  168. sio = StringIO.StringIO()
  169. csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
  170.  
  171.  
  172. curs.execute('TRUNCATE routes')
  173.  
  174.  
  175.  
  176. for l in csvr:
  177. if not header:
  178. header = l
  179. continue
  180. ld = dict(zip(header,l))
  181.  
  182. rowv = [feed_id,]
  183. for fk in route_fks:
  184. rowv.append(ld[fk] if fk in ld else '')
  185. csvw.writerow(rowv)
  186. numi+=1
  187. totalr+=1
  188.  
  189.  
  190.  
  191. if numi > 5000:
  192. sio.flush()
  193. print sio.tell(),'bytes in stringio',numi,'items'
  194. sio.seek(0)
  195. curs.copy_from(sio, 'routes', sep="\t",null="")
  196. numi = 0
  197. sio.close()
  198. sio = StringIO.StringIO()
  199. csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
  200.  
  201.  
  202. print sio.tell(),'bytes in stringio',numi,'items'
  203. sio.seek(0)
  204. curs.copy_from(sio, 'routes', sep="\t",null="")
  205. sio.close()
  206. sio = None
  207. conn.commit()
  208. print 'ROUTES done',time.time()-st,totalr
  209.  
  210.  
  211. if LOAD_TRIPS:
  212. print 'TRIPS loading'
  213.  
  214. trip_fks = ('route_id','service_id','trip_id','trip_headsign','trip_short_name','direction_id','block_id','shape_id','wheelchair_accessible','bikes_allowed')
  215.  
  216. st = time.time()
  217. sf = zf.open('trips.txt')
  218. csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
  219. header = None
  220. numi = 0
  221. totalr = 0
  222. sio = StringIO.StringIO()
  223. csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
  224.  
  225.  
  226. curs.execute('TRUNCATE trips')
  227.  
  228. for l in csvr:
  229. if not header:
  230. header = l
  231. continue
  232. ld = dict(zip(header,l))
  233.  
  234.  
  235. rowv = [feed_id,]
  236. for fk in trip_fks:
  237. rowv.append(ld[fk] if fk in ld else '')
  238.  
  239. rowv.append(ld['trip_id'].split('_',1)[1] if ld['trip_id'].find('_') != -1 else '')
  240. rowv.append(ld['trip_id'].split('_',1)[0] if ld['trip_id'].find('_') != -1 else '')
  241. csvw.writerow(rowv)
  242.  
  243. numi+=1
  244. totalr+=1
  245.  
  246.  
  247. if numi > 100000:
  248. sio.flush()
  249. print sio.tell(),'bytes in stringio',numi,'items'
  250. sio.seek(0)
  251. curs.copy_from(sio, 'trips', sep="\t",null="")
  252. numi = 0
  253. sio.close()
  254. sio = StringIO.StringIO()
  255. csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
  256.  
  257.  
  258. print sio.tell(),'bytes in stringio',numi,'items'
  259. sio.seek(0)
  260. curs.copy_from(sio, 'trips', sep="\t",null="")
  261. sio.close()
  262. sio = None
  263. conn.commit()
  264. print 'TRIPS done',time.time()-st,totalr
  265.  
  266. if LOAD_STOPTIMES:
  267. print 'STOPTIMES loading'
  268.  
  269. stoptimes_fks = ('trip_id','arrival_time','departure_time','stop_id','stop_sequence','stop_headsign','pickup_type','drop_off_type','shape_dist_traveled','timepoint')
  270.  
  271. st = time.time()
  272. sf = zf.open('stop_times.txt')
  273. csvr = unicodecsv.reader(sf,delimiter=',',quotechar='"')
  274. header = None
  275. numi = 0
  276. totalr = 0
  277. sio = StringIO.StringIO()
  278. csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
  279.  
  280.  
  281. curs.execute('TRUNCATE stop_times')
  282.  
  283. for l in csvr:
  284. if not header:
  285. header = l
  286. continue
  287. ld = dict(zip(header,l))
  288.  
  289.  
  290. rowv = [feed_id,]
  291. for fk in stoptimes_fks:
  292. if fk.endswith('_time'):
  293. val = int(datetime.timedelta(**dict(zip(('hours','minutes','seconds'),map(int,ld[fk].split(':'))))).total_seconds()) if ld[fk] != '' else ''
  294. else:
  295. val = ld[fk] if fk in ld else ''
  296. rowv.append(val)
  297.  
  298. csvw.writerow(rowv)
  299.  
  300. numi+=1
  301. totalr+=1
  302.  
  303.  
  304. if numi > 1000000:
  305. sio.flush()
  306. print sio.tell(),'bytes in stringio',numi,'items'
  307. sio.seek(0)
  308. curs.copy_from(sio, 'stop_times', sep="\t",null="")
  309. numi = 0
  310. sio.close()
  311. sio = StringIO.StringIO()
  312. csvw = unicodecsv.writer(sio,delimiter='\t',quotechar='"')
  313.  
  314.  
  315. print sio.tell(),'bytes in stringio',numi,'items'
  316. sio.seek(0)
  317. curs.copy_from(sio, 'stop_times', sep="\t",null="")
  318. sio.close()
  319. sio = None
  320. conn.commit()
  321. print 'STOPTIMES done',time.time()-st,totalr
  322.  
  323.  
  324. zf.close()
  325.  
  326. conn.set_isolation_level(0)
  327. st = time.time()
  328. if LOAD_STOPS: curs.execute('VACUUM ANALYSE stops')
  329. if LOAD_CALENDAR: curs.execute('VACUUM ANALYSE calendar')
  330. if LOAD_CALENDAR_DATES: curs.execute('VACUUM ANALYSE calendar_dates')
  331. if LOAD_ROUTES: curs.execute('VACUUM ANALYSE routes')
  332. if LOAD_TRIPS: curs.execute('VACUUM ANALYSE trips')
  333. if LOAD_STOPTIMES: curs.execute('VACUUM ANALYSE stop_times')
  334. if LOAD_AGENCY: curs.execute('VACUUM ANALYSE agency')
  335. print 'Vacuum done',time.time()-st
  336. curs.close()
  337. conn.close()
  338. print 'total',time.time()-stt
  339.  
  340.  
  341. if __name__ == '__main__':
  342. if len(sys.argv) != 2:
  343. print 'USAGE: python load_gtfs_data.py <gtfs-file>'
  344. sys.exit(1)
  345.  
  346. loadGtfsFileToDb(sys.argv[1])
  347.  
  348. '''
  349. DROP TABLE stops;
  350. CREATE TABLE stops (
  351. feed_id text,
  352. stop_id text PRIMARY KEY,
  353. stop_code text,
  354. stop_name text,
  355. stop_desc text,
  356. stop_lat double precision,
  357. stop_lon double precision,
  358. zone_id text,
  359. stop_url text,
  360. location_Type integer,
  361. parent_station text,
  362. stop_timezone text,
  363. wheelchair_boarding integer,
  364. pos Geometry(Point,4326)
  365. );
  366. ALTER TABLE stops OWNER TO joukkoliikenne;
  367. CREATE INDEX "stops_feed_id_idx" ON stops (feed_id);
  368. CREATE INDEX "stops_id_idx" ON stops USING gist (stop_id gist_trgm_ops);
  369. CREATE INDEX "stops_nimi_idx" ON stops USING gist (stop_name gist_trgm_ops);
  370. CREATE INDEX "stops_pos_gix" ON stops USING gist (pos);
  371.  
  372.  
  373. DROP TABLE routes;
  374. CREATE TABLE routes (
  375. feed_id text,
  376. route_id text PRIMARY KEY,
  377. agency_id text,
  378. route_short_name text,
  379. route_long_name text,
  380. route_desc text,
  381. route_type integer,
  382. route_url text,
  383. route_color text,
  384. route_text_color text
  385. );
  386. ALTER TABLE routes OWNER TO joukkoliikenne;
  387. CREATE INDEX "routes_feed_id_idx" ON routes (feed_id);
  388. CREATE INDEX "routes_agency_id_idx" ON routes (agency_id);
  389. CREATE INDEX "routes_short_name_idx" ON routes USING gist (route_short_name gist_trgm_ops);
  390. CREATE INDEX "routes_long_name_idx" ON routes USING gist (route_long_name gist_trgm_ops);
  391.  
  392. DROP TABLE trips;
  393. CREATE TABLE trips (
  394. feed_id text,
  395. route_id text,
  396. service_id text,
  397. trip_id text PRIMARY KEY,
  398. trip_headsign text,
  399. trip_short_name text,
  400. direction_id integer,
  401. block_id text,
  402. shape_id text,
  403. wheelchair_accessible integer,
  404. bikes_allowed integer,
  405. trip_source text,
  406. trip_source_id text
  407. );
  408. ALTER TABLE trips OWNER TO joukkoliikenne;
  409. CREATE INDEX "trips_feed_id_idx" ON trips (feed_id);
  410. CREATE INDEX "trips_route_id_idx" ON trips (route_id);
  411. CREATE INDEX "trips_service_id_idx" ON trips (service_id);
  412. CREATE INDEX "trips_headsign_idx" ON trips USING gist (trip_headsign gist_trgm_ops);
  413. CREATE INDEX "trips_trip_short_name_idx" ON trips USING gist (trip_short_name gist_trgm_ops);
  414. CREATE INDEX "trips_source_idx" ON trips (trip_source);
  415. CREATE INDEX "trips_source_id_idx" ON trips (trip_source_id);
  416.  
  417.  
  418. DROP TABLE stop_times;
  419. CREATE TABLE stop_times (
  420. feed_id text,
  421. trip_id text,
  422. arrival_time integer,
  423. departure_time integer,
  424. stop_id text,
  425. stop_sequence integer,
  426. stop_headsign text,
  427. pickup_type integer,
  428. drop_off_type integer,
  429. shape_dist_traveled real,
  430. timepoint integer
  431. );
  432. ALTER TABLE stop_times OWNER TO joukkoliikenne;
  433. CREATE INDEX "stop_times_feed_id_idx" ON stop_times (feed_id);
  434. CREATE INDEX "stop_times_trip_id_idx" ON stop_times (trip_id);
  435. CREATE INDEX "stop_times_stop_id_idx" ON stop_times (stop_id);
  436. CREATE INDEX "stop_times_arrival_time_idx" ON stop_times (arrival_time);
  437. CREATE INDEX "stop_times_departure_time_idx" ON stop_times (departure_time);
  438.  
  439.  
  440. DROP TABLE agency;
  441. CREATE TABLE agency (
  442. feed_id text,
  443. agency_id text PRIMARY KEY,
  444. agency_name text,
  445. agency_url text,
  446. agency_timezone text,
  447. agency_lang text,
  448. agency_phone text,
  449. agency_fare_url text,
  450. agency_email text
  451. );
  452. ALTER TABLE agency OWNER TO joukkoliikenne;
  453.  
  454. DROP TABLE calendar;
  455. CREATE TABLE calendar (
  456. feed_id text,
  457. service_id text PRIMARY KEY,
  458. mon boolean,
  459. tue boolean,
  460. wed boolean,
  461. thu boolean,
  462. fri boolean,
  463. sat boolean,
  464. sun boolean,
  465. start_date date,
  466. end_date date
  467. );
  468. ALTER TABLE calendar OWNER TO joukkoliikenne;
  469.  
  470. DROP TABLE calendar_dates;
  471. CREATE TABLE calendar_dates (
  472. feed_id text,
  473. service_id text,
  474. date date,
  475. isrunning boolean,
  476. PRIMARY KEY (service_id,date)
  477. );
  478. ALTER TABLE calendar_dates OWNER TO joukkoliikenne;
  479. '''
Add Comment
Please, Sign In to add comment