Source code for ramble.pipeline

# Copyright 2022-2026 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

import glob
import itertools
import os
import shlex
import shutil
import stat
from enum import Enum

import llnl.util.filesystem as fs
from llnl.util import tty

import ramble.config
import ramble.expander
import ramble.experiment_result
import ramble.fetch_strategy
import ramble.software_environments
import ramble.stage
import ramble.uploader
import ramble.util.hashing
import ramble.util.path
import ramble.workspace
from ramble.util.colors import cprint
from ramble.util.file_util import create_symlink
from ramble.util.logger import logger

import spack.util.spack_json as sjson
from spack.util.executable import Executable, which

# tqdm is only imported when the `config:disable_progress_bar` option is not
# set, so the name `tqdm` may be undefined at module level when progress bars
# are disabled.
if not ramble.config.get("config:disable_progress_bar", False):
    try:
        import tqdm
    except ModuleNotFoundError:
        # Fail at import time with a pointer to the fix rather than a bare traceback.
        logger.die("Module `tqdm` is not found. Ensure requirements.txt are installed.")


class Pipeline:
    """Base class for all pipeline objects.

    ``run`` drives a pipeline by calling the ``_prepare``, ``_execute`` and
    ``_complete`` hooks in that order. Subclasses override ``name`` (used to
    select phases and to name log files) and whichever hooks they need.
    """

    name = "base"

    def __init__(self, workspace, filters):
        """Create a new pipeline instance.

        Args:
            workspace: Workspace the pipeline operates on. Its log directory,
                date string and experiment set are read here.
            filters: Filter object used to select experiments and phases.
        """
        self.filters = filters
        self.workspace = workspace
        self.force_inventory = False
        self.action_string = "Operating on"
        self.suppress_per_experiment_prints = False
        self.suppress_run_header = False

        # Every run gets date-stamped log locations, plus stable "latest" aliases.
        dt = self.workspace.date_string()
        log_file = f"{self.name}.{dt}.out"
        self.log_dir = os.path.join(self.workspace.log_dir, f"{self.name}.{dt}")
        self.log_dir_latest = os.path.join(self.workspace.log_dir, f"{self.name}.latest")
        self.log_path = os.path.join(self.workspace.log_dir, log_file)
        self.log_path_latest = os.path.join(self.workspace.log_dir, f"{self.name}.latest.out")

        self._software_environments = ramble.software_environments.SoftwareEnvironments(workspace)
        self.workspace.software_environments = self._software_environments
        self._experiment_set = workspace.build_experiment_set()
        self.updated_experiment_hashes = False

    @property
    def experiment_set(self):
        """The experiment set built from the workspace."""
        return self._experiment_set

    def _construct_experiment_hashes(self) -> bool:
        """Hash all of the experiments.

        Populate the workspace inventory information with experiment hash data.

        Returns:
            True if ``populate_inventory`` reported a truthy result for at
            least one experiment, False otherwise.
        """
        changed = False
        for _, app_inst, _ in self._experiment_set.all_experiments():
            # Always call populate_inventory (no short-circuit), and remember
            # whether *any* experiment reported a change rather than only the
            # last one visited.
            if app_inst.populate_inventory(
                self.workspace,
                force_compute=self.force_inventory,
            ):
                changed = True
        return changed

    def _construct_workspace_hash(self):
        """Construct workspace inventory

        Assumes experiment hashes are already constructed and populated into
        the workspace. When both the inventory and hash files already exist
        (and ``force_inventory`` is not set) the inventory is loaded from disk;
        otherwise it is rebuilt from the experiments and both files are written.
        """
        workspace_inventory = os.path.join(self.workspace.root, self.workspace.inventory_file_name)
        workspace_hash_file = os.path.join(self.workspace.root, self.workspace.hash_file_name)

        files_exist = os.path.exists(workspace_inventory) and os.path.exists(workspace_hash_file)

        if not self.force_inventory and files_exist:
            with open(workspace_inventory) as f:
                self.workspace.hash_inventory = sjson.load(f)

            self.workspace.workspace_hash = ramble.util.hashing.hash_json(
                self.workspace.hash_inventory
            )
        else:
            for exp, app_inst, _ in sorted(self._experiment_set.all_experiments()):
                # Templates and repeat bases are left out of the inventory.
                if not (app_inst.is_template or app_inst.repeats.is_repeat_base):
                    self.workspace.hash_inventory["experiments"].append(
                        {
                            "name": exp,
                            "digest": app_inst.experiment_hash,
                            "contents": app_inst.hash_inventory,
                        }
                    )

            self.workspace.workspace_hash = ramble.util.hashing.hash_json(
                self.workspace.hash_inventory
            )

            with open(
                os.path.join(self.workspace.root, self.workspace.inventory_file_name), "w+"
            ) as f:
                sjson.dump(self.workspace.hash_inventory, f)

            with open(os.path.join(self.workspace.root, self.workspace.hash_file_name), "w+") as f:
                f.write(self.workspace.workspace_hash + "\n")

        self.workspace.update_metadata("workspace_digest", self.workspace.workspace_hash)

    def _prepare(self):
        """Perform preparation for pipeline execution"""
        self.updated_experiment_hashes = self._construct_experiment_hashes()

    def _execute(self):
        """Hook for executing the pipeline

        Runs every phase of this pipeline for each filtered experiment, with
        output for each experiment routed to its own log file.
        """
        num_exps = self._experiment_set.num_filtered_experiments(self.filters)

        if logger.enabled:
            fs.mkdirp(self.log_dir)
            # Also create symlink to give known paths
            create_symlink(self.log_dir, self.log_dir_latest)

        if self.suppress_per_experiment_prints and not self.suppress_run_header:
            logger.all_msg(f" Log files for experiments are stored in: {self.log_dir}")

        count = 1
        phase_total = 0
        for exp, app_inst, idx in self._experiment_set.filtered_experiments(self.filters):
            exp_log_path = app_inst.experiment_log_file(self.log_dir)
            experiment_index_value = app_inst.expander.expand_var_name(
                app_inst.keywords.experiment_index
            )

            if not self.suppress_per_experiment_prints:
                logger.all_msg(f"Experiment #{idx} ({count}/{num_exps}):")
                logger.all_msg(f" name: {exp}")
                logger.all_msg(f" root experiment_index: {experiment_index_value}")
                logger.all_msg(f" log file: {exp_log_path}")

            with logger.add_log_context(exp_log_path):
                logger.msg(f"Experiment inventory:\n{sjson.dump(app_inst.hash_inventory)}")

                phase_list = list(
                    app_inst.get_pipeline_phases(self.name, phase_filters=self.filters.phases)
                )

                disable_progress = (
                    ramble.config.get("config:disable_progress_bar", False)
                    or self.suppress_per_experiment_prints
                )

                if not disable_progress:
                    try:
                        progress = tqdm.tqdm(
                            total=len(phase_list),
                            leave=True,
                            ascii=" >=",
                            bar_format="{l_bar}{bar}| Elapsed (s): {elapsed_s:.2f}",
                        )
                    except AttributeError:
                        logger.die(
                            "tqdm.tqdm is not found. Ensure requirements.txt are installed."
                        )

                for phase_idx, phase in enumerate(phase_list):
                    if not disable_progress:
                        progress.set_description(
                            f"Processing phase {phase} ({phase_idx}/{len(phase_list)})"
                        )
                    app_inst.run_phase(self.name, phase)
                    phase_total += 1
                    if not disable_progress:
                        progress.update()

                app_inst.print_phase_times(self.name, phase_filters=self.filters.phases)

                if not disable_progress:
                    progress.set_description("Experiment complete")
                    progress.close()

            if not self.suppress_per_experiment_prints:
                logger.all_msg(f" Returning to log file: {logger.active_log()}")
            count += 1

        # NOTE(review): `ramble.filters` is not imported by this module; this
        # relies on it having been imported elsewhere -- confirm.
        if phase_total == 0 and self.filters.phases != ramble.filters.ALL_PHASES:
            logger.warn("No valid phases were selected, please verify requested phases")

    def _complete(self):
        """Hook for performing pipeline actions after execution is complete"""
        # NOTE(review): `ramble.util.version` is not imported by this module;
        # this relies on it having been imported elsewhere -- confirm.
        self.workspace.update_metadata("ramble_version", ramble.util.version.get_version())

        if self.updated_experiment_hashes:
            try:
                self._construct_workspace_hash()
            except FileNotFoundError as e:
                # Best effort: a missing file only costs us the workspace hash.
                tty.warn("Unable to construct workspace hash due to missing file")
                tty.warn(e)

        self.workspace.write_metadata()

    def run(self):
        """Run the full pipeline"""
        if not self.suppress_run_header:
            logger.all_msg("Streaming details to log:")
            logger.all_msg(f" {self.log_path}")
            if self.workspace.dry_run:
                cprint("@*g{ -- DRY-RUN -- DRY-RUN -- DRY-RUN -- DRY-RUN -- DRY-RUN --}")

            experiment_count = self._experiment_set.num_filtered_experiments(self.filters)
            experiment_total = self._experiment_set.num_experiments()
            logger.all_msg(
                f" {self.action_string} {experiment_count} out of "
                f"{experiment_total} experiments:"
            )

        with logger.add_log_context(self.log_path):
            if logger.enabled:
                create_symlink(self.log_path, self.log_path_latest)
            self._prepare()
            self._execute()
            self._complete()

    def _copy_workspace_root_files(self, workspace, dest_dir):
        """Copy the inventory, hash and metadata files found in the workspace root.

        Args:
            workspace: Workspace whose ``root`` is searched for the files.
            dest_dir: Existing directory the files are copied into.

        Files that do not exist in the workspace root are skipped.
        """
        root_files = [
            ramble.workspace.Workspace.inventory_file_name,
            ramble.workspace.Workspace.hash_file_name,
            self.workspace.inventory_file_name,
            self.workspace.hash_file_name,
            ramble.workspace.METADATA_FILE_NAME,
        ]

        # The class-level and instance-level names may be identical;
        # dict.fromkeys drops duplicates (keeping order) so each file is
        # copied only once.
        for filename in dict.fromkeys(root_files):
            src = os.path.join(workspace.root, filename)
            if os.path.exists(src):
                dest = os.path.join(dest_dir, filename)
                shutil.copyfile(src, dest)
class AnalyzePipeline(Pipeline):
    """Class for the analyze pipeline"""

    name = "analyze"

    def __init__(
        self,
        workspace,
        filters,
        output_formats=None,
        upload=False,
        print_results=False,
        summary_only=False,
        fom_origin_types=None,
    ):
        """Create an analyze pipeline.

        ``output_formats`` falls back to ``["text"]`` when not given; the
        remaining options are stored and used when results are dumped.
        """
        super().__init__(workspace, filters)
        self.action_string = "Analyzing"
        if output_formats is None:
            output_formats = ["text"]
        self.output_formats = output_formats
        self.upload_results = upload
        self.print_results = print_results
        self.summary_only = summary_only
        self.fom_origin_types = fom_origin_types

    def _prepare(self):
        """Stop early when none of the selected experiments can be analyzed."""
        # We only want to let the user run analyze if one of the following is true:
        # - At least one experiment is set up
        # - `--dry-run` is enabled
        unknown_status = ramble.experiment_result.ExperimentStatus.UNKNOWN
        any_analyzable = False
        # Number of templates / repeat bases seen, which are never analyzed
        skipped_count = 0

        for _, app_inst, _ in self._experiment_set.filtered_experiments(self.filters):
            if app_inst.is_template or app_inst.repeats.is_repeat_base:
                skipped_count += 1
            elif app_inst.get_status() != unknown_status:
                any_analyzable = True

        total = self._experiment_set.num_experiments()
        selected = self._experiment_set.num_filtered_experiments(self.filters)

        if total and not any_analyzable:
            # Report the most specific reason that applies.
            if not selected:
                logger.die("No experiment left for analysis after filtering.")
            if selected == skipped_count:
                logger.die(
                    "No analyzeable experiment detected."
                    " All selected ones are either templates or the base of"
                    " repeated experiments."
                )
            logger.die(
                "No analyzeable experiment detected."
                " Make sure your workspace is setup with\n"
                " ramble workspace setup"
            )

        super()._prepare()

    def _complete(self):
        """Compute repeat statistics, then dump (and optionally upload) results."""
        super()._complete()

        # Calculate statistics for repeats and inject into base experiment results
        for _, app_inst, _ in self._experiment_set.filtered_experiments(self.filters):
            if app_inst.repeats.n_repeats > 0:
                app_inst.calculate_statistics(self.workspace)

        dump_options = {
            "output_formats": self.output_formats,
            "print_results": self.print_results,
            "summary_only": self.summary_only,
            "fom_origin_types": self.fom_origin_types,
        }
        self.workspace.dump_results(**dump_options)
        self.workspace.dump_tables(self.experiment_set, self.filters)

        if self.upload_results:
            ramble.uploader.upload_results(self.workspace.results)
class ArchivePipeline(Pipeline):
    """Class for the archive pipeline"""

    name = "archive"

    def __init__(
        self,
        workspace,
        filters,
        create_tar=False,
        archive_prefix=None,
        upload_url=None,
        include_secrets=False,
        archive_patterns=None,
    ):
        """Create an archive pipeline.

        Args:
            workspace: Workspace to archive.
            filters: Experiment filters.
            create_tar: Whether to also create a ``.tar.gz`` of the archive.
            archive_prefix: Prefix of the archive name; defaults to the
                basename of the workspace path.
            upload_url: URL to upload the tarball to. Forces ``create_tar``.
            include_secrets: Whether the license include file is archived.
            archive_patterns: Extra glob patterns, relative to the workspace
                root, of files to archive. The list is copied.
        """
        super().__init__(workspace, filters)
        self.action_string = "Archiving"
        self.create_tar = create_tar
        self.upload_url = upload_url
        self.include_secrets = include_secrets
        self.archive_prefix = archive_prefix
        self.archive_name = None
        self.archive_patterns = archive_patterns.copy() if archive_patterns else []

        if self.upload_url and not self.create_tar:
            logger.warn("Upload URL is currently only supported when using tar format (-t)")
            logger.warn("Forcing `-t` on to enable archive upload.\n")
            self.create_tar = True

    @staticmethod
    def _relocate(path, src_root, dest_root):
        """Return ``path`` re-rooted from ``src_root`` to ``dest_root``.

        Uses the path relative to ``src_root`` instead of ``str.replace`` so
        that only the leading prefix is swapped, even when the prefix text
        appears again deeper in the path.
        """
        return os.path.join(dest_root, os.path.relpath(path, src_root))

    def _prepare(self):
        """Create the archive directory and copy workspace files into it."""
        super()._prepare()

        date_str = self.workspace.date_string()

        # Use the basename from the path as the name of the workspace.
        # If we use `self.workspace.name` we get the path multiple times.
        if not self.archive_prefix:
            self.archive_prefix = os.path.basename(self.workspace.path)

        self.archive_name = f"{self.archive_prefix}-archive-{date_str}"

        archive_path = os.path.join(self.workspace.archive_dir, self.archive_name)
        fs.mkdirp(archive_path)

        self._copy_workspace_root_files(self.workspace, archive_path)

        # Copy current configs
        archive_configs = os.path.join(
            self.workspace.latest_archive_path, ramble.workspace.WORKSPACE_CONFIG_PATH
        )
        _copy_tree(self.workspace.config_dir, archive_configs)

        # Copy shared files
        archive_shared = os.path.join(
            self.workspace.latest_archive_path, ramble.workspace.WORKSPACE_SHARED_PATH
        )

        excluded_secrets = set()
        if not self.include_secrets:
            excluded_secrets.add(ramble.workspace.LICENSE_INC_NAME)

        fs.mkdirp(archive_shared)
        shared_dir = self.workspace.shared_dir
        for root, _, files in os.walk(shared_dir):
            for name in files:
                if name in excluded_secrets:
                    continue
                # `root` already starts with the walked directory.
                src = os.path.join(root, name)
                dest = self._relocate(src, shared_dir, archive_shared)
                fs.mkdirp(os.path.dirname(dest))
                shutil.copy(src, dest)

        # Copy logs, but omit all symlinks (i.e. "latest")
        archive_logs = os.path.join(
            self.workspace.latest_archive_path, ramble.workspace.WORKSPACE_LOG_PATH
        )
        fs.mkdirp(archive_logs)
        log_dir = self.workspace.log_dir
        for root, _, files in os.walk(log_dir):
            for name in files:
                src = os.path.join(root, name)
                if os.path.islink(root) or os.path.islink(src) or not os.path.isfile(src):
                    continue
                dest = self._relocate(src, log_dir, archive_logs)
                fs.mkdirp(os.path.dirname(dest))
                shutil.copyfile(src, dest)

        for pattern in itertools.chain(
            self.archive_patterns,
            [os.path.join(ramble.workspace.WORKSPACE_RESULTS_PATH, "results.*")],
        ):
            # Escape workspace root incase it contains glob characters.
            pattern_path = glob.escape(self.workspace.root) + os.sep + pattern
            for file in glob.glob(pattern_path):
                dest_dir = os.path.dirname(
                    self._relocate(
                        file, self.workspace.root, self.workspace.latest_archive_path
                    )
                )
                fs.mkdirp(dest_dir)
                shutil.copy(file, dest_dir)

        archive_path_latest = os.path.join(self.workspace.archive_dir, "archive.latest")
        create_symlink(archive_path, archive_path_latest)

    def _complete(self):
        """Optionally tar the archive and upload the tarball."""
        super()._complete()

        if self.create_tar:
            tar_extension = ".tar.gz"
            tar = which("tar", required=True)
            # Relative name: tar runs from inside the archive directory.
            tar_path = self.archive_name + tar_extension
            with fs.working_dir(self.workspace.archive_dir):
                tar("-czf", tar_path, self.archive_name)

            # An explicit upload URL wins over the configured archive URL.
            archive_url = (
                self.upload_url if self.upload_url else ramble.config.get("config:archive_url")
            )
            archive_url = archive_url.rstrip("/") if archive_url else None

            tar_path_latest = os.path.join(
                self.workspace.archive_dir, "archive.latest" + tar_extension
            )
            create_symlink(tar_path, tar_path_latest)

            logger.debug(f"Archive url: {archive_url}")

            if archive_url:
                # Perform Upload
                tar_path = self.workspace.latest_archive_path + tar_extension
                remote_tar_path = archive_url + "/" + self.workspace.latest_archive + tar_extension
                _upload_file(tar_path, remote_tar_path)
                logger.all_msg(f"Archive Uploaded to {remote_tar_path}")

                # Record upload URL to workspace metadata
                self.workspace.update_metadata("archive_url", remote_tar_path)
                self.workspace.write_metadata()
class MirrorPipeline(Pipeline):
    """Class for the mirror pipeline"""

    name = "mirror"

    def __init__(self, workspace, filters, mirror_path=None):
        """Create a mirror pipeline that mirrors into ``mirror_path``."""
        super().__init__(workspace, filters)
        self.action_string = "Mirroring"
        self.mirror_path = mirror_path

    def _prepare(self):
        """Create the mirror in the workspace."""
        self.workspace.create_mirror(self.mirror_path)

    def _complete(self):
        """Report mirror statistics and stop if any input failed to fetch."""
        ws = self.workspace
        if ws.mirror_existed:
            verb = "updated"
        else:
            verb = "created"

        # Same report layout for software and inputs; only the label and the
        # statistics object differ.
        for label, stats_attr in (
            ("spack software", "software_mirror_stats"),
            ("inputs", "input_mirror_stats"),
        ):
            stats = getattr(ws, stats_attr)
            logger.msg(
                f"Successfully {verb} {label} in {ws.mirror_path}",
                "Archive stats:",
                f" {len(stats.present):<4} already present",
                f" {len(stats.new):<4} added",
                f" {len(stats.errors):<4} failed to fetch.",
            )

        if ws.input_mirror_stats.errors:
            logger.error("Failed downloads:")
            failed_names = (s.cformat("{name}") for s in list(ws.input_mirror_stats.errors))
            tty.colify(failed_names, output=logger.active_stream())
            logger.die("Mirroring has errors.")
class SetupPipeline(Pipeline):
    """Class for the setup pipeline"""

    name = "setup"

    def __init__(self, workspace, filters):
        """Create a setup pipeline; experiment inventories are always recomputed."""
        super().__init__(workspace, filters)
        self.force_inventory = True
        self.action_string = "Setting up"

    def _prepare(self):
        """Open the all-experiments script and write its shebang line.

        The file handle is stored on the workspace and stays open until
        ``_complete`` closes it.
        """
        super()._prepare()

        script = open(self.workspace.all_experiments_path, "w+")
        interpreter = os.path.join("/bin/", ramble.config.get("config:shell"))
        script.write(f"#!{interpreter}\n")
        self.workspace.experiments_script = script

    def _complete(self):
        """Close the all-experiments script and make it executable."""
        super()._complete()

        self.workspace.experiments_script.close()

        # rwx for user and group, r-x for others
        mode = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH
        script_path = os.path.join(self.workspace.root, self.workspace.all_experiments_path)
        os.chmod(script_path, mode)
class PushToCachePipeline(Pipeline):
    """Class for the pushtocache pipeline"""

    name = "pushtocache"

    def __init__(self, workspace, filters, spack_cache_path=None):
        """Create a pipeline that pushes to the cache at ``spack_cache_path``."""
        super().__init__(workspace, filters)
        self.spack_cache_path = spack_cache_path
        self.action_string = "Pushing to Spack Cache"

    def _prepare(self):
        """Record the cache path on the workspace after the base preparation."""
        super()._prepare()
        cache_path = self.spack_cache_path
        self.workspace.spack_cache_path = cache_path

    def _complete(self):
        """Log where the environments were pushed."""
        super()._complete()
        message = f"Pushed envs to spack cache {self.spack_cache_path}"
        logger.msg(message)
class ExecutePipeline(Pipeline):
    """class for the `execute` (`on`) pipeline"""

    name = "execute"

    def __init__(
        self,
        workspace,
        filters,
        executor="{batch_submit}",
        suppress_per_experiment_prints=True,
        suppress_run_header=False,
    ):
        """Create an execute pipeline.

        Args:
            workspace: Workspace whose experiments are executed.
            filters: Experiment filters.
            executor: Command template, expanded per experiment and then run.
            suppress_per_experiment_prints: Hide per-experiment output.
            suppress_run_header: Hide the run header messages.
        """
        super().__init__(workspace, filters)
        self.action_string = "Executing"
        self.executor = executor
        self.suppress_per_experiment_prints = suppress_per_experiment_prints
        self.suppress_run_header = suppress_run_header

    def _execute(self):
        """Run the pipeline phases, then run the executor for each experiment.

        Templates and repeat bases are skipped.
        """
        super()._execute()

        resolve_env_vars = ramble.config.get("config:resolve_variables_in_subprocesses", False)

        if not self.suppress_run_header:
            logger.all_msg("Running executors...")

        for _, app_inst, _ in self._experiment_set.filtered_experiments(self.filters):
            if app_inst.is_template:
                logger.debug(f"{app_inst.name} is a template. Skipping execution.")
                continue
            if app_inst.repeats.is_repeat_base:
                logger.debug(f"{app_inst.name} is a repeat base. Skipping execution.")
                continue

            app_inst.define_variables_for_template_path()
            exec_str = app_inst.expander.expand_var(self.executor)
            if resolve_env_vars:
                exec_str = os.path.expandvars(exec_str)
            exec_parts = shlex.split(exec_str)

            # An executor that expands to nothing has no command to run; stop
            # with an explicit message instead of an IndexError below.
            if not exec_parts:
                logger.die(
                    f"Executor `{self.executor}` expanded to an empty command "
                    f"for {app_inst.name}."
                )

            exec_name = exec_parts[0]
            exec_args = exec_parts[1:]

            executor = which(exec_name)
            if executor is None:
                # attempt searching inside the run directory
                if not os.path.isabs(exec_name):
                    alt_exec_path = os.path.join(app_inst.expander.experiment_run_dir, exec_name)
                    executor = which(alt_exec_path)
                if executor is None:
                    # Attempt the execution, even though it won't succeed.
                    # The raised os error gives better reason for troubleshooting
                    # (like whether the exec doesn't exist, or due to missing permission.)
                    executor = Executable(exec_name)
            executor(*exec_args)
class LogsPipeline(Pipeline):
    """class for the `logs` pipeline"""

    name = "logs"

    def __init__(
        self,
        workspace,
        filters,
        first_only=False,
        suppress_per_experiment_prints=True,
        suppress_run_header=True,
    ):
        """Create a logs pipeline.

        With ``first_only`` set, reporting stops after the first experiment
        that is neither a template nor a repeat base.
        """
        super().__init__(workspace, filters)
        self.action_string = "Getting log information for"
        self.first_only = first_only
        self.suppress_per_experiment_prints = suppress_per_experiment_prints
        self.suppress_run_header = suppress_run_header

    def _execute(self):
        """Run the pipeline phases, then list the log files of each experiment."""

        def print_archive_files(app_inst, pattern_title, patterns):
            """List files matching the expanded archive patterns."""
            if not patterns:
                return
            header_pending = True
            for raw_pattern in patterns:
                for match in glob.glob(app_inst.expander.expand_var(raw_pattern)):
                    # Only print the header if a file matched the glob
                    if header_pending:
                        logger.all_msg(f" Archive files from {pattern_title}:")
                        header_pending = False
                    logger.all_msg(f" - {match}")

        super()._execute()

        if not self.suppress_run_header:
            logger.all_msg("Finding log information...")

        for exp, app_inst, _ in self._experiment_set.filtered_experiments(self.filters):
            if app_inst.is_template or app_inst.repeats.is_repeat_base:
                continue

            experiment_log = app_inst.expander.expand_var_name("log_file")
            logger.all_msg(f"Experiment: {exp}")
            logger.all_msg(f" Experiment log file: {experiment_log}")

            aux_logs = app_inst.analysis_dicts(app_inst.success_list)[0]
            logger.all_msg(" Auxiliary experiment logs:")
            for aux_log in aux_logs:
                logger.all_msg(f" - {aux_log}")

            print_archive_files(app_inst, "application", app_inst.archive_patterns)

            if app_inst.package_manager:
                pm_name = app_inst.package_manager.name
                pm_patterns = app_inst.package_manager.archive_patterns
                print_archive_files(app_inst, f"package manager {pm_name}", pm_patterns)

            for mod in app_inst.modifier_instances:
                print_archive_files(app_inst, f"modifier {mod.name}", mod.archive_patterns.keys())

            if self.first_only:
                break
class PushDeploymentPipeline(Pipeline):
    """Class for the `pushdeployment` pipeline"""

    name = "pushdeployment"

    index_filename = "index.json"
    index_namespace = "deployment_files"
    tar_extension = ".tar.gz"
    legacy_object_repo_name = "object_repo"

    def __init__(
        self, workspace, filters, create_tar=False, upload_url=None, deployment_name=None
    ):
        """Create a push-deployment pipeline.

        Args:
            workspace: Workspace to create the deployment from.
            filters: Experiment filters.
            create_tar: Whether to also create a ``.tar.gz`` of the deployment.
            upload_url: Destination to upload the deployment to. Workspace
                variables in it are expanded and the result is normalized.
            deployment_name: Name of the deployment. Workspace variables in
                it are expanded; defaults to the workspace name.
        """
        super().__init__(workspace, filters)

        workspace_expander = ramble.expander.Expander(workspace.get_workspace_vars(), None)

        self.action_string = "Pushing deployment of"
        self.create_tar = create_tar
        self.force_inventory = True

        if upload_url:
            expanded_url = workspace_expander.expand_var(upload_url)
            self.upload_url = ramble.util.path.normalize_path_or_url(expanded_url)
        else:
            self.upload_url = None

        if deployment_name:
            expanded_name = workspace_expander.expand_var(deployment_name)
            workspace.deployment_name = expanded_name
            self.deployment_name = expanded_name
        else:
            self.deployment_name = workspace.name

    def _execute(self):
        """Seed the deployment with the workspace configs, then run the phases."""
        configs_dir = os.path.join(
            self.workspace.named_deployment, ramble.workspace.WORKSPACE_CONFIG_PATH
        )
        fs.mkdirp(configs_dir)

        _copy_tree(self.workspace.config_dir, configs_dir)

        aux_software_dir = os.path.join(configs_dir, ramble.workspace.AUXILIARY_SOFTWARE_DIR_NAME)
        fs.mkdirp(aux_software_dir)

        super()._execute()

    def _deployment_files(self):
        """Yield the full path to each file in a deployment"""
        for root, _, files in os.walk(self.workspace.named_deployment):
            for name in files:
                # `root` already starts with the walked directory.
                yield os.path.join(root, name)

    def _complete(self):
        """Index the deployment, then optionally tar and upload it."""
        super()._complete()

        deployment_dir = self.workspace.named_deployment

        # Copy inventory files into deployment
        self._copy_workspace_root_files(self.workspace, deployment_dir)

        # Create an index.json of the deployment. Entries are paths relative
        # to the deployment directory; the index is written after the listing,
        # so a freshly created index does not list itself.
        deployment_index = {
            self.index_namespace: [
                os.path.relpath(file, deployment_dir) for file in self._deployment_files()
            ]
        }

        index_file = os.path.join(deployment_dir, self.index_filename)
        with open(index_file, "w+") as f:
            f.write(sjson.dump(deployment_index))

        tar_path = os.path.join(
            self.workspace.deployments_dir, self.deployment_name + self.tar_extension
        )
        if self.create_tar:
            tar = which("tar", required=True)
            with fs.working_dir(self.workspace.deployments_dir):
                tar("-czf", tar_path, self.deployment_name)

        if self.upload_url:
            remote_base = self.upload_url + "/" + self.deployment_name
            for file in self._deployment_files():
                # Swap only the leading deployment directory for the remote base.
                rel_path = os.path.relpath(file, deployment_dir)
                dest = remote_base + "/" + rel_path.replace(os.sep, "/")
                _upload_file(file, dest)

            if self.create_tar:
                remote_tar_path = self.upload_url + "/" + self.deployment_name + self.tar_extension
                _upload_file(tar_path, remote_tar_path)

        logger.all_msg(f"Deployment created in: {self.workspace.named_deployment}")
        if self.create_tar:
            logger.all_msg(f" Tar of deployment created in: {tar_path}")
        if self.upload_url:
            remote_base = self.upload_url + "/" + self.deployment_name
            logger.all_msg(f" Deployment uploaded to: {remote_base}")
def _copy_tree(src_dir, dest_dir):
    """Copy all files in src_dir to dest_dir

    The directory layout below ``src_dir`` is reproduced below ``dest_dir``.
    Directories that contain no files are not created.
    """
    for root, _, files in os.walk(src_dir):
        if not files:
            continue
        # Map through the path relative to src_dir, so that only the leading
        # prefix is swapped and relative src_dir values work too.
        dest_root = os.path.normpath(os.path.join(dest_dir, os.path.relpath(root, src_dir)))
        fs.mkdirp(dest_root)
        for name in files:
            shutil.copyfile(os.path.join(root, name), os.path.join(dest_root, name))


def _upload_file(src_file, dest_file):
    """Archive ``src_file`` to ``dest_file`` through a URL fetch strategy.

    The stage is set to the directory containing ``src_file``.
    """
    stage_dir = os.path.dirname(src_file)
    fetcher = ramble.fetch_strategy.URLFetchStrategy(src_file)
    fetcher.stage = ramble.stage.DIYStage(stage_dir)
    fetcher.stage.archive_file = src_file
    fetcher.archive(dest_file)


# Names of every available pipeline
pipelines = Enum(
    "pipelines",
    [
        "analyze",
        "archive",
        "mirror",
        "setup",
        "pushtocache",
        "execute",
        "pushdeployment",
        "logs",
    ],
)

# Pipeline enum member -> class implementing that pipeline
_pipeline_map = {
    pipelines.analyze: AnalyzePipeline,
    pipelines.archive: ArchivePipeline,
    pipelines.mirror: MirrorPipeline,
    pipelines.setup: SetupPipeline,
    pipelines.pushtocache: PushToCachePipeline,
    pipelines.execute: ExecutePipeline,
    pipelines.pushdeployment: PushDeploymentPipeline,
    pipelines.logs: LogsPipeline,
}
def pipeline_class(name):
    """Factory for determining a pipeline class from its name

    Args:
        name: Key into the pipeline map, i.e. a member of ``pipelines``.

    Returns:
        The pipeline class registered for ``name``. An unknown name ends in
        ``logger.die`` with the list of valid pipeline names.
    """
    if name not in _pipeline_map:
        # List the member names rather than the raw dict_keys repr.
        valid_names = ", ".join(member.name for member in _pipeline_map)
        logger.die(f"Pipeline {name} is not valid.\n" f"Valid pipelines are {valid_names}")

    return _pipeline_map[name]