- """
- ITV
- Author: stabbedbybrick
- Info:
- ITV L3 is 720p, AAC 2.0 max
- """
- from __future__ import annotations
- import json
- import subprocess
- import time
- from collections import Counter
- from pathlib import Path
- import click
- import requests
- from bs4 import BeautifulSoup
- from utils.args import get_args
- from utils.cdm import LocalCDM
- from utils.config import Config
- from utils.options import get_downloads
- from utils.titles import Episode, Movie, Movies, Series
- from utils.utilities import (
- append_id,
- construct_pssh,
- convert_subtitles,
- force_numbering,
- get_wvd,
- in_cache,
- set_filename,
- set_save_path,
- string_cleaning,
- update_cache,
- )

class ITV(Config):
    def __init__(self, config, **kwargs):
        super().__init__(config, **kwargs)

        with self.config["download_cache"].open("r") as file:
            self.cache = json.load(file)

        self.client.headers = {
            'accept': '*/*',
            'accept-encoding': 'gzip, deflate',
            'accept-language': 'en-US,en;q=0.9',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'none',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
            'Connection': 'keep-alive',
        }

        self.get_options()
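
    # License acquisition: a Widevine challenge is POSTed to the KeyServiceUrl
    # returned by the playlist endpoint, and the raw license response is handed
    # back to the local CDM for key parsing.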
    def get_license(self, challenge: bytes, lic_url: str) -> bytes:
        r = self.client.post(url=lic_url, data=challenge)
        r.raise_for_status()
        return r.content

    def get_keys(self, pssh: str, lic_url: str) -> bytes:
        wvd = get_wvd(Path.cwd())
        widevine = LocalCDM(wvd)
        challenge = widevine.challenge(pssh)
        response = self.get_license(challenge, lic_url)
        return widevine.parse(response)
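
    # ITV's pages are server-rendered with Next.js; the title metadata lives in
    # the JSON blob inside the #__NEXT_DATA__ script tag.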
    def get_data(self, url: str) -> dict:
        r = self.client.get(url)
        r.raise_for_status()

        soup = BeautifulSoup(r.text, "html.parser")
        props = soup.select_one("#__NEXT_DATA__").text
        data = json.loads(props)
        return data["props"]["pageProps"]

    def get_series(self, url: str) -> Series:
        data = self.get_data(url)

        return Series(
            [
                Episode(
                    id_=episode["episodeId"],
                    service="ITV",
                    title=data["programme"]["title"],
                    season=episode.get("series")
                    if isinstance(episode.get("series"), int)
                    else 0,
                    number=episode.get("episode")
                    if isinstance(episode.get("episode"), int)
                    else 0,
                    name=episode["episodeTitle"],
                    year=None,
                    data=episode["playlistUrl"],
                    description=episode.get("description"),
                )
                for series in data["seriesList"]
                if "Latest episodes" not in series["seriesLabel"]
                for episode in series["titles"]
            ]
        )

    def get_movies(self, url: str) -> Movies:
        data = self.get_data(url)

        return Movies(
            [
                Movie(
                    id_=movie["episodeId"],
                    service="ITV",
                    title=data["programme"]["title"],
                    year=movie.get("productionYear"),
                    name=data["programme"]["title"],
                    data=movie["playlistUrl"],
                    synopsis=movie.get("description"),
                )
                for movies in data["seriesList"]
                for movie in movies["titles"]
            ]
        )
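
    # The playlist endpoint is asked for a DASH/Widevine variant with
    # maxSupported "L3", which is what caps this service at 720p / AAC 2.0
    # (per the module docstring). It returns the MPD URL, the license URL,
    # and an optional out-of-band WebVTT subtitle URL.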
    def get_playlist(self, playlist: str) -> tuple:
        headers = {
            "Accept": "application/vnd.itv.vod.playlist.v4+json",
            "Accept-Language": "en-US,en;q=0.9,da;q=0.8",
            "Connection": "keep-alive",
            "Content-Type": "application/json",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        }
        payload = {
            "client": {"version": "4.1", "id": "browser", "supportsAdPods": True, "service": "itv.x", "appversion": "2.320.5"},
            "device": {"manufacturer": "Chrome", "model": "122.0.0.0", "os": {"name": "Windows", "version": "10", "type": "desktop"}, "deviceGroup": "dotcom"},
            "user": {},
            "variantAvailability": {
                "player": "dash",
                "featureset": ["mpeg-dash", "widevine", "outband-webvtt", "hd", "single-track"],
                "platformTag": "dotcom",
                "drm": {"system": "widevine", "maxSupported": "L3"},
            },
        }

        r = self.client.post(playlist, headers=headers, json=payload)
        r.raise_for_status()
        data = r.json()

        video = data["Playlist"]["Video"]
        media = video["MediaFiles"]
        mpd_url = f"{media[0].get('Href')}"
        lic_url = f"{media[0].get('KeyServiceUrl')}"
        subtitle = video.get("Subtitles")
        subtitle = f"{subtitle[0].get('Href')}" if subtitle else None

        return mpd_url, lic_url, subtitle
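
    # The MPD is fetched and rewritten for local use: the query string after
    # ".mpd" is re-appended to each SegmentTemplate, the BaseURL is pointed at
    # the "dash/" path, and the patched manifest is written to the temp
    # directory. The return value is the selected video height: the requested
    # quality if available, otherwise the closest (or highest) match.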
    def get_mediainfo(self, manifest: str, quality: str) -> str:
        r = requests.get(manifest)
        r.raise_for_status()

        self.soup = BeautifulSoup(r.content, "xml")
        elements = self.soup.find_all("Representation")
        heights = sorted(
            [int(x.attrs["height"]) for x in elements if x.attrs.get("height")],
            reverse=True,
        )

        new_base, params = manifest.split(".mpd")
        new_base += "dash/"
        self.soup.select_one("BaseURL").string = new_base

        segments = self.soup.find_all("SegmentTemplate")
        for segment in segments:
            segment["media"] += params
            segment["initialization"] += params

        with open(self.tmp / "manifest.mpd", "w") as f:
            f.write(str(self.soup.prettify()))

        if quality is not None:
            if int(quality) in heights:
                return quality
            else:
                closest_match = min(heights, key=lambda x: abs(int(x) - int(quality)))
                return closest_match

        return heights[0]

    def get_content(self, url: str) -> object:
        if self.movie:
            with self.console.status("Fetching movie titles..."):
                content = self.get_movies(self.url)
                title = string_cleaning(str(content))

            self.log.info(f"{str(content)}\n")

        else:
            with self.console.status("Fetching series titles..."):
                content = self.get_series(url)

                title = string_cleaning(str(content))
                seasons = Counter(x.season for x in content)
                num_seasons = len(seasons)
                num_episodes = sum(seasons.values())

                if self.force_numbering:
                    content = force_numbering(content)
                if self.append_id:
                    content = append_id(content)

            self.log.info(
                f"{str(content)}: {num_seasons} Season(s), {num_episodes} Episode(s)\n"
            )

        return content, title

    def get_episode_from_url(self, url: str):
        with self.console.status("Getting episode from URL..."):
            data = self.get_data(url)

            episode = Series(
                [
                    Episode(
                        id_=data["episode"]["episodeId"],
                        service="ITV",
                        title=data["programme"]["title"],
                        season=data["episode"].get("series")
                        if isinstance(data["episode"].get("series"), int)
                        else 0,
                        number=data["episode"].get("episode")
                        if isinstance(data["episode"].get("episode"), int)
                        else 0,
                        name=data["episode"]["episodeTitle"],
                        # year=None,
                        data=data["episode"]["playlistUrl"],
                        description=data["episode"].get("description"),
                    )
                ]
            )

        title = string_cleaning(str(episode))

        return [episode[0]], title
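
    # Main loop: resolve the requested downloads, skip anything already present
    # in the download cache (unless no_cache is set), and optionally sleep
    # between items when a slowdown value is configured.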
    def get_options(self) -> None:
        downloads, title = get_downloads(self)

        for download in downloads:
            if not self.no_cache and in_cache(self.cache, download):
                continue

            if self.slowdown:
                with self.console.status(
                    f"Slowing things down for {self.slowdown} seconds..."
                ):
                    time.sleep(self.slowdown)

            self.download(download, title)
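
    # Per-title pipeline: fetch playlist -> rewrite manifest -> extract PSSH ->
    # request keys -> optionally fetch and convert subtitles -> hand off to the
    # external downloader via get_args()/subprocess.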
    def download(self, stream: object, title: str) -> None:
        manifest, lic_url, subtitle = self.get_playlist(stream.data)
        self.res = self.get_mediainfo(manifest, self.quality)

        pssh = construct_pssh(self.soup)
        keys = self.get_keys(pssh, lic_url)
        with open(self.tmp / "keys.txt", "w") as file:
            file.write("\n".join(keys))

        self.filename = set_filename(self, stream, self.res, audio="AAC2.0")
        self.save_path = set_save_path(stream, self, title)
        self.manifest = self.tmp / "manifest.mpd"
        self.key_file = self.tmp / "keys.txt"
        self.sub_path = None

        self.log.info(f"{str(stream)}")
        click.echo("")

        if subtitle is not None and not self.skip_download:
            self.log.info(f"Subtitles: {subtitle}")
            try:
                sub = self.client.get(subtitle)
                sub.raise_for_status()
            except requests.exceptions.HTTPError:
                self.log.warning(f"Subtitle response {sub.status_code}, skipping")
            else:
                sub_path = self.tmp / f"{self.filename}.vtt"
                with open(sub_path, "wb") as f:
                    f.write(sub.content)

                if not self.sub_no_fix:
                    sub_path = convert_subtitles(self.tmp, self.filename, sub_type="vtt")

                self.sub_path = sub_path

        if self.skip_download:
            self.log.info(f"Filename: {self.filename}")
            self.log.info("Subtitles: Yes\n") if subtitle else self.log.info(
                "Subtitles: None\n"
            )

        args, file_path = get_args(self)

        if not file_path.exists():
            try:
                subprocess.run(args, check=True)
            except Exception as e:
                raise ValueError(f"{e}")
        else:
            self.log.warning(f"{self.filename} already exists. Skipping download...\n")
            self.sub_path.unlink() if self.sub_path else None

        if not self.skip_download and file_path.exists():
            update_cache(self.cache, self.config, stream)