Advertisement
arekay115

nomination_osm.py

Apr 29th, 2025
29
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 15.40 KB | None | 0 0
  1. """
  2. ----------------------------------------------------------------------------------------------------
  3. GeoGuessr Coordinate Geocoding and Enhancement Script
  4.  
  5. Description:
  6. This script processes a CSV file containing latitude and longitude coordinates, uses the
  7. OpenStreetMap Nominatim API to geocode the coordinates, and enriches the data with
  8. country codes, administrative regions, and other location details. It also calculates
  9. the distance between the original coordinates and the geocoded coordinates.
  10.  
  11. Requirements:
  12. 1. Have a csv file named 'coordinates.csv' with two columns named latitude and longitude
  13. in the same directory as this script.
  14. 2. Be able to access 'https://nominatim.openstreetmap.org/reverse.php?lat={latitude}&lon={longitude}&format=jsonv2&accept-language=en'
  15.  
  16. Usage:
  17. 1. Run the script. The script will read the coordinates from 'coordinates.csv' in the
  18. same directory, geocode them using the Nominatim API, and save the enriched data
  19. to 'coordinates_output.csv' in the same directory.
  20. 2. The script includes a 1-second delay between API requests to prevent overloading
  21. the Nominatim server.
  22.  
  23. Notes:
  24. - The script handles various exceptions, including API errors and invalid coordinates.
  25. - The output CSV will contain additional columns such as country code, country, state,
  26. city, road, distance (in meters), and geocoding status.
  27. - The script uses tqdm to display a progress bar.
  28. ----------------------------------------------------------------------------------------------------
  29. """
  30.  
  31. import io
  32. import sys
  33.  
  34. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
  35.  
  36. import requests
  37. import json
  38. import csv
  39. import time
  40. import os
  41. import math
  42.  
  43. from tqdm import tqdm
  44.  
  45. def haversine(lat1, lon1, lat2, lon2):
  46. """
  47. Calculate the great circle distance between two points
  48. on the earth (specified in decimal degrees)
  49. """
  50. # Convert decimal degrees to radians
  51. lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
  52.  
  53. # Haversine formula
  54. dlat = lat2 - lat1
  55. dlon = lon2 - lon1
  56. a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
  57. c = 2 * math.asin(math.sqrt(a))
  58. r = 6371000 # Radius of earth in meters
  59. return c * r
  60.  
  61. def get_country_code_and_details(latitude, longitude):
  62. api_url = f"https://nominatim.openstreetmap.org/reverse.php?lat={latitude}&lon={longitude}&format=jsonv2&accept-language=en"
  63.  
  64. headers = {
  65. "User-Agent": "MyGeoCodingApp/1.1 ([email protected])"
  66. }
  67.  
  68. try:
  69. response = requests.get(api_url, headers=headers)
  70. response.raise_for_status()
  71. data = response.json()
  72.  
  73. if "error" in data:
  74. if "Unable to geocode" in data["error"]: #check if the error contains the unable to geocode string.
  75. geocoding_status = "NO RESULTS"
  76. else:
  77. geocoding_status = "API ERROR"
  78. print(f"Geocoding error: {data['error']} (Lat: {latitude}, Lon: {longitude})")
  79. return "", "", "", "", "", "", "", "", "", "", "", "", geocoding_status
  80.  
  81. if data.get("address") is None:
  82. geocoding_status = "NO RESULTS"
  83. return "", "", "", "", "", "", "", "", "", "", "", "", geocoding_status
  84.  
  85. country_code = ""
  86. country = ""
  87. city = ""
  88. state = ""
  89. road = ""
  90. county = "" # Added county
  91. display_name = ""
  92. nominatim_lat = data.get("lat")
  93. nominatim_lon = data.get("lon")
  94. geocoding_status = "SUCCESS"
  95. iso3166_2 = ""
  96.  
  97. if data.get("address"):
  98. if data["address"].get("country_code"):
  99. country_code = data["address"]["country_code"].lower() #change to lower here.
  100. country = data["address"].get("country", "")
  101.  
  102. city_options = ["city", "town", "village", "hamlet", "locality", "municipality"]
  103. for option in city_options:
  104. if data["address"].get(option):
  105. city = data["address"].get(option)
  106. break
  107.  
  108. state_options = ["state", "province", "region", "department", "district"]
  109. for option in state_options:
  110. if data["address"].get(option):
  111. state = data["address"].get(option)
  112. break
  113.  
  114. # Get county
  115. county = data["address"].get("county", "")
  116.  
  117. road = data["address"].get("road", "")
  118.  
  119. # Find any key that starts with "ISO3166-2-lvl"
  120. iso_keys = [key for key in data["address"].keys() if key.startswith("ISO3166-2-lvl")]
  121. if iso_keys:
  122. iso3166_2 = data["address"][iso_keys[0]] # Access the first element of the list
  123.  
  124. # Territory Code Corrections
  125. if country_code == 'us':
  126. address = data["address"]
  127. if (
  128. "puerto rico" in str(address.get("state", "")).lower()
  129. or "puerto rico" in str(address.get("county", "")).lower()
  130. or "puerto rico" in str(data.get("display_name", "")).lower()
  131. ):
  132. country_code = 'pr'
  133. elif data["address"].get('ISO3166-2-lvl4') == 'US-PR':
  134. country_code = 'pr'
  135. elif (
  136. "american samoa" in str(address.get("state", "")).lower()
  137. or "american samoa" in str(address.get("county", "")).lower()
  138. or "american samoa" in str(data.get("display_name", "")).lower()
  139. ):
  140. country_code = 'as'
  141. elif data["address"].get('ISO3166-2-lvl4') == 'US-AS':
  142. country_code = 'as'
  143. elif (
  144. "guam" in str(address.get("state", "")).lower()
  145. or "guam" in str(address.get("county", "")).lower()
  146. or "guam" in str(data.get("display_name", "")).lower()
  147. ):
  148. country_code = 'gu'
  149. elif data["address"].get('ISO3166-2-lvl4') == 'US-GU':
  150. country_code = 'gu'
  151. elif (
  152. "northern mariana islands" in str(address.get("state", "")).lower()
  153. or "northern mariana islands" in str(address.get("county", "")).lower()
  154. or "northern mariana islands" in str(data.get("display_name", "")).lower()
  155. ):
  156. country_code = 'mp'
  157. elif data["address"].get('ISO3166-2-lvl4') == 'US-MP':
  158. country_code = 'mp'
  159. elif (
  160. "virgin islands" in str(address.get("state", "")).lower()
  161. or "virgin islands" in str(address.get("county", "")).lower()
  162. or "virgin islands" in str(data.get("display_name", "")).lower()
  163. ):
  164. country_code = 'vi'
  165. elif data["address"].get('ISO3166-2-lvl4') == 'US-VI':
  166. country_code = 'vi'
  167.  
  168. # US Minor Outlying Islands Bounding Box
  169. um_min_lat = 22.0
  170. um_max_lat = 31.4
  171. um_min_lon = -180.0
  172. um_max_lon = -161.5
  173.  
  174. if um_min_lat <= latitude <= um_max_lat and um_min_lon <= longitude <= um_max_lon:
  175. country_code = 'um'
  176.  
  177. display_name = data.get("display_name", "")
  178.  
  179. if country_code == 'es':
  180. if "canary islands" in data.get("display_name", "").lower():
  181. country_code = 'es_cn'
  182. elif data["address"].get('ISO3166-2-lvl4') == 'ES-CN':
  183. country_code = 'es_cn'
  184.  
  185. if country_code == 'pt':
  186. # Madeira Bounding Box
  187. pt_ma_min_lat = 32.0
  188. pt_ma_max_lat = 33.5
  189. pt_ma_min_lon = -17.5
  190. pt_ma_max_lon = -15.8
  191.  
  192. # Azores Bounding Box
  193. pt_az_min_lat = 35.8
  194. pt_az_max_lat = 40.5
  195. pt_az_min_lon = -32.3
  196. pt_az_max_lon = -24.2
  197.  
  198. if pt_ma_min_lat <= latitude <= pt_ma_max_lat and pt_ma_min_lon <= longitude <= pt_ma_max_lon:
  199. country_code = 'pt_ma'
  200. elif pt_az_min_lat <= latitude <= pt_az_max_lat and pt_az_min_lon <= longitude <= pt_az_max_lon:
  201. country_code = 'pt_az'
  202.  
  203. if country_code == 'fr':
  204. # Reunion Bounding Box
  205. fr_re_min_lat = -21.5
  206. fr_re_max_lat = -20.75
  207. fr_re_min_lon = 55.0
  208. fr_re_max_lon = 56.1
  209.  
  210. if fr_re_min_lat <= latitude <= fr_re_max_lat and fr_re_min_lon <= longitude <= fr_re_max_lon:
  211. country_code = 'fr_re'
  212.  
  213. if country_code == 'no':
  214. # Svalbard Bounding Box
  215. sj_min_lat = 75.6
  216. sj_max_lat = 81.2
  217. sj_min_lon = 7.0
  218. sj_max_lon = 35.79
  219.  
  220. if sj_min_lat <= latitude <= sj_max_lat and sj_min_lon <= longitude <= sj_max_lon:
  221. country_code = 'sj'
  222.  
  223. if country_code == 'au':
  224. # Christmas Island Bounding Box
  225. cx_min_lat = -10.7
  226. cx_max_lat = -10.2
  227. cx_min_lon = 105.4
  228. cx_max_lon = 105.9
  229.  
  230. # Cocos (Keeling) Islands Bounding Box
  231. cc_min_lat = -12.3
  232. cc_max_lat = -11.7
  233. cc_min_lon = 96.6
  234. cc_max_lon = 97.1
  235.  
  236. if data["address"].get('territory') == 'Cocos (Keeling) Islands' or (cc_min_lat <= latitude <= cc_max_lat and cc_min_lon <= longitude <= cc_max_lon):
  237. country_code = 'cc'
  238. elif data["address"].get('territory') == 'Christmas Island' or (cx_min_lat <= latitude <= cx_max_lat and cx_min_lon <= longitude <= cx_max_lon):
  239. country_code = 'cx'
  240.  
  241. if country_code == 'nl':
  242. # Curacao Bounding Box
  243. cw_min_lat = 11.96
  244. cw_max_lat = 12.56
  245. cw_min_lon = -69.34
  246. cw_max_lon = -68.59
  247.  
  248. if data["address"].get('ISO3166-2-lvl3') == 'NL-CW' or (cw_min_lat <= latitude <= cw_max_lat and cw_min_lon <= longitude <= cw_max_lon):
  249. country_code = 'cw'
  250.  
  251. if country_code == 'cn':
  252. if data["address"].get('ISO3166-2-lvl3') == 'CN-HK':
  253. country_code = 'hk'
  254. elif data["address"].get('ISO3166-2-lvl3') == 'CN-MO':
  255. country_code = 'mo'
  256. display_name = data.get("display_name", "")
  257.  
  258. return country_code, country, state, road, city, county, nominatim_lat, nominatim_lon, display_name, latitude, longitude, iso3166_2, geocoding_status
  259.  
  260. except requests.exceptions.RequestException as e:
  261. # ... (exception handling remains the same) ...
  262. return "", "", "", "", "", "", "", "", "", "", "", "", "API ERROR"
  263. except json.JSONDecodeError as e:
  264. # ... (exception handling remains the same) ...
  265. return "", "", "", "", "", "", "", "", "", "", "", "", "JSON ERROR"
  266. except Exception as e:
  267. # ... (exception handling remains the same) ...
  268. return "", "", "", "", "", "", "", "", "", "", "", "", "UNEXPECTED ERROR"
  269.  
  270. def process_coordinates_from_csv(input_file, output_file):
  271. try:
  272. with open(input_file, 'r', encoding='utf-8') as infile, open(output_file, 'w', newline='', encoding='utf-8-sig') as outfile:
  273. reader = csv.DictReader(infile)
  274. coordinates = list(reader)
  275. num_coordinates = len(coordinates)
  276. print(f"Found {num_coordinates} coordinates in '{input_file}'.")
  277.  
  278. # Define new field order
  279. new_fieldnames = [
  280. 'latitude',
  281. 'longitude',
  282. 'iso3166_2',
  283. 'country_code',
  284. 'country',
  285. 'state',
  286. 'county',
  287. 'city',
  288. 'road',
  289. 'lat',
  290. 'lon',
  291. 'distance',
  292. 'display_name',
  293. 'Geocoding Status'
  294. ]
  295.  
  296. writer = csv.DictWriter(outfile, fieldnames=new_fieldnames)
  297. writer.writeheader()
  298.  
  299. for row in tqdm(coordinates, desc="Processing Coordinates"):
  300. new_row = {}
  301.  
  302. try:
  303. original_latitude = float(row['latitude'])
  304. original_longitude = float(row['longitude'])
  305.  
  306. # Copy original latitude and longitude
  307. new_row['latitude'] = row['latitude']
  308. new_row['longitude'] = row['longitude']
  309.  
  310. except ValueError:
  311. print(f"Error: Invalid latitude or longitude in row: {row}")
  312. for field in new_fieldnames:
  313. if field in ['latitude', 'longitude']:
  314. new_row[field] = row.get(field, "INVALID DATA")
  315. else:
  316. new_row[field] = "INVALID DATA"
  317. writer.writerow(new_row)
  318. continue
  319. except KeyError as e:
  320. print(f"Error: missing column {e} in row: {row}")
  321. break
  322.  
  323. country_code, country, state, road, city, county, nominatim_lat, nominatim_lon, display_name, original_lat_returned, original_lon_returned, iso3166_2, geocoding_status = get_country_code_and_details(original_latitude, original_longitude)
  324.  
  325. new_row['iso3166_2'] = iso3166_2
  326. new_row['country_code'] = country_code.lower() if isinstance(country_code, str) else country_code
  327. new_row['country'] = country
  328. new_row['state'] = state
  329. new_row['county'] = county
  330. new_row['city'] = city
  331. new_row['road'] = road
  332. new_row['lat'] = nominatim_lat
  333. new_row['lon'] = nominatim_lon
  334.  
  335. if nominatim_lat and nominatim_lon and isinstance(nominatim_lat, str) and isinstance(nominatim_lon, str):
  336. try:
  337. new_row['distance'] = haversine(original_latitude, original_longitude, float(nominatim_lat), float(nominatim_lon))
  338. except ValueError:
  339. new_row['distance'] = "DISTANCE ERROR"
  340. else:
  341. new_row['distance'] = "NO DISTANCE"
  342.  
  343. new_row['display_name'] = display_name
  344. new_row['Geocoding Status'] = geocoding_status
  345.  
  346. writer.writerow(new_row)
  347. time.sleep(1)
  348.  
  349. except FileNotFoundError:
  350. print(f"Error: Input file '{input_file}' not found.")
  351. except Exception as e:
  352. print(f"An error occurred: {e}")
  353.  
  354. if __name__ == "__main__":
  355. script_directory = os.path.dirname(os.path.abspath(__file__))
  356. input_csv_file = os.path.join(script_directory, "coordinates.csv")
  357. output_csv_file = os.path.join(script_directory, "coordinates_output.csv")
  358.  
  359. print(f"Input file path: {input_csv_file}")
  360. print(f"Output file path: {output_csv_file}")
  361.  
  362. process_coordinates_from_csv(input_csv_file, output_csv_file)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement