# Copyright 2022-2026 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.
import os
import sys
import ramble.cmd.common.arguments
import ramble.config
import ramble.repository
from ramble.util.logger import logger
# Module-level command metadata.
# NOTE(review): these names are presumably read by Ramble's command loader to
# build the top-level help output -- confirm against the loader before renaming.

# One-line summary of what the `repo` command does.
description = "manage Ramble repositories"
# Help group this command is filed under (assumed from the name -- TODO confirm).
section = "config"
# Help verbosity tier for this command (assumed from the name -- TODO confirm).
level = "long"
[docs]
def setup_parser(subparser):
    """Build the argument parsers for the ``repo`` command.

    The repo command helps manage Ramble repositories, which are the
    locations ramble reads object definition files from.

    Four subcommands are registered on ``subparser``: ``create``, ``list``,
    ``add`` and ``remove`` (also reachable as ``rm``). The selected
    subcommand name is stored on the parsed namespace as ``repo_command``.
    Each subcommand uses the docstring of its handler function as both its
    help and its description, and each one receives the shared
    ``repo_type`` common argument.

    Args:
        subparser: the parser for the ``repo`` command; subcommand parsers
            are attached to it through ``add_subparsers``.
    """
    sp = subparser.add_subparsers(metavar="SUBCOMMAND", dest="repo_command")
    scopes = ramble.config.scopes()
    scopes_metavar = ramble.config.scopes_metavar
    add_common = ramble.cmd.common.arguments.add_common_arguments

    def add_scope_option(target, default_scope, help_text):
        # list/add/remove all take --scope with the same choices and metavar;
        # only the default and the help wording differ.
        target.add_argument(
            "--scope",
            choices=scopes,
            metavar=scopes_metavar,
            default=default_scope,
            help=help_text,
        )

    # ---- create ----
    create_doc = repo_create.__doc__
    create_parser = sp.add_parser("create", help=create_doc, description=create_doc)
    create_parser.add_argument("directory", help="directory to create the repo in")
    create_parser.add_argument(
        "namespace",
        metavar="new_namespace",
        help="namespace to identify objects in the repository. defaults to the directory name",
        nargs="?",
    )
    subdirectory_help = (
        "subdirectory to store objects in the repository. "
        "Default is determined by the type of repository. "
        "Use an empty string for no subdirectory."
    )
    create_parser.add_argument("-d", "--subdirectory", action="store", help=subdirectory_help)
    add_common(create_parser, ["repo_type"])

    # ---- list ----
    list_doc = repo_list.__doc__
    list_parser = sp.add_parser("list", help=list_doc, description=list_doc)
    add_scope_option(
        list_parser,
        ramble.config.default_list_scope(),
        "configuration scope to read from",
    )
    add_common(list_parser, ["repo_type"])

    # ---- add ----
    add_doc = repo_add.__doc__
    add_parser = sp.add_parser("add", help=add_doc, description=add_doc)
    add_parser.add_argument("path", help="path to a Ramble repository directory")
    add_scope_option(
        add_parser,
        ramble.config.default_modify_scope(),
        "configuration scope to modify",
    )
    add_common(add_parser, ["repo_type"])

    # ---- remove (alias: rm) ----
    remove_doc = repo_remove.__doc__
    remove_parser = sp.add_parser(
        "remove", help=remove_doc, description=remove_doc, aliases=["rm"]
    )
    remove_parser.add_argument(
        "namespace_or_path", help="namespace or path of a Ramble repository"
    )
    add_scope_option(
        remove_parser,
        ramble.config.default_modify_scope(),
        "configuration scope to modify",
    )
    add_common(remove_parser, ["repo_type"])
[docs]
def repo_create(args):
    """Create a new repository."""
    # NOTE: the one-line docstring above is also the CLI help text for
    # `repo create` (setup_parser passes repo_create.__doc__ to argparse),
    # so the detailed notes are kept in comments.
    #
    # Reads args.type, args.directory, args.namespace and args.subdirectory,
    # delegates the actual creation to ramble.repository.create_repo, then
    # logs the new namespace and the command that registers the repository.
    if args.type == "any":
        # "any" creates a unified repository based on the default object type.
        unified_repo = True
        obj_type = ramble.repository.default_type
        repo_type = "applications and modifiers"
        type_flag = ""
    else:
        unified_repo = False
        obj_type = ramble.repository.ObjectTypes[args.type]
        repo_type = obj_type.name
        type_flag = f" -t {repo_type}"

    full_path, namespace = ramble.repository.create_repo(
        args.directory,
        args.namespace,
        args.subdirectory,
        object_type=obj_type,
        unified_repo=unified_repo,
    )

    logger.msg(f"Created {repo_type} repo with namespace '{namespace}'.")
    # The type option is registered on the `add` subcommand parser, so it has
    # to follow `add`; placed before the subcommand it would be handed to the
    # parent `repo` parser, which does not declare it.
    logger.msg(
        "To register it with ramble, run this command:",
        f"ramble repo add{type_flag} {full_path}",
    )
[docs]
def repo_add(args):
    """Add a repository to Ramble's configuration."""
    # NOTE: the one-line docstring above is also the CLI help text for
    # `repo add` (setup_parser passes repo_add.__doc__ to argparse), so the
    # detailed notes are kept in comments.
    #
    # Reads args.path, args.type and args.scope.
    #   * type "any": the repository config file is inspected directly and
    #     the path is registered for every object type.
    #   * any other type: a Repo object is constructed for that type and the
    #     path is registered for that type only.
    # Raises ramble.repository.BadRepoError when the directory does not pass
    # the repository checks; a missing path or a non-directory goes through
    # logger.die instead.
    path = args.path
    # Original note: the canonical path is absolute and handles substitution.
    canon_path = ramble.util.path.canonicalize_path(path)

    if not os.path.exists(canon_path):
        logger.die(f"No such file or directory: {path}")
    if not os.path.isdir(canon_path):
        logger.die(f"Not a Ramble repository: {path}")

    if args.type == "any":
        _repo_add_any_type(path, canon_path, args.scope)
    else:
        obj_type = ramble.repository.ObjectTypes[args.type]
        _repo_add_single_type(path, canon_path, obj_type, args.scope)


def _repo_add_any_type(path, canon_path, scope):
    """Validate a repository without instantiating it and register it for all types."""
    # A full Repo object is deliberately not built here because it might
    # fail for partial repositories; only the config file is read.
    config_name, config_file = _find_repo_config(path, canon_path)
    repo_section, namespace = _load_repo_section(path, config_name, config_file)

    # An explicit `subdirectory` in the config overrides every per-type
    # default directory name, so it is looked up once.
    subdirectory = repo_section.get("subdirectory")
    has_object_dir = any(
        os.path.isdir(
            os.path.join(
                canon_path,
                (
                    subdirectory
                    if subdirectory is not None
                    else ramble.repository.type_definitions[obj_type]["dir_name"]
                ),
            )
        )
        for obj_type in ramble.repository.ObjectTypes
    )
    if not has_object_dir:
        raise ramble.repository.BadRepoError(
            f"The given path {path} is not a valid repo for any object types "
            "as no object type directory (e.g., 'applications/') was found."
        )

    for obj_type in ramble.repository.ObjectTypes:
        _register_repo_path(obj_type, path, canon_path, namespace, scope)


def _repo_add_single_type(path, canon_path, obj_type, scope):
    """Validate a repository by constructing it for one type, then register it."""
    try:
        repo = ramble.repository.Repo(canon_path, obj_type)
    except ramble.repository.BadRepoError as e:
        # Re-raise with a message that names the requested type.
        raise ramble.repository.BadRepoError(
            f"Failed to find valid repo with type {obj_type}"
        ) from e

    # NOTE(review): kept from the original as a defensive guard; it can only
    # fire if a Repo instance can evaluate as falsy -- confirm and drop if not.
    if not repo:
        raise ramble.repository.BadRepoError(
            f"The given path {path} is not a valid repo for type {obj_type.name}"
        )

    # repo.root and the raw user-supplied path are still honoured as already
    # registered spellings, in addition to the canonical comparison.
    _register_repo_path(
        obj_type, path, canon_path, repo.namespace, scope, known_spellings=(repo.root, path)
    )


def _find_repo_config(path, canon_path):
    """Return (name, full path) of the first accepted config file present in the repo."""
    default_def = ramble.repository.type_definitions[ramble.repository.default_type]
    config_name = None
    config_file = None
    for candidate_name in default_def["accepted_configs"]:
        candidate_file = os.path.join(canon_path, candidate_name)
        if os.path.exists(candidate_file):
            config_name, config_file = candidate_name, candidate_file
            break

    if not config_file:
        raise ramble.repository.BadRepoError(f"No valid config file found in '{path}'")
    # The candidate exists but may be a directory rather than a regular file.
    if not os.path.isfile(config_file):
        raise ramble.repository.BadRepoError(f"No {config_name} found in '{path}'")
    return config_name, config_file


def _load_repo_section(path, config_name, config_file):
    """Parse the repo config file and return (``repo`` mapping, namespace)."""
    try:
        with open(config_file) as f:
            yaml_data = ramble.repository.yaml.safe_load(f)

        if (
            not yaml_data
            or "repo" not in yaml_data
            or not isinstance(yaml_data["repo"], dict)
        ):
            logger.die(f"Invalid {config_name} in repository {path}")

        repo_section = yaml_data["repo"]
        namespace = repo_section.get("namespace")
        if not namespace:
            logger.die(f"Namespace not defined in {config_name} in repository {path}")

        # NOTE(review): re.match only anchors at the start, so a namespace
        # with trailing invalid characters still passes, and a one-character
        # namespace is rejected -- confirm whether fullmatch was intended.
        if not ramble.repository.re.match(r"[a-zA-Z][a-zA-Z0-9_.]+", namespace):
            logger.die(
                f"Invalid namespace '{namespace}' in repo '{path}'. "
                "Namespaces must be valid python identifiers separated by '.'"
            )
    except Exception as e:
        # Any read/parse/validation failure is reported as a bad repository,
        # with the underlying error chained for debugging.
        raise ramble.repository.BadRepoError(
            f"Error reading or parsing {config_name} in '{path}'"
        ) from e
    return repo_section, namespace


def _register_repo_path(obj_type, path, canon_path, namespace, scope, known_spellings=()):
    """Prepend ``canon_path`` to the config list for ``obj_type`` unless already present."""
    section = ramble.repository.type_definitions[obj_type]["config_section"]
    repos = ramble.config.get(section, scope=scope) or []

    # Compare canonical forms so the same repository written with a
    # different spelling in the config is not registered twice.
    already_registered = any(
        entry in known_spellings
        or ramble.util.path.canonicalize_path(entry) == canon_path
        for entry in repos
    )
    if already_registered:
        logger.warn(f"{obj_type.name} repository is already registered with Ramble: {path}")
        return

    # New repositories go first in the list.
    repos.insert(0, canon_path)
    ramble.config.set(section, repos, scope)
    logger.msg(f"Added {obj_type.name} repo with namespace '{namespace}'.")
[docs]
def repo_remove(args):
    """Remove a repository from Ramble's configuration."""
    # NOTE: the one-line docstring above is also the CLI help text for
    # `repo remove` (setup_parser passes repo_remove.__doc__ to argparse).
    #
    # Reads args.type, args.scope and args.namespace_or_path. For each scope
    # and object type, a registered entry is removed when its normalized
    # canonical path equals the normalized canonical form of the argument;
    # failing that, when the namespace of the Repo built from it equals the
    # argument. Calls logger.die if nothing was removed.
    target = args.namespace_or_path

    if args.type == "any":
        kinds = ramble.repository.ObjectTypes
    else:
        kinds = [ramble.repository.ObjectTypes[args.type]]

    if args.scope:
        scope_names = [args.scope]
    else:
        # Highest precedence first
        scope_names = reversed([s.name for s in ramble.config.config.file_scopes])

    removed_any = False
    for scope_name in scope_names:
        for kind in kinds:
            section = ramble.repository.type_definitions[kind]["config_section"]
            registered = ramble.config.get(section, scope=scope_name)
            if not registered:
                continue

            wanted = os.path.normcase(
                os.path.normpath(ramble.util.path.canonicalize_path(target))
            )

            # First pass: treat the argument as a path.
            for entry in registered:
                entry_norm = os.path.normcase(
                    os.path.normpath(ramble.util.path.canonicalize_path(entry))
                )
                if entry_norm != wanted:
                    continue
                registered.remove(entry)
                ramble.config.set(section, registered, scope_name)
                logger.msg(
                    f"Removed {kind.name} repository {entry} from scope '{scope_name}'."
                )
                removed_any = True
                break  # done with this object type
            else:
                # Second pass (no path matched): treat the argument as a
                # namespace. Iterate over a copy since the list is mutated.
                for entry in list(registered):
                    try:
                        found = ramble.repository.Repo(entry, kind)
                        if found.namespace == target:
                            registered.remove(entry)
                            ramble.config.set(section, registered, scope_name)
                            logger.msg(
                                f"Removed {kind.name} repository {found.root} "
                                f"with namespace '{found.namespace}' from scope '{scope_name}'."
                            )
                            removed_any = True
                            break
                    except ramble.repository.RepoError:
                        # Entries that cannot be loaded are skipped.
                        continue

        # When no scope was requested, stop at the first scope that had a match.
        if removed_any and not args.scope:
            break

    if not removed_any:
        type_names = [str(kind.name) for kind in kinds]
        logger.die(
            f"No repository for {type_names} with path or namespace: {args.namespace_or_path}"
        )
[docs]
def repo_list(args):
    """Show registered repositories and their namespaces."""
    # NOTE: the one-line docstring above is also the CLI help text for
    # `repo list` (setup_parser passes repo_list.__doc__ to argparse), so the
    # detailed notes are kept in comments.
    #
    # Reads args.type and args.scope. For each selected object type, prints
    # one "<namespace><padding><root>" line per repository that can be
    # constructed; entries raising RepoError are skipped. A count header is
    # logged only when stdout is a terminal.
    if args.type == "any":
        obj_types = ramble.repository.ObjectTypes
    else:
        obj_types = [ramble.repository.ObjectTypes[args.type]]

    for obj_type in obj_types:
        type_def = ramble.repository.type_definitions[obj_type]
        # The section may be unset in the requested scope; treat that as empty
        # instead of iterating over None.
        roots = ramble.config.get(type_def["config_section"], scope=args.scope) or []

        repos = []
        for root in roots:
            try:
                repos.append(ramble.repository.Repo(root, obj_type))
            except ramble.repository.RepoError:
                continue

        if sys.stdout.isatty():
            noun = "repository." if len(repos) == 1 else "repositories."
            logger.msg(f"{len(repos)} {obj_type.name} {noun}")

        if not repos:
            # Nothing to print for this type, but the remaining object types
            # must still be listed, so move on rather than returning.
            continue

        # Left-justify namespaces to a common width (longest namespace + 4).
        fmt = "%%-%ds%%s" % (max(len(r.namespace) for r in repos) + 4)
        for entry in repos:
            print(fmt % (entry.namespace, entry.root))
[docs]
def repo(parser, args):
    """Entry point for the ``repo`` command: validate the type and dispatch.

    Args:
        parser: the command's argument parser (not used here).
        args: parsed arguments; ``args.type`` is validated and
            ``args.repo_command`` selects the handler to run.
    """
    requested_type = args.type
    type_is_known = (
        requested_type == "any" or requested_type in ramble.repository.OBJECT_NAMES
    )
    if not type_is_known:
        logger.die(f"Repository type '{requested_type}' is not valid.")

    # Subcommand name -> handler; "rm" is the alias declared for "remove".
    handlers = {
        "create": repo_create,
        "list": repo_list,
        "add": repo_add,
        "remove": repo_remove,
        "rm": repo_remove,
    }
    handler = handlers[args.repo_command]
    handler(args)