Source code for pubtools._pyxis.pyxis_ops

import json
import sys
import tempfile
from argparse import ArgumentParser, Namespace
from typing import Any, Optional, Union

from .constants import DEFAULT_REQUEST_THREADS_LIMIT
from .pyxis_authentication import PyxisKrbAuth, PyxisSSLAuth, PyxisAuth
from .pyxis_client import PyxisClient
from .utils import setup_arg_parser

CMD_ARGS = {
    ("--pyxis-server",): {
        "help": "Pyxis service hostname",
        "required": True,
        "type": str,
    },
    ("--pyxis-insecure",): {
        "help": "Allow insecure connection to Pyxis",
        "required": False,
        "type": bool,
    },
    ("--pyxis-krb-principal",): {
        "help": "Pyxis kerberos principal in form: name@REALM",
        "required": False,
        "type": str,
    },
    ("--pyxis-krb-ktfile",): {
        "help": "Pyxis Kerberos client keytab. Optional. Used for login if TGT is not available.",
        "required": False,
        "type": str,
    },
    ("--pyxis-ssl-crtfile",): {
        "help": "Path to .crt file for the SSL authentication",
        "required": False,
        "type": str,
    },
    ("--pyxis-ssl-keyfile",): {
        "help": "Path to .key file for the SSL authentication",
        "required": False,
        "type": str,
    },
}

GET_OPERATORS_INDICES_ARGS = CMD_ARGS.copy()
GET_OPERATORS_INDICES_ARGS[("--ocp-versions-range",)] = {
    "help": "Supported OCP versions range. "
    "See https://docs.engineering.redhat.com/display/CFC/Delivery",
    "required": True,
    "type": str,
}
GET_OPERATORS_INDICES_ARGS[("--organization",)] = {
    "help": "Organization as understood by IIB",
    "required": False,
    "type": str,
}

GET_REPO_METADATA_ARGS = CMD_ARGS.copy()
GET_REPO_METADATA_ARGS[("--repo-name",)] = {
    "help": "Name of the repository",
    "required": True,
    "type": str,
}
GET_REPO_METADATA_ARGS[("--custom-registry",)] = {
    "help": "Custom registry address. Will be used instead of the default addresses.",
    "required": False,
    "type": str,
}
GET_REPO_METADATA_ARGS[("--only-internal-registry",)] = {
    "help": "Check only internal registry",
    "required": False,
    "type": bool,
}
GET_REPO_METADATA_ARGS[("--only-partner-registry",)] = {
    "help": "Check only partner registry",
    "required": False,
    "type": bool,
}

UPLOAD_SIGNATURES_ARGS = CMD_ARGS.copy()
UPLOAD_SIGNATURES_ARGS[("--signatures",)] = {
    "help": "Signatures in JSON format (as a string) or an @-prefixed file path"
    " with JSON, e.g. --signatures=@/tmp/filename.json",
    "required": True,
    "type": str,
}
UPLOAD_SIGNATURES_ARGS[("--request-threads",)] = {
    "help": "Maximum number of threads to use for parallel requests",
    "required": False,
    "default": DEFAULT_REQUEST_THREADS_LIMIT,
    "type": int,
}

GET_SIGNATURES_ARGS = CMD_ARGS.copy()
GET_SIGNATURES_ARGS[("--manifest-digest",)] = {
    "help": "comma separated manifest-digests to search or json file when prefixed with @",
    "required": False,
    "type": str,
}
GET_SIGNATURES_ARGS[("--reference",)] = {
    "help": "comma separated container pull reference to search or json file when prefixed with @",
    "required": False,
    "type": str,
}

DELETE_SIGNATURES_ARGS = CMD_ARGS.copy()
DELETE_SIGNATURES_ARGS[("--ids",)] = {
    "help": "comma separated signature IDs to remove or json file when prefixed with @",
    "required": True,
    "type": str,
}
DELETE_SIGNATURES_ARGS[("--request-threads",)] = {
    "help": "Maximum number of threads to use for parallel requests",
    "required": False,
    "default": DEFAULT_REQUEST_THREADS_LIMIT,
    "type": int,
}


[docs] def setup_pyxis_client(args: Namespace, ccache_file: str) -> PyxisClient: """ Set up a PyxisClient instance according to specified parameters. Args: args (argparse.Namespace) Arguments of the program. cache_dir (str): Path to a file used for storing ccache by Kerberos authentication. Returns: PyxisClient: Configured PyxisClient instance. """ # If both auths are specified, Kerberos is preferred if args.pyxis_krb_principal: auth: PyxisAuth = PyxisKrbAuth( args.pyxis_krb_principal, args.pyxis_server, ccache_file, args.pyxis_krb_ktfile, ) elif args.pyxis_ssl_crtfile and args.pyxis_ssl_keyfile: auth = PyxisSSLAuth(args.pyxis_ssl_crtfile, args.pyxis_ssl_keyfile) else: raise ValueError( "Either Kerberos principal (and optionally keytab) or .crt and .key " "files must be provided for Pyxis authentication." ) if hasattr(args, "request_threads"): return PyxisClient( args.pyxis_server, auth=auth, verify=not args.pyxis_insecure, threads=args.request_threads, ) else: return PyxisClient(args.pyxis_server, auth=auth, verify=not args.pyxis_insecure)
def set_get_operator_indices_args() -> ArgumentParser: """Set up argparser without extra parameters, this method is used for auto doc generation.""" return setup_arg_parser(GET_OPERATORS_INDICES_ARGS) def _get_operator_indices(sysargs: Optional[list[str]] = None) -> Union[list[str], Any]: """ Entrypoint for getting operator indices. Returns: list: Index images satisfying the specified conditions. """ parser = set_get_operator_indices_args() if sysargs: args = parser.parse_args(sysargs[1:]) else: args = parser.parse_args() # pragma: no cover" with tempfile.NamedTemporaryFile() as tmpfile: pyxis_client = setup_pyxis_client(args, tmpfile.name) resp = pyxis_client.get_operator_indices( args.ocp_versions_range, args.organization ) return resp def get_operator_indices_main(sysargs: Optional[list[str]] = None) -> int: """ Entrypoint for getting operator indices. Returns: int: Exit code (0 for success). """ try: resp = _get_operator_indices(sysargs) json.dump(resp, sys.stdout, sort_keys=True, indent=4, separators=(",", ": ")) return 0 except Exception as e: print(f"Error getting operator indices: {e}", file=sys.stderr) return 1 def get_operator_indices_mod( sysargs: Optional[list[str]] = None, ) -> Union[list[str], Any]: """ Entrypoint for getting operator indices in module mode. This function is used when running the script as a module. It does not return an exit code, but rather prints the result directly. """ return _get_operator_indices(sysargs) def set_get_repo_metadata_args() -> ArgumentParser: """Set up argparser without extra parameters, this method is used for auto doc generation.""" return setup_arg_parser(GET_REPO_METADATA_ARGS) def _get_repo_metadata( sysargs: Optional[list[str]] = None, ) -> Union[dict[Any, Any], Any]: """ Entrypoint for getting repository metadata. Returns: dict: Metadata of the repository. """ parser = set_get_repo_metadata_args() if sysargs: args = parser.parse_args(sysargs[1:]) else: args = parser.parse_args() # pragma: no cover" if args.only_internal_registry and args.only_partner_registry: raise ValueError( "Can't check only internal registry as well as only partner registry" ) with tempfile.NamedTemporaryFile() as tmpfile: pyxis_client = setup_pyxis_client(args, tmpfile.name) res = pyxis_client.get_repository_metadata( args.repo_name, args.custom_registry, args.only_internal_registry, args.only_partner_registry, ) return res def get_repo_metadata_main( sysargs: Optional[list[str]] = None, ) -> Union[dict[Any, Any], Any]: """ Entrypoint for getting repository metadata. Returns: int: Exit code (0 for success). """ try: res = _get_repo_metadata(sysargs) json.dump(res, sys.stdout, sort_keys=True, indent=4, separators=(",", ": ")) return 0 except Exception as e: print(f"Error getting repository metadata: {e}", file=sys.stderr) return 1 def get_repo_metadata_mod( sysargs: Optional[list[str]] = None, ) -> Union[dict[Any, Any], Any]: """ Entrypoint for getting repository metadata in module mode. This function is used when running the script as a module. It does not return an exit code, but rather prints the result directly. """ return _get_repo_metadata(sysargs) # No return value, output is printed directly def set_upload_signatures_args() -> ArgumentParser: """Set up argparser without extra parameters, this method is used for auto doc generation.""" return setup_arg_parser(UPLOAD_SIGNATURES_ARGS) def _upload_signatures(sysargs: Optional[list[str]] = None) -> list[Any]: """ Entrypoint for uploading signatures from JSON or a file. Returns: list: List of uploaded signatures including auto-populated fields. """ parser = set_upload_signatures_args() if sysargs: args = parser.parse_args(sysargs[1:]) else: args = parser.parse_args() # pragma: no cover signatures_json = deserialize_list_from_arg(args.signatures) with tempfile.NamedTemporaryFile() as tmpfile: pyxis_client = setup_pyxis_client(args, tmpfile.name) resp = pyxis_client.upload_signatures(signatures_json) return resp def upload_signatures_main(sysargs: Optional[list[str]] = None) -> int: """ Entrypoint for uploading signatures from JSON or a file. Returns: int: Exit code (0 for success). """ try: resp = _upload_signatures(sysargs) json.dump(resp, sys.stdout, sort_keys=True, indent=4, separators=(",", ": ")) return 0 except Exception as e: print(f"Error uploading signatures: {e}", file=sys.stderr) return 1 def upload_signatures_mod(sysargs: Optional[list[str]] = None) -> list[Any]: """ Entrypoint for uploading signatures from JSON or a file in module mode. This function is used when running the script as a module. It does not return an exit code, but rather prints the result directly. """ return _upload_signatures(sysargs) # No return value, output is printed directly
[docs] def deserialize_list_from_arg( value: str, csv_input: bool = False ) -> Union[list[Any], Any]: """ Conditionally load contents of a file if specified in argument value. Detects whether the value is a reference to a file (@-prefixed like in `gcc -o`, `curl -d`, etc.) and returns its contents in list if applicable; otherwise returns the value as a list. Examples: `{"foo"}` -- plain string, returned as a list `@items.json` -- file path; its contents are returned in a list """ if not value.startswith("@"): if csv_input: # convert comma separated string into list return value.split(",") # convert json string into list return json.loads(value) filename = value[1:] with open(filename, "r") as f: # all file content is returned as list return json.load(f)
[docs] def serialize_to_csv_from_list(list_value: list[Any]) -> str: """Convert a list to comma separated string.""" return ",".join(list_value)
def set_get_signatures_args() -> ArgumentParser: """Set up argparser without extra parameters, this method is used for auto doc generation.""" return setup_arg_parser(GET_SIGNATURES_ARGS) def _get_signatures(sysargs: Optional[list[str]] = None) -> list[str]: """ Entrypoint for getting container signature metadata. Returns: list: container signature metadata satisfying the specified conditions. """ parser = set_get_signatures_args() if sysargs: args = parser.parse_args(sysargs[1:]) else: args = parser.parse_args() # pragma: no cover" csv_references = csv_manifest_digests = None if not (args.manifest_digest or args.reference): parser.error("Give atleast 1 filter, --manifest-digest and/or --reference") if args.manifest_digest: csv_manifest_digests = serialize_to_csv_from_list( deserialize_list_from_arg(args.manifest_digest, csv_input=True) ) if args.reference: csv_references = serialize_to_csv_from_list( deserialize_list_from_arg(args.reference, csv_input=True) ) with tempfile.NamedTemporaryFile() as tmpfile: pyxis_client = setup_pyxis_client(args, tmpfile.name) res = pyxis_client.get_container_signatures( csv_manifest_digests, csv_references ) return res def get_signatures_main(sysargs: Optional[list[str]] = None) -> int: """ Entrypoint for getting container signature metadata. Returns: int: Exit code (0 for success). """ try: res = _get_signatures(sysargs) json.dump(res, sys.stdout, sort_keys=True, indent=4, separators=(",", ": ")) return 0 except Exception as e: print(f"Error getting signatures: {e}", file=sys.stderr) return 1 def get_signatures_mod(sysargs: Optional[list[str]] = None) -> list[str]: """ Entrypoint for getting container signature metadata in module mode. This function is used when running the script as a module. It does not return an exit code, but rather prints the result directly. """ return _get_signatures(sysargs) # No return value, output is printed directly def set_delete_signatures_args() -> ArgumentParser: """Set up argparser without extra parameters, this method is used for auto doc generation.""" return setup_arg_parser(DELETE_SIGNATURES_ARGS) def _delete_signatures(sysargs: Optional[list[str]] = None) -> None: """ Entrypoint for removing existing signatures. Returns: list: Signatures which were deleted. """ parser = set_delete_signatures_args() if sysargs: args = parser.parse_args(sysargs[1:]) else: args = parser.parse_args() # pragma: no cover" if args.ids: signature_ids = deserialize_list_from_arg(args.ids, csv_input=True) with tempfile.NamedTemporaryFile() as tmpfile: pyxis_client = setup_pyxis_client(args, tmpfile.name) pyxis_client.delete_container_signatures(signature_ids) def delete_signatures_main(sysargs: Optional[list[str]] = None) -> int: """ Entrypoint for removing existing signatures. Returns: int: Exit code (0 for success). """ try: _delete_signatures(sysargs) return 0 except Exception as e: print(f"Error deleting signatures: {e}", file=sys.stderr) return 1 def delete_signatures_mod(sysargs: Optional[list[str]] = None) -> None: """ Entrypoint for removing existing signatures in module mode. This function is used when running the script as a module. It does not return an exit code, but rather prints the result directly. """ return _delete_signatures(sysargs) # No return value, output is printed directly