Source code for ramble.uploader

# Copyright 2022-2026 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

import json
import math
import sys
from enum import Enum

import jsonschema

import ramble.config
import ramble.util.version
from ramble.config import ConfigError
from ramble.schema.db import db_schema_version
from ramble.schema.experiment import experiment_schema, experiment_schema_version
from ramble.schema.experiments_metadata import (
    experiments_metadata_schema,
    experiments_metadata_schema_version,
)
from ramble.schema.fom import fom_schema, fom_schema_version
from ramble.schema.metadata import metadata_schema, metadata_schema_version
from ramble.schema.software_db import software_db_schema, software_db_schema_version
from ramble.util.logger import logger

default_node_type_val = "Not Specified"

uploader_types = Enum("uploader_types", ["BigQuery", "PrintOnly"])


def validate_data(data, schema):
    """Check ``data`` against a JSON schema, logging any violation.

    Args:
        data: The instance to validate.
        schema: The JSON schema the instance must satisfy.

    Raises:
        jsonschema.exceptions.ValidationError: Re-raised unchanged after the
            failure has been written to the error log.
    """
    try:
        jsonschema.validate(instance=data, schema=schema)
    except jsonschema.exceptions.ValidationError as validation_error:
        # Log for the user, then let the caller decide how to react.
        logger.error(f"Schema validation error: {validation_error}")
        raise
class Uploader:
    """Base class for result uploaders.

    Concrete uploaders in this module override :meth:`perform_upload` and
    call this implementation first to validate their arguments.
    """

    # TODO: should the class store the base uri?

    def perform_upload(self, uri, data):
        """Validate the arguments common to every uploader.

        Args:
            uri: Destination of the upload; must be truthy.
            data: Payload to upload; must be truthy (non-empty).

        Raises:
            ValueError: If ``uri`` or ``data`` is missing or empty.
        """
        # TODO: move content checking to __init__ ?
        # Name the missing argument in the message; interpolating its (empty)
        # value would produce a meaningless error.
        if not uri:
            raise ValueError(f"{type(self).__name__} requires a uri argument.")
        if not data:
            raise ValueError(f"{type(self).__name__} requires a data argument.")
class ExperimentList(list):
    """A ``list`` of experiments that additionally carries a ``metadata`` attribute."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``list`` and start with empty metadata."""
        super().__init__(*args, **kwargs)
        # Every instance gets its own dict; it may later be replaced wholesale.
        self.metadata = {}
def get_user():
    """Return the user name to attribute uploads to.

    The ``config:user`` setting wins when it is set to a truthy value;
    otherwise the name reported by :func:`getpass.getuser` is used.
    """
    user_name = ramble.config.get("config:user")
    if not user_name:
        # Imported lazily: only needed when the config does not name a user.
        import getpass

        user_name = getpass.getuser()
    return user_name
class Experiment:
    """
    Class representation of experiment data
    """

    def __init__(self, name, workspace_hash, data, timestamp):
        """Build an experiment record from one entry of the results data.

        Args:
            name: Name of the experiment.
            workspace_hash: Hash of the owning workspace, stored as given.
            data: Mapping for this experiment. Must provide
                ``application_name``, ``workload_name``, ``n_nodes``,
                ``processes_per_node``, ``n_ranks``, ``n_threads``,
                ``RAMBLE_STATUS`` and ``RAMBLE_VARIABLES["workspace_name"]``.
            timestamp: Upload time; embedded in ``bulk_hash`` and stored in
                stringified form as ``timestamp``.

        Raises:
            KeyError: If ``data`` lacks one of the required keys.
            ValueError: If one of the numeric fields cannot be converted
                with ``int``.
        """
        # NOTE: the attribute assignment order below defines the key order of
        # the dict produced by to_json(); keep it stable.
        self.name = name
        self.id = None  # This is essentially the hash; set by generate_hash()
        self.foms = []
        self.software = []
        self.data = data
        self.application_name = data["application_name"]
        self.workspace_name = data["RAMBLE_VARIABLES"]["workspace_name"]
        self.workspace_hash = workspace_hash
        self.workload_name = data["workload_name"]
        # Proxy for workspace or "uploaded with".
        # FIXME: this is no longer strictly needed since it is just a concat
        # of known properties
        self.bulk_hash = (
            f"{self.workspace_name}::{self.application_name}"
            f"::{self.workload_name}::{timestamp}"
        )
        self.n_nodes = int(data["n_nodes"])
        self.processes_per_node = int(data["processes_per_node"])
        self.n_ranks = int(data["n_ranks"])
        self.n_threads = int(data["n_threads"])
        self.node_type = default_node_type_val
        self.status = data["RAMBLE_STATUS"]
        self.user = get_user()
        self.timestamp = str(timestamp)
        self.generate_hash()

    def generate_hash(self):
        """Return the experiment id, computing it on first use.

        The id is ``hash(self)``. This class defines no ``__hash__``, so that
        is Python's default object hash and is only meaningful within the
        current process.
        """
        # Avoid regenerating a hash when possible
        # (The hash of an object must never change during its lifetime..)
        if self.id is None:
            # TODO: this might be better as a hash of something we intuitively
            # expect to be unique, like:
            # "{RAMBLE_STATUS}-{application_name}-{experiment_name}-{time}-etc"
            # If we don't want this, we can go back to this class just being a dict
            self.id = hash(self)
        return self.id

    def get_hash(self):
        """Return the experiment id (alias of :meth:`generate_hash`)."""
        return self.generate_hash()

    def to_json(self):
        """Return a dict describing this experiment, ready for upload.

        All instance attributes are included except ``foms`` and
        ``software``. ``data`` is replaced by its JSON string with
        ``CONTEXTS`` emptied. Neither ``self`` nor ``self.data`` is modified.
        """
        # Leave out the per-FOM / per-package lists up front instead of
        # copying them only to delete them again.
        record = {
            key: value for key, value in vars(self).items() if key not in ("foms", "software")
        }

        # "CONTEXTS" will be deprecated in an upcoming release.
        # For now it is set to an empty list to maintain backwards database
        # compatibility while avoiding large un-needed uploads.
        # Only a top-level key is replaced, so a shallow copy is enough to
        # keep self.data untouched.
        reduced_data = dict(self.data)
        reduced_data["CONTEXTS"] = []

        record["data"] = json.dumps(reduced_data, default=vars)
        return record
def determine_node_type(experiment, contexts):
    """
    Extract node type from available FOMs.

    First prio is machine specific data, such as GCP meta data
    (a FOM whose name contains "machine-type").
    Second prio is more general data like CPU type
    (a FOM whose name contains "Model name").

    Args:
        experiment: Object whose ``node_type`` attribute is set when a
            matching FOM is found; left untouched otherwise.
        contexts: Iterable of context dicts, each with a ``"foms"`` list of
            dicts that have ``"name"`` and ``"value"`` entries.
    """
    first_model_name_fom = None
    for context in contexts:
        for fom in context["foms"]:
            if "machine-type" in fom["name"]:
                # Highest priority: nothing later can override it, so stop.
                experiment.node_type = fom["value"]
                return
            if first_model_name_fom is None and "Model name" in fom["name"]:
                # Remember the fallback, but keep looking for a machine type.
                first_model_name_fom = fom

    if first_model_name_fom is not None:
        experiment.node_type = first_model_name_fom["value"]
def upload_results(results):
    """Format ``results`` and hand them to the configured uploader.

    The uploader is selected by ``config:upload:type`` (a member name of
    ``uploader_types``) and the destination by ``config:upload:uri``.
    When formatting yields no experiments, a warning is logged and nothing is
    uploaded.

    Args:
        results: Results data as accepted by ``format_data``.

    Raises:
        ConfigError: If the upload type is missing or not a member of
            ``uploader_types``, if the URI is missing, or if ``results``
            cannot be formatted (``KeyError`` / ``TypeError`` while parsing).
    """
    uploader_type_name = ramble.config.get("config:upload:type")
    if uploader_type_name is None:
        raise ConfigError("No upload type (config:upload:type) in config.")

    # Look the name up among the enum *members* only. hasattr()/getattr() on
    # the Enum class would also accept unrelated attributes such as "mro" or
    # "__members__" and then silently select the PrintOnly uploader.
    if (
        not isinstance(uploader_type_name, str)
        or uploader_type_name not in uploader_types.__members__
    ):
        raise ConfigError(f"Upload type {uploader_type_name} is not valid.")
    uploader_type = uploader_types[uploader_type_name]

    uri = ramble.config.get("config:upload:uri")
    if not uri:
        raise ConfigError("No upload URI (config:upload:uri) in config.")

    try:
        formatted_data = format_data(results)
    except (KeyError, TypeError) as e:
        raise ConfigError("Error parsing file: Does not contain valid data to upload.") from e

    if not formatted_data:
        logger.warn("No data to upload")
        return

    logger.all_msg(f"Uploading results to {uri} with {uploader_type} uploader")

    if uploader_type == uploader_types.BigQuery:
        uploader = BigQueryUploader()
    else:
        uploader = PrintOnlyUploader()

    uploader.perform_upload(uri, formatted_data)
def format_data(data_in):
    """
    Goal: convert results to a more searchable and decomposed format for insertion
    into data store (etc)

    Input (only the keys read here are shown; ``Experiment`` reads further
    keys from each experiment entry):

    .. code-block:: text

        {
            "workspace_hash": <hash>,
            "metadata": <metadata>,          # optional
            "experiments": [
                {
                    "name": <experiment name>,
                    "RAMBLE_STATUS": <status>,
                    "CONTEXTS": [
                        {
                            "name": <context name>,
                            "foms": [
                                {
                                    "name": ..., "value": ..., "units": ...,
                                    "origin": ..., "origin_type": ...
                                },
                                ...
                            ]
                        },
                        ...
                    ],
                    "SOFTWARE": {              # optional
                        <key>: [
                            {
                                "name": ..., "version": ..., "compiler": ...,
                                "compiler_version": ..., "target": ...,
                                "variants": ...
                            },
                            ...
                        ]
                    }
                },
                ...
            ]
        }

    Output:
        An ``ExperimentList`` of ``Experiment`` objects, each with its
        ``foms`` and ``software`` lists filled in and its node type
        determined. The general idea is to decompose the results into a more
        "database" like format, where the runs and FOMs are in a different
        table.

    Experiments whose ``RAMBLE_STATUS`` is not ``"SUCCESS"`` are skipped
    unless ``config:upload:push_failed`` is set.
    """
    from datetime import datetime

    logger.debug("Format Data in")
    logger.debug(data_in)
    results = ExperimentList()

    if "metadata" in data_in:
        results.metadata = data_in["metadata"]

    # TODO: what is the nice way to deal with the distinction between
    # numeric/float and string FOM values

    # A single timestamp is shared by every experiment of this upload.
    upload_time = datetime.now()

    # The setting cannot change while formatting, so read it once instead of
    # once per experiment.
    push_failed = ramble.config.get("config:upload:push_failed")

    for exp in data_in["experiments"]:
        if exp["RAMBLE_STATUS"] != "SUCCESS" and not push_failed:
            continue

        experiment = Experiment(exp["name"], data_in["workspace_hash"], exp, upload_time)
        results.append(experiment)

        for context in exp["CONTEXTS"]:
            for fom in context["foms"]:
                # TODO: check on value to make sure it's a number
                experiment.foms.append(
                    {
                        "name": fom["name"],
                        "value": fom["value"],
                        "unit": fom["units"],
                        "origin": fom["origin"],
                        "origin_type": fom["origin_type"],
                        "context": context["name"],
                    }
                )

        if "SOFTWARE" in exp:
            for software_list in exp["SOFTWARE"].values():
                for software in software_list:
                    experiment.software.append(
                        {
                            "name": software["name"],
                            "version": software["version"],
                            "compiler": software["compiler"],
                            "compiler_version": software["compiler_version"],
                            "target": software["target"],
                            "variants": software["variants"],
                        }
                    )

        determine_node_type(experiment, exp["CONTEXTS"])

    return results
def _prepare_data(results, uri): # It is expected that the user will create these tables outside of this # tooling exp_table_id = f"{uri}.experiments" fom_table_id = f"{uri}.foms" metadata_table_id = f"{uri}.experiments_metadata" software_table_id = f"{uri}.software" exps_to_insert = [] foms_to_insert = [] metadata_to_insert = [] software_to_insert = [] for experiment in results: json_experiment = experiment.to_json() exps_to_insert.append(json_experiment) for fom in experiment.foms: fom_data = fom fom_data["experiment_id"] = experiment.get_hash() fom_data["experiment_name"] = experiment.name foms_to_insert.append(fom_data) for software in experiment.software: software_data = software.copy() software_data["experiment_id"] = experiment.get_hash() software_data["experiment_name"] = experiment.name software_to_insert.append(software_data) current_metadata = [] if hasattr(results, "metadata"): current_metadata = results.metadata # Handle dictionary and list format of metadata if isinstance(current_metadata, dict): # Flatten dictionary to a list of dicts aux_metadata = [] for key, value in current_metadata.items(): if isinstance(value, dict): for sub_key, sub_value in value.items(): # Check if we should use dot notation or just the sub_key # For now, just using the sub_key as it's cleaner for the 'metadata' case aux_metadata.append({"key": sub_key, "value": sub_value}) else: aux_metadata.append({"key": key, "value": value}) current_metadata = aux_metadata for result in results: for metadatum in current_metadata: md_item = metadatum.copy() # Stringify all values for k, v in md_item.items(): md_item[k] = str(v) md_item["experiment_id"] = result.get_hash() if hasattr(result, "timestamp"): md_item["timestamp"] = result.timestamp metadata_to_insert.append(md_item) return ( exp_table_id, exps_to_insert, fom_table_id, foms_to_insert, metadata_table_id, metadata_to_insert, software_table_id, software_to_insert, )
class BigQueryUploader(Uploader):
    """Class to handle upload of FOMs to BigQuery

    Uploads are split into chunks in an attempt to keep each request at an
    acceptable size, per BigQuery requirements.
    """

    # One entry per table managed by this uploader.
    # "version_key" is the key under which the table's schema version is
    # recorded in the `metadata` table. It is stated explicitly because the
    # recorded keys do not all follow "<table>_schema_version".
    schema = [
        {
            "table": "experiments",
            "schema": experiment_schema,
            "version": experiment_schema_version,
            "version_key": "experiment_schema_version",
        },
        {
            "table": "foms",
            "schema": fom_schema,
            "version": fom_schema_version,
            "version_key": "fom_schema_version",
        },
        {
            "table": "metadata",
            "schema": metadata_schema,
            "version": metadata_schema_version,
            "version_key": "metadata_schema_version",
        },
        {
            "table": "experiments_metadata",
            "schema": experiments_metadata_schema,
            "version": experiments_metadata_schema_version,
            "version_key": "experiments_metadata_schema_version",
        },
        {
            "table": "software",
            "schema": software_db_schema,
            "version": software_db_schema_version,
            "version_key": "software_db_schema_version",
        },
    ]

    @staticmethod
    def _version_key(table_def):
        """Return the metadata key that records ``table_def``'s schema version."""
        # Entries without an explicit key fall back to the derived name.
        return table_def.get("version_key", f"{table_def['table']}_schema_version")

    def _schema_to_bigquery(self, schema):
        """Translate a JSON-schema object into a list of BigQuery schema fields.

        Args:
            schema: JSON-schema dict; its ``properties`` become fields and its
                ``required`` list decides between REQUIRED and NULLABLE.

        Returns:
            A list of ``bigquery.SchemaField``; properties with an ``items``
            entry get nested fields built recursively from it.

        Raises:
            KeyError: If a property has no ``type`` or a type missing from the
                type map.
        """
        from google.cloud import bigquery

        # NOTE(review): "array" maps to a RECORD with NULLABLE/REQUIRED mode
        # rather than a REPEATED field - confirm this matches the schemas.
        type_map = {
            "string": "STRING",
            "number": "FLOAT",
            "integer": "INTEGER",
            "boolean": "BOOLEAN",
            "array": "RECORD",
            "object": "RECORD",
        }

        required_names = schema.get("required", [])
        bq_schema = []
        for name, props in schema.get("properties", {}).items():
            bq_type = type_map[props["type"]]
            mode = "REQUIRED" if name in required_names else "NULLABLE"
            fields = self._schema_to_bigquery(props["items"]) if "items" in props else []
            bq_schema.append(bigquery.SchemaField(name, bq_type, mode=mode, fields=fields))
        return bq_schema

    def create_tables(self, uri):
        """Ensure the dataset and all tables in ``schema`` exist.

        The dataset is created when missing. For each table, the schema
        version recorded in ``<uri>.metadata`` is compared with the current
        one and a warning is logged on mismatch. Missing tables are then
        created, and if any table was created the metadata rows are uploaded.

        Args:
            uri: Dataset URI the tables live in.
        """
        from google.cloud import bigquery
        from google.cloud.exceptions import NotFound

        client = bigquery.Client()

        try:
            client.get_dataset(uri)
        except NotFound:
            logger.info(f"Dataset {uri} is not found, creating it.")
            client.create_dataset(uri)

        # Check schema version
        for table_def in self.schema:
            try:
                # Query the key that upload_metadata() actually writes.
                query = (
                    f"SELECT value FROM `{uri}.metadata` WHERE key = "
                    f"'{self._version_key(table_def)}'"
                )
                query_job = client.query(query)
                results = query_job.result()
                if results.total_rows > 0:
                    upstream_version = list(results)[0].value
                    if upstream_version != str(table_def["version"]):
                        logger.warn(
                            f"Upstream DB schema version for table {table_def['table']} "
                            f"('{upstream_version}') does not match current version "
                            f"('{table_def['version']}')"
                        )
            except NotFound:
                pass  # metadata table doesn't exist, so we don't need to check the version

        tables_created = False
        for table_def in self.schema:
            table_id = f"{uri}.{table_def['table']}"
            try:
                client.get_table(table_id)
                logger.info(f"Table {table_id} already exists.")
            except NotFound:
                logger.info(f"Creating table {table_id}")
                bq_schema = self._schema_to_bigquery(table_def["schema"][table_def["version"]])
                table = bigquery.Table(table_id, schema=bq_schema)
                table = client.create_table(table)
                logger.info(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")
                tables_created = True

        if tables_created:
            self.upload_metadata(uri)

    def upload_metadata(self, uri):
        """Record schema versions, Ramble version and user in ``<uri>.metadata``.

        One row is written per entry of ``schema`` (keyed by its version key),
        plus ``db_schema_version``, ``ramble_version`` and ``user``. All rows
        share one timestamp.

        Args:
            uri: Dataset URI that holds the ``metadata`` table.
        """
        from datetime import datetime

        logger.info("Uploading metadata at table creation time")

        metadata_table_id = f"{uri}.metadata"
        now_timestamp = str(datetime.now())

        entries = [("db_schema_version", str(db_schema_version))]
        # Derive the per-table rows from `schema` so the keys written here are
        # exactly the keys create_tables() looks up.
        entries.extend(
            (self._version_key(table_def), str(table_def["version"])) for table_def in self.schema
        )
        entries.append(("ramble_version", ramble.util.version.get_version()))
        entries.append(("user", get_user()))

        metadata_to_insert = [
            {"key": key, "value": value, "timestamp": now_timestamp} for key, value in entries
        ]

        self.chunked_upload(metadata_table_id, metadata_to_insert)

    def chunked_upload(self, table_id, data):
        """Validate and insert ``data`` into ``table_id`` in batches.

        The number of batches is estimated from the size of the JSON-serialized
        data against a limit of roughly 1MB per request. Each batch is
        validated against the table's schema (when one is known) right before
        it is inserted.

        Args:
            table_id: Fully qualified table id; the part after the last ``.``
                selects the schema entry used for validation.
            data: List of JSON-serializable row dicts.

        Returns:
            An empty list on success, otherwise the errors reported for the
            first failing batch (later batches are not attempted).

        Raises:
            jsonschema.exceptions.ValidationError: If a row fails validation.
        """
        from google.cloud import bigquery

        client = bigquery.Client()
        error = []
        approx_max_request = 1000000.0  # 1MB

        data_len = len(data)
        approx_request_size = sys.getsizeof(json.dumps(data))
        approx_num_batches = math.ceil(approx_request_size / approx_max_request)
        # Always send at least one row per batch.
        rows_per_batch = max(1, data_len // approx_num_batches)

        logger.debug(f"Size: {approx_request_size}B")
        logger.debug(f"Length in rows: {data_len}")
        logger.debug(f"Num Batches: {approx_num_batches}")
        logger.debug(f"Rows per Batch: {rows_per_batch}")

        # The schema depends only on the table, so resolve it once.
        table_name = table_id.split(".")[-1]
        table_def = next((t for t in self.schema if t["table"] == table_name), None)
        schema_for_validation = None
        if table_def:
            schema_for_validation = table_def["schema"].get(table_def["version"])

        for start in range(0, data_len, rows_per_batch):
            end = min(start + rows_per_batch, data_len)
            chunk = data[start:end]
            logger.debug(f"Uploading rows {start} to {end}")

            if schema_for_validation:
                for row in chunk:
                    validate_data(row, schema_for_validation)
            else:
                logger.warn(
                    f"Could not find a valid schema for table '{table_name}'. "
                    f"Skipping validation for this chunk."
                )

            error = client.insert_rows_json(table_id, chunk)
            if error:
                logger.warn("Issue during uploader insert")
                logger.warn(error)
                return error
        return error

    def insert_data(self, uri: str, results) -> None:
        """Upload experiments, FOMs, experiment metadata and software rows.

        The tables are uploaded in that order; once an upload reports errors
        the remaining ones are skipped. Afterwards the outcome of each table
        is reported, and ``logger.die`` is called for the first table that did
        not complete cleanly.

        Args:
            uri: Dataset URI the tables live in.
            results: Formatted results as accepted by ``_prepare_data``.
        """
        (
            exp_table_id,
            exps_to_insert,
            fom_table_id,
            foms_to_insert,
            metadata_table_id,
            metadata_to_insert,
            software_table_id,
            software_to_insert,
        ) = _prepare_data(results, uri)

        logger.debug("Experiments to insert:")
        logger.debug(exps_to_insert)

        uploads = (
            ("exp", "Upload experiments...", exp_table_id, exps_to_insert),
            ("fom", "Upload FOMs...", fom_table_id, foms_to_insert),
            (
                "experiment_metadata",
                "Upload Experiment Metadata...",
                metadata_table_id,
                metadata_to_insert,
            ),
            ("software", "Upload Software...", software_table_id, software_to_insert),
        )

        outcomes = []
        previous_ok = True
        for name, banner, table_id, rows in uploads:
            if previous_ok:
                logger.msg(banner)
                errors = self.chunked_upload(table_id, rows)
            else:
                # Skipped because an earlier table failed.
                errors = None
            previous_ok = errors == []
            outcomes.append((name, errors))

        for name, errors in outcomes:
            if errors == []:
                logger.msg(f"New rows have been added in {name}")
            else:
                logger.die(f"Encountered errors while inserting rows: {errors}")

    def perform_upload(self, uri, results):
        """Validate the arguments, then insert ``results`` into BigQuery.

        Args:
            uri: Dataset URI the tables live in.
            results: Formatted results to upload.

        Raises:
            ValueError: If ``uri`` or ``results`` is missing or empty
                (raised by the base class check).
        """
        super().perform_upload(uri, results)

        self.insert_data(uri, results)
# def get_max_current_id(uri, table): # TODO: Generating an id based on the max in use id is dangerous, and # technically gives a race condition in parallel, and should be done in # a more graceful and scalable way.. like hashing the experiment? or # generating a known unique id for it # query = "SELECT MAX(id) FROM `{uri}.{table}` LIMIT 1".format(uri=uri, table=table) # query_job = client.query(query) # results = query_job.result() # Waits for job to complete. # return results[0]
class PrintOnlyUploader(Uploader):
    """An uploader that only prints out formatted data without actually uploading."""

    def perform_upload(self, uri, results):
        """Log every row that a real uploader would send, grouped by table.

        Args:
            uri: Dataset URI used to derive the table ids shown in the log.
            results: Formatted results as accepted by ``_prepare_data``.
        """
        super().perform_upload(uri, results)

        (
            exp_table,
            exp_rows,
            fom_table,
            fom_rows,
            metadata_table,
            metadata_rows,
            software_table,
            software_rows,
        ) = _prepare_data(results, uri)

        # One (headline, rows) pair per destination table, in output order.
        sections = (
            (f"{len(exp_rows)} experiment(s) would be uploaded to {exp_table}:", exp_rows),
            (f"{len(fom_rows)} fom(s) would be uploaded to {fom_table}:", fom_rows),
            (
                f"{len(metadata_rows)} experiment metadata item(s) "
                f"would be uploaded to {metadata_table}:",
                metadata_rows,
            ),
            (
                f"{len(software_rows)} "
                f"software package(s) would be uploaded to {software_table}:",
                software_rows,
            ),
        )

        logger.info("NOTE: The PrintOnly uploader only logs, but does not upload any data.")
        for headline, rows in sections:
            logger.info(headline)
            for row in rows:
                logger.info(f" {row}")