Source code for romsearch.modules.datparser

from zipfile import ZipFile

import copy
import glob
import os
from urllib.request import urlopen

import romsearch
from ..util import (
    centred_string,
    load_yml,
    setup_logger,
    unzip_file,
    save_json,
    get_dat,
    format_dat,
)

ALLOWED_GROUPS = [
    "No-Intro",
    "Redump",
]


[docs] class DATParser: def __init__( self, platform=None, use_subchannels=False, config_file=None, config=None, platform_config=None, logger=None, log_line_sep="=", log_line_length=100, ): """Parser for dat files from Redump or No-Intro For Redump dats, we can download directly from the site. Users will have to provide their own files for No-Intro, since there's no good way to scrape them automatically Args: platform (str, optional): Platform name. Defaults to None, which will throw a ValueError use_subchannels (bool, optional): Whether to get dats for subchannels. Defaults to False. config_file (str, optional): Configuration file. Defaults to None config (dict, optional): Configuration dictionary. Defaults to None platform_config (dict, optional): Platform configuration dictionary. Defaults to None logger (logging.Logger, optional): Logger instance. Defaults to None log_line_length (int, optional): Line length of log. Defaults to 100 """ if platform is None: raise ValueError("platform must be specified") self.platform = platform self.use_subchannels = use_subchannels if config_file is None and config is None: raise ValueError("config_file or config must be specified") if config is None: config = load_yml(config_file) self.config = config if logger is None: log_dir = self.config.get("dirs", {}).get( "log_dir", os.path.join(os.getcwd(), "logs") ) log_level = self.config.get("logger", {}).get("level", "info") logger = setup_logger( log_level=log_level, script_name=f"DATParser", log_dir=log_dir, additional_dir=platform, ) self.logger = logger self.dat_dir = self.config.get("dirs", {}).get("dat_dir", None) self.parsed_dat_dir = self.config.get("dirs", {}).get("parsed_dat_dir", None) self.platform = platform # Read in the specific platform configuration mod_dir = os.path.dirname(romsearch.__file__) if platform_config is None: platform_config_file = os.path.join( mod_dir, "configs", "platforms", f"{platform}.yml" ) platform_config = load_yml(platform_config_file) self.platform_config = platform_config self.group = self.platform_config.get("group", None) if self.group is None: raise ValueError("No group name specified in platform config file") if self.group not in ALLOWED_GROUPS: raise ValueError(f"Group needs to be one of {ALLOWED_GROUPS}") # Pull out the platform specifics for the dats dat_config_file = os.path.join( mod_dir, "configs", "dats", f"{self.group.lower()}.yml" ) dat_config = load_yml(dat_config_file) self.dat_url = dat_config.get("url", None) dat_config = dat_config.get(self.platform, None) self.dat_config = dat_config # Set up the name for the file self.out_file = os.path.join( self.parsed_dat_dir, f"{self.platform} (dat parsed).json" ) self.log_line_sep = log_line_sep self.log_line_length = log_line_length
[docs] def run(self): run_datparser = True rom_dict = None if self.dat_dir is None: self.logger.warning(f"{self.log_line_sep * self.log_line_length}") self.logger.warning( centred_string( "No dat_dir defined in config file", total_length=self.log_line_length, ) ) self.logger.warning(f"{self.log_line_sep * self.log_line_length}") run_datparser = False if self.parsed_dat_dir is None: self.logger.warning(f"{self.log_line_sep * self.log_line_length}") self.logger.warning( centred_string( "No parsed_dat_dir defined in config file", total_length=self.log_line_length, ) ) self.logger.warning(f"{self.log_line_sep * self.log_line_length}") run_datparser = False if self.dat_config is None: self.logger.warning(f"{self.log_line_sep * self.log_line_length}") self.logger.warning( centred_string( "No platform-specific dat config in the dat configuration file", total_length=self.log_line_length, ) ) self.logger.warning(f"{self.log_line_sep * self.log_line_length}") run_datparser = False if self.use_subchannels and len(self.platform_config.get("subchannels", {})) == 0: self.logger.debug( centred_string( "No subchannels defined in the platform configuration file", total_length=self.log_line_length, ) ) run_datparser = False if run_datparser: rom_dict = self.run_datparser() return rom_dict
[docs] def run_datparser(self): """The main meat of running the dat parser""" self.logger.info(f"{self.log_line_sep * self.log_line_length}") d_str = f"Running DATParser for {self.platform}" if self.use_subchannels: d_str += f" subchannels" self.logger.info( centred_string(d_str, total_length=self.log_line_length) ) self.logger.info(f"{self.log_line_sep * self.log_line_length}") if self.use_subchannels: subchannels = self.platform_config.get("subchannels", {}) rom_dict = {} for subchannel in subchannels: # Get the subchannel zip zip_file = self.get_zip_file(subchannel=subchannel) if zip_file is None: return False # This actually contains all the files we care about, so # just look inside and get the names with ZipFile(zip_file) as f: nl = f.namelist() subchannel_file_list = [os.path.splitext(n)[0] for n in nl] rom_dict[subchannel] = copy.deepcopy(subchannel_file_list) else: zip_file = self.get_zip_file() if zip_file is None: return False # Unzip the file if it doesn't already exist dat_file_name = zip_file.replace(".zip", ".dat") if not os.path.exists(dat_file_name): unzip_file(zip_file, self.dat_dir) dat = get_dat(dat_file_name) if dat is None: return False self.logger.info(f"{'-' * self.log_line_length}") self.logger.info( centred_string("Using dat file:", total_length=self.log_line_length) ) self.logger.info( centred_string( f"{os.path.split(dat_file_name)[-1]}", total_length=self.log_line_length ) ) rom_dict = format_dat(dat) self.save_rom_dict(rom_dict) self.logger.info(f"{self.log_line_sep * self.log_line_length}") return rom_dict
[docs] def get_zip_file(self, subchannel=None, ): """Get zip file from the dat directory If this is a Redump file, we can download the latest directly from the site. Otherwise, you will need to download them manually Args: subchannel: Specific subchannel. Defaults to None, which will use the main dat """ if subchannel is None: file_mapping = self.dat_config.get("file_mapping", None) else: file_mapping = self.dat_config.get("subchannels", {}).get(subchannel, {}).get("file_mapping", None) if file_mapping is None: raise ValueError("No file mapping defined in dat config file") if self.group == "Redump": self.download_latest_redump_dat(subchannel=subchannel) zip_files = glob.glob(os.path.join(self.dat_dir, f"{file_mapping}*.zip")) zip_files.sort() if len(zip_files) > 1: self.logger.info( centred_string( f"Found {len(zip_files)} zip files.", total_length=self.log_line_length, ) ) self.logger.info( centred_string( f"Will remove all but the latest (and associated dats)", total_length=self.log_line_length, ) ) for z in zip_files[:-1]: os.remove(z) d = z.replace(".zip", ".dat") if os.path.exists(d): os.remove(d) zip_files = glob.glob(os.path.join(self.dat_dir, f"{file_mapping}*.zip")) zip_files.sort() if len(zip_files) == 0: self.logger.warning( centred_string( f"No zip files found. ", total_length=self.log_line_length ) ) self.logger.warning( centred_string( f"You need to manually download {self.group} dat " f"files for {self.platform}", total_length=self.log_line_length, ) ) return None return zip_files[-1]
[docs] def download_latest_redump_dat(self, subchannel=None, ): """Download Redump zip file for the platform Args: subchannel: Specific subchannel. Defaults to None, which will use the main dat """ if subchannel is None: web_mapping = self.dat_config.get("web_mapping", None) else: web_mapping = self.dat_config.get("subchannels", {}).get(subchannel, {}).get("web_mapping", None) if web_mapping is None: raise ValueError("No web mapping defined in dat config file") response = urlopen(f"{self.dat_url}/{web_mapping}") f = response.headers.get_filename() out_file = os.path.join(self.dat_dir, f) if os.path.exists(out_file): self.logger.info( centred_string( f"{f} already downloaded", total_length=self.log_line_length ) ) return True self.logger.info( centred_string(f"Downloading {f}", total_length=self.log_line_length) ) if not os.path.exists(self.dat_dir): os.makedirs(self.dat_dir) with open(out_file, mode="wb") as d: d.write(response.read()) return True
[docs] def save_rom_dict( self, rom_dict, ): """Save the dat file parsed as a dictionary to JSON""" if not os.path.exists(self.parsed_dat_dir): os.makedirs(self.parsed_dat_dir) out_file = os.path.join(self.out_file) save_json(rom_dict, out_file)