Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """This tool is designed to emulate BZCC file indexing for ODF files and open search queries as tabs in a text editor."""
- VERSION = 1.03
- import os
- import subprocess
# Fallback Steam directory, used only if the install path cannot be read from the registry.
STEAM_DIRECTORY = "C:\\Program Files (x86)\\Steam"

# Text editor used to open the matching ODF files (see function 'open_files').
# Keep TEXT_EDITOR_AS_TABS set to False when using notepad.
TEXT_EDITOR = os.path.join(os.environ["WINDIR"], "system32", "notepad.exe")
# TEXT_EDITOR = "C:\\Program Files\\Notepad++\\notepad++.exe"
# TEXT_EDITOR = "C:\\program files\\wscite510\\SciTE.exe" # @CHANGE

# True: hand every ODF file to a single editor invocation so they open as tabs.
# False: start one editor instance per file.
TEXT_EDITOR_AS_TABS = False
TEXT_EDITOR_MAX_TABS = 32  # Result limit when TEXT_EDITOR_AS_TABS is True
TEXT_EDITOR_MAX_INSTANCES = 4  # Result limit when TEXT_EDITOR_AS_TABS is False

# Directory holding the cached ODF locations (write permission required).
# A fast drive such as an SSD is preferred.
# Delete the files in this directory to force the caches to be rebuilt.
CACHE_DIRECTORY = os.path.join(os.environ["TMP"], "Battlezone II Combat Commander ODF Cache")
# Enable this if you have a huge addon folder that rarely changes, to increase performance.
CACHE_MYDOCS_DIRECTORIES = False
CACHE_ROOT_DIRECTORIES = True
CACHE_WORKSHOP_DIRECTORIES = True

# True: any query containing regex characters is treated as a regular expression.
# False: only the simple wildcard * is supported.
FULL_REGEX_MATCHING = False

# Show partial matches when a query does not match any ODF.
# The cost is an iteration over every ODF name whenever a query goes unmatched.
IMPLEMENT_SUGGESTIONS = True

# True: use the currently running mod instead of the user-selected mod.
# In 'launch.ini' the active mod id updates as soon as a player clicks to join a game
# and downloads the mod immediately as they enter the lobby.
# If the player has not joined a game running a different mod, it equals the selected mod.
USE_ACTIVE_MOD_INSTEAD_OF_SELECTED_MOD = False
- # This is called when the queries are finalized and the list of ODF files are passed
def open_files(odf_full_file_paths=None):
    """Open the matched ODF files in the configured text editor.

    Called once the queries are finalized, with the full paths of the matching files.

    Args:
        odf_full_file_paths: iterable of file paths to open; None means no files.

    If TEXT_EDITOR does not exist, the paths are printed instead of opened.
    With TEXT_EDITOR_AS_TABS the editor is started once with every path and this
    function waits for it to exit; otherwise one editor process is started per
    path without waiting.
    """
    # A None default avoids sharing one mutable list object between calls.
    paths = [] if odf_full_file_paths is None else list(odf_full_file_paths)
    if not os.path.exists(TEXT_EDITOR):
        print("%r not found. Set a valid program to open ODF files with.\n%d query matches:" % (TEXT_EDITOR, len(paths)))
        for path in paths:
            print(">", path)
        return
    if TEXT_EDITOR_AS_TABS:
        subprocess.call([TEXT_EDITOR] + paths)
    else:
        for path in paths:
            subprocess.Popen([TEXT_EDITOR, path])
- # ############################ ###########################
- # ########################### ##########################
- # ########################### ##########################
- # ################### ##################
- # ############### ########## # ########## ##############
- # ############# ############ # ############ ############
- # ########### ############## # ############## ##########
- # ########## ############## ### ############## #########
- # ######### ###### ##### ### #### ###### ########
- # ######## ######### ########## ######### #######
- # ####### ############# ### ### ############# #######
- # ####### ############ ### ### ############ ######
- # ####### ######### #### ##### ######### #######
- # ######## ###### ##################### ###### #######
- # ############### ##############
- # ###### ### ########## ########## ### #####
- # ###### ################ ################ #####
- # ####### ################## ################## ######
- # ############### ############ ########### ###############
- # ################### ############# ##################
- # ____________________________________________________________
- import re
- import sys
- import zlib
- import time
- import struct
- import pickle
- import configparser
- from datetime import datetime
- from collections import namedtuple
- from tempfile import NamedTemporaryFile
- # IMPORT BZ2CFG START
- # from bz2cfg import CFG
class ParseError(Exception):
    """Raised when a configuration file cannot be parsed.

    Attributes:
        file: path of the file being parsed (may be None for an unnamed source).
        line: line number reported by the tokenizer.
        col: column number reported by the tokenizer.
        msg: short description of the problem.
    """

    def __init__(self, file="UNDEFINED", line=0, col=0, msg="Parsing Error"):
        # Forward the details to Exception so that 'args' and repr() carry them too.
        super().__init__(file, line, col, msg)
        self.file = file
        self.line = line
        self.col = col
        self.msg = msg

    def __str__(self):
        return "%s:%d:%d - %s" % (self.file, self.line, self.col, self.msg)


class UnterminatedString(ParseError):
    """Raised when a quoted string reaches the end of its line without a closing quote."""
class ConfigContainer:
    """Lookup helpers shared by nodes that hold child containers and attributes.

    The methods read 'name', 'children' and 'attribute' from the instance;
    this class does not define them itself.
    """

    def walk(self):
        """Yield every descendant container depth first, each parent before its children."""
        for node in self.children:
            yield node
            for descendant in node.walk():
                yield descendant

    def get_attribute(self, attribute_name, occurrence=1):
        """Return the occurrence-th attribute with this name (negative counts from the end)."""
        return self._find(attribute_name, self.attribute, occurrence)

    def get_attributes(self, name=None):
        """Iterate over attributes named 'name' (case-insensitive), or all of them if None."""
        return self._findall(name, self.attribute, False)

    def get_container(self, container_name, occurrence=1):
        """Return the occurrence-th child container with this name (negative counts from the end)."""
        return self._find(container_name, self.children, occurrence)

    def get_containers(self, name=None):
        """Iterate over child containers named 'name' (case-insensitive), or all of them if None."""
        return self._findall(name, self.children)

    def _findall(self, name, _list, reverse=False):
        # A falsy name is left untouched; only None disables the name filter.
        wanted = name.casefold() if name else name
        candidates = reversed(_list) if reverse else _list
        for item in candidates:
            if wanted is None or item.name.casefold() == wanted:
                yield item

    def _find(self, name, _list, occurrence=1):
        # The sign of 'occurrence' selects the search direction, its magnitude the match index.
        from_end = occurrence < 0
        target = abs(occurrence)
        position = 0
        for item in self._findall(name, _list, from_end):
            position += 1
            if position == target:
                return item
        raise KeyError("%r not found in %r" % (name, self.name))

    def __iter__(self):
        yield from self.children

    def __getitem__(self, container_name):
        """Returns child container by name. Raises KeyError if not found."""
        return self.get_container(container_name)
class CFG(ConfigContainer):
    """Root container of a parsed configuration file.

    The text is tokenized by 'parse_words' and assembled by 'read' into nested
    Container objects ('Name(params) { ... }') and Attribute objects ('Name(params);').
    """

    def __init__(self, filepath=None):
        self.children = []
        self.attribute = []
        self.filepath = filepath
        self.name = filepath if filepath else "<Untitled - CFG>"
        if self.filepath:
            with open(self.filepath, "r") as f:
                self.read(f)

    def __str__(self):
        return "<%s %r>" % (__class__.__name__, self.name)

    def parse_words(self, f):
        """Yield (word, line, col) for each word or control character read from 'f'.

        Quoted strings are yielded without their quotes and without escape
        processing; text from a '/' to the end of the line is treated as a
        comment and dropped. The reported position is the one reached after
        consuming the character that completed the word.

        Raises:
            UnterminatedString: a quoted string runs into a newline.
            ParseError: a character that cannot appear outside quotes/comments.
        """
        # These characters are dropped and do not advance the column counter.
        skipped = "\x00\r"
        # Characters which count as part of an unquoted name.
        name_chars = "QWERTYUIOPASDFGHJKLZXCVBNMqwertyuiopasdfghjklzxcvbnm0123456789_.+-"
        blanks = " \t\n"
        quotes = "\"'"
        punctuation = "{}(),;"
        # Every character that legitimately ends an unquoted word.
        breakers = blanks + "/" + quotes + punctuation

        row = column = 1
        commenting = False
        open_quote = None
        token = ""
        for ch in f.read():
            if ch in skipped:
                continue
            if ch == "\n":
                row, column = row + 1, 1
            else:
                column += 1

            if open_quote:
                if ch == open_quote:
                    # Matching quote completes the string (escapes such as \" are not handled).
                    yield token, row, column
                    token, open_quote = "", None
                elif ch == "\n":
                    raise UnterminatedString(self.filepath, row, column, "Unterminated String")
                else:
                    token += ch
                continue

            if commenting:
                # Comment text is discarded; the newline returns to normal reading.
                if ch == "\n":
                    commenting = False
                continue

            if ch in name_chars:
                token += ch
                continue

            if ch not in breakers:
                raise ParseError(self.filepath, row, column, "Unexpected Character %r" % ch)

            # Any breaker first flushes the word collected so far.
            if token:
                yield token, row, column
            token = ""
            if ch == "/":
                commenting = True
            elif ch in quotes:
                open_quote = ch
            elif ch in punctuation:
                yield ch, row, column

    def read(self, f):
        """Parse the words of 'f' into this container's children and attributes.

        Raises:
            ParseError: a word does not fit the 'Name ( params ) {' or
                'Name ( params ) ;' shape at the position where it appears.
        """
        EXPECT_NAME, EXPECT_PARAM_OPEN, EXPECT_PARAM, EXPECT_CONTAINER_TYPE = range(4)
        state = EXPECT_NAME
        # Buffers for the entry currently being read.
        name = ""
        parameters = []
        # Stack of open containers; the CFG itself is the root.
        stack = [self]
        for token, line, col in self.parse_words(f):
            if state == EXPECT_NAME:
                if token != "}":
                    name, parameters, state = token, [], EXPECT_PARAM_OPEN
                elif len(stack) > 1:
                    stack.pop()
                # An extra closing brace at root level is ignored on purpose.
            elif state == EXPECT_PARAM_OPEN and token == "(":
                state = EXPECT_PARAM
            elif state == EXPECT_PARAM:
                if token == ")":
                    state = EXPECT_CONTAINER_TYPE
                elif token != ",":
                    parameters.append(token)
            elif state == EXPECT_CONTAINER_TYPE and token == "{":
                # Start of a new container.
                opened = Container(name, *parameters)
                stack[-1].children.append(opened)
                stack.append(opened)
                state = EXPECT_NAME
            elif state == EXPECT_CONTAINER_TYPE and token == ";":
                # End of an attribute.
                stack[-1].attribute.append(Attribute(name, *parameters))
                state = EXPECT_NAME
            else:
                raise ParseError(self.filepath, line, col)
class Container(ConfigContainer):
    """A named 'Name(params) { ... }' node holding child containers and attributes."""

    def __init__(self, name, *parameters):
        self.name = name
        self.parameter = parameters  # tuple of the words found between the parentheses
        self.children = []
        self.attribute = []

    def __str__(self):
        quoted = ['"%s"' % value for value in self.parameter]
        return "<%s %s(%s)>" % (__class__.__name__, self.name, ", ".join(quoted))
class Attribute:
    """A named 'Name(params);' entry of a container."""

    def __init__(self, name, *parameters):
        self.name = name
        self.parameter = parameters  # tuple of the words found between the parentheses

    def __str__(self):
        quoted = ['"%s"' % value for value in self.parameter]
        return "<%s %s(%s)>" % (__class__.__name__, self.name, ", ".join(quoted))
- # IMPORT BZ2CFG END
- # IMPORT STEAMVDF START
class steamvdf:
    """Namespace for a small reader of Steam's quoted key/value text files."""

    class VDF_ParseError(Exception):
        """Raised by 'read' on malformed input; line and col are 0-based."""

        def __init__(self, msg, line, col):
            super().__init__(msg, line, col)
            self.msg = msg
            self.line = line
            self.col = col

        def __str__(self):
            return self.msg + " (line %d column %d)" % (self.line + 1, self.col + 1)  # +1 for 0-based to 1-based

    @staticmethod
    def read(path):
        """Returns dictionary of {key: value or sub}, case sensitive.

        Each quoted key is followed either by a quoted value or by a
        brace-delimited section, which becomes a nested dictionary.
        Escape sequences inside strings are kept verbatim (backslash included).

        Raises:
            steamvdf.VDF_ParseError: unterminated string, unexpected symbol,
                misplaced brace, or sections left unclosed at end of file.
        """
        # Class-body names are not visible inside a function defined in that
        # class, so the exception type has to be reached through the class.
        VDF_ParseError = steamvdf.VDF_ParseError
        escape = False
        in_quote = False  # False, or the quote character that opened the current string
        words = []  # strings collected since the last completed key/value pair or brace
        root = {}
        section = root
        section_history = []  # enclosing sections, innermost last
        line = col = 0
        with open(path, "r") as f:
            for line, row_content in enumerate(f):
                for col, c in enumerate(row_content):
                    if in_quote:
                        if c in "\r\n":
                            raise VDF_ParseError("unterminated string %r" % words[-1], line, col)
                        elif escape:
                            escape = False
                            # c is an escaped character (not handled for now)
                            words[-1] += "\\" + c
                        elif c == in_quote:
                            in_quote = False
                            if len(words) == 2:
                                key, value = words
                                section[key] = value
                                words = []
                            elif len(words) > 2:
                                raise VDF_ParseError("expected 2 items for key and value but got %d items" % len(words), line, col)
                        elif c == "\\":
                            escape = True
                        else:
                            words[-1] += c
                    elif c in "'\"":
                        in_quote = c
                        words.append("")
                    elif c == "{":
                        if len(words) != 1:
                            raise VDF_ParseError("expected exactly 1 item before opening brace but got %d items" % len(words), line, col)
                        new_section = {}
                        section_history.append(section)
                        section[words[0]] = new_section
                        section = new_section
                        words = []
                    elif c == "}":
                        if not section_history:
                            raise VDF_ParseError("closing brace not expected", line, col)
                        section = section_history.pop()
                    elif not c.isspace():
                        raise VDF_ParseError("unexpected symbol %r" % c, line, col)
        if section_history:
            raise VDF_ParseError("%d braces left unclosed" % len(section_history), line, col)
        return root
- # IMPORT STEAMVDF END
# Locate the Steam install through the Windows registry, then look through Steam's
# library folders for the one that contains BZCC. Any failure keeps the current
# STEAM_DIRECTORY value and prints instructions for setting it by hand.
try:
    winreg = None  # Keeps the name defined even if neither import succeeds
    try:
        import winreg
    except ImportError:
        try:
            import _winreg as winreg
        except ImportError:
            print("Could not import winreg or _winreg.")
            raise
    # Registry handles are context managers: leaving the 'with' closes them.
    with winreg.ConnectRegistry(None, winreg.HKEY_LOCAL_MACHINE) as local_machine:
        with winreg.OpenKeyEx(local_machine, "SOFTWARE\\WOW6432Node\\Valve\\Steam") as registry_key:
            key_value, key_type = winreg.QueryValueEx(registry_key, "InstallPath")
    if key_type == winreg.REG_EXPAND_SZ:
        key_value = os.path.expandvars(key_value)
    elif key_type != winreg.REG_SZ:
        raise TypeError("unexpected registry value type %r for 'InstallPath'" % key_type)
    STEAM_DIRECTORY = key_value
    # Find which steam library BZCC is installed in
    vdf = steamvdf.read(os.path.join(key_value, "steamapps/libraryfolders.vdf"))["libraryfolders"]
    try:
        for index in range(1000):
            # Libraries are keyed "0", "1", ...; the first missing key raises KeyError and ends the search.
            vdf_lib = vdf[str(index)]
            if "624970" in vdf_lib["apps"]:
                STEAM_DIRECTORY = vdf_lib["path"]
                break
    except KeyError:
        pass
except Exception as e:
    print("Failed to automatically determine steam folder.")
    print("Edit this script to change the STEAM_DIRECTORY variable to your steam directory that has BZCC installed.")
    print("Current STEAM_DIRECTORY set: %r" % STEAM_DIRECTORY)
    print("Exception: %r" % e)
# Backslashes are turned into '/' so these paths can be used as re.sub replacement
# text without being decoded as character escapes.
BZ2R_STEAM_ID = 624970
BZ2R_ROOT_DIRECTORY = (STEAM_DIRECTORY + "/steamapps/common/BZ2R").replace("\\", "/")
BZ2R_MYDOCS_DIRECTORY = (os.environ["USERPROFILE"] + "/Documents/My Games/Battlezone Combat Commander").replace("\\", "/")
BZ2R_WORKSHOP_DIRECTORY = "{}/steamapps/workshop/content/{}".format(STEAM_DIRECTORY, BZ2R_STEAM_ID).replace("\\", "/")

# Files and folders derived from the directories above.
BZ2R_ADDON_DIRECTORY = BZ2R_MYDOCS_DIRECTORY + "/addon"
BZ2R_LAUNCHINI_FILE = BZ2R_MYDOCS_DIRECTORY + "/launch.ini"
BZ2R_BZONECFG_FILE = BZ2R_ROOT_DIRECTORY + "/bzone.cfg"
BZ2R_APPMANIFEST_FILE = "{}/steamapps/appmanifest_{}.acf".format(STEAM_DIRECTORY, BZ2R_STEAM_ID)
BZ2R_APPWORKSHOP_FILE = "{}/steamapps/workshop/appworkshop_{}.acf".format(STEAM_DIRECTORY, BZ2R_STEAM_ID)

# Path-prefix matchers (anchored at the start, lowercase only) and an integer matcher.
RE_ROOTDIR = re.compile(r"^@rootdir[/\\]+")
RE_MYDOCSDIR = re.compile(r"^@mydocsdir[/\\]+")
RE_INTEGER = re.compile(r"\d+")
# True: the Steam .acf timestamps are compared against the cache files' modification times.
# A cache older than the matching game or workshop update is rebuilt automatically.
CACHE_DATE_CHECK = True

# Format of the dates shown when telling the user a cache is outdated
# (e.g. the cache file is older than the last workshop mod update).
# See https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior
TIME_FORMAT = "%B %a %d %Y at %I:%M%p"

# Cache files are compressed with zlib (0 = none, 1-9 = fastest to smallest).
# Level 1 already shrinks the files significantly; higher levels seem to be a tiny
# but unnecessary performance hit.
CACHE_COMPRESSION_LEVEL = 1

# Overwrite existing cache when applicable
FORCE_GENERATE_CACHE = False

# True: every .odf file in the cache directory is deleted on start.
# This removes loose temp files created by pak processing, which can be left
# behind if the user kills the process before it cleans up.
AUTO_CLEAN_PAK = True

# Print how long a search took (set to True to debug performance)
SHOW_TIME = False

# Disabled because this script only considers ODF files.
# The addon processing code is kept for potential future expansion.
LOAD_ADDONS = False
# ADDON_MOD_EXT_WHITELIST = (".pic", ".tga", ".png", ".jpg", ".bmp", ".dds", ".wav", ".ogg", ".cfg", ".bmf", ".bm2", ".inf", ".des", ".otf", ".brf", ".txt")

# True: each ODF file in an on-disk directory is read to check that it is a valid game object.
# This keeps items such as old '.odf' map descriptions out of the results.
# It has a performance cost and skips non-gameobject odfs (vehicle lists, weapon config, etc...).
# Caches need to be rebuilt after changing it. This setting does not apply to pak files.
VALIDATE_ODF_FILE = False
# Values read from launch.ini (see 'read_launch_ini').
GlobalInfo = namedtuple(
    "GlobalInfo",
    ["active_workshop_id", "selected_workshop_id", "active_addons", "language_id"],
)
# Values read from a workshop item's .ini file (see 'read_mod_ini').
ModInfo = namedtuple("ModInfo", ["name", "catagory", "dependencies"])
# One search location produced by 'GetDirectoryInfo'.
DirectoryInfo = namedtuple(
    "DirectoryInfo",
    ["path", "cache", "recursive", "kind", "workshop_id", "cache_id"],
)

# DirectoryInfo.kind values.
# Any DirectoryInfo object that is a workshop item should have a valid workshop_id value.
(
    KIND_ROOT,
    KIND_PAK,
    KIND_MYDOCS,
    KIND_WORKSHOP_CONFIG,
    KIND_WORKSHOP_ASSET,
    KIND_WORKSHOP_ADDON,
) = range(6)

# STEAM_INSTALL_REGISTRY_KEY = "HKEY_LOCAL_MACHINE\\SOFTWARE\\WOW6432Node\\Valve\\Steam", "Language" # https://partner.steamgames.com/doc/store/localization ?
DEFAULT_LANGAUGE_CODE = "en"  # "fr" "de"
def GetDirectoryInfo(**kwargs):
    """Build a DirectoryInfo from keyword arguments, resolving 'path' to a full path.

    Reads kwargs 'path', 'kind' and 'recursive'; fills in 'path' (resolved),
    'cache' (cache file location) and 'cache_id' before constructing the tuple.

    Raises:
        FileNotFoundError: the resolved path does not exist.
    """
    # In BZCC, @rootdir/@mydocsdir only work at the very start of the path and in lowercase.
    resolved = RE_ROOTDIR.sub(BZ2R_ROOT_DIRECTORY + "/", kwargs["path"])
    resolved = RE_MYDOCSDIR.sub(BZ2R_MYDOCS_DIRECTORY + "/", resolved)
    if not os.path.isabs(resolved):
        # BZCC seems to treat relative paths as being under @rootdir (possibly just
        # its working directory); this script assumes @rootdir.
        resolved = os.path.join(BZ2R_ROOT_DIRECTORY, resolved)
    # Normalize case and collapse every run of separators into a single '/'.
    resolved = re.sub(r"[/\\]+", "/", os.path.normcase(resolved))

    if kwargs["kind"] == KIND_PAK:
        recursive_flag = 0
    else:
        recursive_flag = int(kwargs["recursive"])
    checksum = zlib.adler32(resolved.encode("ascii", "ignore"))
    kwargs.update(
        cache_id=checksum,
        cache=os.path.join(CACHE_DIRECTORY, "%d%d.cache" % (recursive_flag, checksum)),
        path=resolved,
    )
    if not os.path.exists(resolved):
        raise FileNotFoundError("%r not found" % resolved)
    return DirectoryInfo(**kwargs)
def find_first_file(directory, ext):
    """Return the path of the first file directly inside 'directory' whose name ends with 'ext'.

    The comparison is case-insensitive and sub-directories are not searched.

    Raises:
        FileNotFoundError: no such file (or the directory cannot be listed).
    """
    suffix = ext.casefold()
    # Only the first os.walk() entry (the directory itself) is looked at.
    top, _subdirs, filenames = next(os.walk(directory), (directory, [], []))
    for filename in filenames:
        if filename[-len(suffix):].casefold() == suffix:
            return os.path.join(top, filename)
    raise FileNotFoundError("file ending in %r not found in %r" % (suffix, directory))
def read_launch_ini(launch_ini_file_path=BZ2R_LAUNCHINI_FILE):
    """Read the [config] section of launch.ini and return it as a GlobalInfo."""
    parser = configparser.ConfigParser()
    parser.read(launch_ini_file_path)
    section = parser["config"]
    # Mod being played right now, e.g. after joining a modded game (0 is stock)
    active_mod_id = int(section["globalConfigWorkshopId"])
    # Mod last selected from the mod menu by the user, aka default mod (0 is stock)
    selected_mod_id = int(section["selectedConfigWorkshopId"])
    # Workshop ids of the active addon mods
    addon_ids = [int(digits) for digits in RE_INTEGER.findall(section["activeAddons"])]
    # Language ID (0 or >= 4=auto, 1=en, 2=fr, 3=de)
    language_id = int(section["language"])
    return GlobalInfo(active_mod_id, selected_mod_id, addon_ids, language_id)
def read_mod_ini(mod_ini_file_path):
    """Read the [WORKSHOP] section of a workshop item's .ini file and return it as a ModInfo."""
    parser = configparser.ConfigParser()
    parser.read(mod_ini_file_path)
    workshop = parser["WORKSHOP"]

    # Name of mod (optional)
    mod_name = None
    if "modName" in workshop:
        mod_name = workshop["modName"].strip('"')
    # Kind of mod can be: Addon, Config, Asset
    mod_type = workshop["modType"].strip('"')
    # Ids of the other mods this mod inherits (optional)
    dependency_text = ""
    if "assetDependencies" in workshop:
        dependency_text = workshop["assetDependencies"]
    dependency_ids = [int(digits) for digits in RE_INTEGER.findall(dependency_text)]
    return ModInfo(mod_name, mod_type, dependency_ids)
def read_game_timestamp(steam_appmanifest_file_path=BZ2R_APPMANIFEST_FILE):
    """Return the "LastUpdated" value of the app manifest as an int.

    Raises:
        Exception: the file contains no "LastUpdated" entry.
    """
    with open(steam_appmanifest_file_path, "r") as manifest:
        found = re.search(r"(?i)\"LastUpdated\"[\r\n\s]*\"(\d+)\"", manifest.read())
    if found is None:
        raise Exception("could not find timestamp in %r" % steam_appmanifest_file_path)
    return int(found.group(1))
def read_workshop_timestamps(steam_appworkshop_file_path=BZ2R_APPWORKSHOP_FILE):
    """Return {workshop item id: "timeupdated" value} (both ints) read from the appworkshop file."""
    entry_pattern = re.compile(r"(?i)\"(\d+)\"[\r\n\s]*{[\r\n\s]*\"size\"[\r\n\s]*\"\d+\"[\r\n\s]*\"timeupdated\"[\r\n\s]*\"(\d+)\"")
    with open(steam_appworkshop_file_path, "r") as workshop_file:
        entries = entry_pattern.findall(workshop_file.read())
    return {int(item_id): int(updated) for item_id, updated in entries}
def odf_file_iter(directory, recursive=False):
    """Yield (casefolded name without extension, full path) for each '.odf' file in 'directory'.

    Sub-directories are visited only when 'recursive' is true. With
    VALIDATE_ODF_FILE enabled, files without a 'classLabel = ...' line are skipped.
    """
    classlabel_line = re.compile(r"(?i)\s*classLabel\s*=\s*[^\s]")

    def is_game_object(filepath):
        # A file that cannot be decoded as text counts as "not a game object".
        try:
            with open(filepath, "r") as handle:
                return any(classlabel_line.match(text) for text in handle)
        except UnicodeError:
            return False

    for folder, _subfolders, filenames in os.walk(directory):
        for filename in filenames:
            stem, extension = os.path.splitext(filename.casefold())
            if extension == ".odf":
                location = os.path.join(folder, filename)
                if not VALIDATE_ODF_FILE or is_game_object(location):
                    yield stem, location
        if not recursive:
            # The first os.walk() entry is the directory itself; stop before descending.
            break
PakFileInfo = namedtuple("PakFileInfo", "bz2dir file_name data_offset comp_size real_size")


def pak_archive_iter(bz2dir, extensions=(".odf",)):
    """Yield (casefolded name without extension, PakFileInfo) for matching entries of a pak archive.

    Args:
        bz2dir: object whose 'path' attribute is the pak file to read; it is
            stored unchanged in every yielded PakFileInfo.
        extensions: lowercase extensions (with the dot) of the entries to yield.

    Raises:
        ValueError: the file does not start with the expected pak header.
    """
    with open(bz2dir.path, "rb") as f:
        header = f.read(8)
        # A short read is rejected here rather than surfacing as a struct.error.
        if len(header) < 8 or struct.unpack("4sI", header) != (b"DOCP", 2):
            raise ValueError("invalid pak header in %r" % bz2dir.path)
        f.seek(16, 0)
        file_count, file_offset = struct.unpack("II", f.read(8))
        f.seek(file_offset)
        for _ in range(file_count):
            # Each entry: 4 skipped bytes, 1-byte name length, the name, then three 32-bit fields.
            f.seek(4, 1)
            name_length = f.read(1)[0]
            # Warning: Name not sanitized.
            file_name = f.read(name_length).decode("ascii", "ignore")
            name, ext = os.path.splitext(file_name.casefold())
            if ext in extensions:
                yield name, PakFileInfo(bz2dir, file_name, *struct.unpack("III", f.read(12)))
            else:
                f.seek(12, 1)
- class SearchTool:
def __init__(self, global_info, default_bzone_cfg_path=BZ2R_BZONECFG_FILE, force_input=None):
    """Resolve the active mod, load its search directories, then run the query prompt.

    (Method of SearchTool.)

    Args:
        global_info: GlobalInfo describing the selected mod, addons and language.
        default_bzone_cfg_path: config file used when no workshop config mod is selected.
        force_input: query string to run once instead of prompting interactively.
    """
    self.global_info = global_info
    self.mod_info = None
    self.bzone_cfg_path = default_bzone_cfg_path
    self.cache = dict()
    if global_info.selected_workshop_id:
        mod_workshop_directory = os.path.join(BZ2R_WORKSHOP_DIRECTORY, str(global_info.selected_workshop_id))
        try:
            # Called only for its FileNotFoundError when the mod ships no .cfg file.
            find_first_file(mod_workshop_directory, ".cfg")
            ini_file = find_first_file(mod_workshop_directory, ".ini")
            self.mod_info = read_mod_ini(ini_file)
            # Keep the category: mod_info is cleared below, before the message is printed.
            mod_category = self.mod_info.catagory
            if mod_category.casefold() != "config":
                # After testing, it appears BZCC just ignores non-config mods and runs under the context of stock.
                self.mod_info = None
                self.global_info = global_info._replace(selected_workshop_id=0)
                print("Error - Workshop item %d is an %r mod, but a 'config' mod is required. Reverting to stock..." % (global_info.selected_workshop_id, mod_category))
        except FileNotFoundError:
            self.mod_info = None
            self.global_info = global_info._replace(selected_workshop_id=0)
            print("Error - Could not find config mod files for workshop item %d. Reverting to stock..." % global_info.selected_workshop_id)
    bzone = list(self.read_bzone_cfg())
    self.last_game_update = self.last_workshop_update = None
    if CACHE_DATE_CHECK:
        # Timestamps are best effort: without them the cache age checks are simply skipped.
        try:
            self.last_game_update = read_game_timestamp(BZ2R_APPMANIFEST_FILE)
        except Exception:
            print("Error - failed to get timestamp from %r" % BZ2R_APPMANIFEST_FILE)
        try:
            self.last_workshop_update = read_workshop_timestamps(BZ2R_APPWORKSHOP_FILE)
        except Exception:
            print("Error - failed to get timestamps from %r" % BZ2R_APPWORKSHOP_FILE)
    if self.mod_info:
        print("<%s> w/ %d Active Addon Mods and %d Asset Dependencies" % (self.mod_info.name, len(self.global_info.active_addons), len(self.mod_info.dependencies)))
        print("Workshop ID %d Using Config %r" % (self.global_info.selected_workshop_id, self.bzone_cfg_path))
    else:
        if os.path.normpath(self.bzone_cfg_path) == os.path.normpath(BZ2R_BZONECFG_FILE):
            print("Stock w/ %d Active Addon Mods" % len(self.global_info.active_addons))
        else:
            print("%r w/ %d Active Addon Mods" % (os.path.basename(self.bzone_cfg_path), len(self.global_info.active_addons)))
        print("Using Config %r" % self.bzone_cfg_path)
    if FULL_REGEX_MATCHING:
        print("Do not specify the extension '.odf' in your query. Separate multiple queries with spaces. If your query contains regular expression characters it will be used as a regex match.")
    else:
        print("Do not specify the extension '.odf' in your query. Separate multiple queries with spaces. You may use wildcards * for partial matches. Example: (*vscav evtank_* apwrck)")
    # prompt() returns True when it should not be asked again.
    while True:
        if self.prompt(bzone, force_input):
            break
def read_bzone_cfg(self):
    """Yield a DirectoryInfo for every search location of the config's active stream.

    (Method of SearchTool.) When a workshop mod is selected, its first '.cfg'
    file replaces self.bzone_cfg_path. The stream named by the last
    SetActiveStream attribute is located and each of its directives is
    translated in file order.

    Raises:
        FileNotFoundError: the config file (or a yielded location) does not exist.
        LookupError: the active stream has no matching ConfigureStream container.
        NotImplementedError: the stream uses the AddStream directive.
        Exception: an unknown directive, or a dependency that is not an asset mod.
    """
    global_info, mod_info = self.global_info, self.mod_info
    selected_id = global_info.selected_workshop_id
    bzone_cfg_path = self.bzone_cfg_path
    if selected_id:
        bzone_cfg_path = self.bzone_cfg_path = find_first_file(os.path.join(BZ2R_WORKSHOP_DIRECTORY, str(selected_id)), ".cfg")
    if not os.path.exists(bzone_cfg_path):
        raise FileNotFoundError("%r not found" % bzone_cfg_path)
    configure_file_system = CFG(bzone_cfg_path)["ConfigureFileSystem"]
    # NOTE(review): 'attribute' holds Attribute objects, so this string membership
    # test looks like it can never be true. Left unchanged because making it live
    # would start rejecting configs that are accepted today - confirm the intent.
    if "appendtomydocs" in configure_file_system.attribute:
        # TODO: What is even the point to this feature?
        raise NotImplementedError("AppendToMyDocs in %r not supported" % bzone_cfg_path)
    use_stream_name = configure_file_system.get_attribute("SetActiveStream", -1).parameter[0].casefold()
    for stream in configure_file_system.get_containers("ConfigureStream"):
        if stream.parameter[0].casefold() == use_stream_name:
            configure_stream = stream
            break
    else:
        raise LookupError("stream config %r not found in %r" % (use_stream_name, bzone_cfg_path))
    for attribute in configure_stream.get_attributes():
        name = attribute.name.casefold()
        kind = path = None
        if attribute.parameter:
            path = attribute.parameter[0]
            if RE_ROOTDIR.match(path):
                kind = KIND_ROOT
            if RE_MYDOCSDIR.match(path):
                kind = KIND_MYDOCS
        if path:
            # Directives that take a path as their first parameter.
            if name == "addstream":
                raise NotImplementedError("%r in %r not supported" % (attribute.name, bzone_cfg_path))
            if name == "adddirrecurse":
                yield GetDirectoryInfo(path=path, recursive=True, kind=kind, workshop_id=0)
                continue
            elif name == "adddir":
                yield GetDirectoryInfo(path=path, recursive=False, kind=kind, workshop_id=0)
                continue
            elif name == "addlangdir":
                language_id = global_info.language_id if global_info.language_id in (1, 2, 3) else 0
                path += "/bz2r_%s" % {0: DEFAULT_LANGAUGE_CODE, 1: "en", 2: "fr", 3: "de"}[language_id]
                yield GetDirectoryInfo(path=path, recursive=True, kind=kind, workshop_id=0)
                continue
            elif name == "addpack":
                yield GetDirectoryInfo(path=path, recursive=None, kind=KIND_PAK, workshop_id=0)
                continue
            # Note that these AddLocals seem to assume the "selected mod".
            # They do not fire at all (checked in procmon) if no mod is selected.
            elif name in ("addlocalworkshopdirrecurse", "addlocalworkshopdir"):
                if not selected_id:
                    print("Error - %r directive outside context of mod" % attribute)
                    continue
                path = os.path.join(BZ2R_WORKSHOP_DIRECTORY, str(selected_id), path)
                yield GetDirectoryInfo(path=path, recursive=(name == "addlocalworkshopdirrecurse"), kind=KIND_WORKSHOP_CONFIG, workshop_id=selected_id)
                continue
        else:
            # Directives without a path parameter.
            if name == "enableaddonmods":
                if not LOAD_ADDONS:
                    continue
                for addon_mod_id in global_info.active_addons:
                    # From an addon .INF:
                    # "an addon workshop item must contain addonAssets/ folder."
                    # "That folder will act as the root of a AddDirRecurse() call made by the engine"
                    addon_path = os.path.join(BZ2R_WORKSHOP_DIRECTORY, str(addon_mod_id), "addonAssets")
                    yield GetDirectoryInfo(path=addon_path, recursive=True, kind=KIND_WORKSHOP_ADDON, workshop_id=addon_mod_id)
                continue
            elif name == "addworkshopconfigs":
                # From the VSR .CFG:
                # "This is required so that the workshop is populated with the base directories
                # for each workshop item that contains a global config mod"
                # I don't think this actually causes bzcc to traverse any directories?
                continue
            elif name == "addglobalconfigmod":
                if not selected_id:
                    continue
                for dependency_id in mod_info.dependencies:
                    dep_mod_workshop_directory = os.path.join(BZ2R_WORKSHOP_DIRECTORY, str(dependency_id))
                    dep_ini_file_path = find_first_file(dep_mod_workshop_directory, ".ini")
                    dep_mod_info = read_mod_ini(dep_ini_file_path)
                    if dep_mod_info.catagory.casefold() != "asset":
                        # Report the dependency's own type, not the parent mod's.
                        raise Exception("%d invalid mod type %r in %r, dependencies must be asset mods" % (dependency_id, dep_mod_info.catagory, dep_ini_file_path))
                    # Assuming asset mods are just treated as AddDirRecurse()
                    yield GetDirectoryInfo(path=dep_mod_workshop_directory, recursive=True, kind=KIND_WORKSHOP_ASSET, workshop_id=dependency_id)
                continue
        raise Exception("unhandled config directive %r in %r" % (attribute.name, bzone_cfg_path))
def prompt(self, bzone, force_input=None):
    """Read one line of queries, search for them and open the matching files.

    (Method of SearchTool.)

    Args:
        bzone: list of DirectoryInfo search locations, passed on to search().
        force_input: query string to use instead of asking the user.

    Returns:
        True when 'force_input' was given and queries were run; False otherwise,
        including when the input contained no usable query.
    """
    re_odf_name = re.compile(r"[^\s]+")
    user_input = force_input if force_input is not None else input("\nODF: ")
    print(">", user_input)
    queries = set()
    regex_queries = set()
    for name in re_odf_name.findall(user_input):
        if FULL_REGEX_MATCHING:
            if re.escape(name) != name:
                final_regex = "(?i)^" + name + "$"  # Must match entire string
                # final_regex = "(?i)" + name # Must only match beginning
                try:
                    regex_queries.add(re.compile(final_regex))
                except re.error as exception:
                    print("invalid regex %r:" % final_regex, exception)
            else:
                queries.add(name.casefold())
        elif "*" in name:
            # Regex matches will always result in the performance cost of checking every possible odf name.
            regex_queries.add(re.compile(".*".join(re.escape(segment.casefold()) for segment in name.split("*"))))
        else:
            # If any names are not found, and suggestions are enabled, there will be a performance cost of checking every existing name.
            # Otherwise, if cached, the name is found very quickly using hash lookup (dictionary).
            queries.add(name.casefold())
    if not queries and not regex_queries:
        return False
    start = time.time()
    results = self.search(bzone, queries, regex_queries)
    if SHOW_TIME:
        print("Competion Time: %f" % (time.time() - start))
    if results:
        maximum = TEXT_EDITOR_MAX_TABS if TEXT_EDITOR_AS_TABS else TEXT_EDITOR_MAX_INSTANCES
        if len(results) > maximum:
            print("Only showing %d/%d results." % (maximum, len(results)))
            results = dict(list(results.items())[:maximum])
        temp_files = []
        try:
            for name, path in list(results.items()):
                # TODO: Check if pak file was changed during the writing of each file, raising an exception?
                if not isinstance(path, PakFileInfo):
                    continue
                pak_info = path
                # Entry names come straight from the archive: keep only the last path
                # component so the temp file cannot land outside CACHE_DIRECTORY.
                safe_name = re.split(r"[/\\]", pak_info.file_name)[-1]
                temp_file_path = os.path.join(CACHE_DIRECTORY, "%d-%s" % (pak_info.bz2dir.cache_id, safe_name))
                # Registered before writing so a partially written file is cleaned up too.
                temp_files.append(temp_file_path)
                is_compressed = (pak_info.comp_size != pak_info.real_size)
                with open(pak_info.bz2dir.path, "rb") as pak:
                    pak.seek(pak_info.data_offset)
                    with open(temp_file_path, "wb") as tmp:
                        if is_compressed:
                            tmp.write(zlib.decompress(pak.read(pak_info.comp_size)))
                        else:
                            tmp.write(pak.read(pak_info.real_size))
                results[name] = temp_file_path
            open_files(list(results.values()))
        finally:
            # Cleanup, also when extraction or the editor launch fails.
            # NOTE(review): when open_files() does not wait for the editor, this may
            # remove a file before the editor has read it - confirm.
            for temp_file_path in temp_files:
                if os.path.exists(temp_file_path):
                    os.unlink(temp_file_path)
    return force_input is not None
def search(self, bzone, queries, regex_queries=None):
    """Resolve ODF queries against every directory in *bzone*.

    Directories are visited in the order *bzone* yields them; the first
    directory that provides a name wins for exact queries.

    Args:
        bzone: Iterable of directory descriptors. Each one is read for its
            ``path``, ``recursive``, ``kind``, ``cache`` and ``workshop_id``
            attributes.
        queries: Mutable collection of exact ODF names to find. Matched
            names are REMOVED from it in place, so whatever is left when
            this method returns is the set of unmatched queries.
        regex_queries: Optional iterable of compiled patterns. Every ODF
            name a pattern matches (anchored at the start of the name via
            ``match``) is added to the results. Defaults to no patterns.

    Returns:
        dict mapping ODF name -> location as produced by the directory's
        iterator or its cache.

    Side effects:
        Creates CACHE_DIRECTORY if needed, reads/writes per-directory cache
        files, updates ``self.cache`` and prints progress, suggestions and
        "not found" messages.
    """
    # A None default avoids sharing one mutable list between calls.
    if regex_queries is None:
        regex_queries = []

    results = dict()
    suggestions = {query: list() for query in queries} if IMPLEMENT_SUGGESTIONS else None

    os.makedirs(CACHE_DIRECTORY, exist_ok=True)

    for bz2dir in bzone:
        generate_cache = use_cache = can_cache = False

        # Default iterator - iterate over files on disk in directory
        odf_iterator = odf_file_iter(bz2dir.path, recursive=bz2dir.recursive)

        if bz2dir.kind == KIND_PAK:
            odf_iterator = pak_archive_iter(bz2dir)
        elif bz2dir.kind == KIND_ROOT and CACHE_ROOT_DIRECTORIES:
            can_cache = True
            if self.last_game_update is not None and os.path.exists(bz2dir.cache):
                cache_file_time = os.path.getmtime(bz2dir.cache)
                if self.last_game_update > cache_file_time:
                    game_date = datetime.fromtimestamp(self.last_game_update).strftime(TIME_FORMAT)
                    cache_date = datetime.fromtimestamp(cache_file_time).strftime(TIME_FORMAT)
                    print("Game was patched on %s, but cache last updated %s." % (game_date, cache_date))
                    generate_cache = True
        elif bz2dir.kind == KIND_MYDOCS and CACHE_MYDOCS_DIRECTORIES:
            can_cache = True
        elif bz2dir.workshop_id and CACHE_WORKSHOP_DIRECTORIES:
            can_cache = True
            if self.last_workshop_update is not None and os.path.exists(bz2dir.cache):
                if bz2dir.workshop_id in self.last_workshop_update:
                    cache_file_time = os.path.getmtime(bz2dir.cache)
                    workshop_time = self.last_workshop_update[bz2dir.workshop_id]
                    if workshop_time > cache_file_time:
                        game_date = datetime.fromtimestamp(workshop_time).strftime(TIME_FORMAT)
                        cache_date = datetime.fromtimestamp(cache_file_time).strftime(TIME_FORMAT)
                        print("Workshop item %d was updated %s, but cache last updated %s." % (bz2dir.workshop_id, game_date, cache_date))
                        generate_cache = True

        if can_cache and not generate_cache:
            if os.path.exists(bz2dir.cache) and not FORCE_GENERATE_CACHE:
                if bz2dir.cache not in self.cache:
                    with open(bz2dir.cache, "rb") as f:
                        # Warning: The pickle module is not secure. Only unpickle data you trust!
                        # The cache file is written by this tool itself, but anyone with
                        # write access to CACHE_DIRECTORY can make this execute code.
                        self.cache[bz2dir.cache] = pickle.loads(zlib.decompress(f.read()))
                use_cache = True
            else:
                generate_cache = True

        if use_cache:
            cached = self.cache[bz2dir.cache]
            # Iterate over a snapshot because matched queries are removed.
            for query in tuple(queries):
                if query in cached:
                    odf_file = cached[query]
                    if not os.path.exists(odf_file):
                        print("Error - Cache for query %r points to non-existent file: %r" % (query, odf_file))
                        generate_cache = True  # Corrupt, file got moved or renamed, etc...
                    else:
                        if query not in results:
                            results[query] = odf_file
                        queries.remove(query)

            if generate_cache:
                # The cache is stale: fall back to the on-disk iterator so the
                # cache is really rebuilt and the remaining queries are matched
                # against what exists now, instead of copying the stale entries.
                use_cache = False
            else:
                if not queries and not regex_queries:
                    return results
                if not suggestions and not regex_queries:
                    continue  # Next bz2dir, not found in this one, and not checking for suggestions or regex matches.
                # Iterate over cache values instead of traversing directory.
                odf_iterator = iter(cached.items())

        if generate_cache:
            print("Generating cache for %r..." % bz2dir.path)
            self.cache[bz2dir.cache] = dict()

        for odf_name, odf_file in odf_iterator:
            if generate_cache:
                # If multiple files with the same name exist, only 1 will reign supreme
                self.cache[bz2dir.cache][odf_name] = odf_file

            # Exact matches were already resolved from the cache when use_cache is set.
            if not use_cache and odf_name in queries:
                if odf_name not in results:
                    results[odf_name] = odf_file
                queries.remove(odf_name)
                # All queries matched, and a cache is not being generated.
                if not queries and not regex_queries and not generate_cache:
                    return results
                # Do not check regex_queries by skipping the rest of the
                # loop body for this iteration because we already matched this one.
                continue
            elif suggestions:
                # Check for partial matches
                for query in queries:
                    if query in odf_name:
                        suggestions[query].append(odf_name)

            # Regex (wildcard) matches, anchored at the beginning of the name.
            for re_query in regex_queries:
                if re_query.match(odf_name):
                    results[odf_name] = odf_file

        if generate_cache:
            with open(bz2dir.cache, "wb") as f:
                f.write(zlib.compress(pickle.dumps(self.cache[bz2dir.cache]), CACHE_COMPRESSION_LEVEL))

    if queries:
        if suggestions:
            # Any matched query with suggestions for it may not have all possible suggestions in its list.
            # Only trust suggestions for unmatched queries.
            for query in queries:
                if suggestions[query]:
                    print("Suggestions for %r:" % query, ", ".join(suggestions[query]))
                else:
                    print("Query %r not found in any ODF names." % query)
        else:
            # Report every unmatched query; there is no loop variable to reuse here.
            for query in queries:
                print("Query %r not found." % query)
    elif regex_queries and not results:
        print("No matches for your query.")

    return results
if __name__ == "__main__":
    # Script entry point: optionally purge leftover temp ODF files, work out
    # which mod / bzone.cfg to use from launch.ini and the command line, then
    # start the search tool.
    if AUTO_CLEAN_PAK:
        for root, folders, files in os.walk(CACHE_DIRECTORY):
            for file_name in files:
                _, ext = os.path.splitext(file_name)
                if ext.casefold() == ".odf":
                    print("Deleting temp file %r..." % file_name)
                    os.unlink(os.path.join(root, file_name))
            break  # Only the top level of the cache directory is cleaned.
        print("")

    global_info = read_launch_ini(BZ2R_LAUNCHINI_FILE)
    mod_info = None
    bzone_cfg = BZ2R_BZONECFG_FILE

    # In BZCC 'AddLocal' directives assume selected mod as the global mod, but you can override this by changing the variable below.
    if USE_ACTIVE_MOD_INSTEAD_OF_SELECTED_MOD:
        # _replace returns a new tuple; fields of the existing one cannot be assigned.
        global_info = global_info._replace(selected_workshop_id=global_info.active_workshop_id)

    # Command line argument '/config "path/to/bzone.cfg"' will override the mod and stock cfg.
    # This will cause the mod set by launch.ini to be ignored, which seems to be the case in BZCC.
    if len(sys.argv) > 2:
        # Accept '+', '-' or '/' as the switch prefix. The '-' is listed first so it
        # is a literal and not a character range.
        re_config = re.compile(r"^\s*[-+/]config\s*$", re.IGNORECASE)
        re_mod = re.compile(r"^\s*[-+/]workshopid\s*$", re.IGNORECASE)

        is_config = re_config.match(sys.argv[1])
        is_mod = re_mod.match(sys.argv[1]) if not is_config else None

        # NOTE(review): an unrecognized switch followed by exactly one value is
        # silently ignored; only longer command lines are rejected - confirm intent.
        if len(sys.argv) > 3 and not is_config and not is_mod:
            raise Exception("only /config 'path\\to\\bzone.cfg' or '/workshopid 1234567890' accepted as command line arguments")

        if is_config:
            # Using no mod results in using either the stock bzone.cfg or the "/config" bzone cfg
            global_info = global_info._replace(active_workshop_id=0, selected_workshop_id=0)
            bzone_cfg = sys.argv[2]
            if not os.path.isabs(bzone_cfg):
                bzone_cfg = os.path.join(BZ2R_ROOT_DIRECTORY, bzone_cfg)
        elif is_mod:
            mod_id = int(sys.argv[2])
            global_info = global_info._replace(active_workshop_id=mod_id, selected_workshop_id=mod_id)

    SearchTool(global_info, bzone_cfg)
    # Alternative invocations kept for reference:
    # SearchTool(global_info, bzone_cfg, force_input="ivtank")
    # SearchTool(GlobalInfo(active_workshop_id=0, selected_workshop_id=0, active_addons=[], language_id=0)) # Force stock with no addons
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement