Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from mitmproxy import http
- from mitmproxy.proxy import ProxyConfig, ProxyServer
- from mitmproxy.master import Master
- from mitmproxy.options import Options
- from mitmproxy import proxy, options
- from mitmproxy.tools.dump import DumpMaster
- from Cryptodome.Cipher import AES
- from Cryptodome.Hash import MD5, SHA256
- from Cryptodome.Hash.HMAC import HMAC
- from Cryptodome.Protocol.KDF import PBKDF1
- from Cryptodome.Util import Padding
- from Cryptodome.Util.Padding import pad
- from typing import Union, Generator, Dict
- import hashlib
- import base64
- import binascii
- import re
- import json
- import os
- import random
- import os.path
- from os import path
- from datetime import datetime
- from mitmproxy.net.http.http1.assemble import *
# Directory containing this script; packet dumps are written beneath it.
APP_PATH = os.path.dirname(os.path.realpath(__file__))

BLOCK_SIZE = 16  # Bytes


# NOTE: this definition rebinds the ``pad`` name imported from
# ``Cryptodome.Util.Padding`` in the import block; the name is kept so that
# existing references to the module-level ``pad`` keep working.
def pad(s):
    """Append padding so that ``len(s)`` becomes a multiple of BLOCK_SIZE.

    Each padding unit holds the padding length itself, and a full extra
    block is added when the input is already aligned.  ``str`` input is
    padded with ``chr(n)`` characters; ``bytes``/``bytearray`` input is
    padded with the byte value ``n``.
    """
    fill = BLOCK_SIZE - len(s) % BLOCK_SIZE
    if isinstance(s, (bytes, bytearray)):
        return s + bytes([fill]) * fill
    return s + fill * chr(fill)


def unpad(s):
    """Strip the padding added by ``pad`` from a ``str`` or ``bytes`` value.

    The last element encodes how many trailing elements to drop.  Empty
    input has no padding to read and is returned unchanged.
    """
    if not s:
        return s
    return s[:-ord(s[len(s) - 1:])]
def decrypt_sign(data, ver_global: bool = False):
    """Decrypt a base64-encoded ``sign`` value and return it parsed as JSON.

    After base64 decoding, the first 8 bytes are the salt and the rest is
    AES-CBC ciphertext.  A 32-byte key and 16-byte IV are derived by chaining
    MD5 digests: the first round hashes ``password + salt`` and each later
    round hashes ``previous_digest + password + salt`` until 48 bytes are
    available (the same construction as OpenSSL's EVP_BytesToKey with MD5 and
    a single iteration).

    Args:
        data: Base64 text (or bytes) holding salt + ciphertext.
        ver_global: Use the Global password when true, the Japan password
            otherwise.

    Returns:
        The object produced by ``json.loads`` on the unpadded plaintext.

    Raises:
        Whatever the base64, AES, unpadding or JSON steps raise on malformed
        input (typically ``ValueError``); nothing is caught here.
    """
    print("Decrypt_sign - Begin")
    salt_len = 8
    key_len = 32
    iv_len = 16

    if ver_global:
        print("Global - version - decrypt_sign")
        password = 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzJ9JaHioVi6rr0TAfr6j'
    else:
        print("Japan - version - decrypt_sign")
        password = '2nV6eyINqhT6iDmzack9fykEAQgFkDvABIHjxmMY3qBENWOlnGtzkOHXRWhrIztK'
    password_bytes = password.encode()

    raw = base64.b64decode(data)
    salt = raw[:salt_len]

    print("sign_key_iv")
    digest = MD5.new(password_bytes + salt).digest()
    key_iv = digest
    # Keep chaining digests until there is enough material for key + IV.
    while len(key_iv) < key_len + iv_len:
        print("sign_key_iv_buffer")
        digest = MD5.new(digest + password_bytes + salt).digest()
        key_iv += digest

    print("sign_key")
    sign_key, sign_iv = key_iv[:key_len], key_iv[key_len:]

    print("cipher")
    cipher = AES.new(sign_key, AES.MODE_CBC, sign_iv)

    print("Padding.unpad (dec)")
    plaintext = Padding.unpad(cipher.decrypt(raw[salt_len:]), AES.block_size)

    print("json.loads(dec_data)")
    return json.loads(plaintext)
class Addon(object):
    """mitmproxy addon that saves matching API traffic beneath ``APP_PATH``.

    Flows whose URL matches ``ishin-global.aktsk.com`` or
    ``ishin-production.aktsk.jp`` are written to
    ``<APP_PATH>/<Global|Japan>/<Request|Response>/<url path...>/`` as a
    ``Headers_<timestamp>.txt`` file and, when the body parses as JSON, a
    ``Content_<timestamp>.json`` file.  When the JSON body is an object with
    a ``"sign"`` key, the dumped content is the result of ``decrypt_sign`` on
    that value instead of the raw body.
    """

    # (URL pattern, top-level folder name, decrypt with the Global password?)
    _REGIONS = (
        (re.compile(r"/ishin-global\.aktsk\.com"), "Global", True),
        (re.compile(r"/ishin-production\.aktsk\.jp"), "Japan", False),
    )

    def __init__(self):
        pass

    def request(self, flow: http.HTTPFlow) -> None:
        """Dump the head and JSON body of a request sent to a known host."""
        url = flow.request.pretty_url
        for folder, use_global_key in self._matching_regions(url):
            print(folder + " Packets Dumping (Request)")
            # No response exists yet while the request hook runs, so the
            # body must be taken from the request itself.
            self._dump(
                url,
                folder,
                "Request",
                assemble_request_head(flow.request),
                flow.request.text,
                use_global_key,
            )

    def response(self, flow: http.HTTPFlow) -> None:
        """Dump the head and JSON body of a response from a known host."""
        url = flow.request.pretty_url
        for folder, use_global_key in self._matching_regions(url):
            # The console label casing differs between the two regions in the
            # existing output; it is kept byte-for-byte.
            label = "response" if folder == "Global" else "Response"
            print(folder + " Packets Dumping (" + label + ")")
            self._dump(
                url,
                folder,
                "Response",
                assemble_response_head(flow.response),
                flow.response.text,
                use_global_key,
            )

    def _matching_regions(self, url):
        """Return ``(folder, use_global_key)`` for every region matching url."""
        return [
            (folder, use_global_key)
            for pattern, folder, use_global_key in self._REGIONS
            if pattern.search(url)
        ]

    @staticmethod
    def _path_segments(url):
        """Return the URL's path components that are usable as folder names."""
        # The query string and fragment are not part of the folder layout and
        # may contain characters that are illegal in directory names.
        path_only = url.split("?", 1)[0].split("#", 1)[0]
        # Elements 0-2 of the split are the scheme ("https:"), the empty
        # string between the two slashes, and the host name.
        # Empty, "." and ".." components are dropped so a URL cannot steer
        # the dump outside of its region folder.
        return [
            segment
            for segment in path_only.split("/")[3:]
            if segment not in ("", ".", "..")
        ]

    def _dump(self, url, folder, direction, head, body_text, use_global_key):
        """Write one message's head and (if JSON) body into its dump folder.

        Args:
            url: Full URL of the flow; its path decides the sub-folders.
            folder: Region folder name ("Global" or "Japan").
            direction: "Request" or "Response".
            head: Raw message head as bytes.
            body_text: Decoded message body, possibly empty or None.
            use_global_key: Forwarded to ``decrypt_sign`` as ``ver_global``.
        """
        stamp = datetime.now().strftime("_%d.%m.%Y-%H.%M.%S")
        packet_dir = os.path.join(
            APP_PATH, folder, direction, *self._path_segments(url)
        )
        os.makedirs(packet_dir, mode=0o755, exist_ok=True)

        header_path = os.path.join(packet_dir, 'Headers' + stamp + '.txt')
        with open(header_path, "w+", encoding="utf-8") as header_file:
            header_file.write(head.decode('utf-8', 'replace'))

        try:
            payload = json.loads(body_text or "")
        except ValueError:
            # Empty or non-JSON bodies (e.g. a GET request) have no content
            # file; the headers written above are still kept.
            print('No JSON body, headers only : ', os.path.join(packet_dir, ''))
            return

        if isinstance(payload, dict) and 'sign' in payload:
            try:
                payload = decrypt_sign(payload['sign'], use_global_key)
            except (TypeError, ValueError) as err:
                # Keep the still-encrypted payload rather than losing it.
                print("Could not decrypt 'sign', dumping it as received : ", err)

        content_path = os.path.join(packet_dir, 'Content' + stamp + '.json')
        with open(content_path, 'w+', encoding="utf-8") as content_file:
            json.dump(payload, content_file, indent=4)
        # The trailing separator matches the previously printed folder path.
        print('Packet Dumped : ', os.path.join(packet_dir, ''))
def main():
    """Run a mitmproxy ``DumpMaster`` on port 8080 with ``Addon`` installed.

    Blocks until the master stops; Ctrl+C shuts the master down.
    """
    # Named ``opts`` so the imported ``mitmproxy.options`` module name is not
    # rebound.
    opts = Options(listen_port=8080, http2=True)
    master = DumpMaster(opts, with_termlog=False, with_dumper=False)
    config = ProxyConfig(opts)
    master.server = ProxyServer(config)
    master.addons.add(Addon())
    try:
        print('Starting Dokkan mitmproxy')
        master.run()
    except KeyboardInterrupt:
        master.shutdown()


if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment