Source code for ramble.fetch_strategy

# Copyright 2022-2026 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.
"""
Fetch strategies are used to download source code into a staging area
in order to build it.  They need to define the following methods:

    * fetch()
        This should attempt to download/check out source from somewhere.
    * check()
        Apply a checksum to the downloaded source code, e.g. for an archive.
        May not do anything if the fetch method was safe to begin with.
    * expand()
        Expand (e.g., an archive) downloaded file to source, with the
        standard stage source path as the destination directory.
    * reset()
        Restore original state of downloaded code.  Used by clean commands.
        This may just remove the expanded source and re-expand an archive,
        or it may run something like git reset --hard.
    * archive()
        Archive a source directory, e.g. for creating a mirror.
"""

import copy
import functools
import itertools
import os
import os.path
import re
import shutil
import sys
import urllib.parse
from typing import List, Optional

from llnl.util import tty
from llnl.util.filesystem import (
    get_single_file,
    mkdirp,
    rename,
    temp_cwd,
    temp_rename,
    working_dir,
)

import ramble.config
import ramble.util.web as web_util
from ramble.util.logger import logger

import spack.util.url as url_util
import spack.version
from spack.util import crypto, pattern
from spack.util.compression import decompressor_for, extension
from spack.util.executable import CommandNotFoundError, which
from spack.version import ver

#: List of all fetch strategies; classes are appended to it by the
#: ``fetcher`` decorator.
all_strategies = []

#: Template for the warning logged when a downloaded file does not look like
#: the expected archive. It is formatted with ``subject`` and
#: ``content_type`` keyword arguments.
CONTENT_TYPE_MISMATCH_WARNING_TEMPLATE = (
    "The contents of {subject} look like {content_type}.  Either the URL"
    " you are trying to use does not exist or you have an internet gateway"
    " issue. You can remove the bad archive using 'ramble clean',"
    " then try again using the correct URL."
)


def warn_content_type_mismatch(subject, content_type="HTML"):
    """Log a warning that ``subject`` appears to contain ``content_type``.

    Args:
        subject: description of the downloaded item, inserted into the
            warning text.
        content_type: what the contents look like; defaults to ``"HTML"``.
    """
    message = CONTENT_TYPE_MISMATCH_WARNING_TEMPLATE.format(
        subject=subject,
        content_type=content_type,
    )
    logger.warn(message)
def _needs_stage(fun):
    """Decorator guarding fetch-strategy methods that require a stage.

    Many methods on fetch strategies require a stage to be set using
    ``set_stage()``. The wrapped method raises ``NoStageError`` when
    ``self.stage`` is falsy; otherwise it is called unchanged and its
    result is returned.
    """

    @functools.wraps(fun)
    def wrapper(self, *args, **kwargs):
        if self.stage:
            return fun(self, *args, **kwargs)
        raise NoStageError(fun)

    return wrapper


def _ensure_one_stage_entry(stage_path):
    """Return the full path of the sole entry inside ``stage_path``.

    Asserts that the directory holds exactly one entry.
    """
    entries = os.listdir(stage_path)
    assert len(entries) == 1
    (only_entry,) = entries
    return os.path.join(stage_path, only_entry)
def fetcher(cls):
    """Class decorator that registers ``cls`` as a fetch strategy.

    The class is appended to the module-level ``all_strategies`` list and
    returned unchanged, so the decorator can be stacked on a class
    definition.
    """
    all_strategies.append(cls)
    return cls
class FetchStrategy:
    """Base class from which every fetch strategy derives."""

    #: The URL attribute must be specified either at the package class
    #: level, or as a keyword argument to ``version()``. It is used to
    #: distinguish fetchers for different versions in the package DSL.
    url_attr: Optional[str] = None

    #: Optional attributes can be used to distinguish fetchers when
    #: classes have multiple ``url_attrs`` at the top-level; they are the
    #: optional attributes in ``version()`` args.
    optional_attrs: List[str] = []

    url: Optional[str] = None

    def __init__(self, **kwargs):
        # The stage is attached late so that strategies can be built at
        # package construction time; it is where things will be fetched.
        self.stage = None

        # Caching is on unless the version directive passed a truthy
        # 'no_cache' option.
        no_cache = kwargs.pop("no_cache", False)
        self.cache_enabled = not no_cache

    # Subclasses need to implement the methods below.

    def fetch(self):
        """Fetch source code archive or repo.

        Returns:
            bool: True on success, False on failure.
        """

    def check(self):
        """Checksum the archive fetched by this FetchStrategy."""

    def expand(self):
        """Expand the downloaded archive into the stage source path."""

    def reset(self):
        """Revert to freshly downloaded state.

        For archive files, this may just re-expand the archive.
        """

    def archive(self, destination):
        """Create an archive of the downloaded data for a mirror.

        For downloaded files, this should preserve the checksum of the
        original file. For repositories, it should just create an
        expandable tarball out of the downloaded repository.
        """

    @property
    def cachable(self):
        """Whether fetcher is capable of caching the resource it retrieves.

        This generally is determined by whether the resource is
        identifiably associated with a specific package version.

        Returns:
            bool: True if can cache, False otherwise.
        """

    def source_id(self):
        """A unique ID for the source.

        It is intended that a human could easily generate this themselves
        using the information available to them in the Spack package.

        The returned value is added to the content which determines the
        full hash for a package using `str()`.
        """
        raise NotImplementedError

    def mirror_id(self):
        """A unique ID for a source, meant to identify resource reuse.

        It is unique like source-id, but it does not include the package
        name and is not necessarily easy for a human to create themselves.
        """
        raise NotImplementedError

    def __str__(self):
        # Subclasses should return a human readable URL.
        return "FetchStrategy.__str___"

    @classmethod
    def matches(cls, args):
        """Predicate that matches fetch strategies to arguments of the
        version directive.

        Args:
            args: arguments of the version directive
        """
        required_attr = cls.url_attr
        return required_attr in args
@fetcher
class BundleFetchStrategy(FetchStrategy):
    """Fetch strategy associated with bundle, or no-code, packages.

    Having a basic fetch strategy is a requirement for executing
    post-install hooks. Consequently, this class provides the API but
    does little more than report fixed values.

    TODO: Remove this class by refactoring resource handling and the link
    between composite stages and composite fetch strategies (see #11981).
    """

    #: There is no associated URL keyword in ``version()`` for no-code
    #: packages but this property is required for some strategy-related
    #: functions (e.g., check_pkg_attributes).
    url_attr = ""

    def fetch(self):
        """Simply report success -- there is no code to fetch."""
        return True

    @property
    def cachable(self):
        """Report False as there is no code to cache."""
        return False

    def source_id(self):
        """BundlePackages don't have a source id; return an empty string."""
        return ""

    def mirror_id(self):
        """BundlePackages don't have a mirror id (implicitly ``None``)."""
class FetchStrategyComposite(pattern.Composite):
    """Composite for a FetchStrategy object."""

    matches = FetchStrategy.matches

    def __init__(self):
        # Names handed to ``pattern.Composite``; presumably these are the
        # members it forwards to every component -- confirm against
        # spack.util.pattern.
        composite_members = [
            "fetch",
            "check",
            "expand",
            "reset",
            "archive",
            "cachable",
            "mirror_id",
        ]
        super().__init__(composite_members)

    def source_id(self):
        """Return the tuple of component source ids.

        Returns ``None`` unless every component id is truthy.
        """
        component_ids = tuple(member.source_id() for member in self)
        if not all(component_ids):
            return None
        return component_ids
@fetcher
class URLFetchStrategy(FetchStrategy):
    """URLFetchStrategy pulls source code from a URL for an archive, checks
    the archive against a checksum, and decompresses the archive.

    The destination for the resulting file(s) is the standard stage path.
    """

    url_attr = "url"

    # these are checksum types. The generic 'checksum' is deprecated for
    # specific hash names, but we need it for backward compatibility
    optional_attrs = list(crypto.hashes.keys()) + ["checksum"]

    def __init__(self, url=None, checksum=None, **kwargs):
        """Create a URL fetcher.

        Args:
            url: URL of the archive to fetch (required).
            checksum: expected digest of the archive, if any.
            **kwargs: may carry ``mirrors``, a hash-named digest (any of
                ``optional_attrs``), ``expand``, ``fetch_options``,
                ``extension`` and ``no_cache``.

        Raises:
            ValueError: if no URL was provided.
        """
        super().__init__(**kwargs)

        # Prefer values in kwargs to the positionals.
        self.url = kwargs.get("url", url)
        self.mirrors = kwargs.get("mirrors", [])

        # digest can be set as the first argument, or from an explicit
        # kwarg by the hash name.
        self.digest = kwargs.get("checksum", checksum)
        for h in self.optional_attrs:
            if h in kwargs:
                self.digest = kwargs[h]

        self.expand_archive = kwargs.get("expand", True)
        self.extra_options = kwargs.get("fetch_options", {})
        self._curl = None

        self.extension = kwargs.get("extension")

        if not self.url:
            raise ValueError("URLFetchStrategy requires a url for fetching.")

    @property
    def curl(self):
        """Lazily located ``curl`` executable.

        If ``curl`` cannot be found the error is logged and ``None`` is
        returned.
        """
        if not self._curl:
            try:
                self._curl = which("curl", required=True)
            except CommandNotFoundError as exc:
                logger.error(str(exc))
        return self._curl

    def source_id(self):
        """Return the archive digest, which identifies the source."""
        return self.digest

    def mirror_id(self):
        """Return the mirror path for this archive, or ``None`` without a digest."""
        if not self.digest:
            return None
        # The filename is the digest. A directory is also created based on
        # truncating the digest to avoid creating a directory with too many
        # entries
        return os.path.sep.join(["archive", self.digest[:2], self.digest])

    @property
    def candidate_urls(self):
        """List of URLs to try: the primary URL followed by the mirrors."""
        urls = []

        for url in itertools.chain([self.url], self.mirrors or []):
            # This must be skipped on Windows due to URL encoding
            # of ':' characters on filepaths on Windows
            if sys.platform != "win32" and url.startswith("file://"):
                path = urllib.parse.quote(url[len("file://") :])
                url = "file://" + path
            urls.append(url)

        return urls

    @_needs_stage
    def fetch(self):
        """Download the archive from the first candidate URL that works.

        Raises:
            FailedDownloadError: if no candidate URL produced an archive.
        """
        if self.archive_file:
            logger.debug(f"Already downloaded {self.archive_file}")
            return

        url = None
        errors = []
        for url in self.candidate_urls:
            try:
                # The existence probe can itself raise FailedDownloadError
                # (urllib path). Keep it inside the ``try`` so a failing
                # URL does not prevent the remaining candidates (mirrors)
                # from being tried.
                if not self._existing_url(url):
                    continue

                partial_file, save_file = self._fetch_from_url(url)
                if save_file and (partial_file is not None):
                    rename(partial_file, save_file)
                break
            except FailedDownloadError as e:
                errors.append(str(e))

        for msg in errors:
            logger.debug(msg)

        if not self.archive_file:
            raise FailedDownloadError(url)

    def _existing_url(self, url):
        """Return whether ``url`` appears to be reachable.

        Uses curl or urllib depending on ``config:url_fetch_method``.

        Raises:
            FailedDownloadError: if the urllib probe raises a web error.
        """
        logger.debug(f"Checking existence of {url}")

        if ramble.config.get("config:url_fetch_method") == "curl":
            curl = self.curl
            # Telling curl to fetch the first byte (-r 0-0) is supposed to be
            # portable.
            curl_args = ["--stderr", "-", "-s", "-f", "-r", "0-0", url]
            if not ramble.config.get("config:verify_ssl"):
                curl_args.append("-k")
            _ = curl(*curl_args, fail_on_error=False, output=os.devnull)
            return curl.returncode == 0
        else:
            # Telling urllib to check if url is accessible
            try:
                url, _, response = ramble.util.web.read_from_url(url)
            except ramble.util.web.SpackWebError as werr:
                msg = f"Urllib fetch failed to verify url {url}\n with error {werr}"
                raise FailedDownloadError(url, msg) from None
            return response.getcode() is None or response.getcode() == 200

    def _fetch_from_url(self, url):
        """Dispatch to the curl or urllib fetcher per ``config:url_fetch_method``.

        Returns:
            tuple: ``(partial_file, save_file)`` as produced by the fetcher.
        """
        if ramble.config.get("config:url_fetch_method") == "curl":
            return self._fetch_curl(url)
        else:
            return self._fetch_urllib(url)

    def _check_headers(self, headers):
        """Warn if the response headers indicate an HTML document."""
        # Check if we somehow got an HTML file rather than the archive we
        # asked for.  We only look at the last content type, to handle
        # redirects properly.
        content_types = re.findall(r"Content-Type:[^\r\n]+", headers, flags=re.IGNORECASE)
        if content_types and "text/html" in content_types[-1]:
            warn_content_type_mismatch(self.archive_file or "the archive")

    @_needs_stage
    def _fetch_urllib(self, url):
        """Fetch ``url`` with urllib into the stage's save filename.

        Returns:
            tuple: ``(None, save_file)``; there is no partial file.

        Raises:
            FailedDownloadError: if reading from the URL fails.
        """
        save_file = None
        if self.stage.save_filename:
            save_file = self.stage.save_filename
        logger.msg(f"Fetching {url}")

        # Check if we're about to try and open a broken symlink, and if so
        # remove that file to avoid a bad situation where a file "exists" but
        # cannot be opened (warning: this is not atomic)
        if os.path.islink(save_file) and not os.path.exists(save_file):
            os.unlink(save_file)

        # Run urllib but grab the mime type from the http headers
        try:
            url, headers, response = ramble.util.web.read_from_url(url)
        except ramble.util.web.SpackWebError as e:
            # clean up archive on failure.
            if self.archive_file:
                os.remove(self.archive_file)
            if save_file and os.path.exists(save_file):
                os.remove(save_file)
            msg = f"urllib failed to fetch with error {e}"
            raise FailedDownloadError(url, msg) from None

        with open(save_file, "wb") as _open_file:
            shutil.copyfileobj(response, _open_file)

        self._check_headers(str(headers))
        return None, save_file

    @_needs_stage
    def _fetch_curl(self, url):
        """Fetch ``url`` with curl, downloading into a ``.part`` file.

        Returns:
            tuple: ``(partial_file, save_file)``; both are ``None`` when the
            stage has no save filename.

        Raises:
            FailedDownloadError: if curl exits with a non-zero status.
        """
        save_file = None
        partial_file = None
        if self.stage.save_filename:
            save_file = self.stage.save_filename
            partial_file = self.stage.save_filename + ".part"
        logger.msg(f"Fetching {url}")
        if partial_file:
            save_args = [
                "-C",
                "-",  # continue partial downloads
                "-o",
                partial_file,
            ]  # use a .part file
        else:
            save_args = ["-O"]

        curl_args = save_args + [
            "-f",  # fail on >400 errors
            "-D",
            "-",  # print out HTML headers
            "-L",  # resolve 3xx redirects
            url,
        ]

        if not ramble.config.get("config:verify_ssl"):
            curl_args.append("-k")

        if sys.stdout.isatty() and tty.msg_enabled():
            curl_args.append("-#")  # status bar when using a tty
        else:
            curl_args.append("-sS")  # show errors if fail

        connect_timeout = ramble.config.get("config:connect_timeout", 10)

        if self.extra_options:
            cookie = self.extra_options.get("cookie")
            if cookie:
                curl_args.append("-j")  # junk cookies
                curl_args.append("-b")  # specify cookie
                curl_args.append(cookie)

            timeout = self.extra_options.get("timeout")
            if timeout:
                connect_timeout = max(connect_timeout, int(timeout))

        if connect_timeout > 0:
            # Timeout if can't establish a connection after n sec.
            curl_args.extend(["--connect-timeout", str(connect_timeout)])

        # Run curl but grab the mime type from the http headers
        curl = self.curl
        with working_dir(self.stage.path):
            headers = curl(*curl_args, output=str, fail_on_error=False)

        if curl.returncode != 0:
            # clean up archive on failure.
            if self.archive_file:
                os.remove(self.archive_file)

            if partial_file and os.path.exists(partial_file):
                os.remove(partial_file)

            if curl.returncode == 22:
                # This is a 404.  Curl will print the error.
                raise FailedDownloadError(url, f"URL {url} was not found!")

            elif curl.returncode == 60:
                # This is a certificate error.  Suggest spack -k
                raise FailedDownloadError(
                    url,
                    "Curl was unable to fetch due to invalid certificate. "
                    "This is either an attack, or your cluster's SSL "
                    "configuration is bad.  If you believe your SSL "
                    "configuration is bad, you can try running spack -k, "
                    "which will not check SSL certificates. "
                    "Use this at your own risk.",
                )

            else:
                # This is some other curl error.  Curl will print the
                # error, but print a spack message too
                raise FailedDownloadError(url, f"Curl failed with error {curl.returncode}")

        # Same HTML-instead-of-archive check as the urllib path.
        self._check_headers(headers)

        return partial_file, save_file

    @property
    @_needs_stage
    def archive_file(self):
        """Path to the source archive within this stage directory."""
        return self.stage.archive_file

    @property
    def cachable(self):
        """True when caching is enabled and a digest is available."""
        return self.cache_enabled and bool(self.digest)

    @_needs_stage
    def expand(self):
        """Expand the fetched archive into the stage source path.

        When expansion is disabled the archive is moved, unexpanded, into
        the source path instead.

        Raises:
            NoArchiveFileError: if there is no archive file to expand.
        """
        if not self.expand_archive:
            logger.debug(
                f"Staging unexpanded archive {self.archive_file} "
                f"in {self.stage.source_path}"
            )
            if not self.stage.expanded:
                mkdirp(self.stage.source_path)
            dest = os.path.join(self.stage.source_path, os.path.basename(self.archive_file))
            shutil.move(self.archive_file, dest)
            return

        logger.debug(f"Staging archive: {self.archive_file}")

        if not self.archive_file:
            raise NoArchiveFileError(
                "Couldn't find archive file", f"Failed on expand() for URL {self.url}"
            )

        if not self.extension:
            self.extension = extension(self.archive_file)

        if self.stage.expanded:
            logger.debug(f"Source already staged to {self.stage.source_path}")
            return

        decompress = decompressor_for(self.archive_file, self.extension)

        # Expand all tarballs in their own directory to contain
        # exploding tarballs.
        tarball_container = os.path.join(self.stage.path, "expanded-archive")

        # Below we assume that the command to decompress expand the
        # archive in the current working directory
        mkdirp(tarball_container)
        with working_dir(tarball_container):
            decompress(self.archive_file)

        # Check for an exploding tarball, i.e. one that doesn't expand to
        # a single directory.  If the tarball *didn't* explode, move its
        # contents to the staging source directory & remove the container
        # directory.  If the tarball did explode, just rename the tarball
        # directory to the staging source directory.
        #
        # NOTE: The tar program on Mac OS X will encode HFS metadata in
        # hidden files, which can end up *alongside* a single top-level
        # directory.  We initially ignore presence of hidden files to
        # accommodate these "semi-exploding" tarballs but ensure the files
        # are copied to the source directory.
        files = os.listdir(tarball_container)
        non_hidden = [f for f in files if not f.startswith(".")]
        if len(non_hidden) == 1:
            src = os.path.join(tarball_container, non_hidden[0])
            if os.path.isdir(src):
                self.stage.srcdir = non_hidden[0]
                shutil.move(src, self.stage.source_path)
                if len(files) > 1:
                    files.remove(non_hidden[0])
                    for f in files:
                        src = os.path.join(tarball_container, f)
                        dest = os.path.join(self.stage.path, f)
                        shutil.move(src, dest)
                os.rmdir(tarball_container)
            else:
                # This is a non-directory entry (e.g., a patch file) so simply
                # rename the tarball container to be the source path.
                shutil.move(tarball_container, self.stage.source_path)

        else:
            shutil.move(tarball_container, self.stage.source_path)

    def archive(self, destination):
        """Push the fetched archive to ``destination``, keeping the original.

        Raises:
            NoArchiveFileError: if nothing has been fetched yet.
        """
        if not self.archive_file:
            raise NoArchiveFileError("Cannot call archive() before fetching.")

        web_util.push_to_url(self.archive_file, destination, keep_original=True)

    @_needs_stage
    def check(self):
        """Check the downloaded archive against a checksum digest.

        Raises:
            NoDigestError: if this strategy has no digest.
            ChecksumError: if the archive does not match the digest.
        """
        if not self.digest:
            raise NoDigestError("Attempt to check URLFetchStrategy with no digest.")

        checker = crypto.Checker(self.digest)
        if not checker.check(self.archive_file):
            raise ChecksumError(
                f"{checker.hash_name} checksum failed for {self.archive_file}",
                f"Expected {self.digest} but got {checker.sum}",
            )

    @_needs_stage
    def reset(self):
        """Remove the stage contents other than the archive, then re-expand it.

        Raises:
            NoArchiveFileError: if nothing has been fetched yet.
        """
        if not self.archive_file:
            raise NoArchiveFileError(
                "Tried to reset URLFetchStrategy before fetching",
                f"Failed on reset() for URL {self.url}",
            )

        # Remove everything but the archive from the stage
        for filename in os.listdir(self.stage.path):
            abspath = os.path.join(self.stage.path, filename)
            if abspath != self.archive_file:
                shutil.rmtree(abspath, ignore_errors=True)

        # Expand the archive again
        self.expand()

    def __repr__(self):
        url = self.url if self.url else "no url"
        return f"{self.__class__.__name__}<{url}>"

    def __str__(self):
        if self.url:
            return self.url
        else:
            return "[no url]"
@fetcher
class CacheURLFetchStrategy(URLFetchStrategy):
    """The resource associated with a cache URL may be out of date."""

    @_needs_stage
    def fetch(self):
        """Symlink the locally cached archive into the stage.

        Raises:
            NoCacheError: if the cache file does not exist.
            ChecksumError: if a digest is set and the cached file fails it
                (the symlink is removed before re-raising).
        """
        path = re.sub("^file://", "", self.url)

        # check whether the cache file exists.
        if not os.path.isfile(path):
            raise NoCacheError(f"No cache of {path}")

        # remove old symlink if one is there. ``lexists`` is required here:
        # ``exists`` follows links and reports False for a dangling symlink,
        # which would make the ``os.symlink`` below fail.
        filename = self.stage.save_filename
        if os.path.lexists(filename):
            os.remove(filename)

        # Symlink to local cached archive.
        os.symlink(path, filename)

        # Remove link if checksum fails, or subsequent fetchers
        # will assume they don't need to download.
        if self.digest:
            try:
                self.check()
            except ChecksumError:
                os.remove(self.archive_file)
                raise

        # Notify the user how we fetched.
        logger.msg(f"Using cached archive: {path}")
class VCSFetchStrategy(FetchStrategy):
    """Superclass for version control system fetch strategies.

    Like all fetchers, VCS fetchers are identified by the attributes
    passed to the ``version`` directive.  The optional_attrs for a VCS
    fetch strategy represent types of revisions, e.g. tags, branches,
    commits, etc.

    The required attributes (git, svn, etc.) are used to specify the URL
    and to distinguish a VCS fetch strategy from a URL fetch strategy.
    """

    branch: Optional[str] = None
    tag: Optional[str] = None
    commit: Optional[str] = None
    revision: Optional[str] = None

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # The URL is looked up under the keyword named by ``url_attr``.
        self.url = kwargs.get(self.url_attr)
        if not self.url:
            raise ValueError(f"{self.__class__} requires {self.url_attr} argument.")

        # Every optional attribute becomes an instance attribute
        # (``None`` when it was not supplied).
        for attr_name in self.optional_attrs:
            setattr(self, attr_name, kwargs.get(attr_name))

    @_needs_stage
    def check(self):
        """Nothing to verify; only logs a debug message."""
        logger.debug(f"No checksum needed when fetching with {self.url_attr}")

    @_needs_stage
    def expand(self):
        """Nothing to expand; only logs a debug message."""
        logger.debug(f"Source fetched with {self.url_attr} is already expanded.")

    @_needs_stage
    def archive(self, destination, **kwargs):
        """Create a ``tar.gz`` of the staged source at ``destination``.

        Args:
            destination: path of the archive; must have a ``tar.gz``
                extension.
            **kwargs: ``exclude`` may be a pattern or list of patterns
                passed to tar as ``--exclude=`` options.
        """
        assert extension(destination) == "tar.gz"
        assert self.stage.source_path.startswith(self.stage.path)

        tar = which("tar", required=True)

        excludes = kwargs.get("exclude")
        if excludes is not None:
            exclude_list = [excludes] if isinstance(excludes, str) else excludes
            for pattern_text in exclude_list:
                tar.add_default_arg(f"--exclude={pattern_text}")

        with working_dir(self.stage.path):
            if not self.stage.srcdir:
                tar("-czf", destination, os.path.basename(self.stage.source_path))
            else:
                # Here we create an archive with the default repository name.
                # The 'tar' command has options for changing the name of a
                # directory that is included in the archive, but they differ
                # based on OS, so we temporarily rename the repo
                with temp_rename(self.stage.source_path, self.stage.srcdir):
                    tar("-czf", destination, self.stage.srcdir)

    def __str__(self):
        return f"VCS: {self.url}"

    def __repr__(self):
        return f"{self.__class__}<{self.url}>"
@fetcher
class GitFetchStrategy(VCSFetchStrategy):
    """Fetch strategy that gets source code from a git repository.

    Use like this in a package:

        version('name', git='https://github.com/project/repo.git')

    Optionally, you can provide a branch, or commit to check out, e.g.:

        version('1.1', git='https://github.com/project/repo.git', tag='v1.1')

    You can use these three optional attributes in addition to ``git``:

        * ``branch``: Particular branch to build from (default is the
                      repository's default branch)
        * ``tag``: Particular tag to check out
        * ``commit``: Particular commit hash in the repo

    Repositories are cloned into the standard stage source path directory.
    """

    url_attr = "git"
    optional_attrs = [
        "tag",
        "branch",
        "commit",
        "submodules",
        "get_full_repo",
        "submodules_delete",
    ]

    git_version_re = r"git version (\S+)"

    submodules: bool = False
    # NOTE: ``clone`` iterates over this value when it is truthy.
    submodules_delete: bool = False
    get_full_repo: bool = False

    def __init__(self, **kwargs):
        # Discards the keywords in kwargs that may conflict with the next call
        # to __init__
        forwarded_args = copy.copy(kwargs)
        forwarded_args.pop("name", None)
        super().__init__(**forwarded_args)

        self._git = None
        self.submodules = kwargs.get("submodules", False)
        self.submodules_delete = kwargs.get("submodules_delete", False)
        self.get_full_repo = kwargs.get("get_full_repo", False)

    @property
    def git_version(self):
        """Version of the git executable used by this strategy."""
        return GitFetchStrategy.version_from_git(self.git)

    @staticmethod
    def version_from_git(git_exe):
        """Given a git executable, return the Version (this will fail if
        the output cannot be parsed into a valid Version).
        """
        version_output = git_exe("--version", output=str)
        m = re.search(GitFetchStrategy.git_version_re, version_output)
        return spack.version.Version(m.group(1))

    @property
    def git(self):
        """Lazily located and configured ``git`` executable."""
        if not self._git:
            self._git = which("git", required=True)

            # Disable advice for a quieter fetch
            # https://github.com/git/git/blob/master/Documentation/RelNotes/1.7.2.txt
            if self.git_version >= spack.version.Version("1.7.2"):
                self._git.add_default_arg("-c")
                self._git.add_default_arg("advice.detachedHead=false")

            # If the user asked for insecure fetching, make that work
            # with git as well.
            if not ramble.config.get("config:verify_ssl"):
                self._git.add_default_env("GIT_SSL_NO_VERIFY", "true")

        return self._git

    @property
    def cachable(self):
        """True when caching is enabled and a commit or tag pins the source."""
        return self.cache_enabled and bool(self.commit or self.tag)

    def source_id(self):
        """Return the commit, or else the tag, identifying the source."""
        return self.commit or self.tag

    def mirror_id(self):
        """Return the mirror path for this repository reference.

        Returns ``None`` (implicitly) when no commit, tag or branch is set.
        """
        repo_ref = self.commit or self.tag or self.branch
        if repo_ref:
            repo_path = url_util.parse(self.url).path
            result = os.path.sep.join(["git", repo_path, repo_ref])
            return result

    def _repo_info(self):
        """Describe the repository URL and the selected commit/tag/branch."""
        args = ""

        if self.commit:
            args = f" at commit {self.commit}"
        elif self.tag:
            args = f" at tag {self.tag}"
        elif self.branch:
            args = f" on branch {self.branch}"

        return f"{self.url}{args}"

    @_needs_stage
    def fetch(self):
        """Clone the repository into the stage unless it is already there."""
        if self.stage.expanded:
            logger.debug(f"Already fetched {self.stage.source_path}")
            return

        self.clone(commit=self.commit, branch=self.branch, tag=self.tag)

    def clone(self, dest=None, commit=None, branch=None, tag=None, bare=False):
        """Clone a repository to a path.

        This method handles cloning from git, but does not require a stage.

        Arguments:
            dest (str | None): The path into which the code is cloned. If None,
                requires a stage and uses the stage's source path.
            commit (str | None): A commit to fetch from the remote. Only one of
                commit, branch, and tag may be non-None.
            branch (str | None): A branch to fetch from the remote.
            tag (str | None): A tag to fetch from the remote.
            bare (bool): Execute a "bare" git clone (--bare option to git)
        """
        # Default to spack source path
        dest = dest or self.stage.source_path
        logger.debug(f"Cloning git repository: {self._repo_info()}")

        git = self.git
        debug = ramble.config.get("config:debug")

        if bare:
            # We don't need to worry about which commit/branch/tag is checked out
            clone_args = ["clone", "--bare"]
            if not debug:
                clone_args.append("--quiet")
            clone_args.extend([self.url, dest])
            git(*clone_args)
        elif commit:
            # Need to do a regular clone and check out everything if
            # they asked for a particular commit.
            clone_args = ["clone", self.url]
            if not debug:
                clone_args.insert(1, "--quiet")
            with temp_cwd():
                git(*clone_args)
                repo_name = get_single_file(".")
                if self.stage:
                    self.stage.srcdir = repo_name
                shutil.move(repo_name, dest)

            with working_dir(dest):
                checkout_args = ["checkout", commit]
                if not debug:
                    checkout_args.insert(1, "--quiet")
                git(*checkout_args)

        else:
            # Can be more efficient if not checking out a specific commit.
            args = ["clone"]
            if not debug:
                args.append("--quiet")

            # If we want a particular branch ask for it. Use the arguments
            # of this call rather than the instance attributes, so that
            # callers passing explicit values get what they asked for.
            if branch:
                args.extend(["--branch", branch])
            elif tag and self.git_version >= ver("1.8.5.2"):
                args.extend(["--branch", tag])

            # Try to be efficient if we're using a new enough git.
            # This checks out only one branch's history
            if self.git_version >= ver("1.7.10"):
                if self.get_full_repo:
                    args.append("--no-single-branch")
                else:
                    args.append("--single-branch")

            with temp_cwd():
                # Yet more efficiency: only download a 1-commit deep
                # tree, if the in-use git and protocol permit it.
                if (
                    (not self.get_full_repo)
                    and self.git_version >= ver("1.7.1")
                    and self.protocol_supports_shallow_clone()
                ):
                    args.extend(["--depth", "1"])

                args.extend([self.url])
                git(*args)

                repo_name = get_single_file(".")
                # A stage is optional when an explicit ``dest`` is given.
                if self.stage:
                    self.stage.srcdir = repo_name
                shutil.move(repo_name, dest)

            with working_dir(dest):
                # For tags, be conservative and check them out AFTER
                # cloning.  Later git versions can do this with clone
                # --branch, but older ones fail.
                if tag and self.git_version < ver("1.8.5.2"):
                    # pull --tags returns a "special" error code of 1 in
                    # older versions that we have to ignore.
                    # see: https://github.com/git/git/commit/19d122b
                    pull_args = ["pull", "--tags"]
                    co_args = ["checkout", tag]
                    if not debug:
                        pull_args.insert(1, "--quiet")
                        co_args.insert(1, "--quiet")

                    git(*pull_args, ignore_errors=1)
                    git(*co_args)

        if self.submodules_delete:
            with working_dir(dest):
                for submodule_to_delete in self.submodules_delete:
                    args = ["rm", submodule_to_delete]
                    if not debug:
                        args.insert(1, "--quiet")
                    git(*args)

        # Init submodules if the user asked for them.
        if self.submodules:
            with working_dir(dest):
                args = ["submodule", "update", "--init", "--recursive"]
                if not debug:
                    args.insert(1, "--quiet")
                git(*args)

    def archive(self, destination):
        """Archive the staged source, excluding the ``.git`` directory."""
        super().archive(destination, exclude=".git")

    @_needs_stage
    def reset(self):
        """Discard local modifications and untracked files in the checkout."""
        with working_dir(self.stage.source_path):
            co_args = ["checkout", "."]
            clean_args = ["clean", "-f"]
            # Be quiet unless debugging, like every other git invocation
            # in this class.
            if not ramble.config.get("config:debug"):
                co_args.insert(1, "--quiet")
                clean_args.insert(1, "--quiet")

            self.git(*co_args)
            self.git(*clean_args)

    def protocol_supports_shallow_clone(self):
        """Shallow clone operations (--depth #) are not supported by the basic
        HTTP protocol or by no-protocol file specifications.
        Use (e.g.) https:// or file:// instead."""
        return not (self.url.startswith("http://") or self.url.startswith("/"))

    def __str__(self):
        return f"[git] {self._repo_info()}"
@fetcher
class CvsFetchStrategy(VCSFetchStrategy):
    """Fetch strategy that gets source code from a CVS repository.

    Use like this in a package:

        version('name',
                cvs=':pserver:anonymous@www.example.com:/cvsroot%module=modulename')

    Optionally, you can provide a branch and/or a date for the URL:

        version('name',
                cvs=':pserver:anonymous@www.example.com:/cvsroot%module=modulename',
                branch='branchname', date='date')

    Repositories are checked out into the standard stage source path directory.
    """

    url_attr = "cvs"
    optional_attrs = ["branch", "date"]

    def __init__(self, **kwargs):
        # Discards the keywords in kwargs that may conflict with the next call
        # to __init__
        forwarded_args = copy.copy(kwargs)
        forwarded_args.pop("name", None)
        super().__init__(**forwarded_args)

        self._cvs = None
        # Normalize branch and date to strings when they were supplied.
        if self.branch is not None:
            self.branch = str(self.branch)
        if self.date is not None:
            self.date = str(self.date)

    @property
    def cvs(self):
        """Lazily located ``cvs`` executable."""
        if not self._cvs:
            self._cvs = which("cvs", required=True)
        return self._cvs

    @property
    def cachable(self):
        """True when caching is enabled and a branch or date is set."""
        return self.cache_enabled and (bool(self.branch) or bool(self.date))

    def source_id(self):
        """Return an id built from the branch and/or date, or ``None``."""
        if not (self.branch or self.date):
            # We need a branch or a date to make a checkout reproducible
            return None
        # Named to avoid shadowing the ``id`` builtin.
        source_ident = "id"
        if self.branch:
            source_ident += "-branch=" + self.branch
        if self.date:
            source_ident += "-date=" + self.date
        return source_ident

    def mirror_id(self):
        """Return the mirror path for this checkout, or ``None``."""
        if not (self.branch or self.date):
            # We need a branch or a date to make a checkout reproducible
            return None
        # Special-case handling because this is not actually a URL
        elements = self.url.split(":")
        final = elements[-1]
        elements = final.split("/")
        # Everything before the first slash is a port number
        elements = elements[1:]
        result = os.path.sep.join(["cvs"] + elements)
        if self.branch:
            result += "%branch=" + self.branch
        if self.date:
            result += "%date=" + self.date
        return result

    @_needs_stage
    def fetch(self):
        """Check out the CVS module into the stage source path."""
        if self.stage.expanded:
            logger.debug(f"Already fetched {self.stage.source_path}")
            return

        logger.debug(f"Checking out CVS repository: {self.url}")

        with temp_cwd():
            # The URL carries the module name after a ``%module=`` marker.
            url, module = self.url.split("%module=")
            # Check out files
            args = ["-z9", "-d", url, "checkout"]
            if self.branch is not None:
                args.extend(["-r", self.branch])
            if self.date is not None:
                args.extend(["-D", self.date])
            args.append(module)
            self.cvs(*args)

            # Rename repo
            repo_name = get_single_file(".")
            self.stage.srcdir = repo_name
            shutil.move(repo_name, self.stage.source_path)

    def _remove_untracked_files(self):
        """Removes untracked files in a CVS repository."""
        with working_dir(self.stage.source_path):
            status = self.cvs("-qn", "update", output=str)
            for line in status.split("\n"):
                # Lines starting with '?' are treated as untracked entries.
                if re.match(r"^[?]", line):
                    path = line[2:].strip()
                    if os.path.isfile(path):
                        os.unlink(path)

    def archive(self, destination):
        """Archive the staged source, excluding ``CVS`` directories."""
        super().archive(destination, exclude="CVS")

    @_needs_stage
    def reset(self):
        """Remove untracked files, then run ``cvs update -C .``."""
        self._remove_untracked_files()
        with working_dir(self.stage.source_path):
            self.cvs("update", "-C", ".")

    def __str__(self):
        return f"[cvs] {self.url}"
@fetcher
class SvnFetchStrategy(VCSFetchStrategy):
    """Fetch strategy that gets source code from a subversion repository.

    Use like this in a package:

        version('name', svn='http://www.example.com/svn/trunk')

    Optionally, you can provide a revision for the URL:

        version('name', svn='http://www.example.com/svn/trunk',
                revision='1641')

    Repositories are checked out into the standard stage source path directory.
    """

    url_attr = "svn"
    optional_attrs = ["revision"]

    def __init__(self, **kwargs):
        # Discards the keywords in kwargs that may conflict with the next call
        # to __init__
        forwarded_args = copy.copy(kwargs)
        forwarded_args.pop("name", None)
        super().__init__(**forwarded_args)

        self._svn = None
        # Normalize to a string so it can be joined into paths and passed
        # straight to the svn command line.
        if self.revision is not None:
            self.revision = str(self.revision)

    @property
    def svn(self):
        """The ``svn`` executable, looked up on first access and then reused."""
        if not self._svn:
            self._svn = which("svn", required=True)
        return self._svn

    @property
    def cachable(self):
        """Whether caching is enabled and a revision pins the checkout."""
        return self.cache_enabled and bool(self.revision)

    def source_id(self):
        """Return the configured revision (None when no revision was given)."""
        return self.revision

    def mirror_id(self):
        """Return ``svn/<repo path>/<revision>``, or None without a revision."""
        if not self.revision:
            return None
        repo_path = url_util.parse(self.url).path
        return os.path.sep.join(["svn", repo_path, self.revision])

    @_needs_stage
    def fetch(self):
        """Check out the repository and move it to the stage source path.

        Does nothing if the stage is already expanded.
        """
        if self.stage.expanded:
            logger.debug(f"Already fetched {self.stage.source_path}")
            return

        logger.debug(f"Checking out subversion repository: {self.url}")

        args = ["checkout", "--force", "--quiet"]
        if self.revision:
            args += ["-r", self.revision]
        args.append(self.url)

        with temp_cwd():
            self.svn(*args)
            repo_name = get_single_file(".")
            self.stage.srcdir = repo_name
            shutil.move(repo_name, self.stage.source_path)

    def _remove_untracked_files(self):
        """Removes untracked files in an svn repository."""
        with working_dir(self.stage.source_path):
            # A single status call is enough; its captured output drives
            # the cleanup below.
            status = self.svn("status", "--no-ignore", output=str)
            for line in status.split("\n"):
                # Only lines flagged "I" or "?" are candidates for removal.
                if not re.match(r"^[I?]", line):
                    continue
                path = line[8:].strip()
                if os.path.isfile(path):
                    os.unlink(path)
                elif os.path.isdir(path):
                    shutil.rmtree(path, ignore_errors=True)

    def archive(self, destination):
        """Archive the checkout to ``destination``, excluding ``.svn`` entries."""
        super().archive(destination, exclude=".svn")

    @_needs_stage
    def reset(self):
        """Delete untracked files, then run ``svn revert . -R`` in the source path."""
        self._remove_untracked_files()
        with working_dir(self.stage.source_path):
            self.svn("revert", ".", "-R")

    def __str__(self):
        return f"[svn] {self.url}"
@fetcher
class HgFetchStrategy(VCSFetchStrategy):
    """Fetch strategy that clones source code from a Mercurial repository.

    Typical use in a package:

        version('name', hg='https://jay.grs.rwth-aachen.de/hg/lwm2')

    A branch or revision to check out may also be given, e.g.:

        version('torus', hg='https://jay.grs.rwth-aachen.de/hg/lwm2',
                branch='torus')

    The optional 'revision' attribute selects a branch, tag, or particular
    revision in hg. Moving targets such as branches are discouraged because
    they make builds non-reproducible.

        * ``revision``: Particular revision, branch, or tag.

    Repositories are cloned into the standard stage source path directory.
    """

    url_attr = "hg"
    optional_attrs = ["revision"]

    def __init__(self, **kwargs):
        # "name" may conflict with the parent constructor's keywords, so it
        # is dropped from a copy before forwarding.
        parent_kwargs = copy.copy(kwargs)
        parent_kwargs.pop("name", None)
        super().__init__(**parent_kwargs)

        self._hg = None

    @property
    def hg(self):
        """
        Returns:
            Executable: the hg executable
        """
        if self._hg:
            return self._hg

        self._hg = which("hg", required=True)
        # hg is a Python script, and a PYTHONPATH set up for building
        # Python packages can interfere with it; blank it out for hg runs.
        self._hg.add_default_env("PYTHONPATH", "")
        return self._hg

    @property
    def cachable(self):
        """Whether caching is enabled and a revision was given."""
        return self.cache_enabled and bool(self.revision)

    def source_id(self):
        """Return the configured revision."""
        return self.revision

    def mirror_id(self):
        """Return ``hg/<repo path>/<revision>``, or None without a revision."""
        if not self.revision:
            return None
        path_in_repo = url_util.parse(self.url).path
        return os.path.sep.join(["hg", path_in_repo, self.revision])

    @_needs_stage
    def fetch(self):
        """Clone the repository and move the clone to the stage source path.

        Does nothing if the stage is already expanded.
        """
        if self.stage.expanded:
            logger.debug(f"Already fetched {self.stage.source_path}")
            return

        # Only used to decorate the debug message.
        args = [f"at revision {self.revision}"] if self.revision else []
        logger.debug(f"Cloning mercurial repository: {self.url} {args}")

        clone_cmd = ["clone"]
        if not ramble.config.get("config:verify_ssl"):
            clone_cmd.append("--insecure")
        if self.revision:
            clone_cmd += ["-r", self.revision]
        clone_cmd.append(self.url)

        with temp_cwd():
            self.hg(*clone_cmd)
            cloned_dir = get_single_file(".")
            self.stage.srcdir = cloned_dir
            shutil.move(cloned_dir, self.stage.source_path)

    def archive(self, destination):
        """Archive the clone to ``destination``, excluding ``.hg`` entries."""
        super().archive(destination, exclude=".hg")

    @_needs_stage
    def reset(self):
        """Replace the source path with a fresh local clone of itself."""
        with working_dir(self.stage.path):
            original = self.stage.source_path
            fresh_clone = "scrubbed-source-tmp"

            revision_flags = ["-r", self.revision] if self.revision else []
            self.hg("clone", *revision_flags, original, fresh_clone)

            shutil.rmtree(original, ignore_errors=True)
            shutil.move(fresh_clone, original)

    def __str__(self):
        return f"[hg] {self.url}"
@fetcher
class S3FetchStrategy(URLFetchStrategy):
    """FetchStrategy that pulls from an S3 bucket."""

    url_attr = "s3"

    def __init__(self, *args, **kwargs):
        try:
            super().__init__(*args, **kwargs)
        except ValueError:
            if not kwargs.get("url"):
                raise ValueError("S3FetchStrategy requires a url for fetching.") from None
            # A url was supplied, so the failure is unrelated to a missing
            # url; propagate it instead of silently continuing with a
            # partially-initialized object.
            raise

    @_needs_stage
    def fetch(self):
        """Download the ``s3://`` url into the stage directory.

        Does nothing if an archive file is already present.

        Raises:
            FetchError: if the url scheme is not ``s3``.
            FailedDownloadError: if no archive file exists after the download.
        """
        if self.archive_file:
            logger.debug(f"Already downloaded {self.archive_file}")
            return

        parsed_url = url_util.parse(self.url)
        if parsed_url.scheme != "s3":
            raise FetchError("S3FetchStrategy can only fetch from s3:// urls.")

        logger.debug(f"Fetching {self.url}")

        basename = os.path.basename(parsed_url.path)

        with working_dir(self.stage.path):
            _, headers, stream = web_util.read_from_url(self.url)

            with open(basename, "wb") as f:
                shutil.copyfileobj(stream, f)

            # An HTML payload usually means an error page was downloaded
            # rather than the requested archive.
            content_type = web_util.get_header(headers, "Content-type")

        if content_type == "text/html":
            warn_content_type_mismatch(self.archive_file or "the archive")

        if self.stage.save_filename:
            rename(os.path.join(self.stage.path, basename), self.stage.save_filename)

        if not self.archive_file:
            raise FailedDownloadError(self.url)
@fetcher
class GCSFetchStrategy(URLFetchStrategy):
    """FetchStrategy that pulls from a GCS bucket."""

    url_attr = "gs"

    def __init__(self, *args, **kwargs):
        try:
            super().__init__(*args, **kwargs)
        except ValueError:
            if not kwargs.get("url"):
                raise ValueError("GCSFetchStrategy requires a url for fetching.") from None
            # A url was supplied, so the failure is unrelated to a missing
            # url; propagate it instead of silently continuing with a
            # partially-initialized object.
            raise

    @_needs_stage
    def fetch(self):
        """Download the ``gs://`` url into the stage directory.

        Does nothing if an archive file is already present.

        Raises:
            FetchError: if the url scheme is not ``gs``.
            FailedDownloadError: if no archive file exists after the download.
        """
        if self.archive_file:
            logger.debug(f"Already downloaded {self.archive_file}")
            return

        parsed_url = url_util.parse(self.url)
        if parsed_url.scheme != "gs":
            raise FetchError("GCSFetchStrategy can only fetch from gs:// urls.")

        logger.debug(f"Fetching {self.url}")

        basename = os.path.basename(parsed_url.path)

        with working_dir(self.stage.path):
            _, headers, stream = web_util.read_from_url(self.url)

            with open(basename, "wb") as f:
                shutil.copyfileobj(stream, f)

            # An HTML payload usually means an error page was downloaded
            # rather than the requested archive.
            content_type = web_util.get_header(headers, "Content-type")

        if content_type == "text/html":
            warn_content_type_mismatch(self.archive_file or "the archive")

        if self.stage.save_filename:
            # Use the same rename helper as the S3 strategy rather than a
            # bare os.rename, so both object-store fetchers behave alike.
            rename(os.path.join(self.stage.path, basename), self.stage.save_filename)

        if not self.archive_file:
            raise FailedDownloadError(self.url)
def stable_target(fetcher):
    """Tell whether the fetcher's target should have a stable checksum.

    That is the case only for a cachable ``URLFetchStrategy``, i.e. when the
    target is a preexisting archive file.
    """
    return bool(isinstance(fetcher, URLFetchStrategy) and fetcher.cachable)
def from_kwargs(**kwargs):
    """Build the FetchStrategy that matches the given keyword arguments.

    Args:
        **kwargs: dictionary of keyword arguments, e.g. from a
            ``version()`` directive in a package.

    Returns:
        FetchStrategy: an instance of the first registered strategy whose
            ``matches`` accepts the arguments (selection is based on
            attribute names such as ``git``, ``hg``, etc.)

    Raises:
        InvalidArgsError: (a ``FetchError``) if no strategy matches the args.
    """
    strategy = next((cls for cls in all_strategies if cls.matches(kwargs)), None)
    if strategy is None:
        raise InvalidArgsError(**kwargs)
    return strategy(**kwargs)
def from_url_scheme(url, *args, **kwargs):
    """Pick a FetchStrategy whose ``url_attr`` equals the url's scheme.

    A ``url`` keyword takes precedence over the positional ``url``. Urls
    without a scheme are treated as ``file``. The scheme is translated through
    ``scheme_mapping`` (taken from kwargs, or a default that sends the common
    file/http/ftp schemes to ``url``) before being compared.

    Raises:
        ValueError: if no registered strategy handles the scheme.
    """
    url = kwargs.get("url", url)
    parsed_url = urllib.parse.urlparse(url, scheme="file")

    scheme_mapping = kwargs.get("scheme_mapping")
    if not scheme_mapping:
        scheme_mapping = dict.fromkeys(("file", "http", "https", "ftp", "ftps"), "url")

    wanted = scheme_mapping.get(parsed_url.scheme, parsed_url.scheme)

    for strategy in all_strategies:
        handled = getattr(strategy, "url_attr", None)
        if not handled or handled != wanted:
            continue
        return strategy(url, *args, **kwargs)

    raise ValueError(f'No FetchStrategy found for url with scheme: "{parsed_url.scheme}"')
class FsCache:
    """Filesystem cache of fetched archives rooted at a single directory."""

    def __init__(self, root):
        # Keep an absolute root so later joins do not depend on the cwd.
        self.root = os.path.abspath(root)

    def store(self, fetcher, relative_dest):
        """Archive ``fetcher`` under the cache root at ``relative_dest``.

        Nothing is stored for fetchers that are not cachable, nor for
        fetchers that already read from the cache.
        """
        if not fetcher.cachable or isinstance(fetcher, CacheURLFetchStrategy):
            return

        target = os.path.join(self.root, relative_dest)
        mkdirp(os.path.dirname(target))
        fetcher.archive(target)

    def fetcher(self, target_path, digest, **kwargs):
        """Return a ``CacheURLFetchStrategy`` for ``target_path`` in the cache."""
        cached_path = os.path.join(self.root, target_path)
        return CacheURLFetchStrategy(cached_path, digest, **kwargs)

    def destroy(self):
        """Delete the cache root directory tree, ignoring errors."""
        shutil.rmtree(self.root, ignore_errors=True)
class FetchError(ramble.error.RambleError):
    """Common base class for the fetch-related errors defined in this module."""
class NoCacheError(FetchError):
    """Signals that no cached archive exists for a package."""
class FailedDownloadError(FetchError):
    """Signals that downloading from a URL failed.

    The offending URL is kept on the instance as ``url``.
    """

    def __init__(self, url, msg=""):
        summary = f"Failed to fetch file from URL: {url}"
        super().__init__(summary, msg)
        self.url = url
class NoArchiveFileError(FetchError):
    """Raised when an archive file is expected but none exists."""
class NoDigestError(FetchError):
    """Signals a checksum attempt on a URL that has no digest."""
class InvalidArgsError(FetchError):
    """Signals that no fetch strategy could be guessed from the arguments.

    The rejected keyword arguments are included in the error's second message.
    """

    def __init__(self, **args):
        super().__init__("Could not guess a fetch strategy", f"with arguments: {args}")
class ChecksumError(FetchError):
    """Signals that an archive did not pass its checksum."""
class NoStageError(FetchError):
    """Signals a fetch operation invoked before set_stage() was called.

    Args:
        method: the callable that was invoked too early; its ``__name__``
            is reported in the message.
    """

    def __init__(self, method):
        method_name = method.__name__
        super().__init__(f"Must call FetchStrategy.set_stage() before calling {method_name}")