Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import urllib2, json, csv
- import requests
- import itertools
- import multiprocessing
- import numpy
- from datetime import datetime, date, timedelta
def getTaxiTrips(date):
    """
    Gets the taxi drop-offs published from a starting month up to the current month.

    For every month in that range the green and yellow cab CSV files are
    probed with readCSV; the URLs that could be opened are then handed to
    consumeTaxiData through a multiprocessing pool.

    :param date: starting date as a 'Y-m-d' string (only year and month are used here).
    :return: flat list with the tuples returned by consumeTaxiData for every file.
    """
    today = str(datetime.date(datetime.now())).split('-')
    today_y = today[0]
    today_m = today[1]

    start = date.split('-')
    start_y = start[0]
    start_m = start[1]

    print(start_m + "-" + start_y + " / " + today_m + "-" + today_y)

    # Compare (year, month) pairs so the walk always ends at the current
    # month of the current year, whatever month the range started in.
    last_month = (int(today_y), int(today_m))
    y = int(start_y)
    m = int(start_m)

    data = []
    while (y, m) <= last_month:
        mt = str(m) if m > 9 else '0' + str(m)
        # Green cabs first, then yellow cabs.
        for colour in ('green', 'yellow'):
            url = ("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
                   "/" + colour + "_tripdata_" + str(y) + "-" + mt + ".csv")
            # readCSV is used only as an availability probe; the worker
            # opens the URL again when it processes the file.
            if readCSV(url) is not None:
                data.append(url)
        m += 1
        if m > 12:
            m = 1
            y += 1

    # `mps` is the module-level CPU count set by the script entry point.
    # Keep at least one worker: Pool(0) raises ValueError on one-core hosts.
    pool = multiprocessing.Pool(max(1, mps - 1))
    try:
        result = pool.map(consumeTaxiData, data)
        pool.close()
    except Exception:
        # Do not leave workers running once one of the files has failed.
        pool.terminate()
        raise
    finally:
        pool.join()
    return list(itertools.chain.from_iterable(result))
def _firstPresent(row, *keys):
    """Return the value of the first key of `keys` found in `row`, else None."""
    for key in keys:
        value = row.get(key, None)
        if value is not None:
            return value
    return None


def consumeTaxiData(url):
    """
    Given a url, reads its content and process its data.

    Rows are kept when they carry drop-off coordinates and a drop-off
    timestamp that is not earlier than the module-level start date `date`
    and whose weekday() is non-zero.

    :param url: the url to be read.
    :return: a list of tuples in the form (long, lat, hour); empty when the
             url could not be opened by readCSV.
    """
    print("Processing %s" % (url,))
    points = []
    data = readCSV(url)
    if data is None:
        # readCSV signals an unavailable file with None.
        return points

    # `date` is the module-level 'Y-m-d' string set by the script entry
    # point; parse it once instead of once per row.
    start = datetime.strptime(date, '%Y-%m-%d')

    for line in data:
        latitude = _firstPresent(line, 'dropoff_latitude', 'Dropoff_latitude')
        longitude = _firstPresent(line, 'dropoff_longitude', 'Dropoff_longitude')
        time = _firstPresent(line, 'tpep_dropoff_datetime', 'Lpep_dropoff_datetime')
        if latitude is None or longitude is None or time is None:
            # Incomplete row: nothing to compare or to convert.
            continue
        time = datetime.strptime(time, '%Y-%m-%d %H:%M:%S')
        # NOTE(review): weekday() is 0 on Mondays, so this test drops Monday
        # drop-offs only -- confirm that this is the intended filter.
        if time >= start and time.weekday() != 0:
            hour = roundTime(time, roundTo=60 * 60).hour
            points.append((float(longitude), float(latitude), hour))
    return points
def readCSV(url):
    """
    Open a remote csv file and wrap it in a dictionary reader.

    :param url: url to be read.
    :return: a csv.DictReader over the opened response, or None when the
             request fails with an HTTP error.
    """
    try:
        stream = urllib2.urlopen(url)
    except urllib2.HTTPError:
        return None
    return csv.DictReader(stream, delimiter=',')
def roundTime(dt=None, roundTo=60):
    """
    Round a datetime object to the nearest multiple of a number of seconds.

    :param dt: datetime.datetime object, default now.
    :param roundTo: closest number of seconds to round to, default 1 minute.
    :return: the rounded datetime, with the microseconds removed.
    """
    if dt is None:
        dt = datetime.now()
    # Whole seconds elapsed since midnight of dt's own day.
    elapsed = (dt - dt.min).seconds
    half_step = roundTo / 2
    nearest = (elapsed + half_step) // roundTo * roundTo
    shift = timedelta(seconds=nearest - elapsed, microseconds=-dt.microsecond)
    return dt + shift
if __name__ == '__main__':
    # `mps` and `date` are deliberately module-level names: getTaxiTrips
    # sizes its worker pool from `mps` and consumeTaxiData filters rows
    # against `date`.
    mps = multiprocessing.cpu_count()

    # Start 31 * 8 days before today.
    lookback = timedelta(days=31 * 8)
    date = str(datetime.now().date() - lookback)
    print(" ".join(["-----> Inital date:", date]))

    print("-----> Getting taxi data...")
    taxi_dropoffs = getTaxiTrips(date)
    print(" ".join([str(len(taxi_dropoffs)), "taxi trips"]))
- https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv
- https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv
- https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv
- https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-07.csv
- https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-08.csv
- https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-08.csv
- https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-07.csv
- https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv
- https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-09.csv
- Traceback (most recent call last):
- File "noiseInference.py", line 489, in <module>
- taxi_dropoffs = getTaxiTrips(date)
- File "noiseInference.py", line 300, in getTaxiTrips
- result = pool.map(consumeTaxiData, data)
- File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 251, in map
- return self.map_async(func, iterable, chunksize).get()
- File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
- raise self._value
- socket.error: [Errno 60] Operation timed out
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement