Guest User

live_eversoul_updater.py

a guest
Apr 17th, 2025
19
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 10.90 KB | Gaming | 0 0
  1. import re
  2. import sys
  3. import json
  4. import shutil
  5. import hashlib
  6. import requests
  7. import urllib3
  8. import concurrent.futures
  9. from pathlib import Path
  10. from typing import Dict, Any
  11. from google_play_scraper import app as playstore_app
  12. urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
  13.  
  14.  
  15. def get_app_version():
  16.     """从Google Play获取最新版本号"""
  17.     try:
  18.         result = playstore_app(app_id="com.kakaogames.eversoul", lang="en", country="kr")
  19.         return result["version"]
  20.     except Exception as e:
  21.         print(f"get app version error: {e}")
  22.         sys.exit(1)
  23.  
  24.  
  25. def download_catalog(version: str) -> dict:
  26.     """下载游戏资源目录"""
  27.     catalog_url = f"https://patch.esoul.kakaogames.com/Live/{version}/Android/catalog_eversoul.json"
  28.     try:
  29.         response = requests.get(catalog_url, verify=False)
  30.         response.raise_for_status()
  31.         return response.json()
  32.     except Exception as e:
  33.         print(f"download catalog file error: {e}")
  34.         sys.exit(1)
  35.  
  36.  
  37. def read_base64(data: str) -> bytes:
  38.     """Base64解码"""
  39.     import base64
  40.     return base64.b64decode(data)
  41.  
  42.  
  43. class BinaryReader:
  44.     """二进制读取器,简化版"""
  45.     def __init__(self, data: bytes):
  46.         self.data = data
  47.         self.position = 0
  48.    
  49.     @property
  50.     def u8(self) -> int:
  51.         value = self.data[self.position]
  52.         self.position += 1
  53.         return value
  54.    
  55.     @property
  56.     def u32(self) -> int:
  57.         value = int.from_bytes(self.data[self.position:self.position+4], byteorder='little')
  58.         self.position += 4
  59.         return value
  60.    
  61.     @property
  62.     def i32(self) -> int:
  63.         value = int.from_bytes(self.data[self.position:self.position+4], byteorder='little', signed=True)
  64.         self.position += 4
  65.         return value
  66.    
  67.     @property
  68.     def u16(self) -> int:
  69.         value = int.from_bytes(self.data[self.position:self.position+2], byteorder='little')
  70.         self.position += 2
  71.         return value
  72.    
  73.     def str(self, length: int, encoding: str = 'utf-8') -> str:
  74.         value = self.data[self.position:self.position+length].decode(encoding)
  75.         self.position += length
  76.         return value
  77.    
  78.     @property
  79.     def pos(self) -> int:
  80.         return self.position
  81.    
  82.     @pos.setter
  83.     def pos(self, value: int):
  84.         self.position = value
  85.  
  86.  
  87. def read_obj(reader: BinaryReader) -> Any:
  88.     """读取对象"""
  89.     objt = reader.u8
  90.     if objt == 0:  # ascii string
  91.         return reader.str(reader.u32)
  92.     elif objt == 1:  # unicode(16) string
  93.         return reader.str(reader.u32, 'utf-16')
  94.     elif objt == 2:  # u16
  95.         return reader.u16
  96.     elif objt == 3:  # u32
  97.         return reader.u32
  98.     elif objt == 4:  # i32
  99.         return reader.i32
  100.     elif objt == 7:  # json object
  101.         return {'an': reader.str(reader.u8),
  102.                 'cn': reader.str(reader.u8),
  103.                 'js': json.loads(reader.str(reader.i32, 'utf-16'))}
  104.     elif objt == 8:
  105.         objt = reader.u8
  106.         return reader.u32
  107.     else:
  108.         raise RuntimeError(f'type {objt} not supported.')
  109.  
  110.  
  111. def del_str(name: str) -> str:
  112.     """处理文件名,移除哈希值"""
  113.     if '%3D' in name:
  114.         return name.split('%3D')[0]
  115.    
  116.     is_bundle = name.endswith('.bundle')
  117.     base_name = name[:-7] if is_bundle else name
  118.    
  119.     hash_match = re.search(r'_[0-9a-f]{32}(?=\.bundle$|$)', name)
  120.     if hash_match:
  121.         clean_name = name[:hash_match.start()]
  122.         if is_bundle:
  123.             clean_name += '.bundle'
  124.         return clean_name
  125.    
  126.     if is_bundle:
  127.         n = base_name
  128.         n = n[:-4] if n.endswith('_all') else n
  129.         n = n[:-7] if n.endswith('_assets') else n
  130.         return f'{n}.bundle'
  131.     else:
  132.         c = name.find('_') > 0 and len(name) > 32 and name[-33:-1].isalnum()
  133.         n = name[:-33] if c else name
  134.         n = n[:-4] if n.endswith('_all') else n
  135.         n = n[:-7] if n.endswith('_assets') else n
  136.         return n
  137.  
  138.  
  139. def parse_catalog(catalog: dict, prefix: str = '') -> Dict[str, Dict[str, Any]]:
  140.     """解析游戏资源目录"""
  141.     results = {}
  142.    
  143.     kds = BinaryReader(read_base64(catalog['m_KeyDataString']))
  144.     eds = BinaryReader(read_base64(catalog['m_EntryDataString']))
  145.     xds = BinaryReader(read_base64(catalog['m_ExtraDataString']))
  146.  
  147.     skds = [read_obj(kds) for _ in range(kds.u32)]
  148.     mii = catalog['m_InternalIds']
  149.    
  150.     for _ in range(eds.u32):
  151.         ii, pi, dki, dh, di, pk, rt = eds.i32, eds.i32, eds.i32, eds.i32, eds.i32, eds.i32, eds.i32
  152.         obj, enckey = None, None
  153.        
  154.         if di >= 0:
  155.             xds.pos = di
  156.             obj = read_obj(xds)
  157.             if isinstance(obj, dict):
  158.                 if 'm_EncryptKey' in obj['js']:
  159.                     enckey = obj['js']['m_EncryptKey']
  160.                 if 'm_BundleSize' in obj['js']:
  161.                     obj = obj['js']['m_BundleSize']
  162.                 elif 'm_FileSize' in obj['js']:
  163.                     obj = obj['js']['m_FileSize']
  164.                 else:
  165.                     obj = 0
  166.        
  167.         on, nn = mii[ii], skds[pk]
  168.         d = None if dki < 0 else skds[dki]
  169.        
  170.         start_check = on.startswith('https://')
  171.         end_check = on.endswith('.bundle')
  172.        
  173.         if start_check and end_check:
  174.             n = on.split('https://eversoul.com/', 1)[1] if 'https://eversoul.com/' in on else on
  175.             nn = del_str(n)
  176.             url = f"https://patch.esoul.kakaogames.com/Live/{prefix}/Android/{n}" if prefix else on
  177.            
  178.             results[nn] = {
  179.                 'name': nn,
  180.                 'url': url,
  181.                 'size': obj
  182.             }
  183.            
  184.             if enckey is not None:
  185.                 results[nn]['key'] = enckey
  186.    
  187.     return results
  188.  
  189.  
  190. def calculate_file_md5(file_path: str) -> str:
  191.     """计算文件MD5值"""
  192.     hash_md5 = hashlib.md5()
  193.     with open(file_path, "rb") as f:
  194.         for chunk in iter(lambda: f.read(4096), b""):
  195.             hash_md5.update(chunk)
  196.     return hash_md5.hexdigest()
  197.  
  198.  
  199. def check_assets(assets_dir: Path, asset_list: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
  200.     """检查本地资产与远程资产的差异"""
  201.     updated_files = {}
  202.    
  203.     if not assets_dir.exists():
  204.         assets_dir.mkdir(parents=True, exist_ok=True)
  205.         return asset_list
  206.    
  207.     for filename, info in asset_list.items():
  208.         file_path = assets_dir / filename
  209.        
  210.         if not file_path.exists():
  211.             updated_files[filename] = info
  212.         elif 'size' in info and info['size'] > 0:
  213.             local_size = file_path.stat().st_size
  214.             remote_size = info['size']
  215.            
  216.             if local_size != remote_size:
  217.                 print(f"file size not match: {filename} (local: {local_size}, remote: {remote_size})")
  218.                 updated_files[filename] = info
  219.    
  220.     return updated_files
  221.  
  222.  
  223. def download_file(url: str, file_path: Path, retries: int = 3) -> bool:
  224.     """下载文件"""
  225.     for attempt in range(retries):
  226.         try:
  227.             response = requests.get(url, stream=True, timeout=30, verify=False)
  228.             response.raise_for_status()
  229.            
  230.             file_path.parent.mkdir(parents=True, exist_ok=True)
  231.            
  232.             with open(file_path, 'wb') as f:
  233.                 for chunk in response.iter_content(chunk_size=8192):
  234.                     if chunk:
  235.                         f.write(chunk)
  236.             return True
  237.         except Exception as e:
  238.             print(f"download {url} failed (attempt {attempt+1}/{retries}): {e}")
  239.             if attempt == retries - 1:
  240.                 return False
  241.     return False
  242.  
  243.  
  244. def download_assets(assets_to_download: Dict[str, Dict[str, Any]], target_dir: Path, max_workers: int = 100) -> Dict[str, bool]:
  245.     """多线程下载资产"""
  246.     results = {}
  247.    
  248.     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
  249.         future_to_file = {
  250.             executor.submit(
  251.                 download_file,
  252.                 info['url'],
  253.                 target_dir / filename
  254.             ): filename
  255.             for filename, info in assets_to_download.items()
  256.         }
  257.        
  258.         for future in concurrent.futures.as_completed(future_to_file):
  259.             filename = future_to_file[future]
  260.             try:
  261.                 success = future.result()
  262.                 results[filename] = success
  263.                 status = "success" if success else "failed"
  264.                 print(f"download {filename}: {status}")
  265.             except Exception as e:
  266.                 results[filename] = False
  267.                 print(f"download {filename} error: {e}")
  268.    
  269.     return results
  270.  
  271.  
  272. def save_asset_list(asset_list: Dict[str, Dict[str, Any]], file_path: Path):
  273.     """保存资产列表到JSON文件"""
  274.     with open(file_path, 'w', encoding='utf-8') as f:
  275.         json.dump(asset_list, f, ensure_ascii=False, indent=2)
  276.  
  277.  
  278. def load_asset_list(file_path: Path) -> Dict[str, Dict[str, Any]]:
  279.     """从JSON文件加载资产列表"""
  280.     if not file_path.exists():
  281.         return {}
  282.    
  283.     with open(file_path, 'r', encoding='utf-8') as f:
  284.         return json.load(f)
  285.  
  286.  
  287. def main():
  288.     base_dir = Path('.')
  289.     assets_dir = base_dir / 'assets'
  290.     update_dir = base_dir / 'update'
  291.     asset_list_file = base_dir / 'asset_list.json'
  292.    
  293.     print("Eversoul resource update checker")
  294.     print("=" * 40)
  295.  
  296.     assets_dir.mkdir(exist_ok=True)
  297.    
  298.     has_assets = assets_dir.exists() and any(assets_dir.iterdir())
  299.    
  300.     print("get latest version info...")
  301.     version = get_app_version()
  302.     print(f"current version: {version}")
  303.  
  304.     print("download catalog...")
  305.     catalog = download_catalog(version)
  306.    
  307.     print("parse catalog...")
  308.     asset_list = parse_catalog(catalog, version)
  309.     print(f"found {len(asset_list)} resources")
  310.    
  311.     save_asset_list(asset_list, asset_list_file)
  312.    
  313.     print("check local resources...")
  314.     updated_files = check_assets(assets_dir, asset_list)
  315.    
  316.     print(f"need update files: {len(updated_files)}")
  317.    
  318.     if updated_files:
  319.         print("need update files list:")
  320.         for file, info in updated_files.items():
  321.             print(f"  - {file}")
  322.        
  323.         choice = input("update these files? (y/n): ").strip().lower()
  324.         if choice == 'y':
  325.             target_dir = assets_dir if not has_assets else update_dir
  326.             target_dir.mkdir(parents=True, exist_ok=True)
  327.            
  328.             print(f"start download update files to {target_dir}...")
  329.             download_results = download_assets(updated_files, target_dir)
  330.            
  331.             success_count = sum(1 for success in download_results.values() if success)
  332.             print(f"download success: {success_count}/{len(updated_files)}")
  333.            
  334.     else:
  335.         print("all files are up to date, no need to update")
  336.  
  337.  
  338. if __name__ == "__main__":
  339.     main()
Add Comment
Please, Sign In to add comment