# Copyright 2022-2026 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.
import contextlib
import copy
import datetime
import fnmatch
import itertools
import os
import re
import shutil
from collections import defaultdict
from typing import Optional, Set
import llnl.util.filesystem as fs
import llnl.util.tty as tty
import llnl.util.tty.log as log
import ramble.config
import ramble.context
import ramble.error
import ramble.experiment_set
import ramble.keywords
import ramble.repository
import ramble.results_table
import ramble.schema.applications
import ramble.schema.merged
import ramble.schema.workspace
import ramble.software_environments
import ramble.util.colors as rucolor
import ramble.util.hashing
import ramble.util.install_cache
import ramble.util.lock as lk
import ramble.util.path
import ramble.util.version
from ramble.mirror import MirrorStats
from ramble.namespace import namespace
from ramble.util.conversions import list_str_to_list
from ramble.util.logger import logger
from ramble.util.path import substitute_path_variables
import spack.util.spack_json as sjson
import spack.util.spack_yaml as syaml
import spack.util.url as url_util
import spack.util.web as web_util
# Workspace-related constants
#: Environment variable used to indicate the active workspace
RAMBLE_WORKSPACE_VAR = "RAMBLE_WORKSPACE"
#: Subdirectory where workspace configs are stored
WORKSPACE_CONFIG_PATH = "configs"
#: Name of subdirectory within workspace where tables are stored
WORKSPACE_TABLES_PATH = "tables"
#: Name of subdirectory within workspace where results are stored
WORKSPACE_RESULTS_PATH = "results"
#: Name of subdirectory within workspaces where logs are stored
WORKSPACE_LOG_PATH = "logs"
#: Name of subdirectory within workspaces where experiments are stored
WORKSPACE_EXPERIMENT_PATH = "experiments"
#: Name of subdirectory within workspaces where input files are stored
WORKSPACE_INPUT_PATH = "inputs"
#: Name of subdirectory within workspaces where software environment
#: are stored
WORKSPACE_SOFTWARE_PATH = "software"
#: Name of the subdirectory where workspace archives are stored
WORKSPACE_ARCHIVE_PATH = "archive"
#: Name of the subdirectory where shared files are stored
WORKSPACE_SHARED_PATH = "shared"
#: Name of the subdirectory where shared license files are stored
WORKSPACE_SHARED_LICENSE_PATH = "licenses"
#: Name of the subdirectory where deployments are stored
WORKSPACE_DEPLOYMENTS_PATH = "deployments"
#: regex for validating workspace names
VALID_WORKSPACE_NAME_RE = re.compile(r"^\w[\w-]*$")
# File that includes licensing information for sourcing
LICENSE_INC_NAME = "license.inc"
#: Extension for template files
# Only files that end with this extension are considered valid templates by Ramble
TEMPLATE_EXTENSION = ".tpl"
#: Directory name for auxiliary software files
AUXILIARY_SOFTWARE_DIR_NAME = "auxiliary_software_files"
CONFIG_SECTION = namespace.workspace
CONFIG_FILE_NAME = "ramble.yaml"
LICENSES_FILE_NAME = "licenses.yaml"
METADATA_FILE_NAME = "workspace_metadata.yaml"
WORKSPACE_ALL_EXPERIMENTS_FILE = "all_experiments"
WORKSPACE_EXECUTION_TEMPLATE = "execute_experiment" + TEMPLATE_EXTENSION
#: Name of lockfile within a workspace
LOCKFILE_NAME = "ramble.lock"
#: Config schema for application files
applications_schema = ramble.schema.applications.schema
#: Config file information for workspaces.
#: Keys are filenames, values are dictionaries describing the config files.
config_schema = ramble.schema.workspace.schema
#: Currently activated workspace
_active_workspace = None
[docs]
def valid_workspace_name(name):
return re.match(VALID_WORKSPACE_NAME_RE, name)
[docs]
def validate_workspace_name(name):
if not valid_workspace_name(name):
logger.debug(f"Validation failed for {name}")
raise ValueError(
(
"'%s': names must start with a letter, and only contain "
"letters, numbers, _, and -."
)
% name
)
return name
[docs]
def activate(ws):
"""Activate a workspace.
To activate a workspace, we add its configuration scope to the
existing Ramble configuration, and we set active to the current
workspace.
Arguments:
ws (Workspace): the workspace to activate
"""
global _active_workspace
# Fail early to avoid ending in an invalid state
if not isinstance(ws, Workspace):
raise TypeError(f"`ws` should be of type {Workspace.__name__}")
# Check if we need to reinitialize the store due to pushing the configuration
# below.
prepare_config_scope(ws)
logger.debug(f"Using workspace '{ws.root}'")
# Do this last, because setting up the config must succeed first.
_active_workspace = ws
[docs]
def deactivate():
"""Undo any configuration settings modified by ``activate()``."""
global _active_workspace
if not _active_workspace:
return
logger.debug(f"Deactivated workspace '{_active_workspace.root}'")
deactivate_config_scope(_active_workspace)
_active_workspace = None
[docs]
def prepare_config_scope(workspace):
"""Add workspace's scope to the global configuration search path."""
for scope in workspace.config_scopes():
ramble.config.config.push_scope(scope)
[docs]
def deactivate_config_scope(workspace):
"""Remove any scopes from workspace from the global config path."""
for scope in workspace.config_scopes():
ramble.config.config.remove_scope(scope.name)
[docs]
def all_workspace_names():
"""List the names of workspaces that currently exist."""
# just return empty if the workspace path does not exist. A read-only
# operation like list should not try to create a directory.
wspath = get_workspace_path()
if not os.path.exists(wspath):
return []
candidates = sorted(os.listdir(wspath))
names = []
for candidate in candidates:
configured = True
yaml_path = os.path.join(_root(candidate), WORKSPACE_CONFIG_PATH, CONFIG_FILE_NAME)
if not os.path.exists(yaml_path):
configured = False
if valid_workspace_name(candidate) and configured:
names.append(candidate)
return names
[docs]
def active_workspace():
"""Returns the active workspace when there is any"""
return _active_workspace
[docs]
def get_workspace_path():
"""Returns current directory of ramble-managed workspaces"""
path_in_config = ramble.config.get("config:workspace_dirs")
if not path_in_config:
# command above should have worked, so if it doesn't, error out:
logger.die("No config:workspace_dirs setting found in configuration!")
wspath = ramble.util.path.canonicalize_path(str(path_in_config))
return wspath
def _root(name):
"""Non-validating version of root(), to be used internally."""
wspath = get_workspace_path()
return os.path.join(wspath, name)
[docs]
def root(name):
"""Get the root directory for a workspace by name."""
validate_workspace_name(name)
return _root(name)
[docs]
def exists(name):
"""Whether a workspace with this name exists or not."""
if not valid_workspace_name(name):
return False
return os.path.isdir(root(name))
[docs]
def active(name):
"""True if the named workspace is active."""
return _active_workspace and name == _active_workspace.name
[docs]
def get_filepath(path, file_name):
if is_workspace_dir(path):
return os.path.join(path, WORKSPACE_CONFIG_PATH, file_name)
return None
[docs]
def config_file(path):
"""Returns the path to a workspace's ramble.yaml"""
return get_filepath(path, CONFIG_FILE_NAME)
[docs]
def licenses_file(path):
"""Returns the path to a workspace's licenses.yaml"""
return get_filepath(path, LICENSES_FILE_NAME)
[docs]
def all_config_files(path):
"""Returns path to all yaml files in workspace config directory"""
config_files = []
config_path = os.path.join(path, WORKSPACE_CONFIG_PATH)
for f in os.listdir(config_path):
if f.endswith(".yaml"):
config_files.append(os.path.join(config_path, f))
return config_files
[docs]
def template_path(ws_path, requested_template_name):
"""Returns the path to a workspace's template file"""
config_path = os.path.join(ws_path, WORKSPACE_CONFIG_PATH)
template_file = requested_template_name + TEMPLATE_EXTENSION
template_path = os.path.join(config_path, template_file)
return template_path
[docs]
def all_template_paths(path):
"""Returns (abs) path to available template files in the workspace"""
templates = []
config_path = os.path.join(path, WORKSPACE_CONFIG_PATH)
for root, _, files in os.walk(config_path):
for f in files:
if f.endswith(TEMPLATE_EXTENSION):
templates.append(os.path.join(root, f))
return templates
[docs]
def is_workspace_dir(path):
"""Whether a directory contains a ramble workspace."""
ret_val = os.path.isdir(path)
if ret_val:
ret_val = ret_val and os.path.exists(
os.path.join(path, WORKSPACE_CONFIG_PATH, CONFIG_FILE_NAME)
)
return ret_val
[docs]
def create(name, read_default_template=True):
"""Create a named workspace in Ramble"""
validate_workspace_name(name)
if exists(name):
raise RambleWorkspaceError("'%s': workspace already exists" % name)
return Workspace(root(name), read_default_template=read_default_template)
[docs]
def config_dict(yaml_data):
"""Get the configuration scope section out of a ramble.yaml"""
try:
key = ramble.config.first_existing(yaml_data, ramble.schema.workspace.keys)
return yaml_data[key]
except KeyError:
raise RambleActiveWorkspaceError(
"ramble.yaml needs to contain at least one of the "
f"required keys {ramble.schema.workspace.keys}"
) from None
[docs]
def get_workspace(args, cmd_name, required=False):
"""Used by commands to get the active workspace.
This first checks for a ``workspace`` argument, then looks at the
``active`` workspace. We check args first because Ramble's
subcommand arguments are parsed *after* the ``-s`` and ``-D``
arguments to ``ramble``. So there may be a ``workspace``
argument that is *not* the active workspace, and we give it
precedence.
This is used by a number of commands for determining whether there is
an active workspace.
If a workspace is not found *and* is required, print an error
message that says the calling command *needs* an active
workspace.
Arguments:
args (ramble.namespace): argparse namespace with command arguments
cmd_name (str): name of calling command
required (bool): if ``True``, raise an exception when no workspace
is found; if ``False``, just return ``None``
Returns:
(Workspace): if there is an arg or active workspace
"""
logger.debug("In get_workspace()")
workspace = getattr(args, namespace.workspace, None)
if workspace:
if exists(workspace):
return read(workspace)
elif is_workspace_dir(workspace):
return Workspace(workspace)
else:
raise RambleWorkspaceError("no workspace in %s" % workspace)
# try the active workspace. This is set by find_workspace (above)
if _active_workspace:
return _active_workspace
# elif not required:
else:
logger.die(
f"`ramble {cmd_name}` requires a workspace",
"activate a workspace first:",
" ramble workspace activate WRKSPC",
"or use:",
f" ramble -w WRKSPC {cmd_name} ...",
)
def _get_all_obj_var_names(obj, obj_type):
"""Return a set of all variables names defined in the given object."""
if obj is None:
if obj_type == ramble.repository.ObjectTypes.package_managers:
variant_name = namespace.package_manager
elif obj_type == ramble.repository.ObjectTypes.workflow_managers:
variant_name = namespace.workflow_manager
else:
raise ValueError("Only package manager and workflow manager types are supported")
variants_dict = ramble.config.get(namespace.variants)
obj_name = variants_dict.get(variant_name)
if obj_name is None:
return set()
else:
obj_name = obj
try:
obj_inst = ramble.repository.get(obj_name, object_type=obj_type)
except ramble.repository.UnknownObjectError:
return set()
vars = list(itertools.chain.from_iterable(obj_inst.object_variables.values()))
return {var.name for var in vars}
[docs]
class Workspace:
"""Class representing a working directory for workload
experiments
Each workspace must have a config directory, that contains 2
files by default.
- ramble.yaml
- execute_experiment.tpl
The ramble.yaml file is the overall configuration file for
this workspace. It defines all experiments, variables, and
the entire software stack.
The execute_experiment.tpl file is a template script that
constants the blueprints for running each experiment.
There are several ramble language features that can be used
within the script, to help it render properly for all
experiments.
Each file with the suffix of .tpl will be expanded into the
experiment directory, with the .tpl suffix removed.
Directories will be created for each experiment, when the
relevant phase of the application is executed. The workspace
provides a self contained execution environment where experiments
can be performed.
"""
inventory_file_name = "ramble_inventory.json"
hash_file_name = "workspace_hash.sha256"
def __init__(self, root, dry_run=False, read_default_template=True):
logger.debug(f"In workspace init. Root = {root}")
self.root = ramble.util.path.canonicalize_path(root)
self.txlock = lk.Lock(self._transaction_lock_path)
self.dry_run = dry_run
self.repeat_success_strict = True
self.read_default_template = read_default_template
self.configs = ramble.config.ConfigScope(namespace.workspace, self.config_dir)
self._templates = {}
self._auxiliary_software_files = {}
self.software_mirror_path = None
self.input_mirror_path = None
self.mirror_existed = None
self.software_mirror_stats = None
self.input_mirror_stats = None
self.input_mirror_cache = None
self.software_mirror_cache = None
self.software_environments = None
self.metadata = syaml.syaml_dict()
self.hash_inventory = {namespace.experiment: [], "versions": []}
version = ramble.util.version.get_version()
self.hash_inventory["versions"].append(
{
"name": namespace.ramble,
"version": version,
"digest": ramble.util.hashing.hash_string(version),
}
)
self.workspace_hash = None
self.results_tables = ramble.results_table.ResultsTables()
self.specs = []
self.config_sections = {}
self.install_cache = ramble.util.install_cache.SetCache()
# A per-package_manager dict mapping package spec to its install prefix.
# This can be re-used by all experiments of the workspace.
self.pkg_path_cache = defaultdict(dict)
# A simple dict mapping a file's src_path to its content.
# This is currently used as a cache for reading per-object template contents.
self._inmem_file_cache = {}
# A workspace-level cache that's used by objects to cache command returns.
self.object_command_cache = {}
self.results = self.default_results()
# A cache structured as {pkg_man: {env_name: pkg_list}}.
# It's used to cache package provenance info from different package managers.
self.pkg_prov_cache = defaultdict(dict)
# Key for each application config should be it's filepath
# Format for an application config should be:
# {
# 'filename': <filename>,
# 'path': <filepath>,
# 'raw_yaml': <raw_yaml>,
# 'yaml': <yaml>
# }
self.application_configs = {}
self.experiments_script = None
self._read()
# Create a logger to redirect certain prints from screen to log file
self.logger = log.log_output(echo=False, debug=tty.debug_level())
self.deployment_name = self.name
# Used by profiling phases
self.profile_config = None
def _re_read(self):
"""Reinitialize the workspace object if it has been written (this
may not be true if the workspace was just created in this running
instance of ramble)."""
for _, section in self.config_sections.items():
if not os.path.exists(section["filename"]):
return
self.clear()
self._read()
def _read(self):
# Create the workspace config section
with lk.ReadTransaction(self.txlock):
self.config_sections[namespace.workspace] = {
"filename": self.config_file_path,
"path": self.config_file_path,
"schema": config_schema,
"section_filename": self.config_file_path,
"raw_yaml": None,
"yaml": None,
}
keywords = ramble.keywords.keywords
read_default = not os.path.exists(self.config_file_path)
if read_default:
self._read_config(CONFIG_SECTION, self._default_config_yaml())
else:
with open(self.config_file_path) as f:
self._read_config(CONFIG_SECTION, f)
read_default_script = self.read_default_template
ext_len = len(TEMPLATE_EXTENSION)
if os.path.exists(self.config_dir):
for root, _, files in os.walk(self.config_dir):
processed_root = root.replace(self.config_dir, "")
if len(processed_root) > 1 and processed_root[0] == os.sep:
processed_root = processed_root[1:]
if len(processed_root) > 1:
processed_root += os.sep
for filename in files:
if filename.endswith(TEMPLATE_EXTENSION):
read_default_script = False
template_name = processed_root + filename[0:-ext_len]
template_path = os.path.join(root, filename)
if keywords.is_reserved(template_name):
raise RambleInvalidTemplateNameError(
f"Template file {filename} results in a "
f"template name of {template_name}"
+ " which is reserved by ramble."
)
with open(template_path) as f:
self._read_template(template_name, f.read())
if os.path.exists(self.auxiliary_software_dir):
for filename in os.listdir(self.auxiliary_software_dir):
aux_file_path = os.path.join(self.auxiliary_software_dir, filename)
with open(aux_file_path) as f:
self._read_auxiliary_software_file(filename, f.read())
if read_default_script:
template_name = WORKSPACE_EXECUTION_TEMPLATE[0:-ext_len]
self._read_template(template_name, self._template_execute_script())
self._read_metadata()
if hasattr(self, "results") and self.results:
self.results["metadata"] = self.metadata
@classmethod
def _template_execute_script(self):
shell = ramble.config.get("config:shell")
shell_path = os.path.join("/bin/", shell)
script = (
f"#!{shell_path}\n"
+ """\
# This is a template execution script for
# running the execute pipeline.
#
# Variables surrounded by curly braces will be expanded
# when generating a specific execution script.
# Some example variables are:
# - experiment_run_dir (Will be replaced with the experiment directory)
# - command (Will be replaced with the command to run the experiment)
# - log_dir (Will be replaced with the logs directory)
# - experiment_name (Will be replaced with the name of the experiment)
# - workload_run_dir (Will be replaced with the directory of the workload
# - application_name (Will be replaced with the name of the application)
# - n_nodes (Will be replaced with the required number of nodes)
# Any experiment parameters will be available as variables as well.
{workflow_banner}
cd "{experiment_run_dir}"
{command}
"""
)
return script
@classmethod
def _default_config_yaml(self):
# Construct string for default variants
variant_string = ""
# Set default workflow_manager to the user-managed one,
# which provides defaults for required variables such as
# batch_submit and mpi_command.
all_variants = {"workflow_manager": "user-managed"}
for scope in ramble.config.scopes():
if namespace.workspace not in scope:
variant_dict = ramble.config.get(namespace.variants, scope=scope)
for var, val in variant_dict.items():
all_variants[var] = val
variant_defs = []
for var, val in all_variants.items():
variant_defs.append(f" {var}: {val}")
if variant_defs:
merged_string = "\n".join(variant_defs)
variant_string = f""" {namespace.variants}:
{merged_string}"""
return f"""\
# This is a ramble workspace config file.
#
# It describes the experiments, the software stack
# and all variables required for ramble to configure
# experiments.
# As an example, experiments can be defined as follows.
# applications:
# hostname: # Application name, as seen in `ramble list`
# variables:
# iterations: '5'
# workloads:
# serial: # Workload name, as seen in `ramble info <app>`
# variables:
# type: 'test'
# experiments:
# single_node: # Arbitrary experiment name
# variables:
# n_ranks: '{{processes_per_node}}'
ramble:
env_vars:
set:
OMP_NUM_THREADS: '{{n_threads}}'
{variant_string}
{namespace.variables}:
processes_per_node: 1
{namespace.application}: {{}}
{namespace.software}:
{namespace.packages}: {{}}
{namespace.environments}: {{}}
"""
def _read_application_config(self, path, f, raw_yaml=None):
"""Read an application configuration file"""
if path not in self.application_configs:
self.application_configs[path] = {
"filename": os.path.basename(path),
"path": path,
"schema": applications_schema,
"raw_yaml": None,
"yaml": None,
}
config = self.application_configs[path]
self._read_yaml(config, f, raw_yaml)
def _read_config(self, section, f, raw_yaml=None):
"""Read configuration file"""
config = self.config_sections[section]
self._read_yaml(config, f, raw_yaml)
self._check_deprecated(config["yaml"])
def _read_metadata(self):
"""Read workspace metadata file
If a metadata file exists in the workspace root, read it in, and
populate this workspace's metadata object with its contents.
"""
metadata_file_path = os.path.join(self.root, METADATA_FILE_NAME)
if os.path.exists(metadata_file_path):
with open(metadata_file_path) as f:
self.metadata = syaml.load(f)
else:
self.metadata = syaml.syaml_dict()
self.metadata[namespace.metadata] = syaml.syaml_dict()
def _write_metadata(self):
"""Write out workspace metadata file
Create, and populate the metadata file in the root of the workspace.
This file can be used to house cross-pipeline information.
"""
metadata_file_path = os.path.join(self.root, METADATA_FILE_NAME)
with open(metadata_file_path, "w+") as f:
syaml.dump(self.metadata, stream=f)
def _check_deprecated(self, config):
"""
Trap and warn (or error) on deprecated configuration settings
in the workspace config.
"""
error_sections = []
deprecated_sections = []
if len(deprecated_sections) > 0:
logger.warn("Your workspace configuration contains deprecated sections:")
for section in deprecated_sections:
logger.warn(f" {section}")
logger.warn("Please see the current workspace documentation and update")
logger.warn("to ensure your workspace continues to function properly")
if len(error_sections) > 0:
logger.warn("Your workspace configuration contains invalid sections:")
for section in deprecated_sections:
logger.warn(f" {section}")
logger.die("Please update to the latest format.")
def _read_yaml(self, config, f, raw_yaml=None):
if raw_yaml:
_, config["yaml"] = _read_yaml(f, config["schema"])
config["raw_yaml"], _ = _read_yaml(raw_yaml, config["schema"])
else:
config["raw_yaml"], config["yaml"] = _read_yaml(f, config["schema"])
def _read_template(self, name, f):
"""Read a template file"""
self._templates[name] = {
"contents": f,
"digest": ramble.util.hashing.hash_string(f),
}
def _read_auxiliary_software_file(self, name, f):
"""Read an auxiliary software file for generated software directories"""
self._auxiliary_software_files[name] = f
[docs]
def write(self, software_dir=None, inputs_dir=None):
"""Write an in-memory workspace to its location on disk."""
with lk.WriteTransaction(self.txlock, acquire=self._re_read):
# Ensure required directory structure exists
fs.mkdirp(self.path)
fs.mkdirp(self.config_dir)
fs.mkdirp(self.auxiliary_software_dir)
fs.mkdirp(self.log_dir)
fs.mkdirp(self.experiment_dir)
if inputs_dir:
os.symlink(os.path.abspath(inputs_dir), self.input_dir, target_is_directory=True)
elif not os.path.exists(self.input_dir):
fs.mkdirp(self.input_dir)
if software_dir:
os.symlink(
os.path.abspath(software_dir), self.software_dir, target_is_directory=True
)
elif not os.path.exists(self.software_dir):
fs.mkdirp(self.software_dir)
fs.mkdirp(self.shared_dir)
fs.mkdirp(self.shared_license_dir)
self._write_config(CONFIG_SECTION)
self._write_templates()
self._write_metadata()
def _write_config(self, section, force=False):
"""Update YAML config file for this workspace, based on
changes and write it"""
config = self.config_sections[section]
changed = not yaml_equivalent(config["raw_yaml"], config["yaml"])
written = os.path.exists(config["path"])
if changed or not written or force:
config["raw_yaml"] = copy.deepcopy(config["yaml"])
with fs.write_tmp_and_move(config["path"]) as f:
_write_yaml(config["yaml"], f, config["schema"])
def _write_templates(self):
"""Write all templates out to workspace"""
for name, conf in self._templates.items():
template_path = self.template_path(name)
with open(template_path, "w+") as f:
f.write(conf["contents"])
[docs]
def clear(self):
self.config_sections = {}
self.application_configs = []
self._previous_active = None # previously active environment
self.specs = []
@property
def all_experiments_path(self):
return os.path.join(self.root, WORKSPACE_ALL_EXPERIMENTS_FILE)
[docs]
def build_experiment_set(self, die_on_validate_error=True):
"""Create an experiment set representing this workspace"""
experiment_set = ramble.experiment_set.ExperimentSet(self)
experiment_set.set_base_var("experiments_file", self.all_experiments_path)
for workloads, application_context in self.all_applications():
experiment_set.set_application_context(application_context)
for experiments, workload_context in self.all_workloads(workloads):
experiment_set.set_workload_context(workload_context)
for _, experiment_context in self.all_experiments(experiments):
experiment_set.set_experiment_context(
experiment_context, die_on_validate_error=die_on_validate_error
)
experiment_set.build_experiment_chains()
return experiment_set
[docs]
def all_applications(self):
"""Iterator over applications
Returns application, context
where context contains the platform level variables that
should be applied.
"""
ws_dict = self._get_workspace_dict()
logger.debug(f" With ws dict: {ws_dict}")
# Iterate over applications in ramble.yaml first
app_dict = ramble.config.config.get_config(namespace.application)
for application, contents in app_dict.items():
app_name, _, maybe_version = application.partition("@")
if maybe_version:
contents[namespace.version] = maybe_version
application_context = ramble.context.create_context_from_dict(app_name, contents)
yield contents, application_context
logger.debug(" Iterating over configs in directories...")
# Iterate over applications defined in application directories
# files after the ramble.yaml file is complete
for app_conf in self.application_configs:
config = self._get_application_dict_config(app_conf)
if namespace.application not in config:
logger.msg(f"No applications in config file {app_conf}")
app_dict = config[namespace.application]
for application, contents in app_dict.items():
app_name, _, maybe_version = application.partition("@")
if maybe_version:
contents[namespace.version] = maybe_version
application_context = ramble.context.create_context_from_dict(app_name, contents)
yield contents, application_context
[docs]
def all_workloads(self, application):
"""Iterator over workloads in an application dict
Returns workload, context
where context contains the application level variables that
should be applied.
"""
if namespace.workload not in application:
logger.msg("No workloads in application")
return
workloads = application[namespace.workload]
for workload, contents in workloads.items():
workload_context = ramble.context.create_context_from_dict(workload, contents)
yield contents, workload_context
[docs]
def all_experiments(self, workload):
"""Iterator over experiments in a workload dict
Returns experiment, context
Where context contains the workload level variables that
should be applied.
"""
if namespace.experiment not in workload:
logger.msg("No experiments in workload")
return
experiments = workload[namespace.experiment]
for experiment, contents in experiments.items():
experiment_context = ramble.context.create_context_from_dict(experiment, contents)
yield contents, experiment_context
[docs]
def print_config(self):
workspace_dict = self._get_workspace_dict()
print(f"\n{syaml.dump(workspace_dict)}")
[docs]
def manage_environments(
self,
env_name,
env_packages="",
external_path=None,
remove=False,
overwrite=False,
):
"""Manipulate software environments
Create, change, remove, and augment software environment definitions.
Args:
env_name (str): Name of environment to manipulate
env_packages (str): (Optional) Comma delimited list of packages to add into
this environment
external_path (str): (Optional) Path to external environment definition
remove (bool): Whether the named environment should be removed from the workspace
overwrite (bool): Whether new definition should overwrite existing definitions
"""
package_list = []
if env_packages:
package_list = env_packages.split(",")
if package_list and external_path is not None:
logger.die("Can only manage environments with one of package_list or external_path")
software_dict = self.get_software_dict().copy()
if namespace.environments in software_dict:
environments = software_dict[namespace.environments]
else:
environments = None
# Ensure package dict is an syaml_dict, for formatting
if not environments:
software_dict[namespace.environments] = syaml.syaml_dict()
environments = software_dict[namespace.environments]
if remove:
if env_name in environments:
del environments[env_name]
else:
if env_name in environments:
conflicting_type = (
namespace.external_env in environments[env_name]
and package_list
or namespace.packages in environments[env_name]
and external_path
)
if overwrite:
del environments[env_name]
elif conflicting_type:
logger.die(
"Cannot convert between internal and "
"external environments without --overwrite"
)
if env_name not in environments:
environments[env_name] = syaml.syaml_dict()
if package_list:
environments[env_name][namespace.packages] = package_list.copy()
elif external_path:
environments[env_name][namespace.external_env] = external_path
if not self.dry_run:
ramble.config.config.update_config(
namespace.software, software_dict, scope=self.ws_file_config_scope_name()
)
else:
workspace_dict = self._get_workspace_dict()
workspace_dict[namespace.software] = software_dict
[docs]
def manage_packages(
self,
pkg_name,
pkg_spec="",
compiler_pkg=None,
compiler_spec=None,
package_manager_prefix=None,
remove=False,
overwrite=False,
):
"""Manage workspace package definitions
Create, remove, update, or augment package definitions.
Args:
pkg_name (str): Name of package to manipulate
pkg_spec (str): Package spec for the package manager
compiler_pkg (str): Name of the package to use as a compiler for this package
compiler_spec (str): When this package is used as a compiler for
another, the string to refer to this package.
package_manager_prefix (str): A package manager specific prefix to
apply to package attribute definitions
remove (bool): Whether the named package should be removed from the workspace
overwrite (bool): Whether colliding definitions should be overwritten
"""
software_dict = self.get_software_dict().copy()
if namespace.packages in software_dict:
packages = software_dict[namespace.packages]
else:
packages = None
# Ensure package dict is an syaml_dict, for formatting
if not packages:
software_dict[namespace.packages] = syaml.syaml_dict()
packages = software_dict[namespace.packages]
if remove:
if pkg_name in packages:
del packages[pkg_name]
else:
if not pkg_spec:
logger.die("Cannot define a package without a --pkg-spec attribute")
pkg_def = syaml.syaml_dict()
prefix = ""
if package_manager_prefix:
prefix = f"{package_manager_prefix}_"
pkg_def[f"{prefix}{namespace.pkg_spec}"] = pkg_spec
if compiler_pkg:
pkg_def[f"{prefix}{namespace.compiler}"] = compiler_pkg
if compiler_spec:
pkg_def[f"{prefix}{namespace.compiler_spec}"] = compiler_spec
if pkg_name in packages:
for attr, val in packages[pkg_name].items():
if attr in pkg_def and pkg_def[attr] != val and not overwrite:
logger.warn(
f"Cannot overwrite existing value of {attr} without --overwrite"
)
del pkg_def[attr]
else:
packages[pkg_name] = syaml.syaml_dict()
for attr, val in pkg_def.items():
packages[pkg_name][attr] = val
if not self.dry_run:
ramble.config.config.update_config(
namespace.software, software_dict, scope=self.ws_file_config_scope_name()
)
else:
workspace_dict = self._get_workspace_dict()
workspace_dict[namespace.software] = software_dict
[docs]
def squash_and_print_config(self, included_section=None, excluded_section=None):
workspace_dict = self._get_workspace_dict()
# Remove includes if they were defined before.
if "include" in workspace_dict["ramble"]:
del workspace_dict["ramble"]["include"]
for section in ramble.config.section_schemas:
keep = True
if included_section:
keep = False
for pattern in included_section:
if fnmatch.fnmatch(section, pattern):
keep = True
if excluded_section:
for pattern in excluded_section:
if fnmatch.fnmatch(section, pattern):
keep = False
if keep:
section_dict = ramble.config.get(section)
if section_dict:
workspace_dict["ramble"][section] = section_dict
print(f"\n{syaml.dump(workspace_dict, Dumper=syaml.OrderedLineDumper)}")
[docs]
def add_experiments(
self,
application,
workload_name_variable,
workload_filters,
include_default_variables,
variable_filters,
variable_definitions,
experiment_name,
package_manager=None,
workflow_manager=None,
zips=None,
matrix=None,
overwrite=False,
):
"""Add new experiments to this workspace
Iterate over the workloads of the input application and define new
experiments for each workload that matches any filter provided in
workload_filters.
Args:
application (str): Name of application to define experiments for
workload_name_variable (str): Name of variable to contain workload names,
if the workload names should be collapsed
workload_filters (list(str)): List of filters to downselect workloads with
include_default_variables (bool): Whether to include default variables in the
resulting config or not
variable_filters (list(str)): List of filters to downselect variables with
variable_definitions (list(str)): List of variable definitions to use
within generated experiments
experiment_name (str): The name of the experiments to add
package_manager (str): Name of package manager to use for the generated experiments
workflow_manager (str): Name of workflow manager to use for the generated experiments
zips (list(str) | None): List of strings representing zips to define, in the
format zipname=[var1,var2,var3]
matrix (str): String representing a matrix to define within the
experiment in the format of var1,var2,var3.
overwrite (bool): Whether to overwrite existing definitions that
collide with new definitions or not.
"""
if zips is None:
zips = []
def yaml_add_comment_before_key(
base, key, comment, column=None, clear=False, start_char="#"
):
"""
Insert a comment before the provided key within the base commented
object.
Args:
base: Typically a CommentedMap, but the object comments should
be added to
key: Key in base object to inject the comment before.
column (int): Column to start the comment at. If not specified,
will use previously defined comments to determine
indentation.
clear (bool): Whether to clear previous comments or not
start_char (str): Character to begin the comment with
"""
import ruamel.yaml as yaml
key_comment = base.ca.items.setdefault(key, [None, [], None, None])
if clear:
key_comment[1] = []
comment_list = key_comment[1]
if comment:
comment_start = f"{start_char} "
if comment[-1] == "\n":
comment = comment[:-1] # strip final newline if there
else:
comment_start = f"{start_char}"
if column is None:
if comment_list:
# if there already are other comments get the column from them
column = comment_list[-1].start_mark.column
else:
column = 0
start_mark = yaml.error.Mark(None, None, None, column, None, None)
comment_list.append(
yaml.tokens.CommentToken(comment_start + comment + "\n", start_mark, None)
)
return base
import ruamel.yaml as yaml
edited = False
workspace_vars = self.get_workspace_vars()
apps_dict = self.get_applications().copy()
app_inst = ramble.repository.get(application)
app_inst.validate_version()
var_def_dict = {}
def_regex = re.compile(r"\s*=\s*")
for definition in variable_definitions:
m = def_regex.search(definition)
if m:
key = definition[0 : m.start()]
value = list_str_to_list(definition[m.end() :])
var_def_dict[key] = value
else:
logger.die(
f"Invalid variable definition provided: {definition}. "
+ "Accepted form is 'key=value'"
)
if application not in apps_dict:
apps_dict[application] = syaml.syaml_dict()
apps_dict[application][namespace.workload] = syaml.syaml_dict()
workloads_dict = apps_dict[application][namespace.workload]
exp_zips = {}
for zip_def in zips:
m = def_regex.match(zip_def)
if m:
key = m.group("key")
value = list_str_to_list(m.group("value"))
exp_zips[key] = value
else:
logger.die(
f"Invalid zip definition provided: {zip_def}. "
+ "Accepted form is 'zipname=[var1,var2,var3]'"
)
exp_matrix = []
if matrix:
for part in matrix.split(","):
exp_matrix.append(part)
# Unpack all workload names from `when` sets
all_workload_names = set()
for _, workloads in app_inst.workloads.items():
for workload in workloads.keys():
all_workload_names.add(workload)
workload_names = []
for workload in all_workload_names:
add_workload = False
for wl_filter in workload_filters:
if fnmatch.fnmatch(workload, wl_filter):
add_workload = True
break
# Don't add this experiment if it already exists in the workspace
if add_workload:
if application in apps_dict:
subdict = apps_dict[application]
if namespace.workload in subdict:
subdict = subdict[namespace.workload]
if workload in subdict:
subdict = subdict[workload]
if namespace.experiment in subdict:
subdict = subdict[namespace.experiment]
if experiment_name in subdict:
exp_name = f"{application}.{workload}.{experiment_name}"
if not overwrite:
logger.warn(
f"Experiment {exp_name} is defined already. "
+ "To overwrite, use '--overwrite'"
)
add_workload = overwrite
if add_workload:
workload_names.append(workload)
if workload_name_variable:
var_def_dict[workload_name_variable] = workload_names.copy()
workload_names = [ramble.expander.Expander.expansion_str(workload_name_variable)]
obj_var_names = _get_all_obj_var_names(
workflow_manager, obj_type=ramble.repository.ObjectTypes.package_managers
) | _get_all_obj_var_names(
workflow_manager, obj_type=ramble.repository.ObjectTypes.workflow_managers
)
for workload_name in workload_names:
edited = True
if workload_name not in workloads_dict:
workloads_dict[workload_name] = syaml.syaml_dict()
workloads_dict[workload_name][namespace.experiment] = syaml.syaml_dict()
exps_dict = workloads_dict[workload_name][namespace.experiment]
exps_dict[experiment_name] = syaml.syaml_dict()
exp_dict = exps_dict[experiment_name]
if package_manager is not None or workflow_manager is not None:
exp_dict[namespace.variants] = syaml.syaml_dict()
variants_dict = exp_dict[namespace.variants]
if package_manager is not None:
variants_dict[namespace.package_manager] = package_manager
if workflow_manager is not None:
variants_dict[namespace.workflow_manager] = workflow_manager
if namespace.variables not in exp_dict:
exp_dict[namespace.variables] = yaml.comments.CommentedMap()
vars_dict = exp_dict[namespace.variables]
# Ensure required variables are defined
for key in app_inst.keywords.all_required_keys():
# Do not define missing required variables that are defined in
# the associated package and workflow managers.
# TODO: should include consideration for when clause, right now
# the `selected_variables` property cannot be used due to no associated
# expander at this stage.
if key not in workspace_vars and key not in obj_var_names:
vars_dict[key] = ""
# Only extract variable defaults if requested.
# This is mutually exclusive with workload_name_variable
if include_default_variables:
# At this point we should only have a valid workload name
workload = app_inst.workloads[workload_name]
if workload.variables:
first_var = True
for var in workload.variables.values():
keep_var = False
for var_filter in variable_filters:
if fnmatch.fnmatch(var.name, var_filter):
keep_var = True
break
if keep_var:
vars_dict[var.name] = var.default
# Add blank line before all variables except
# the first
if first_var:
first_var = False
else:
yaml_add_comment_before_key(
vars_dict, var.name, "", column=17, start_char=""
)
if var.description:
yaml_add_comment_before_key(
vars_dict, var.name, var.description, column=17
)
if len(var.values) > 1 or var.values[0] is not None:
yaml_add_comment_before_key(
vars_dict,
var.name,
f"Suggested values: {var.values}",
column=17,
)
if workload.environment_variables:
if namespace.env_var not in exps_dict[experiment_name]:
exp_dict[namespace.env_var] = syaml.syaml_dict()
exp_dict[namespace.env_var]["set"] = syaml.syaml_dict()
env_vars_dict = exp_dict[namespace.env_var]["set"]
for env_var in workload.environment_variables.values():
env_vars_dict[env_var.name] = env_var.value
# Add any variables that are defined to the variables dict
if var_def_dict:
vars_dict.update(var_def_dict)
if exp_zips:
if namespace.zips not in exp_dict:
exp_dict[namespace.zips] = exp_zips.copy()
if exp_matrix:
if namespace.matrix not in exp_dict:
exp_dict[namespace.matrix] = exp_matrix.copy()
if edited and not self.dry_run:
ramble.config.config.update_config(
namespace.application, apps_dict, scope=self.ws_file_config_scope_name()
)
elif edited:
workspace_dict = self._get_workspace_dict()
workspace_dict[namespace.ramble][namespace.application] = apps_dict
[docs]
def concretize(self, force=False, quiet=False):
"""Concretize software definitions for defined experiments
Extract suggested software for experiments defined in a workspace, and
ensure the software environments are defined properly.
Args:
force (bool): Whether to overwrite conflicting definitions of named packages or not
quiet (bool): Whether to silently ignore conflicts or not
"""
full_software_dict = self.get_software_dict()
if (
namespace.packages not in full_software_dict
or not full_software_dict[namespace.packages]
):
full_software_dict[namespace.packages] = syaml.syaml_dict()
if (
namespace.environments not in full_software_dict
or not full_software_dict[namespace.environments]
):
full_software_dict[namespace.environments] = syaml.syaml_dict()
packages_dict = full_software_dict[namespace.packages]
environments_dict = full_software_dict[namespace.environments]
newly_created_packages = set()
self.software_environments = ramble.software_environments.SoftwareEnvironments(self)
experiment_set = self.build_experiment_set(die_on_validate_error=False)
force_prefix = False
pkgman_prefixes = set()
for _, app_inst, _ in experiment_set.all_experiments():
app_inst.build_modifier_instances()
app_inst.define_variables_for_template_path(self)
if app_inst.package_manager is not None:
pkgman_prefixes.add(app_inst.package_manager.spec_prefix)
force_prefix = force_prefix or not app_inst.package_manager.allow_unprefixed_specs
force_prefix = force_prefix or len(pkgman_prefixes) > 1
for _, app_inst, _ in experiment_set.all_experiments():
app_inst.build_modifier_instances()
app_inst.define_variables_for_template_path(self)
env_name_str = app_inst.expander.expansion_str(ramble.keywords.keywords.env_name)
env_name = app_inst.expander.expand_var(env_name_str)
if app_inst.package_manager is None:
continue
compiler_packages = app_inst.package_manager.get_experiment_compilers(
app_inst=app_inst, prefixed=force_prefix
)
for comp, definitions in compiler_packages.items():
for info in definitions:
if (
not quiet
and comp in packages_dict
and info.conflict_dict(packages_dict[comp])
):
logger.debug(f" Spec 1: {str(info)}")
logger.debug(f" Spec 2: {str(packages_dict[comp])}")
raise RambleConflictingDefinitionError(
f"Compiler {comp} would be defined " "in multiple conflicting ways"
)
if comp not in packages_dict or (force and comp not in newly_created_packages):
newly_created_packages.add(comp)
packages_dict[comp] = syaml.syaml_dict()
compiler_packages[comp] = False
packages_dict[comp].update(info.to_dict(apply_prefix=force_prefix))
for conf in info.config_opts():
ramble.config.add(conf, scope=self.ws_file_config_scope_name())
logger.debug(f"Trying to define packages for {env_name}")
app_packages = []
if env_name in environments_dict:
if namespace.packages in environments_dict[env_name]:
app_packages = environments_dict[env_name][namespace.packages].copy()
software_packages = app_inst.package_manager.get_experiment_specs(
app_inst=app_inst, prefixed=force_prefix
)
for spec_name, definitions in software_packages.items():
for info in definitions:
logger.debug(f" Found spec: {spec_name}")
if (
not quiet
and spec_name in packages_dict
and info.conflict_dict(packages_dict[spec_name])
):
logger.debug(f" Spec 1: {str(info)}")
logger.debug(f" Spec 2: {str(packages_dict[spec_name])}")
raise RambleConflictingDefinitionError(
f"Package {spec_name} would be defined in multiple " "conflicting ways"
)
if spec_name not in packages_dict or (
force and spec_name not in newly_created_packages
):
packages_dict[spec_name] = syaml.syaml_dict()
# Check for usage of compilers
expanded_compiler = app_inst.expander.expand_var(info.compiler)
if expanded_compiler in compiler_packages:
compiler_packages[expanded_compiler] = True
packages_dict[spec_name].update(info.to_dict(apply_prefix=force_prefix))
if spec_name not in app_packages:
app_packages.append(spec_name)
if app_packages:
if env_name not in environments_dict:
environments_dict[env_name] = syaml.syaml_dict()
environments_dict[env_name][namespace.packages] = app_packages.copy()
# Ensure all compilers in this experiment are used.
comp_list = []
for name, used in compiler_packages.items():
if not used:
comp_list.append(name)
if comp_list:
logger.warn(
"Unused compiler(s) found in experiment: "
+ app_inst.expander.experiment_namespace
)
logger.warn(f" {comp_list}")
ramble.config.config.update_config(
namespace.software, full_software_dict, scope=self.ws_file_config_scope_name()
)
return
[docs]
def write_json_results(self):
fs.mkdirp(self.results_dir)
out_file = os.path.join(self.results_dir, "results.json")
with open(out_file, "w+") as f:
sjson.dump(self.results, f)
return out_file
[docs]
def default_results(self):
res = {}
if self.workspace_hash:
res["workspace_hash"] = self.workspace_hash
else:
try:
with open(os.path.join(self.root, self.hash_file_name)) as f:
res["workspace_hash"] = f.readline().rstrip()
except OSError:
res["workspace_hash"] = "Unknown.."
res["workspace_name"] = self.name
res["metadata"] = self.metadata
res["ramble_version"] = ramble.util.version.get_version()
res[namespace.experiment] = []
return res
[docs]
def append_result(self, result):
if not self.results:
self.results = self.default_results()
self.results[namespace.experiment].append(result)
[docs]
def insert_result(self, result, insert_before_exp):
"""Insert a result before a specified experiment"""
def search_exp_index(results_list, exp_to_search):
for i, exp in enumerate(results_list):
if exp["name"] == exp_to_search:
return i
return None
if not self.results:
self.results = self.default_results()
insert_index = search_exp_index(self.results[namespace.experiment], insert_before_exp)
tty.debug(f"Attempting to insert result before experiment {insert_before_exp}")
if insert_index is not None:
self.results[namespace.experiment].insert(insert_index, result)
else:
tty.debug(f"Could not find {insert_before_exp}, appending result to end instead.")
self.results[namespace.experiment].append(result)
[docs]
def symlink_result(self, out_file, latest_file):
"""
Create symlink of result file so that results.latest.txt always points
to the most recent analysis. This clobbers the existing link
"""
from ramble.util.file_util import create_symlink
create_symlink(out_file, latest_file)
[docs]
def write_software_info(self, f, exp):
f.write(" Software definitions:\n")
for package_manager, packages in exp["SOFTWARE"].items():
f.write(f" {package_manager} packages:\n")
if not packages:
f.write(" None\n")
else:
# Dedupe entries that have the same version texts.
# This can happen for instance if a package is built against different compilers.
pkg_info_set = set()
for pkg_info in packages:
text = pkg_info.to_version_text()
if text not in pkg_info_set:
f.write(f" {text}\n")
pkg_info_set.add(text)
[docs]
def dump_tables(self, experiment_set, filters):
tables_config = ramble.config.get("tables", [])
if tables_config:
for table_conf in tables_config:
self.results_tables.add_table_template(table_conf)
if self.results_tables.num_tables > 0:
self.results_tables.build_tables(experiment_set, filters)
fs.mkdirp(self.tables_dir)
self.results_tables.output_tables(self.tables_dir, self.date_string())
def _create_result_symlinks(self, out_file, latest_base, file_extension, symlinks_updated):
latest_file = os.path.join(self.results_dir, latest_base + file_extension)
symlinks_updated.append(latest_file)
self.symlink_result(out_file, latest_file)
# Allow one simlink to the latest result in the top level for backwards compat
latest_file_parent = os.path.join(self.root, latest_base + file_extension)
self.symlink_result(out_file, latest_file_parent)
[docs]
def dump_results(
self, output_formats=None, print_results=False, summary_only=False, fom_origin_types=None
):
"""
Write out result file in desired format
This attempts to avoid the loss of previous results data by appending
the datetime to the filename, but is willing to clobber the file
results.latest.<extension>
"""
if output_formats is None:
output_formats = ["text"]
if not self.results:
self.results = {}
results = _filter_results(
self.results, summary_only=summary_only, fom_origin_types=fom_origin_types
)
fs.mkdirp(self.results_dir)
results_written = []
symlinks_updated = []
dt = self.date_string()
inner_delim = "."
filename_base = "results" + inner_delim + dt
latest_base = "results" + inner_delim + "latest"
software_key = ramble.experiment_result._OUTPUT_MAPPING[namespace.software]
if "text" in output_formats:
file_extension = ".txt"
out_file = os.path.join(self.results_dir, filename_base + file_extension)
results_written.append(out_file)
with open(out_file, "w+") as f:
f.write(f"From Workspace: {self.name} (hash: {results['workspace_hash']})\n")
if namespace.experiment in results:
for exp in results[namespace.experiment]:
f.write("Experiment %s figures of merit:\n" % exp["name"])
f.write(" Status = %s\n" % exp["RAMBLE_STATUS"])
if "TAGS" in exp:
f.write(f' Tags = {exp["TAGS"]}\n')
if exp["N_REPEATS"] > 0: # this is a base exp with summary of repeats
for context in exp["CONTEXTS"]:
f.write(f' {context["display_name"]} figures of merit:\n')
fom_summary = {}
for fom in context["foms"]:
name = fom["name"]
if name not in fom_summary.keys():
fom_summary[name] = []
stat_name = fom["origin_type"]
if not stat_name.startswith("summary::"):
display_name = "value"
else:
display_name = stat_name
value = fom["value"]
units = fom["units"]
output = f"{display_name} = {value} {units}\n"
fom_summary[name].append(output)
for fom_name, fom_val_list in fom_summary.items():
f.write(f" {fom_name}:\n")
for fom_val in fom_val_list:
f.write(f" {fom_val.strip()}\n")
if software_key in exp and exp[software_key]:
self.write_software_info(f, exp)
else:
for context in exp["CONTEXTS"]:
f.write(f' {context["display_name"]} figures of merit:\n')
for fom in context["foms"]:
name = fom["name"]
if fom["origin_type"] == "modifier":
delim = "::"
mod = fom["origin"]
name = f"{fom['origin_type']}{delim}{mod}{delim}{name}"
output = "{} = {} {}".format(name, fom["value"], fom["units"])
f.write(" %s\n" % (output.strip()))
if software_key in exp and exp[software_key]:
self.write_software_info(f, exp)
if exp["VARIANTS"]:
f.write(" Experiment variants:\n")
for variant in exp["VARIANTS"]:
f.write(f" - {rucolor.plaintext(variant)}\n")
if exp["SUCCESS_CRITERIA"]:
f.write(" Success criteria summary:\n")
for name, result in exp["SUCCESS_CRITERIA"].items():
f.write(f" {name} = {result}\n")
else:
logger.msg("No results to write")
self._create_result_symlinks(out_file, latest_base, file_extension, symlinks_updated)
# Convert SoftwareInfo classes to dicts
for exp in results[namespace.experiment]:
for key, pkg_list in exp[software_key].items():
exp[software_key][key] = [pkg.to_dict() for pkg in pkg_list]
if "json" in output_formats:
file_extension = ".json"
out_file = os.path.join(self.results_dir, filename_base + file_extension)
results_written.append(out_file)
with open(out_file, "w+") as f:
sjson.dump(results, f)
self._create_result_symlinks(out_file, latest_base, file_extension, symlinks_updated)
if "yaml" in output_formats:
file_extension = ".yaml"
out_file = os.path.join(self.results_dir, filename_base + file_extension)
results_written.append(out_file)
from ruamel.yaml import RoundTripDumper
class RambleSafeDumper(RoundTripDumper):
def ignore_aliases(self, _data):
"""Make the dumper NEVER print YAML aliases."""
return True
def call_value(dumper, data):
return dumper.represent_data(data.value)
RambleSafeDumper.add_representer(ramble.experiment_result.ExperimentStatus, call_value)
with open(out_file, "w+") as f:
syaml.dump(results, stream=f, Dumper=RambleSafeDumper)
self._create_result_symlinks(out_file, latest_base, file_extension, symlinks_updated)
if not results_written:
logger.die("Results were not written.")
logger.all_msg("Results are written to:")
for out_file in results_written:
logger.all_msg(f" {out_file}")
logger.all_msg("Symlinks updated:")
for symlink_path in symlinks_updated:
logger.all_msg(f" {symlink_path}")
if print_results:
with open(results_written[0]) as f:
# Use tty directly to avoid cluttering the analyze log
tty.msg(f"Results from the analysis pipeline:\n{f.read()}")
return filename_base
[docs]
def create_mirror(self, mirror_root):
parsed_url = url_util.parse(mirror_root)
self.mirror_path = url_util.local_file_path(parsed_url)
self.mirror_existed = web_util.url_exists(self.mirror_path)
self.input_mirror_path = os.path.join(self.mirror_path, WORKSPACE_INPUT_PATH)
self.software_mirror_path = os.path.join(self.mirror_path, WORKSPACE_SOFTWARE_PATH)
mirror_dirs = [self.mirror_path, self.input_mirror_path, self.software_mirror_path]
for subdir in mirror_dirs:
if not os.path.isdir(subdir):
try:
fs.mkdirp(subdir)
except OSError as e:
raise ramble.mirror.MirrorError(
"Cannot create directory '%s':" % subdir
) from e
self.software_mirror_stats = MirrorStats()
self.input_mirror_stats = MirrorStats()
self.input_mirror_cache = ramble.caches.MirrorCache(self.input_mirror_path)
self.software_mirror_cache = ramble.caches.MirrorCache(self.software_mirror_path)
[docs]
def simplify_variables(self):
"""Simplify variable sections in workspace configuration file"""
def _remove_scoped_variables(scope_name: str, used_variables: Set):
"""Remove unused variables from a specific workspace scope.
Args:
scope_name (str): Name of scope to remove definitions from.
used_variables (set): Set of used definitions that should be kept.
"""
changed = False
# Delete unused variables from requested scope.
to_remove = set()
scope_section = self._get_scope_section(scope_name)
if scope_section is None:
return changed
if namespace.variables in scope_section:
for var in scope_section[namespace.variables]:
if var not in used_variables:
to_remove.add(var)
for var in to_remove:
del scope_section[namespace.variables][var]
changed = True
if not scope_section[namespace.variables]:
del scope_section[namespace.variables]
return changed
# Build software environments to determine which variables are used
self.software_environments = ramble.software_environments.SoftwareEnvironments(self)
experiment_set = self.build_experiment_set()
workspace_used_variables = set()
prev_app = None
prev_wl = None
prev_exp = None
app_used_vars = set()
wl_used_vars = set()
exp_used_vars = set()
changed = False
for _, app_inst, _ in experiment_set.all_experiments():
app_inst.build_used_variables(self)
if app_inst.repeats.is_repeat_base or app_inst.repeats.repeat_index is None:
# Either there are no repeats, or this is the base
if prev_exp is not None:
changed = changed or _remove_scoped_variables(
f"{prev_app}:{prev_wl}:{prev_exp}", exp_used_vars
)
prev_exp = app_inst.variables[app_inst.keywords.experiment_template_name]
exp_used_vars = set()
if prev_wl != app_inst.variables[app_inst.keywords.workload_template_name]:
if prev_wl is not None:
changed = changed or _remove_scoped_variables(
f"{prev_app}:{prev_wl}", wl_used_vars
)
prev_wl = app_inst.variables[app_inst.keywords.workload_template_name]
wl_used_vars = set()
if prev_app != app_inst.variables[app_inst.keywords.application_name]:
if prev_app is not None:
changed = changed or _remove_scoped_variables(prev_app, app_used_vars)
prev_app = app_inst.variables[app_inst.keywords.application_name]
app_used_vars = set()
workspace_used_variables = workspace_used_variables.union(
app_inst.expander._used_variables
)
app_used_vars = app_used_vars.union(app_inst.expander._used_variables)
wl_used_vars = wl_used_vars.union(app_inst.expander._used_variables)
exp_used_vars = exp_used_vars.union(app_inst.expander._used_variables)
if prev_exp is not None:
changed = changed or _remove_scoped_variables(
f"{prev_app}:{prev_wl}:{prev_exp}", exp_used_vars
)
if prev_wl is not None:
changed = changed or _remove_scoped_variables(f"{prev_app}:{prev_wl}", wl_used_vars)
if prev_app is not None:
changed = changed or _remove_scoped_variables(prev_app, app_used_vars)
changed = changed or _remove_scoped_variables("workspace", workspace_used_variables)
if changed:
self._write_config(CONFIG_SECTION)
else:
logger.all_msg("No variables were changed.")
[docs]
def simplify_software(self):
# First drop unused experiment templates from app dict so environments aren't rendered
app_dict = ramble.config.config.get_config(
namespace.application, scope=self.ws_file_config_scope_name()
)
# Build experiment sets to determine which templates never get used
self.software_environments = ramble.software_environments.SoftwareEnvironments(self)
experiment_set = self.build_experiment_set()
logger.debug("Software environments:")
logger.debug(str(self.software_environments))
for _, app_inst in experiment_set.template_experiments():
if app_inst.is_template and not app_inst.generated_experiments:
app = app_inst.expander.application_name
wl = app_inst.expander.workload_name
exp = app_inst.expander.experiment_name
try:
app_dict[app][namespace.workload][wl][namespace.experiment].pop(exp)
if not app_dict[app][namespace.workload][wl][namespace.experiment]:
app_dict[app][namespace.workload][wl].pop(namespace.experiment)
if not app_dict[app][namespace.workload][wl]:
app_dict[app][namespace.workload].pop(wl)
if not app_dict[app][namespace.workload]:
app_dict[app].pop(namespace.workload)
if not app_dict[app]:
app_dict.pop(app)
except KeyError:
continue
ramble.config.config.update_config(
namespace.application, app_dict, scope=self.ws_file_config_scope_name()
)
# Regenerate environments without the unused templates to see which env never get rendered
self.software_environments = ramble.software_environments.SoftwareEnvironments(self)
software_environments = self.software_environments
experiment_set = self.build_experiment_set()
changed = False
software_dict = None
package_dict = None
environments_dict = None
software_dict = ramble.config.config.get_config(
namespace.software, scope=self.ws_file_config_scope_name()
)
if namespace.packages in software_dict:
package_dict = software_dict[namespace.packages]
if namespace.environments in software_dict:
environments_dict = software_dict[namespace.environments]
tty.debug("Removing configurations that do not spark joy.")
if package_dict:
for pkg in software_environments.unused_packages():
if pkg.name in package_dict:
tty.debug(f"Removing {pkg.name} from software packages")
package_dict.pop(pkg.name)
changed = True
if environments_dict:
for env in software_environments.unused_environments():
if env.name in environments_dict:
tty.debug(f"Removing {env.name} from software environments")
environments_dict.pop(env.name)
changed = True
if changed:
ramble.config.config.update_config(
namespace.software, software_dict, scope=self.ws_file_config_scope_name()
)
else:
logger.all_msg("No changes were made to software configuration sections.")
@property
def latest_archive_path(self):
return os.path.join(self.archive_dir, self.latest_archive)
@property
def latest_archive(self):
if hasattr(self, "_latest_archive") and self._latest_archive:
return self._latest_archive
if os.path.exists(self.archive_dir):
archive_dirs = []
for subdir in os.listdir(self.archive_dir):
archive_path = os.path.join(self.archive_dir, subdir)
if os.path.isdir(archive_path) and not os.path.islink(archive_path):
archive_dirs.append(archive_path)
if archive_dirs:
latest_path = max(archive_dirs, key=os.path.getmtime)
self._latest_archive = os.path.basename(latest_path)
return self._latest_archive
return None
[docs]
def date_string(self):
now = datetime.datetime.now()
return now.strftime("%Y-%m-%d_%H.%M.%S")
[docs]
def read_file_content(self, file_path):
"""Read and cache the file content
This should only be used on files that are not modified during the lifetime of the
workspace command execution.
"""
if file_path in self._inmem_file_cache:
return self._inmem_file_cache[file_path]
with open(file_path) as f:
content = f.read()
self._inmem_file_cache[file_path] = content
return content
@property
def internal(self):
"""Whether this workspace is managed by Ramble."""
wspath = get_workspace_path()
return self.path.startswith(wspath)
@property
def name(self):
"""Human-readable representation of the workspace.
The name of the workspace is the basename of its path
"""
return os.path.basename(self.path)
@property
def path(self):
"""Location of the workspace"""
return self.root
@property
def active(self):
"""True if this workspace is currently active."""
return _active_workspace and self.path == _active_workspace.path
@property
def internal_subdir(self):
"""Subdirectory for housing ramble internals"""
return os.path.join(self.root, ".ramble-workspace")
@property
def _transaction_lock_path(self):
"""The location of the lock file used to synchronize multiple
processes updating the same workspace.
"""
return os.path.join(self.internal_subdir, "transaction_lock")
@property
def experiment_dir(self):
"""Path to the experiment directory"""
return os.path.join(self.root, WORKSPACE_EXPERIMENT_PATH)
@property
def input_dir(self):
"""Path to the input directory"""
return os.path.join(self.root, WORKSPACE_INPUT_PATH)
@property
def software_dir(self):
"""Path to the software directory"""
return os.path.join(self.root, WORKSPACE_SOFTWARE_PATH)
@property
def tables_dir(self):
"""Directory where workspace tables are stored"""
return os.path.join(self.root, WORKSPACE_TABLES_PATH)
@property
def results_dir(self):
"""Directory where workspace results are stored"""
return os.path.join(self.root, WORKSPACE_RESULTS_PATH)
@property
def log_dir(self):
"""Path to the logs directory"""
return os.path.join(self.root, WORKSPACE_LOG_PATH)
@property
def config_dir(self):
"""Path to the configuration file directory"""
return os.path.join(self.root, WORKSPACE_CONFIG_PATH)
@property
def auxiliary_software_dir(self):
"""Path to the auxiliary software files directory"""
return os.path.join(self.config_dir, AUXILIARY_SOFTWARE_DIR_NAME)
@property
def config_file_path(self):
"""Path to the configuration file directory"""
return os.path.join(self.config_dir, CONFIG_FILE_NAME)
@property
def archive_dir(self):
"""Path to the archive directory"""
return os.path.join(self.root, WORKSPACE_ARCHIVE_PATH)
@property
def shared_dir(self):
"""Path to the shared directory"""
return os.path.join(self.root, WORKSPACE_SHARED_PATH)
@property
def deployments_dir(self):
"""Path to the deployments directory"""
return os.path.join(self.root, WORKSPACE_DEPLOYMENTS_PATH)
@property
def named_deployment(self):
"""Path to the specific deployment directory"""
return os.path.join(self.deployments_dir, self.deployment_name)
@property
def deployment_repos_dir(self):
"""Path to the specific deployment directory that contains all the repos"""
return os.path.join(self.named_deployment, "object_repos")
@property
def shared_license_dir(self):
"""Path to the shared license directory"""
return os.path.join(self.shared_dir, WORKSPACE_SHARED_LICENSE_PATH)
[docs]
def template_path(self, name):
if name in self._templates.keys():
return os.path.join(self.config_dir, name + TEMPLATE_EXTENSION)
return None
[docs]
def all_templates(self):
"""Iterator over each template in the workspace"""
yield from self._templates.items()
[docs]
def all_auxiliary_software_files(self):
"""Iterator over each file in $workspace/configs/auxiliary_software_files"""
yield from self._auxiliary_software_files.items()
[docs]
@classmethod
def get_workspace_paths(cls, root):
"""Construct dictionary of path replacements for workspace"""
workspace_path_replacements = {
"workspace_root": root,
namespace.workspace: root,
"workspace_configs": os.path.join(root, WORKSPACE_CONFIG_PATH),
"workspace_software": os.path.join(root, WORKSPACE_SOFTWARE_PATH),
"workspace_tables": os.path.join(root, WORKSPACE_TABLES_PATH),
"workspace_results": os.path.join(root, WORKSPACE_RESULTS_PATH),
"workspace_logs": os.path.join(root, WORKSPACE_LOG_PATH),
"workspace_inputs": os.path.join(root, WORKSPACE_INPUT_PATH),
"workspace_experiments": os.path.join(root, WORKSPACE_EXPERIMENT_PATH),
"workspace_shared": os.path.join(root, WORKSPACE_SHARED_PATH),
"workspace_archives": os.path.join(root, WORKSPACE_ARCHIVE_PATH),
"workspace_deployments": os.path.join(root, WORKSPACE_DEPLOYMENTS_PATH),
}
return workspace_path_replacements
[docs]
def workspace_paths(self):
"""Dictionary of path replacements for workspace"""
if not hasattr(self, "_workspace_path_replacements"):
self._workspace_path_replacements = self.get_workspace_paths(self.root)
return self._workspace_path_replacements
def _get_scope_section(self, scope):
base_section = None
if scope == "workspace":
base_section = self.config_sections["workspace"]["yaml"][namespace.ramble]
else:
scope_parts = scope.split(":")
if (
namespace.application
not in self.config_sections["workspace"]["yaml"][namespace.ramble]
):
return None
base_section = self.config_sections["workspace"]["yaml"][namespace.ramble][
namespace.application
]
if len(scope_parts) >= 1: # Extract application section
if scope_parts[0] not in base_section:
logger.die(f"No application matches requested scope {scope_parts[0]}")
base_section = base_section[scope_parts[0]]
if len(scope_parts) >= 2: # Extract workload section
if scope_parts[1] not in base_section[namespace.workload]:
logger.die(
f"No workload matches requested scope {scope_parts[1]} "
f"in application {scope_parts[0]}"
)
base_section = base_section[namespace.workload][scope_parts[1]]
if len(scope_parts) >= 3: # Extract experiment section
joined_scope_part = ":".join(scope_parts[2:])
if joined_scope_part not in base_section[namespace.experiment]:
logger.die(
f"No experiment matches requested scope {joined_scope_part} "
f"in application{scope_parts[0]} and workload {scope_parts[1]}"
)
base_section = base_section[namespace.experiment][joined_scope_part]
if base_section is None:
logger.die(f"No scope matches requested scope of {scope}")
return base_section
[docs]
def index_modifiers(self):
"""Construct a list of all modifiers in this workspace, and their associated scope.
Returns:
(list): List of tuples, of the form (scope, modifier_definition)
"""
mod_list = []
base_section = self._get_scope_section("workspace")
ws_mods = base_section[namespace.modifiers] if namespace.modifiers in base_section else []
# Add workspace modifiers
for mod in ws_mods:
mod_list.append(("workspace", mod))
# Define scoped modifiers
for workloads, application_context in self.all_applications():
app_context = f"{application_context.context_name}"
for mod in application_context.modifiers:
mod_list.append((f"{app_context}", mod))
for experiments, workload_context in self.all_workloads(workloads):
wl_context = f"{app_context}:{workload_context.context_name}"
for mod in workload_context.modifiers:
mod_list.append((f"{wl_context}", mod))
for _, experiment_context in self.all_experiments(experiments):
exp_context = f"{wl_context}:{experiment_context.context_name}"
for mod in experiment_context.modifiers:
mod_list.append((f"{exp_context}", mod))
return mod_list
[docs]
def print_modifiers(self):
"""Print an indexed list of all modifiers in this workspace"""
mod_list = self.index_modifiers()
logger.all_msg(f"Workspace contains {len(mod_list)} modifiers.")
logger.all_msg("Additional modifiers may come from scopes outside of wokrspace")
logger.all_msg("To see a complete list of these, use:")
logger.all_msg(" ramble config get modifiers\n\n")
prev_scope = None
for idx, conf in enumerate(mod_list):
scope = conf[0]
entry = conf[1]
if scope != prev_scope:
if prev_scope is not None:
logger.all_msg("")
logger.all_msg(f"Modifier scope: {scope}")
prev_scope = scope
name = entry["name"]
mode_str = ""
if "mode" in entry and entry["mode"] is not None:
mode_str = f" -- Mode: {entry['mode']}"
out_str = f"Name: {name}{mode_str}"
logger.all_msg(f"{idx}: {out_str}")
[docs]
def remove_modifier(
self,
remove_index: Optional[int] = None,
scope_pattern: Optional[str] = None,
name_pattern: Optional[str] = None,
mode_pattern: Optional[str] = None,
dry_run: bool = False,
):
"""Remove an arbitrary number of modifiers from this workspace based
on some input arguments.
Args:
remove_index: Index of modifier to remove. Indices match ordering
from the output of print_modifiers
scope_pattern: Pattern to select which scopes to remove modifiers from.
If the pattern matches multiple scopes, each will
have matching modifiers removed from them.
name_pattern: Pattern to select which modifier names should be removed.
Modifiers with matching names that are in included
scopes will be removed. If the scope doesn't match,
but the name does, the modifier will not be removed.
mode_pattern: Pattern to select which modes should be removed. Only
Modifiers which match the scope and name patterns will be
considered for removal, but this can be used to downselect further.
dry_run: Whether to print the config instead of editing it, or to edit it directly.
Returns:
(int) Number of modifiers removed
"""
mod_list = self.index_modifiers()
to_remove = []
if remove_index is not None:
if not isinstance(remove_index, int):
logger.error(
"Cannot remove modifier index without an integer index. "
f"Given index was {remove_index}"
)
if remove_index < 0 or remove_index > len(mod_list):
logger.error(
f"Modifier index {remove_index} is outside of the range of modifiers."
"Use `ramble worksapce manage modifiers --list` to see indices"
)
to_remove.append(mod_list[remove_index])
if scope_pattern is not None:
if name_pattern is None:
name_pattern = "*"
if mode_pattern is None:
mode_pattern = "*"
for mod_tup in mod_list:
scope = mod_tup[0]
mod_conf = mod_tup[1]
if fnmatch.fnmatch(scope, scope_pattern):
if fnmatch.fnmatch(mod_conf["name"], name_pattern):
if "mode" in mod_conf and fnmatch.fnmatch(mod_conf["mode"], mode_pattern):
to_remove.append(mod_tup)
else:
to_remove.append(mod_tup)
removed = 0
for mod_tup in to_remove:
base_section = self._get_scope_section(mod_tup[0])
if base_section is not None:
base_mods = base_section[namespace.modifiers]
remove_index = -1
for idx, ws_mod in enumerate(base_mods):
if ws_mod == mod_tup[1]:
remove_index = idx
break
if remove_index != -1:
removed += 1
del base_mods[remove_index]
if not base_section[namespace.modifiers]:
del base_section[namespace.modifiers]
if not dry_run:
self._write_config(CONFIG_SECTION)
return removed
[docs]
def add_modifier(
self,
name_pattern: str,
scope: Optional[str] = None,
mode: Optional[str] = None,
on_executable: Optional[str] = None,
dry_run: bool = False,
):
"""Add an arbitrary number of modifiers to this workspace within a single scope
Args:
scope: Scope to add modifiers within.
name_pattern: Pattern to determine which modifiers should be added.
If multiple modifiers match, all will be added with
the additional arguments.
mode: Mode to set within the new modifier definitions
on_executable: A list string to set for the on_executable
attribute of the new modifier definitions.
dry_run: Whether to print the config instead of editing it, or to edit it directly.
Returns:
(int) Number of modifiers added to workspace
"""
on_exec_list = None
if on_executable is not None:
on_exec_list = syaml.syaml_list()
on_exec_list.extend(on_executable[1:-1].split(","))
base_section = self._get_scope_section(scope)
if namespace.modifiers not in base_section:
base_section[namespace.modifiers] = syaml.syaml_list()
mod_type = ramble.repository.ObjectTypes.modifiers
mod_objects = ramble.repository.all_object_names(object_type=mod_type)
if isinstance(name_pattern, str):
spec_parts = name_pattern.partition("@")
mod_names = [
name
for name in mod_objects
if fnmatch.fnmatchcase(name.lower(), spec_parts[0].lower())
]
if len(mod_names) < 1:
logger.error(f"No modifiers found matching name pattern of {name_pattern}")
added = 0
for mod_name in mod_names:
mod_def = syaml.syaml_dict()
mod_def["name"] = mod_name
if spec_parts[2]:
mod_def["name"] += f"@{spec_parts[2]}"
if mode is not None:
mod_def["mode"] = mode
if on_exec_list is not None:
mod_def["on_executable"] = on_exec_list
base_section[namespace.modifiers].append(mod_def)
added += 1
if not dry_run:
self._write_config(CONFIG_SECTION)
return added
[docs]
def add_include(self, new_include):
"""Add a new include to this workspace"""
if (
namespace.include
not in self.config_sections[namespace.workspace]["yaml"][namespace.ramble]
):
self.config_sections[namespace.workspace]["yaml"][namespace.ramble][
namespace.include
] = []
includes = self.config_sections[namespace.workspace]["yaml"][namespace.ramble][
namespace.include
]
includes.append(new_include)
self._write_config(CONFIG_SECTION)
[docs]
def remove_include(self, index=None, pattern=None):
"""Remove one or more includes from this workspace.
Args:
index (int): (Optional) Numerical position of include to remove
pattern (str): (Optional) String or pattern of include to remove.
Removes all matching includes.
"""
if (
namespace.include
not in self.config_sections[namespace.workspace]["yaml"][namespace.ramble]
):
return
includes = self.config_sections[namespace.workspace]["yaml"][namespace.ramble][
namespace.include
]
changed = False
if index is not None:
if index < 0 or index >= len(includes):
logger.die(
f"Requested index {index} " "is outside of the range of existing includes."
)
includes.pop(index)
changed = True
if pattern is not None:
remove_indices = []
for idx, include in enumerate(includes):
if fnmatch.fnmatch(include, pattern):
remove_indices.append(idx)
for remove_idx in reversed(remove_indices):
if remove_idx >= 0 and remove_idx < len(includes):
includes.pop(remove_idx)
changed = True
if changed:
self._write_config(CONFIG_SECTION)
[docs]
def included_config_scopes(self):
"""List of included configuration scopes from the environment.
Scopes are listed in the YAML file in order from highest to
lowest precedence, so configuration from earlier scope will take
precedence over later ones.
This routine returns them in the order they should be pushed onto
the internal scope stack (so, in reverse, from lowest to highest).
"""
scopes = []
# load config scopes added via 'include:', in reverse so that
# highest-precedence scopes are last.
includes = config_dict(self.config_sections[namespace.workspace]["yaml"]).get(
"include", []
)
missing = []
for full_config_path in reversed(includes):
# Remove trailing slash
config_path = full_config_path
if full_config_path.endswith("/"):
config_path = full_config_path[:-1]
# allow paths to contain ramble config/environment variables, etc.
config_path = substitute_path_variables(
config_path, local_replacements=self.workspace_paths()
)
# treat relative paths as relative to the environment
if not os.path.isabs(config_path):
config_path = os.path.join(self.path, config_path)
config_path = os.path.normpath(os.path.realpath(config_path))
if os.path.isdir(config_path):
# directories are treated as regular ConfigScopes
config_name = f"workspace:{self.name}:{os.path.basename(config_path)}"
scope = ramble.config.ConfigScope(config_name, config_path)
elif os.path.exists(config_path):
# files are assumed to be SingleFileScopes
config_name = f"workspace:{self.name}:{config_path}"
scope = ramble.config.SingleFileScope(
config_name, config_path, ramble.schema.merged.schema
)
else:
missing.append(config_path)
continue
scopes.append(scope)
if missing:
msg = f"Detected {len(missing)} missing include path(s):"
msg += "\n {}".format("\n ".join(missing))
msg = f"{msg}\nPlease correct."
logger.warn(msg)
raise RambleActiveWorkspaceError(msg)
return scopes
[docs]
def ws_file_config_scope_name(self):
"""Name of the config scope of this workspace's config file."""
return f"{namespace.workspace}:{self.name}:{self.config_dir}"
# return 'ws:%s' % self.name
[docs]
def ws_file_config_scope(self):
"""Get the configuration scope for the workspace's config file."""
section = self.config_sections[namespace.workspace]
config_name = self.ws_file_config_scope_name()
return ramble.config.SingleFileScope(
config_name,
section["path"],
ramble.schema.workspace.schema,
[ramble.config.first_existing(section["raw_yaml"], ramble.schema.workspace.keys)],
)
[docs]
def config_scopes(self):
"""A list of all configuration scopes for this workspace."""
return self.included_config_scopes() + [self.ws_file_config_scope()] + [self.configs]
[docs]
def destroy(self):
"""Remove this workspace from Ramble entirely."""
shutil.rmtree(self.path)
def _get_workspace_dict(self):
return (
self.config_sections[namespace.workspace]["yaml"]
if namespace.workspace in self.config_sections
else None
)
def _get_application_dict_config(self, key):
return self.application_configs[key]["yaml"] if key in self.application_configs else None
def _get_workspace_section(self, section):
"""Return a dict of a workspace section"""
workspace_dict = self._get_workspace_dict()
return (
workspace_dict[namespace.ramble][section]
if section in workspace_dict[namespace.ramble]
else syaml.syaml_dict()
)
[docs]
def get_workspace_vars(self):
"""Return a dict of workspace variables"""
return ramble.config.config.get_config(namespace.variables)
[docs]
def get_workspace_env_vars(self):
"""Return a dict of workspace environment variables"""
return ramble.config.config.get_config(namespace.env_var)
[docs]
def get_workspace_internals(self):
"""Return a dict of workspace internals"""
return ramble.config.config.get_config(namespace.internals)
[docs]
def get_workspace_modifiers(self):
"""Return a dict of workspace modifiers"""
return ramble.config.config.get_config(namespace.modifiers)
[docs]
def get_workspace_zips(self):
"""Return a dict of workspace zips"""
return ramble.config.config.get_config(namespace.zips)
[docs]
def get_workspace_variants(self):
"""Return a dict of workspace variants"""
return ramble.config.config.get_config(namespace.variants)
[docs]
def get_workspace_success_criteria(self):
"""Return a dict of workspace success_criteria"""
return ramble.config.config.get_config(namespace.success)
[docs]
def get_software_dict(self):
"""Return the software dictionary for this workspace"""
software_dict = ramble.config.config.get_config(namespace.software)
return software_dict
[docs]
def get_workspace_tables(self):
"""Return a dict of workspace tables"""
return ramble.config.config.get_config(namespace.tables)
[docs]
def get_applications(self):
"""Get the dictionary of applications"""
logger.debug("Getting app dict.")
logger.debug(f" {self._get_workspace_dict()}")
workspace_dict = self._get_workspace_dict()
if namespace.application not in workspace_dict[namespace.ramble]:
workspace_dict[namespace.ramble][namespace.application] = syaml.syaml_dict()
return workspace_dict[namespace.ramble][namespace.application]
[docs]
def read_transaction(self):
"""Get a read lock context manager for use in a `with` block."""
return lk.ReadTransaction(self.txlock, acquire=self._re_read)
[docs]
def write_transaction(self):
"""Get a write lock context manager for use in a `with` block."""
return lk.WriteTransaction(self.txlock, acquire=self._re_read)
def __enter__(self):
self._previous_active = _active_workspace
activate(self)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
deactivate()
if self._previous_active:
activate(self._previous_active)
[docs]
def check_cache(self, tupl):
return self.install_cache.contains(tupl)
[docs]
def add_to_cache(self, tupl):
self.install_cache.add(tupl)
[docs]
def read(name):
"""Get a workspace with the supplied name."""
validate_workspace_name(name)
if not exists(name):
raise RambleWorkspaceError("no such workspace '%s'" % name)
return Workspace(root(name))
[docs]
def yaml_equivalent(first, second):
"""Returns whether two ramble yaml items are equivalent, including overrides"""
if isinstance(first, dict):
return isinstance(second, dict) and _equiv_dict(first, second)
elif isinstance(first, list):
return isinstance(second, list) and _equiv_list(first, second)
else: # it's a string
return isinstance(second, str) and first == second
def _equiv_list(first, second):
"""Returns whether two ramble yaml lists are equivalent, including overrides"""
if len(first) != len(second):
return False
return all(yaml_equivalent(f, s) for f, s in zip(first, second))
def _equiv_dict(first, second):
"""Returns whether two ramble yaml dicts are equivalent, including overrides"""
if len(first) != len(second):
return False
same_values = all(yaml_equivalent(fv, sv) for fv, sv in zip(first.values(), second.values()))
same_keys_with_same_overrides = all(
fk == sk and getattr(fk, "override", False) == getattr(sk, "override", False)
for fk, sk in zip(first.keys(), second.keys())
)
return same_values and same_keys_with_same_overrides
def _read_yaml(str_or_file, schema):
"""Read YAML from a file for round-trip parsing."""
data = syaml.load_config(str_or_file)
filename = getattr(str_or_file, "name", None)
default_data = ramble.config.validate(data, schema, filename)
return (data, default_data)
def _write_yaml(data, str_or_file, schema):
"""Write YAML to a file preserving comments and dict order."""
filename = getattr(str_or_file, "name", None)
ramble.config.validate(data, schema, filename)
syaml.dump_config(data, str_or_file, default_flow_style=False)
[docs]
@contextlib.contextmanager
def no_active_workspace():
"""Deactivate the active workspace for the duration of the context. Has no
effect when there is no active workspace."""
ws = active_workspace()
env_var = None
if RAMBLE_WORKSPACE_VAR in os.environ.keys():
env_var = os.environ[RAMBLE_WORKSPACE_VAR]
del os.environ[RAMBLE_WORKSPACE_VAR]
try:
deactivate()
yield
finally:
if ws:
os.environ[RAMBLE_WORKSPACE_VAR] = env_var
activate(ws)
def _filter_results(results, summary_only, fom_origin_types=None):
if (not summary_only and not fom_origin_types) or namespace.experiment not in results:
return results
results = copy.deepcopy(results)
filtered_experiments = []
for r in results[namespace.experiment]:
if summary_only and r["N_REPEATS"] == 0:
continue
if fom_origin_types:
filtered_contexts = []
for context in r.get("CONTEXTS", []):
filtered_foms = []
for fom in context.get("foms", []):
if fom.get("origin_type") in fom_origin_types:
filtered_foms.append(fom)
if filtered_foms:
context["foms"] = filtered_foms
filtered_contexts.append(context)
r["CONTEXTS"] = filtered_contexts
filtered_experiments.append(r)
results[namespace.experiment] = filtered_experiments
return results
[docs]
class RambleWorkspaceError(ramble.error.RambleError):
"""Superclass for all errors to do with Ramble Workspaces"""
[docs]
class RambleInvalidTemplateNameError(ramble.error.RambleError):
"""Error when an invalid template name is provided"""
[docs]
class RambleConflictingDefinitionError(RambleWorkspaceError):
"""Error when conflicting software definitions are found"""
[docs]
class RambleActiveWorkspaceError(RambleWorkspaceError):
"""Error when an invalid workspace is activated"""
[docs]
class RambleMissingApplicationError(RambleWorkspaceError):
"""Error when using an undefined application in an experiment
specification"""
[docs]
class RambleMissingWorkloadError(RambleWorkspaceError):
"""Error when using an undefined workload in an experiment
specification"""
[docs]
class RambleMissingExperimentError(RambleWorkspaceError):
"""Error when using an undefined experiment in an experiment
specification"""
[docs]
class RambleMissingApplicationDirError(RambleWorkspaceError):
"""Error when using a non-existent application directory"""