Source code for romsearch.modules.rommover

import copy
import glob
import numpy as np
import os
import shutil
import zipfile

import romsearch
from .romcompressor import ROMCompressor
from .rompatcher import ROMPatcher
from ..util import (
    get_directory_name,
    centred_string,
    load_yml,
    setup_logger,
    unzip_file,
    load_json,
    save_json,
    remove_case_insensitive_matches,
)

COMPRESSION_FILES = {
    "chdman": [".bin", ".cue"],
}


def create_m3u(
        m3u_file,
        out_files,
        relative_dir=None,
):
    """Create an m3u playlist file for multi-disc games

    Args:
        m3u_file (str): Path to the m3u file.
        out_files (list): List of files to put into m3u file.
        relative_dir: If not None, will append this relative directory
            to each out file. Defaults to None.
    """

    with open(m3u_file, "w+") as f:

        for o in out_files:
            if relative_dir is not None:
                o = f"{relative_dir}/{o}"
            f.write(f"{o}\n")

    return True


[docs] class ROMMover: def __init__( self, platform, config_file=None, config=None, platform_config=None, regex_config=None, logger=None, log_line_sep="=", log_line_length=100, ): """ROM Moving and cache updating tool Because we do this per-platform, per-game, they need to be specified here Args: platform (str): Platform name config_file (str, optional): path to config file. Defaults to None. config (dict, optional): configuration dictionary. Defaults to None. platform_config (dict, optional): platform configuration dictionary. Defaults to None. regex_config (dict, optional): regex configuration dictionary. Defaults to None. logger (logging.Logger, optional): logger. Defaults to None. log_line_length (int, optional): Line length of log. Defaults to 100 """ if config_file is None and config is None: raise ValueError("config_file or config must be specified") if config is None: config = load_yml(config_file) self.config = config if logger is None: log_dir = self.config.get("dirs", {}).get( "log_dir", os.path.join(os.getcwd(), "logs") ) logger_add_dir = str(os.path.join(platform)) log_level = self.config.get("logger", {}).get("level", "info") logger = setup_logger( log_level=log_level, script_name=f"ROMMover", log_dir=log_dir, additional_dir=logger_add_dir, ) self.logger = logger self.log_line_sep = log_line_sep self.log_line_length = log_line_length # Pull in directories self.raw_dir = self.config.get("dirs", {}).get("raw_dir", None) if self.raw_dir is None: raise ValueError("raw_dir needs to be defined in config") self.rom_dir = self.config.get("dirs", {}).get("rom_dir", None) if self.rom_dir is None: raise ValueError("rom_dir needs to be defined in config") # Whether we'll separate directories or not self.separate_directories = self.config.get("romsearch", {}).get( "separate_directories", True ) # Whether we'll handle multi-disc files or not self.handle_multi_discs = self.config.get("romsearch", {}).get( "handle_multi_discs", False ) self.run_rompatcher = self.config.get("romsearch", {}).get( "run_rompatcher", False ) self.patch_dir = self.config.get("dirs", {}).get("patch_dir", None) if self.patch_dir is None and self.run_rompatcher: raise ValueError("patch_dir needs to be defined in config") cache_dir = self.config.get("dirs", {}).get("cache_dir", os.getcwd()) if not os.path.exists(cache_dir): os.makedirs(cache_dir) cache_file = os.path.join(cache_dir, f"cache ({platform}).json") if os.path.exists(cache_file): cache = load_json(cache_file) else: cache = {} self.platform = platform self.cache_file = cache_file self.cache = cache # Pull in platform config that we need mod_dir = os.path.dirname(romsearch.__file__) if platform_config is None: platform_config_file = os.path.join( mod_dir, "configs", "platforms", f"{platform}.yml" ) platform_config = load_yml(platform_config_file) self.platform_config = platform_config if regex_config is None: regex_file = os.path.join(mod_dir, "configs", "regex.yml") regex_config = load_yml(regex_file) self.regex_config = regex_config self.unzip = self.platform_config.get("unzip", False) self.compress = self.platform_config.get("compress", False) self.compress_method = self.platform_config.get("compress_method", None) # If we have compression on, we need to define a compression method if self.compress and self.compress_method is None: raise ValueError("compress_method needs to be defined if compressing") # If we have compression, make sure we have a path to the tool defined self.compress_method_path = self.config.get("romcompressor", {}).get( f"{self.compress_method}_path", None ) if self.compress and self.compress_method_path is None: self.logger.error( centred_string( f"{self.compress_method}_path needs to be defined in the user config if compressing." f"Disabling compression", total_length=self.log_line_length, ) ) self.compress = False
[docs] def run( self, rom_dict, ): """Run the ROMMover Args: rom_dict (dict): ROM dictionary """ self.logger.info(f"{self.log_line_sep * self.log_line_length}") self.logger.info( centred_string( f"Running ROMMover for {self.platform}", total_length=self.log_line_length, ) ) self.logger.info(f"{self.log_line_sep * self.log_line_length}") # Clear out the compress and patch directory if self.compress: compress_files_removed = self.clean_compress_dir(rom_dict=rom_dict) if len(compress_files_removed) > 0: self.logger.info( centred_string("Compressed files removed:", total_length=self.log_line_length, ) ) for c in compress_files_removed: self.logger.info(centred_string(f"{c}", total_length=self.log_line_length, ) ) self.logger.info(f"{'-' * self.log_line_length}") patch_dirs_removed = self.clean_patch_dir(rom_dict=rom_dict) if len(patch_dirs_removed) > 0: self.logger.info( centred_string("Patch directories removed:", total_length=self.log_line_length, ) ) for p in patch_dirs_removed: self.logger.info(centred_string(f"{p}", total_length=self.log_line_length, ) ) self.logger.info(f"{'-' * self.log_line_length}") roms_moved = self.move_roms(rom_dict) self.save_cache() self.logger.info(f"{self.log_line_sep * self.log_line_length}") return roms_moved
[docs] def clean_compress_dir(self, rom_dict, ): """Clean any unneeded files from the compress directory Args: rom_dict (dict): ROM dictionary for everything """ files_removed = [] compress_dir = self.config.get("dirs", {}).get("compress_dir", None) # If we don't have a compress directory set, then just return if compress_dir is None: return files_removed # Get a list of all the ROM files, without extensions since they will change all_rom_files = [] for game in rom_dict: game_dict = rom_dict[game] all_rom_files.extend([os.path.splitext(game_dict[rom]["download_name"])[0] for rom in game_dict] ) # Get a list of files in the compress directory platform_compress_dir = os.path.join(compress_dir, self.platform) compress_files = glob.glob(os.path.join(platform_compress_dir, "*")) compress_files = [os.path.basename(f) for f in compress_files] compress_files.sort() compress_files_no_ext = [os.path.splitext(f)[0] for f in compress_files] for c_idx, c in enumerate(compress_files_no_ext): if c not in all_rom_files: c_orig = compress_files[c_idx] file_to_remove = os.path.join(platform_compress_dir, c_orig) # For some reason, sometimes this fails, so just keep going success = False while not success: try: os.remove(file_to_remove) success = True except FileNotFoundError: pass files_removed.append(c_orig) return files_removed
[docs] def clean_patch_dir(self, rom_dict, ): """Clean any unneeded directories from the patch directory Args: rom_dict (dict): ROM dictionary for everything """ patch_dirs_removed = [] # If we don't have a patch directory set, then just return if self.patch_dir is None: return patch_dirs_removed # If the platform patch directory doesn't exist, just return platform_patch_dir = os.path.join(self.patch_dir, self.platform) if not os.path.exists(platform_patch_dir): return patch_dirs_removed # Get a list of all the ROM files, without extensions since we're going into directories here all_rom_files = [] for game in rom_dict: game_dict = rom_dict[game] all_rom_files.extend([os.path.splitext(game_dict[rom]["download_name"])[0] for rom in game_dict] ) # Get a list of patch directories patch_dirs = os.listdir(platform_patch_dir) # If they don't match files we're expecting, then remove for p in patch_dirs: if p not in all_rom_files: shutil.rmtree(os.path.join(platform_patch_dir, p)) patch_dirs_removed.append(p) return patch_dirs_removed
[docs] def move_roms(self, all_rom_dict): """Actually move the roms Args: all_rom_dict (dict): ROM dictionary for everything """ roms_moved = [] all_multi_discs = {} total_games = len(all_rom_dict) for game_no, game in enumerate(all_rom_dict): rom_dict = all_rom_dict[game] for rom_no, rom in enumerate(rom_dict): # Because the filename can change, keep it here full_name = copy.deepcopy(rom_dict[rom]["full_name"]) rom_file = copy.deepcopy(rom_dict[rom]["download_name"]) short_name = copy.deepcopy(rom_dict[rom]["short_name"]) # If we're either a superset or a compilation, then # inherit a game and directory name from the ROM # instead. This will avoid multiple downloads in # some circumstances is_superset = rom_dict[rom].get("is_superset", False) is_compilation = rom_dict[rom].get("is_compilation", False) # Pull out a clean directory name and disc-free name in case we need it if is_superset or is_compilation: dir_name = get_directory_name(full_name) game = copy.deepcopy(short_name) else: dir_name = str(copy.deepcopy(rom_dict[rom]["dir_name"])) # Keep track of what this original directory name is, since we might need # it dir_name_original = copy.deepcopy(dir_name) disc_free_name = str(copy.deepcopy(rom_dict[rom]["disc_free_name"])) cache_mod_time = ( self.cache.get(self.platform, {}) .get(game, {}) .get(rom, {}) .get("file_mod_time", 0) ) # Figure out if we're patching the ROM, based on whether it's already been patched # and if there's a patch file rom_patched = ( self.cache.get(self.platform, {}) .get(game, {}) .get(rom, {}) .get("patched", False) ) patch_url = rom_dict.get(rom, {}).get("patch_file", "") to_patch_rom = False if not rom_patched and patch_url != "" and self.run_rompatcher: to_patch_rom = True # Keep track if we're expecting a patched file expecting_patched_rom = False if patch_url != "" and self.run_rompatcher: expecting_patched_rom = True # Keep track of absolute directories and relative directories to the # platform itself, for cache reasons # Decide whether to use platform name or ES-friendly name if ( self.config.get("rommover", {}).get("use_es_friendly_names", False) and self.platform_config.get("es_friendly_name", None) is not None ): base_dir = self.platform_config.get("es_friendly_name") else: base_dir = self.platform # Build output path if self.separate_directories: out_dir = os.path.join(self.rom_dir, base_dir, dir_name) out_base_dir = copy.deepcopy(dir_name) else: out_dir = os.path.join(self.rom_dir, base_dir) out_base_dir = "" # Skip if the file modification time matches the one in the cache, we're not patching, and the # destination file exists # If we're handling multi-disc files, and we have a multi-disc file, then make the output # directory a hidden one if rom_dict[rom]["multi_disc"] and self.handle_multi_discs: m3u_out_dir = copy.deepcopy(out_dir) out_dir = os.path.join(str(out_dir), f".{disc_free_name}") # Also update the dir name, since we use that later game_dir_name = copy.deepcopy(out_base_dir) out_base_dir = os.path.join(out_base_dir, f".{disc_free_name}") if disc_free_name not in all_multi_discs: all_multi_discs[disc_free_name] = { "m3u_out_dir": m3u_out_dir, "game": dir_name_original, "game_dir_name": game_dir_name, "relative_dir": f".{disc_free_name}", "out_files": [], "modified": False, } # Loop over here, since the extensions might change. Search specifically by the filename # to make things quicker rom_file_no_ext = os.path.splitext(rom_file)[0] out_files = glob.glob(os.path.join(str(out_dir), f"{rom_file_no_ext}*")) short_out_files = [os.path.basename(o) for o in out_files] final_file_exists = False final_file_name = None # First, search for an exact match, but only if we're not unzipping if not self.unzip and not to_patch_rom: for o in short_out_files: if final_file_exists: continue if o == rom_file: final_file_exists = True final_file_name = os.path.basename(o) # If we have unzip and compress set on, then we need to # figure out which we're actually doing unzip = copy.deepcopy(self.unzip) compress = copy.deepcopy(self.compress) if self.unzip and self.compress: raw_file = os.path.join(self.raw_dir, self.platform, rom_file) compress_suitable = self.check_compress_suitable(raw_file) if compress_suitable: unzip = False compress = True else: unzip = True compress = False # If we're unzipping (and potentially patching), then pull the expected files out here and check again if unzip or expecting_patched_rom: final_file_exists, final_file_name = self.check_files_exist( rom=rom_file, files=short_out_files, file_ext_key="file_exts", patched_rom=expecting_patched_rom, ) # If we're compressing, then pull the expected files out here and check again if compress: final_file_exists, final_file_name = self.check_files_exist( rom=rom_file, files=short_out_files, file_ext_key="compress_file_exts", patched_rom=expecting_patched_rom, ) # Now check if we've got a match in all but case. Period here is important to avoid # accidentally removing patched files remove_case_insensitive_matches(file_to_match=rom_file_no_ext, pattern=f"{rom_file_no_ext}.*", path=str(out_dir), ) # If nothing has changed, then move on if ( rom_dict[rom]["file_mod_time"] == cache_mod_time and final_file_exists ): self.logger.info( centred_string( f"[{game_no + 1}/{total_games}]: No updates for {rom}, skipping", total_length=self.log_line_length, ) ) # Make sure we keep track of multi-disc stuff, even if we're not necessarily changing it if rom_dict[rom]["multi_disc"] and self.handle_multi_discs: if isinstance(final_file_name, str): final_file_name = [final_file_name] all_multi_discs[disc_free_name]["out_files"].extend( final_file_name ) # Gracefully update the cache from earlier versions cache_files = ( self.cache.get(self.platform, {}) .get(game, {}) .get(rom, {}) .get("all_files", []) ) if len(cache_files) == 0: # Log whether we've patched or not patched = False if expecting_patched_rom: patched = True rom_dict[rom]["patched"] = patched # Update the cache self.cache_update( game=game, rom=rom, files=short_out_files, out_dir=out_base_dir, rom_dict=rom_dict, ) continue # We need to keep track of output files out_files = [] # If we're patching ROMs, then do that here if to_patch_rom: rom_patch_file = os.path.join(self.raw_dir, self.platform, rom_file) patcher = ROMPatcher( platform=self.platform, config=self.config, platform_config=self.platform_config, regex_config=self.regex_config, logger=self.logger, log_line_length=self.log_line_length, ) full_rom = patcher.run( file=str(rom_patch_file), patch_url=patch_url, rom_dict=rom_dict[rom], ) unzip = False patched = True else: full_dir = os.path.join(self.raw_dir, self.platform) full_rom = os.path.join(str(full_dir), rom_file) patched = False # If we're compressing ROMs, do that here if compress: if to_patch_rom: raise NotImplementedError( "Currently cannot handle compressing of patched files" ) rom_compress_file = os.path.join( self.raw_dir, self.platform, rom_file ) full_rom = self.compress_file( rom_compress_file, ) # Log whether we've patched or not rom_dict[rom]["patched"] = patched # Move the main file. Don't delete folders as the game and the out directory # don't necessarily match move_file_success, moved_files = self.move_file( full_rom, game=game, out_dir=out_dir, unzip=unzip ) if not move_file_success: self.logger.warning( centred_string( f"[{game_no + 1}/{total_games}]: {rom_file} not found in raw directory, skipping", total_length=self.log_line_length, ) ) continue out_files.extend(moved_files) self.logger.info( centred_string( f"[{game_no + 1}/{total_games}]: Moved {rom_file}", total_length=self.log_line_length, ) ) # If there are additional files to move/unzip, do that now if "subchannels" in self.platform_config: for subchannel in self.platform_config["subchannels"]: add_full_dir = os.path.join( self.raw_dir, f"{self.platform} {subchannel}" ) add_file = os.path.join(add_full_dir, rom_file) if os.path.exists(add_file): # These files should *always* be unzipped move_file_success, moved_files = self.move_file( add_file, game=game, out_dir=out_dir, unzip=True ) if not move_file_success: self.logger.warning( centred_string( f"{rom_file} {subchannel} not found in raw directory, skipping", total_length=self.log_line_length, ) ) else: self.logger.info( centred_string( f"[{game_no + 1}/{total_games}]: Moved {rom_file} ({subchannel})", total_length=self.log_line_length, ) ) out_files.extend(moved_files) # Add these to the multi-disc dictionary, if needed if rom_dict[rom]["multi_disc"] and self.handle_multi_discs: all_multi_discs[disc_free_name]["out_files"].extend(out_files) all_multi_discs[disc_free_name]["modified"] = True # Update and save the cache self.cache_update( game=game, rom=rom, files=out_files, out_dir=out_base_dir, rom_dict=rom_dict, ) self.save_cache() roms_moved.append(rom_file) # Handle the multi-disc files by generating m3u playlists if self.handle_multi_discs and len(all_multi_discs) > 0: for multi_disc in all_multi_discs: # Check if any unexpected files are in the out folder, and if so, remove md_files = glob.glob(os.path.join(all_multi_discs[multi_disc]["m3u_out_dir"], all_multi_discs[multi_disc]["relative_dir"], "*.*") ) md_files_short = [os.path.basename(f) for f in md_files] for md_idx, md_file in enumerate(md_files_short): if md_file not in all_multi_discs[multi_disc]["out_files"]: file_to_remove = md_files[md_idx] success = False while not success: try: os.remove(file_to_remove) success = True except FileNotFoundError: pass # Flag as modified so we recreate the m3u all_multi_discs[multi_disc]["modified"] = True # Get the full m3u file name m3u_file_name = f"{multi_disc}.m3u" m3u_full_file_name = os.path.join( all_multi_discs[multi_disc]["m3u_out_dir"], m3u_file_name ) # Assume that by sorting we're good on the file order out_files = sorted(all_multi_discs[multi_disc]["out_files"]) # If we already have this file and nothing's changed, then continue on if os.path.exists(m3u_full_file_name) and not all_multi_discs[multi_disc]["modified"]: continue # Or, create the m3u and put this into the cache else: create_m3u( m3u_file=m3u_full_file_name, out_files=out_files, relative_dir=all_multi_discs[multi_disc]["relative_dir"], ) self.cache_update_multi_disc( game=all_multi_discs[multi_disc]["game"], m3u_name=f"{multi_disc} [Multi Disc]", files=[m3u_file_name], out_dir=all_multi_discs[multi_disc]["game_dir_name"], ) self.save_cache() self.logger.info( centred_string( f"Created {m3u_file_name}", total_length=self.log_line_length, ) ) return roms_moved
[docs] def check_files_exist( self, rom, files, file_ext_key, patched_rom=False, ): """Check files exist, so we know whether to move things around or not Args: rom: ROM name files: List of existing files file_ext_key: Potential file extensions patched_rom: Whether ROM is patched or not. Defaults to False """ final_file_exists = False final_file_name = None # Pull potential file extensions out file_exts = self.platform_config.get(file_ext_key, []) if len(file_exts) == 0: raise ValueError( "ROM file extensions should be defined in the platform config" ) rom_no_ext = os.path.splitext(rom)[0] # Add in that this has been patched, if necessary if patched_rom: rom_no_ext += " (ROMPatched)" # Loop over the files, loop over the ROM file extensions, look for a match for o in files: for file_ext in file_exts: rom_w_ext = rom_no_ext + file_ext if o == rom_w_ext: final_file_exists = True final_file_name = os.path.basename(o) return final_file_exists, final_file_name
[docs] def check_compress_suitable( self, rom_file, ): """Check if a file is suitable for compression given compression method Args: rom_file: File to check """ with zipfile.ZipFile(rom_file, "r") as zip_file: # Get the names of all the files in the zip file names = zip_file.namelist() name_exts = np.unique([os.path.splitext(n)[-1] for n in names]) name_exts = [str(n) for n in name_exts] name_exts.sort() if name_exts == COMPRESSION_FILES[self.compress_method]: compress_suitable = True else: compress_suitable = False return compress_suitable
[docs] def move_file( self, zip_file_name, game, out_dir=None, unzip=False, delete_folder=False, ): """Move file to directory structure, optionally unzipping""" moved_files = [] # If the file doesn't exist, crash out if not os.path.exists(zip_file_name): return False, moved_files # If we don't have an output directory, set one from the game name if out_dir is None: out_dir = os.path.join(self.rom_dir, self.platform, game) if delete_folder and os.path.exists(out_dir): shutil.rmtree(out_dir) if not os.path.exists(out_dir): os.makedirs(out_dir) out_dir = str(out_dir) if unzip: unzipped_files = unzip_file(zip_file_name, out_dir) moved_files.extend(unzipped_files) else: short_zip_file = os.path.split(zip_file_name)[-1] out_file = os.path.join(out_dir, short_zip_file) # Remove this file if it already exists if os.path.exists(out_file): os.remove(out_file) os.link(zip_file_name, out_file) moved_files.append(short_zip_file) return True, moved_files
[docs] def compress_file( self, rom, ): """Compress a file Args: rom: ROM name """ rc = ROMCompressor( platform=self.platform, config=self.config, compress_method=self.compress_method, compress_method_path=self.compress_method_path, logger=self.logger, log_line_length=self.log_line_length, log_line_sep=self.log_line_sep, ) compressed_file = rc.run(rom) return compressed_file
[docs] def cache_update( self, game, rom, files, out_dir, rom_dict, ): """Update the cache with new file data Args: game: Game name rom: ROM name for dictionary info files: List of files to save to cache out_dir: Output directory for files, relative to [rom_dir]/[platform] rom_dict: Dictionary of ROM properties """ # If we don't have dictionaries already set, create if self.platform not in self.cache: self.cache[self.platform] = {} if game not in self.cache[self.platform]: self.cache[self.platform][game] = {} if not self.cache[self.platform][game]: self.cache[self.platform][game] = {} # Include info about whether the ROM has been patched or not, # the patch file, and all the files we've included and where # we've put em self.cache[self.platform][game][rom] = { "file_mod_time": rom_dict[rom]["file_mod_time"], "patch_file": rom_dict[rom]["patch_file"], "patched": rom_dict[rom]["patched"], "output_directory": out_dir, "all_files": files, }
[docs] def cache_update_multi_disc( self, game, m3u_name, files, out_dir, ): """Update the cache with m3u playlist info Args: game: Game name m3u_name: m3u playlist name to save to cache files: List of files to save to cache out_dir: Output directory for files, relative to [rom_dir]/[platform] """ # If we don't have dictionaries already set, create if self.platform not in self.cache: self.cache[self.platform] = {} if game not in self.cache[self.platform]: self.cache[self.platform][game] = {} if not self.cache[self.platform][game]: self.cache[self.platform][game] = {} # Include just info on where the m3u file is, and that it's a multi-disc file self.cache[self.platform][game][m3u_name] = { "multi_disc": True, "output_directory": out_dir, "all_files": files, }
[docs] def save_cache(self): """Save out the cache file""" cache = copy.deepcopy(self.cache) save_json(cache, self.cache_file, sort_key=self.platform)