# Copyright 2022-2025 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.
"""Define base classes for application definitions"""
import fnmatch
import io
import operator
import os
import re
import shutil
import stat
import string
import textwrap
import time
from enum import Enum
from typing import List
import llnl.util.filesystem as fs
import llnl.util.tty.color as color
import ramble.config
import ramble.expander
import ramble.fetch_strategy
import ramble.graphs
import ramble.keywords
import ramble.mirror
import ramble.modifier
import ramble.modifier_types.disabled
import ramble.paths
import ramble.repeats
import ramble.repository
import ramble.stage
import ramble.success_criteria
import ramble.util.class_attributes
import ramble.util.colors as rucolor
import ramble.util.directives
import ramble.util.env
import ramble.util.executable
import ramble.util.graph
import ramble.util.hashing
import ramble.util.lock as lk
import ramble.util.path
import ramble.util.stats
import ramble.workflow_manager
from ramble.error import RambleError
from ramble.experiment_result import ExperimentResult
from ramble.language.application_language import ApplicationMeta
from ramble.language.shared_language import SharedMeta, register_builtin, register_phase
from ramble.util import conversions
from ramble.util.foms import FomType
from ramble.util.logger import logger
from ramble.util.naming import NS_SEPARATOR
from ramble.util.output_capture import output_mapper
from ramble.util.shell_utils import source_str
from ramble.workspace import namespace
import spack.util.compression
import spack.util.executable
import spack.util.spack_json
experiment_status = Enum(
"experiment_status",
[
"UNKNOWN",
"UNQUEUED",
# unresolved means the status is not fetched successfully
"UNRESOLVED",
"SETUP",
"SUBMITTED",
"RUNNING",
"COMPLETE",
"SUCCESS",
"FAILED",
"CANCELLED",
"TIMEOUT",
],
)
_NULL_CONTEXT = "null"
_DEFAULT_CONTENT_PERM = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH
def _get_context_display_name(context):
return (
f"default ({_NULL_CONTEXT}) context" if context == _NULL_CONTEXT else f"{context} context"
)
def _check_shell_support(app_inst):
def _check_match(inst, shell_to_support):
pat = getattr(inst, "shell_support_pattern", None)
matched = pat is None or fnmatch.fnmatch(shell_to_support, pat)
if not matched:
logger.die(
f"{inst.name} does not support {shell_to_support} shell"
f", the supported shell pattern is '{pat}'"
)
shell = ramble.config.get("config:shell")
_check_match(app_inst, shell)
for mod_inst in app_inst._modifier_instances:
_check_match(mod_inst, shell)
_check_match(app_inst.package_manager, shell)
def _run_phase_hook(obj, workspace, pipeline, hook):
"""Helper to enable an object run phase hooks defined in application"""
phase_defs = getattr(obj, "phase_definitions")
if pipeline in phase_defs and hook in phase_defs[pipeline]:
return
hook_func_name = f"_{hook}"
if hasattr(obj, hook_func_name):
phase_func = _get_phase_func_wrapper(workspace, getattr(obj, hook_func_name), hook)
phase_func(workspace)
def _get_phase_func_wrapper(workspace, phase_func, phase_name):
if workspace.profile_config is None:
return phase_func
(profiler, profile_phases) = workspace.profile_config
if phase_name not in profile_phases:
return phase_func
return profiler(phase_func)
[docs]
class ApplicationBase(metaclass=ApplicationMeta):
name = None
_builtin_name = NS_SEPARATOR.join(("builtin", "{name}"))
_builtin_required_key = "required"
_inventory_file_name = "ramble_inventory.json"
_status_file_name = "ramble_status.json"
_pipelines = [
"analyze",
"archive",
"mirror",
"setup",
"pushdeployment",
"pushtocache",
"execute",
"logs",
]
_language_classes = [ApplicationMeta, SharedMeta]
#: Lists of strings which contains GitHub usernames of attributes.
#: Do not include @ here in order not to unnecessarily ping the users.
maintainers: List[str] = []
tags: List[str] = []
license_inc_name = "license.inc"
def __init__(self, file_path):
super().__init__()
ramble.util.class_attributes.convert_class_attributes(self)
self.keywords = ramble.keywords.keywords.copy()
self._vars_are_expanded = False
self.expander = None
self._formatted_executables = {}
self.variables = None
self.variants = None
self.no_expand_vars = None
self.experiment_set = None
self.internals = {}
self.is_template = False
self.generated_experiments = []
self.repeats = ramble.repeats.Repeats()
self._command_list = []
self._command_list_without_logs = []
self.chained_experiments = None
self.chain_order = []
self.chain_prepend = []
self.chain_append = []
self.chain_commands = {}
self._env_variable_sets = []
self.modifiers = []
self.experiment_tags = []
self._modifier_instances = []
self._input_fetchers = None
self.result = None
self._phase_times = {}
self._pipeline_graphs = None
self.package_manager = None
self.custom_executables = {}
self._exp_lock = None
self._input_lock = None
self._software_lock = None
self._experiment_graph = None
self.hash_inventory = {
"application_definition": None,
"modifier_definitions": [],
"attributes": [],
"inputs": [],
"software": [],
"templates": [],
"package_manager": [],
"modifier_artifacts": [],
}
self.experiment_hash = None
self._file_path = file_path
self.application_class = "ApplicationBase"
self._verbosity = "short"
self.license_path = ""
self.license_file = ""
self.workflow_manager = None
ramble.util.directives.define_directive_methods(self)
[docs]
def experiment_lock(self):
"""Create a lock for the experiment directory, and return it"""
if self._exp_lock is None:
lock_path = os.path.join(
self.expander.expand_var("{experiment_run_dir}"), ".ramble-experiment"
)
self._exp_lock = lk.Lock(lock_path)
return self._exp_lock
[docs]
def copy(self):
"""Deep copy an application instance"""
new_copy = type(self)(self._file_path)
self.generated_experiments.append(new_copy)
if self._env_variable_sets:
new_copy.set_env_variable_sets(self._env_variable_sets.copy())
if self.variables:
new_copy.set_variables(self.variables.copy(), self.experiment_set)
if self.internals:
new_copy.set_internals(self.internals.copy())
if self._formatted_executables:
new_copy.set_formatted_executables(self._formatted_executables.copy())
new_copy.keywords = ramble.keywords.keywords.copy()
new_copy.set_template(False)
new_copy.repeats.set_repeats(False, 0)
new_copy.set_chained_experiments(None)
if self.variants:
new_copy.set_variants(self.variants)
return new_copy
[docs]
def is_actionable(self):
"""Determine if an experiment should be actioned in pipelines
Returns True if the experiment should be actioned in a pipeline, False
if not.
"""
if self.is_template:
return False
return True
[docs]
def set_variants(self, variants):
"""Set variants within an experiment instance, and process their
contents.
Args:
variants (dict): Dictionary of variant controls for this
experiment.
"""
self.variants = variants.copy()
self._set_package_manager()
self._set_workflow_manager()
def _set_package_manager(self):
if namespace.package_manager in self.variants:
pkgman_name = conversions.canonical_none(
self.expander.expand_var(self.variants[namespace.package_manager], typed=True)
)
if pkgman_name is not None:
try:
pkgman_type = ramble.repository.ObjectTypes.package_managers
self.package_manager = ramble.repository.get(pkgman_name, pkgman_type).copy()
self.package_manager.set_application(self)
except ramble.repository.UnknownObjectError:
logger.die(
f"{pkgman_name} is not a valid package manager. "
"Valid package managers can be listed via:\n"
"\tramble list --type package_managers"
)
if self.package_manager is not None:
for pkgname, config in self.required_packages.items():
if fnmatch.fnmatch(self.package_manager.name, config["package_manager"]):
self.keywords.update_keys(
{
f"{pkgname}_path": {
"type": ramble.keywords.key_type.reserved,
"level": ramble.keywords.output_level.variable,
}
}
)
def _set_workflow_manager(self):
if namespace.workflow_manager in self.variants:
wm_name = conversions.canonical_none(
self.expander.expand_var(self.variants[namespace.workflow_manager], typed=True)
)
if wm_name is not None:
try:
wfman_type = ramble.repository.ObjectTypes.workflow_managers
self.workflow_manager = ramble.repository.get(wm_name, wfman_type).copy()
self.workflow_manager.set_application(self)
except ramble.repository.UnknownObjectError:
logger.die(
f"{wm_name} is not a valid workflow manager. "
"Valid workflow managers can be listed via:\n"
"\tramble list --type workflow_managers"
)
if self.workflow_manager is None:
base_path = os.path.join(ramble.paths.module_path, "workflow_manager.py")
self.workflow_manager = ramble.workflow_manager.WorkflowManagerBase(base_path)
self.workflow_manager.set_application(self)
[docs]
def build_phase_order(self):
if self._pipeline_graphs is not None:
return
self._pipeline_graphs = {}
for pipeline in self._pipelines:
if pipeline not in self.phase_definitions:
self.phase_definitions[pipeline] = {}
self._pipeline_graphs[pipeline] = ramble.graphs.PhaseGraph(
self.phase_definitions[pipeline], self
)
for mod_inst in self._modifier_instances:
# Define phase nodes
for _, phase_node in mod_inst.all_pipeline_phases(pipeline):
self._pipeline_graphs[pipeline].add_node(phase_node, obj_inst=mod_inst)
# Define phase edges
for _, phase_node in mod_inst.all_pipeline_phases(pipeline):
self._pipeline_graphs[pipeline].define_edges(phase_node, internal_order=True)
if self.package_manager:
# Define phase nodes
for _, phase_node in self.package_manager.all_pipeline_phases(pipeline):
self._pipeline_graphs[pipeline].add_node(
phase_node, obj_inst=self.package_manager
)
# Define phase edges
for _, phase_node in self.package_manager.all_pipeline_phases(pipeline):
self._pipeline_graphs[pipeline].define_edges(phase_node, internal_order=True)
[docs]
def set_env_variable_sets(self, env_variable_sets):
"""Set internal reference to environment variable sets"""
self._env_variable_sets = env_variable_sets.copy()
[docs]
def set_variables(self, variables, experiment_set):
"""Set internal reference to variables
Also, create an application specific expander class.
"""
self.variables = variables
self.experiment_set = experiment_set
self.expander = ramble.expander.Expander(self.variables, self.experiment_set)
self.no_expand_vars = set()
workload_name = self.expander.workload_name
if workload_name in self.workloads:
for var in self.workloads[workload_name].variables.values():
if not var.expandable:
self.no_expand_vars.add(var.name)
self.expander.set_no_expand_vars(self.no_expand_vars)
[docs]
def set_internals(self, internals):
"""Set internal reference to application internals"""
self.internals = internals
[docs]
def set_template(self, is_template):
"""Set if this instance is a template or not"""
self.is_template = is_template
[docs]
def set_chained_experiments(self, chained_experiments):
"""Set chained experiments for this instance"""
self.chained_experiments = None
if chained_experiments:
self.chained_experiments = chained_experiments.copy()
[docs]
def set_modifiers(self, modifiers):
"""Set modifiers for this instance"""
if modifiers:
self.modifiers = modifiers.copy()
self.build_modifier_instances()
[docs]
def experiment_log_file(self, logs_dir):
"""Returns an experiment log file path for the given logs directory"""
return os.path.join(logs_dir, self.expander.experiment_namespace) + ".out"
[docs]
def get_pipeline_phases(self, pipeline, phase_filters=None):
if phase_filters is None:
phase_filters = ["*"]
self.build_modifier_instances()
self.build_phase_order()
if pipeline not in self._pipelines:
logger.die(
f"Requested pipeline {pipeline} is not valid.\n",
f"\tAvailable pipelinese are {self._pipelines}",
)
phases = set()
final_added_index = None
if pipeline in self._pipeline_graphs:
for idx, phase in enumerate(self._pipeline_graphs[pipeline].walk()):
for phase_filter in phase_filters:
if fnmatch.fnmatch(phase.key, phase_filter):
phases.add(phase)
final_added_index = idx
include_phase_deps = ramble.config.get("config:include_phase_dependencies")
if include_phase_deps:
for idx, phase in enumerate(self._pipeline_graphs[pipeline].walk()):
if idx < final_added_index and phase not in phases:
phases.add(phase)
phase_order = []
for node in self._pipeline_graphs[pipeline].walk():
if node in phases:
phase_order.append(node.key)
return phase_order
def __str__(self):
return self.name
[docs]
def print_vars(self, header="", vars_to_print=None, indent=""):
print_vars = vars_to_print
if not print_vars:
print_vars = self.variables
color.cprint(f"{indent}{header}:")
for var, val in print_vars.items():
expansion_var = self.expander.expansion_str(var)
expanded = self.expander.expand_var(expansion_var)
color.cprint(f"{indent} {var} = {val} ==> {expanded}".replace("@", "@@"))
[docs]
def build_used_variables(self, workspace):
"""Build a set of all used variables
By expanding all necessary portions of this experiment (required /
reserved keywords, templates, commands, etc...), determine which
variables are used throughout the experiment definition.
Variables can have list definitions. These are iterated over to ensure
variables referenced by any of them are tracked properly.
Args:
workspace (ramble.workspace.Workspace): Workspace to extract templates from
Returns:
(set): All variable names used by this experiment.
"""
self.build_modifier_instances()
self.add_expand_vars(workspace)
backup_variables = self.variables.copy()
self._define_commands(self._executable_graph, workspace.success_list)
self._define_formatted_executables()
########################
# Define extra variables
########################
# Add modifier mode variables defaults as a placeholder:
for mod_inst in self._modifier_instances:
for var in mod_inst.mode_variables().values():
if var.name not in self.variables:
self.variables[var.name] = var.default
if self.package_manager is not None:
# Add package manager variable defaults as placeholder
for var in self.package_manager.package_manager_variables.values():
self.variables[var.name] = var.default
if self.workflow_manager is not None:
for var in self.workflow_manager.wm_vars.values():
self.variables[var.name] = var.default
##########################################
# Expand used variables to track all usage
##########################################
# Add all known keywords
for key in self.keywords.keys:
self.expander.expand_var_name(key)
if self.chained_experiments:
for chained_exp in self.chained_experiments:
if namespace.inherit_variables in chained_exp:
for var in chained_exp[namespace.inherit_variables]:
self.expander._used_variables.add(var)
self.expander.expand_var_name(var)
# Add variables from success criteria
criteria_list = workspace.success_list
for criteria in criteria_list.all_criteria():
if criteria.mode == "fom_comparison":
self.expander.expand_var(criteria.fom_formula)
self.expander.expand_var(criteria.fom_name)
self.expander.expand_var(criteria.fom_context)
elif criteria.mode == "application_function":
self.evaluate_success()
if self.package_manager is not None:
self.package_manager.build_used_variables(workspace)
for template_name, template_conf in workspace.all_templates():
self.expander._used_variables.add(template_name)
self.expander.expand_var(template_conf["contents"])
############################
# Reset variable definitions
############################
to_remove = set()
for var in self.variables:
if var not in backup_variables:
to_remove.add(var)
for var in to_remove:
del self.variables[var]
for var, val in backup_variables.items():
self.variables[var] = val
self._command_list = []
return self.expander._used_variables
[docs]
def print_internals(self, indent=""):
if not self.internals:
return
if namespace.custom_executables in self.internals:
header = rucolor.nested_4("Custom Executables")
color.cprint(f"{indent}{header}:")
for name in self.internals[namespace.custom_executables]:
color.cprint(f"{indent} {name}")
if namespace.executables in self.internals:
header = rucolor.nested_4("Executable Order")
color.cprint(f"{indent}{header}: {str(self.internals[namespace.executables])}")
if namespace.executable_injection in self.internals:
header = rucolor.nested_4("Executable Injection")
color.cprint(
f"{indent}{header}: {str(self.internals[namespace.executable_injection])}"
)
[docs]
def print_chain_order(self, indent=""):
if not self.chain_order:
return
header = rucolor.nested_4("Experiment Chain")
color.cprint(f"{indent}{header}:")
for exp in self.chain_order:
color.cprint(f"{indent}- {exp}")
# Phase execution helpers
[docs]
def run_phase(self, pipeline, phase, workspace):
"""Run a phase, by getting its function pointer"""
self.add_expand_vars(workspace)
if self.is_template:
logger.debug(f"{self.name} is a template. Skipping phases")
return
if self.repeats.is_repeat_base:
logger.debug(f"{self.name} is a repeat base. Skipping phases")
return
phase_node = self._pipeline_graphs[pipeline].get_node(phase)
if phase_node is None:
logger.die(f"Phase {phase} is not defined in pipeline {pipeline}")
logger.msg(f" Executing phase {phase}")
start_time = time.time()
for _, obj in self._objects(exclude_types=[ramble.repository.ObjectTypes.applications]):
_run_phase_hook(obj, workspace, pipeline, phase)
phase_func = _get_phase_func_wrapper(workspace, phase_node.attribute, phase)
phase_func(workspace, app_inst=self)
self._phase_times[phase] = time.time() - start_time
[docs]
def print_phase_times(self, pipeline, phase_filters=None):
"""Print phase execution times by pipeline phase order
Args:
pipeline (str): Name of pipeline to print timing information for
phase_filters (list(str) | None): Filters to limit phases to print
"""
logger.msg("Phase timing statistics:")
if phase_filters is None:
phase_filters = ["*"]
for phase in self.get_pipeline_phases(pipeline, phase_filters=phase_filters):
# Set default time to 0.0 s, to prevent KeyError from skipped phases
if phase not in self._phase_times:
self._phase_times[phase] = 0.0
logger.msg(f" {phase} time: {round(self._phase_times[phase], 5)} (s)")
[docs]
def create_experiment_chain(self, workspace):
"""Create the necessary chained experiments for this instance
This method determines which experiments need to be chained, grabs the
base instance from the experiment set, creates a copy of it (with a
unique name), injects the copy back into the experiment set,
and builds an internal mapping from unique name to the chaining definition.
"""
if not self.chained_experiments or self.is_template:
return
# Build initial stack. Uses a reversal of the current instance's
# chained experiments
parent_namespace = self.expander.experiment_namespace
classes_in_stack = {self}
chain_idx = 0
chain_stack = []
for exp in reversed(self.chained_experiments):
for exp_name in self.experiment_set.search_primary_experiments(exp["name"]):
child_inst = self.experiment_set.get_experiment(exp_name)
if child_inst in classes_in_stack:
raise ChainCycleDetectedError(
"Cycle detected in experiment chain:\n"
+ f" Primary experiment {parent_namespace}\n"
+ f" Chained expeirment name: {exp_name}\n"
+ f" Chain definition: {str(exp)}"
)
chain_stack.append((exp_name, exp.copy()))
parent_run_dir = self.expander.expand_var(
self.expander.expansion_str(self.keywords.experiment_run_dir)
)
# Continue until the stack is empty
while len(chain_stack) > 0:
cur_exp_name = chain_stack[-1][0]
cur_exp_def = chain_stack[-1][1]
# Perform basic validation on the chained experiment definition
if "name" not in cur_exp_def:
raise InvalidChainError(
"Invalid experiment chain defined:\n"
+ f" Primary experiment {parent_namespace}\n"
+ f" Chain definition: {str(exp)}\n"
+ ' "name" keyword must be defined'
)
if "order" in cur_exp_def:
possible_orders = ["after_chain", "after_root", "before_chain", "before_root"]
if cur_exp_def["order"] not in possible_orders:
raise InvalidChainError(
"Invalid experiment chain defined:\n"
+ f" Primary experiment {parent_namespace}\n"
+ f" Chain definition: {str(exp)}\n"
+ ' Optional keyword "order" must '
+ f"be one of {str(possible_orders)}\n"
)
if "command" not in cur_exp_def.keys():
raise InvalidChainError(
"Invalid experiment chain defined:\n"
+ f" Primary experiment {parent_namespace}\n"
+ f" Chain definition: {str(exp)}\n"
+ ' "command" keyword must be defined'
)
if "variables" in cur_exp_def:
if not isinstance(cur_exp_def["variables"], dict):
raise InvalidChainError(
"Invalid experiment chain defined:\n"
+ f" Primary experiment {parent_namespace}\n"
+ f" Chain definition: {str(exp)}\n"
+ ' Optional keyword "variables" '
+ "must be a dictionary"
)
base_inst = self.experiment_set.get_experiment(cur_exp_name)
if base_inst in classes_in_stack:
chain_stack.pop()
classes_in_stack.remove(base_inst)
order = "after_root"
if "order" in cur_exp_def:
order = cur_exp_def["order"]
chained_name = f"chain.{chain_idx}.{cur_exp_name}"
new_name = f"{parent_namespace}.{chained_name}"
new_run_dir = os.path.join(
parent_run_dir, namespace.chained_experiments, chained_name
)
if order == "before_chain":
self.chain_prepend.insert(0, new_name)
elif order == "before_root":
self.chain_prepend.append(new_name)
elif order == "after_root":
self.chain_append.insert(0, new_name)
elif order == "after_chain":
self.chain_append.append(new_name)
self.chain_commands[new_name] = cur_exp_def[namespace.command]
# Skip editing the new instance if the base_inst doesn't work
# This happens if the originating command is `workspace info`
# The printing experiment set doesn't have access to all
# of the experiment, so the base_inst command above
# doesn't get an application instance.
if base_inst:
new_inst = base_inst.copy()
if namespace.variables in cur_exp_def:
for var, val in cur_exp_def[namespace.variables].items():
new_inst.variables[var] = val
new_inst.expander._experiment_namespace = new_name
new_inst.variables[self.keywords.experiment_run_dir] = new_run_dir
new_inst.variables[self.keywords.experiment_name] = new_name
new_inst.variables[self.keywords.experiment_index] = (
self.expander.expand_var_name(self.keywords.experiment_index)
)
new_inst.repeats = self.repeats
new_inst.read_status()
# Extract inherited variables
if namespace.inherit_variables in cur_exp_def:
for inherit_var in cur_exp_def[namespace.inherit_variables]:
new_inst.variables[inherit_var] = self.variables[inherit_var]
# Expand the chained experiment vars, so we can build the execution command
new_inst.add_expand_vars(workspace)
chain_cmd = new_inst.expander.expand_var(cur_exp_def[namespace.command])
self.chain_commands[new_name] = chain_cmd
cur_exp_def[namespace.command] = chain_cmd
self.experiment_set.add_chained_experiment(new_name, new_inst)
chain_idx += 1
else:
# Avoid cycles, from children
if base_inst in classes_in_stack:
chain_stack.pop()
else:
if base_inst.chained_experiments:
for exp in reversed(base_inst.chained_experiments):
for exp_name in self.experiment_set.search_primary_experiments(
exp["name"]
):
child_inst = self.experiment_set.get_experiment(exp_name)
if child_inst in classes_in_stack:
raise ChainCycleDetectedError(
"Cycle detected in "
+ "experiment chain:\n"
+ " Primary experiment "
+ f"{parent_namespace}\n"
+ " Chained expeirment name: "
+ f"{cur_exp_name}\n"
+ " Chain definition: "
+ f"{str(cur_exp_def)}"
)
chain_stack.append((exp_name, exp))
classes_in_stack.add(base_inst)
# Create the final chain order
for exp in self.chain_prepend:
self.chain_order.append(exp)
self.chain_order.append(self.expander.experiment_namespace)
for exp in self.chain_append:
self.chain_order.append(exp)
# Inject the chain order into the children experiments
for exp in self.chain_prepend:
exp_inst = self.experiment_set.get_experiment(exp)
if exp_inst:
exp_inst.chain_order = self.chain_order.copy()
for exp in self.chain_append:
exp_inst = self.experiment_set.get_experiment(exp)
if exp_inst:
exp_inst.chain_order = self.chain_order.copy()
[docs]
def define_variable(self, var_name, var_value):
self.variables[var_name] = var_value
self.expander._variables[var_name] = var_value
for mod_inst in self._modifier_instances:
mod_inst.define_variable(var_name, var_value)
[docs]
def build_modifier_instances(self):
"""Built a map of modifier names to modifier instances needed for this
application instance
"""
if not self.modifiers:
return
self._modifier_instances = []
mod_type = ramble.repository.ObjectTypes.modifiers
for mod in self.modifiers:
mod_inst = ramble.repository.get(mod["name"], mod_type).copy()
if "on_executable" in mod:
mod_inst.set_on_executables(mod["on_executable"])
else:
mod_inst.set_on_executables(None)
if "mode" in mod:
mode_name = self.expander.expand_var(mod["mode"])
mod_inst.set_usage_mode(mode_name)
else:
mod_inst.set_usage_mode(None)
if not mod_inst.disabled:
mod_inst.inherit_from_application(self)
mod_inst.modify_experiment(self)
else:
mod_inst = ramble.modifier_types.disabled.DisabledModifier(mod_inst)
self._modifier_instances.append(mod_inst)
# Add this modifiers required variables for validation
self.keywords.update_keys(mod_inst.get_required_variables())
# Ensure no expand vars are set correctly for modifiers
for mod_inst in self._modifier_instances:
for var in mod_inst.no_expand_vars():
self.expander.add_no_expand_var(var)
mod_inst.expander.add_no_expand_var(var)
[docs]
def validate_experiment(self):
# Validate the new modifiers variables exist
# (note: the base ramble variables are checked earlier too)
self.keywords.check_required_keys(self.variables)
self._check_object_validators()
def _check_object_validators(self):
expander = self.expander
for _, obj in self._objects():
for name, validator in obj.validators.items():
valid = expander.evaluate_predicate(validator["predicate"])
if not valid:
msg = expander.expand_var(validator["message"])
err_msg = (
f"Validator '{name}' (defined in '{obj.name}') "
f"fails with message: '{msg}'"
)
if validator["fail_on_invalid"]:
raise ObjectValidationError(err_msg)
else:
logger.warn(err_msg)
def _define_custom_executables(self):
# Define custom executables
if namespace.custom_executables in self.internals:
for name, conf in self.internals[namespace.custom_executables].items():
if name in self.executables or name in self.custom_executables:
experiment_namespace = self.expander.expand_var_name("experiment_namespace")
raise ExecutableNameError(
f"In experiment {experiment_namespace} "
f'a custom executable "{name}" is defined.\n'
f'However, an executable "{name}" is already '
"defined"
)
self.custom_executables[name] = ramble.util.executable.CommandExecutable(
name=name, **conf
)
def _get_exec_order(self, workload_name):
graph = self._get_executable_graph(workload_name)
order = []
for node in graph.walk():
order.append(node.key)
return order
def _get_executable_graph(self, workload_name):
"""Return executables for add_expand_vars"""
self._define_custom_executables()
exec_order = self.workloads[workload_name].executables
# Use yaml defined executable order, if defined
if namespace.executables in self.internals:
exec_order = self.internals[namespace.executables]
builtin_objects = []
all_builtins = []
for _, obj in self._objects():
builtin_objects.append(obj)
all_builtins.append(obj.builtins)
all_executables = self.executables.copy()
all_executables.update(self.custom_executables)
executable_graph = ramble.graphs.ExecutableGraph(
exec_order, all_executables, builtin_objects, all_builtins, self
)
# Perform executable injection
if namespace.executable_injection in self.internals:
for exec_injection in self.internals[namespace.executable_injection]:
exec_name = exec_injection["name"]
order = "before"
if "order" in exec_injection:
order = exec_injection["order"]
relative_to = None
if "relative_to" in exec_injection:
relative_to = exec_injection["relative_to"]
executable_graph.inject_executable(exec_name, order, relative_to)
return executable_graph
def _set_input_path(self):
"""Put input_path into self.variables[input_file] for add_expand_vars"""
self._inputs_and_fetchers(self.expander.workload_name)
for input_file, input_conf in self._input_fetchers.items():
input_vars = {}
if input_conf["expand"]:
input_vars[self.keywords.input_name] = input_conf["input_name"]
else:
input_vars[self.keywords.input_name] = input_file
input_path = os.path.join(
self.expander.expand_var(
os.path.join(input_conf["target_dir"], input_file), extra_vars=input_vars
),
)
self.variables[input_conf["input_name"]] = input_path
def _set_default_experiment_variables(self):
"""Set default experiment variables (for add_expand_vars),
if they haven't been set already"""
# Set default experiment variables, if they haven't been set already
var_sets = []
if self.expander.workload_name in self.workloads:
var_sets.append(self.workloads[self.expander.workload_name].variables)
if self.package_manager is not None:
var_sets.append(self.package_manager.package_manager_variables)
for mod_inst in self._modifier_instances:
var_sets.append(mod_inst.mode_variables())
if self.workflow_manager is not None:
var_sets.append(self.workflow_manager.wm_vars)
for var_set in var_sets:
for var, val in var_set.items():
if var not in self.variables.keys():
self.define_variable(var, val.default)
if self.expander.workload_name in self.workloads:
workload = self.workloads[self.expander.workload_name]
new_env_vars = {}
for env_var in workload.environment_variables.values():
action = "set"
value = env_var.value
add = True
for env_var_set in self._env_variable_sets:
if action in env_var_set:
if env_var.name in env_var_set[action].keys():
add = False
if add:
new_env_vars[env_var.name] = value
self._env_variable_sets.append({"set": new_env_vars})
def _define_commands(
self, exec_graph, success_list=ramble.success_criteria.ScopedCriteriaList()
):
"""Populate the internal list of commands based on executables
Populates self._command_list with a list of the executable commands that
should be executed by this experiment.
"""
if len(self._command_list) > 0:
return
self._command_list = []
self._command_list_without_logs = []
# Inject all prepended chained experiments
for chained_exp in self.chain_prepend:
self._command_list.append(self.chain_commands[chained_exp])
self._command_list_without_logs.append(self.chain_commands[chained_exp])
# ensure all log files are purged and set up
logs = []
for exec_node in exec_graph.walk():
if isinstance(exec_node.attribute, ramble.util.executable.CommandExecutable):
exec_cmd = exec_node.attribute
if exec_cmd.redirect:
expanded_log = self.expander.expand_var(exec_cmd.redirect)
logs.append(expanded_log)
analysis_logs, _, _ = self._analysis_dicts(success_list)
for log in analysis_logs:
logs.append(log)
logs = list(dict.fromkeys(logs))
for log in logs:
self._command_list.append('rm -f "%s"' % log)
self._command_list.append('touch "%s"' % log)
for exec_node in exec_graph.walk():
exec_vars = {"executable_name": exec_node.key}
if isinstance(exec_node.attribute, ramble.util.executable.CommandExecutable):
exec_vars.update(exec_node.attribute.variables)
for mod in self._modifier_instances:
if mod.applies_to_executable(exec_node.key):
exec_vars.update(mod.modded_variables(self, exec_vars))
if isinstance(exec_node.attribute, ramble.util.executable.CommandExecutable):
# Process directive defined executables
base_command = exec_node.attribute.copy()
pre_commands = []
post_commands = []
for mod in self._modifier_instances:
if mod.applies_to_executable(exec_node.key):
pre_cmd, post_cmd = mod.apply_executable_modifiers(
exec_node.key, base_command, app_inst=self
)
pre_commands.extend(pre_cmd)
post_commands.extend(post_cmd)
command_configs = pre_commands.copy()
command_configs.append(base_command)
command_configs.extend(post_commands)
for cmd_conf in command_configs:
mpi_cmd = ""
if cmd_conf.mpi:
raw_mpi_cmd = self.expander.expand_var("{mpi_command}", exec_vars).strip()
if (
not raw_mpi_cmd
and int(self.expander.expand_var_name(self.keywords.n_nodes)) > 1
):
logger.warn(
f"Command {cmd_conf.name} requires a non-empty `mpi_command` "
"variable in a multi-node experiment"
)
mpi_cmd = " " + raw_mpi_cmd + " "
redirect = ""
if cmd_conf.redirect:
out_log = self.expander.expand_var(cmd_conf.redirect, exec_vars)
output_operator = cmd_conf.output_capture
redirect_mapper = output_mapper()
redirect = redirect_mapper.generate_out_string(out_log, output_operator)
if cmd_conf.run_in_background:
bg_cmd = " &"
else:
bg_cmd = ""
for part in cmd_conf.template:
command_part = f"{mpi_cmd}{part}"
suffix_part = f"{redirect}{bg_cmd}"
expanded_cmd = self.expander.expand_var(command_part, exec_vars)
suffix_cmd = self.expander.expand_var(suffix_part, exec_vars)
self._command_list.append(expanded_cmd + " " + suffix_cmd)
self._command_list_without_logs.append(expanded_cmd)
else: # All Builtins
func = exec_node.attribute
func_cmds = func()
for cmd in func_cmds:
expanded = self.expander.expand_var(cmd, exec_vars)
self._command_list.append(expanded)
self._command_list_without_logs.append(expanded)
# Inject all appended chained experiments
for chained_exp in self.chain_append:
expanded = self.expander.expand_var(self.chain_commands[chained_exp])
self._command_list.append(expanded)
self._command_list_without_logs.append(expanded)
def _define_formatted_executables(self):
"""Define variables representing the formatted executables
Process the formatted_executables definitions, and construct their
variable definitions.
Each formatted executable definition is injected as its own variable
based on the formatting requested.
"""
self.variables[self.keywords.unformatted_command] = "\n".join(self._command_list)
self.variables[self.keywords.unformatted_command_without_logs] = "\n".join(
self._command_list_without_logs
)
formatted_exec_groups = [self._formatted_executables]
objs_to_extract = [self, self.workflow_manager, self.package_manager]
for obj in objs_to_extract + self._modifier_instances:
if obj and hasattr(obj, "formatted_executables"):
formatted_exec_groups.append(obj.formatted_executables)
for formatted_exec_group in formatted_exec_groups:
for var_name, formatted_conf in formatted_exec_group.items():
if var_name in self.variables:
raise FormattedExecutableError(
f"Formatted executable {var_name} defined, but variable "
"definition already exists."
)
n_indentation = 0
if namespace.indentation in formatted_conf:
n_indentation = int(formatted_conf[namespace.indentation])
prefix = ""
if namespace.prefix in formatted_conf:
prefix = formatted_conf[namespace.prefix]
join_separator = "\n"
if namespace.join_separator in formatted_conf:
join_separator = formatted_conf[namespace.join_separator].replace(r"\n", "\n")
indentation = " " * n_indentation
commands_to_format = self._command_list
if namespace.commands in formatted_conf:
commands_to_format = formatted_conf[namespace.commands].copy()
formatted_lines = []
for command in commands_to_format:
expanded = self.expander.expand_var(command)
for out_line in expanded.split("\n"):
formatted_lines.append(indentation + prefix + out_line)
self.variables[var_name] = join_separator.join(formatted_lines)
def _derive_variables_for_template_path(self, workspace):
"""Define variables for template paths (for add_expand_vars)"""
for template_name, _ in workspace.all_templates():
expand_path = os.path.join(
self.expander.expand_var(f"{{experiment_run_dir}}"), template_name # noqa: F541
)
self.variables[template_name] = expand_path
def _validate_experiment(self):
"""Perform validation of an experiment before performing actions with it
This function is an entry point to validate various aspects of an
experiment definition before it is used. It is expected to raise errors
when validation fails.
"""
if self.expander.workload_name not in self.workloads:
raise ApplicationError(
f"Workload {self.expander.workload_name} is not defined "
f"as a workload of application {self.name}."
)
[docs]
def add_expand_vars(self, workspace):
"""Add application specific expansion variables
Applications require several variables to be defined to function properly.
This method defines these variables, including:
- command: set to the commands needed to execute the experiment
- spack_setup: set to an empty string, so spack applications can override this
"""
if not self._vars_are_expanded:
self._validate_experiment()
self._executable_graph = self._get_executable_graph(self.expander.workload_name)
self._set_default_experiment_variables()
self._set_input_path()
self._derive_variables_for_template_path(workspace)
self._define_object_template_vars(workspace)
self._vars_are_expanded = True
def _inputs_and_fetchers(self, workload=None):
"""Extract all inputs for a given workload
Take a workload name and extract all inputs for the workload.
If the workload is set to None, extract all inputs for all workloads.
"""
if self._input_fetchers is not None:
return
self._input_fetchers = {}
workload_names = [workload] if workload else self.workloads.keys()
inputs = {}
for workload_name in workload_names:
workload = self.workloads[workload_name]
for input_file in workload.inputs:
if input_file not in self.inputs:
logger.die(
f"Workload {workload_name} references a non-existent input file "
f"{input_file}.\n"
f"Make sure this input file is defined before using it in a workload."
)
input_conf = self.inputs[input_file].copy()
# Expand input value as it may be a var
expanded_url = self.expander.expand_var(input_conf["url"])
input_conf["url"] = expanded_url
fetcher = ramble.fetch_strategy.URLFetchStrategy(**input_conf)
file_name = os.path.basename(input_conf["url"])
if not fetcher.extension:
fetcher.extension = spack.util.compression.extension(file_name)
file_name = file_name.replace(f".{fetcher.extension}", "")
namespace = f"{self.name}.{workload_name}"
inputs[file_name] = {
"fetcher": fetcher,
"namespace": namespace,
"target_dir": input_conf["target_dir"],
"extension": fetcher.extension,
"input_name": input_file,
"expand": input_conf["expand"],
}
self._input_fetchers = inputs
register_phase("mirror_inputs", pipeline="mirror")
def _mirror_inputs(self, workspace, app_inst=None):
"""Mirror application inputs
Perform mirroring of inputs within this application class.
"""
mirror_lock = lk.Lock(os.path.join(workspace.input_mirror_path, ".ramble-mirror"))
self._inputs_and_fetchers(self.expander.workload_name)
with lk.WriteTransaction(mirror_lock):
for input_file, input_conf in self._input_fetchers.items():
mirror_paths = ramble.mirror.mirror_archive_paths(
input_conf["fetcher"], os.path.join(self.name, input_file)
)
fetch_dir = os.path.join(workspace.input_mirror_path, self.name)
fs.mkdirp(fetch_dir)
stage = ramble.stage.InputStage(
input_conf["fetcher"],
name=input_conf["namespace"],
path=fetch_dir,
mirror_paths=mirror_paths,
lock=False,
)
stage.cache_mirror(workspace.input_mirror_cache, workspace.input_mirror_stats)
register_phase("get_inputs", pipeline="setup")
def _get_inputs(self, workspace, app_inst=None):
"""Download application inputs
Download application inputs into the proper directory within the workspace.
"""
workload_namespace = self.expander.workload_namespace
self._inputs_and_fetchers(self.expander.workload_name)
for input_file, input_conf in self._input_fetchers.items():
if not workspace.dry_run:
input_vars = {self.keywords.input_name: input_conf["input_name"]}
input_namespace = workload_namespace + "." + input_file
input_path = self.expander.expand_var(
os.path.join(input_conf["target_dir"], input_file), extra_vars=input_vars
)
input_tuple = (f"input-file-{input_file}", input_path)
# Skip inputs that have already been cached
if workspace.check_cache(input_tuple):
continue
mirror_paths = ramble.mirror.mirror_archive_paths(
input_conf["fetcher"], os.path.join(self.name, input_file)
)
input_dir = os.path.dirname(input_path)
input_base = os.path.basename(input_path)
input_lock = lk.Lock(os.path.join(input_dir, ".ramble-input"))
with lk.WriteTransaction(input_lock):
with ramble.stage.InputStage(
input_conf["fetcher"],
name=input_namespace,
path=input_dir,
mirror_paths=mirror_paths,
) as stage:
stage.set_subdir(input_base)
try:
stage.fetch()
if input_conf["fetcher"].digest:
stage.check()
stage.cache_local()
if input_conf["expand"]:
try:
stage.expand_archive()
except spack.util.executable.ProcessError:
pass
except ramble.fetch_strategy.FetchError as e:
logger.all_msg(
f"Failed fetching input {input_file} in application {self.name}"
)
logger.all_msg(f"Input url was: {input_conf['fetcher'].url}")
logger.die(str(e))
workspace.add_to_cache(input_tuple)
else:
logger.msg(f'DRY-RUN: Would download {input_conf["fetcher"].url}')
def _prepare_license_path(self, workspace):
self.license_path = os.path.join(workspace.shared_license_dir, self.name)
self.license_file = os.path.join(self.license_path, self.license_inc_name)
fs.mkdirp(self.license_path)
register_phase("license_includes", pipeline="setup")
def _license_includes(self, workspace, app_inst=None):
logger.debug("Writing License Includes")
self._prepare_license_path(workspace)
action_funcs = ramble.util.env.action_funcs
config_scopes = ramble.config.scopes()
shell = ramble.config.get("config:shell")
var_set = set()
for scope in config_scopes:
license_conf = ramble.config.config.get_config("licenses", scope=scope)
if license_conf:
app_licenses = license_conf[self.name] if self.name in license_conf else {}
for action, conf in app_licenses.items():
(env_cmds, var_set) = action_funcs[action](
conf, self.expander, var_set, shell=shell
)
lock = lk.Lock(os.path.join(self.license_path, ".ramble-license"))
with lk.WriteTransaction(lock):
with open(self.license_file, "w+") as f:
for cmd in env_cmds:
if cmd:
f.write(cmd + "\n")
register_phase("make_experiments", pipeline="setup", run_after=["get_inputs"])
def _make_experiments(self, workspace, app_inst=None):
"""Create experiment directories
Create the experiment this application encapsulates. This includes
creating the experiment run directory, rendering the necessary
templates, and injecting the experiment into the workspace all
experiments file.
"""
_check_shell_support(self)
exp_lock = self.experiment_lock()
self._define_commands(self._executable_graph, workspace.success_list)
self._define_formatted_executables()
with lk.WriteTransaction(exp_lock):
experiment_run_dir = self.expander.experiment_run_dir
fs.mkdirp(experiment_run_dir)
exec_vars = {}
for mod in self._modifier_instances:
exec_vars.update(mod.modded_variables(self, exec_vars))
for template_name, template_conf in workspace.all_templates():
expand_path = os.path.join(experiment_run_dir, template_name)
logger.msg(f"Writing template {template_name} to {expand_path}")
fs.mkdirp(os.path.dirname(expand_path))
with open(expand_path, "w+") as f:
f.write(
self.expander.expand_var(template_conf["contents"], extra_vars=exec_vars)
)
os.chmod(expand_path, _DEFAULT_CONTENT_PERM)
self._render_object_templates(exec_vars, workspace)
experiment_script = workspace.experiments_script
experiment_script.write(self.expander.expand_var("{batch_submit}\n"))
self.set_status(status=experiment_status.SETUP)
def _clean_hash_variables(self, workspace, variables):
"""Cleanup variables to hash before computing the hash
Perform some general cleanup operations on variables
before hashing, to help give useful hashes.
"""
# Purge workspace name, as this shouldn't affect the experiments
if "workspace_name" in variables:
del variables["workspace_name"]
# Remove the workspace path from variable definitions before hashing
for var in variables:
if isinstance(variables[var], str):
variables[var] = variables[var].replace(workspace.root + os.path.sep, "")
[docs]
def populate_inventory(self, workspace, force_compute=False, require_exist=False):
"""Populate this experiment's hash inventory
If an inventory file exists, read it first.
If it does not exist, compute it using the internal information.
If force_compute is set to true, always compute and never read.
Args:
force_compute: Boolean that allows forces the inventory to be computed instead of read
Used in pipelines that should create the inventory, instead of
consuming it.
"""
experiment_run_dir = self.expander.experiment_run_dir
inventory_file = os.path.join(experiment_run_dir, self._inventory_file_name)
if os.path.exists(inventory_file) and not force_compute:
with open(inventory_file) as f:
self.hash_inventory = spack.util.spack_json.load(f)
else:
# Clean up variables before hashing
vars_to_hash = self.variables.copy()
self._clean_hash_variables(workspace, vars_to_hash)
# Build inventory of attributes
attributes_to_hash = [
("variables", vars_to_hash),
("modifiers", self.modifiers),
("chained_experiments", self.chained_experiments),
("internals", self.internals),
("env_vars", self._env_variable_sets),
]
self.hash_inventory["application_definition"] = ramble.util.hashing.hash_file(
self._file_path
)
added_mods = set()
for mod_inst in self._modifier_instances:
if mod_inst.name not in added_mods:
self.hash_inventory["modifier_definitions"].append(
{
"name": mod_inst.name,
"digest": ramble.util.hashing.hash_file(mod_inst._file_path),
}
)
added_mods.add(mod_inst.name)
for attr, attr_dict in attributes_to_hash:
self.hash_inventory["attributes"].append(
{
"name": attr,
"digest": ramble.util.hashing.hash_json(attr_dict),
}
)
# Build inventory of workspace templates
for template_name, template_conf in workspace.all_templates():
self.hash_inventory["templates"].append(
{
"name": template_name,
"digest": template_conf["digest"],
}
)
# Build inventory of inputs
self._inputs_and_fetchers(self.expander.workload_name)
for input_conf in self._input_fetchers.values():
if input_conf["fetcher"].digest:
self.hash_inventory["inputs"].append(
{"name": input_conf["input_name"], "digest": input_conf["fetcher"].digest}
)
else:
self.hash_inventory["inputs"].append(
{
"name": input_conf["input_name"],
"digest": ramble.util.hashing.hash_string(input_conf["fetcher"].url),
}
)
if self.package_manager is not None:
self.package_manager.populate_inventory(workspace, force_compute, require_exist)
self.experiment_hash = ramble.util.hashing.hash_json(self.hash_inventory)
self.variables[self.keywords.experiment_hash] = self.experiment_hash
register_phase("write_inventory", pipeline="setup", run_after=["make_experiments"])
def _write_inventory(self, workspace, app_inst=None):
"""Build and write an inventory of an experiment
Write an inventory file describing all of the contents of this
experiment.
"""
experiment_run_dir = self.expander.experiment_run_dir
inventory_file = os.path.join(experiment_run_dir, self._inventory_file_name)
# Populate modifier artifacts portion of inventory
# This happens here to allow modifiers to hash files
# that are downloaded within phases earlier than this.
for mod_inst in self._modifier_instances:
inventory = mod_inst.artifact_inventory(workspace, app_inst)
if inventory:
self.hash_inventory["modifier_artifacts"].append(
{"name": mod_inst.name, "mode": mod_inst._usage_mode, "artifacts": inventory}
)
with lk.WriteTransaction(self.experiment_lock()):
with open(inventory_file, "w+") as f:
spack.util.spack_json.dump(self.hash_inventory, f)
register_phase("archive_experiments", pipeline="archive")
def _archive_experiments(self, workspace, app_inst=None):
"""Archive an experiment directory
Perform the archiving action on an experiment.
This includes capturing:
- Rendered templates within the experiment directory
- All files that contain a figure of merit or success criteria
- Any files that match an archive pattern
"""
import glob
experiment_run_dir = self.expander.experiment_run_dir
ws_archive_dir = workspace.latest_archive_path
archive_experiment_dir = experiment_run_dir.replace(workspace.root, ws_archive_dir)
fs.mkdirp(archive_experiment_dir)
archive_lock = lk.Lock(os.path.join(archive_experiment_dir, ".ramble-exp-archive"))
with lk.WriteTransaction(archive_lock):
# Copy all of the templates to the archive directory
for template_name, _ in workspace.all_templates():
src = os.path.join(experiment_run_dir, template_name)
if os.path.exists(src):
shutil.copy(src, archive_experiment_dir)
# Copy all rendered templates generated by `register_template`
for _, tpl_config in self._object_templates(workspace):
src_path = tpl_config["dest_path"]
if os.path.exists(src_path):
shutil.copy(src_path, archive_experiment_dir)
# Copy all figure of merit files
criteria_list = workspace.success_list
analysis_files, _, _ = self._analysis_dicts(criteria_list)
for file in analysis_files.keys():
if os.path.exists(file):
shutil.copy(file, archive_experiment_dir)
# Copy all archive patterns
archive_patterns = set(self.archive_patterns.keys())
if self.package_manager:
for pattern in self.package_manager.archive_patterns.keys():
archive_patterns.add(pattern)
for mod in self._modifier_instances:
for pattern in mod.archive_patterns.keys():
archive_patterns.add(pattern)
for pattern in archive_patterns:
exp_pattern = self.expander.expand_var(pattern)
for file in glob.glob(exp_pattern):
dest_dir = os.path.dirname(file.replace(workspace.root, ws_archive_dir))
fs.mkdirp(dest_dir)
shutil.copy(file, dest_dir)
for file_name in [self._inventory_file_name, self._status_file_name]:
file = os.path.join(experiment_run_dir, file_name)
if os.path.exists(file):
shutil.copy(file, archive_experiment_dir)
register_phase("prepare_analysis", pipeline="analyze")
def _prepare_analysis(self, workspace, app_inst=None):
"""Prepapre experiment for analysis extraction
This function performs any actions that are necessary before the
figures of merit, and success criteria can be properly extracted.
This function can be overridden at the application level to perform
application specific processing of the output.
"""
pass
register_phase("analyze_experiments", pipeline="analyze", run_after=["prepare_analysis"])
def _analyze_experiments(self, workspace, app_inst=None):
"""Perform experiment analysis.
This method will build up the fom_values dictionary. Its structure is:
fom_values[context][fom]
A fom can show up in any number of explicit contexts (including zero).
If the number of explicit contexts is zero, the fom is associated with
the default '(null)' context.
Success is determined at analysis time as well. This happens by checking if:
- At least one FOM is extracted
AND
- Any defined success criteria pass
Success criteria are defined within the application.py, but can also be
injected in a workspace config.
"""
if self.get_status() == experiment_status.UNKNOWN.name and not workspace.dry_run:
logger.warn(f"Experiment has status {self.get_status()}. Skipping analysis..\n")
return
def format_context(context_match, context_format):
context_val = {}
if isinstance(context_format, str):
for group in string.Formatter().parse(context_format):
if group[1]:
context_val[group[1]] = context_match[group[1]]
context_string = context_format.format(**context_val)
return context_string
fom_values = {}
criteria_list = workspace.success_list
criteria_list.reset()
files, contexts, foms = self._analysis_dicts(criteria_list)
exp_lock = self.experiment_lock()
# Iterate over files. We already know they exist
with lk.ReadTransaction(exp_lock):
for file, file_conf in files.items():
# Start with no active contexts in a file.
active_contexts = {}
logger.debug(f"Reading log file: {file}")
if not os.path.exists(file):
logger.debug(f"Skipping analysis of non-existent file: {file}")
continue
per_file_crit_objs = [
criteria_list.find_criteria(c) for c in file_conf["success_criteria"]
]
with open(file) as f:
for line in f.readlines():
new_per_file_crit_objs = []
for crit_obj in per_file_crit_objs:
if crit_obj.passed(line, self):
crit_obj.mark_found()
elif crit_obj.anti_matched(line):
crit_obj.mark_anti_found()
else:
new_per_file_crit_objs.append(crit_obj)
per_file_crit_objs = new_per_file_crit_objs
for context in file_conf["contexts"]:
context_conf = contexts[context]
context_match = context_conf["regex"].match(line)
if context_match:
context_name = format_context(
context_match, context_conf["format"]
)
logger.debug("Line was: %s" % line)
logger.debug(f" Context match {context} -- {context_name}")
active_contexts[context] = context_name
if context_name not in fom_values:
fom_values[context_name] = {}
for fom in file_conf["foms"]:
fom_conf = foms[fom]
fom_match = fom_conf["regex"].match(line)
if fom_match:
fom_vars = {}
for k, v in fom_match.groupdict().items():
fom_vars[k] = v
if fom_conf["fom_name_expanded"] is not None:
fom_name = fom_conf["fom_name_expanded"]
else:
fom_name = self.expander.expand_var(fom, extra_vars=fom_vars)
if fom_conf["group"] in fom_conf["regex"].groupindex:
logger.debug(" --- Matched fom %s" % fom_name)
fom_contexts = []
if fom_conf["contexts"]:
for context in fom_conf["contexts"]:
context_name = (
active_contexts[context]
if context in active_contexts
else _NULL_CONTEXT
)
fom_contexts.append(context_name)
else:
fom_contexts.append(_NULL_CONTEXT)
for context in fom_contexts:
if context not in fom_values:
fom_values[context] = {}
fom_val = fom_match.group(fom_conf["group"])
if fom_conf["units_expanded"] is not None:
fom_unit = fom_conf["units"]
else:
fom_unit = self.expander.expand_var(
fom_conf["units"], extra_vars=fom_vars
)
fom_values[context][fom_name] = {
"value": fom_val,
"units": fom_unit,
"origin": fom_conf["origin"],
"origin_type": fom_conf["origin_type"],
"fom_type": fom_conf["fom_type"],
}
# Test all non-file based success criteria
for criteria_obj in criteria_list.all_criteria():
if criteria_obj.file is None:
if criteria_obj.passed(app_inst=self, fom_values=fom_values):
criteria_obj.mark_found()
success = False
for fom in fom_values.values():
for value in fom.values():
if "origin_type" in value and value["origin_type"] == "application":
success = True
success = success and criteria_list.passed()
status = experiment_status.SUCCESS if success else experiment_status.FAILED
# When workflow_manager is present, only use app_status when workflow is completed or
# unresolved.
if self.workflow_manager is not None:
wm_status = self.workflow_manager.get_status(workspace)
if not (
wm_status is None
or wm_status in [experiment_status.COMPLETE, experiment_status.UNRESOLVED]
):
status = wm_status
self.set_status(status)
self._init_result()
for context, fom_map in fom_values.items():
context_map = {
"name": context,
"foms": [],
"display_name": _get_context_display_name(context),
}
for fom_name, fom in fom_map.items():
fom_copy = fom.copy()
fom_copy["name"] = fom_name
context_map["foms"].append(fom_copy)
if context == _NULL_CONTEXT:
self.result.contexts.insert(0, context_map)
else:
self.result.contexts.append(context_map)
register_phase(
"append_results_to_workspace", pipeline="analyze", run_after=["analyze_experiments"]
)
def _append_results_to_workspace(self, workspace, app_inst=None):
"""Phase for injected experiment results into workspace results
This allows an experiment to register its results into the workspace,
so when the workspace dumps results they are included.
"""
if hasattr(self, "result") and self.result is not None:
workspace.append_result(self.result.to_dict())
[docs]
def calculate_statistics(self, workspace):
"""Calculate statistics for results of repeated experiments
When repeated experiments are used, this method aggregates the results of
each experiment's repeats and calculates statistics for each numeric FOM.
If a FOM is non-numeric, no calculations are performed.
Statistics are injected into the results file under the base experiment
namespace.
"""
def is_numeric_fom(fom):
"""Returns true if a fom value is numeric, and of an applicable type"""
value = fom["value"]
try:
value = float(value)
if (
fom["fom_type"]["name"] is FomType.CATEGORY.name
or fom["fom_type"]["name"] is FomType.INFO.name
):
return False
return True
except (ValueError, TypeError):
return False
if not self.repeats.is_repeat_base:
return
repeat_experiments = {}
repeat_foms = {}
first_repeat_exp = ""
# repeat_experiments dict = {repeat_experiment_namespace: {dict}}
# repeat_foms dict = {context: {(fom_name, units, origin, origin_type): [list of values]}}
# origin_type is generated as 'summary::stat_name'
base_exp_name = self.expander.experiment_name
base_exp_namespace = self.expander.experiment_namespace
# Create a list of all repeats by inserting repeat index
for n in range(1, self.repeats.n_repeats + 1):
if (
base_exp_name in self.experiment_set.chained_experiments.keys()
and base_exp_name not in self.experiment_set.experiments.keys()
):
insert_idx = base_exp_name.find(".chain")
repeat_exp_namespace = (
base_exp_name[:insert_idx] + f".{n}" + base_exp_name[insert_idx:]
)
else:
base_exp_namespace = self.expander.experiment_namespace
repeat_exp_namespace = f"{base_exp_namespace}.{n}"
repeat_experiments[repeat_exp_namespace] = {}
repeat_experiments[repeat_exp_namespace]["base_exp"] = base_exp_namespace
if n == 1:
first_repeat_exp = repeat_exp_namespace
# If repeat_success_strict is true, one failed experiment will fail the whole set
# If repeat_success_strict is false, any passing experiment will pass the whole set
repeat_success = False
exp_status_list = []
for exp in repeat_experiments.keys():
if exp in self.experiment_set.experiments.keys():
exp_inst = self.experiment_set.experiments[exp]
elif exp in self.experiment_set.chained_experiments.keys():
exp_inst = self.experiment_set.chained_experiments[exp]
else:
continue
exp_status_list.append(exp_inst.get_status())
if workspace.repeat_success_strict:
if experiment_status.FAILED.name in exp_status_list:
repeat_success = False
else:
repeat_success = True
else:
if experiment_status.SUCCESS.name in exp_status_list:
repeat_success = True
else:
repeat_success = False
if repeat_success:
self.set_status(status=experiment_status.SUCCESS)
else:
self.set_status(status=experiment_status.FAILED)
self._init_result()
logger.debug(
f"Calculating statistics for {self.repeats.n_repeats} repeats of " f"{base_exp_name}"
)
results = []
# Iterate through repeat experiment instances, extract foms, and aggregate them
for exp in repeat_experiments.keys():
if exp in self.experiment_set.experiments.keys():
exp_inst = self.experiment_set.experiments[exp]
elif exp in self.experiment_set.chained_experiments.keys():
exp_inst = self.experiment_set.chained_experiments[exp]
else:
continue
# When strict success is off for repeats (loose success), skip failed exps
if exp_inst.result.status == experiment_status.FAILED.name:
continue
if exp_inst.result.contexts:
for context in exp_inst.result.contexts:
context_name = context["name"]
if context_name not in repeat_foms.keys():
repeat_foms[context_name] = {}
for fom in context["foms"]:
fom_key = (
fom["name"],
fom["units"],
fom["origin"],
fom["origin_type"],
)
# Stats will not be calculated for non-numeric foms so they're skipped
if fom_key not in repeat_foms[context_name].keys():
repeat_foms[context_name][fom_key] = {
"fom_type": fom["fom_type"],
"fom_values": [],
}
if is_numeric_fom(fom):
repeat_foms[context_name][fom_key]["fom_is_numeric"] = True
else:
repeat_foms[context_name][fom_key]["fom_is_numeric"] = False
fom_contents = (False, fom["value"], fom["fom_type"])
if repeat_foms[context_name][fom_key]["fom_is_numeric"]:
repeat_foms[context_name][fom_key]["fom_values"].append(
float(fom["value"])
)
else:
repeat_foms[context_name][fom_key]["fom_values"].append(fom["value"])
# Iterate through the aggregated foms, calculate stats, and insert into results
for context, fom_dict in repeat_foms.items():
if not fom_dict:
continue
context_map = {
"name": context,
"foms": [],
"display_name": _get_context_display_name(context),
}
summary_foms = []
if context == _NULL_CONTEXT:
# Use the app name as the origin of the FOM
summary_origin = self.name
n_total_dict = {
"value": self.repeats.n_repeats,
"units": "repeats",
"origin": summary_origin,
"origin_type": "summary::n_total_repeats",
"name": "Experiment Summary",
"fom_type": FomType.MEASURE.to_dict(),
}
summary_foms.append(n_total_dict)
n_success = exp_status_list.count(experiment_status.SUCCESS.name)
n_success_dict = {
"value": n_success,
"units": "repeats",
"origin": summary_origin,
"origin_type": "summary::n_successful_repeats",
"name": "Experiment Summary",
"fom_type": FomType.MEASURE.to_dict(),
}
summary_foms.append(n_success_dict)
for fom_key, fom_contents in fom_dict.items():
fom_name, fom_units, fom_origin, fom_origin_type = fom_key
fom_type = fom_contents["fom_type"]
fom_values = fom_contents["fom_values"]
if fom_contents["fom_is_numeric"]:
calcs = []
for statistic in ramble.util.stats.all_stats:
calcs.append(statistic.report(fom_values, fom_units))
for calc in calcs:
fom_calc_dict = {
"value": calc[0],
"units": calc[1],
"origin": fom_origin,
"origin_type": calc[2],
"name": fom_name,
"fom_type": fom_type,
}
context_map["foms"].append(fom_calc_dict)
else:
# Only elevate non-numeric FOMs when they have the same value for all repeats
if len(set(fom_values)) == 1:
fom_str_dict = {
"value": fom_values[0],
"units": fom_units,
"origin": fom_origin,
"origin_type": fom_origin_type,
"name": fom_name,
"fom_type": fom_type,
}
context_map["foms"].append(fom_str_dict)
else:
continue
# Display the FOMs in alphabetic order, even if the corresponding log entries
# may be in different ordering.
context_map["foms"].sort(key=operator.itemgetter("name"))
if context == _NULL_CONTEXT:
context_map["foms"] = summary_foms + context_map["foms"]
results.append(context_map)
if results:
self.result.contexts = results
workspace.insert_result(self.result.to_dict(), first_repeat_exp)
def _init_result(self):
if self.result is None:
self.result = ExperimentResult(self)
def _new_file_dict(self):
"""Create a dictionary to represent a new log file"""
return {"success_criteria": [], "contexts": [], "foms": []}
def _analysis_dicts(self, criteria_list):
"""Extract files that need to be analyzed.
Process figures_of_merit, and return the manipulated dictionaries
to allow them to be extracted.
Additionally, ensure the success criteria list is complete.
Returns:
files (dict): All files that need to be processed
contexts (dict): Any contexts that have been defined
foms (dict): All figures of merit that need to be extracted
"""
files = {}
contexts = {}
foms = {}
# Add the application defined criteria
criteria_list.flush_scope("application_definition")
success_lists = [
("application_definition", self.success_criteria),
]
logger.debug(f" Number of modifiers are: {len(self._modifier_instances)}")
if self._modifier_instances:
criteria_list.flush_scope("modifier_definition")
for mod in self._modifier_instances:
success_lists.append(("modifier_definition", mod.success_criteria))
for success_scope, success_list in success_lists:
for criteria, conf in success_list.items():
if conf["mode"] == "string":
match = (
self.expander.expand_var(conf["match"])
if conf["match"] is not None
else None
)
anti_match = (
self.expander.expand_var(conf["anti_match"])
if conf["anti_match"] is not None
else None
)
criteria_list.add_criteria(
success_scope,
criteria,
mode=conf["mode"],
match=match,
file=conf["file"],
anti_match=anti_match,
)
elif conf["mode"] == "fom_comparison":
criteria_list.add_criteria(
success_scope,
criteria,
conf["mode"],
fom_name=conf["fom_name"],
fom_context=conf["fom_context"],
formula=conf["formula"],
)
criteria_list.add_criteria(
scope="application_definition",
name="_application_function",
mode="application_function",
)
# Extract file paths for all criteria
for criteria in criteria_list.all_criteria():
log_path = self.expander.expand_var(criteria.file)
# Ensure log path is absolute. If not, prepend the experiment run directory
if not os.path.isabs(log_path) and self.expander.experiment_run_dir not in log_path:
log_path = os.path.join(self.expander.experiment_run_dir, log_path)
if log_path not in files and os.path.exists(log_path):
files[log_path] = self._new_file_dict()
if log_path in files:
files[log_path]["success_criteria"].append(criteria.name)
# Remap fom / context / file data
# Could push this into the language features in the future
fom_definitions = self.figures_of_merit.copy()
for fom, fom_def in fom_definitions.items():
fom_def["origin"] = self.name
fom_def["origin_type"] = "application"
fom_contexts = self.figure_of_merit_contexts.copy()
for mod in self._modifier_instances:
fom_contexts.update(mod.figure_of_merit_contexts)
mod_vars = mod.modded_variables(self)
for fom, fom_def in mod.figures_of_merit.items():
fom_definitions[fom] = {"origin": f"{mod}", "origin_type": "modifier"}
for attr in fom_def.keys():
if isinstance(fom_def[attr], (list, FomType)):
fom_definitions[fom][attr] = fom_def[attr].copy()
else:
fom_definitions[fom][attr] = self.expander.expand_var(
fom_def[attr], mod_vars
)
if self.workflow_manager is not None:
fom_contexts.update(self.workflow_manager.figure_of_merit_contexts)
for fom, fom_def in self.workflow_manager.figures_of_merit.items():
fom_definitions[fom] = {
"origin": f"{self.workflow_manager}",
"origin_type": "workflow_manager",
}
for attr in fom_def.keys():
if isinstance(fom_def[attr], (list, FomType)):
fom_definitions[fom][attr] = fom_def[attr].copy()
else:
fom_definitions[fom][attr] = self.expander.expand_var(fom_def[attr])
for fom, conf in fom_definitions.items():
log_path = self.expander.expand_var(conf["log_file"])
# Ensure log path is absolute. If not, prepend the experiment run directory
if not os.path.isabs(log_path) and self.expander.experiment_run_dir not in log_path:
log_path = os.path.join(self.expander.experiment_run_dir, log_path)
if log_path not in files:
files[log_path] = self._new_file_dict()
logger.debug("Log = %s" % log_path)
logger.debug("Conf = %s" % conf)
if conf["contexts"]:
files[log_path]["contexts"].extend(conf["contexts"])
files[log_path]["foms"].append(fom)
def _try_expand_var_or_none(var: str, expander):
try:
return expander.expand_var(var, allow_passthrough=False)
except ramble.expander.RambleSyntaxError:
return None
foms[fom] = {
"regex": re.compile(r"%s" % self.expander.expand_var(conf["regex"])),
"contexts": [],
"group": conf["group_name"],
"units": conf["units"],
"origin": conf["origin"],
"origin_type": conf["origin_type"],
"fom_type": conf["fom_type"].to_dict(),
# If expansion works (i.e., it doesn't rely on the matched fom groups),
# then cache it here to avoid repeated expansion later.
"units_expanded": _try_expand_var_or_none(conf["units"], self.expander),
"fom_name_expanded": _try_expand_var_or_none(fom, self.expander),
}
if conf["contexts"]:
foms[fom]["contexts"].extend(conf["contexts"])
for context in conf["contexts"]:
regex_str = self.expander.expand_var(fom_contexts[context]["regex"])
format_str = fom_contexts[context]["output_format"]
contexts[context] = {
"regex": re.compile(r"%s" % regex_str),
"format": format_str,
}
return files, contexts, foms
[docs]
def read_status(self):
"""Read status from an experiment's status file, if possible.
Set this experiment's status based on the status file in the experiment
run directory, if it exists. If it doesn't exist, set its status to
experiment_status.UNKNOWN
"""
status_path = os.path.join(
self.expander.expand_var_name(self.keywords.experiment_run_dir), self._status_file_name
)
if os.path.isfile(status_path):
exp_lock = self.experiment_lock()
with lk.ReadTransaction(exp_lock):
with open(status_path) as f:
status_data = spack.util.spack_json.load(f)
self.variables[self.keywords.experiment_status] = status_data[
self.keywords.experiment_status
]
else:
self.set_status(experiment_status.UNKNOWN)
[docs]
def set_status(self, status=experiment_status.UNKNOWN):
"""Set the status of this experiment"""
self.variables[self.keywords.experiment_status] = status.name
[docs]
def get_status(self):
"""Get the status of this experiment"""
return self.variables[self.keywords.experiment_status]
register_phase("write_status", pipeline="analyze", run_after=["analyze_experiments"])
register_phase("write_status", pipeline="setup", run_after=["make_experiments"])
def _write_status(self, workspace, app_inst=None):
"""Phase to write an experiment's ramble_status.json file"""
status_data = {}
status_data[self.keywords.experiment_status] = self.expander.expand_var_name(
self.keywords.experiment_status
)
exp_dir = self.expander.expand_var_name(self.keywords.experiment_run_dir)
status_path = os.path.join(exp_dir, self._status_file_name)
if os.path.exists(exp_dir):
exp_lock = self.experiment_lock()
with lk.ReadTransaction(exp_lock):
with open(status_path, "w+") as f:
spack.util.spack_json.dump(status_data, f)
register_phase("deploy_artifacts", pipeline="pushdeployment")
def _deploy_artifacts(self, workspace, app_inst=None):
def _copy_files(obj_inst, obj_type, repo_root):
flist = ramble.repository.list_object_files(obj_inst, obj_type)
for type_dir_name, obj_path in flist:
obj_src_dir_path = os.path.dirname(obj_path)
obj_dir_name = os.path.basename(obj_src_dir_path)
obj_dest_dir = os.path.join(repo_root, type_dir_name, obj_dir_name)
shutil.rmtree(obj_dest_dir, ignore_errors=True)
shutil.copytree(
obj_src_dir_path,
obj_dest_dir,
ignore=shutil.ignore_patterns("*.pyc", "__pycache__"),
)
repo_path = os.path.join(workspace.named_deployment, "object_repo")
repo_lock = lk.Lock(os.path.join(repo_path, ".ramble-obj-repo.lock"))
with lk.WriteTransaction(repo_lock):
for obj_type, obj in self._objects():
_copy_files(obj, obj_type, repo_path)
register_builtin("env_vars", required=True)
[docs]
def env_vars(self):
command = []
# ensure license variables are set / modified
# Process one scope at a time, to ensure
# highest-precedence scopes are processed last
config_scopes = ramble.config.scopes()
shell = ramble.config.get("config:shell")
action_funcs = ramble.util.env.action_funcs
for scope in config_scopes:
license_conf = ramble.config.config.get_config("licenses", scope=scope)
if license_conf:
if self.name in license_conf:
app_licenses = license_conf[self.name]
if app_licenses:
# Append logic to source file which contains the exports
shell = ramble.config.get("config:shell")
command.append(
f"{source_str(shell)} {{license_input_dir}}/{self.license_inc_name}"
)
# Process environment variable actions
for env_var_set in self._env_variable_sets:
for action, conf in env_var_set.items():
(env_cmds, _) = action_funcs[action](conf, self.expander, set(), shell=shell)
for cmd in env_cmds:
if cmd:
command.append(cmd)
for mod_inst in self._modifier_instances:
for action, conf in mod_inst.all_env_var_modifications():
(env_cmds, _) = action_funcs[action](conf, self.expander, set(), shell=shell)
for cmd in env_cmds:
if cmd:
command.append(cmd)
return command
[docs]
def evaluate_success(self):
"""Hook for applications to evaluate custom success criteria
Expected to perform analysis and return either true or false.
"""
return True
def _object_templates(self, workspace):
"""Return templates defined from different objects associated with the app_inst"""
run_dir = self.expander.experiment_run_dir
replacements = workspace.workspace_paths()
expander = self.expander
def _expand_path(path):
return ramble.util.path.substitute_path_variables(
expander.expand_var(path), local_replacements=replacements
)
def _get_template_config(obj, tpl_config, obj_type):
# Resolve the source path
src_path_config = _expand_path(tpl_config["src_path"])
if not os.path.isabs(src_path_config):
# Search up the object chain to resolve source path
found = False
object_paths = [e[1] for e in ramble.repository.list_object_files(obj, obj_type)]
searched_paths = []
for obj_path in object_paths:
src_path = os.path.join(os.path.dirname(obj_path), src_path_config)
if os.path.isfile(src_path):
found = True
break
searched_paths.append(src_path)
if not found:
raise ApplicationError(
f"Object {obj.name} is missing template file {src_path_config}. "
f"Searched paths: {searched_paths}"
)
else:
if not os.path.isfile(src_path_config):
raise ApplicationError(f"Template file {src_path_config} does not exist")
src_path = src_path_config
# Resolve the destination path
tpl_ext = ".tpl"
dest_path_config = tpl_config["dest_path"]
if dest_path_config is None:
dest_path = os.path.basename(src_path)
if dest_path.endswith(tpl_ext):
dest_path = dest_path[: -len(tpl_ext)]
else:
dest_path = _expand_path(tpl_config["dest_path"])
if not os.path.isabs(dest_path):
dest_path = os.path.join(run_dir, dest_path)
return (obj, {**tpl_config, "src_path": src_path, "dest_path": dest_path})
for obj_type, obj in self._objects():
for tpl_conf in obj.templates.values():
yield _get_template_config(obj, tpl_conf, obj_type=obj_type)
def _render_object_templates(self, extra_vars_origin, workspace):
for obj, tpl_config in self._object_templates(workspace):
extra_vars = extra_vars_origin.copy()
src_path = tpl_config["src_path"]
content = workspace.read_file_content(src_path)
extra_vars_dict = tpl_config.get("extra_vars")
if extra_vars_dict is not None:
extra_vars.update(extra_vars_dict)
extra_vars_func_name = tpl_config.get("extra_vars_func_name")
if extra_vars_func_name is not None:
extra_vars_func = getattr(obj, extra_vars_func_name)
extra_vars.update(extra_vars_func())
rendered = self.expander.expand_var(content, extra_vars=extra_vars)
out_path = tpl_config["dest_path"]
perm = tpl_config.get("content_perm", _DEFAULT_CONTENT_PERM)
with open(out_path, "w+") as f_out:
f_out.write(rendered)
f_out.write("\n")
os.chmod(out_path, perm)
def _define_object_template_vars(self, workspace):
var_attr = {
"type": ramble.keywords.key_type.reserved,
"level": ramble.keywords.output_level.variable,
}
for obj, tpl_config in self._object_templates(workspace):
var_name = tpl_config["var_name"]
if var_name is not None:
if var_name in self.variables:
old_var = f"_old_{var_name}"
self.variables[old_var] = self.variables[var_name]
self.keywords.update_keys({old_var: var_attr})
self.variables[var_name] = tpl_config["dest_path"]
self.keywords.update_keys({var_name: var_attr})
if callable(getattr(obj, "template_render_vars", None)):
render_vars = obj.template_render_vars()
self.variables.update(render_vars)
for name in render_vars.keys():
self.keywords.update_keys({name: var_attr})
def _objects(self, exclude_types=None):
"""Return a tuple for each object instance associated with the app_inst.
The tuple format is (obj_type, obj_inst). This is used to iterate over
all associated objects with the given app_inst.
Args:
exclude_types (list(obj_type) | None): object types to exclude
"""
if exclude_types is None:
exclude_types = set()
else:
exclude_types = set(exclude_types)
if ramble.repository.ObjectTypes.applications not in exclude_types:
yield (ramble.repository.ObjectTypes.applications, self)
if ramble.repository.ObjectTypes.modifiers not in exclude_types:
for mod_inst in self._modifier_instances:
yield (ramble.repository.ObjectTypes.modifiers, mod_inst)
if ramble.repository.ObjectTypes.package_managers not in exclude_types:
if self.package_manager is not None:
yield (ramble.repository.ObjectTypes.package_managers, self.package_manager)
if ramble.repository.ObjectTypes.workflow_managers not in exclude_types:
if self.workflow_manager is not None:
yield (ramble.repository.ObjectTypes.workflow_managers, self.workflow_manager)
[docs]
class ApplicationError(RambleError):
"""
Exception that is raised by applications
"""
[docs]
class ExecutableNameError(RambleError):
"""
Exception raised when a name collision in executables happens
"""
[docs]
class PhaseCycleDetectedError(ApplicationError):
"""
Exception raised when a cycle is detected while ordering phases
"""
[docs]
class InvalidPhaseError(ApplicationError):
"""
Exception raised when a phase is used but not defined
"""
[docs]
class ChainCycleDetectedError(ApplicationError):
"""
Exception raised when a cycle is detected in a defined experiment chain
"""
[docs]
class InvalidChainError(ApplicationError):
"""
Exception raised when a invalid chained experiment is defined
"""
[docs]
class ObjectValidationError(ApplicationError):
"""Error when an object validator fails"""