Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
"""
UYD4RSS
Python 3.6+
Dependencies:
    beautifulsoup4
    selenium
    webdriver_manager
--help for options
"""
- import argparse
- import collections
- import datetime
- import logging
- import os
- import platform
- import sys
- import time
- from typing import Iterable, List, Set, Tuple
- from bs4 import BeautifulSoup
- from bs4.element import Tag
- from selenium import webdriver
- from selenium.common.exceptions import WebDriverException
- from webdriver_manager.chrome import ChromeDriverManager
- from webdriver_manager.firefox import GeckoDriverManager
# Configure root logger for timestamped console output.
# Fixed: '%(msg)s' is the raw, pre-%-formatting LogRecord attribute; the
# documented formatter attribute is '%(message)s' (applies lazy args too).
logging.basicConfig(
    format='%(asctime)s: %(message)s',
    datefmt='%H:%M:%S %p',
    level=logging.INFO
)

# Default output locations, relative to the current working directory.
DEFAULT_HTML_PATH = os.path.join(os.getcwd(), 'uyd_archive.html')
DEFAULT_RSS_PATH = os.path.join(os.getcwd(), 'uyd_archive.rss')
# Login page for the archive, and the URL reached after a successful login.
VAULT_ENTRANCE_URL = 'https://uhhyeahdude.com/archive/'
VAULT_SUCCESS_URL = 'https://uhhyeahdude.com/index.php/archive'
# Cache directory used by webdriver_manager for downloaded driver binaries.
WEBDRIVERS_PATH = os.path.join(os.getcwd(), '.drivers')
class UYDEpisode:
    """One archive episode: title, air date, download URL, and category flags."""

    # Note this is one implicitly concatenated string literal, not a tuple
    _EPISODE_XML_TEMPLATE = (
        """<item>"""
        """\n\t<title>{title}</title>"""
        """\n\t<pubDate>{dt} -0000</pubDate>"""
        """\n\t<link>{url}</link>"""
        """\n</item>"""
    )

    @staticmethod
    def _parse_date(date_str: str) -> datetime.datetime:
        """Parse a date like 'February 11th 2006' into a midnight datetime."""
        tokens = date_str.split()
        # Remove the ordinal suffix: '1st', '2nd', '3rd', '4th' become 1, 2, 3, 4
        day = tokens[1].strip("stndrh")
        tokens[1] = day.zfill(2)  # strptime's %d wants a two-digit day
        return datetime.datetime.strptime(" ".join(tokens), '%B %d %Y')

    @staticmethod
    def _parse_number(title: str) -> int:
        """Extract the episode number from a title like 'Episode 123'."""
        # TODO: This works for the data on the page as currently formatted, but could break in the future
        return int(title.split()[1])

    def __init__(self, title: str, date_str: str, url: str, live: bool, marcia: bool):
        self.title = title
        self.number = self._parse_number(title)
        # Date parsed from str; time component set to 00:00:00
        self.dt = self._parse_date(date_str)
        self.url = url
        self.supplement = 'Supplement' in title  # TODO: Also works for existing data but is fragile
        self.video = self.number == 100 or 'video' in title.lower()  # TODO: Gets 100, 500 but check URL or something?
        self.live = live
        self.marcia = marcia

    def __lt__(self, other: 'UYDEpisode'):
        mine = (self.dt, self.number, self.supplement, self.video, self.title)
        theirs = (other.dt, other.number, other.supplement, other.video, other.title)
        return mine < theirs

    def __str__(self) -> str:
        return '{}, {}: {}'.format(self.title, self.dt.strftime('%b %d %Y'), self.url)

    def __repr__(self):
        return str(self)

    def to_xml(self) -> str:
        """Render this episode as an RSS <item> element."""
        # Get non-zero-padded day without platform-specific %#d vs. %-d
        date_str = f"{self.dt.strftime('%B')} {self.dt.day}, {self.dt.year}"
        xml_title = self.title + ' ' + date_str
        if self.video and 'video' not in self.title.lower():  # 100 not tagged in title, 500 is
            xml_title += ' (Video)'
        if self.live:
            xml_title += ' (Live)'
        if self.marcia:
            xml_title += ' (feat. Marcia)'
        pub_date = self.dt.strftime('%a, %d %b %Y %H:%M:%S')
        return UYDEpisode._EPISODE_XML_TEMPLATE.format(
            title=xml_title,
            dt=pub_date,
            url=self.url
        )
- # Setup
def get_chromedriver(headless: bool, disable_logging: bool, executable: str) -> webdriver.Chrome:
    """Create a Chrome webdriver.

    Driver executable resolution order: the explicit ``executable`` path, then
    a chromedriver binary in the current directory, then an automatic download
    via webdriver_manager into WEBDRIVERS_PATH. Exits the process (status 1)
    if the download fails.
    """
    options = webdriver.ChromeOptions()
    if headless:
        options.add_argument('headless')
    if disable_logging:
        # Suppress the "DevTools listening on ..." console noise
        options.add_experimental_option("excludeSwitches", ['enable-logging'])
    if executable:
        executable_path = executable
    else:
        basename = 'chromedriver' + ('.exe' if platform.system() == 'Windows' else '')
        cwd_path = os.path.join(os.getcwd(), basename)
        if os.path.exists(cwd_path):
            executable_path = cwd_path
        else:
            logging.info('Attempting to download Chromedriver in {}'.format(WEBDRIVERS_PATH))
            try:
                executable_path = ChromeDriverManager(path=WEBDRIVERS_PATH).install()
            except Exception:
                # Was a bare except: that would also swallow SystemExit/KeyboardInterrupt
                msg = 'Failed to download Chromedriver! Download and place in this directory to proceed.'
                logging.exception(msg)
                sys.exit(1)
    logging.info('Initializing webdriver at {}'.format(executable_path))
    driver = webdriver.Chrome(options=options, executable_path=executable_path)
    return driver
def get_geckodriver(headless: bool, disable_logging: bool, executable: str) -> webdriver.Firefox:
    """Create a Firefox webdriver.

    Driver executable resolution order: the explicit ``executable`` path, then
    a geckodriver binary in the current directory, then an automatic download
    via webdriver_manager into WEBDRIVERS_PATH. Exits the process (status 1)
    if the download fails.
    """
    options = webdriver.FirefoxOptions()
    options.headless = headless
    ff_kwargs = {}
    if disable_logging:
        # Discard geckodriver's log instead of writing geckodriver.log to cwd.
        # os.devnull is the documented public name (was os.path.devnull).
        ff_kwargs['service_log_path'] = os.devnull
    if executable:
        ff_kwargs['executable_path'] = executable
    else:
        basename = 'geckodriver' + ('.exe' if platform.system() == 'Windows' else '')
        cwd_path = os.path.join(os.getcwd(), basename)
        if os.path.exists(cwd_path):
            ff_kwargs['executable_path'] = cwd_path
        else:
            logging.info('Attempting to download Geckodriver in {}'.format(WEBDRIVERS_PATH))
            try:
                ff_kwargs['executable_path'] = GeckoDriverManager(path=WEBDRIVERS_PATH).install()
            except Exception:
                # Was a bare except: that would also swallow SystemExit/KeyboardInterrupt
                msg = 'Failed to download Geckodriver! Download and place in this directory to proceed.'
                logging.exception(msg)
                sys.exit(1)
    logging.info('Initializing webdriver at {}'.format(ff_kwargs['executable_path']))
    driver = webdriver.Firefox(options=options, **ff_kwargs)
    return driver
def get_webdriver(browser: str, headless: bool, disable_logging: bool, executable: str):
    """Dispatch to the driver factory for the requested browser ('firefox' or default Chrome)."""
    factory = get_geckodriver if browser == 'firefox' else get_chromedriver
    return factory(headless, disable_logging, executable)
- # User input
def prompt_credentials() -> Tuple[str, str]:
    """Interactively collect the shared archive credentials; returns (username, password)."""
    # Short pauses so the user actually reads the warnings before the prompts
    print('Please enter the username and password for access to the Uhh Yeah Dude archive.')
    time.sleep(1)
    print('These are NOT the credentials to your Patreon account.')
    time.sleep(1)
    print(
        'They are the credentials given to all subscribers to the Uhh Yeah Dude Patreon.',
        '\nCheck https://patreon.com/UHHYEAHDUDE or your old emails; make sure to use the most recent ones.'
    )
    return input('Username: '), input('Password: ')
def prompt_int(question: str, min_n: int, max_n: int) -> int:
    """Prompt repeatedly until the user enters an integer in [min_n, max_n].

    Raises ValueError immediately if max_n < min_n.
    """
    if max_n < min_n:
        raise ValueError('Maximum (given {}) must be greater than or equal to minimum (given {})'.format(max_n, min_n))
    prompt = question + ' [{}-{}]: '.format(min_n, max_n)
    invalid_msg = 'Please respond with a number from {} to {}'.format(min_n, max_n)
    while True:
        raw = input(prompt)
        try:
            value = int(raw)
        except ValueError:
            print(invalid_msg)
            continue
        if min_n <= value <= max_n:
            return value
        print(invalid_msg)
def prompt_yn(question: str, default: bool) -> bool:
    """Yes/no prompt; pressing Enter with no input returns ``default``."""
    suffix = ' [Y/n]: ' if default else ' [y/N]: '
    prompt = question + suffix
    while True:
        raw = input(prompt)
        if not raw:
            return default
        normalized = raw.lower()
        if normalized in {'y', 'yes'}:
            return True
        if normalized in {'n', 'no'}:
            return False
        print("Please respond with either 'y' or 'n'")
- # Navigate the monsterweb
def enter_credentials(driver: webdriver.Chrome, username: str, password: str):
    """Load the archive login page and submit the given credentials."""
    driver.get(VAULT_ENTRANCE_URL)
    logging.info('Entrance page loaded')
    # Locate all three form elements up front, then fill and submit
    username_field = driver.find_element_by_name('username')
    password_field = driver.find_element_by_name('password')
    submit = driver.find_element_by_name('submit')
    username_field.send_keys(username)
    password_field.send_keys(password)
    submit.click()
    logging.info('Credentials submitted')
def validate_credentials(driver: webdriver.Chrome) -> bool:
    """Wait briefly for the post-login redirect, then check whether we landed on the success URL."""
    wait_seconds = 3  # give the redirect time to complete
    time.sleep(wait_seconds)
    return driver.current_url == VAULT_SUCCESS_URL
def scroll_to_bottom(driver: webdriver.Chrome) -> bool:
    """Scroll the archive page until every lazily-loaded episode is present.

    Returns True once the final (oldest) episode title appears, False if no
    titles are found at all. Falls through (returning None, falsy) after a
    fixed number of scroll attempts so a broken page can't loop forever.
    """
    last_title_span_text = "Episode 1February 11th 2006"
    interval = 5   # seconds to let new content load after each scroll
    limit = 15     # limit breaks to exit if it's not working
    for _ in range(limit):
        time.sleep(interval)
        title_spans = driver.find_elements_by_xpath(
            "//div[@id='archive__tab-content-01']//span[@class='archive-ep__title']"
        )
        if not title_spans:
            return False
        if title_spans[-1].text == last_title_span_text:
            logging.info('All episodes found!')
            return True
        logging.info('{} episodes found, getting more...'.format(len(title_spans)))
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
def scrape_site(browser: str, headless: bool, disable_logging: bool, executable: str) -> str:
    """Log in to the archive, load every episode, and return the page source.

    Exits the process (status 1) if login fails, the page never finishes
    loading, or the webdriver raises.
    """
    driver = get_webdriver(browser, headless, disable_logging, executable)
    username, password = prompt_credentials()
    error_msg, source, success = '', '', False
    try:
        enter_credentials(driver, username, password)
        success = validate_credentials(driver)
        if not success:
            error_msg = 'Login attempt failed! Check credentials again.'
        else:
            success = scroll_to_bottom(driver)
            if not success:
                error_msg = 'Failed to reach end of archive page!'
            else:
                source = driver.page_source
    except WebDriverException as error:
        logging.error('Fatal error!', exc_info=error)
        # Previously success/error_msg were left stale here, so a late failure
        # could return an empty source or log an empty critical message.
        success = False
        error_msg = 'Webdriver error while scraping; see traceback above.'
    finally:
        # quit() (not close()) shuts down the whole driver process, not just
        # the current window, so no orphan driver is left running.
        driver.quit()
    if not success:
        logging.critical(error_msg)
        sys.exit(1)
    return source
- # Process the data
def parse_episodes(source: str) -> List[UYDEpisode]:
    """Parse archive page HTML into a sorted list of UYDEpisode objects."""
    soup = BeautifulSoup(source, 'html.parser')
    # Tab 01 lists every episode; tabs 02/03 only identify which download
    # URLs are live shows / Marcia episodes respectively.
    all_episode_divs = get_episode_divs_from_tab(soup, "archive__tab-content-01")
    live_episode_urls = get_episode_urls_from_divs(
        get_episode_divs_from_tab(soup, "archive__tab-content-02")
    )
    marcia_episode_urls = get_episode_urls_from_divs(
        get_episode_divs_from_tab(soup, "archive__tab-content-03")
    )
    episodes = []
    for episode_div in all_episode_divs:
        title_tag = episode_div.find(class_='archive-ep__title')
        date_str = title_tag.span.text
        # The span holds the date; the title text is everything before it
        title_str = title_tag.text[:-len(date_str)]
        url_str = episode_div.find('a', class_="archive-ep__download")['href']
        episodes.append(UYDEpisode(
            title_str,
            date_str,
            url_str,
            url_str in live_episode_urls,
            url_str in marcia_episode_urls,
        ))
    logging.info('{} items retrieved!'.format(len(episodes)))
    fix_datetimes(episodes)
    episodes.sort()
    return episodes
def get_episode_divs_from_tab(soup: BeautifulSoup, tab_id: str) -> Iterable[Tag]:
    """Return every episode div inside the archive tab with the given element id."""
    tab = soup.find(id=tab_id)
    return tab.find_all(class_='archive-ep')
def get_episode_urls_from_divs(divs: Iterable[Tag]) -> Set[str]:
    """Collect each episode div's download-link href into a set."""
    urls = set()
    for div in divs:
        urls.add(div.find('a', class_="archive-ep__download")['href'])
    return urls
def fix_datetimes(episodes: List[UYDEpisode]) -> None:
    """
    We've only parsed episode dates out of the HTML, not times (defaulting to 12:00 a.m.) Many episodes share the same
    date. Arbitrarily increment the times of the latter of multiple episodes with the same date so they sort properly
    in podcast apps.
    """
    # Bucket episodes that share a calendar date
    grouped = collections.defaultdict(list)
    for episode in episodes:
        grouped[episode.dt.date()].append(episode)
    for same_day in grouped.values():
        if len(same_day) <= 1:
            continue
        same_day.sort()
        # Nudge the i-th episode of the day forward by i minutes
        for minutes, episode in enumerate(same_day):
            episode.dt += datetime.timedelta(minutes=minutes)
def filter_episodes(
    episodes: List[UYDEpisode],
    first_ep: int,
    last_ep: int,
    supplements: bool,
    video: bool,
    live: bool,
    marcia: bool
) -> List[UYDEpisode]:
    """Return episodes numbered within [first_ep, last_ep], honoring the inclusion flags."""
    selected = [e for e in episodes if first_ep <= e.number <= last_ep]
    if not supplements:
        selected = [e for e in selected if not e.supplement]
    # The video flag only matters when #100 (video-only) can be in range
    if not video and (first_ep <= 100 <= last_ep):
        selected = [e for e in selected if not e.video]
    if not live:
        selected = [e for e in selected if not e.live]
    if not marcia:
        selected = [e for e in selected if not e.marcia]
    return selected
- # Output
def build_xml(episodes: List[UYDEpisode]) -> str:
    """Assemble the complete RSS 2.0 document for the given (sorted, non-empty) episode list."""
    logging.info('Preparing RSS data...')
    first_ep_n = episodes[0].number
    last_ep_n = episodes[-1].number
    parts = [
        '<?xml version="1.0" encoding="UTF-8" ?>\n<rss version="2.0">\n<channel>',
        '\n<title>Uhh Yeah Dude Archive Episodes {}-{}</title>'.format(first_ep_n, last_ep_n),
        '\n<description>America through the eyes of two American Americans</description>',
        '\n<language>en-us</language>\n',
        '\n'.join(episode.to_xml() for episode in episodes),
        '\n</channel>\n</rss>',
    ]
    return ''.join(parts)
def print_instructions():
    """Print markdown-flavored guidance on hosting and using the generated RSS file."""
    # Printed verbatim after a successful run; user-facing text, do not reflow.
    # NOTE(review): this text may have lost markdown bullet markers in a
    # copy/paste of the source — verify against the upstream original.
    msg = """
## How to use this RSS file?
Check if your podcast app allows importing RSS files stored locally.
If not, host it somewhere accessible to you. One easy way is as follows:
### Dropbox
Log in to Dropbox via the web interface and upload the RSS file generated by this program to a non-public folder.
Select the uploaded file and find the "Share" button.
Select "Create link".
The provided link will end with `?dl=0`. Change `0` to `1` when adding this URL to your podcast app.
**Ensure that your podcast app is not adding this feed to a public catalog. Most apps that offer such a feature will
ask you to specify if a feed is private when subscribing. Take care to do so.**
(You can also tell your app not to update this feed automatically, since it won't be changing.)
## Please don't use this for piracy
This script is intended for the convenience of paying patrons, not to facilitate the indiscriminate delivery of episodes
UYD has chosen to put in the vault. Please support the boys!
If hosting the RSS file on the internet, make the link available only to yourself.
As noted above, ensure your feed is not being added to a public catalog of podcasts.
Remain a patron, at whatever level is required for Vault access, for as long as you use these files.
"""
    print(msg)
def get_options() -> argparse.Namespace:
    """Build the command-line interface and parse sys.argv into a Namespace."""
    parser = argparse.ArgumentParser(
        description='Create an RSS feed from the UYD archives; can scrape site or process an HTML file'
    )
    scraping = parser.add_argument_group(title='Scraping options')
    scraping.add_argument(
        '--browser', choices=['chrome', 'firefox'], default='chrome',
        help='Specify which browser to use for scraping'
    )
    scraping.add_argument(
        '--webdriver-path', default=None, dest='webdriver_executable',
        help=('Specify path to Chrome/Gecko driver executable; if not given, defaults to checking current directory, '
              'then attempting to download the correct version automatically')
    )
    # store_false: showing the browser means turning headless mode off
    scraping.add_argument(
        '--show-browser', action='store_false', dest='headless',
        help='Make the web browser visible while scraping'
    )
    scraping.add_argument(
        '--verbose-browser', action='store_false', dest='disable_browser_log',
        help='Show information logged by web browser while scraping'
    )
    scraping.add_argument(
        '--save-source', dest='save_source', default=DEFAULT_HTML_PATH,
        help=('Write archive page source HTML to the given path for future use with the --from-source option; '
              'defaults to uyd_archive.html')
    )
    parsing = parser.add_argument_group(title='Parsing options')
    # nargs='?': flag alone means "use the default path", flag+value overrides
    parsing.add_argument(
        '--from-source', nargs='?', dest='from_source', default=None, const=DEFAULT_HTML_PATH,
        help=('Process HTML file found at given path rather than scraping site; '
              'if argument present but path not given, defaults to uyd_archive.html')
    )
    parsing.add_argument(
        '--out-file', dest='out_file', default=DEFAULT_RSS_PATH,
        help='Write RSS file to the given path; if not present, defaults to uyd_archive.rss'
    )
    return parser.parse_args()
def main() -> None:
    """Entry point: obtain archive HTML (scrape or local file), prompt for filters, write the RSS file."""
    options = get_options()
    if not options.from_source:
        # Scrape the live site, then save the HTML for future --from-source runs
        source = scrape_site(
            options.browser, options.headless, options.disable_browser_log, options.webdriver_executable
        )
        with open(options.save_source, 'w', encoding='utf-8') as out_html:
            out_html.write(source)
        logging.info('HTML file written to {}'.format(options.save_source))
    else:
        # Reuse a previously saved page source instead of scraping
        with open(options.from_source, 'r', encoding='utf-8') as in_html:
            source = in_html.read()
    episodes = parse_episodes(source)
    if not episodes:
        logging.critical('Failed to locate any episodes on page, exiting!')
        sys.exit(1)
    # Interactive filtering: episode range, then inclusion flags
    min_ep, max_ep = episodes[0].number, episodes[-1].number
    first_ep = prompt_int('Start at episode', min_ep, max_ep)
    last_ep = prompt_int('End at episode', first_ep, max_ep)
    supplements_prompt = 'Include supplements (a few mini-episodes in the 100s), if any in range?'
    supplements = prompt_yn(supplements_prompt, True)
    video_prompt = 'Include videos, if any in range? (#100: only available as video; #500: audio version also in feed)'
    # Only ask about videos when an episode with a video version (#100/#500) is in range
    video = (first_ep <= 100 <= last_ep or first_ep <= 500 <= last_ep) and prompt_yn(video_prompt, False)
    live_prompt = 'Include live episodes?'
    live = prompt_yn(live_prompt, True)
    marcia_prompt = 'Include episodes featuring Marcia Romatelli?'
    marcia = prompt_yn(marcia_prompt, True)
    episodes = filter_episodes(episodes, first_ep, last_ep, supplements, video, live, marcia)
    if not episodes:
        logging.info('No episodes within given parameters, exiting!')
        sys.exit(0)
    logging.info('{} episodes to be included'.format(len(episodes)))
    xml = build_xml(episodes)
    with open(options.out_file, 'w', encoding='utf-8') as f:
        f.write(xml)
    logging.info('RSS file written to {}'.format(options.out_file))
    print_instructions()


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement