Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import base64
- import binascii
- import json
- import urllib.request, urllib.error, urllib.parse
- import logging
- from collections import OrderedDict
- from hashlib import sha1
- from logging.handlers import TimedRotatingFileHandler
- from Crypto.Cipher import AES
# --- Main definitions ------------------------------------------------------
# NOTE(review): the AES key and IV used for request signing are hard-coded in
# this file; consider whether they should come from deployment configuration.

# Silk production settings.
_KEY = binascii.a2b_hex("428f62982c5e1db32d1e9f3b66f75ba9973810eb60c1001e4e581b50cc879fc8")
_IV = binascii.a2b_hex("29e4c7dc1c6f8cebcf86a04f34725c8f")
PROVIDER = "silknet"
LICENSE_SERVER_URL = "https://license.widevine.com/cenc/getlicense"
ALLOWED_TRACK_TYPES = "SD_HD"
ALLOW_UNVERIFIED_PLATFORM = True

# Silk test settings (disabled). Previously parked inside a string literal,
# which is an evaluated expression rather than a comment.
# _KEY = binascii.a2b_hex("a3dff872487bdc5c104aa12e90ec1fe40858f56fca8b7b6f1ecd561099993a8c")
# _IV = binascii.a2b_hex("73d395265dbcd512dc488e4a532cf00e")
# PROVIDER = "silknet"
# LICENSE_SERVER_URL = "https://license.uat.widevine.com/cenc/getlicense"
# ALLOWED_TRACK_TYPES = "SD_HD"

# Not referenced by any code visible in this file.
PARSEFIRST = True
# --- Logger initialisation -------------------------------------------------
# One shared formatter: "[timestamp][LEVEL] message".
aFormat = logging.Formatter(
    '[%(asctime)s][%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# File output goes to vwproxy.log, configured with when='midnight'.
fh = TimedRotatingFileHandler(filename='vwproxy.log', when='midnight')
fh.setFormatter(aFormat)

# Console output, explicitly set to DEBUG.
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(aFormat)
consoleHandler.setLevel(logging.DEBUG)

# The 'proxy' logger passes everything from DEBUG upwards to both handlers
# (file handler attached first, then console).
log = logging.getLogger('proxy')
log.setLevel(logging.DEBUG)
log.addHandler(fh)
log.addHandler(consoleHandler)

# --- Functions -------------------------------------------------------------
def generateSignature(text_to_sign):
    """Sign a request: SHA-1 hash it, then AES-CBC encrypt the padded digest.

    Args:
        text_to_sign: The request text to sign. A ``str`` is UTF-8 encoded
            before hashing; ``bytes`` are hashed as-is.

    Returns:
        The base64-encoded ciphertext as ``bytes``, or ``None`` if anything
        went wrong (the error is logged). ``None`` is used rather than a
        truthy sentinel so that callers testing ``if signature:`` actually
        detect the failure.
    """
    try:
        if isinstance(text_to_sign, str):
            text_to_sign = text_to_sign.encode("utf-8")
        digest = sha1(text_to_sign).digest()
        # CBC works on whole 16-byte blocks; zero-pad the 20-byte SHA-1
        # digest up to the next multiple of 16.
        padding = b"\x00" * (-len(digest) % 16)
        cipher = AES.new(_KEY, AES.MODE_CBC, _IV)
        return base64.b64encode(cipher.encrypt(digest + padding))
    except Exception as e:
        log.exception('error generating signature: %s', e)
        return None
def sendRequest(message):
    """POST *message* to the license server and return the response body.

    The request goes to ``LICENSE_SERVER_URL + "/" + PROVIDER``.

    Args:
        message: Request body as text; it is UTF-8 encoded before sending.

    Returns:
        The raw response body (``bytes``) on success. If urllib reports an
        ``HTTPError``/``URLError``, a JSON document (``bytes``) with the keys
        ``status`` (-1), ``errno`` and ``message`` is returned instead, so
        callers that feed the result to ``json.loads`` get a dict either way.
    """
    log.info(message)
    url = LICENSE_SERVER_URL + "/" + PROVIDER
    try:
        # The context manager closes the HTTP response once it has been read.
        with urllib.request.urlopen(url, message.encode("utf-8")) as response:
            return response.read()
    except urllib.error.URLError as e:  # HTTPError is a subclass of URLError
        log.error('license server request failed: %s', e)
        # Exception objects are not JSON serialisable; send their text.
        error = {'status': -1, 'errno': str(e), 'message': str(e)}
        return json.dumps(error).encode("utf-8")
def buildCertificateRequest(request):
    """Build the signed JSON envelope for a certificate request.

    Args:
        request: The raw request body (``bytes``) received from the client.

    Returns:
        JSON text with the keys ``request`` (base64 of the inner JSON
        message), ``signature`` and ``signer``; ``None`` if no signature
        could be generated (the failure is logged).
    """
    inner = json.dumps({'payload': base64.standard_b64encode(request).decode("utf-8")})

    # Sign the inner JSON text (before base64 encoding), the same way
    # buildLicenseRequest() and parseLicRequest() sign their messages.
    signature = generateSignature(inner)
    if isinstance(signature, bytes):
        signature = signature.decode("utf-8")
    # generateSignature() reports failure with a non-string value, so only a
    # non-empty string counts as a usable signature.
    if not isinstance(signature, str) or not signature:
        log.error('sigrature error in buildCertificateRequest()')
        return None

    encoded = base64.standard_b64encode(inner.encode("utf-8")).decode("utf-8")
    return json.dumps({"request": encoded,
                       "signature": signature,
                       "signer": PROVIDER})
def parseLicRequest(request):
    """Send a ``parse_only`` request for *request* to the license server.

    Args:
        request: The raw license request body (``bytes``) from the client.

    Returns:
        The server's JSON reply decoded into a dict. If the message cannot
        be signed, or sendRequest() reports an error, a dict with ``status``
        set to -1 is returned instead.
    """
    req = json.dumps({
        "payload": base64.standard_b64encode(request).decode("utf-8"),
        "provider": PROVIDER,
        "parse_only": True,
        "allowed_track_types": ALLOWED_TRACK_TYPES,
        "allow_unverified_platform": ALLOW_UNVERIFIED_PLATFORM
    })

    signature = generateSignature(req)
    if isinstance(signature, bytes):
        # json.dumps cannot serialise bytes.
        signature = signature.decode("utf-8")
    if not isinstance(signature, str) or not signature:
        log.error('signature error in parseLicRequest()')
        return {'status': -1,
                'errno': 'signature error',
                'message': 'signature error in parseLicRequest()'}

    parseServerRequest = json.dumps({
        # b64encode needs bytes in and yields bytes out; convert both ways.
        "request": base64.standard_b64encode(req.encode("utf-8")).decode("utf-8"),
        "signature": signature,
        "signer": PROVIDER
    })

    response = sendRequest(parseServerRequest)
    if isinstance(response, dict):
        # Already a decoded error description; nothing left to parse.
        return response
    return json.loads(response)
def buildLicenseRequest(request, haveContentId = False, content_id = 0):
    """Build the signed JSON envelope for a license request.

    Args:
        request: The raw license request body (``bytes``) from the client.
        haveContentId: When true, ``content_id`` is added to the message.
        content_id: Value stored under ``content_id`` if *haveContentId*.

    Returns:
        JSON text with the keys ``request`` (base64 of the inner JSON
        message), ``signature`` and ``signer``.

    Raises:
        ValueError: If no signature could be generated for the message.
    """
    reqRaw = {
        "payload": base64.standard_b64encode(request).decode("utf-8"),
        "provider": PROVIDER,
        "allowed_track_types": ALLOWED_TRACK_TYPES,
        "allow_unverified_platform": ALLOW_UNVERIFIED_PLATFORM
    }
    if haveContentId:
        reqRaw["content_id"] = content_id
    req = json.dumps(reqRaw)

    # The signature is computed over the JSON text, before base64 encoding.
    signature = generateSignature(req)
    if isinstance(signature, bytes):
        signature = signature.decode("utf-8")
    if not isinstance(signature, str) or not signature:
        # Fail with a clear message instead of an AttributeError raised by
        # calling .decode() on the failure sentinel.
        raise ValueError('signature error in buildLicenseRequest()')

    encoded = base64.standard_b64encode(req.encode("utf-8")).decode("utf-8")
    return json.dumps({
        "request": encoded,
        "signature": signature,
        "signer": PROVIDER
    })
# --- WSGI entry point ("kinda __main__") -----------------------------------
def _respond(start_response, status, body):
    """Start a text/html response and return its body as a one-item bytes list."""
    if isinstance(body, str):
        body = body.encode("utf-8")  # WSGI response bodies must be bytes
    start_response(status, [('Content-Type', 'text/html')])
    return [body]


def _content_length(env):
    """Return CONTENT_LENGTH from the WSGI environ as an int (0 if absent or invalid)."""
    try:
        return int(env.get('CONTENT_LENGTH') or 0)
    except (TypeError, ValueError):
        return 0


def _load_answer(raw):
    """Decode a sendRequest() result; a dict (error description) passes through."""
    if isinstance(raw, dict):
        return raw
    return json.loads(raw)


def _license_body(answer):
    """Return the decoded license bytes when *answer* is an OK license reply, else None."""
    if answer.get('status') == 'OK' and 'license' in answer:
        return base64.standard_b64decode(answer['license'])
    return None


def application(env, start_response):
    """WSGI application that proxies requests to the license server.

    Dispatch, in order:
      * ``OPTIONS`` requests get ``200 OK``.
      * ``GET`` requests and requests without a body get ``400``.
      * Bodies shorter than 50 bytes are sent as certificate requests.
      * Everything else is sent as a license request; when the environ
        contains both ``X-QRV-CONTENT-ID`` and ``X-QRV-SID`` the request is
        first parsed by the server and its content id is forwarded.

    Args:
        env: The WSGI environ dict.
        start_response: The WSGI ``start_response`` callable.

    Returns:
        A list holding the response body as a single ``bytes`` chunk. Any
        unexpected exception is logged and answered with a ``500`` response.
    """
    try:
        log.debug("CONTENT_LENGTH: %s", env.get('CONTENT_LENGTH', 0))
        method = env.get('REQUEST_METHOD')
        content_length = _content_length(env)

        if method == 'OPTIONS':
            log.info('sending OPTIONS')
            return _respond(start_response, '200 OK', 'OK')

        if method == 'GET' or content_length < 1:  # TODO add other methods in exception
            log.error('got empty request')
            return _respond(start_response, '400 Bad Request', 'bad request')

        # Read exactly the announced number of bytes from the request body.
        data = env['wsgi.input'].read(content_length)

        if content_length < 50:
            log.info('building certificate request')
            certificate_request = buildCertificateRequest(data)
            if certificate_request is None:
                return _respond(start_response, '500 Internal Server Error', 'request error')
            answer = _load_answer(sendRequest(certificate_request))
            license_bytes = _license_body(answer)
            if license_bytes is not None:
                # TODO Populate responses
                log.info('sending lic anwser')
                return _respond(start_response, '200 OK', license_bytes)
            return _respond(start_response, '400 Error', 'go home')

        log.debug('building license request')
        # NOTE(review): WSGI servers normally expose request headers as
        # HTTP_* environ keys (e.g. HTTP_X_QRV_CONTENT_ID), so this check may
        # never match as written - confirm against the deployment.
        if all(j in env for j in ['X-QRV-CONTENT-ID', 'X-QRV-SID']):
            parsed = parseLicRequest(data)
            if parsed.get('status') != 'OK':
                # Always answer the client, even when parsing was refused.
                log.error('license request parse failed. status: %s', parsed.get('status'))
                return _respond(start_response, '400 Error', 'go home')
            # TODO Populate responses
            content_id = parsed["pssh_data"]["content_id"]
            log.debug("content_id: %r", base64.standard_b64decode(content_id))
            log.debug(parsed["pssh_data"])
            log.debug('client info: ' + json.dumps(parsed["client_info"], indent=4))
            answer = _load_answer(sendRequest(
                buildLicenseRequest(data, haveContentId=True, content_id=content_id)))
            license_bytes = _license_body(answer)
            if license_bytes is not None:
                log.info(answer["license"])
                return _respond(start_response, '200 OK', license_bytes)
            return _respond(start_response, '400 Error', 'go home')

        answer = _load_answer(sendRequest(buildLicenseRequest(data, haveContentId=False)))
        license_bytes = _license_body(answer)
        if license_bytes is not None:
            return _respond(start_response, '200 OK', license_bytes)
        if 'status' in answer:
            log.error('request denied. status: %s', answer['status'])
        else:
            log.error('request error, no status code')
        return _respond(start_response, '500 Internal Server Error', 'request error')
    except Exception as e:
        # Every response above is started only after its body is fully
        # computed, so start_response has not been called yet at this point.
        log.exception('error: %s', e)
        return _respond(start_response, '500 Internal Server Error', 'request error')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement