"""romsearch.modules.romdownloader: download platform ROMs from an rclone remote."""

import copy
import glob
import os
import re
import shlex
import subprocess

import numpy as np

import romsearch
from ..util import (
    setup_logger,
    load_yml,
    get_file_pattern,
    discord_push,
    split,
    centred_string,
    remove_case_insensitive_matches,
)

# rclone sub-commands ROMDownloader can drive: "sync" mirrors a whole remote
# directory, "copy" fetches an explicit list of individual files
RCLONE_METHODS = ["sync", "copy"]


def add_rclone_filter(
    pattern=None,
    filter_type="include",
    include_wildcard=True,
):
    """Build a single rclone ``--filter`` command-line fragment.

    Args:
        pattern (str, optional): Pattern to wrap in double curly braces
            (``{{pattern}}``). If None, no braced pattern is emitted.
            Defaults to None
        filter_type (str, optional): "include" (rule prefixed with ``+``) or
            "exclude" (rule prefixed with ``-``). Defaults to "include"
        include_wildcard (bool, optional): If truthy, append a trailing ``*``
            to the rule. Defaults to True

    Returns:
        str: A fragment of the form ``--filter "<sign> <rule>" `` (note the
        trailing space), so fragments can be appended straight onto a
        command string.

    Raises:
        ValueError: If filter_type is neither "include" nor "exclude"
    """
    for name, sign in (("include", "+"), ("exclude", "-")):
        if filter_type == name:
            break
    else:
        raise ValueError("filter_type should be one of include or exclude")

    # rclone wants the pattern wrapped in double curly braces; building them by
    # concatenation avoids having to escape braces inside an f-string
    braced = "" if pattern is None else "{{" + f"{pattern}" + "}}"
    wildcard = "*" if include_wildcard else ""

    return f'--filter "{sign} {braced}{wildcard}" '


def get_tidy_files(glob_pattern):
    """Return the bare filenames matched by a glob pattern.

    Leading directory components are dropped, so a match such as
    ``/some/dir/game.zip`` comes back as ``game.zip``.

    Args:
        glob_pattern (str): glob pattern to match

    Returns:
        list: Matched filenames without their directories, in glob order
    """
    return [os.path.basename(match) for match in glob.glob(glob_pattern)]


class ROMDownloader:
    """Per-platform ROM downloader that drives rclone.

    Files are fetched from ``<remote_name>:<platform dir>`` into
    ``<raw_dir>/<platform>``, either by syncing a whole directory
    (rclone_method='sync') or by copying an explicit list of files
    (rclone_method='copy').
    """

    # Sentinel return code used when rclone reports that the requested file
    # (or its directory) does not exist on the remote. No retry is attempted
    # for this case.
    _MISSING_FILE_RETCODE = 9999

    def __init__(
        self,
        platform=None,
        config_file=None,
        config=None,
        platform_config=None,
        rclone_method="sync",
        copy_files=None,
        subchannel_dict=None,
        logger=None,
        include_filter_wildcard=True,
        log_line_sep="=",
        log_line_length=100,
    ):
        """Downloader tool via rclone

        This works per-platform, so must be specified here. For this, we can either
        sync an entire directory (rclone_method='sync'), or copy individual files
        (rclone_method='copy'). If the method is copy, then copy_files must be set
        as a list

        Args:
            platform (str, optional): Platform name. Defaults to None, which will
                throw a ValueError
            config_file (str, optional): Configuration file. Defaults to None
            config (dict, optional): Configuration dictionary. Defaults to None
            platform_config (dict, optional): Platform configuration dictionary.
                If None, it is loaded from the package's
                ``configs/platforms/<platform>.yml``. Defaults to None
            rclone_method (str, optional): Should be one of 'sync' or 'copy'.
                Defaults to 'sync'
            copy_files (list or dict, optional): Must be set if rclone_method is
                'copy'. Determines the filenames to copy over. If a dict, each
                value maps keys to {"name": ..., "score": ...} entries, and the
                highest-scoring files available are used. Defaults to None
            subchannel_dict (dict, optional): Dictionary of subchannel files,
                keyed by subchannel name. Defaults to None
            logger (logging.Logger, optional): Logger instance. Defaults to None,
                which sets up a new logger from the config
            include_filter_wildcard (bool, optional): If set, will include
                wildcards in rclone filters. Defaults to True
            log_line_sep (str, optional): Character used for separator lines in
                the log. Defaults to "="
            log_line_length (int, optional): Line length of log. Defaults to 100

        Raises:
            ValueError: If platform, config, raw_dir, remote_name or the platform
                config's ``dir`` is missing, or rclone_method is invalid
        """
        if platform is None:
            raise ValueError("platform must be specified")
        self.platform = platform

        if config_file is None and config is None:
            raise ValueError("config_file or config must be specified")

        if config is None:
            config = load_yml(config_file)
        self.config = config

        if logger is None:
            log_dir = self.config.get("dirs", {}).get(
                "log_dir", os.path.join(os.getcwd(), "logs")
            )
            log_level = self.config.get("logger", {}).get("level", "info")
            logger = setup_logger(
                log_level=log_level,
                script_name="ROMDownloader",
                log_dir=log_dir,
                additional_dir=platform,
            )
        self.logger = logger

        if rclone_method not in RCLONE_METHODS:
            raise ValueError(f"rclone_method must be one of {RCLONE_METHODS}")
        self.rclone_method = rclone_method

        self.copy_files = copy_files
        self.subchannel_dict = subchannel_dict

        out_dir = self.config.get("dirs", {}).get("raw_dir", None)
        if out_dir is None:
            raise ValueError("raw_dir needs to be defined in config")
        self.out_dir = os.path.join(out_dir, platform)

        # Get any specific includes/excludes for this platform
        self.include_games = self._get_platform_games("include_games")
        self.exclude_games = self._get_platform_games("exclude_games")

        romdownloader_config = self.config.get("romdownloader", {})

        remote_name = romdownloader_config.get("remote_name", None)
        if remote_name is None:
            raise ValueError("remote_name must be specified in config")
        self.remote_name = remote_name

        sync_all = romdownloader_config.get("sync_all", True)
        # If we have includes or excludes, force sync all False
        if self.include_games is not None or self.exclude_games is not None:
            sync_all = False
        self.sync_all = sync_all

        # If we're skipping existing files for rclone copy
        self.skip_existing_files = romdownloader_config.get(
            "skip_existing_files", True
        )

        self.include_filter_wildcard = include_filter_wildcard

        # Read in the specific platform configuration
        mod_dir = os.path.dirname(romsearch.__file__)
        if platform_config is None:
            platform_config_file = os.path.join(
                mod_dir, "configs", "platforms", f"{platform}.yml"
            )
            platform_config = load_yml(platform_config_file)
        self.platform_config = platform_config

        remote_dir = self.platform_config.get("dir", None)
        if remote_dir is None:
            raise ValueError("dir should be defined in the platform config file!")

        # If we're not using absolute URLs, then remove the leading slash here
        self.use_absolute_url = romdownloader_config.get("use_absolute_url", True)
        if not self.use_absolute_url:
            remote_dir = self._strip_leading_slash(remote_dir)
        self.remote_dir = remote_dir

        self.discord_url = self.config.get("discord", {}).get("webhook_url", None)
        self.dry_run = romdownloader_config.get("dry_run", False)

        self.log_line_sep = log_line_sep
        self.log_line_length = log_line_length

    def _get_platform_games(self, key):
        """Resolve an include/exclude games list from the config for this platform.

        If the config value is a dict it is looked up by platform name;
        otherwise a deep copy of the value itself (possibly None) is returned.
        """
        games = self.config.get(key, None)
        if isinstance(games, dict):
            return games.get(self.platform, None)
        return copy.deepcopy(games)

    @staticmethod
    def _strip_leading_slash(path):
        """Remove a single leading '/' from path, if present (safe for '')."""
        return path[1:] if path.startswith("/") else path

    @staticmethod
    def _tidy_output_line(line):
        """Collapse whitespace runs (tabs, trailing newline) to single spaces and trim."""
        return re.sub(r"\s+", " ", line).strip()

    @staticmethod
    def _command_args(cmd):
        """Convert a command string into the form subprocess.Popen needs on this OS.

        On Windows, Popen passes a string straight to CreateProcess, which does
        its own quote parsing. On POSIX, a string passed without shell=True is
        taken as the *name of the executable*, so it must be split into an
        argument list (honouring the double quotes used around paths/filters).
        """
        if os.name == "nt":
            return cmd
        return shlex.split(cmd)

    def run(
        self,
    ):
        """Run Rclone downloader tool

        Downloads the main platform directory, then any subchannels listed in
        the platform config. Files added/removed in each output directory are
        logged and, if a Discord webhook is configured, posted there.

        Returns:
            bool: True once all downloads have completed
        """
        self.logger.info(f"{self.log_line_sep * self.log_line_length}")
        self.logger.info(
            centred_string("Running ROMDownloader", total_length=self.log_line_length)
        )
        self.logger.info(f"{self.log_line_sep * self.log_line_length}")

        self._download_and_report(
            remote_dir=self.remote_dir,
            out_dir=self.out_dir,
            name=f"ROMDownloader: {self.platform}",
        )

        # If there are potential additional files to download, do that here.
        # A key present with no value (None) is treated as "no subchannels"
        subchannels = self.platform_config.get("subchannels") or {}
        for sc, sc_remote_dir in subchannels.items():
            # If using relative URL, strip the leading slash
            if not self.use_absolute_url:
                sc_remote_dir = self._strip_leading_slash(sc_remote_dir)

            sc_out_dir = f"{self.out_dir} {sc}"

            self._download_and_report(
                remote_dir=sc_remote_dir,
                out_dir=sc_out_dir,
                name=f"ROMDownloader: {self.platform} ({sc})",
                subchannel=sc,
                subchannel_original_dir=self.out_dir,
            )

        self.logger.info(f"{self.log_line_sep * self.log_line_length}")

        return True

    def _download_and_report(
        self,
        remote_dir,
        out_dir,
        name,
        subchannel=None,
        subchannel_original_dir=None,
    ):
        """Download into out_dir, then log (and maybe post) files added/removed."""
        start_files = set(get_tidy_files(os.path.join(str(out_dir), "*")))

        self.rclone_download(
            remote_dir=remote_dir,
            out_dir=out_dir,
            subchannel=subchannel,
            subchannel_original_dir=subchannel_original_dir,
        )

        end_files = set(get_tidy_files(os.path.join(str(out_dir), "*")))

        items_added = sorted(end_files - start_files)
        items_deleted = sorted(start_files - end_files)

        self.log_items_added_deleted(
            items_added=items_added,
            items_deleted=items_deleted,
        )

        if self.discord_url is not None:
            self.post_to_discord(
                items_added=items_added,
                items_deleted=items_deleted,
                name=name,
            )

    def rclone_download(
        self,
        remote_dir,
        out_dir=None,
        subchannel=None,
        subchannel_original_dir=None,
        max_retries=5,
    ):
        """Download from rclone, either via sync or copy

        Args:
            remote_dir: rclone remote path
            out_dir: directory to download to. Defaults to the current working
                directory; created if it does not exist
            subchannel: subchannel to download (copy mode only). Defaults to None
            subchannel_original_dir: Original directory for subchannel-related
                files (copy mode only). Defaults to None
            max_retries: maximum number of retries

        Raises:
            ValueError: If rclone_method is not one of RCLONE_METHODS
        """
        if out_dir is None:
            out_dir = os.getcwd()
        os.makedirs(out_dir, exist_ok=True)

        if self.rclone_method == "sync":
            self.rclone_sync(
                remote_dir=remote_dir,
                out_dir=out_dir,
                max_retries=max_retries,
            )
        elif self.rclone_method == "copy":
            self.rclone_copy(
                remote_dir=remote_dir,
                out_dir=out_dir,
                subchannel=subchannel,
                subchannel_original_dir=subchannel_original_dir,
                max_retries=max_retries,
            )
        else:
            raise ValueError(f"rclone_method should be one of {RCLONE_METHODS}")

    @staticmethod
    def _games_pattern(games):
        """Return the file pattern for a list of games, or None if there are none."""
        searches = [] if games is None else list(games)
        if not searches:
            return None
        return get_file_pattern(searches)

    def _build_sync_filters(self):
        """Build the rclone ``--filter`` flags used when not syncing everything.

        Exclusions are added first, then inclusions, then a catch-all ``- *``
        so only explicitly included files are synced (rclone evaluates filter
        rules in order). If only exclusions are configured, the catch-all is
        omitted: otherwise it would exclude every file and nothing would sync.
        """
        filters = ""

        # Start with any negative filters
        exclude_pattern = self._games_pattern(self.exclude_games)
        if exclude_pattern:
            filters += add_rclone_filter(
                pattern=exclude_pattern,
                filter_type="exclude",
                include_wildcard=self.include_filter_wildcard,
            )

        # Now onto positive filters (specific games)
        include_pattern = self._games_pattern(self.include_games)
        if include_pattern:
            filters += add_rclone_filter(
                pattern=include_pattern,
                filter_type="include",
                include_wildcard=self.include_filter_wildcard,
            )

        # Exclude-only configuration: sync everything not explicitly excluded
        if self.include_games is None and exclude_pattern:
            return filters

        filters += '--filter "- *" '
        return filters

    def rclone_sync(
        self,
        remote_dir,
        out_dir=None,
        transfers=5,
        max_retries=5,
    ):
        """Use rclone to sync an entire directory

        Args:
            remote_dir: rclone remote path
            out_dir: directory to download to
            transfers: number of simultaneous transfers
            max_retries: maximum number of retries

        Returns:
            bool: True on success (or after logging the command in a dry run)

        Raises:
            subprocess.CalledProcessError: If rclone still fails after
                max_retries attempts
        """
        # Build the sync command. We take the following steps to avoid errors
        # - Disable HTTP2 to avoid GOAWAY errors
        # - Disable multi-thread transfers
        cmd = (
            f"rclone sync "
            f"--inplace "
            f"--fast-list "
            f"--delete-after "
            f"--disable-http2 "
            f"--multi-thread-streams=0 "
            f"--size-only "
            f"--transfers={transfers} "
            f'"{self.remote_name}:{remote_dir}" '
            f'"{out_dir}" '
            f"-v "
        )

        # Include any filters if necessary (which is probably the case)
        if not self.sync_all:
            cmd += self._build_sync_filters()

        if self.dry_run:
            self.logger.info(
                centred_string(
                    "Dry run, would rclone sync with:",
                    total_length=self.log_line_length,
                )
            )
            self.logger.info(centred_string(cmd, total_length=self.log_line_length))
            return True

        self.logger.info(
            centred_string("Running rclone sync", total_length=self.log_line_length)
        )

        retry = 0
        retcode = 1
        while retcode != 0 and retry < max_retries:
            # Execute the command and capture the output
            with subprocess.Popen(
                self._command_args(cmd),
                text=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            ) as process:
                for raw_line in process.stdout:
                    line = self._tidy_output_line(raw_line)
                    if not line:
                        continue

                    # Skip the warning about directory filters using regex
                    if "Can't figure out directory filters" in line:
                        continue

                    # Log each line of the output using the provided logger
                    self.logger.info(
                        centred_string(line, total_length=self.log_line_length)
                    )

                # stdout reaching EOF does not mean the child has been reaped yet,
                # so poll() could still return None here; block for the exit code
                retcode = process.wait()
            retry += 1

        # If we've hit the maximum retries and still we have errors, raise an error.
        # Reference cmd (not process) so this also works if no attempt was made
        if retcode != 0:
            raise subprocess.CalledProcessError(retcode, cmd)

        return True

    def _files_by_priority(self, copy_file):
        """Return candidate filenames for copy_file grouped by priority, highest first.

        If copy_files is a dict, entries under copy_file are grouped by their
        "score" (descending) and each group lists the entries' "name" values.
        Otherwise the single filename copy_file is the only candidate.
        """
        if not isinstance(self.copy_files, dict):
            return [[copy_file]]

        candidates = self.copy_files[copy_file]
        priorities = np.unique([candidates[key]["score"] for key in candidates])[::-1]

        return [
            [
                candidates[key]["name"]
                for key in candidates
                if candidates[key]["score"] == priority
            ]
            for priority in priorities
        ]

    def _skip_subchannel_file(self, f, subchannel, subchannel_original_files):
        """Decide whether a subchannel file should be skipped.

        A file is skipped if its extension-less name is not in the subchannel's
        file list, or if the corresponding file is not present in the original
        (main platform) directory. Both conditions are checked and logged.
        """
        f_no_ext = os.path.splitext(f)[0]
        skip_sc = False

        # If we don't have this file in the subchannel list, then just skip
        if f_no_ext not in self.subchannel_dict[subchannel]:
            self.logger.debug(
                centred_string(
                    f"{f_no_ext} not found in subchannel files",
                    total_length=self.log_line_length,
                )
            )
            skip_sc = True

        # And if we don't have the existing file on disc, also skip
        if f_no_ext not in subchannel_original_files:
            self.logger.debug(
                centred_string(
                    f"{f_no_ext} not found in original directory",
                    total_length=self.log_line_length,
                )
            )
            skip_sc = True

        return skip_sc

    def _run_rclone_copy_command(self, cmd, max_retries):
        """Run an rclone copy command with retries and return its final return code.

        Returns _MISSING_FILE_RETCODE (without retrying) if rclone reports the
        file or directory as not found.

        Raises:
            subprocess.CalledProcessError: If rclone still fails after
                max_retries attempts
        """
        missing = self._MISSING_FILE_RETCODE
        retry = 0
        retcode = 1

        while retcode not in (0, missing) and retry < max_retries:
            # Execute the command and capture the output
            with subprocess.Popen(
                self._command_args(cmd),
                text=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            ) as process:
                for raw_line in process.stdout:
                    line = self._tidy_output_line(raw_line)
                    if not line:
                        continue

                    # Skip weird time notifications
                    if "Time may be set wrong" in line:
                        continue

                    # If the file doesn't exist, flag it and immediately terminate
                    # the process
                    if "directory not found" in line or "object not found" in line:
                        retcode = missing
                        process.kill()
                        continue

                    # Log each line of the output using the provided logger
                    self.logger.info(
                        centred_string(line, total_length=self.log_line_length)
                    )

                # Block for the real exit code (poll() may still be None here)
                exit_code = process.wait()
                if retcode != missing:
                    retcode = exit_code
            retry += 1

        # If we've hit the maximum retries and still we have errors, raise an error
        if retcode not in (0, missing):
            raise subprocess.CalledProcessError(retcode, cmd)

        return retcode

    def _copy_single_file(self, f, remote_dir, out_dir, fi, n_files, max_retries):
        """Copy a single file from the remote into out_dir.

        Returns:
            bool: True if the file counts as found at the current priority:
            it already existed (and skip_existing_files is set), this is a
            dry run, or it exists on disk after the copy attempt
        """
        out_file = os.path.join(out_dir, f)

        # If we already have the file, then skip if we have that option turned on
        if os.path.exists(out_file) and self.skip_existing_files:
            self.logger.info(
                centred_string(
                    f"[{fi + 1}/{n_files}]: {f} already in {out_dir}, skipping",
                    total_length=self.log_line_length,
                )
            )
            return True

        remote_file_name = f"{self.remote_name}:{remote_dir}{f}"
        cmd = (
            f"rclone copy "
            f"--inplace "
            f"--no-traverse "
            f"--disable-http2 "
            f"--multi-thread-streams=0 "
            f"--size-only "
            f'"{remote_file_name}" "{out_dir}" '
            f"-v "
        )

        short_out_dir = os.path.split(out_dir)[-1]

        if self.dry_run:
            self.logger.info(
                centred_string(
                    f"Dry run, would rclone copy {short_out_dir}: {f} with:",
                    total_length=self.log_line_length,
                )
            )
            self.logger.info(centred_string(cmd, total_length=self.log_line_length))
            return True

        # Check if the file we're trying to download does not match exactly, but does
        # match in a case-insensitive way, and remove if so. Only done for real
        # runs, since a dry run must not modify the disk
        f_no_ext = os.path.splitext(f)[0]
        remove_case_insensitive_matches(
            file_to_match=f_no_ext,
            pattern=f,
            path=str(out_dir),
        )

        self.logger.info(
            centred_string(
                f"[{fi + 1}/{n_files}]: Running rclone copy for {short_out_dir}: {f}",
                total_length=self.log_line_length,
            )
        )

        retcode = self._run_rclone_copy_command(cmd, max_retries)

        if retcode == self._MISSING_FILE_RETCODE:
            self.logger.warning(
                centred_string(
                    f"Could not find {remote_file_name}.",
                    total_length=self.log_line_length,
                )
            )

        # Check if the file exists
        return os.path.exists(out_file)

    def _remove_extraneous_files(self, out_dir, keep_files):
        """Delete files in out_dir that are not in keep_files.

        Directories are left alone. In a dry run, files are only logged,
        never deleted.
        """
        keep = set(keep_files)

        for f in get_tidy_files(os.path.join(str(out_dir), "*")):
            if f in keep:
                continue

            path = os.path.join(str(out_dir), f)

            # os.remove cannot delete directories; leave them untouched
            if os.path.isdir(path) and not os.path.islink(path):
                continue

            if self.dry_run:
                self.logger.info(
                    centred_string(
                        f"Dry run, would remove {f}",
                        total_length=self.log_line_length,
                    )
                )
                continue

            os.remove(path)

    def rclone_copy(
        self,
        remote_dir,
        out_dir=None,
        subchannel=None,
        subchannel_original_dir=None,
        max_retries=5,
    ):
        """Use rclone to copy files one-by-one

        For each entry in copy_files, candidate files are tried in descending
        priority order, stopping at the first priority where a file ends up on
        disk. Afterwards, any file in out_dir that was not a copy candidate is
        deleted (only logged in a dry run).

        Args:
            remote_dir: rclone remote path (filenames are appended directly,
                so this presumably ends with '/')
            out_dir: directory to download to
            subchannel: subchannel to copy to. Defaults to None
            subchannel_original_dir: Original directory for subchannel-related
                files. Defaults to None
            max_retries: maximum number of retries

        Returns:
            bool: True once all files have been processed

        Raises:
            ValueError: If copy_files is not set
            subprocess.CalledProcessError: If an rclone copy still fails after
                max_retries attempts
        """
        if self.copy_files is None:
            raise ValueError("copy_files needs to be defined for rclone copy")

        n_files = len(self.copy_files)
        s = f"Downloading {n_files} games"
        if subchannel is not None:
            s += f" ({subchannel})"

        self.logger.info(centred_string(s, total_length=self.log_line_length))
        self.logger.info(f"{self.log_line_sep * self.log_line_length}")

        # Every candidate file we attempted; anything else in out_dir is removed
        all_files_to_copy = []

        # Extension-less names of files already in the original platform directory
        subchannel_original_files = set()
        if subchannel_original_dir is not None:
            subchannel_original_files = {
                os.path.splitext(f)[0]
                for f in get_tidy_files(
                    os.path.join(str(subchannel_original_dir), "*")
                )
            }

        for fi, copy_file in enumerate(self.copy_files):
            found_files_at_priority = False

            # Loop through priorities and download files at the highest possible priority
            for fs in self._files_by_priority(copy_file):
                if found_files_at_priority:
                    break

                for f in fs:
                    if subchannel is not None and self._skip_subchannel_file(
                        f, subchannel, subchannel_original_files
                    ):
                        continue

                    all_files_to_copy.append(f)

                    if self._copy_single_file(
                        f=f,
                        remote_dir=remote_dir,
                        out_dir=out_dir,
                        fi=fi,
                        n_files=n_files,
                        max_retries=max_retries,
                    ):
                        found_files_at_priority = True

            if fi != n_files - 1 and found_files_at_priority:
                self.logger.info(f"{'-' * self.log_line_length}")

        # If we haven't moved anything, note that here
        if not all_files_to_copy:
            self.logger.info(
                centred_string(
                    "No files downloaded",
                    total_length=self.log_line_length,
                )
            )
        self.logger.info(f"{self.log_line_sep * self.log_line_length}")

        # Do a pass through where we delete all extraneous files at the end
        self._remove_extraneous_files(out_dir=out_dir, keep_files=all_files_to_copy)

        return True

    def post_to_discord(
        self,
        items_added,
        items_deleted,
        name,
        max_per_message=10,
    ):
        """Create a discord post summarising files added and removed

        Args:
            items_added (list): list of files added
            items_deleted (list): list of files deleted
            name (string): Name of the post title
            max_per_message (int, optional): Maximum number of items per post.
                Defaults to 10.

        Returns:
            bool: True once all posts have been sent
        """
        for field_name, items in (("Added", items_added), ("Deleted", items_deleted)):
            if len(items) == 0:
                continue

            # One Discord post per chunk of up to max_per_message items
            for items_split in split(items, chunk_size=max_per_message):
                fields = [{"name": field_name, "value": "\n".join(items_split)}]
                discord_push(
                    url=self.discord_url,
                    name=name,
                    fields=fields,
                )

        return True

    def log_items_added_deleted(
        self,
        items_added,
        items_deleted,
    ):
        """Log a summary of items added and removed

        Args:
            items_added (list): list of items added
            items_deleted (list): list of items deleted

        Returns:
            bool: True once logging is done
        """
        for header, items in (
            ("Added files:", items_added),
            ("Removed files:", items_deleted),
        ):
            if len(items) == 0:
                continue

            self.logger.info(f"{'-' * self.log_line_length}")
            self.logger.info(centred_string(header, total_length=self.log_line_length))
            for f in items:
                self.logger.info(
                    centred_string(f"{f}", total_length=self.log_line_length)
                )

        return True