Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- ----------------------------------------------------------------------------------------------------
- GeoGuessr Coordinate Geocoding and Enhancement Script
- Description:
- This script processes a CSV file containing latitude and longitude coordinates, uses the
- OpenStreetMap Nominatim API to geocode the coordinates, and enriches the data with
- country codes, administrative regions, and other location details. It also calculates
- the distance between the original coordinates and the geocoded coordinates.
- Requirements:
- 1. Have a csv file named 'coordinates.csv' with two columns named latitude and longitude
- in the same directory as this script.
- 2. Be able to access 'https://nominatim.openstreetmap.org/reverse.php?lat={latitude}&lon={longitude}&format=jsonv2&accept-language=en'
- Usage:
- 1. Run the script. The script will read the coordinates from 'coordinates.csv' in the
- same directory, geocode them using the Nominatim API, and save the enriched data
- to 'coordinates_output.csv' in the same directory.
- 2. The script includes a 1-second delay between API requests to prevent overloading
- the Nominatim server.
- Notes:
- - The script handles various exceptions, including API errors and invalid coordinates.
- - The output CSV will contain additional columns such as country code, country, state,
- city, road, distance, and geocoding status.
- - The script uses tqdm to display a progress bar.
- ----------------------------------------------------------------------------------------------------
- """
- import io
- import sys
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
- import requests
- import json
- import csv
- import time
- import os
- import math
- from tqdm import tqdm
def haversine(lat1, lon1, lat2, lon2):
    """
    Calculate the great-circle distance between two points on the earth.

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.

    Returns:
        The distance in meters, computed with the haversine formula on a
        sphere of radius 6,371,000 m.
    """
    earth_radius_m = 6371000

    # The trigonometric functions work in radians.
    phi1, lam1, phi2, lam2 = map(math.radians, (lat1, lon1, lat2, lon2))
    dlat = phi2 - phi1
    dlon = lam2 - lam1

    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlon / 2) ** 2
    )
    # Floating-point rounding can leave `a` marginally outside [0, 1] for
    # (near-)antipodal points; clamp it so sqrt/asin never raise ValueError.
    a = min(1.0, max(0.0, a))

    central_angle = 2 * math.asin(math.sqrt(a))
    return central_angle * earth_radius_m
# Bounding boxes as (min_lat, max_lat, min_lon, max_lon), in decimal degrees.
_US_MINOR_OUTLYING_BOX = (22.0, 31.4, -180.0, -161.5)
_MADEIRA_BOX = (32.0, 33.5, -17.5, -15.8)
_AZORES_BOX = (35.8, 40.5, -32.3, -24.2)
_REUNION_BOX = (-21.5, -20.75, 55.0, 56.1)
_SVALBARD_BOX = (75.6, 81.2, 7.0, 35.79)
_CHRISTMAS_ISLAND_BOX = (-10.7, -10.2, 105.4, 105.9)
_COCOS_ISLANDS_BOX = (-12.3, -11.7, 96.6, 97.1)
_CURACAO_BOX = (11.96, 12.56, -69.34, -68.59)

# (name searched for in state/county/display_name, ISO3166-2-lvl4 value,
# replacement country code). Order is significant: the first match wins.
_US_TERRITORIES = (
    ("puerto rico", "US-PR", "pr"),
    ("american samoa", "US-AS", "as"),
    ("guam", "US-GU", "gu"),
    ("northern mariana islands", "US-MP", "mp"),
    ("virgin islands", "US-VI", "vi"),
)


def _empty_geocode_result(geocoding_status):
    """Return the 13-item result tuple with every field blank except the status."""
    return ("",) * 12 + (geocoding_status,)


def _first_truthy(address, keys):
    """Return the first non-empty value found under *keys* in *address*, else ""."""
    for key in keys:
        value = address.get(key)
        if value:
            return value
    return ""


def _in_box(latitude, longitude, box):
    """Return True when the point lies inside *box* (edges inclusive)."""
    min_lat, max_lat, min_lon, max_lon = box
    return min_lat <= latitude <= max_lat and min_lon <= longitude <= max_lon


def _refine_country_code(country_code, address, display_name, latitude, longitude):
    """Replace a parent country's code with the code of the territory the point is in."""
    if country_code == 'us':
        searchable = (
            str(address.get("state", "")).lower(),
            str(address.get("county", "")).lower(),
            str(display_name).lower(),
        )
        iso_lvl4 = address.get('ISO3166-2-lvl4')
        for territory_name, iso_code, territory_code in _US_TERRITORIES:
            if iso_lvl4 == iso_code or any(territory_name in text for text in searchable):
                country_code = territory_code
                break
        # NOTE(review): the reviewed source had lost its indentation; this box
        # check is assumed to apply only to results labelled 'us' -- confirm.
        # It runs after the territory table and therefore overrides it.
        if _in_box(latitude, longitude, _US_MINOR_OUTLYING_BOX):
            country_code = 'um'
    elif country_code == 'es':
        if (
            "canary islands" in str(display_name).lower()
            or address.get('ISO3166-2-lvl4') == 'ES-CN'
        ):
            country_code = 'es_cn'
    elif country_code == 'pt':
        if _in_box(latitude, longitude, _MADEIRA_BOX):
            country_code = 'pt_ma'
        elif _in_box(latitude, longitude, _AZORES_BOX):
            country_code = 'pt_az'
    elif country_code == 'fr':
        if _in_box(latitude, longitude, _REUNION_BOX):
            country_code = 'fr_re'
    elif country_code == 'no':
        if _in_box(latitude, longitude, _SVALBARD_BOX):
            country_code = 'sj'
    elif country_code == 'au':
        territory = address.get('territory')
        if territory == 'Cocos (Keeling) Islands' or _in_box(latitude, longitude, _COCOS_ISLANDS_BOX):
            country_code = 'cc'
        elif territory == 'Christmas Island' or _in_box(latitude, longitude, _CHRISTMAS_ISLAND_BOX):
            country_code = 'cx'
    elif country_code == 'nl':
        if address.get('ISO3166-2-lvl3') == 'NL-CW' or _in_box(latitude, longitude, _CURACAO_BOX):
            country_code = 'cw'
    elif country_code == 'cn':
        iso_lvl3 = address.get('ISO3166-2-lvl3')
        if iso_lvl3 == 'CN-HK':
            country_code = 'hk'
        elif iso_lvl3 == 'CN-MO':
            country_code = 'mo'
    return country_code


def get_country_code_and_details(latitude, longitude, timeout=10):
    """
    Reverse-geocode one coordinate with the Nominatim API.

    Args:
        latitude: Latitude in decimal degrees (a number; it is compared
            against the territory bounding boxes).
        longitude: Longitude in decimal degrees (a number).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A 13-item tuple:
        (country_code, country, state, road, city, county, nominatim_lat,
        nominatim_lon, display_name, latitude, longitude, iso3166_2,
        geocoding_status).
        geocoding_status is one of "SUCCESS", "NO RESULTS", "API ERROR",
        "JSON ERROR" or "UNEXPECTED ERROR". For every status other than
        "SUCCESS" the first twelve items are empty strings.
        country_code is lower-case and may be a territory code
        (e.g. 'pr', 'es_cn', 'pt_az') rather than the code Nominatim returned.

    The function never raises: request, decoding and unexpected failures are
    printed and reported through geocoding_status.
    """
    api_url = f"https://nominatim.openstreetmap.org/reverse.php?lat={latitude}&lon={longitude}&format=jsonv2&accept-language=en"
    # NOTE(review): the contact address in this User-Agent looks like an
    # e-mail-obfuscation placeholder; replace it with a real contact.
    headers = {
        "User-Agent": "MyGeoCodingApp/1.1 ([email protected])"
    }
    try:
        # A timeout keeps one stalled connection from hanging the whole run.
        response = requests.get(api_url, headers=headers, timeout=timeout)
        response.raise_for_status()

        # Decode separately: the decode error raised by recent `requests`
        # versions is also a RequestException, so a shared handler would
        # misreport it as "API ERROR". Every variant is a ValueError.
        try:
            data = response.json()
        except ValueError as e:
            print(f"JSON decode error: {e} (Lat: {latitude}, Lon: {longitude})")
            return _empty_geocode_result("JSON ERROR")

        if "error" in data:
            if "Unable to geocode" in data["error"]:
                return _empty_geocode_result("NO RESULTS")
            print(f"Geocoding error: {data['error']} (Lat: {latitude}, Lon: {longitude})")
            return _empty_geocode_result("API ERROR")

        address = data.get("address")
        if address is None:
            return _empty_geocode_result("NO RESULTS")

        country_code = ""
        country = ""
        city = ""
        state = ""
        road = ""
        county = ""
        iso3166_2 = ""
        nominatim_lat = data.get("lat")
        nominatim_lon = data.get("lon")
        # Normalise a missing/null display name so the text checks cannot fail.
        display_name = data.get("display_name") or ""

        if address:
            if address.get("country_code"):
                country_code = address["country_code"].lower()
            country = address.get("country", "")
            city = _first_truthy(
                address,
                ("city", "town", "village", "hamlet", "locality", "municipality"),
            )
            state = _first_truthy(
                address,
                ("state", "province", "region", "department", "district"),
            )
            county = address.get("county", "")
            road = address.get("road", "")
            # Use the first "ISO3166-2-lvl*" key, whatever its level number.
            iso3166_2 = next(
                (address[key] for key in address if key.startswith("ISO3166-2-lvl")),
                "",
            )
            country_code = _refine_country_code(
                country_code, address, display_name, latitude, longitude
            )

        return (
            country_code, country, state, road, city, county,
            nominatim_lat, nominatim_lon, display_name,
            latitude, longitude, iso3166_2, "SUCCESS",
        )
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e} (Lat: {latitude}, Lon: {longitude})")
        return _empty_geocode_result("API ERROR")
    except Exception as e:
        # Boundary handler: one malformed response must not stop a batch run.
        print(f"Unexpected error: {e} (Lat: {latitude}, Lon: {longitude})")
        return _empty_geocode_result("UNEXPECTED ERROR")
def process_coordinates_from_csv(input_file, output_file):
    """
    Geocode every coordinate in a CSV file and write the enriched rows.

    Args:
        input_file: Path of a CSV file with 'latitude' and 'longitude'
            columns. A leading UTF-8 byte-order mark is tolerated.
        output_file: Path of the CSV file to create; an existing file is
            overwritten.

    Each input row produces one output row. Rows whose coordinates are
    missing, non-numeric, non-finite or outside [-90, 90] / [-180, 180] are
    written with "INVALID DATA" in the derived columns and are not sent to
    the API. Processing stops at the first row that lacks a required column.
    Errors are printed rather than raised.
    """
    new_fieldnames = [
        'latitude',
        'longitude',
        'iso3166_2',
        'country_code',
        'country',
        'state',
        'county',
        'city',
        'road',
        'lat',
        'lon',
        'distance',
        'display_name',
        'Geocoding Status'
    ]
    try:
        # 'utf-8-sig' strips a BOM if present, so a file exported by a
        # spreadsheet still exposes a plain 'latitude' header. newline='' is
        # what the csv module expects for both reading and writing.
        with open(input_file, 'r', newline='', encoding='utf-8-sig') as infile, \
                open(output_file, 'w', newline='', encoding='utf-8-sig') as outfile:
            reader = csv.DictReader(infile)
            coordinates = list(reader)
            num_coordinates = len(coordinates)
            print(f"Found {num_coordinates} coordinates in '{input_file}'.")

            writer = csv.DictWriter(outfile, fieldnames=new_fieldnames)
            writer.writeheader()

            for row in tqdm(coordinates, desc="Processing Coordinates"):
                try:
                    original_latitude = float(row['latitude'])
                    original_longitude = float(row['longitude'])
                    # The chained comparisons are False for NaN as well, so
                    # non-finite values are rejected here too.
                    if not (-90 <= original_latitude <= 90
                            and -180 <= original_longitude <= 180):
                        raise ValueError("coordinate out of range")
                except (ValueError, TypeError):
                    # TypeError covers short rows, where DictReader fills the
                    # missing cell with None.
                    print(f"Error: Invalid latitude or longitude in row: {row}")
                    new_row = {field: "INVALID DATA" for field in new_fieldnames}
                    for field in ('latitude', 'longitude'):
                        value = row.get(field)
                        if value is not None:
                            new_row[field] = value
                    writer.writerow(new_row)
                    continue
                except KeyError as e:
                    print(f"Error: missing column {e} in row: {row}")
                    break

                (country_code, country, state, road, city, county,
                 nominatim_lat, nominatim_lon, display_name,
                 _original_lat, _original_lon, iso3166_2,
                 geocoding_status) = get_country_code_and_details(
                    original_latitude, original_longitude)

                new_row = {
                    # Keep the coordinates exactly as they were written in the input.
                    'latitude': row['latitude'],
                    'longitude': row['longitude'],
                    'iso3166_2': iso3166_2,
                    'country_code': country_code.lower() if isinstance(country_code, str) else country_code,
                    'country': country,
                    'state': state,
                    'county': county,
                    'city': city,
                    'road': road,
                    'lat': nominatim_lat,
                    'lon': nominatim_lon,
                    'display_name': display_name,
                    'Geocoding Status': geocoding_status,
                }

                have_geocoded_point = (
                    nominatim_lat and nominatim_lon
                    and isinstance(nominatim_lat, str)
                    and isinstance(nominatim_lon, str)
                )
                if have_geocoded_point:
                    try:
                        new_row['distance'] = haversine(
                            original_latitude, original_longitude,
                            float(nominatim_lat), float(nominatim_lon))
                    except ValueError:
                        new_row['distance'] = "DISTANCE ERROR"
                else:
                    new_row['distance'] = "NO DISTANCE"

                writer.writerow(new_row)
                # Pause between API requests so the Nominatim server is not overloaded.
                time.sleep(1)
    except FileNotFoundError:
        print(f"Error: Input file '{input_file}' not found.")
    except Exception as e:
        print(f"An error occurred: {e}")
if __name__ == "__main__":
    # Both CSV files live next to this script, whatever the working directory is.
    here = os.path.dirname(os.path.abspath(__file__))
    input_csv_file, output_csv_file = (
        os.path.join(here, filename)
        for filename in ("coordinates.csv", "coordinates_output.csv")
    )
    print(f"Input file path: {input_csv_file}")
    print(f"Output file path: {output_csv_file}")
    process_coordinates_from_csv(input_csv_file, output_csv_file)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement