Source code for romsearch.modules.rommover

import copy
import glob
import os
import shutil

import romsearch
from .romcompressor import ROMCompressor
from .rompatcher import ROMPatcher
from ..util import (
    centred_string,
    load_yml,
    setup_logger,
    unzip_file,
    load_json,
    save_json,
)


def create_m3u(
    m3u_file,
    out_files,
    relative_dir=None,
):
    """Create an m3u playlist file for multi-disc games

    Args:
        m3u_file (str): Path to the m3u file.
        out_files (list): List of files to put into m3u file.
        relative_dir: If not None, will append this relative directory
            to each out file. Defaults to None.
    """

    with open(m3u_file, "w+") as f:

        for o in out_files:
            if relative_dir is not None:
                o = f"{relative_dir}/{o}"
            f.write(f"{o}\n")

    return True


[docs] class ROMMover: def __init__( self, platform, config_file=None, config=None, platform_config=None, logger=None, log_line_sep="=", log_line_length=100, ): """ROM Moving and cache updating tool Because we do this per-platform, per-game, they need to be specified here Args: platform (str): Platform name config_file (str, optional): path to config file. Defaults to None. config (dict, optional): configuration dictionary. Defaults to None. platform_config (dict, optional): platform configuration dictionary. Defaults to None. logger (logging.Logger, optional): logger. Defaults to None. log_line_length (int, optional): Line length of log. Defaults to 100 """ if config_file is None and config is None: raise ValueError("config_file or config must be specified") if config is None: config = load_yml(config_file) self.config = config if logger is None: log_dir = self.config.get("dirs", {}).get( "log_dir", os.path.join(os.getcwd(), "logs") ) logger_add_dir = str(os.path.join(platform)) log_level = self.config.get("logger", {}).get("level", "info") logger = setup_logger( log_level=log_level, script_name=f"ROMMover", log_dir=log_dir, additional_dir=logger_add_dir, ) self.logger = logger self.log_line_sep = log_line_sep self.log_line_length = log_line_length # Pull in directories self.raw_dir = self.config.get("dirs", {}).get("raw_dir", None) if self.raw_dir is None: raise ValueError("raw_dir needs to be defined in config") self.rom_dir = self.config.get("dirs", {}).get("rom_dir", None) if self.rom_dir is None: raise ValueError("rom_dir needs to be defined in config") # Whether we'll handle multi-disc files or not self.handle_multi_discs = self.config.get("romsearch", {}).get( "handle_multi_discs", False ) self.run_rompatcher = self.config.get("romsearch", {}).get( "run_rompatcher", False ) self.patch_dir = self.config.get("dirs", {}).get("patch_dir", None) if self.patch_dir is None and self.run_rompatcher: raise ValueError("patch_dir needs to be defined in config") cache_dir = self.config.get("dirs", {}).get("cache_dir", os.getcwd()) if not os.path.exists(cache_dir): os.makedirs(cache_dir) cache_file = os.path.join(cache_dir, f"cache ({platform}).json") if os.path.exists(cache_file): cache = load_json(cache_file) else: cache = {} self.platform = platform self.cache_file = cache_file self.cache = cache # Pull in platform config that we need mod_dir = os.path.dirname(romsearch.__file__) if platform_config is None: platform_config_file = os.path.join( mod_dir, "configs", "platforms", f"{platform}.yml" ) platform_config = load_yml(platform_config_file) self.platform_config = platform_config self.unzip = self.platform_config.get("unzip", False) self.compress = self.platform_config.get("compress", False) self.compress_method = self.platform_config.get("compress_method", None) # We should only have one of unzip or compress set if self.unzip and self.compress: raise ValueError("unzip and compress can't be used together") # If we have compression on, we need to define a compression method if self.compress and self.compress_method is None: raise ValueError("compress_method needs to be defined if compressing") # If we have compression, make sure we have a path to the tool defined self.compress_method_path = self.config.get("romcompressor", {}).get( f"{self.compress_method}_path", None ) if self.compress and self.compress_method_path is None: self.logger.error( centred_string( f"{self.compress_method}_path needs to be defined in the user config if compressing." f"Disabling compression", total_length=self.log_line_length, ) ) self.compress = False
[docs] def run( self, rom_dict, ): """Run the ROMMover Args: rom_dict (dict): ROM dictionary """ self.logger.info(f"{self.log_line_sep * self.log_line_length}") self.logger.info( centred_string( f"Running ROMMover for {self.platform}", total_length=self.log_line_length, ) ) self.logger.info(f"{self.log_line_sep * self.log_line_length}") roms_moved = self.move_roms(rom_dict) self.save_cache() self.logger.info(f"{self.log_line_sep * self.log_line_length}") return roms_moved
[docs] def move_roms(self, all_rom_dict): """Actually move the roms Args: all_rom_dict (dict): ROM dictionary for everything """ roms_moved = [] all_multi_discs = {} for game in all_rom_dict: rom_dict = all_rom_dict[game] for rom_no, rom in enumerate(rom_dict): # Because the filename can change, keep it here rom_file = rom_dict[rom]["download_name"] # Pull out a clean directory name, and disc-free name in case we need it dir_name = str(copy.deepcopy(rom_dict[rom]["dir_name"])) disc_free_name = str(copy.deepcopy(rom_dict[rom]["disc_free_name"])) cache_mod_time = ( self.cache.get(self.platform, {}) .get(game, {}) .get(rom, {}) .get("file_mod_time", 0) ) # Figure out if we're patching the ROM, based on whether it's already been patched # and if there's a patch file rom_patched = ( self.cache.get(self.platform, {}) .get(game, {}) .get(rom, {}) .get("patched", False) ) patch_url = rom_dict.get(rom, {}).get("patch_file", "") to_patch_rom = False if not rom_patched and patch_url != "" and self.run_rompatcher: to_patch_rom = True # Keep track if we're expecting a patched file expecting_patched_rom = False if patch_url != "" and self.run_rompatcher: expecting_patched_rom = True # Skip if the file modification time matches the one in the cache, we're not patching, and the # destination file exists out_dir = os.path.join(self.rom_dir, self.platform, dir_name) # If we're handling multi-disc files, and we have a multi-disc file, then make the output # directory a hidden one if rom_dict[rom]["multi_disc"] and self.handle_multi_discs: m3u_out_dir = copy.deepcopy(out_dir) out_dir = os.path.join(out_dir, f".{disc_free_name}") # Also update the dir name, since we use that later game_dir_name = copy.deepcopy(dir_name) dir_name = os.path.join(dir_name, f".{disc_free_name}") if disc_free_name not in all_multi_discs: all_multi_discs[disc_free_name] = { "m3u_out_dir": m3u_out_dir, "game_dir_name": game_dir_name, "relative_dir": f".{disc_free_name}", "out_files": [], } # Loop over here, since the extensions might change out_files = glob.glob(os.path.join(out_dir, "*")) short_out_files = [os.path.basename(o) for o in out_files] final_file_exists = False final_file_name = None # First, search for an exact match, but only if we're not unzipping if not self.unzip and not to_patch_rom: for o in short_out_files: if final_file_exists: continue if o == rom_file: final_file_exists = True final_file_name = os.path.basename(o) # If we're unzipping (and potentially patching), then pull the expected files out here and check again if self.unzip or expecting_patched_rom: final_file_exists, final_file_name = self.check_files_exist( rom=rom_file, files=short_out_files, file_ext_key="file_exts", patched_rom=expecting_patched_rom, ) # If we're compressing, then pull the expected files out here and check again if self.compress: final_file_exists, final_file_name = self.check_files_exist( rom=rom_file, files=short_out_files, file_ext_key="compress_file_exts", patched_rom=expecting_patched_rom, ) # If nothing has changed, then move on if ( rom_dict[rom]["file_mod_time"] == cache_mod_time and final_file_exists ): self.logger.info( centred_string( f"No updates for {rom}, skipping", total_length=self.log_line_length, ) ) # Make sure we keep track of multi-disc stuff, even if we're not necessarily changing it if rom_dict[rom]["multi_disc"] and self.handle_multi_discs: if isinstance(final_file_name, str): final_file_name = [final_file_name] all_multi_discs[disc_free_name]["out_files"].extend( final_file_name ) # Gracefully update the cache from earlier versions cache_files = ( self.cache.get(self.platform, {}) .get(game, {}) .get(rom, {}) .get("all_files", []) ) if len(cache_files) == 0: # Log whether we've patched or not patched = False if expecting_patched_rom: patched = True rom_dict[rom]["patched"] = patched # Update the cache self.cache_update( game=game, rom=rom, files=short_out_files, out_dir=dir_name, rom_dict=rom_dict, ) continue # We need to keep track of output files out_files = [] # If we're patching ROMs, then do that here if to_patch_rom: rom_patch_file = os.path.join(self.raw_dir, self.platform, rom_file) patcher = ROMPatcher( platform=self.platform, config=self.config, logger=self.logger, log_line_length=self.log_line_length, ) full_rom = patcher.run( file=rom_patch_file, patch_url=patch_url, ) unzip = False patched = True else: full_dir = os.path.join(self.raw_dir, self.platform) full_rom = os.path.join(str(full_dir), rom_file) unzip = copy.deepcopy(self.unzip) patched = False # If we're compressing ROMs, do that here if self.compress: if to_patch_rom: raise NotImplementedError( "Currently cannot handle compressing of patched files" ) rom_compress_file = os.path.join(self.raw_dir, self.platform, rom_file) full_rom = self.compress_file( rom_compress_file, ) # Log whether we've patched or not rom_dict[rom]["patched"] = patched # Move the main file. Don't delete folders as the game and the out directory # don't necessarily match move_file_success, moved_files = self.move_file( full_rom, game=game, out_dir=out_dir, unzip=unzip ) if not move_file_success: self.logger.warning( centred_string( f"{rom_file} not found in raw directory, skipping", total_length=self.log_line_length, ) ) continue out_files.extend(moved_files) self.logger.info( centred_string(f"Moved {rom_file}", total_length=self.log_line_length) ) # If there are additional file to move/unzip, do that now if "additional_dirs" in self.platform_config: for add_dir in self.platform_config["additional_dirs"]: add_full_dir = f"{self.raw_dir} {add_dir}" add_file = os.path.join(add_full_dir, rom_file) if os.path.exists(add_file): move_file_success, moved_files = self.move_file( add_file, game=game, out_dir=out_dir, unzip=unzip ) if not move_file_success: self.logger.warning( centred_string( f"{rom_file} {add_dir} not found in raw directory, skipping", total_length=self.log_line_length, ) ) else: self.logger.info( centred_string( f"Moved {rom_file} {add_dir}", total_length=self.log_line_length, ) ) out_files.extend(moved_files) # Add these to the multi-disc dictionary, if needed if rom_dict[rom]["multi_disc"] and self.handle_multi_discs: all_multi_discs[disc_free_name]["out_files"].extend(out_files) # Update the cache self.cache_update( game=game, rom=rom, files=out_files, out_dir=dir_name, rom_dict=rom_dict, ) roms_moved.append(rom_file) # Handle the multi-disc files by generating m3u playlists if self.handle_multi_discs and len(all_multi_discs) > 0: for multi_disc in all_multi_discs: # Get the full m3u file name m3u_file_name = f"{multi_disc}.m3u" m3u_full_file_name = os.path.join( all_multi_discs[multi_disc]["m3u_out_dir"], m3u_file_name ) # Assume that by sorting we're good on the file order out_files = sorted(all_multi_discs[multi_disc]["out_files"]) # If we already have this file, then continue on if os.path.exists(m3u_full_file_name): continue # Or, create the m3u and put this into the cache else: create_m3u( m3u_file=m3u_full_file_name, out_files=out_files, relative_dir=all_multi_discs[multi_disc]["relative_dir"], ) self.cache_update_multi_disc( game=all_multi_discs[multi_disc]["game_dir_name"], m3u_name=f"{multi_disc} [Multi Disc]", files=[m3u_file_name], out_dir=all_multi_discs[multi_disc]["game_dir_name"], ) self.logger.info( centred_string( f"Created {m3u_file_name}", total_length=self.log_line_length, ) ) return roms_moved
[docs] def check_files_exist( self, rom, files, file_ext_key, patched_rom=False, ): """Check files exist, so we know whether to move things around or not Args: rom: ROM name files: List of existing files file_ext_key: Potential file extensions patched_rom: Whether ROM is patched or not. Defaults to False """ final_file_exists = False final_file_name = None # Pull potential file extensions out file_exts = self.platform_config.get(file_ext_key, []) if len(file_exts) == 0: raise ValueError( "ROM file extensions should be defined in the platform config" ) rom_no_ext = os.path.splitext(rom)[0] # Add in that this has been patched, if necessary if patched_rom: rom_no_ext += " (ROMPatched)" # Loop over the files, loop over the ROM file extensions, look for a match for o in files: for file_ext in file_exts: rom_w_ext = rom_no_ext + file_ext if o == rom_w_ext: final_file_exists = True final_file_name = os.path.basename(o) return final_file_exists, final_file_name
[docs] def move_file( self, zip_file_name, game, out_dir=None, unzip=False, delete_folder=False, ): """Move file to directory structure, optionally unzipping""" moved_files = [] # If the file doesn't exist, crash out if not os.path.exists(zip_file_name): return False, moved_files # If we don't have an output directory, set one from the game name if out_dir is None: out_dir = os.path.join(self.rom_dir, self.platform, game) if delete_folder and os.path.exists(out_dir): shutil.rmtree(out_dir) if not os.path.exists(out_dir): os.makedirs(out_dir) out_dir = str(out_dir) if unzip: unzipped_files = unzip_file(zip_file_name, out_dir) moved_files.extend(unzipped_files) else: short_zip_file = os.path.split(zip_file_name)[-1] out_file = os.path.join(out_dir, short_zip_file) # Remove this file if it already exists if os.path.exists(out_file): os.remove(out_file) os.link(zip_file_name, out_file) moved_files.append(short_zip_file) return True, moved_files
[docs] def compress_file( self, rom, ): """Compress a file Args: rom: ROM name """ rc = ROMCompressor( platform=self.platform, config=self.config, compress_method=self.compress_method, compress_method_path=self.compress_method_path, logger=self.logger, log_line_length=self.log_line_length, log_line_sep=self.log_line_sep, ) compressed_file = rc.run(rom) return compressed_file
[docs] def cache_update( self, game, rom, files, out_dir, rom_dict, ): """Update the cache with new file data Args: game: Game name rom: ROM name for dictionary info files: List of files to save to cache out_dir: Output directory for files, relative to [rom_dir]/[platform] rom_dict: Dictionary of ROM properties """ # If we don't have dictionaries already set, create if self.platform not in self.cache: self.cache[self.platform] = {} if game not in self.cache[self.platform]: self.cache[self.platform][game] = {} if not self.cache[self.platform][game]: self.cache[self.platform][game] = {} # Include info about whether the ROM has been patched or not, # the patch file, and all the files we've included and where # we've put em self.cache[self.platform][game][rom] = { "file_mod_time": rom_dict[rom]["file_mod_time"], "patch_file": rom_dict[rom]["patch_file"], "patched": rom_dict[rom]["patched"], "output_directory": out_dir, "all_files": files, }
[docs] def cache_update_multi_disc( self, game, m3u_name, files, out_dir, ): """Update the cache with m3u playlist info Args: game: Game name m3u_name: m3u playlist name to save to cache files: List of files to save to cache out_dir: Output directory for files, relative to [rom_dir]/[platform] """ # If we don't have dictionaries already set, create if self.platform not in self.cache: self.cache[self.platform] = {} if game not in self.cache[self.platform]: self.cache[self.platform][game] = {} if not self.cache[self.platform][game]: self.cache[self.platform][game] = {} # Include just info on where the m3u file is, and that it's a multi-disc file self.cache[self.platform][game][m3u_name] = { "multi_disc": True, "output_directory": out_dir, "all_files": files, }
[docs] def save_cache(self): """Save out the cache file""" cache = copy.deepcopy(self.cache) save_json(cache, self.cache_file, sort_key=self.platform)