Source code for ramble.cmd

# Copyright 2022-2025 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.


import argparse
import os
import re

import ruamel.yaml as yaml
from ruamel.yaml.error import MarkedYAMLError

from llnl.util.filesystem import join_path
from llnl.util.lang import attr_setdefault

import ramble.config
import ramble.error
import ramble.paths
import ramble.workspace
from ramble.error import RambleCommandError
from ramble.util.logger import logger

import spack.extensions
import spack.util.string

# This package has a submodule named "list" (the ``list`` command), so keep a
# reference to the builtin ``list`` type under a name that cannot be confused
# with that submodule.
python_list = list

# Regular expression for file names to skip when scanning the commands
# directory: names starting with ".", the package's ``__init__.py``, and
# names starting with "#".
ignore_files = r"^\.|^__init__.py$|^#"

# Attribute names looked up on command modules; get_module() installs a
# default for each one when a command module does not define it.
SETUP_PARSER = "setup_parser"
DESCRIPTION = "description"


def python_name(cmd_name):
    """Translate a command name into its Python-side spelling.

    Every ``-`` in ``cmd_name`` is swapped for ``_`` so the result can be used
    as a module or function name.
    """
    return "_".join(cmd_name.split("-"))
def require_python_name(pname):
    """Check that ``pname`` is already in the form python_name() produces.

    Intended as a precondition check on function parameters.

    Raises:
        PythonNameError: if converting ``pname`` with python_name() would
            change it.
    """
    if python_name(pname) == pname:
        return
    raise PythonNameError(pname)
def cmd_name(python_name):
    """Translate a module name into its command-line spelling.

    Every ``_`` in ``python_name`` is swapped for ``-``.
    """
    return "-".join(python_name.split("_"))
def require_cmd_name(cname):
    """Check that ``cname`` is already in the form cmd_name() produces.

    Intended as a precondition check on function parameters.

    Raises:
        CommandNameError: if converting ``cname`` with cmd_name() would
            change it.
    """
    if cmd_name(cname) == cname:
        return
    raise CommandNameError(cname)
#: global, cached list of all commands -- access through all_commands()
_all_commands = None


def all_commands():
    """Get a sorted list of all ramble commands.

    This lists the built-in command directory (``ramble.paths.command_path``)
    and derives the command names from the ``.py`` file names found there. The
    files are not imported -- only their names are used.

    The result is computed once and cached in the module-level
    ``_all_commands``; later calls return that same list object.

    Returns:
        (list): sorted command names (with ``-``, not ``_``).
    """
    global _all_commands
    if _all_commands is None:
        command_paths = [ramble.paths.command_path]  # Built-in commands
        suffix = ".py"

        # Collect into a local list first: if os.listdir() raises, the cache
        # must stay unset rather than hold an empty or partial list that
        # every later call would silently return.
        found = [
            cmd_name(file[: -len(suffix)])
            for path in command_paths
            for file in os.listdir(path)
            if file.endswith(suffix) and not re.search(ignore_files, file)
        ]
        found.sort()

        _all_commands = found

    return _all_commands
def remove_options(parser, *options):
    """Remove some options from a parser.

    For each entry in ``options``, the first action whose primary (first)
    option string equals it has that option string removed via argparse's
    conflict-resolution hook. Options that match no action are ignored.

    Args:
        parser (argparse.ArgumentParser): parser to modify in place.
        *options (str): option strings to remove, e.g. ``"--foo"``.
    """
    for option in options:
        for action in parser._actions:
            option_strings = action.option_strings
            # Positional arguments have no option strings; indexing [0] on
            # them would raise IndexError, so skip them explicitly.
            if option_strings and option_strings[0] == option:
                parser._handle_conflict_resolve(None, [(option, action)])
                # The hook may have mutated parser._actions, so stop
                # iterating over it right away.
                break
def get_module(cmd_name):
    """Import and return the module that implements a command.

    The built-in command package is tried first; when that import fails, the
    lookup is delegated to ``spack.extensions.get_module``. The returned
    module is guaranteed to have the ``setup_parser`` and ``description``
    attributes (defaults are installed when missing).

    Args:
        cmd_name (str): name of the command for which to get a module
            (contains ``-``, not ``_``).

    Raises:
        RambleCommandError: if neither lookup yields a module.
    """
    require_cmd_name(cmd_name)
    pname = python_name(cmd_name)

    logger.debug(f"Getting module for command {cmd_name}")

    # Fully-qualified name of the built-in command module.
    module_name = f"{__name__}.{pname}"
    wanted_attrs = [pname, SETUP_PARSER, DESCRIPTION]

    try:
        module = __import__(module_name, fromlist=wanted_attrs, level=0)
        logger.debug(f"Imported {pname} from built-in commands")
    except ImportError:
        # Not a built-in command; fall back to the extension lookup.
        try:
            module = spack.extensions.get_module(cmd_name)
        except AttributeError:
            raise RambleCommandError("Command %s does not exist." % cmd_name)

    # Fill in optional hooks so callers can use them unconditionally.
    attr_setdefault(module, SETUP_PARSER, lambda *args: None)  # null-op
    attr_setdefault(module, DESCRIPTION, "")

    defines_entry_point = hasattr(module, pname)
    if not defines_entry_point:
        logger.die(
            f"Command module {module.__name__} ({module.__file__}) must define function '{pname}'."
        )

    return module
def get_command(cmd_name):
    """Return the function that implements the command ``cmd_name``.

    The function is looked up on the command's module under the name that
    python_name() derives from ``cmd_name``.

    Args:
        cmd_name (str): name of the command (contains ``-``, not ``_``).
    """
    require_cmd_name(cmd_name)
    module = get_module(cmd_name)
    return getattr(module, python_name(cmd_name))
def elide_list(line_list, max_num=10):
    """Shorten a long list by replacing its middle with ``'...'``.

    Lists with at most ``max_num`` elements are returned unchanged (the same
    object). Longer lists are reduced to their first ``max_num - 1`` elements,
    a ``'...'`` marker, and their last element. For example::

        elide_list([1,2,3,4,5,6], 4)

    gives::

        [1, 2, 3, '...', 6]
    """
    if len(line_list) <= max_num:
        return line_list

    head = line_list[: max_num - 1]
    tail = line_list[-1:]
    return head + ["..."] + tail
def ramble_is_git_repo():
    """Report whether this Ramble installation prefix is a git clone.

    Returns the result of is_git_repo() for ``ramble.paths.prefix``.
    """
    ramble_root = ramble.paths.prefix
    return is_git_repo(ramble_root)
def is_git_repo(path):
    """Report whether ``path`` is the top level of a git repository.

    Two layouts are recognized:

    * ``<path>/.git`` is a directory (a regular clone);
    * ``<path>/.git`` is a file whose first line is ``gitdir: <dir>`` and
      ``<dir>`` is an existing directory (a worktree or submodule).

    Args:
        path (str): directory to inspect.

    Returns:
        (bool): True if one of the layouts above is found, False otherwise.
    """
    root = str(path)
    dotgit_path = os.path.join(root, ".git")

    if os.path.isdir(dotgit_path):
        # we are in a regular git repo
        return True

    if not os.path.isfile(dotgit_path):
        return False

    # We might be in a git worktree or submodule. The gitfile is plain text,
    # not YAML, so read the "gitdir:" line directly; this also copes with
    # empty or unrelated files without raising.
    with open(dotgit_path, "rb") as f:
        first_line = f.readline()

    marker = b"gitdir:"
    if not first_line.startswith(marker):
        return False

    gitdir = os.fsdecode(first_line[len(marker) :].strip())
    if not gitdir:
        return False

    # A relative gitdir is relative to the directory holding the .git file,
    # not to the current working directory. os.path.join leaves an absolute
    # gitdir untouched.
    return os.path.isdir(os.path.join(root, gitdir))
class PythonNameError(ramble.error.RambleError):
    """Raised when a name is not in the form python_name() produces."""

    def __init__(self, name):
        # Keep the offending name available to handlers.
        self.name = name
        message = f"{name} is not a permissible Python name."
        super().__init__(message)
class CommandNameError(ramble.error.RambleError):
    """Raised when a name is not in the form cmd_name() produces."""

    def __init__(self, name):
        # Keep the offending name available to handlers.
        self.name = name
        message = f"{name} is not a permissible Ramble command name."
        super().__init__(message)
########################################
# argparse types for argument validation
########################################
def extant_file(f):
    """Argparse ``type=`` callable that accepts only existing files.

    Returns ``f`` unchanged when it names an existing regular file.

    Raises:
        argparse.ArgumentTypeError: if ``f`` is not an existing file.
    """
    if os.path.isfile(f):
        return f
    raise argparse.ArgumentTypeError("%s does not exist" % f)
def require_active_workspace(cmd_name):
    """Return the active workspace, or die with a hint on how to set one.

    Used by commands that cannot run without an active workspace. When none
    is active, an error naming the calling command is reported through
    ``logger.die``.

    Arguments:
        cmd_name (str): name of calling command

    Returns:
        (ramble.workspace.Workspace): the active workspace
    """
    active = ramble.workspace.active_workspace()
    if active:
        return active

    logger.die(
        f"`ramble {cmd_name}` requires a workspace",
        "activate a workspace first:",
        "    ramble workspace activate WRKSPC",
        "or use:",
        f"    ramble -w WRKSPC {cmd_name} ...",
    )
def find_workspace(args):
    """Locate the workspace selected by arguments or the environment.

    Candidates are checked in this order:

    1. ``ramble -w WRKSPC`` or ``ramble -D DIR`` (arguments)
    2. a path stored in the environment variable named by
       ``ramble.workspace.ramble_workspace_var``.

    A workspace that is found is read in and returned.

    Arguments:
        args (argparse.Namespace): argparse namespace with command arguments

    Returns:
        (ramble.workspace.Workspace): a found workspace, or ``None`` when
        nothing was specified at all

    Raises:
        ramble.workspace.RambleActiveWorkspaceError: if the environment
            variable is set but is not a workspace directory.
        ramble.workspace.RambleWorkspaceError: if the selected value is not
            a workspace.
    """
    # First interpretation of -w: the name of a known workspace.
    candidate = args.workspace
    if candidate and ramble.workspace.exists(candidate):
        return ramble.workspace.read(candidate)

    if not candidate:
        # -w was not given, so look at -D instead (the two options are
        # mutually exclusive).
        candidate = args.workspace_dir

        if not candidate:
            # No argument at all: consult the environment variable.
            env_var = ramble.workspace.ramble_workspace_var
            candidate = os.environ.get(env_var)

            if not candidate:
                # Nothing was set; there is no active workspace.
                return None

            if not ramble.workspace.is_workspace_dir(candidate):
                raise ramble.workspace.RambleActiveWorkspaceError(
                    f"The environment variable {env_var} refers to an invalid ramble workspace."
                )

    # At this point the candidate is not the name of a workspace, so it must
    # be a path to one -- otherwise something is wrong.
    if ramble.workspace.is_workspace_dir(candidate):
        return ramble.workspace.Workspace(candidate)

    raise ramble.workspace.RambleWorkspaceError(f"No workspace in {candidate}")
def find_workspace_path(args):
    """Locate the path of the workspace selected by arguments or environment.

    Candidates are checked in this order:

    1. ``ramble -w WRKSPC`` or ``ramble -D DIR`` (arguments)
    2. a path stored in the environment variable named by
       ``ramble.workspace.ramble_workspace_var``.

    Arguments:
        args (argparse.Namespace): argparse namespace with command arguments

    Returns:
        (str): Path to workspace root, or None when nothing was specified

    Raises:
        ramble.workspace.RambleWorkspaceError: if the selected value is not
            a workspace.
    """
    # First interpretation of -w: the name of a known workspace.
    candidate = args.workspace
    if candidate and ramble.workspace.exists(candidate):
        return ramble.workspace.root(candidate)

    if not candidate:
        # -w was not given, so look at -D instead (the two options are
        # mutually exclusive).
        candidate = args.workspace_dir

        if not candidate:
            # No argument at all: consult the environment variable.
            candidate = os.environ.get(ramble.workspace.ramble_workspace_var)

            if not candidate:
                # Nothing was set; there is no active workspace.
                return None

    # At this point the candidate is not the name of a workspace, so it must
    # be a path to one -- otherwise something is wrong.
    if ramble.workspace.is_workspace_dir(candidate):
        return candidate

    raise ramble.workspace.RambleWorkspaceError("no workspace in %s" % candidate)