Advertisement
Guest User

Untitled

a guest
Feb 10th, 2016
60
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.08 KB | None | 0 0
  1. import urllib2, json, csv
  2. import requests
  3. import itertools
  4. import multiprocessing
  5. import numpy
  6. from datetime import datetime, date, timedelta
  7.  
  8. def getTaxiTrips(date):
  9. """
  10. Gets the taxi trips occurred in NY from a starting date.
  11. :param date: (Y-m-d).
  12. :return: list of tuples (long, lat, drop off date).
  13. """
  14. today = str(datetime.date(datetime.now())).split('-')
  15. today_y = today[0]
  16. today_m = today[1]
  17.  
  18. start = date.split('-')
  19. start_y = start[0]
  20. start_m = start[1]
  21.  
  22. print start_m + "-" + start_y + " / " + today_m + "-" + today_y
  23.  
  24. data = []
  25. y = int(start_y)
  26. m = int(start_m)
  27. while int(start_y) <= int(today_y):
  28. # Month transformation
  29. if m > 12:
  30. m %= 12
  31. y += 1
  32.  
  33. mt = str(m) if m > 9 else '0' + str(m)
  34. # Green cabs
  35. if readCSV("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
  36. "/green_tripdata_" + str(y) + "-" + mt + ".csv") is not None:
  37. data.append("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
  38. "/green_tripdata_" + str(y) + "-" + mt + ".csv")
  39.  
  40. # Yellow cabs
  41. if readCSV("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
  42. "/yellow_tripdata_" + str(y) + "-" + mt + ".csv") is not None:
  43. data.append("https://storage.googleapis.com/tlc-trip-data/" + str(y) +
  44. "/yellow_tripdata_" + str(y) + "-" + mt + ".csv")
  45.  
  46. if m == int(today_m):
  47. break
  48. m += 1
  49.  
  50. pool = multiprocessing.Pool(mps-1)
  51. result = pool.map(consumeTaxiData, data)
  52. pool.close()
  53. pool.join()
  54.  
  55. return list(itertools.chain(*result))
  56.  
  57.  
  58. def consumeTaxiData(url):
  59. """
  60. Given a url, reads its content and process its data.
  61. :param url: the url to be readen.
  62. :return: a list of tuples in the form (long, lat, hour).
  63. """
  64. print "Processing", url
  65. points = []
  66.  
  67. data = readCSV(url)
  68. for line in data:
  69. latitude = line.get('dropoff_latitude', None)
  70. if latitude is None:
  71. latitude = line.get('Dropoff_latitude', None)
  72.  
  73. longitude = line.get('dropoff_longitude', None)
  74. if longitude is None:
  75. longitude = line.get('Dropoff_longitude', None)
  76.  
  77. time = line.get('tpep_dropoff_datetime', None)
  78. if time is None:
  79. time = line.get('Lpep_dropoff_datetime', None)
  80.  
  81. if time is not None:
  82. time = datetime.strptime(time, '%Y-%m-%d %H:%M:%S')
  83. if latitude is not None and longitude is not None and time >= datetime.strptime(date, '%Y-%m-%d') and
  84. time.weekday():
  85. time = roundTime(time, roundTo=60 * 60).hour
  86. points.append((float(longitude), float(latitude), time))
  87.  
  88. return points
  89.  
  90. def readCSV(url):
  91. """
  92. Read a csv file.
  93. :param url: url to be read.
  94. :return: an array of dictionaries.
  95. """
  96. try:
  97. response = urllib2.urlopen(url)
  98. return csv.DictReader(response, delimiter=',')
  99. except urllib2.HTTPError as e:
  100. return None
  101.  
  102. def roundTime(dt=None, roundTo=60):
  103. """
  104. Round a datetime object to any time laps in seconds
  105. :param dt: datetime.datetime object, default now.
  106. :param roundTo: closest number of seconds to round to, default 1 minute.
  107. :return: the rounded time.
  108. """
  109. if dt == None : dt = datetime.now()
  110. seconds = (dt - dt.min).seconds
  111. rounding = (seconds+roundTo/2) // roundTo * roundTo
  112. return dt + timedelta(0, rounding-seconds, -dt.microsecond)
  113.  
  114. if __name__ == '__main__':
  115. mps = multiprocessing.cpu_count()
  116.  
  117. date = str(datetime.date(datetime.now()) - timedelta(31*8))
  118. print "-----> Inital date:", date
  119.  
  120. print "-----> Getting taxi data..."
  121. taxi_dropoffs = getTaxiTrips(date)
  122. print len(taxi_dropoffs), "taxi trips"
  123.  
  124. https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv
  125. https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv
  126.  
  127. https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv
  128. https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-07.csv
  129. https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-08.csv
  130. https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-08.csv
  131. https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-07.csv
  132. https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv
  133. https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-09.csv
  134.  
  135. Traceback (most recent call last):
  136. File "noiseInference.py", line 489, in <module>
  137. taxi_dropoffs = getTaxiTrips(date)
  138. File "noiseInference.py", line 300, in getTaxiTrips
  139. result = pool.map(consumeTaxiData, data)
  140. File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 251, in map
  141. return self.map_async(func, iterable, chunksize).get()
  142. File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
  143. raise self._value
  144. socket.error: [Errno 60] Operation timed out
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement