Source code for romsearch.modules.gamefinder

import copy
import numpy as np
import os
import re
from iso639 import Lang

import romsearch
from .romparser import ROMParser
from ..util import (
    centred_string,
    left_aligned_string,
    setup_logger,
    load_yml,
    load_json,
    match_retool_search_terms,
    get_directory_name,
)

# Default properties for a game entry that carries no dupe information of its
# own. Used when a parent is supplied as a plain name rather than as a parsed
# dupe entry, and when dupe filtering is unavailable
DUPE_DEFAULT = {
    "is_compilation": False,
    "name_type": None,
    "priority": 1,
    "title_pos": None,
    "filters": None,
}


def check_regions(
    filter_regions,
    rom_regions,
):
    """Test whether a ROM shares at least one region with a filter

    The condition counts as satisfied as soon as a single parsed ROM
    region also appears among the filter regions

    Args:
        filter_regions (list): Regions requested by the filter
        rom_regions (list): Regions parsed from the ROM

    Returns:
        bool: True if the two lists have any region in common
    """

    wanted = set(filter_regions)
    found = set(rom_regions)

    # No overlap at all means the condition has not been met
    return not found.isdisjoint(wanted)


def check_languages(
    filter_langs,
    rom_langs,
):
    """Test whether a ROM shares at least one language with a filter

    The filter languages are given as codes (retool uses ISO 639-1), so
    each one is converted to its full language name before being compared
    against the parsed ROM languages

    Args:
        filter_langs (list): Language codes requested by the filter
        rom_langs (list): Languages parsed from the ROM

    Returns:
        bool: True if the two have any language in common
    """

    wanted = {Lang(code.lower()).name for code in filter_langs}
    found = set(rom_langs)

    # No overlap at all means the condition has not been met
    return not found.isdisjoint(wanted)


def check_string(
    regex_str,
    file_name,
):
    """Test whether a regex pattern occurs anywhere in a filename

    Args:
        regex_str (str): Regular expression to search for
        file_name (str): Name of the file to search in

    Returns:
        bool: True if the pattern is found, False otherwise
    """

    return re.search(regex_str, file_name) is not None


def check_region_order(
    region_order,
    user_region_preferences,
    all_regions=None,
):
    """Check a region order condition against the user's region preferences

    The condition is met when *any* of the higher regions is higher in
    the user preferences than *all* of the lower regions. Earlier entries
    in ``user_region_preferences`` are the more preferred ones.

    Either side may be the placeholder ``["All other regions"]``, which is
    expanded to every entry of ``all_regions`` that is not named on the
    opposite side.

    Args:
        region_order (dict): Dictionary of form {"higherRegions": [], "lowerRegions": []}
        user_region_preferences (list): Ordered list of user region preferences
        all_regions (list): List of all available regions. Defaults to
            None, which is treated as an empty list

    Returns:
        bool: False if none of the higher regions is in the user
            preferences. True if a higher region is present but no lower
            region is. Otherwise True only when the best-placed higher
            region comes strictly before the best-placed lower region
    """

    if all_regions is None:
        all_regions = []

    higher_regions = list(region_order["higherRegions"])
    lower_regions = list(region_order["lowerRegions"])

    # Expand any 'all other regions' placeholder. Filtering (rather than
    # list.remove) means a region missing from all_regions is simply ignored
    # instead of raising
    placeholder = ["All other regions"]

    expanded_higher = higher_regions
    if higher_regions == placeholder:
        expanded_higher = [reg for reg in all_regions if reg not in lower_regions]

    expanded_lower = lower_regions
    if lower_regions == placeholder:
        expanded_lower = [reg for reg in all_regions if reg not in higher_regions]

    def best_position(regions):
        """Position of the most preferred of the regions, None if absent"""
        positions = [
            user_region_preferences.index(reg)
            for reg in regions
            if reg in user_region_preferences
        ]
        return min(positions, default=None)

    # Without any higher region in the preferences, there's nothing to satisfy
    high_prio_region = best_position(expanded_higher)
    if high_prio_region is None:
        return False

    # With a higher region but no lower one, the higher region wins outright
    low_prio_region = best_position(expanded_lower)
    if low_prio_region is None:
        return True

    # A smaller position means a stronger preference, so the condition holds
    # when the best higher region sits before the best lower region
    return high_prio_region < low_prio_region


def add_dupe_entry_to_game_dict(
    game,
    game_dict,
    parent,
):
    """Register a game under its parent in the game dict

    Parent names and game names are matched case-insensitively against
    what is already in the game dict, so that entries differing only in
    upper/lowercase end up merged rather than duplicated. The game's
    "dir_name" is also overwritten in place to match the parent.

    Args:
        game (dict): Dictionary containing various game names
        game_dict (dict): Dictionary of games, updated in place
        parent: Parent for the game. Can either
            be a string, which will use the default
            dupe entry for the parent, or a dictionary
            holding "parent_name" and "dupe_entry"

    Returns:
        dict: The updated game dict

    Raises:
        ValueError: If parent is neither a string nor a dictionary
    """

    game_name = copy.deepcopy(game["original_name"])

    if isinstance(parent, dict):
        parent_name = parent["parent_name"]
        dupe_entry = parent["dupe_entry"]
    elif isinstance(parent, str):
        parent_name = copy.deepcopy(parent)
        dupe_entry = DUPE_DEFAULT
    else:
        raise ValueError("parent should be one of str, dict")

    # The directory name should follow the short parent name
    game["dir_name"] = copy.deepcopy(get_directory_name(parent_name))

    # Map lowercased parent names to how they're actually stored, keeping the
    # first one where several only differ by case
    parent_lookup_key = parent_name.lower()
    stored_parents = {}
    for existing_parent in game_dict:
        stored_parents.setdefault(existing_parent.lower(), existing_parent)

    if parent_lookup_key in stored_parents:
        final_parent_name = stored_parents[parent_lookup_key]
    else:
        final_parent_name = copy.deepcopy(parent_name)
        game_dict[final_parent_name] = {}

    siblings = game_dict[final_parent_name]

    # Same again for the game names under this parent
    stored_games = {}
    for existing_game in siblings:
        stored_games.setdefault(existing_game.lower(), existing_game)
    game_name = stored_games.get(game_name.lower(), game_name)

    # Names go in first, then any dupe entry on top
    entry = siblings.setdefault(game_name, {})
    entry.update(game)
    entry.update(dupe_entry)

    return game_dict


class GameFinder:

    def __init__(
        self,
        platform,
        dat=None,
        retool=None,
        ra_hashes=None,
        config_file=None,
        config=None,
        dupe_dict=None,
        platform_config=None,
        default_config=None,
        regex_config=None,
        logger=None,
        log_line_sep="=",
        log_line_length=100,
    ):
        """Tool to find games within a list of files

        Will parse through files to get a unique list of games, then pull
        out potential aliases and optionally remove things from user excluded list

        Args:
            platform (str): Platform name
            dat (optional): Passed through to the ROMParser when applying
                filters. Defaults to None.
            retool (optional): Passed through to the ROMParser when applying
                filters. Defaults to None.
            ra_hashes (optional): Passed through to the ROMParser when
                applying filters. Defaults to None.
            config_file (str, optional): Path to config file. Defaults to None.
            config (dict, optional): Configuration dictionary. Defaults to None.
            dupe_dict (dict, optional): Dupe dictionary. Defaults to None.
            platform_config (optional): Passed through to the ROMParser when
                applying filters. Defaults to None.
            default_config (dict, optional): Default configuration dictionary.
                Defaults to None, which loads configs/defaults.yml from the
                romsearch package.
            regex_config (dict, optional): Dictionary of regex config.
                Defaults to None, which loads configs/regex.yml from the
                romsearch package.
            logger (logging.Logger, optional): Logger instance. Defaults to None.
            log_line_sep (str, optional): Character used to draw separator
                lines in the log. Defaults to "="
            log_line_length (int, optional): Line length of log. Defaults to 100

        Raises:
            ValueError: If platform is None, or if neither config_file nor
                config is given
        """

        if platform is None:
            raise ValueError("platform must be specified")
        self.platform = platform

        if config_file is None and config is None:
            raise ValueError("config_file or config must be specified")

        if config is None:
            config = load_yml(config_file)

        self.config = config

        if logger is None:
            log_dir = self.config.get("dirs", {}).get(
                "log_dir", os.path.join(os.getcwd(), "logs")
            )
            log_level = self.config.get("logger", {}).get("level", "info")
            logger = setup_logger(
                log_level=log_level,
                script_name="GameFinder",
                log_dir=log_dir,
                additional_dir=platform,
            )

        self.logger = logger

        # Pull in specifics to include/exclude
        self.include_games = self.config.get("include_games", None)
        self.exclude_games = self.config.get("exclude_games", None)

        # Pull in defaults for finding short game names. The package directory
        # is only needed if we actually have to load one of the bundled configs
        if default_config is None:
            mod_dir = os.path.dirname(romsearch.__file__)
            default_file = os.path.join(mod_dir, "configs", "defaults.yml")
            default_config = load_yml(default_file)
        self.default_config = default_config

        if regex_config is None:
            mod_dir = os.path.dirname(romsearch.__file__)
            regex_file = os.path.join(mod_dir, "configs", "regex.yml")
            regex_config = load_yml(regex_file)
        self.regex_config = regex_config

        self.dat = dat
        self.retool = retool
        self.ra_hashes = ra_hashes
        self.platform_config = platform_config

        # Info for dupes
        self.dupe_dict = dupe_dict
        self.dupe_dir = config.get("dirs", {}).get("dupe_dir", None)
        self.filter_dupes = config.get("gamefinder", {}).get("filter_dupes", True)

        self.log_line_sep = log_line_sep
        self.log_line_length = log_line_length

    def run(
        self,
        files,
    ):
        """Build the game dictionary for a set of files, logging the result

        Args:
            files (dict): Dictionary of files with names for association

        Returns:
            dict: Game dictionary, sorted by game name
        """

        self.logger.info(f"{self.log_line_sep * self.log_line_length}")
        self.logger.info(
            centred_string("Running GameFinder", total_length=self.log_line_length)
        )
        self.logger.info(f"{self.log_line_sep * self.log_line_length}")

        games_dict = self.get_game_dict(files)
        games_dict = dict(sorted(games_dict.items()))

        self.logger.info(
            centred_string(
                f"Found {len(games_dict)} games", total_length=self.log_line_length
            )
        )
        self.logger.debug(f"{'-' * self.log_line_length}")

        for gi, g in enumerate(games_dict):
            self.logger.debug(
                left_aligned_string(f"{g}:", total_length=self.log_line_length)
            )
            for ga in games_dict[g]:
                self.logger.debug(
                    left_aligned_string(
                        f"-> Priority {games_dict[g][ga]['priority']}: {ga}",
                        total_length=self.log_line_length,
                    )
                )

            # Separate the groups, but don't trail a separator after the last
            if gi != len(games_dict) - 1:
                self.logger.debug(f"{'-' * self.log_line_length}")

        self.logger.info(f"{self.log_line_sep * self.log_line_length}")

        return games_dict

    def get_game_dict(
        self,
        files,
    ):
        """Get a game dictionary out.

        From a list of files, parse out dupes, apply includes and excludes,
        and return a game dictionary.

        Args:
            files (dict): Dictionary of files with names for association

        Returns:
            dict: Game dictionary, keyed by parent name and then by game name
        """

        # We need to trim down dupes here. Otherwise, the
        # dict is just the list we already have
        game_dict = None
        if self.filter_dupes:
            game_dict = self.get_filter_dupes(files)

        # If the dupe filtering has failed, then just assign all the short names to the default dupe dict
        if game_dict is None:
            game_dict = {}
            for f in files:
                short_name = copy.deepcopy(files[f]["short_name"])

                # Each game gets its own copy of the defaults, so that editing
                # one entry can't leak into the others or into DUPE_DEFAULT
                game_dict[short_name] = {
                    short_name: copy.deepcopy(DUPE_DEFAULT),
                }

        # Remove any excluded files
        if self.exclude_games is not None:
            games_to_remove = self.get_game_matches(
                game_dict,
                self.exclude_games,
            )

            if games_to_remove is not None:
                for g in games_to_remove:
                    del game_dict[g]

        # Include only included files. N.B. if nothing matches, the
        # game dict is left as it is
        if self.include_games is not None:
            games_to_include = self.get_game_matches(
                game_dict,
                self.include_games,
            )

            if games_to_include is not None:
                game_dict = copy.deepcopy({g: game_dict[g] for g in games_to_include})

        return game_dict

    def get_game_matches(
        self,
        game_dict,
        games_to_match,
    ):
        """Get files that match an input dictionary (to properly handle dupes)

        A game matches if its group name, or any of the dupe names within
        that group, starts with one of the values to match.

        Args:
            game_dict (dict): Dictionary of games to match against
            games_to_match (list): List of values to match against. Can
                also be a dictionary of such lists keyed by platform, or a
                single string

        Returns:
            list: Sorted list of matching group names, or None if there
                is nothing to match against or nothing matched
        """

        if isinstance(games_to_match, dict):
            games_to_match = games_to_match.get(self.platform, None)

        if games_to_match is None:
            return None

        # A bare string would otherwise be split into single characters
        if isinstance(games_to_match, str):
            games_to_match = [games_to_match]

        # str.startswith happily takes a tuple of candidates
        prefixes = tuple(games_to_match)

        # Look in the group name first, then in the dupe names
        matched = {
            str(g)
            for g in game_dict
            if g.startswith(prefixes)
            or any(g_d.startswith(prefixes) for g_d in game_dict[g])
        }

        if not matched:
            return None

        return sorted(matched)

    def get_filter_dupes(self, games):
        """Parse down a list of files based on an input dupe list

        Args:
            games (dict): Dictionary of games, where each value is a
                dictionary containing various game names

        Returns:
            dict: Game dictionary, or None if no dupe file could be found

        Raises:
            ValueError: If neither a dupe dict nor a dupe directory is set
        """

        if self.dupe_dict is None and self.dupe_dir is None:
            raise ValueError(
                "dupe_dict or dupe_dir must be specified if filtering dupes"
            )

        # Load the dupes in from file if we don't already have them
        if self.dupe_dict is None:
            dupe_file = os.path.join(self.dupe_dir, f"{self.platform} (dupes).json")
            if not os.path.exists(dupe_file):
                self.logger.warning(f"{self.log_line_sep * self.log_line_length}")
                self.logger.warning(
                    centred_string(
                        "No dupe files found", total_length=self.log_line_length
                    )
                )
                self.logger.warning(f"{self.log_line_sep * self.log_line_length}")
                return None

            self.dupe_dict = load_json(dupe_file)

        game_dict = {}

        # Loop over games, looking at the search terms for each
        for g in games:
            game_dict = self.filter_by_search_terms(
                game=games[g],
                game_dict=game_dict,
            )

        return game_dict

    def filter_by_search_terms(
        self,
        game,
        game_dict=None,
    ):
        """Add entries to game dict based on search term conditions

        Will find possible parents, then add those to the game dict

        Args:
            game (dict): Dictionary containing various game names
            game_dict (dict): Dictionary of games to match against. Defaults
                to None, which will create an empty dict

        Returns:
            dict: The updated game dict
        """

        if game_dict is None:
            game_dict = {}

        found_parents = self.get_parents(
            game_name=game,
        )

        # If we have multiple matches, and more than one aren't part of a compilation/superset, freak out
        if len(found_parents) > 1:
            n_supersets_compilations = sum(
                1
                for parent in found_parents
                if parent.get("dupe_entry").get("is_compilation", False)
                or parent.get("dupe_entry").get("is_superset", False)
            )

            if n_supersets_compilations < len(found_parents) - 1:
                self.logger.warning(
                    centred_string(
                        f"{game['original_name']} has multiple non compilation/superset matches! "
                        f"This should not generally happen",
                        total_length=self.log_line_length,
                    )
                )

        for parent in found_parents:
            game_dict = add_dupe_entry_to_game_dict(
                game=game,
                game_dict=game_dict,
                parent=parent,
            )

        return game_dict

    def get_parents(
        self,
        game_name,
    ):
        """Get the parent(s) by searching through the dupe dict

        Because we can have compilations, find all cases where things match up

        Args:
            game_name (dict): game name to find parents for. Can also be
                a plain string, which is then used as the full, short and
                region-free name alike

        Returns:
            list: One dictionary per matched parent, holding "parent_name",
                "dupe_name", "dupe_entry", "is_compilation" and
                "is_superset". If nothing matched, a single-item list
                containing just the short name

        Raises:
            ValueError: If nothing matched and there is no short name
        """

        # Keep track of the fact that we might have region-free names here
        if isinstance(game_name, dict):
            full_name = copy.deepcopy(game_name["full_name"])
            short_name = copy.deepcopy(game_name["short_name"])
            region_free_name = copy.deepcopy(game_name["region_free_name"])
        else:
            full_name = copy.deepcopy(game_name)
            short_name = copy.deepcopy(game_name)
            region_free_name = copy.deepcopy(game_name)

        found_parents = []

        # Check all the clones within the dupes. These can potentially
        # have different name types, so pass that along to the matching
        for dupe_parent, clones in self.dupe_dict.items():
            for clone_name, clone_entry in clones.items():

                found_dupe_in_clone = match_retool_search_terms(
                    full_name=full_name,
                    search_term=clone_name,
                    short_name=short_name,
                    region_free_name=region_free_name,
                    match_type=clone_entry.get("name_type", None),
                )
                if not found_dupe_in_clone:
                    continue

                parent_name = dupe_parent
                dupe_entry = copy.deepcopy(clone_entry)

                # Filters can change both the parent name and the entry
                filters = clone_entry.get("filters", None)
                if filters is not None:
                    parent_name, dupe_entry = self.apply_filters(
                        parent_name=parent_name,
                        dupe_entry=dupe_entry,
                        game_name=game_name,
                        filters=filters,
                    )

                # Set up the dictionary
                found_parent = {
                    "parent_name": parent_name,
                    "dupe_name": clone_name,
                    "dupe_entry": dupe_entry,
                    "is_compilation": dupe_entry.get("is_compilation", False),
                    "is_superset": dupe_entry.get("is_superset", False),
                }

                # Don't duplicate
                if found_parent not in found_parents:
                    found_parents.append(found_parent)

        # With no matches in the dupes, the game is its own parent
        if not found_parents:
            if short_name is None:
                raise ValueError("Could not find a parent name!")
            found_parents = [copy.deepcopy(short_name)]

        return found_parents

    def apply_filters(
        self,
        game_name,
        parent_name,
        dupe_entry,
        filters,
    ):
        """Apply filters to game if conditions are met

        The file is run through the ROMParser (with RA hashes switched
        off), and each filter's conditions are then checked against the
        parsed result. Results are applied for every filter where all
        conditions are met.

        Args:
            game_name: Dictionary containing various game names
            parent_name: Game name
            dupe_entry: Dictionary of game properties
            filters: Filters with conditions and results to apply

        Returns:
            tuple: The (potentially updated) parent name and dupe entry

        Raises:
            ValueError: If the ROM parsing gives nothing back for the
                file, or if a condition type is not recognised
        """

        # Parse out the filename
        file_to_parse = {game_name["full_name"]: game_name}

        # Don't use RA hashes here
        rp_config = copy.deepcopy(self.config)
        rp_config.setdefault("romparser", {})
        rp_config["romparser"]["use_ra_hashes"] = False

        rp = ROMParser(
            platform=self.platform,
            game=parent_name,
            dat=self.dat,
            retool=self.retool,
            ra_hashes=self.ra_hashes,
            config=rp_config,
            platform_config=self.platform_config,
            default_config=self.default_config,
            regex_config=self.regex_config,
            logger=self.logger,
        )
        rom_parsed = rp.run(file_to_parse)
        rom_parsed = rom_parsed.get(game_name["full_name"], None)

        if rom_parsed is None:
            raise ValueError("ROM parsing failed")

        # Having parsed the file, now loop over the conditions
        for filt in filters:
            conditions_met = []
            for c, condition in filt["conditions"].items():

                if c == "matchLanguages":
                    condition_met = check_languages(
                        condition, rom_parsed["languages"]
                    )
                elif c == "matchRegions":
                    condition_met = check_regions(condition, rom_parsed["regions"])
                elif c == "matchString":
                    condition_met = check_string(condition, game_name["full_name"])
                elif c == "regionOrder":
                    condition_met = check_region_order(
                        condition,
                        self.config["region_preferences"],
                        all_regions=list(self.default_config["regions"]),
                    )
                else:
                    raise ValueError(f"Unsure how to deal with condition {c}")

                conditions_met.append(condition_met)

            # If we've met the conditions, then apply the results
            if all(conditions_met):
                parent_name, dupe_entry = self.apply_results(
                    parent_name,
                    dupe_entry,
                    filt["results"],
                )

        return parent_name, dupe_entry

    def apply_results(
        self,
        parent_name,
        game_dict,
        results,
    ):
        """Apply results to a filtered match

        Args:
            parent_name: Parent name
            game_dict: Dictionary of game properties, updated in place
            results: Dictionary of results to apply to the game/game dict

        Returns:
            tuple: The (potentially updated) parent name and game dict
        """

        for r, result in results.items():

            # If we're changing the name, that gets pulled out here
            if r == "group":
                parent_name = copy.deepcopy(result)

            # If we're changing priority, edit the game dict
            elif r == "priority":
                game_dict[r] = result

            # If the filter is for supersets, take that bool
            elif r == "superset":
                game_dict["flag_as_superset"] = result

            # Ignore local names, since we won't use them
            elif r == "localNames":
                continue

            # If we have something else unexpected, raise a warning but do nothing
            else:
                self.logger.warning(f"Unsure how to deal with result type {r}")

        return parent_name, game_dict