import copy
from urllib import parse as urlparse
import glob
import os
import re
import shutil
import subprocess
import wget
import romsearch
from ..util import (
load_yml,
setup_logger,
unzip_file,
centred_string,
left_aligned_string,
)
ALLOWED_PATCH_METHODS = [
"xdelta",
"rompatcher.js",
]
def find_file_by_extensions(
file_exts,
patch_dir,
):
"""Find a file by extension
Args:
file_exts (list): File extensions to loop over
patch_dir (str): Patch directory
"""
files = []
for file_ext in file_exts:
file = glob.glob(os.path.join(patch_dir, f"*{file_ext}"))
files.extend(file)
return files
[docs]
class ROMPatcher:
def __init__(
self,
platform,
config_file=None,
config=None,
platform_config=None,
regex_config=None,
logger=None,
log_line_sep="=",
log_line_length=100,
):
"""ROM Patching tool
There are different ways to patch files based on platforms, so we need
to keep track of a number of things here
Args:
platform (str): Platform name
config_file (str, optional): path to config file. Defaults to None.
config (dict, optional): configuration dictionary. Defaults to None.
platform_config (dict, optional): platform configuration dictionary. Defaults to None.
regex_config (dict, optional): regex configuration dictionary. Defaults to None.
logger (logging.Logger, optional): logger. Defaults to None.
log_line_length (int, optional): Line length of log. Defaults to 100
"""
if config_file is None and config is None:
raise ValueError("config_file or config must be specified")
if config is None:
config = load_yml(config_file)
self.config = config
if logger is None:
log_dir = self.config.get("dirs", {}).get(
"log_dir", os.path.join(os.getcwd(), "logs")
)
log_level = self.config.get("logger", {}).get("level", "info")
logger = setup_logger(
log_level=log_level,
script_name=f"ROMPatcher",
log_dir=log_dir,
)
self.logger = logger
# Pull in directories
self.patch_dir = self.config.get("dirs", {}).get("patch_dir", None)
if self.patch_dir is None:
raise ValueError("patch_dir needs to be defined in config")
if not os.path.exists(self.patch_dir):
os.makedirs(self.patch_dir)
self.platform = platform
# Pull in platform config that we need
mod_dir = os.path.dirname(romsearch.__file__)
if platform_config is None:
platform_config_file = os.path.join(
mod_dir, "configs", "platforms", f"{platform}.yml"
)
platform_config = load_yml(platform_config_file)
self.platform_config = platform_config
# Read in the various pre-set configs we've got
mod_dir = os.path.dirname(romsearch.__file__)
if regex_config is None:
regex_file = os.path.join(mod_dir, "configs", "regex.yml")
regex_config = load_yml(regex_file)
self.regex_config = regex_config
self.log_line_sep = log_line_sep
self.log_line_length = log_line_length
[docs]
def run(
self,
file,
patch_url,
rom_dict=None,
):
"""Run the ROMPatcher
Args:
file (str): file to patch
patch_url (str): URL for patch files
rom_dict: Parsed ROM dictionary
"""
filename_no_ext = os.path.splitext(os.path.basename(file))[0]
patch_dir = str(os.path.join(self.patch_dir, self.platform, filename_no_ext))
# Clean out and create patch directory
if os.path.exists(patch_dir):
shutil.rmtree(patch_dir)
if not os.path.exists(patch_dir):
os.makedirs(patch_dir)
# Move and unzip file, if needed
self.logger.info(
centred_string(
f"Moving {file} to {patch_dir}",
total_length=self.log_line_length,
)
)
# If we have a zip, unzip that cutie
if file.endswith(".zip"):
unzip_file(file, patch_dir)
else:
ensure_directory = ""
if not self.patch_dir.endswith(os.path.sep):
ensure_directory = os.path.sep
shutil.copy(file, patch_dir + ensure_directory)
# Find the unpatched file
unpatched_file = self.get_unpatched_file(patch_dir=patch_dir)
# Next up, download the patch file
patch_file = self.download_patch_file(
patch_url,
patch_dir=patch_dir,
)
# If we don't find a patch in the base directory, search
# via various conditions. For now, just revision
if len(patch_file) == 0:
if rom_dict is not None:
revision = rom_dict["revision"]
if revision == "":
revision = "v1"
else:
raise ValueError("Searching for patch files requires ROM parsing!")
patch_subdirs = [x[0] for x in os.walk(patch_dir)]
found_patch = False
for patch_subdir in patch_subdirs:
if found_patch:
continue
rel_patch_subdir = os.path.basename(patch_subdir)
# Search by revision
if "rev" in rel_patch_subdir.lower():
rev_dir_parsed = re.sub(
self.regex_config["revision"]["transform_pattern"],
self.regex_config["revision"]["transform_repl"],
rel_patch_subdir,
)
if revision == rev_dir_parsed:
patch_file = self.find_patch_file(patch_subdir)
if len(patch_file) > 0:
found_patch = True
# Otherwise, give up and move on
else:
continue
# If we have multiple potential patch files, try and find one
# that matches the name most closely
if len(patch_file) > 1:
best_patch_file = None
base_file = os.path.basename(file)
base_file = os.path.splitext(base_file)[0]
for p in patch_file:
if best_patch_file is not None:
continue
base_patch_file = os.path.basename(p)
if base_file in base_patch_file:
best_patch_file = p
if best_patch_file is not None:
patch_file = copy.deepcopy(best_patch_file)
else:
pretty_patch_files = [os.path.basename(p) for p in patch_file]
raise ValueError(
f"Could not find suitable patch file in {pretty_patch_files}"
)
elif len(patch_file) == 1:
patch_file = patch_file[0]
else:
raise ValueError("No patch files found!")
# Now we have everything we need to patch this ROM
patched_file = self.patch_rom(
unpatched_file=unpatched_file,
patch_dir=patch_dir,
patch_file=patch_file,
)
return patched_file
[docs]
def get_unpatched_file(
self,
patch_dir,
):
"""Get the unpatched file from the patch directory
Args:
patch_dir (str): Patch directory
"""
file_exts = self.platform_config.get("file_exts", [])
# Error if we don't have file extensions defined
if len(file_exts) == 0:
raise ValueError(
"File extensions need to be defined in the platform config file"
)
rom_file = find_file_by_extensions(
file_exts=file_exts,
patch_dir=patch_dir,
)
# If we've got more than one ROM file, error out
if len(rom_file) > 1:
raise ValueError("Cannot handle multiple ROM files!")
rom_file = rom_file[0]
return rom_file
[docs]
def download_patch_file(
self,
patch_url,
patch_dir,
):
"""Download a patch file
Args:
patch_url (str): URL to patch file
patch_dir (str): Patch directory
"""
self.logger.info(
centred_string(
f"Downloading patch file: {patch_url}",
total_length=self.log_line_length,
)
)
# Since the URL can already have the % in, unquote before passing to wget
patch_file = wget.download(urlparse.unquote(patch_url), out=patch_dir)
if patch_file.endswith(".zip"):
unzip_file(patch_file, patch_dir)
patch_file = self.find_patch_file(patch_dir=patch_dir)
return patch_file
[docs]
def find_patch_file(
self,
patch_dir,
):
""" Find potentially multiple patch files within directory
Args:
patch_dir (str): Patch directory to search
"""
# Find the patch file
patch_file_exts = self.platform_config.get("patch_file_exts", [])
# Error if we don't have patch file extensions defined
if len(patch_file_exts) == 0:
raise ValueError(
"Patch file extensions need to be defined in the platform config file"
)
patch_file = find_file_by_extensions(
patch_file_exts,
patch_dir=patch_dir,
)
return patch_file
[docs]
def patch_rom(
self,
unpatched_file,
patch_file,
patch_dir,
):
"""Patch a ROM
Args:
unpatched_file (str): ROM file to patch
patch_file (str): Patch file to patch
patch_dir (str): Patch directory
"""
# Get the method we're using to patch things
patch_method = self.platform_config.get("patch_method", None)
# Error out if the patch method is not defined
if patch_method is None:
raise ValueError(
"Patch method needs to be defined in the platform config file"
)
# Build an output file, adding a (ROMPatched) to the bit before the file extension
unpatch_file_split = os.path.splitext(unpatched_file)
patched_file = f"{unpatch_file_split[0]} (ROMPatched){unpatch_file_split[1]}"
if patch_method == "xdelta":
self.xdelta_patch(
unpatched_file=unpatched_file,
patch_file=patch_file,
out_file=patched_file,
)
elif patch_method == "rompatcher.js":
rompatcher_js_file = (
f"{unpatch_file_split[0]} (patched){unpatch_file_split[1]}"
)
self.rompatcher_js_patch(
unpatched_file=unpatched_file,
patch_file=patch_file,
rompatcher_js_file=rompatcher_js_file,
out_file=patched_file,
patch_dir=patch_dir,
)
else:
raise ValueError(
f"Patch method needs to be one of {', '.join(ALLOWED_PATCH_METHODS)}, not {patch_method}"
)
self.logger.info(
centred_string(
f"Patching complete!",
total_length=self.log_line_length,
)
)
return patched_file
[docs]
def xdelta_patch(
self,
unpatched_file,
patch_file,
out_file,
):
"""Patch using xdelta
Args:
unpatched_file (str): ROM file to patch
patch_file (str): Patch file to patch
out_file (str): Path for output file
"""
xdelta_path = self.config.get("rompatcher", {}).get("xdelta_path", None)
if xdelta_path is None:
raise ValueError("Path to xdelta needs to be defined in user config")
if not os.path.exists(xdelta_path):
raise ValueError("xdelta path not found")
cmd = f'{xdelta_path} -d -s "{unpatched_file}" "{patch_file}" "{out_file}"'
self.logger.info(
centred_string(
f"Patching file with xdelta:",
total_length=self.log_line_length,
)
)
self.logger.info(
left_aligned_string(
f"-> Unpatched file: {os.path.basename(unpatched_file)}",
total_length=self.log_line_length,
)
)
self.logger.info(
left_aligned_string(
f"-> Patch file: {os.path.basename(patch_file)}",
total_length=self.log_line_length,
)
)
self.logger.info(
left_aligned_string(
f"-> Output file: {os.path.basename(out_file)}",
total_length=self.log_line_length,
)
)
os.system(cmd)
return True
[docs]
def rompatcher_js_patch(
self,
unpatched_file,
patch_file,
rompatcher_js_file,
out_file,
patch_dir,
):
"""Patch using RomPatcher.js
Args:
unpatched_file (str): ROM file to patch
patch_file (str): Patch file to patch
rompatcher_js_file (str): Filename that RomPatcher.js will output
out_file (str): Path for output file
patch_dir (str): Patch directory
"""
rompatcher_js_path = self.config.get("rompatcher", {}).get(
"rompatcher_js_path", None
)
if rompatcher_js_path is None:
raise ValueError("Path to RomPatcher.js needs to be defined in user config")
if not os.path.exists(rompatcher_js_path):
raise ValueError("RomPatcher.js path not found")
cmd = f'node {rompatcher_js_path} patch "{unpatched_file}" "{patch_file}"'
self.logger.info(
centred_string(
f"Patching file with RomPatcher.js:",
total_length=self.log_line_length,
)
)
self.logger.info(
left_aligned_string(
f"-> Unpatched file: {os.path.basename(unpatched_file)}",
total_length=self.log_line_length,
)
)
self.logger.info(
left_aligned_string(
f"-> Patch file: {os.path.basename(patch_file)}",
total_length=self.log_line_length,
)
)
self.logger.info(
left_aligned_string(
f"-> RomPatcher.js file: {os.path.basename(rompatcher_js_file)}",
total_length=self.log_line_length,
)
)
# Change to patch directory so file ends up in a sensible spot
orig_dir = os.getcwd()
os.chdir(patch_dir)
with subprocess.Popen(
cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
) as process:
for line in process.stdout:
# Replace any potential tabs in the line, strip whitespace and skip newline at the end
line = re.sub(r"\s+", " ", line[:-1])
line = line.lstrip().rstrip()
if len(line) == 0:
continue
# Log each line of the output using the provided logger
self.logger.info(
centred_string(line, total_length=self.log_line_length)
)
# Return to working directory
os.chdir(orig_dir)
self.logger.info(
left_aligned_string(
f"-> Renaming file to: {os.path.basename(out_file)}",
total_length=self.log_line_length,
)
)
shutil.copy(rompatcher_js_file, out_file)
os.remove(rompatcher_js_file)
return True