import copy
import os
import re
import romsearch
from ..util import (centred_string,
left_aligned_string,
setup_logger,
get_file_time,
load_yml,
load_json,
get_game_name,
)
# Starting value for each supported regex type ("bool", "str", "list").
# NOTE: the "list" entry is a single list object; copy it before mutating,
# otherwise every user of this default sees the change.
DICT_DEFAULT_VALS = dict(
    bool=False,
    str="",
    list=[],
)
def find_pattern(regex, search_str, group_number=0):
    """Search a string for a regex and hand back the requested match group

    Args:
        regex: Pattern passed straight to ``re.search``.
        search_str (str): Text to search in.
        group_number (int, optional): Which match group to return.
            Defaults to 0, i.e. the whole match.

    Returns:
        The text of the requested group, or None if nothing matched.
    """
    match = re.search(regex, search_str)
    if match is None:
        return None
    return match.group(group_number)
def get_pattern_val(regex,
                    tag,
                    regex_type,
                    pattern_mappings=None
                    ):
    """Turn a regex match on a tag into a bool, str or list value

    Args:
        regex: Pattern to look for in ``tag``.
        tag (str): Text to search.
        regex_type (str): One of "bool", "str" or "list"; decides the shape
            of the returned value.
        pattern_mappings (dict, optional): For "list" only. Maps a readable
            name to a regex; every name whose regex is found in the match is
            returned. Defaults to None, in which case the match is split on
            commas instead.

    Returns:
        None if ``regex`` does not match. Otherwise True for "bool", the
        matched text for "str", or a list of strings for "list".

    Raises:
        ValueError: If there is a match and ``regex_type`` is not recognised.
    """
    matched = find_pattern(regex, tag)
    if matched is None:
        return None

    # Drop any parentheses sitting at either end of the match
    matched = matched.strip("()")

    if regex_type == "bool":
        return True

    if regex_type == "str":
        return matched

    if regex_type == "list":
        if pattern_mappings is None:
            # Comma-separated values, with surrounding whitespace removed
            return [item.strip() for item in matched.split(",")]

        # Keep the readable names whose pattern shows up in the match
        return [name
                for name in pattern_mappings
                if re.search(pattern_mappings[name], matched)]

    raise ValueError("regex_type should be one of 'bool', 'str', or 'list'")
[docs]
class ROMParser:
def __init__(self,
             platform,
             game,
             config_file=None,
             config=None,
             platform_config=None,
             default_config=None,
             regex_config=None,
             logger=None,
             log_line_sep="=",
             log_line_length=100,
             ):
    """ROM parser tool

    This works per-game, per-platform, so must be specified here

    Args:
        platform (str): Platform name
        game (str): Game name
        config_file (str, optional): path to config file. Defaults to None.
        config (dict, optional): configuration dictionary. Defaults to None.
        platform_config (dict, optional): platform configuration dictionary. Defaults to None.
        default_config (dict, optional): default configuration dictionary. Defaults to None.
        regex_config (dict, optional): regex configuration dictionary. Defaults to None.
        logger (logging.Logger, optional): logger instance. Defaults to None.
        log_line_sep (str, optional): String repeated to draw separator lines in the log. Defaults to "="
        log_line_length (int, optional): Line length of log. Defaults to 100

    Raises:
        ValueError: If platform is None, if neither config_file nor config is
            given, if raw_dir is missing from the config, or if
            parsed_dat_dir is missing while use_dat/use_retool is enabled.
    """

    if platform is None:
        raise ValueError("platform must be specified")
    self.platform = platform

    if config_file is None and config is None:
        raise ValueError("config_file or config must be specified")

    # An explicit config dict wins over the file
    self.config = load_yml(config_file) if config is None else config

    self.game = game

    # Build a logger only if the caller didn't hand one over
    if logger is None:
        fallback_log_dir = os.path.join(os.getcwd(), "logs")
        log_dir = self.config.get("dirs", {}).get("log_dir", fallback_log_dir)
        extra_dir = str(os.path.join(platform, game))
        level = self.config.get("logger", {}).get("level", "info")
        logger = setup_logger(log_level=level,
                              script_name="ROMParser",
                              log_dir=log_dir,
                              additional_dir=extra_dir,
                              )
    self.logger = logger

    # Anything not supplied is read from the configs directory inside the romsearch package
    packaged_configs = os.path.join(os.path.dirname(romsearch.__file__), "configs")

    if default_config is None:
        default_config = load_yml(os.path.join(packaged_configs, "defaults.yml"))
    self.default_config = default_config

    if regex_config is None:
        regex_config = load_yml(os.path.join(packaged_configs, "regex.yml"))
    self.regex_config = regex_config

    if platform_config is None:
        platform_config = load_yml(os.path.join(packaged_configs, "platforms", f"{platform}.yml"))
    self.platform_config = platform_config

    dirs_config = self.config.get("dirs", {})

    self.raw_dir = dirs_config.get("raw_dir", None)
    if not self.raw_dir:
        raise ValueError("raw_dir must be specified in config.yml")

    parser_config = self.config.get("romparser", {})
    self.use_dat = parser_config.get("use_dat", True)
    self.use_retool = parser_config.get("use_retool", True)
    self.use_filename = parser_config.get("use_filename", True)
    self.dry_run = parser_config.get("dry_run", False)

    def load_parsed(file_name):
        """Load a JSON file from parsed_dat_dir, or return None if it isn't there"""
        parsed_dir = dirs_config.get("parsed_dat_dir", None)
        if parsed_dir is None:
            raise ValueError("parsed_dat_dir must be specified in config.yml")
        path = os.path.join(parsed_dir, file_name)
        if os.path.exists(path):
            return load_json(path)
        return None

    # If we're using the dat file, pull it out here
    self.dat = None
    if self.use_dat:
        self.dat = load_parsed(f"{platform} (dat parsed).json")

    # If we're using the retool file, pull it out here
    self.retool = None
    if self.use_retool:
        self.retool = load_parsed(f"{platform} (retool).json")

    self.log_line_sep = log_line_sep
    self.log_line_length = log_line_length
[docs]
def run(self,
        files,
        ):
    """Parse every file for this game

    Args:
        files (dict): Keyed by file name; each value must contain a
            "priority" entry.

    Returns:
        dict: File name mapped to its parsed info, with "priority" copied
            over from ``files``.
    """

    separator = f"{self.log_line_sep * self.log_line_length}"

    self.logger.debug(separator)
    self.logger.debug(centred_string(f"Running ROMParser for {self.game}",
                                     total_length=self.log_line_length)
                      )
    self.logger.debug(separator)

    parsed = {}
    for file_name in files:
        info = self.parse_file(file_name)

        # Carry the priority through alongside the parsed tags
        info["priority"] = files[file_name]["priority"]
        parsed[file_name] = info

    return parsed
[docs]
def parse_file(self,
               f,
               ):
    """Collect everything known about one file and log a summary

    Runs whichever of the filename, retool and dat parsers are switched on,
    applies the finalisation step, records the file modification time and
    writes the resulting tags to the debug log.

    Args:
        f (str): File name; joined onto raw_dir and platform to find the
            file on disk.

    Returns:
        dict: The parsed tags, including "file_mod_time".

    Raises:
        ValueError: If a value in the result is not a bool, str or list.
    """

    line_length = self.log_line_length

    def log_left(text):
        """Send one left-aligned line to the debug log"""
        self.logger.debug(left_aligned_string(text,
                                              total_length=line_length)
                          )

    file_dict = {}

    # Each enabled parser adds to the same dict
    if self.use_filename:
        file_dict = self.parse_filename(f, file_dict)
    if self.use_retool:
        file_dict = self.parse_retool(f, file_dict)
    if self.use_dat:
        file_dict = self.parse_dat(f, file_dict)

    # Finalisation updates the dict in place, so the return value isn't needed
    self.finalise_file_dict(file_dict)

    # File modification time
    file_dict["file_mod_time"] = get_file_time(os.path.join(self.raw_dir, self.platform, f),
                                               datetime_format=self.default_config["datetime_format"],
                                               )

    # Everything below is purely for a readable log
    self.logger.debug(centred_string(f"{f}:",
                                     total_length=line_length)
                      )

    # Bucket by value type. bool is checked first, then str, then list
    bool_tags = {True: [], False: []}
    str_tags = {}
    list_tags = {}
    for key, value in file_dict.items():
        if isinstance(value, bool):
            bool_tags[value].append(key)
        elif isinstance(value, str):
            str_tags[key] = value
        elif isinstance(value, list):
            list_tags[key] = value
        else:
            raise ValueError(f"{value} is not something I know how to parse")

    # String tags, leaving out empty ones
    log_left("String tags:")
    for tag, value in str_tags.items():
        if value != "":
            log_left(f"-> {tag}: {value}")

    # List tags, leaving out empty ones
    log_left("List tags:")
    for tag, values in list_tags.items():
        if values:
            log_left(f"-> {tag}: {', '.join(str(i) for i in values)}")

    # Bool tags that are set
    log_left("Tagged:")
    for tag in bool_tags[True]:
        log_left(f"-> {tag}")

    # Bool tags that are not set
    log_left("Not tagged:")
    for tag in bool_tags[False]:
        log_left(f"-> {tag}")

    self.logger.debug(f"{'-' * line_length}")

    return file_dict
[docs]
def parse_dat(self, f, file_dict=None):
    """Parse info out of the dat file

    Looks the file up in the parsed dat and sets one boolean per configured
    dat category, keyed by the category name lower-cased with spaces turned
    into underscores. A category already flagged True in ``file_dict`` stays
    True.

    Args:
        f (str): File name, with or without a trailing ".zip".
        file_dict (dict, optional): Existing parsed info to update. Defaults
            to None, which starts from an empty dict.

    Returns:
        dict: ``file_dict`` with the category flags added. Returned
            unchanged (after a logged warning) if there is no dat loaded or
            the file has no dat entry.
    """

    if file_dict is None:
        file_dict = {}

    if self.dat is None:
        self.logger.warning(f"{self.log_line_sep * self.log_line_length}")
        self.logger.warning(centred_string(f"No dat file found for {self.platform}. Skipping",
                                           total_length=self.log_line_length)
                            )
        self.logger.warning(f"{self.log_line_sep * self.log_line_length}")
        return file_dict

    # Remember there aren't zips in the dat entries. removesuffix only drops
    # a literal trailing ".zip"; str.strip(".zip") would instead eat any
    # '.', 'z', 'i' or 'p' characters from both ends of the name
    # (e.g. "pinball.zip" -> "nball") and the lookup would miss.
    dat_entry = self.dat.get(f.removesuffix(".zip"), None)
    if not dat_entry:
        self.logger.warning(f"{self.log_line_sep * self.log_line_length}")
        self.logger.warning(centred_string(f"No dat entry found for {f}. Skipping",
                                           total_length=self.log_line_length)
                            )
        self.logger.warning(f"{self.log_line_sep * self.log_line_length}")
        return file_dict

    # The entry's category doesn't change while we loop, so read it once
    entry_category = dat_entry.get("category", "")

    for dat_cat in self.default_config.get("dat_categories", []):
        is_this_category = entry_category == dat_cat
        dict_key = dat_cat.lower().replace(" ", "_")

        # OR with anything already there so an earlier True isn't lost
        if dict_key in file_dict:
            file_dict[dict_key] = file_dict[dict_key] | is_this_category
        else:
            file_dict[dict_key] = is_this_category

    return file_dict
[docs]
def finalise_file_dict(self,
                       file_dict,
                       ):
    """Apply the closing touches to a parsed file dict

    The game category is filled in first, then implied languages.

    Args:
        file_dict (dict): Parsed info for one file.

    Returns:
        dict: Whatever the implied-language step returns.
    """

    return self.set_implicit_languages(self.set_game_category(file_dict))
[docs]
def set_game_category(self,
                      file_dict,
                      ):
    """Fall back to the "games" category when no dat category is flagged

    Every configured dat category missing from ``file_dict`` is added as
    False. If all of them are then exactly False, "games" is set to True.

    Args:
        file_dict (dict): Parsed info for one file; updated in place.

    Returns:
        dict: The same ``file_dict``.
    """

    # Dict keys are the category names lower-cased, spaces as underscores
    category_keys = [cat.lower().replace(" ", "_")
                     for cat in self.default_config.get("dat_categories", [])]

    for key in category_keys:
        file_dict.setdefault(key, False)

    nothing_flagged = not any(file_dict[key] is not False for key in category_keys)
    if nothing_flagged:
        file_dict["games"] = True

    return file_dict
[docs]
def set_implicit_languages(self,
                           file_dict,
                           ):
    """Set implicit language from region, if we don't already have languages

    When "languages" is empty or absent, it is replaced by the languages
    that the configured "implied_languages" mapping gives for each entry in
    "regions", in region order. Regions without a mapping are skipped.

    Args:
        file_dict (dict): Parsed info for one file; updated in place.

    Returns:
        dict: The same ``file_dict``.
    """

    implied_languages = self.default_config.get("implied_languages", {})

    # Only set if we have no languages. Missing keys are treated as empty so
    # this also works when filename parsing hasn't populated them
    if not file_dict.get("languages"):
        regions = file_dict.get("regions") or []

        # Assign a fresh list rather than appending: the existing empty list
        # may be an object shared with other entries or a module-level
        # default, and appending to it would leak languages elsewhere
        file_dict["languages"] = [implied_languages[r]
                                  for r in regions
                                  if r in implied_languages]

    return file_dict
[docs]
def parse_filename(self, f, file_dict=None):
    """Parse info out of filename

    The file name is split into its bracketed tags and every entry of the
    regex config is tried against them (or against the whole file name when
    the entry sets ``search_tags`` to False). Each regex key ends up in the
    result, holding either the parsed value or the default for its type.
    Entries with a ``group`` are also merged into that group's key.

    Args:
        f (str): File name, with or without a trailing ".zip".
        file_dict (dict, optional): Existing parsed info to update. Defaults
            to None, which starts from an empty dict.

    Returns:
        dict: ``file_dict`` with one entry per regex key, plus any groups.

    Raises:
        ValueError: If an entry has an unknown ``type`` or ``flags`` value,
            or if two non-empty "str" values land in the same group.
    """

    if file_dict is None:
        file_dict = {}

    # Split file into tags. removesuffix only drops a literal trailing
    # ".zip"; str.strip(".zip") would eat any '.', 'z', 'i' or 'p'
    # characters from both ends of the name. Whatever precedes the first
    # " (" is not a tag, hence the [1:]
    tags = [f'({x}' for x in f.removesuffix(".zip").split(' (')][1:]

    for regex_key in self.regex_config:
        regex_entry = self.regex_config[regex_key]

        regex_type = regex_entry.get("type", "bool")
        search_tags = regex_entry.get("search_tags", True)
        group = regex_entry.get("group", None)
        regex_flags = regex_entry.get("flags", "I")
        transform_pattern = regex_entry.get("transform_pattern", None)
        transform_repl = regex_entry.get("transform_repl", None)

        dict_default_val = DICT_DEFAULT_VALS.get(regex_type, None)
        if dict_default_val is None:
            raise ValueError(f"regex_type should be one of {list(DICT_DEFAULT_VALS.keys())}")

        # Store a copy of the default. The list default is a single shared
        # object, and the group handling below extends lists in place, so
        # storing it directly would pollute the default for every later key
        if regex_key not in file_dict:
            file_dict[regex_key] = copy.deepcopy(dict_default_val)

        if regex_flags == "NOFLAG":
            regex_flags = re.NOFLAG
        elif regex_flags == "I":
            regex_flags = re.I
        else:
            raise ValueError("regex_flags should be one of 'NOFLAG', 'I'")

        pattern = regex_entry["pattern"]

        # List patterns carry a "[regex_key]" placeholder that is filled
        # with the alternatives from the default config
        pattern_mappings = None
        if regex_type == "list":
            key_defaults = self.default_config[regex_key]
            if isinstance(key_defaults, dict):
                # A dict maps readable name -> pattern; keep it so matches
                # can be mapped back to the readable names
                str_to_join = list(key_defaults.values())
                pattern_mappings = key_defaults
            else:
                str_to_join = key_defaults

            pattern = pattern.replace(f"[{regex_key}]", "|".join(str_to_join))

        regex = re.compile(pattern, flags=regex_flags)

        if search_tags:
            # First tag that matches wins
            for tag in tags:
                pattern_string = get_pattern_val(regex,
                                                 tag,
                                                 regex_type,
                                                 pattern_mappings=pattern_mappings,
                                                 )
                if pattern_string is None:
                    continue

                if transform_pattern is not None:
                    pattern_string = re.sub(transform_pattern, transform_repl, pattern_string)
                file_dict[regex_key] = pattern_string
                break
        else:
            pattern_string = get_pattern_val(regex,
                                             f,
                                             regex_type,
                                             pattern_mappings=pattern_mappings
                                             )
            if pattern_string is not None:
                file_dict[regex_key] = pattern_string

        # Update groups, if needed
        if group is not None:
            if group not in file_dict:
                # Copy here too, so the group never aliases the shared default
                file_dict[group] = copy.deepcopy(dict_default_val)

            if regex_type == "bool":
                file_dict[group] = file_dict[group] | file_dict[regex_key]
            elif regex_type == "str":
                if file_dict[group] and file_dict[regex_key]:
                    raise ValueError("Can't combine multiple groups with type str")
                else:
                    file_dict[group] += file_dict[regex_key]
            elif regex_type == "list":
                file_dict[group].extend(file_dict[regex_key])
            else:
                raise ValueError(f"regex_type should be one of {list(DICT_DEFAULT_VALS.keys())}")

    return file_dict