Source code for romsearch.modules.romcompressor

import glob
import os
import shutil

import subprocess
import re

from ..util import (
    load_yml,
    setup_logger,
    unzip_file,
    centred_string,
)

# Compression methods ROMCompressor accepts for its compress_method argument
ALLOWED_COMPRESSION_METHODS = ["chdman"]

# File extensions (without the dot) searched for as CHDMAN input files
CHDMAN_EXTS = ["cue", "gdi"]

# Platforms for which a missing .cue file may be generated from .bin files
CUE_FILE_PLATFORMS = ["Sony - PlayStation"]


def create_cue_file(
    rom,
    in_dir=None,
):
    """Create a .cue file for a bunch of .bin files in a directory

    The first .bin file (in sorted order) is written as a MODE2/2352 data
    track, and every following .bin file as an AUDIO track. The .cue file is
    named after the ROM and written alongside the .bin files.

    Args:
        rom (str): ROM file. Only its base name (without extension) is used,
            to name the .cue file
        in_dir (str): Directory containing the .bin files. Defaults to None,
            which uses the current working directory

    Returns:
        list: Single-element list containing the .cue file name. This is
            prefixed with in_dir if that was given, otherwise it is the bare
            file name

    Raises:
        ValueError: If no .bin files are found
    """

    # .cue file name, taken from the ROM name
    cue_stem = os.path.splitext(os.path.basename(rom))[0]
    cue_file_name = f"{cue_stem}.cue"

    # Look inside in_dir directly rather than chdir-ing into it, so the
    # process working directory can never be left changed if we raise below.
    # The directory is escaped so glob characters in it are taken literally
    if in_dir is None:
        pattern = "*.bin"
    else:
        pattern = os.path.join(glob.escape(in_dir), "*.bin")
        cue_file_name = os.path.join(in_dir, cue_file_name)

    # The .cue file sits next to the .bin files, so reference them by name only
    bin_files = sorted(os.path.basename(b) for b in glob.glob(pattern))

    # If we don't find any bin files, panic
    if not bin_files:
        raise ValueError("No .bin files found in directory")

    # The first bin file is the main data track
    lines = [
        f'FILE "{bin_files[0]}" BINARY\n',
        "  TRACK 01 MODE2/2352\n",
        "    INDEX 01 00:00:00\n",
    ]

    # And the rest of the bin files are audio. Track numbers start at 2,
    # since the data track has already taken 01
    for track_no, bin_file in enumerate(bin_files[1:], start=2):
        lines.extend(
            [
                f'FILE "{bin_file}" BINARY\n',
                f"  TRACK {track_no:02d} AUDIO\n",
                "    INDEX 00 00:00:00\n",
                "    INDEX 01 00:02:00\n",
            ]
        )

    # Create the .cue file
    with open(cue_file_name, "w") as f:
        f.writelines(lines)

    # Return the .cue file as a list, so it works passing through
    return [cue_file_name]


class ROMCompressor:

    def __init__(
        self,
        platform,
        compress_method="chdman",
        compress_method_path=None,
        config_file=None,
        config=None,
        logger=None,
        log_line_sep="=",
        log_line_length=100,
    ):
        """ROM compression tool

        Offers a way to (re)compress a file. Currently only does CHD
        compression, but can be extended to other compression methods if
        needed.

        Args:
            platform (str): Platform name
            compress_method (str, optional): compression method. Defaults to
                "chdman".
            compress_method_path (str, optional): path to compression
                executable. Must be given, despite the None default.
            config_file (str, optional): path to config file. Defaults to None.
            config (dict, optional): configuration dictionary. Defaults to
                None. One of config_file or config must be given.
            logger (logging.Logger, optional): logger. Defaults to None, in
                which case one is set up from the config.
            log_line_sep (str, optional): Line separator of log. Defaults
                to "="
            log_line_length (int, optional): Line length of log. Defaults
                to 100

        Raises:
            ValueError: If neither config_file nor config is given, if
                compress_method is not supported, if compress_method_path is
                not given, or if the config defines no compress_dir
        """

        self.platform = platform

        if config_file is None and config is None:
            raise ValueError("config_file or config must be specified")

        if config is None:
            config = load_yml(config_file)

        self.config = config

        if logger is None:
            log_dir = self.config.get("dirs", {}).get(
                "log_dir", os.path.join(os.getcwd(), "logs")
            )
            log_level = self.config.get("logger", {}).get("level", "info")
            logger = setup_logger(
                log_level=log_level,
                script_name="ROMCompressor",
                log_dir=log_dir,
            )
        self.logger = logger

        if compress_method not in ALLOWED_COMPRESSION_METHODS:
            raise ValueError(
                f"compress_method should be one of {ALLOWED_COMPRESSION_METHODS}, not {compress_method}"
            )
        self.compress_method = compress_method

        if compress_method_path is None:
            raise ValueError("compress_method_path must be specified")
        self.compress_method_path = compress_method_path

        # Pull in directories
        compress_dir = self.config.get("dirs", {}).get("compress_dir", None)
        if compress_dir is None:
            raise ValueError("compress_dir needs to be defined in config")

        # Compressed files live in a per-platform subdirectory
        self.compress_dir = os.path.join(compress_dir, platform)
        os.makedirs(self.compress_dir, exist_ok=True)

        self.log_line_sep = log_line_sep
        self.log_line_length = log_line_length

    def run(self, rom):
        """Run the ROMCompressor

        If a file with the ROM's base name already exists in the compress
        directory, that file is returned without doing any work. Otherwise
        the ROM is unzipped (or copied) to a temporary directory, compressed,
        and the temporary directory is removed again.

        Args:
            rom: ROM file to compress

        Returns:
            str: Path to the compressed file

        Raises:
            ValueError: If more than one existing or newly compressed file
                is found for the ROM, or the compression method is unknown
        """

        # Check if we've already compressed. ROM names often contain glob
        # characters such as [ and ], so escape everything but the wildcard
        rom_no_ext = os.path.splitext(os.path.basename(rom))[0]
        pattern = glob.escape(os.path.join(self.compress_dir, rom_no_ext)) + ".*"
        existing_files = glob.glob(pattern)

        # If we've found more than one file, freak out
        if len(existing_files) > 1:
            raise ValueError("Found more files than expected!")

        # If we already have the compressed file, we can return
        if len(existing_files) == 1:
            return existing_files[0]

        # We've got nothing, so do the compression. Start by unzipping to a
        # temp folder in the compress directory. Clear out anything left
        # behind by an interrupted earlier run, since stale files would be
        # picked up as input for this ROM
        temp_dir = os.path.join(self.compress_dir, "temp")
        if os.path.isdir(temp_dir):
            shutil.rmtree(temp_dir)
        os.makedirs(temp_dir)

        try:
            self.logger.info(
                centred_string(
                    f"Compressing {os.path.basename(rom)} with {self.compress_method}",
                    total_length=self.log_line_length,
                )
            )

            # If we have a zip file, unzip it. Else just copy the file
            if rom.lower().endswith(".zip"):
                unzip_file(rom, temp_dir)
            else:
                out_rom = os.path.join(temp_dir, os.path.basename(rom))
                shutil.copy(rom, out_rom)

            if self.compress_method == "chdman":
                compress_files = self.compress_chd(
                    rom=rom,
                    rom_dir=self.compress_dir,
                    temp_dir=temp_dir,
                )
            else:
                raise ValueError(
                    f"compress_method should be one of {ALLOWED_COMPRESSION_METHODS}, "
                    f"not {self.compress_method}"
                )

            if len(compress_files) > 1:
                raise ValueError("More compressed files than expected!")

            return compress_files[0]
        finally:
            # Delete the temp dir whether or not compression succeeded, so a
            # failure can't contaminate the next run
            shutil.rmtree(temp_dir)

    def compress_chd(
        self,
        rom,
        rom_dir,
        temp_dir,
    ):
        """Compress using CHDMAN

        Runs "createcd" on the .cue/.gdi file found in temp_dir, logging
        CHDMAN's output line by line, and moves the resulting .chd file into
        rom_dir. If no input file exists and the platform allows it, a .cue
        file is created first.

        Args:
            rom (str): ROM file to compress
            rom_dir (str): Directory for final, compressed ROM
            temp_dir (str): Directory containing files to compress

        Returns:
            list: Paths (under rom_dir) of the compressed files

        Raises:
            ValueError: If more than one input file is found, if none is
                found and one can't be created for this platform, or if
                CHDMAN exits with a non-zero code
        """

        # Resolve the destination before changing directory, so a relative
        # rom_dir still points at the right place afterwards
        abs_rom_dir = os.path.abspath(rom_dir)

        orig_dir = os.getcwd()
        os.chdir(temp_dir)

        try:
            # Find the files suitable for CHDMAN compression
            input_files = []
            for ext in CHDMAN_EXTS:
                input_files.extend(glob.glob(f"*.{ext}"))

            # Freak out if we've got more input files than expected
            if len(input_files) > 1:
                raise ValueError("More input files than expected!")

            # Alternatively, if we have no files, then create one
            if len(input_files) == 0:
                if self.platform in CUE_FILE_PLATFORMS:
                    self.logger.info(
                        centred_string(
                            "No .cue file found, creating one",
                            total_length=self.log_line_length,
                        )
                    )
                    input_files = create_cue_file(
                        rom=rom,
                    )
                else:
                    raise ValueError(
                        f"No input files found, and ROMCompressor cannot create files for {self.platform}"
                    )

            # Run CHDMAN
            compress_files = []
            for in_file in input_files:

                # Set up the output filename
                out_name = f"{os.path.splitext(in_file)[0]}.chd"

                # Pass the arguments as a list: without shell=True, a single
                # command string is treated as the program name on POSIX.
                # This also means file names need no manual quoting
                cmd = [
                    self.compress_method_path,
                    "createcd",
                    "-i",
                    in_file,
                    "-o",
                    out_name,
                ]

                # Execute the command
                with subprocess.Popen(
                    cmd,
                    text=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                ) as process:
                    for line in process.stdout:

                        # Collapse any tabs/runs of whitespace in the line,
                        # and strip the newline at the end
                        line = re.sub(r"\s+", " ", line).strip()
                        if not line:
                            continue

                        # Log each line of the output using the provided logger
                        self.logger.info(
                            centred_string(line, total_length=self.log_line_length)
                        )

                # Leaving the with block waits for the process, so the exit
                # code is available. Don't move a missing or partial file
                if process.returncode != 0:
                    raise ValueError(
                        f"{self.compress_method} failed with exit code "
                        f"{process.returncode} for {in_file}"
                    )

                # Move from temp to ROM dir
                shutil.move(out_name, os.path.join(abs_rom_dir, out_name))
                compress_files.append(os.path.join(rom_dir, out_name))
        finally:
            # Return back to the original directory, even on failure
            os.chdir(orig_dir)

        return compress_files