Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# -*- coding: utf-8 -*-
import math
import time
from pathlib import Path
from urllib.parse import quote

import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# ---------------------------------------------------------------------------
# Search parameters
# ---------------------------------------------------------------------------
search_date = "18.12.2019"
search_time = "08:00"
# 1 -> search_time is a departure time; 0 -> presumably an arrival time
# (the original note marks this as unverified).
time_dep_arr = 1

# Excel workbook the collected results are written to.
outFile = "C:/WorkExchange/Python/Git/OPNV/HH_HBF.xlsx"

# Corner points (lat, lon) of the search area around Hamburg Hbf.
coords = [
    (53.82, 9.40),   # NW
    (53.82, 10.50),  # NE
    (53.20, 9.40),   # SW
    (53.20, 10.50),  # SE
]
# Alternative search area: core area of Hamburg
# coords = [
#     (53.644949, 9.732696),   # NW
#     (53.644949, 10.321555),  # NE
#     (53.418800, 9.732696),   # SW
#     (53.418800, 10.321555),  # SE
# ]

# Destination every grid point is routed to.
destination = "Hamburg Hbf"
destination_city = "Hamburg"
destination_type = "STATION"  # or COORDINATE
destination_coord = (53.55297, 10.0069)

# Grid spacing in meters
spacing = 100
def calc_deg_increments(meters, coords_list):
    """Return the (lat, lon) degree increments that span ``meters`` on the ground.

    The latitude increment is measured along a meridian (a great circle), so
    it does not depend on the position.  The longitude increment is measured
    along the parallel at the mean latitude of ``coords_list``; that circle
    shrinks with cos(latitude), so the same distance covers more degrees.

    Args:
        meters: Ground distance to convert.
        coords_list: Non-empty sequence of (lat, lon) pairs in degrees that
            describe the area of interest; only the latitudes are used.

    Returns:
        Tuple ``(lat_dist_in_deg, lon_dist_in_deg)``.

    Raises:
        ValueError: If ``coords_list`` is empty.
    """
    if not coords_list:
        raise ValueError("coords_list must contain at least one (lat, lon) pair")

    earth_radius = 6372800  # meters
    avg_lat = sum(point[0] for point in coords_list) / len(coords_list)

    # Full circumference of a meridian circle and of the parallel at avg_lat.
    meridian_circle = 2 * math.pi * earth_radius
    parallel_circle = meridian_circle * math.cos(math.radians(avg_lat))

    # North-south steps run along the meridian, east-west steps along the
    # (shorter) parallel -- the two were swapped before, which stretched the
    # grid in latitude and compressed it in longitude.
    lat_dist_in_deg = 360 * meters / meridian_circle
    lon_dist_in_deg = 360 * meters / parallel_circle
    return (lat_dist_in_deg, lon_dist_in_deg)
def haversine(coord1, coord2):
    """Return the great-circle distance in meters between two points.

    Both arguments are (lat, lon) pairs in degrees; the haversine formula is
    evaluated on a sphere with a radius of 6372800 m.
    """
    earth_radius = 6372800
    (lat_a, lon_a), (lat_b, lon_b) = coord1, coord2

    half_dlat = math.radians(lat_b - lat_a) / 2
    half_dlon = math.radians(lon_b - lon_a) / 2
    lat_term = math.sin(half_dlat) ** 2
    lon_term = (
        math.cos(math.radians(lat_a))
        * math.cos(math.radians(lat_b))
        * math.sin(half_dlon) ** 2
    )
    hav = lat_term + lon_term

    return 2 * earth_radius * math.atan2(math.sqrt(hav), math.sqrt(1 - hav))
def string_to_minutes(inStr):
    """Convert a duration string in the format '1:15 h' into minutes.

    The text before the first ':' is read as hours and the text after it as
    minutes; every non-digit character (unit suffix, whitespace) is ignored.

    Args:
        inStr: Duration text such as '1:15 h'.

    Returns:
        The duration in whole minutes as an int, e.g. 75 for '1:15 h'.

    Raises:
        ValueError: If the string has no ':' or lacks digits on either side.
    """
    hours_part, separator, minutes_part = inStr.partition(":")
    hours_digits = "".join(ch for ch in hours_part if ch.isdigit())
    minutes_digits = "".join(ch for ch in minutes_part if ch.isdigit())
    if not separator or not hours_digits or not minutes_digits:
        raise ValueError(f"cannot parse duration string: {inStr!r}")
    return int(hours_digits) * 60 + int(minutes_digits)
def build_url(start_x, start_y, destination, destination_city, destination_type,
              search_date, search_time, time_dep_arr):
    """Build the geofox.hvv.de search URL for one trip request.

    Args:
        start_x: Longitude of the start coordinate (e.g. 9.9771741231548).
        start_y: Latitude of the start coordinate (e.g. 53.55145216841).
        destination: Name of the destination, e.g. "Hamburg Hbf".
        destination_city: City of the destination.
        destination_type: Value of the ``destinationType`` query parameter.
        search_date: Value of the ``date`` query parameter.
        search_time: Value of the ``time`` query parameter.
        time_dep_arr: Value of the ``timeIsDeparture`` query parameter.

    Returns:
        The complete request URL as a string.
    """
    # Destination names are free text (spaces, umlauts, possibly '&'), so
    # they are percent-encoded to keep the query string well-formed.
    destination_q = quote(str(destination))
    destination_city_q = quote(str(destination_city))
    return (
        "https://geofox.hvv.de/jsf/home.seam?execute=true&"
        f"date={search_date}&time={search_time}&"
        "language=de&start=&startCity=Hamburg&startType=COORDINATE&"
        # No literal suffix after start_y: a stray trailing "2" used to be
        # appended here and shifted the requested latitude.
        f"startX={start_x}&startY={start_y}&"
        f"destination={destination_q}&"
        f"destinationCity={destination_city_q}&destinationType={destination_type}&"
        f"timeIsDeparture={time_dep_arr}&wayBy="
    )
# Texts the result page contains when no connection exists / the request failed.
_NO_CONNECTION_MSG = "Mit diesen Angaben können wir leider keine Verbindung für Sie finden"
_SERVER_ERROR_MSG = "Es ist ein Fehler aufgetreten"

# Locators for the bike tab button and the bike result panel.
_BIKE_BUTTON_XPATH = '/html/body/div[3]/div/div/main/div/app-root/app-search-container/app-search/div/div[3]/div/div[2]/div/ul/li[4]/button/span[2]'
_BIKE_RESULT_CSS = ".wrapper > div:nth-child(2) > div:nth-child(1) > app-bikepath-result:nth-child(1) > div:nth-child(1) > div:nth-child(3)"

# Column layout of the result table.
_RESULT_COLUMNS = ['org_coord',
                   'org_lat',
                   'org_lon',
                   'opnv_time',
                   'bike_time',
                   'opnv_changes',
                   'dist_meters']


def _open_schedule(driver, url):
    """Load ``url`` once; True if the schedule table appeared, False to skip the point."""
    driver.get(url)
    if _NO_CONNECTION_MSG in driver.page_source:
        return False
    if _SERVER_ERROR_MSG in driver.page_source:
        time.sleep(30)
        return False
    # Raises (selenium timeout) when the table does not show up within 10 s.
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CLASS_NAME, "c-schedule-table"))
    )
    return True


def _open_schedule_with_retry(driver, url):
    """Like ``_open_schedule`` but retries once after 30 s if the first attempt raises."""
    try:
        return _open_schedule(driver, url)
    except Exception:
        # ``Exception`` (not a bare except) so Ctrl+C still stops the run.
        if _NO_CONNECTION_MSG in driver.page_source:
            return False
        time.sleep(30)
        # A failure of the second attempt is not handled here and propagates.
        return _open_schedule(driver, url)


def _class_contains(fragment):
    """Return a bs4 attribute filter matching class values that contain ``fragment``."""
    # bs4 calls the filter with None for tags without a class attribute.
    return lambda value: value is not None and fragment in value


def _duration_to_minutes(text):
    """Convert a duration text of the form 'H:MM...' into minutes."""
    parts = text.split(":")
    minute_digits = ''.join(ch for ch in parts[1] if ch.isdigit())
    return int(parts[0]) * 60 + int(minute_digits)


def _parse_schedule(schedule_table):
    """Return (shortest duration in minutes, fewest changes) found in the schedule table."""
    duration_tags = schedule_table.find_all('div', {'class': _class_contains('duration')})
    opnv_time = min(_duration_to_minutes(tag.text) for tag in duration_tags)
    change_tags = schedule_table.find_all('div', {'class': _class_contains('change')})
    opnv_changes = min(int(tag.text) for tag in change_tags)
    return opnv_time, opnv_changes


def _parse_bike_minutes(page_html):
    """Return the smallest 'Dauer: ... Min.' value found in the bike result containers."""
    containers = BeautifulSoup(page_html, "html5lib").find_all(
        'div', {'class': 'content ng-star-inserted'})
    minutes = []
    for container in containers:
        text = container.text
        minutes.append(int(text[text.find('Dauer:') + 6: text.find('Min.')].strip()))
    return min(minutes)


def _fetch_point(driver, cur_lat, cur_lon):
    """Query one grid point and return its result row, or None if the point is skipped."""
    dist_meters = haversine((cur_lat, cur_lon), destination_coord)
    url = build_url(cur_lon, cur_lat, destination, destination_city, destination_type,
                    search_date, search_time, time_dep_arr)
    if not _open_schedule_with_retry(driver, url):
        return None

    # Public transport: shortest travel time and fewest changes.
    schedule_table = BeautifulSoup(driver.page_source, "html5lib").find(
        'div', {'class': 'c-schedule-table'})
    try:
        opnv_time, opnv_changes = _parse_schedule(schedule_table)
    except (AttributeError, IndexError, TypeError, ValueError):
        # Table missing or in an unexpected format: back off and skip the point.
        time.sleep(30)
        return None

    # Bike travel time: switch to the bike tab and wait for its result panel.
    bike_button = driver.find_element(By.XPATH, _BIKE_BUTTON_XPATH)
    driver.execute_script("arguments[0].scrollIntoView();", bike_button)
    bike_button.click()
    time.sleep(3)
    if _SERVER_ERROR_MSG in driver.page_source:
        time.sleep(30)
        return None
    try:
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, _BIKE_RESULT_CSS))
        )
        bike_time = _parse_bike_minutes(driver.page_source)
    except Exception:
        # No usable bike result: the point is skipped, as before.
        return None

    return {'org_coord': (cur_lat, cur_lon),
            'org_lat': cur_lat,
            'org_lon': cur_lon,
            'opnv_time': opnv_time,
            'bike_time': bike_time,
            'opnv_changes': opnv_changes,
            'dist_meters': dist_meters}


if __name__ == '__main__':
    # Continue an existing result file if there is one, else start a new table.
    if Path(outFile).is_file():
        outDF = pd.read_excel(outFile)
    else:
        outDF = pd.DataFrame(columns=_RESULT_COLUMNS)
    # String forms of the coordinates already stored; built once instead of
    # converting the whole column to a list for every grid point.
    fetched = {str(value) for value in outDF['org_coord']}

    # create new driver
    options = Options()
    #options.add_argument('-headless')
    driver = webdriver.Firefox(options=options)
    try:
        # calculate degree steps for lat and lon
        spacing_deg = calc_deg_increments(spacing, coords)
        cur_lat = coords[0][0]
        cur_lon = coords[0][1]
        # Walk the grid row by row: north -> south, and west -> east within a row.
        while cur_lat > coords[3][0]:
            while cur_lon < coords[3][1]:
                cur_lat = round(cur_lat, 6)
                cur_lon = round(cur_lon, 6)
                coord_tuple = (cur_lat, cur_lon)
                if str(coord_tuple) in fetched:
                    # skip points already fetched
                    print(str(coord_tuple) + " skipped" )
                else:
                    row = _fetch_point(driver, cur_lat, cur_lon)
                    if row is not None:
                        outDF = pd.concat(
                            [outDF, pd.DataFrame([row], columns=_RESULT_COLUMNS)],
                            ignore_index=True)
                        fetched.add(str(coord_tuple))
                        # Periodic checkpoints so a crash loses few rows.
                        if len(outDF) % 100 == 0:
                            print(str(len(outDF)) + " done")
                            outDF.drop_duplicates(inplace=True)
                            outDF.to_excel(outFile, index=False)
                        elif len(outDF) % 20 == 0:
                            outDF.to_excel(outFile, index=False)
                # Every path advances exactly one longitude step.
                cur_lon = cur_lon + spacing_deg[1]
            cur_lon = coords[0][1]
            cur_lat = cur_lat - spacing_deg[0]
    finally:
        # Also reached on errors and Ctrl+C: persist progress, then always
        # shut the browser session down.
        try:
            outDF.drop_duplicates(inplace=True)
            outDF.to_excel(outFile, index=False)
        finally:
            driver.quit()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement