Advertisement
Guest User

Untitled

a guest
Jan 15th, 2019
144
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.57 KB | None | 0 0
  1. import psycopg2
  2. import parse
  3. from typing import Optional, Tuple, List, Dict
  4.  
  5.  
  6. class GisPoint(object):
  7. def __init__(self, p_id: int, binary: str):
  8. self.p_id = p_id
  9. self.binary = binary
  10.  
  11. def __str__(self) -> str:
  12. return 'id={}, binary={}'.format(self.p_id, self.binary)
  13.  
  14.  
  15. class GisRoute(object):
  16. def __init__(self, binary: str, length: int):
  17. self.binary = binary
  18. self.length = length
  19.  
  20. def __str__(self) -> str:
  21. return 'binary={}, length={}'.format(self.binary, self.length)
  22.  
  23.  
  24. class PostGisDB(object):
  25. NEAREST_START_POINT_QUERY = "SELECT t.source, ST_SetSrid(ST_MakePoint(t.x1, t.y1), " \
  26. "4326) FROM (SELECT source, x1, y1 FROM hh_2po_4pgr " \
  27. "ORDER BY sqrt((x1 - {})*(x1 - {}) + (y1 - {})*(y1 - {}))" \
  28. "LIMIT 1) as t;"
  29. NEAREST_END_POINT_QUERY = "SELECT t.target, ST_SetSrid(ST_MakePoint(t.x2, t.y2), " \
  30. "4326) FROM (SELECT target, x2, y2 FROM hh_2po_4pgr " \
  31. "ORDER BY sqrt((x2 - {})*(x2 - {}) + (y2 - {})*(y2 - {}))" \
  32. "LIMIT 1) as t;"
  33. SHORTEST_ROUTE_QUERY = "SELECT ST_Union(ARRAY(SELECT geom_way FROM pgr_dijkstra('" \
  34. "SELECT id, source, target, km as cost FROM hh_2po_4pgr', " \
  35. "{}, {}, false) AS route1 LEFT JOIN hh_2po_4pgr as route2 " \
  36. "ON (route1.edge = route2.id) ORDER BY seq));"
  37. UNION_ROUTE_QUERY = "SELECT ST_Union(%s);"
  38. ROUTE_TIME_QUERY = "SELECT geom_way, km / kmh as time FROM pgr_dijkstra('" \
  39. "SELECT id, source, target, km as cost FROM hh_2po_4pgr', " \
  40. "{}, {}, false) AS route1 LEFT JOIN hh_2po_4pgr as route2 " \
  41. "ON (route1.edge = route2.id) ORDER BY seq;"
  42. ROUTE_LENGTH_QUERY = "SELECT ST_Length('{}'::geography);"
  43. LOCATIONS_NEAR_ROUTE_QUERY = "SELECT * FROM (SELECT id, geom_coords, ST_Distance" \
  44. "('{}'::geography, geom_coords::geography) AS dist FROM " \
  45. "locations) AS t WHERE t.dist < {}"
  46. POINTS_DISTANCE_QUERY = "SELECT ST_Distance('{}'::geography, '{}'::geography);"
  47. HUMAN_READABLE_QUERY = "SELECT ST_AsText('{}');"
  48. DUMP_POINTS_QUERY = "SELECT ST_DumpPoints('{}');"
  49.  
  50. FORMAT_STRING_POINT = "POINT({:f} {:f})"
  51. FORMAT_STRING_DUMP_POINTS = "(\"<{:d},{:d}>\",{})"
  52.  
  53. def __init__(self, host: str, port: str, database: str, user: str, password: str):
  54. self.connection = psycopg2.connect(host=host, port=port, database=database,
  55. user=user, password=password)
  56.  
  57. def close(self):
  58. self.connection.close()
  59.  
  60. def get_nearest_start_point(self, x: float, y: float) -> GisPoint:
  61. curr = self.connection.cursor()
  62. curr.execute(PostGisDB.NEAREST_START_POINT_QUERY.format(x, x, y, y))
  63. row = curr.fetchone()
  64.  
  65. return GisPoint(*row)
  66.  
  67. def get_nearest_end_point(self, x: float, y: float) -> GisPoint:
  68. curr = self.connection.cursor()
  69. curr.execute(PostGisDB.NEAREST_END_POINT_QUERY.format(x, x, y, y))
  70. row = curr.fetchone()
  71.  
  72. return GisPoint(*row)
  73.  
  74. def get_shortest_route(self, point_1: GisPoint, point_2: GisPoint) -> Optional[GisRoute]:
  75. curr = self.connection.cursor()
  76.  
  77. curr.execute(PostGisDB.ROUTE_TIME_QUERY.format(point_1.p_id, point_2.p_id))
  78. routes_times = curr.fetchall()
  79. routes = [route[0] for route in routes_times[:-1]]
  80. time = 0
  81. for route in routes_times[:-1]:
  82. time += route[1]
  83.  
  84. curr.execute(PostGisDB.UNION_ROUTE_QUERY, (routes, ))
  85. route = curr.fetchone()[0]
  86. if route is None:
  87. return None
  88.  
  89. # curr.execute(PostGisDB.SHORTEST_ROUTE_QUERY.format(point_1.p_id, point_2.p_id))
  90. # route = curr.fetchone()[0]
  91. # if route is None:
  92. # return None
  93.  
  94. curr.execute(PostGisDB.ROUTE_LENGTH_QUERY.format(route))
  95. route_len = curr.fetchone()[0]
  96.  
  97. return GisRoute(route, route_len)
  98.  
  99. def get_locations_near_route(self, route: GisRoute, max_len: float) -> Tuple[List[GisPoint], List[float]]:
  100. curr = self.connection.cursor()
  101. curr.execute(PostGisDB.LOCATIONS_NEAR_ROUTE_QUERY.format(route.binary, max_len))
  102. rows = curr.fetchall()
  103.  
  104. points_gis = [GisPoint(l[0], l[1]) for l in rows]
  105. route_distances = [l[2] for l in rows]
  106.  
  107. return points_gis, route_distances
  108.  
  109. def get_points_distance(self, point_1: GisPoint, point_2: GisPoint) -> float:
  110. curr = self.connection.cursor()
  111. curr.execute(PostGisDB.POINTS_DISTANCE_QUERY.format(point_1.binary, point_2.binary))
  112. row = curr.fetchone()
  113.  
  114. return row[0]
  115.  
  116. def get_human_readable_point(self, point: GisPoint) -> Dict[str, float]:
  117. curr = self.connection.cursor()
  118. curr.execute(PostGisDB.HUMAN_READABLE_QUERY.format(point.binary))
  119. row = parse.parse(PostGisDB.FORMAT_STRING_POINT, curr.fetchone()[0])
  120. coords = {
  121. 'x': row[0],
  122. 'y': row[1]
  123. }
  124.  
  125. return coords
  126.  
  127. def get_human_readable_route(self, route: GisRoute) -> List[List[Dict[str, float]]]:
  128. curr = self.connection.cursor()
  129. curr.execute(PostGisDB.HUMAN_READABLE_QUERY.format(route.binary))
  130. row = curr.fetchone()[0]
  131.  
  132. curr.execute(PostGisDB.DUMP_POINTS_QUERY.format(row))
  133. rows = curr.fetchall()
  134.  
  135. lines: Dict[int, List[Dict[str, float]]] = {}
  136. for r in rows:
  137. data_line = r[0].replace('{', '<').replace('}', '>')
  138. data = parse.parse(PostGisDB.FORMAT_STRING_DUMP_POINTS, data_line)
  139. if data is None:
  140. continue
  141.  
  142. line_id = data[0]
  143. if line_id not in lines:
  144. lines[line_id] = []
  145.  
  146. binary_point = data[2]
  147. curr.execute(PostGisDB.HUMAN_READABLE_QUERY.format(binary_point))
  148. hr_point = parse.parse(PostGisDB.FORMAT_STRING_POINT, curr.fetchone()[0])
  149. coords = {
  150. 'x': hr_point[0],
  151. 'y': hr_point[1]
  152. }
  153. lines[line_id].append(coords)
  154.  
  155. points = []
  156. for key, val in lines.items():
  157. points.append(val)
  158.  
  159. return points
  160.  
  161.  
  162. def _test_db_queries():
  163. db = PostGisDB(host="localhost", port="5436", database="postgres",
  164. user="postgres", password="mysecretpassword")
  165.  
  166. point_a_gis = db.get_nearest_start_point(22.0, 51.0)
  167. print("Point A: {}".format(point_a_gis))
  168.  
  169. point_b_gis = db.get_nearest_end_point(20.0, 50.0)
  170. print("Point B: {}".format(point_b_gis))
  171.  
  172. route_gis = db.get_shortest_route(point_a_gis, point_b_gis)
  173. print("Shortest route: {}".format(route_gis))
  174.  
  175. near_loc_points_gis, near_loc_route_dist = db.get_locations_near_route(route_gis, 1000.5)
  176. print("Locations near route:")
  177. for p, d in zip(near_loc_points_gis, near_loc_route_dist):
  178. print("Point: {}, distance: {}".format(p, d))
  179.  
  180. distance_a_b = db.get_points_distance(point_a_gis, point_b_gis)
  181. print("Distance A-B: {}".format(distance_a_b))
  182.  
  183. human_readable_point = db.get_human_readable_point(point_a_gis)
  184. print("Human readable point: {}".format(human_readable_point))
  185.  
  186. human_readable_route = db.get_human_readable_route(route_gis)
  187. print("Human readable route: {}".format(human_readable_route))
  188.  
  189.  
  190. if __name__ == "__main__":
  191. _test_db_queries()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement