Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """Module for general CBC stuff"""
- from uuid import uuid4
- from base64 import b64encode, b64decode
- import json
- import urllib
- import urlparse
- # import http.client as http_client
- from xml.dom.minidom import *
- import xml.etree.ElementTree as ET
- import requests
- from .utils import save_cookies, loadCookies, saveAuthorization, log
- # http_client.HTTPConnection.debuglevel = 1
- from resources.lib.utils import loadAuthorization
# Item key holding a channel's call sign (read by CBC.get_callsign / get_labels).
CALLSIGN = 'cbc$callSign'
# NOTE(review): not referenced by the code reviewed here; kept for other users.
API_KEY = '3f4beddd-2061-49b0-ae80-6f1f2ed65b37'

# All resource scopes requested at login share this prefix.
_SCOPE_BASE = 'https://rcmnb2cprod.onmicrosoft.com/84593b65-0ef6-4a72-891c-d351ddd50aab/'
_SCOPE_NAMES = (
    'email',
    'id.account.create',
    'id.account.delete',
    'id.account.info',
    'id.account.modify',
    'id.account.reset-password',
    'id.account.send-confirmation-email',
    'id.write',
    'media-drmt',
    'media-meta',
    'media-validation',
    'media-validation.read',
    'metrik',
    'oidc4ropc',
    'ott-profiling',
    'ott-subscription',
    'profile',
    'subscriptions.validate',
    'subscriptions.write',
    'toutv',
    'toutv-presentation',
    'toutv-profiling',
    'testapiwithjwtendpoint.admin',
    # 'id.account.info' appears a second time in the original scope string;
    # it is reproduced here so the request stays byte-identical.
    'id.account.info',
)
# Space-separated scope string sent as the 'scope' parameter of the authorize call.
SCOPES = ' '.join(
    ('openid', 'offline_access')
    + tuple(_SCOPE_BASE + scope_name for scope_name in _SCOPE_NAMES)
)

# Endpoints of the login policy used by the azure_authorize_* steps.
_LOGIN_BASE = (
    'https://login.cbc.radio-canada.ca/bef1b538-1950-4283-9b27-b096cbc18070'
    '/B2C_1A_ExternalClient_FrontEnd_Login_CBC'
)
AUTHORIZE_LOGIN = _LOGIN_BASE + '/oauth2/v2.0/authorize'
SELF_ASSERTED_LOGIN = _LOGIN_BASE + '/SelfAsserted'
CONFIRM_LOGIN = _LOGIN_BASE + '/api/SelfAsserted/confirmed'
SIGNIN_LOGIN = _LOGIN_BASE + '/api/CombinedSigninAndSignup/confirmed'

RADIUS_LOGIN_FMT = 'https://api.loginradius.com/identity/v2/auth/login?{}'
RADIUS_JWT_FMT = 'https://cloud-api.loginradius.com/sso/jwt/api/token?{}'

# Content and subscription services.
_OTT_BASE = 'https://services.radio-canada.ca/ott'
_GEM_API = _OTT_BASE + '/cbc-api/v2'
TOKEN_URL = _GEM_API + '/token'
PROFILE_URL = _OTT_BASE + '/subscription/v2/gem/subscriber/profile'

# Layout name -> URL, looked up by GemV2_get_layout.
LAYOUT_MAP = {
    'featured': _GEM_API + '/home',
    'shows': _GEM_API + '/hubs/shows',
    'documentaries': _GEM_API + '/hubs/documentaries',
    'kids': _GEM_API + '/hubs/kids',
}
# The '{}' placeholder is filled with the show / category / asset id.
SHOW_BY_ID = _GEM_API + '/shows/{}'
CATEGORY_BY_ID = _GEM_API + '/categories/{}'
ASSET_BY_ID = _GEM_API + '/assets/{}'
SEARCH_BY_NAME = _GEM_API + '/search'
- class CBC:
- """Class for CBC stuff."""
def __init__(self):
    """Create the requests session and restore previously saved cookies.

    loadCookies() is asked for a stored cookie jar; when it returns None
    the new session keeps the jar it was created with.
    """
    self.session = requests.Session()
    stored_jar = loadCookies()
    if stored_jar is None:
        return
    self.session.cookies = stored_jar
- @staticmethod
def azure_authorize_authorize(sess):
    """
    Make the first authorization call (GET on AUTHORIZE_LOGIN).
    @param sess A requests session
    @return The value of the response's x-ms-gateway-requestid header, or
            False when the status is not 200 or the header is absent
    """
    request_nonce = str(uuid4())
    state_guid = str(uuid4())
    # The state is "<guid>|<json>" encoded as base64 and sent under two names.
    raw_state = '{}|{{"action":"login","returnUrl":"/","fromSubscription":false}}'.format(state_guid)
    encoded_state = b64encode(raw_state.encode()).decode('ascii')
    query = dict(
        client_id='fc05b0ee-3865-4400-a3cc-3da82c330c23',
        nonce=request_nonce,
        redirect_uri='https://gem.cbc.ca/auth-changed',
        scope=SCOPES,
        response_type='id_token token',
        response_mode='fragment',
        state=encoded_state,
        state_value=encoded_state,
        ui_locales='en',
    )
    reply = sess.get(AUTHORIZE_LOGIN, params=query)
    if reply.status_code != 200:
        log('Call to authorize fails', True)
        return False
    if 'x-ms-gateway-requestid' in reply.headers:
        return reply.headers['x-ms-gateway-requestid']
    log('authorize authorize response had no x-ms-gateway-requestid header')
    return False
- @staticmethod
def azure_authorize_self_asserted(sess, username, tx_arg, password=None):
    """
    Make the SelfAsserted authorization call (POST to SELF_ASSERTED_LOGIN).
    @param sess The requests session; its x-ms-cpim-csrf cookie is sent as
                the x-csrf-token header (KeyError if the cookie is missing)
    @param username Sent as the 'email' form field
    @param tx_arg Sent as the 'tx' query parameter
    @param password Added as the 'password' form field when truthy
    @return True when the response status is 200, False otherwise
    """
    csrf_token = sess.cookies.get_dict()['x-ms-cpim-csrf']
    form = {'request_type': 'RESPONSE', 'email': username}
    if password:
        form['password'] = password
    reply = sess.post(
        SELF_ASSERTED_LOGIN,
        params={'tx': tx_arg, 'p': 'B2C_1A_ExternalClient_FrontEnd_Login_CBC'},
        headers={'x-csrf-token': csrf_token},
        data=form,
    )
    if reply.status_code == 200:
        return True
    log('Call to SelfAsserted fails', True)
    return False
- @staticmethod
def azure_authorize_confirmed(sess, tx_arg):
    """
    Make the "confirmed" authorization call (GET on CONFIRM_LOGIN).
    @param sess The requests session; its x-ms-cpim-csrf cookie is sent as
                the csrf_token query parameter (KeyError if it is missing)
    @param tx_arg the tx parameter
    @return The value of the response's x-ms-gateway-requestid header, or
            False when the status is not 200 or the header is absent
    """
    session_cookies = sess.cookies.get_dict()
    query = {
        'tx': tx_arg,
        'p': 'B2C_1A_ExternalClient_FrontEnd_Login_CBC',
        'csrf_token': session_cookies['x-ms-cpim-csrf'],
    }
    reply = sess.get(CONFIRM_LOGIN, params=query)
    if reply.status_code != 200:
        log('Call to authorize fails', True)
        return False
    if 'x-ms-gateway-requestid' in reply.headers:
        return reply.headers['x-ms-gateway-requestid']
    log('authorize confirmed response had no x-ms-gateway-requestid header')
    return False
- @staticmethod
def azure_authorize_sign_in(sess, tx_arg):
    """
    Make the final sign-in call and extract the tokens from its redirect.

    The request is sent with redirects disabled so the 302 response can be
    inspected: the tokens are read from the fragment of its Location URL.

    @param sess The requests session; its x-ms-cpim-csrf cookie is sent as
                the csrf_token query parameter (KeyError if it is missing)
    @param tx_arg the tx parameter
    @return (access_token, id_token) on success; None when the response is
            not a 302, has no Location header, or the fragment lacks either
            token
    """
    cookies = sess.cookies.get_dict()
    params = {
        'tx': tx_arg,
        'p': 'B2C_1A_ExternalClient_FrontEnd_Login_CBC',
        'csrf_token': cookies['x-ms-cpim-csrf'],
        'rememberMe': 'true',
    }
    resp = sess.get(SIGNIN_LOGIN, params=params, allow_redirects=False)
    if resp.status_code != 302:
        log('Call to sign in fails', True)
        return None
    location = resp.headers.get('location')
    if not location:
        log('sign in response had no location header', True)
        return None
    frags = urlparse.parse_qs(urlparse.urlparse(location).fragment)
    # parse_qs omits blank values, so an absent or empty token shows up as a
    # missing key; report that as a failed step instead of raising KeyError.
    access_token = frags.get('access_token', [None])[0]
    id_token = frags.get('id_token', [None])[0]
    if not access_token or not id_token:
        log('sign in redirect carried no access_token/id_token', True)
        return None
    return (access_token, id_token)
def azure_authorize(self, username=None, password=None, callback=None):
    """
    Perform multi-step authorization with CBC's azure authorization platform.
    ** Azure Active Directory B2C **

    Steps, in order: authorize, SelfAsserted with the email only, confirmed,
    SelfAsserted with email and password, sign in, then the claims token is
    fetched and both tokens are stored with saveAuthorization().

    @param username Account email passed to the SelfAsserted steps
    @param password Account password passed to the second SelfAsserted step
    @param callback Optional callable invoked with an integer progress
                    value (0, 20, 40, 60, 80, 90, 100) as steps complete
    @return True once the authorization has been saved, False as soon as
            any step fails
    """
    def progress(percent):
        """Report progress when the caller supplied a callback."""
        if callback:
            callback(percent)

    sess = requests.Session()

    progress(0)
    gw_req_id = CBC.azure_authorize_authorize(sess)
    if not gw_req_id:
        log('Authorization "authorize" step failed', True)
        return False

    progress(20)
    cookies = sess.cookies.get_dict()
    if 'x-ms-cpim-csrf' not in cookies:
        log('Unable to get csrf token for self asserted', True)
        return False
    if 'x-ms-cpim-trans' not in cookies:
        log('Unable to get transaction for self asserted', True)
        return False

    # The transaction cookie is base64-encoded JSON. A malformed cookie must
    # fail this step cleanly: b64decode raises TypeError on Python 2 and a
    # ValueError subclass on Python 3, json/decoding errors are ValueError.
    try:
        trans = json.loads(b64decode(cookies['x-ms-cpim-trans']).decode())
    except (TypeError, ValueError):
        log('Unable to decode transaction for self asserted', True)
        return False
    if not isinstance(trans, dict) or 'C_ID' not in trans:
        log('Unable to get C_ID from trans', True)
        return False
    tid = trans['C_ID']

    # tx argument: StateProperties=<base64 of {"TID":"<C_ID>"}> with the
    # base64 padding stripped.
    tid_str = '{{"TID":"{}"}}'.format(tid).encode()
    b64_tid = b64encode(tid_str).decode('ascii').rstrip('=')
    tx_arg = 'StateProperties={}'.format(b64_tid)

    progress(40)
    if not CBC.azure_authorize_self_asserted(sess, username, tx_arg):
        log('Authorization "SelfAsserted" step failed', True)
        return False

    progress(60)
    gw_req_id = CBC.azure_authorize_confirmed(sess, tx_arg)
    if not gw_req_id:
        log('Authorization "confirmed" step failed', True)
        return False

    progress(80)
    if not CBC.azure_authorize_self_asserted(sess, username, tx_arg, password):
        log('Authorization "SelfAsserted" step failed', True)
        return False

    # The sign-in step returns None on failure, which cannot be unpacked
    # directly into two names.
    tokens = CBC.azure_authorize_sign_in(sess, tx_arg)
    access_token, id_token = tokens if tokens else (None, None)
    if not access_token or not id_token:
        log('Authorization "sign in" step failed', True)
        return False

    progress(90)
    # get_claims_token may return None; the value is stored as returned.
    claims_token = self.get_claims_token(access_token)
    saveAuthorization({'token': access_token, 'claims': claims_token})

    progress(100)
    return True
def get_claims_token(self, access_token):
    """Fetch the claims token tied to an access token.

    Sends a GET to PROFILE_URL with the access token as a Bearer
    Authorization header and device=web. Returns the 'claimsToken' entry of
    the JSON response, or None (after logging) when the status is not 200.
    """
    reply = self.session.get(
        PROFILE_URL,
        headers={'Authorization': 'Bearer {}'.format(access_token)},
        params={'device': 'web'},
    )
    if reply.status_code != 200:
        log('{} returns status {}'.format(reply.url, reply.status_code), True)
        return None
    profile = json.loads(reply.content)
    return profile['claimsToken']
def getImage(self, item):
    """Return the first image value found on *item*, or None.

    Keys are tried in this order: 'defaultThumbnailUrl', 'cbc$staticImage',
    'cbc$featureImage'. 'cbc$liveImage' is deliberately not consulted (the
    pictures don't make sense after the first load).
    """
    for image_key in ('defaultThumbnailUrl', 'cbc$staticImage', 'cbc$featureImage'):
        if image_key in item:
            return item[image_key]
    return None
- @staticmethod
def get_callsign(item):
    """Return the value stored under the CALLSIGN key of *item*, or None if absent."""
    if CALLSIGN not in item:
        return None
    return item[CALLSIGN]
- @staticmethod
def get_labels(item):
    """Build the label dict for a CBC item.

    'studio', 'country' and 'title' are always set ('title' is required on
    *item*). When the item has a call sign, the title becomes
    "<call sign> <title>" with '\\u2013' replaced by '-' in both parts.
    Every other label is copied only when its source key is present; where
    two source keys are listed, the first one present wins.
    """
    def first_present(*keys):
        """Return the first of *keys* found in item, or None."""
        for key in keys:
            if key in item:
                return key
        return None

    labels = {
        'studio': 'Canadian Broadcasting Corporation',
        'country': 'Canada'
    }

    if 'cbc$callSign' in item:
        call_sign = item['cbc$callSign'].replace('\u2013', "-")
        plain_title = item['title'].replace('\u2013', "-")
        labels['title'] = '{} {}'.format(call_sign, plain_title)
    else:
        labels['title'] = item['title']

    show_key = first_present('cbc$show', 'clearleap:series')
    if show_key is not None:
        labels['tvshowtitle'] = item[show_key]

    if 'description' in item:
        labels['plot'] = labels['plotoutline'] = item['description']

    genre_key = first_present('cbc$liveDisplayCategory', 'media:keywords')
    if genre_key is not None:
        labels['genre'] = item[genre_key]

    for label, source_key in (
            ('season', 'clearleap:season'),
            ('episode', 'clearleap:episodeInSeason'),
            ('mpaa', 'media:rating'),
            ('premiered', 'premiered'),
    ):
        if source_key in item:
            labels[label] = item[source_key]

    # Either a 'video' key, or cbc$audioVideo equal to "video" in any case.
    if 'video' in item or (
            'cbc$audioVideo' in item and item['cbc$audioVideo'].lower() == 'video'):
        labels['mediatype'] = 'video'

    return labels
def parseSmil(self, smil):
    """Fetch a SMIL document and return the source URL of its video.

    @param smil URL of the SMIL document
    @return the 'src' attribute of the first <video> inside the first
            <seq>; None when the request does not answer 200 or the
            document has no such element
    """
    resp = self.session.get(smil)
    if resp.status_code != 200:
        log('ERROR: {} returns status of {}'.format(smil, resp.status_code), True)
        return None
    # Persist whatever cookies the request left on the session.
    save_cookies(self.session.cookies)
    dom = parseString(resp.content)
    seqs = dom.getElementsByTagName('seq')
    videos = seqs[0].getElementsByTagName('video') if seqs else []
    if not videos:
        log('ERROR: {} contains no seq/video element'.format(smil), True)
        return None
    # Only 'src' is returned, so no other attribute is required to exist.
    return videos[0].attributes['src'].value
- @staticmethod
def get_session():
    """Get a requests session object with CBC cookies.

    @return a new requests.Session; when loadCookies() returns a cookie jar
            it is installed on that session
    """
    sess = requests.Session()
    cookies = loadCookies()
    if cookies is not None:
        # This is a static method, so there is no instance to write to: the
        # stored jar belongs on the session being returned.
        sess.cookies = cookies
    return sess
- # """Module for the V2 Gem API."""
- # import json
- #
- # import requests
- #
- # from resources.lib.utils import log
- # from resources.lib.cbc import CBC
- #
- # Move LAYOUT_MAP to top of file
- #
- # class GemV2:
- # """V2 Gem API class."""
- # @staticmethod # DEBUG
def GemV2_get_layout(self, name):
    """Fetch a Gem V2 layout and return the decoded JSON response.

    *name* must be a key of LAYOUT_MAP; an unknown name raises KeyError.
    The response status is not checked.
    """
    reply = self.session.get(LAYOUT_MAP[name])
    layout = json.loads(reply.content)
    return layout
- # @staticmethod # DEBUG
def GemV2_get_show_layout_by_id(self, show_id):
    """Fetch the Gem V2 layout of one show and return the decoded JSON.

    *show_id* is substituted into SHOW_BY_ID. The response status is not
    checked.
    """
    reply = self.session.get(SHOW_BY_ID.format(show_id))
    show_layout = json.loads(reply.content)
    return show_layout
- # @staticmethod # DEBUG
def GemV2_get_asset_by_id(self, asset_id):
    """Fetch a Gem V2 asset by ID and return the decoded JSON.

    *asset_id* is substituted into ASSET_BY_ID. The response status is not
    checked.
    """
    reply = self.session.get(ASSET_BY_ID.format(asset_id))
    asset = json.loads(reply.content)
    return asset
- # @staticmethod # DEBUG
def GemV2_get_episode(self, url):
    """Fetch a Gem V2 episode by URL using the saved authorization.

    Returns None when loadAuthorization() yields nothing, so that the
    caller can trigger authorization. Otherwise the request is made with
    requests.get (not the instance session), carrying the stored 'token' as
    a Bearer Authorization header and the stored 'claims' as
    x-claims-token when present, and the decoded JSON is returned.
    """
    stored = loadAuthorization()
    if stored is None:
        return None

    auth_headers = {}
    if 'token' in stored:
        auth_headers['Authorization'] = 'Bearer {}'.format(stored['token'])
    if 'claims' in stored:
        auth_headers['x-claims-token'] = stored['claims']

    reply = requests.get(url, headers=auth_headers)
    return json.loads(reply.content)
- # @staticmethod # DEBUG
def GemV2_get_category(self, category_id):
    """Fetch a Gem V2 category by ID and return the decoded JSON.

    *category_id* is substituted into CATEGORY_BY_ID. The response status
    is not checked.
    """
    reply = self.session.get(CATEGORY_BY_ID.format(category_id))
    category = json.loads(reply.content)
    return category
- @staticmethod
def GemV2_get_labels(show, episode):
    """Build the label dict for one episode of a show.

    Requires show['title'] and episode['title'], ['description'] and
    ['season'] (KeyError otherwise). 'episode' and 'duration' are copied
    from *episode* only when present.
    """
    labels = {
        'studio': 'Canadian Broadcasting Corporation',
        'country': 'Canada',
    }
    labels['tvshowtitle'] = show['title']
    labels['title'] = labels['originaltitle'] = episode['title']
    labels['plot'] = labels['plotoutline'] = episode['description']
    labels['season'] = episode['season']
    for optional_key in ('episode', 'duration'):
        if optional_key in episode:
            labels[optional_key] = episode[optional_key]
    return labels
- # @staticmethod # DEBUG
def GemV2_search_by_term(self, term):
    """Search Gem V2 and return the decoded JSON response.

    *term* is sent as the 'term' query parameter of SEARCH_BY_NAME. The
    response status is not checked.
    """
    reply = self.session.get(SEARCH_BY_NAME, params={'term': term})
    results = json.loads(reply.content)
    return results
Add Comment
Please, Sign In to add comment