# Copyright 2022-2026 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.
"""
Fetch strategies are used to download source code into a staging area
in order to build it. They need to define the following methods:
* fetch()
This should attempt to download/check out source from somewhere.
* check()
Apply a checksum to the downloaded source code, e.g. for an archive.
May not do anything if the fetch method was safe to begin with.
* expand()
Expand (e.g., an archive) downloaded file to source, with the
standard stage source path as the destination directory.
* reset()
Restore original state of downloaded code. Used by clean commands.
This may just remove the expanded source and re-expand an archive,
or it may run something like git reset --hard.
* archive()
Archive a source directory, e.g. for creating a mirror.
"""
import copy
import functools
import itertools
import os
import os.path
import re
import shutil
import sys
import urllib.parse
from typing import List, Optional
from llnl.util import tty
from llnl.util.filesystem import (
get_single_file,
mkdirp,
rename,
temp_cwd,
temp_rename,
working_dir,
)
import ramble.config
import ramble.util.web as web_util
from ramble.util.logger import logger
import spack.util.url as url_util
import spack.version
from spack.util import crypto, pattern
from spack.util.compression import decompressor_for, extension
from spack.util.executable import CommandNotFoundError, which
from spack.version import ver
#: Registry of all fetch strategy classes. The ``fetcher`` decorator
#: appends every class it decorates to this list.
all_strategies = []

#: Warning text emitted by ``warn_content_type_mismatch``. The
#: ``{subject}`` and ``{content_type}`` placeholders are filled in with
#: ``str.format``.
CONTENT_TYPE_MISMATCH_WARNING_TEMPLATE = (
    "The contents of {subject} look like {content_type}. Either the URL"
    " you are trying to use does not exist or you have an internet gateway"
    " issue. You can remove the bad archive using 'ramble clean',"
    " then try again using the correct URL."
)
[docs]
def warn_content_type_mismatch(subject, content_type="HTML"):
    """Log a warning that ``subject`` appears to contain ``content_type``.

    Args:
        subject: text naming what was fetched; interpolated into the
            warning message.
        content_type: name of the content type that was detected
            (defaults to ``"HTML"``).
    """
    message = CONTENT_TYPE_MISMATCH_WARNING_TEMPLATE.format(
        subject=subject,
        content_type=content_type,
    )
    logger.warn(message)
def _needs_stage(fun):
    """Decorator for fetch strategy methods that require a stage.

    The wrapped method raises ``NoStageError`` when ``self.stage`` is
    falsy; otherwise the call is forwarded unchanged and its result is
    returned.
    """

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        # Forward the call only once a stage has been attached.
        if self.stage:
            return fun(self, *args, **kwargs)
        raise NoStageError(fun)

    return wrapper
def _ensure_one_stage_entry(stage_path):
    """Return the path of the sole entry found directly in ``stage_path``.

    Asserts that the directory listing contains exactly one entry.
    """
    entries = os.listdir(stage_path)
    entry_count = len(entries)
    assert entry_count == 1
    sole_entry = entries[0]
    return os.path.join(stage_path, sole_entry)
[docs]
def fetcher(cls):
    """Class decorator that registers a fetch strategy.

    Appends ``cls`` to the module-level ``all_strategies`` list and
    returns the class unchanged.
    """
    registry = all_strategies
    registry.append(cls)
    return cls
[docs]
class FetchStrategy:
    """Base class from which every fetch strategy derives.

    Most methods here are placeholders without behavior; concrete
    strategies override the ones they support.
    """

    #: Name of the keyword (given at the package class level or as a
    #: keyword argument to ``version()``) that identifies this strategy.
    #: ``matches()`` looks for it among the version arguments.
    url_attr: Optional[str] = None

    #: Additional optional keywords that may accompany ``url_attr`` in the
    #: ``version()`` arguments.
    optional_attrs: List[str] = []

    url: Optional[str] = None

    def __init__(self, **kwargs):
        # No stage yet: it is attached later so that strategies can be
        # built before there is anywhere to fetch into.
        self.stage = None

        # The 'no_cache' option (default False) switches caching off.
        no_cache = kwargs.pop("no_cache", False)
        self.cache_enabled = not no_cache

    # ---- interface that subclasses are expected to implement ----

    def fetch(self):
        """Fetch the source archive or repository.

        Returns:
            bool: True on success, False on failure.
        """

    def check(self):
        """Verify the checksum of whatever this strategy fetched."""

    def expand(self):
        """Unpack the downloaded archive into the stage source path."""

    def reset(self):
        """Return the source to its freshly downloaded state.

        For archive files, this may just re-expand the archive.
        """

    def archive(self, destination):
        """Write an archive of the fetched data for use in a mirror.

        For downloaded files the checksum of the original file should be
        preserved. For repositories an expandable tarball of the
        downloaded repository should be created.
        """

    @property
    def cachable(self):
        """Whether this fetcher can cache the resource it retrieves.

        This generally depends on whether the resource is identifiably
        associated with a specific package version.

        Returns:
            bool: True if can cache, False otherwise.
        """

    def source_id(self):
        """Return a unique ID for the source.

        The ID is meant to be something a human could derive from the
        information available in the package definition. The returned
        value is added, via ``str()``, to the content that determines the
        full hash of a package.
        """
        raise NotImplementedError

    def mirror_id(self):
        """Return a unique ID that helps identify reuse across packages.

        Unique like ``source_id()``, but it does not include the package
        name and need not be easy for a human to construct.
        """
        raise NotImplementedError

    def __str__(self):  # Should be human readable URL.
        return "FetchStrategy.__str___"

    @classmethod
    def matches(cls, args):
        """Tell whether this strategy applies to the given arguments.

        Args:
            args: arguments of the version directive

        Returns:
            True when ``cls.url_attr`` is contained in ``args``.
        """
        required_attr = cls.url_attr
        return required_attr in args
[docs]
@fetcher
class BundleFetchStrategy(FetchStrategy):
    """Fetch strategy for bundle (no-code) packages.

    A basic fetch strategy is required for executing post-install hooks,
    so this class supplies the API while doing essentially nothing.

    TODO: Remove this class by refactoring resource handling and the link
    between composite stages and composite fetch strategies (see #11981).
    """

    #: No-code packages have no URL keyword in ``version()``, but some
    #: strategy-related functions (e.g., check_pkg_attributes) still
    #: require this attribute to exist.
    url_attr = ""

    def fetch(self):
        """Report success unconditionally; nothing needs fetching."""
        return True

    @property
    def cachable(self):
        """Always False: there is no code to cache."""
        return False

    def source_id(self):
        """Return the empty string; bundle packages have no source id."""
        return ""

    def mirror_id(self):
        """Return None; bundle packages have no mirror id."""
        return None
[docs]
class FetchStrategyComposite(pattern.Composite):
    """Composite that groups several FetchStrategy objects."""

    # Reuse the base strategy's matching predicate.
    matches = FetchStrategy.matches

    def __init__(self):
        # Names handed to the Composite base class constructor.
        composed_names = [
            "fetch",
            "check",
            "expand",
            "reset",
            "archive",
            "cachable",
            "mirror_id",
        ]
        super().__init__(composed_names)

    def source_id(self):
        """Return the tuple of component source ids.

        Returns None as soon as any component's id is falsy.
        """
        ids = tuple(component.source_id() for component in self)
        return ids if all(ids) else None
[docs]
@fetcher
class URLFetchStrategy(FetchStrategy):
    """URLFetchStrategy pulls source code from a URL for an archive, checks
    the archive against a checksum, and decompresses the archive.

    The destination for the resulting file(s) is the standard stage path.
    """

    url_attr = "url"

    # these are checksum types. The generic 'checksum' is deprecated for
    # specific hash names, but we need it for backward compatibility
    optional_attrs = list(crypto.hashes.keys()) + ["checksum"]

    def __init__(self, url=None, checksum=None, **kwargs):
        """Set up a URL fetcher.

        Args:
            url: location of the archive; a ``url`` keyword in ``kwargs``
                takes precedence.
            checksum: expected digest; a ``checksum`` keyword or any
                keyword named in ``optional_attrs`` takes precedence.
            **kwargs: also read for ``mirrors``, ``expand``,
                ``fetch_options`` and ``extension``.

        Raises:
            ValueError: if no URL was supplied.
        """
        super().__init__(**kwargs)

        # Prefer values in kwargs to the positionals.
        self.url = kwargs.get("url", url)
        self.mirrors = kwargs.get("mirrors", [])

        # digest can be set as the first argument, or from an explicit
        # kwarg by the hash name.
        self.digest = kwargs.get("checksum", checksum)
        for h in self.optional_attrs:
            if h in kwargs:
                self.digest = kwargs[h]

        self.expand_archive = kwargs.get("expand", True)
        self.extra_options = kwargs.get("fetch_options", {})
        self._curl = None

        self.extension = kwargs.get("extension")

        if not self.url:
            raise ValueError("URLFetchStrategy requires a url for fetching.")

    @property
    def curl(self):
        """The ``curl`` executable, looked up on first access and cached.

        A failed lookup is reported through ``logger.error``.
        """
        if not self._curl:
            try:
                self._curl = which("curl", required=True)
            except CommandNotFoundError as exc:
                logger.error(str(exc))
        return self._curl

    def source_id(self):
        """Return the digest of the archive."""
        return self.digest

    def mirror_id(self):
        """Return a mirror path derived from the digest, or None without one."""
        if not self.digest:
            return None
        # The filename is the digest. A directory is also created based on
        # truncating the digest to avoid creating a directory with too many
        # entries
        return os.path.sep.join(["archive", self.digest[:2], self.digest])

    @property
    def candidate_urls(self):
        """The primary URL followed by the mirrors, as a list.

        Except on Windows, the path part of ``file://`` URLs is
        percent-quoted.
        """
        urls = []

        for url in itertools.chain([self.url], self.mirrors or []):
            # This must be skipped on Windows due to URL encoding
            # of ':' characters on filepaths on Windows
            if sys.platform != "win32" and url.startswith("file://"):
                path = urllib.parse.quote(url[len("file://") :])
                url = "file://" + path
            urls.append(url)

        return urls

    @_needs_stage
    def fetch(self):
        """Download the archive from the first candidate URL that works.

        Does nothing when the stage already has an archive file.

        Raises:
            FailedDownloadError: if no candidate produced an archive.
        """
        if self.archive_file:
            logger.debug(f"Already downloaded {self.archive_file}")
            return

        url = None
        errors = []
        for url in self.candidate_urls:
            if not self._existing_url(url):
                continue

            try:
                partial_file, save_file = self._fetch_from_url(url)
                if save_file and (partial_file is not None):
                    # Promote the partial download to its final name.
                    rename(partial_file, save_file)
                break
            except FailedDownloadError as e:
                errors.append(str(e))

        for msg in errors:
            logger.debug(msg)

        if not self.archive_file:
            raise FailedDownloadError(url)

    def _existing_url(self, url):
        """Probe ``url`` using the configured fetch method.

        Returns:
            bool: whether the probe succeeded.

        Raises:
            FailedDownloadError: when the urllib probe raises a web error.
        """
        logger.debug(f"Checking existence of {url}")

        if ramble.config.get("config:url_fetch_method") == "curl":
            curl = self.curl
            # Telling curl to fetch the first byte (-r 0-0) is supposed to be
            # portable.
            curl_args = ["--stderr", "-", "-s", "-f", "-r", "0-0", url]
            if not ramble.config.get("config:verify_ssl"):
                curl_args.append("-k")
            _ = curl(*curl_args, fail_on_error=False, output=os.devnull)
            return curl.returncode == 0
        else:
            # Telling urllib to check if url is accessible
            try:
                url, _, response = ramble.util.web.read_from_url(url)
            except ramble.util.web.SpackWebError as werr:
                msg = f"Urllib fetch failed to verify url {url}\n with error {werr}"
                raise FailedDownloadError(url, msg) from None
            return response.getcode() is None or response.getcode() == 200

    def _fetch_from_url(self, url):
        """Dispatch to the curl or urllib downloader according to config."""
        if ramble.config.get("config:url_fetch_method") == "curl":
            return self._fetch_curl(url)
        else:
            return self._fetch_urllib(url)

    def _check_headers(self, headers):
        """Warn if the last Content-Type header in ``headers`` is HTML."""
        # Check if we somehow got an HTML file rather than the archive we
        # asked for.  We only look at the last content type, to handle
        # redirects properly.
        content_types = re.findall(r"Content-Type:[^\r\n]+", headers, flags=re.IGNORECASE)
        if content_types and "text/html" in content_types[-1]:
            warn_content_type_mismatch(self.archive_file or "the archive")

    @_needs_stage
    def _fetch_urllib(self, url):
        """Download ``url`` with urllib into the stage's save file.

        Returns:
            tuple: ``(None, save_file)``; there is no partial file.

        Raises:
            FailedDownloadError: when reading from the URL fails.
        """
        save_file = None
        if self.stage.save_filename:
            save_file = self.stage.save_filename
        logger.msg(f"Fetching {url}")

        # Check if we're about to try and open a broken symlink, and if so
        # remove that file to avoid a bad situation where a file "exists" but
        # cannot be opened (warning: this is not atomic)
        if os.path.islink(save_file) and not os.path.exists(save_file):
            os.unlink(save_file)

        # Run urllib but grab the mime type from the http headers
        try:
            url, headers, response = ramble.util.web.read_from_url(url)
        except ramble.util.web.SpackWebError as e:
            # clean up archive on failure.
            if self.archive_file:
                os.remove(self.archive_file)
            if save_file and os.path.exists(save_file):
                os.remove(save_file)
            msg = f"urllib failed to fetch with error {e}"
            raise FailedDownloadError(url, msg) from None

        with open(save_file, "wb") as _open_file:
            shutil.copyfileobj(response, _open_file)

        self._check_headers(str(headers))
        return None, save_file

    @_needs_stage
    def _fetch_curl(self, url):
        """Download ``url`` with curl, run inside the stage directory.

        Returns:
            tuple: ``(partial_file, save_file)``; both are None when the
            stage has no save filename.

        Raises:
            FailedDownloadError: when curl exits with a non-zero status.
        """
        save_file = None
        partial_file = None
        if self.stage.save_filename:
            save_file = self.stage.save_filename
            partial_file = self.stage.save_filename + ".part"
        logger.msg(f"Fetching {url}")
        if partial_file:
            save_args = [
                "-C",
                "-",  # continue partial downloads
                "-o",
                partial_file,
            ]  # use a .part file
        else:
            save_args = ["-O"]

        curl_args = save_args + [
            "-f",  # fail on >=400 errors
            "-D",
            "-",  # print out HTML headers
            "-L",  # resolve 3xx redirects
            url,
        ]

        if not ramble.config.get("config:verify_ssl"):
            curl_args.append("-k")

        if sys.stdout.isatty() and tty.msg_enabled():
            curl_args.append("-#")  # status bar when using a tty
        else:
            curl_args.append("-sS")  # show errors if fail

        connect_timeout = ramble.config.get("config:connect_timeout", 10)

        if self.extra_options:
            cookie = self.extra_options.get("cookie")
            if cookie:
                curl_args.append("-j")  # junk cookies
                curl_args.append("-b")  # specify cookie
                curl_args.append(cookie)

            timeout = self.extra_options.get("timeout")
            if timeout:
                connect_timeout = max(connect_timeout, int(timeout))

        if connect_timeout > 0:
            # Timeout if can't establish a connection after n sec.
            curl_args.extend(["--connect-timeout", str(connect_timeout)])

        # Run curl but grab the mime type from the http headers
        curl = self.curl
        with working_dir(self.stage.path):
            headers = curl(*curl_args, output=str, fail_on_error=False)

        if curl.returncode != 0:
            # clean up archive on failure.
            if self.archive_file:
                os.remove(self.archive_file)

            if partial_file and os.path.exists(partial_file):
                os.remove(partial_file)

            if curl.returncode == 22:
                # This is a 404.  Curl will print the error.
                raise FailedDownloadError(url, f"URL {url} was not found!")

            elif curl.returncode == 60:
                # This is a certificate error.  Suggest spack -k
                raise FailedDownloadError(
                    url,
                    "Curl was unable to fetch due to invalid certificate. "
                    "This is either an attack, or your cluster's SSL "
                    "configuration is bad.  If you believe your SSL "
                    "configuration is bad, you can try running spack -k, "
                    "which will not check SSL certificates. "
                    "Use this at your own risk.",
                )

            else:
                # This is some other curl error.  Curl will print the
                # error, but print a spack message too
                raise FailedDownloadError(url, f"Curl failed with error {curl.returncode}")

        # Same HTML-instead-of-archive check as the urllib path.
        self._check_headers(headers)

        return partial_file, save_file

    @property
    @_needs_stage
    def archive_file(self):
        """Path to the source archive within this stage directory."""
        return self.stage.archive_file

    @property
    def cachable(self):
        """True when caching is enabled and a digest is known."""
        return self.cache_enabled and bool(self.digest)

    @_needs_stage
    def expand(self):
        """Expand the archive into the stage source path.

        With ``expand_archive`` off, the archive file itself is moved into
        the source path instead.

        Raises:
            NoArchiveFileError: if there is no archive file to expand.
        """
        if not self.expand_archive:
            logger.debug(
                f"Staging unexpanded archive {self.archive_file} " f"in {self.stage.source_path}"
            )
            if not self.stage.expanded:
                mkdirp(self.stage.source_path)
            dest = os.path.join(self.stage.source_path, os.path.basename(self.archive_file))
            shutil.move(self.archive_file, dest)
            return

        logger.debug(f"Staging archive: {self.archive_file}")

        if not self.archive_file:
            raise NoArchiveFileError(
                "Couldn't find archive file", f"Failed on expand() for URL {self.url}"
            )

        if not self.extension:
            self.extension = extension(self.archive_file)

        if self.stage.expanded:
            logger.debug(f"Source already staged to {self.stage.source_path}")
            return

        decompress = decompressor_for(self.archive_file, self.extension)

        # Expand all tarballs in their own directory to contain
        # exploding tarballs.
        tarball_container = os.path.join(self.stage.path, "expanded-archive")

        # Below we assume that the command to decompress expand the
        # archive in the current working directory
        mkdirp(tarball_container)
        with working_dir(tarball_container):
            decompress(self.archive_file)

        # Check for an exploding tarball, i.e. one that doesn't expand to
        # a single directory.  If the tarball *didn't* explode, move its
        # contents to the staging source directory & remove the container
        # directory.  If the tarball did explode, just rename the tarball
        # directory to the staging source directory.
        #
        # NOTE: The tar program on Mac OS X will encode HFS metadata in
        # hidden files, which can end up *alongside* a single top-level
        # directory.  We initially ignore presence of hidden files to
        # accommodate these "semi-exploding" tarballs but ensure the files
        # are copied to the source directory.
        files = os.listdir(tarball_container)
        non_hidden = [f for f in files if not f.startswith(".")]
        if len(non_hidden) == 1:
            src = os.path.join(tarball_container, non_hidden[0])
            if os.path.isdir(src):
                self.stage.srcdir = non_hidden[0]
                shutil.move(src, self.stage.source_path)
                if len(files) > 1:
                    files.remove(non_hidden[0])
                    for f in files:
                        src = os.path.join(tarball_container, f)
                        dest = os.path.join(self.stage.path, f)
                        shutil.move(src, dest)
                os.rmdir(tarball_container)
            else:
                # This is a non-directory entry (e.g., a patch file) so simply
                # rename the tarball container to be the source path.
                shutil.move(tarball_container, self.stage.source_path)

        else:
            shutil.move(tarball_container, self.stage.source_path)

    def archive(self, destination):
        """Push the fetched archive to ``destination``, keeping the original.

        Raises:
            NoArchiveFileError: if nothing has been fetched yet.
        """
        if not self.archive_file:
            raise NoArchiveFileError("Cannot call archive() before fetching.")

        web_util.push_to_url(self.archive_file, destination, keep_original=True)

    @_needs_stage
    def check(self):
        """Check the downloaded archive against a checksum digest.

        Raises:
            NoDigestError: if this fetcher has no digest.
            ChecksumError: if the archive does not match the digest.
        """
        if not self.digest:
            raise NoDigestError("Attempt to check URLFetchStrategy with no digest.")

        checker = crypto.Checker(self.digest)
        if not checker.check(self.archive_file):
            raise ChecksumError(
                f"{checker.hash_name} checksum failed for {self.archive_file}",
                f"Expected {self.digest} but got {checker.sum}",
            )

    @_needs_stage
    def reset(self):
        """
        Removes the source path if it exists, then re-expands the archive.

        Raises:
            NoArchiveFileError: if nothing has been fetched yet.
        """
        if not self.archive_file:
            raise NoArchiveFileError(
                "Tried to reset URLFetchStrategy before fetching",
                f"Failed on reset() for URL {self.url}",
            )

        # Remove everything but the archive from the stage
        for filename in os.listdir(self.stage.path):
            abspath = os.path.join(self.stage.path, filename)
            if abspath != self.archive_file:
                shutil.rmtree(abspath, ignore_errors=True)

        # Expand the archive again
        self.expand()

    def __repr__(self):
        url = self.url if self.url else "no url"
        return f"{self.__class__.__name__}<{url}>"

    def __str__(self):
        if self.url:
            return self.url
        else:
            return "[no url]"
[docs]
@fetcher
class CacheURLFetchStrategy(URLFetchStrategy):
    """The resource associated with a cache URL may be out of date."""

    @_needs_stage
    def fetch(self):
        """Symlink the locally cached archive into the stage.

        Raises:
            NoCacheError: if the cache file does not exist.
            ChecksumError: if a digest is set and the cached file fails
                verification; the symlink is removed before re-raising.
        """
        path = re.sub("^file://", "", self.url)

        # check whether the cache file exists.
        if not os.path.isfile(path):
            raise NoCacheError(f"No cache of {path}")

        # remove old symlink if one is there. lexists() is used because
        # exists() follows links and reports False for a dangling symlink,
        # which would make the os.symlink() call below fail.
        filename = self.stage.save_filename
        if os.path.lexists(filename):
            os.remove(filename)

        # Symlink to local cached archive.
        os.symlink(path, filename)

        # Remove link if checksum fails, or subsequent fetchers
        # will assume they don't need to download.
        if self.digest:
            try:
                self.check()
            except ChecksumError:
                os.remove(self.archive_file)
                raise

        # Notify the user how we fetched.
        logger.msg(f"Using cached archive: {path}")
[docs]
class VCSFetchStrategy(FetchStrategy):
    """Common base for fetch strategies backed by a version control system.

    As with every fetcher, a VCS fetcher is selected by the attributes
    given to the ``version`` directive. Its ``optional_attrs`` name kinds
    of revisions (tags, branches, commits, ...), while the required
    attribute (git, svn, ...) carries the URL and tells a VCS fetcher
    apart from a URL fetcher.
    """

    branch: Optional[str] = None
    tag: Optional[str] = None
    commit: Optional[str] = None
    revision: Optional[str] = None

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # The keyword named by ``url_attr`` supplies the URL.
        repo_url = kwargs.get(self.url_attr)
        self.url = repo_url
        if not repo_url:
            raise ValueError(f"{self.__class__} requires {self.url_attr} argument.")

        # Each optional attribute becomes an instance attribute, set to
        # None when the keyword was not passed.
        for name in self.optional_attrs:
            setattr(self, name, kwargs.get(name))

    @_needs_stage
    def check(self):
        """Log that no checksum is needed; nothing is verified."""
        logger.debug(f"No checksum needed when fetching with {self.url_attr}")

    @_needs_stage
    def expand(self):
        """Log that the source is already expanded; nothing is unpacked."""
        logger.debug(f"Source fetched with {self.url_attr} is already expanded.")

    @_needs_stage
    def archive(self, destination, **kwargs):
        """Create a gzipped tarball of the fetched source with ``tar``.

        Args:
            destination: path of the archive; its extension must be
                ``tar.gz``.
            **kwargs: an ``exclude`` keyword may hold one pattern (str) or
                several patterns, each passed to tar as ``--exclude``.
        """
        stage = self.stage
        assert extension(destination) == "tar.gz"
        assert stage.source_path.startswith(stage.path)

        tar = which("tar", required=True)

        excluded = kwargs.get("exclude")
        if excluded is not None:
            if isinstance(excluded, str):
                excluded = [excluded]
            for pattern_text in excluded:
                tar.add_default_arg(f"--exclude={pattern_text}")

        with working_dir(stage.path):
            if not stage.srcdir:
                tar("-czf", destination, os.path.basename(stage.source_path))
                return

            # Archive under the repository's default name. tar's options
            # for renaming an archived directory differ between operating
            # systems, so the directory is renamed temporarily instead.
            with temp_rename(stage.source_path, stage.srcdir):
                tar("-czf", destination, stage.srcdir)

    def __str__(self):
        return f"VCS: {self.url}"

    def __repr__(self):
        return f"{self.__class__}<{self.url}>"
[docs]
@fetcher
class GitFetchStrategy(VCSFetchStrategy):
    """
    Fetch strategy that gets source code from a git repository.
    Use like this in a package:

        version('name', git='https://github.com/project/repo.git')

    Optionally, you can provide a branch, or commit to check out, e.g.:

        version('1.1', git='https://github.com/project/repo.git', tag='v1.1')

    You can use these three optional attributes in addition to ``git``:

        * ``branch``: Particular branch to build from (default is the
                      repository's default branch)
        * ``tag``: Particular tag to check out
        * ``commit``: Particular commit hash in the repo

    Repositories are cloned into the standard stage source path directory.
    """

    url_attr = "git"
    optional_attrs = [
        "tag",
        "branch",
        "commit",
        "submodules",
        "get_full_repo",
        "submodules_delete",
    ]

    git_version_re = r"git version (\S+)"

    submodules: bool = False
    # NOTE: when set, clone() iterates over this value as a collection of
    # submodule paths to remove.
    submodules_delete: bool = False
    get_full_repo: bool = False

    def __init__(self, **kwargs):
        # Discards the keywords in kwargs that may conflict with the next call
        # to __init__
        forwarded_args = copy.copy(kwargs)
        forwarded_args.pop("name", None)
        super().__init__(**forwarded_args)

        self._git = None
        self.submodules = kwargs.get("submodules", False)
        self.submodules_delete = kwargs.get("submodules_delete", False)
        self.get_full_repo = kwargs.get("get_full_repo", False)

    @property
    def git_version(self):
        """Version of the git executable used by this fetcher."""
        return GitFetchStrategy.version_from_git(self.git)

    @staticmethod
    def version_from_git(git_exe):
        """Given a git executable, return the Version (this will fail if
        the output cannot be parsed into a valid Version).
        """
        version_output = git_exe("--version", output=str)
        m = re.search(GitFetchStrategy.git_version_re, version_output)
        return spack.version.Version(m.group(1))

    @property
    def git(self):
        """The ``git`` executable, configured on first access and cached."""
        if not self._git:
            self._git = which("git", required=True)

            # Disable advice for a quieter fetch
            # https://github.com/git/git/blob/master/Documentation/RelNotes/1.7.2.txt
            if self.git_version >= spack.version.Version("1.7.2"):
                self._git.add_default_arg("-c")
                self._git.add_default_arg("advice.detachedHead=false")

            # If the user asked for insecure fetching, make that work
            # with git as well.
            if not ramble.config.get("config:verify_ssl"):
                self._git.add_default_env("GIT_SSL_NO_VERIFY", "true")

        return self._git

    @property
    def cachable(self):
        """True when caching is enabled and a commit or tag is set."""
        return self.cache_enabled and bool(self.commit or self.tag)

    def source_id(self):
        """Return the commit, or failing that the tag."""
        return self.commit or self.tag

    def mirror_id(self):
        """Return a mirror path built from the URL path and the first of
        commit, tag or branch that is set; None when none is set."""
        repo_ref = self.commit or self.tag or self.branch
        if repo_ref:
            repo_path = url_util.parse(self.url).path
            result = os.path.sep.join(["git", repo_path, repo_ref])
            return result

    def _repo_info(self):
        """Describe the URL and the selected ref for log messages."""
        args = ""

        if self.commit:
            args = f" at commit {self.commit}"
        elif self.tag:
            args = f" at tag {self.tag}"
        elif self.branch:
            args = f" on branch {self.branch}"

        return f"{self.url}{args}"

    @_needs_stage
    def fetch(self):
        """Clone the repository into the stage unless already expanded."""
        if self.stage.expanded:
            logger.debug(f"Already fetched {self.stage.source_path}")
            return

        self.clone(commit=self.commit, branch=self.branch, tag=self.tag)

    def clone(self, dest=None, commit=None, branch=None, tag=None, bare=False):
        """
        Clone a repository to a path.

        This method handles cloning from git, but does not require a stage
        when ``dest`` is given.

        Arguments:
            dest (str | None): The path into which the code is cloned. If None,
                requires a stage and uses the stage's source path.
            commit (str | None): A commit to fetch from the remote. Only one of
                commit, branch, and tag may be non-None.
            branch (str | None): A branch to fetch from the remote.
            tag (str | None): A tag to fetch from the remote.
            bare (bool): Execute a "bare" git clone (--bare option to git)

        When neither ``commit``, ``branch`` nor ``tag`` is given, the
        branch and tag configured on this fetcher are used.
        """
        # Default to the stage source path
        dest = dest or self.stage.source_path
        logger.debug(f"Cloning git repository: {self._repo_info()}")

        git = self.git
        debug = ramble.config.get("config:debug")

        if bare:
            # We don't need to worry about which commit/branch/tag is checked out
            clone_args = ["clone", "--bare"]
            if not debug:
                clone_args.append("--quiet")
            clone_args.extend([self.url, dest])
            git(*clone_args)
        elif commit:
            # Need to do a regular clone and check out everything if
            # they asked for a particular commit.
            clone_args = ["clone", self.url]
            if not debug:
                clone_args.insert(1, "--quiet")
            with temp_cwd():
                git(*clone_args)
                repo_name = get_single_file(".")
                if self.stage:
                    self.stage.srcdir = repo_name
                shutil.move(repo_name, dest)

            with working_dir(dest):
                checkout_args = ["checkout", commit]
                if not debug:
                    checkout_args.insert(1, "--quiet")
                git(*checkout_args)

        else:
            # Explicitly requested refs win; with none requested, fall
            # back to the refs this fetcher was configured with.
            if branch is None and tag is None:
                branch, tag = self.branch, self.tag

            # Can be more efficient if not checking out a specific commit.
            args = ["clone"]
            if not debug:
                args.append("--quiet")

            # If we want a particular branch ask for it.
            if branch:
                args.extend(["--branch", branch])
            elif tag and self.git_version >= ver("1.8.5.2"):
                args.extend(["--branch", tag])

            # Try to be efficient if we're using a new enough git.
            # This checks out only one branch's history
            if self.git_version >= ver("1.7.10"):
                if self.get_full_repo:
                    args.append("--no-single-branch")
                else:
                    args.append("--single-branch")

            with temp_cwd():
                # Yet more efficiency: only download a 1-commit deep
                # tree, if the in-use git and protocol permit it.
                if (
                    (not self.get_full_repo)
                    and self.git_version >= ver("1.7.1")
                    and self.protocol_supports_shallow_clone()
                ):
                    args.extend(["--depth", "1"])

                args.extend([self.url])
                git(*args)

                repo_name = get_single_file(".")
                if self.stage:
                    self.stage.srcdir = repo_name
                shutil.move(repo_name, dest)

            with working_dir(dest):
                # For tags, be conservative and check them out AFTER
                # cloning.  Later git versions can do this with clone
                # --branch, but older ones fail.
                if tag and self.git_version < ver("1.8.5.2"):
                    # pull --tags returns a "special" error code of 1 in
                    # older versions that we have to ignore.
                    # see: https://github.com/git/git/commit/19d122b
                    pull_args = ["pull", "--tags"]
                    co_args = ["checkout", tag]
                    if not debug:
                        pull_args.insert(1, "--quiet")
                        co_args.insert(1, "--quiet")

                    git(*pull_args, ignore_errors=1)
                    git(*co_args)

        if self.submodules_delete:
            with working_dir(dest):
                for submodule_to_delete in self.submodules_delete:
                    args = ["rm", submodule_to_delete]
                    if not debug:
                        args.insert(1, "--quiet")
                    git(*args)

        # Init submodules if the user asked for them.
        if self.submodules:
            with working_dir(dest):
                args = ["submodule", "update", "--init", "--recursive"]
                if not debug:
                    args.insert(1, "--quiet")
                git(*args)

    def archive(self, destination):
        """Archive the source, excluding the ``.git`` directory."""
        super().archive(destination, exclude=".git")

    @_needs_stage
    def reset(self):
        """Run ``git checkout .`` and ``git clean -f`` in the source path."""
        with working_dir(self.stage.source_path):
            co_args = ["checkout", "."]
            clean_args = ["clean", "-f"]
            # Be quiet unless debugging, like every other git call here.
            if not ramble.config.get("config:debug"):
                co_args.insert(1, "--quiet")
                clean_args.insert(1, "--quiet")

            self.git(*co_args)
            self.git(*clean_args)

    def protocol_supports_shallow_clone(self):
        """Shallow clone operations (--depth #) are not supported by the basic
        HTTP protocol or by no-protocol file specifications.
        Use (e.g.) https:// or file:// instead."""
        return not (self.url.startswith("http://") or self.url.startswith("/"))

    def __str__(self):
        return f"[git] {self._repo_info()}"
[docs]
@fetcher
class CvsFetchStrategy(VCSFetchStrategy):
    """Fetch strategy that gets source code from a CVS repository.
    Use like this in a package:

        version('name',
                cvs=':pserver:anonymous@www.example.com:/cvsroot%module=modulename')

    Optionally, you can provide a branch and/or a date for the URL:

        version('name',
                cvs=':pserver:anonymous@www.example.com:/cvsroot%module=modulename',
                branch='branchname', date='date')

    Repositories are checked out into the standard stage source path directory.
    """

    url_attr = "cvs"
    optional_attrs = ["branch", "date"]

    def __init__(self, **kwargs):
        # Discards the keywords in kwargs that may conflict with the next call
        # to __init__
        forwarded_args = copy.copy(kwargs)
        forwarded_args.pop("name", None)
        super().__init__(**forwarded_args)

        self._cvs = None
        # Normalize the optional refs to strings.
        if self.branch is not None:
            self.branch = str(self.branch)
        if self.date is not None:
            self.date = str(self.date)

    @property
    def cvs(self):
        """The ``cvs`` executable, looked up on first access and cached."""
        if not self._cvs:
            self._cvs = which("cvs", required=True)
        return self._cvs

    @property
    def cachable(self):
        """True when caching is enabled and a branch or date is set."""
        return self.cache_enabled and (bool(self.branch) or bool(self.date))

    def source_id(self):
        """Return an id built from the branch and/or date, or None when
        neither is set."""
        if not (self.branch or self.date):
            # We need a branch or a date to make a checkout reproducible
            return None
        ident = "id"
        if self.branch:
            ident += "-branch=" + self.branch
        if self.date:
            ident += "-date=" + self.date
        return ident

    def mirror_id(self):
        """Return a mirror path derived from the CVS root, branch and date,
        or None when neither branch nor date is set."""
        if not (self.branch or self.date):
            # We need a branch or a date to make a checkout reproducible
            return None
        # Special-case handling because this is not actually a URL
        elements = self.url.split(":")
        final = elements[-1]
        elements = final.split("/")
        # Everything before the first slash is a port number
        elements = elements[1:]
        result = os.path.sep.join(["cvs"] + elements)
        if self.branch:
            result += "%branch=" + self.branch
        if self.date:
            result += "%date=" + self.date
        return result

    @_needs_stage
    def fetch(self):
        """Check the module out into the stage unless already expanded.

        The URL is split on ``%module=`` into the CVS root and the module
        name.
        """
        if self.stage.expanded:
            logger.debug(f"Already fetched {self.stage.source_path}")
            return

        logger.debug(f"Checking out CVS repository: {self.url}")

        with temp_cwd():
            url, module = self.url.split("%module=")
            # Check out files
            args = ["-z9", "-d", url, "checkout"]
            if self.branch is not None:
                args.extend(["-r", self.branch])
            if self.date is not None:
                args.extend(["-D", self.date])
            args.append(module)
            self.cvs(*args)

            # Rename repo
            repo_name = get_single_file(".")
            self.stage.srcdir = repo_name
            shutil.move(repo_name, self.stage.source_path)

    def _remove_untracked_files(self):
        """Removes untracked files in a CVS repository."""
        with working_dir(self.stage.source_path):
            status = self.cvs("-qn", "update", output=str)
            for line in status.split("\n"):
                # Lines starting with '?' name files unknown to CVS.
                if re.match(r"^[?]", line):
                    path = line[2:].strip()
                    if os.path.isfile(path):
                        os.unlink(path)

    def archive(self, destination):
        """Archive the source, excluding ``CVS`` directories."""
        super().archive(destination, exclude="CVS")

    @_needs_stage
    def reset(self):
        """Delete untracked files, then run ``cvs update -C .``."""
        self._remove_untracked_files()
        with working_dir(self.stage.source_path):
            self.cvs("update", "-C", ".")

    def __str__(self):
        return f"[cvs] {self.url}"
[docs]
@fetcher
class SvnFetchStrategy(VCSFetchStrategy):
    """Fetch strategy that checks source code out of a subversion repository.
    Use like this in a package:

        version('name', svn='http://www.example.com/svn/trunk')

    Optionally, you can provide a revision for the URL:

        version('name', svn='http://www.example.com/svn/trunk',
                revision='1641')

    Repositories are checked out into the standard stage source path directory.
    """

    url_attr = "svn"
    optional_attrs = ["revision"]

    def __init__(self, **kwargs):
        # 'name' is dropped from a copy of kwargs before the parent
        # constructor is called.
        init_args = copy.copy(kwargs)
        init_args.pop("name", None)
        super().__init__(**init_args)

        self._svn = None
        # Keep the revision as a string when one was given.
        if self.revision is not None:
            self.revision = str(self.revision)

    @property
    def svn(self):
        """The ``svn`` executable, looked up on first access and cached."""
        if self._svn:
            return self._svn
        self._svn = which("svn", required=True)
        return self._svn

    @property
    def cachable(self):
        """True when caching is enabled and a revision is set."""
        has_revision = bool(self.revision)
        return self.cache_enabled and has_revision

    def source_id(self):
        """Return the revision."""
        return self.revision

    def mirror_id(self):
        """Return a mirror path from the URL path and revision, or None
        when no revision is set."""
        if not self.revision:
            return None
        repo_path = url_util.parse(self.url).path
        return os.path.sep.join(["svn", repo_path, self.revision])

    @_needs_stage
    def fetch(self):
        """Check the repository out into the stage unless already expanded."""
        stage = self.stage
        if stage.expanded:
            logger.debug(f"Already fetched {self.stage.source_path}")
            return

        logger.debug(f"Checking out subversion repository: {self.url}")

        checkout_cmd = ["checkout", "--force", "--quiet"]
        if self.revision:
            checkout_cmd.extend(["-r", self.revision])
        checkout_cmd.append(self.url)

        # Check out in a temporary directory, then move the single
        # resulting entry to the stage source path.
        with temp_cwd():
            self.svn(*checkout_cmd)
            checked_out = get_single_file(".")
            stage.srcdir = checked_out
            shutil.move(checked_out, stage.source_path)

    def _remove_untracked_files(self):
        """Removes untracked files in an svn repository."""
        with working_dir(self.stage.source_path):
            status_text = self.svn("status", "--no-ignore", output=str)
            # NOTE(review): second, uncaptured status call kept from the
            # original; it looks redundant -- confirm before removing.
            self.svn("status", "--no-ignore")
            for entry in status_text.split("\n"):
                # Only lines starting with 'I' or '?' are handled.
                if re.match("^[I?]", entry):
                    target = entry[8:].strip()
                    if os.path.isfile(target):
                        os.unlink(target)
                    elif os.path.isdir(target):
                        shutil.rmtree(target, ignore_errors=True)

    def archive(self, destination):
        """Archive the source, excluding ``.svn`` directories."""
        super().archive(destination, exclude=".svn")

    @_needs_stage
    def reset(self):
        """Delete untracked files, then run ``svn revert . -R``."""
        self._remove_untracked_files()
        with working_dir(self.stage.source_path):
            self.svn("revert", ".", "-R")

    def __str__(self):
        return f"[svn] {self.url}"
[docs]
@fetcher
class HgFetchStrategy(VCSFetchStrategy):
    """
    Fetch strategy that gets source code from a Mercurial repository.

    Use like this in a package::

        version('name', hg='https://jay.grs.rwth-aachen.de/hg/lwm2')

    Optionally, you can provide a revision to check out, e.g.::

        version('torus',
                hg='https://jay.grs.rwth-aachen.de/hg/lwm2', revision='torus')

    You can use the optional 'revision' attribute to check out a
    branch, tag, or particular revision in hg. To prevent
    non-reproducible builds, using a moving target like a branch is
    discouraged.

    * ``revision``: Particular revision, branch, or tag.

    Repositories are cloned into the standard stage source path directory.
    """

    url_attr = "hg"
    optional_attrs = ["revision"]

    def __init__(self, **kwargs):
        # Discards the keywords in kwargs that may conflict with the next call
        # to __init__
        forwarded_args = copy.copy(kwargs)
        forwarded_args.pop("name", None)
        super().__init__(**forwarded_args)

        self._hg = None

    @property
    def hg(self):
        """
        Returns:
            Executable: the hg executable
        """
        if not self._hg:
            self._hg = which("hg", required=True)

            # When building PythonPackages, Spack automatically sets
            # PYTHONPATH. This can interfere with hg, which is a Python
            # script. Unset PYTHONPATH while running hg.
            self._hg.add_default_env("PYTHONPATH", "")

        return self._hg

    @property
    def cachable(self):
        """True only when caching is enabled and a revision is set."""
        return self.cache_enabled and bool(self.revision)

    def source_id(self):
        """Return the configured revision (``None`` if unset)."""
        return self.revision

    def mirror_id(self):
        """Return ``hg/<repo path>/<revision>``, or ``None`` without a revision."""
        if not self.revision:
            return None
        repo_path = url_util.parse(self.url).path
        return os.path.sep.join(["hg", repo_path, self.revision])

    @_needs_stage
    def fetch(self):
        """Clone the repository into the stage's source path.

        Returns immediately if the stage is already expanded. Otherwise runs
        ``hg clone`` in a temporary directory, adding ``--insecure`` when the
        ``config:verify_ssl`` setting is falsy and ``-r <revision>`` when a
        revision is set, then moves the single resulting directory to
        ``self.stage.source_path``.
        """
        if self.stage.expanded:
            logger.debug(f"Already fetched {self.stage.source_path}")
            return

        # Build a readable suffix instead of interpolating a list repr.
        revision_note = f" at revision {self.revision}" if self.revision else ""
        logger.debug(f"Cloning mercurial repository: {self.url}{revision_note}")

        args = ["clone"]

        if not ramble.config.get("config:verify_ssl"):
            args.append("--insecure")

        if self.revision:
            args.extend(["-r", self.revision])

        args.append(self.url)

        with temp_cwd():
            self.hg(*args)
            repo_name = get_single_file(".")
            self.stage.srcdir = repo_name
            shutil.move(repo_name, self.stage.source_path)

    def archive(self, destination):
        """Archive the clone via the parent class with ``exclude=".hg"``."""
        super().archive(destination, exclude=".hg")

    @_needs_stage
    def reset(self):
        """Replace the source tree with a fresh local clone of itself.

        Working from the stage directory, clones the current source path
        (at ``self.revision`` when set) into a scratch directory, removes the
        old source tree, and moves the scratch clone into its place.
        """
        with working_dir(self.stage.path):
            source_path = self.stage.source_path
            scrubbed = "scrubbed-source-tmp"

            args = ["clone"]
            if self.revision:
                args += ["-r", self.revision]
            args += [source_path, scrubbed]
            self.hg(*args)

            shutil.rmtree(source_path, ignore_errors=True)
            shutil.move(scrubbed, source_path)

    def __str__(self):
        return f"[hg] {self.url}"
[docs]
@fetcher
class S3FetchStrategy(URLFetchStrategy):
    """FetchStrategy that pulls from an S3 bucket."""

    url_attr = "s3"

    def __init__(self, *args, **kwargs):
        try:
            super().__init__(*args, **kwargs)
        except ValueError:
            if not kwargs.get("url"):
                raise ValueError("S3FetchStrategy requires a url for fetching.") from None
            # A url keyword was supplied, so this ValueError has another
            # cause; propagate it instead of returning a half-built object.
            raise

    @_needs_stage
    def fetch(self):
        """Download the object at ``self.url`` into the stage directory.

        Returns immediately if an archive file already exists. The payload is
        written under the basename of the URL path and, when the stage defines
        ``save_filename``, renamed to it afterwards.

        Raises:
            FetchError: if ``self.url`` does not use the ``s3`` scheme.
            FailedDownloadError: if no archive file exists after the download.
        """
        if self.archive_file:
            logger.debug(f"Already downloaded {self.archive_file}")
            return

        parsed_url = url_util.parse(self.url)
        if parsed_url.scheme != "s3":
            raise FetchError("S3FetchStrategy can only fetch from s3:// urls.")

        logger.debug(f"Fetching {self.url}")

        basename = os.path.basename(parsed_url.path)

        with working_dir(self.stage.path):
            _, headers, stream = web_util.read_from_url(self.url)

            with open(basename, "wb") as f:
                shutil.copyfileobj(stream, f)

            content_type = web_util.get_header(headers, "Content-type")

        # An HTML payload usually means an error page was served.
        if content_type == "text/html":
            warn_content_type_mismatch(self.archive_file or "the archive")

        if self.stage.save_filename:
            rename(os.path.join(self.stage.path, basename), self.stage.save_filename)

        if not self.archive_file:
            raise FailedDownloadError(self.url)
[docs]
@fetcher
class GCSFetchStrategy(URLFetchStrategy):
    """FetchStrategy that pulls from a GCS bucket."""

    url_attr = "gs"

    def __init__(self, *args, **kwargs):
        try:
            super().__init__(*args, **kwargs)
        except ValueError:
            if not kwargs.get("url"):
                raise ValueError("GCSFetchStrategy requires a url for fetching.") from None
            # A url keyword was supplied, so this ValueError has another
            # cause; propagate it instead of returning a half-built object.
            raise

    @_needs_stage
    def fetch(self):
        """Download the object at ``self.url`` into the stage directory.

        Returns immediately if an archive file already exists. The payload is
        written under the basename of the URL path and, when the stage defines
        ``save_filename``, renamed to it afterwards.

        Raises:
            FetchError: if ``self.url`` does not use the ``gs`` scheme.
            FailedDownloadError: if no archive file exists after the download.
        """
        if self.archive_file:
            logger.debug(f"Already downloaded {self.archive_file}")
            return

        parsed_url = url_util.parse(self.url)
        if parsed_url.scheme != "gs":
            raise FetchError("GCSFetchStrategy can only fetch from gs:// urls.")

        logger.debug(f"Fetching {self.url}")

        basename = os.path.basename(parsed_url.path)

        # ``web_util`` is the module-level alias for ``ramble.util.web``.
        with working_dir(self.stage.path):
            _, headers, stream = web_util.read_from_url(self.url)

            with open(basename, "wb") as f:
                shutil.copyfileobj(stream, f)

            content_type = web_util.get_header(headers, "Content-type")

        # An HTML payload usually means an error page was served.
        if content_type == "text/html":
            warn_content_type_mismatch(self.archive_file or "the archive")

        if self.stage.save_filename:
            # Use the same filesystem helper as the S3 strategy.
            rename(os.path.join(self.stage.path, basename), self.stage.save_filename)

        if not self.archive_file:
            raise FailedDownloadError(self.url)
[docs]
def stable_target(fetcher):
    """Tell whether ``fetcher``'s target should have a stable checksum.

    That is the case only for a preexisting archive file, i.e. when
    ``fetcher`` is a ``URLFetchStrategy`` whose ``cachable`` flag is truthy.

    Returns:
        bool: ``True`` for a cachable URL fetcher, ``False`` otherwise.
    """
    is_url_fetcher = isinstance(fetcher, URLFetchStrategy)
    return bool(is_url_fetcher and fetcher.cachable)
[docs]
def from_kwargs(**kwargs):
    """Build the FetchStrategy that matches the given keyword arguments.

    Registered strategies are tried in order; the first one whose
    ``matches(kwargs)`` is truthy is instantiated with ``**kwargs``.

    Args:
        **kwargs: dictionary of keyword arguments, e.g. from a
            ``version()`` directive in a package.

    Returns:
        FetchStrategy: The fetch strategy that matches the args, based
            on attribute names (e.g., ``git``, ``hg``, etc.)

    Raises:
        FetchError: If no ``fetch_strategy`` matches the args.
    """
    chosen = next(
        (strategy for strategy in all_strategies if strategy.matches(kwargs)),
        None,
    )
    if chosen is None:
        raise InvalidArgsError(**kwargs)
    return chosen(**kwargs)
[docs]
def from_url_scheme(url, *args, **kwargs):
    """Pick a FetchStrategy whose ``url_attr`` equals the scheme of ``url``.

    A ``url`` keyword argument takes precedence over the positional one, and
    a URL without a scheme is treated as ``file``. The scheme is translated
    through ``kwargs["scheme_mapping"]`` when that is given and non-empty,
    otherwise through a built-in table that maps the common web and file
    schemes to ``"url"``.

    Raises:
        ValueError: if no registered strategy handles the scheme.
    """
    url = kwargs.get("url", url)
    parsed_url = urllib.parse.urlparse(url, scheme="file")

    builtin_mapping = dict.fromkeys(("file", "http", "https", "ftp", "ftps"), "url")
    scheme_mapping = kwargs.get("scheme_mapping") or builtin_mapping

    wanted = scheme_mapping.get(parsed_url.scheme, parsed_url.scheme)

    for strategy in all_strategies:
        handled = getattr(strategy, "url_attr", None)
        if not handled or handled != wanted:
            continue
        return strategy(url, *args, **kwargs)

    raise ValueError(f'No FetchStrategy found for url with scheme: "{parsed_url.scheme}"')
[docs]
class FsCache:
    """Directory-backed cache of fetched archives."""

    def __init__(self, root):
        """Remember the absolute form of ``root`` as the cache directory."""
        self.root = os.path.abspath(root)

    def store(self, fetcher, relative_dest):
        """Archive ``fetcher``'s content at ``relative_dest`` under the root.

        Nothing is stored when the fetcher is not cachable, or when it is a
        ``CacheURLFetchStrategy`` (its content is already cached).
        """
        if not fetcher.cachable or isinstance(fetcher, CacheURLFetchStrategy):
            return

        target = os.path.join(self.root, relative_dest)
        parent_dir = os.path.dirname(target)
        mkdirp(parent_dir)
        fetcher.archive(target)

    def fetcher(self, target_path, digest, **kwargs):
        """Return a ``CacheURLFetchStrategy`` for ``target_path`` under the root."""
        cached_file = os.path.join(self.root, target_path)
        return CacheURLFetchStrategy(cached_file, digest, **kwargs)

    def destroy(self):
        """Remove the whole cache directory, ignoring removal errors."""
        shutil.rmtree(self.root, ignore_errors=True)
[docs]
class FetchError(ramble.error.RambleError):
    """Superclass for fetcher errors."""
[docs]
class NoCacheError(FetchError):
    """Raised when there is no cached archive for a package."""
[docs]
class FailedDownloadError(FetchError):
    """Raised when a download fails.

    The offending URL is kept on the instance as ``url``.
    """

    def __init__(self, url, msg=""):
        headline = f"Failed to fetch file from URL: {url}"
        super().__init__(headline, msg)
        self.url = url
[docs]
class NoArchiveFileError(FetchError):
    """Raised when an archive file is expected but none exists."""
[docs]
class NoDigestError(FetchError):
    """Raised after an attempt to checksum when the URL has no digest."""
[docs]
class InvalidArgsError(FetchError):
    """Raised when no fetch strategy can be guessed from a set of arguments."""

    def __init__(self, **args):
        summary = "Could not guess a fetch strategy"
        details = f"with arguments: {args}"
        super().__init__(summary, details)
[docs]
class ChecksumError(FetchError):
    """Raised when an archive fails its checksum verification."""
[docs]
class NoStageError(FetchError):
    """Raised when fetch operations are called before set_stage()."""

    def __init__(self, method):
        # ``method`` is the callable that was invoked too early; only its
        # name is used in the message.
        method_name = method.__name__
        message = f"Must call FetchStrategy.set_stage() before calling {method_name}"
        super().__init__(message)