Source code for ramble.cmd.deployment

# Copyright 2022-2026 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

import os

import llnl.util.filesystem as fs

import ramble.cmd
import ramble.cmd.common.arguments as arguments
import ramble.config
import ramble.fetch_strategy
import ramble.filters
import ramble.pipeline
import ramble.repository
import ramble.stage
import ramble.util.path
from ramble.main import RambleCommand
from ramble.util.logger import logger

import spack.util.spack_json as sjson
import spack.util.url as surl

description = "manage workspace deployments"
section = "workspaces"
level = "short"

subcommands = [
    "push",
    "pull",
]


[docs] def deployment_push_setup_parser(subparser): """Push a workspace deployment""" subparser.add_argument( "--tar-archive", "-t", action="store_true", dest="tar_archive", help="create a tar.gz of the deployment directory.", ) subparser.add_argument( "--deployment-name", "-d", dest="deployment_name", default=None, help="Name for deployment. Uses workspace name if not set.", ) subparser.add_argument( "--upload-url", "-u", dest="upload_url", default=None, help="URL to upload deployment into. Upload tar if `-t` is specified..", ) arguments.add_common_arguments( subparser, ["phases", "include_phase_dependencies", "where", "exclude_where", "filter_tags"], )
[docs] def deployment_push(args): current_pipeline = ramble.pipeline.pipelines.pushdeployment ws = ramble.cmd.require_active_workspace(cmd_name="deployment push") filters = ramble.filters.Filters( phase_filters=args.phases, include_where_filters=args.where, exclude_where_filters=args.exclude_where, tags=args.filter_tags, ) pipeline_cls = ramble.pipeline.pipeline_class(current_pipeline) pipeline = pipeline_cls( ws, filters, create_tar=args.tar_archive, upload_url=args.upload_url, deployment_name=args.deployment_name, ) with ws.write_transaction(): deployment_run_pipeline(args, pipeline)
[docs] def deployment_pull_setup_parser(subparser): """Pull a workspace deployment into current workspace""" subparser.add_argument( "--deployment-path", "-p", "-u", dest="deployment_path", help="Path to deployment that should be pulled", required=True, )
[docs] def deployment_pull(args): def pull_file(src, dest): fetcher = ramble.fetch_strategy.URLFetchStrategy(url=src) stage_dir = os.path.dirname(dest) fs.mkdirp(stage_dir) with ramble.stage.InputStage(fetcher, path=stage_dir, name=os.path.basename(src)) as stage: stage.fetch() ws = ramble.cmd.require_active_workspace(cmd_name="deployment pull") with ws.write_transaction(): # Fetch deployment index first: push_cls = ramble.pipeline.PushDeploymentPipeline # Handle local relative path deployment_path = ramble.util.path.normalize_path_or_url(args.deployment_path) remote_index_path = surl.join( deployment_path, ramble.pipeline.PushDeploymentPipeline.index_filename ) local_index_path = os.path.join(ws.root, push_cls.index_filename) try: pull_file(remote_index_path, local_index_path) except ramble.fetch_strategy.FetchError: logger.die( "Error extracting deployment index file.\n" " Make sure your deployment was pushed with " "`ramble deployment push` before pulling it." ) with open(local_index_path) as f: index_data = sjson.load(f) for file in index_data[push_cls.index_namespace]: src = surl.join(deployment_path, file) dest = os.path.join(ws.root, file) if os.path.exists(dest): fs.force_remove(dest) try: pull_file(src, dest) except ramble.fetch_strategy.FetchError: logger.die( f"Error fetching file {file} from deployment. " "Deployment may be corrupt or incomplete." ) legacy_obj_repo_path = os.path.join( ws.root, ramble.pipeline.PushDeploymentPipeline.legacy_object_repo_name ) # Read in legacy deployment repo if it exists if os.path.exists(legacy_obj_repo_path): repo_cmd = RambleCommand("repo") repo_cmd("add", legacy_obj_repo_path, global_args=["-D", ws.root]) ramble_repos_path = os.path.join(ws.root, "object_repos", "ramble") if os.path.exists(ramble_repos_path): repo_cmd = RambleCommand("repo") for maybe_repo in os.listdir(ramble_repos_path): maybe_repo_dir = os.path.join(ramble_repos_path, maybe_repo) if os.path.isdir(maybe_repo_dir) and os.path.exists( os.path.join(maybe_repo_dir, ramble.repository.unified_config) ): repo_cmd("add", maybe_repo_dir, global_args=["-D", ws.root])
[docs] def deployment_run_pipeline(args, pipeline): include_phase_dependencies = getattr(args, "include_phase_dependencies", None) if include_phase_dependencies: with ramble.config.override("config:include_phase_dependencies", True): pipeline.run() else: pipeline.run()
#: Dictionary mapping subcommand names and aliases to functions subcommand_functions = {}
[docs] def sanitize_arg_name(base_name): """Allow function names to be remapped (eg `-` to `_`)""" formatted_name = base_name.replace("-", "_") return formatted_name
[docs] def setup_parser(subparser): sp = subparser.add_subparsers(metavar="SUBCOMMAND", dest="deployment_command") for name in subcommands: if isinstance(name, (list, tuple)): name, aliases = name[0], name[1:] else: aliases = [] # add commands to subcommands dict function_name = sanitize_arg_name("deployment_%s" % name) function = globals()[function_name] for alias in [name] + aliases: subcommand_functions[alias] = function # make a subparser and run the command's setup function on it setup_parser_cmd_name = sanitize_arg_name("deployment_%s_setup_parser" % name) setup_parser_cmd = globals()[setup_parser_cmd_name] subsubparser = sp.add_parser( name, aliases=aliases, help=setup_parser_cmd.__doc__, description=setup_parser_cmd.__doc__, ) setup_parser_cmd(subsubparser)
[docs] def deployment(parser, args): """Look for a function called deployment_<name> and call it.""" action = subcommand_functions[args.deployment_command] action(args)