Source code for pubtools._quay.signer_wrapper

import abc
from contextlib import contextmanager, redirect_stdout
import logging
import pkg_resources
import tempfile
import json
import io

from typing import Optional, List, Dict, Any, Tuple, Generator, Type

from marshmallow import Schema, fields, EXCLUDE

from .utils.misc import (
    run_entrypoint,
    get_pyxis_ssl_paths,
    # run_in_parallel,
    log_step,
    # FData,
)
from .item_processor import SignEntry

LOG = logging.getLogger("pubtools.quay")


class SigningError(Exception):
    """Raised when a signing operation does not finish successfully."""


class NoSchema(Schema):
    """Empty schema: declares no fields, so it imposes no field-level validation."""


class MsgSignerSettingsSchema(Schema):
    """Validation schema for messaging signer settings."""

    # The only mandatory setting.
    pyxis_server = fields.String(required=True)

    # Optional SSL certificate / key file settings.
    pyxis_ssl_crtfile = fields.String(required=False)
    pyxis_ssl_keyfile = fields.String(required=False)

    # NOTE(review): in marshmallow 3 the ``default`` keyword is the dump-side default
    # and is deprecated in favour of ``dump_default`` / ``load_default`` -- confirm
    # against the marshmallow version this project pins before changing it.
    num_thread_pyxis = fields.Integer(required=False, default=7)
class SignerWrapper:
    """Base wrapper for signing containers with a generic signer.

    Concrete wrappers override ``label``, ``entry_point_conf`` and ``SCHEMA`` and
    implement the ``_run_*`` hooks.
    """

    label = "unused"
    pre_push = False

    SCHEMA: Type[Schema] = NoSchema
    entry_point_conf = ["signer", "group", "signer"]

    def __init__(
        self, config_file: Optional[str] = None, settings: Optional[Dict[str, Any]] = None
    ) -> None:
        """Store the configuration and validate the settings right away.

        Args:
            config_file (str): Path to config file for the signer.
            settings (dict): Settings for the signer; a falsy value becomes ``{}``.
        """
        self.config_file = config_file
        self.settings = settings or {}
        self._ep = None  # entry point is loaded on first use
        self.validate_settings()

    @property
    def entry_point(self) -> Any:
        """Return the signer entry point, loading and caching it on first access."""
        loaded = self._ep
        if loaded is None:
            loaded = self._ep = pkg_resources.load_entry_point(*self.entry_point_conf)
        return loaded

    def remove_signatures(
        self,
        signatures: List[Tuple[str, str, str]],
        _exclude: Optional[List[Tuple[str, str, str]]] = None,
    ) -> None:
        """Remove signatures from a sigstore.

        Args:
            signatures (list): Signatures handed unchanged to ``_remove_signatures``.
            _exclude (list): Accepted for interface compatibility; not used here.
        """
        LOG.debug("Removing signatures %s", signatures)
        self._remove_signatures(signatures)

    # NOTE(review): this class does not use ``abc.ABCMeta``, so ``abstractmethod``
    # does not prevent instantiation; it only marks the hooks meant to be overridden.
    @abc.abstractmethod
    def _run_remove_signatures(self, signatures_to_remove: List[Any]) -> None:
        pass  # pragma: no cover

    def _remove_signatures(self, signatures_to_remove: List[Any]) -> None:
        """Delegate signature removal to ``_run_remove_signatures``.

        This indirection exists to make testing easier.

        Args:
            signatures_to_remove (list): Signatures to remove.
        """
        self._run_remove_signatures(signatures_to_remove)

    @abc.abstractmethod
    def _run_store_signed(self, signatures: Dict[str, Any]) -> None:
        pass  # pragma: no cover

    def _store_signed(self, signatures: Dict[str, Any]) -> None:
        """Log and delegate signature storage to ``_run_store_signed``.

        This indirection exists to make testing easier.

        Args:
            signatures (dict): Signatures to store.
        """
        LOG.debug("Storing signatures %s", signatures)
        self._run_store_signed(signatures)

    def sign_container_opt_args(
        self, sign_entry: List[SignEntry], task_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Return extra keyword arguments for the signer entry point.

        The base implementation contributes nothing.

        Args:
            sign_entry (List[SignEntry]): Entries about to be signed. (The overrides
                in this module name this parameter ``sign_entries``.)
            task_id (str): Task ID to identify the signing task if needed.

        Returns:
            dict: Always an empty dict.
        """
        return {}

    def _sign_containers(
        self,
        sign_entries: List[SignEntry],
        task_id: Optional[str] = None,
    ) -> None:
        """Sign one batch of entries with a single entry point call.

        The signing key of the first entry is used for the whole batch.

        Args:
            sign_entries (List[SignEntry]): Batch of SignEntry to sign.
            task_id (str): Task ID to identify the signing task if needed.

        Raises:
            SigningError: When the signer result status is not ``"ok"``.
        """
        for entry in sign_entries:
            LOG.debug(
                "Signing container %s %s %s",
                entry.reference,
                entry.digest,
                entry.signing_key,
            )
        if not sign_entries:
            return

        first_entry = sign_entries[0]
        opt_args = self.sign_container_opt_args(sign_entries, task_id)
        references = [x.reference for x in sign_entries if x]
        digests = [x.digest for x in sign_entries if x]
        signed = self.entry_point(
            config_file=self.config_file,
            signing_key=first_entry.signing_key,
            reference=references,
            digest=digests,
            **opt_args,
        )

        signer_result = signed["signer_result"]
        if signer_result["status"] != "ok":
            raise SigningError(signer_result["error_message"])

        for entry in sign_entries:
            LOG.info(
                "Signed %s(%s) with %s in %s",
                entry.reference,
                entry.digest,
                entry.signing_key,
                self.label,
            )
        self._store_signed(signed)

    def _filter_to_sign(self, to_sign_entries: List[SignEntry]) -> List[SignEntry]:
        """Select the entries that actually need signing.

        The base implementation keeps everything.

        Args:
            to_sign_entries (List[SignEntry]): list of entries to sign.

        Returns:
            List[SignEntry]: the same list, unchanged.
        """
        return to_sign_entries

    @log_step("Sign container images")
    def sign_containers(
        self,
        to_sign_entries: List[SignEntry],
        task_id: Optional[str] = None,
    ) -> None:
        """Sign signing entries.

        Entries go through ``_filter_to_sign``, duplicates are dropped (first
        occurrence wins, order kept) and the remainder is signed in one
        ``_sign_containers`` call with stdout redirected into a throw-away buffer.

        Args:
            to_sign_entries (List[SignEntry]): list of entries to sign.
            task_id (str): optional identifier used in signing process.
        """
        unique_entries: List[SignEntry] = []
        for candidate in self._filter_to_sign(to_sign_entries):
            # Equality-based membership test; entries are not assumed hashable.
            if candidate not in unique_entries:
                unique_entries.append(candidate)

        with redirect_stdout(io.StringIO()):
            self._sign_containers(unique_entries, task_id)

    def validate_settings(self, settings: Optional[Dict[str, Any]] = None) -> None:
        """Validate settings against ``SCHEMA``, ignoring unknown keys.

        Args:
            settings (dict): Settings to validate; a falsy value means ``self.settings``.
        """
        to_validate = settings or self.settings
        self.SCHEMA(unknown=EXCLUDE).load(to_validate)
class MsgSignerWrapper(SignerWrapper):
    """Wrapper for messaging signer functionality."""

    label = "msg_signer"
    pre_push = True
    entry_point_conf = ["pubtools-sign", "modules", "pubtools-sign-msg-container-sign"]

    # Upper bound on the number of digests sent in one get-signatures request.
    MAX_MANIFEST_DIGESTS_PER_SEARCH_REQUEST = 50
    SCHEMA = MsgSignerSettingsSchema

    def _filter_to_sign(self, to_sign_entries: List[SignEntry]) -> List[SignEntry]:
        """Drop entries for which a matching signature already exists.

        An entry counts as already signed when a fetched signature has the same
        (manifest digest, reference, signing key id) triple.

        Args:
            to_sign_entries (List[SignEntry]): list of entries to sign.

        Returns:
            List[SignEntry]: entries without an existing signature, in input order.
        """
        to_sign_digests = [entry.digest for entry in to_sign_entries]
        already_signed = {
            (sig["manifest_digest"], sig["reference"], sig["sig_key_id"])
            for sig in self._fetch_signatures(to_sign_digests)
        }
        return [
            entry
            for entry in to_sign_entries
            if (entry.digest, entry.reference, entry.signing_key) not in already_signed
        ]

    def sign_container_opt_args(
        self, sign_entries: List[SignEntry], task_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Return optional arguments for signing a container.

        Args:
            sign_entries (List[SignEntry]): List of SignEntry (not used here).
            task_id (str): Task ID to identify the signing task if needed.

        Returns:
            dict: ``{"task_id": task_id}`` when a task id is given, otherwise ``{}``.
        """
        return {} if task_id is None else {"task_id": task_id}

    @contextmanager
    def _save_signatures_file(self, signatures: List[Dict[str, Any]]) -> Generator[Any, None, None]:
        """Dump signatures as JSON into a temporary file and yield the open file.

        The file is removed when the context exits.
        """
        with tempfile.NamedTemporaryFile(
            mode="w", prefix="pubtools_quay_upload_signatures_"
        ) as signature_file:
            json.dump(signatures, signature_file)
            # Flush so a reader opening the file by name sees the full content.
            signature_file.flush()
            yield signature_file

    def _fetch_signatures(
        self, manifest_digests: List[str]
    ) -> Generator[Dict[str, Any], None, None]:
        """Fetch signatures for the given manifest digests.

        Digests are de-duplicated, sorted and queried in chunks of at most
        ``MAX_MANIFEST_DIGESTS_PER_SEARCH_REQUEST``.

        Args:
            manifest_digests (list): Manifest digests to fetch signatures for.

        Yields:
            Dict[str, Any]: One fetched signature at a time.
        """
        cert, key = get_pyxis_ssl_paths(self.settings)
        chunk_size = self.MAX_MANIFEST_DIGESTS_PER_SEARCH_REQUEST
        unique_digests = sorted(set(manifest_digests))

        # Arguments shared by every request; built once instead of per chunk.
        # NOTE(review): the previous code also assembled "--request-threads" here
        # but overwrote the list inside the loop, so the flag was never sent to
        # get-signatures. That effective behaviour is kept; confirm whether the
        # entrypoint supports the flag before adding it.
        base_args = ["--pyxis-server", self.settings["pyxis_server"]]
        base_args += ["--pyxis-ssl-crtfile", cert]
        base_args += ["--pyxis-ssl-keyfile", key]

        for chunk_start in range(0, len(unique_digests), chunk_size):
            chunk = unique_digests[chunk_start : chunk_start + chunk_size]  # noqa: E203

            with tempfile.NamedTemporaryFile(
                mode="w", prefix="pubtools_quay_get_signatures_"
            ) as signature_fetch_file:
                # The digests are passed through a file ("@<path>" syntax).
                json.dump(chunk, signature_fetch_file)
                signature_fetch_file.flush()
                args = base_args + [
                    "--manifest-digest",
                    "@{0}".format(signature_fetch_file.name),
                ]

                env_vars: Dict[Any, Any] = {}
                chunk_results = run_entrypoint(
                    ("pubtools-pyxis", "console_scripts", "pubtools-pyxis-get-signatures"),
                    "pubtools-pyxis-get-signatures",
                    args,
                    env_vars,
                )
            yield from chunk_results

    def _run_store_signed(self, signed_results: Dict[str, Any]) -> None:
        """Upload signatures to Pyxis by using a pubtools-pyxis entrypoint.

        Data required for a Pyxis POST request:
        - manifest_digest
        - reference
        - repository
        - sig_key_id
        - signature_data

        All signatures are written to one temporary file that is handed to the
        upload entrypoint.

        Args:
            signed_results (Dict[str, Any]): Signer output. This method reads
                ``["operation"]["references"]``, ``["operation_results"]`` (paired
                positionally with the references) and ``["signing_key"]``.
        """
        LOG.info("Sending new signatures to Pyxis")
        signatures: List[Dict[str, Any]] = [
            {
                "manifest_digest": op_res[0]["msg"]["manifest_digest"],
                "reference": reference,
                "repository": op_res[0]["msg"]["repo"],
                "sig_key_id": signed_results["signing_key"],
                "signature_data": op_res[0]["msg"]["signed_claim"],
            }
            for reference, op_res in zip(
                signed_results["operation"]["references"], signed_results["operation_results"]
            )
        ]

        for sig in signatures:
            LOG.debug(
                f"Uploading new signature. Reference: {sig['reference']}, "
                f"Repository: {sig['repository']}, "
                f"Digest: {sig['manifest_digest']}, "
                f"Key: {sig['sig_key_id']}"
            )

        cert, key = get_pyxis_ssl_paths(self.settings)
        args = ["--pyxis-server", self.settings["pyxis_server"]]
        args += ["--pyxis-ssl-crtfile", cert]
        args += ["--pyxis-ssl-keyfile", key]
        args += ["--request-threads", str(self.settings.get("num_thread_pyxis", 7))]

        with self._save_signatures_file(signatures) as signature_file:
            args += ["--signatures", "@{0}".format(signature_file.name)]
            LOG.info("Uploading {0} new signatures".format(len(signatures)))
            env_vars: Dict[Any, Any] = {}
            # NOTE(review): the meaning of the trailing positional ``False`` is
            # defined by run_entrypoint in utils.misc -- confirm there.
            run_entrypoint(
                ("pubtools-pyxis", "console_scripts", "pubtools-pyxis-upload-signatures"),
                "pubtools-pyxis-upload-signature",
                args,
                env_vars,
                False,
            )

    def _run_remove_signatures(self, signatures_to_remove: List[str]) -> None:
        """Remove signatures from the sigstore.

        The ids are written to a temporary JSON file that is passed to the
        pubtools-pyxis delete-signatures entrypoint.

        Args:
            signatures_to_remove (List[str]): List of signatures to remove.
        """
        cert, key = get_pyxis_ssl_paths(self.settings)
        args = ["--pyxis-server", self.settings["pyxis_server"]]
        args += ["--pyxis-ssl-crtfile", cert]
        args += ["--pyxis-ssl-keyfile", key]
        args += ["--request-threads", str(self.settings.get("num_thread_pyxis", 7))]

        with tempfile.NamedTemporaryFile(mode="w") as temp:
            json.dump(signatures_to_remove, temp)
            temp.flush()
            args += ["--ids", "@%s" % temp.name]

            env_vars: Dict[Any, Any] = {}
            run_entrypoint(
                ("pubtools-pyxis", "console_scripts", "pubtools-pyxis-delete-signatures"),
                "pubtools-pyxis-delete-signatures",
                args,
                env_vars,
            )

    def _filter_to_remove(
        self,
        signatures: List[Tuple[str, str, str]],
        _exclude: Optional[List[Tuple[str, str, str]]] = None,
    ) -> List[str]:
        """Select ids of existing signatures that should be removed.

        Args:
            signatures (List[Tuple[str, str, str]]): List of (digest, tag, repository)
                tuples of signatures to remove.
            _exclude (Optional[List[Tuple[str, str, str]]]): Tuples of signatures to keep.

        Returns:
            List[str]: ``_id`` values of the signatures selected for removal.
        """
        exclude = _exclude or []
        existing_signatures = self._fetch_signatures([x[0] for x in signatures])
        sig_ids_to_remove = []
        for existing_signature in existing_signatures:
            digest = existing_signature["manifest_digest"]
            reference = existing_signature["reference"]
            repository = existing_signature["repository"]
            # NOTE(review): the removal list is matched on the tag (text after the
            # last ":" of the reference) while the exclude list is matched on the
            # full reference -- confirm this asymmetry is intended.
            requested = (digest, reference.split(":")[-1], repository) in signatures
            if requested and (digest, reference, repository) not in exclude:
                sig_ids_to_remove.append(existing_signature["_id"])
                LOG.debug(
                    f"Removing signature. Reference: {existing_signature['reference']}, "
                    f"Repository: {existing_signature['repository']}, "
                    f"Digest: {existing_signature['manifest_digest']}, "
                    f"Key: {existing_signature['sig_key_id']}"
                )
        return sig_ids_to_remove

    @log_step("Remove outdated signatures")
    def remove_signatures(
        self,
        signatures: List[Tuple[str, str, str]],
        _exclude: Optional[List[Tuple[str, str, str]]] = None,
    ) -> None:
        """Remove signatures from sigstore.

        Args:
            signatures (list): List of tuples containing (digest, tag, repository)
                of signatures to remove.
            _exclude (Optional[List[Tuple[str, str, str]]]): Tuples of signatures to keep.
        """
        _signatures = list(signatures)
        to_remove = self._filter_to_remove(_signatures, _exclude=_exclude)
        self._remove_signatures(to_remove)
class CosignSignerSettingsSchema(Schema):
    """Validation schema for cosign signer settings."""

    # Mandatory Quay connection settings.
    quay_namespace = fields.String(required=True)
    quay_host = fields.String(required=True)
    dest_quay_user = fields.String(required=True)
    dest_quay_password = fields.String(required=True)
    dest_quay_api_token = fields.String(required=True)

    # Optional tuning knobs.
    # NOTE(review): ``default`` is the dump-side default in marshmallow 3 and is
    # deprecated in favour of ``dump_default`` / ``load_default`` -- confirm against
    # the pinned marshmallow version before changing it.
    signing_chunk_size = fields.Integer(required=False, default=100)
    signing_parallelism = fields.Integer(required=False, default=10)
class CosignSignerWrapper(SignerWrapper):
    """Wrapper for cosign signer functionality.

    Signature removal is currently disabled for cosign; the related methods are
    no-ops and the intended implementation is kept below as commented-out code.
    """

    label = "cosign_signer"
    pre_push = False
    entry_point_conf = ["pubtools-sign", "modules", "pubtools-sign-cosign-container-sign"]

    SCHEMA = CosignSignerSettingsSchema

    # TODO: Uncomment when cosign signature removal is enabled
    # def _list_signatures(self, repository: str, digest: str) -> List[Tuple[str, str]]:
    #     """List cosign signatures for given repository.
    #
    #     This methods runs pubtools-sign-cosign-container-list entrypoint which is expected to
    #     return list of full references to signature tags in format sha256-<digest>.sig
    #
    #     Args:
    #         repository (str): Repository to list signatures for.
    #         tag (str): Tag to list signatures for.
    #     Returns:
    #         List[Tuple[str, str]]: List of (repository, signature tag) tuples
    #         for existing signatures.
    #     """
    #     quay_client = QuayClient(
    #         self.settings["dest_quay_user"],
    #         self.settings["dest_quay_password"],
    #         self.settings["quay_host"],
    #     )
    #     try:
    #         ml = quay_client.get_manifest(
    #             f"{self.settings['quay_host'].rstrip('/')}/"
    #             f"{self.settings['quay_namespace']}/{repository.replace('/','----')}@{digest}",
    #             media_type=QuayClient.MANIFEST_LIST_TYPE,
    #         )
    #     except ManifestTypeError:
    #         ml = None
    #     if ml:
    #         full_references = [
    #             f"{self.settings['quay_host'].rstrip('/')}"
    #             f"/{self.settings['quay_namespace']}/{repository.replace('/','----')}@{digest}"
    #         ]
    #         for manifest in cast(ManifestList, ml)["manifests"]:
    #             full_references.append(
    #                 (
    #                     f"{self.settings['quay_host'].rstrip('/')}/"
    #                     + f"{self.settings['quay_namespace']}/{repository.replace('/','----')}"
    #                     + f"@{manifest['digest']}"
    #                 )
    #             )
    #     else:
    #         full_references = [
    #             (
    #                 f"{self.settings['quay_host'].rstrip('/')}/"
    #                 + f"{self.settings['quay_namespace']}/{repository.replace('/','----')}"
    #                 + f"@{digest}"
    #             )
    #         ]
    #     existing_signatures = []
    #     for full_reference in full_references:
    #         existing_signatures.append(
    #             run_entrypoint(
    #                 ("pubtools-sign", "modules", "pubtools-sign-cosign-signature-list"),
    #                 None,
    #                 [cast(str, self.config_file), full_reference],
    #                 {},
    #             )
    #         )
    #     rets = []
    #     for esig in existing_signatures:
    #         if esig[0] is True:
    #             for sig in esig[1]:
    #                 # if not sig:
    #                 #     continue
    #                 rets.extend(
    #                     [(repository, sig.split(":")[-1].replace(".sig", "").replace("-", ":"))]
    #                 )
    #         else:
    #             LOG.warning("Fetch existing signatures error:" + esig[1])
    #     return rets

    def sign_container_opt_args(
        self, sign_entries: List[SignEntry], task_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Return optional arguments for signing a container.

        Args:
            sign_entries (List[SignEntry]): List of SignEntry.
            task_id (str): Task ID to identify the signing task (not used here).

        Returns:
            dict: ``{"identity": [...]}`` holding ``pub_reference`` of every entry,
            in input order.
        """
        identities = [entry.pub_reference for entry in sign_entries]
        return {"identity": identities}

    def _filter_to_remove(
        self,
        signatures: List[Tuple[str, str, str]],
        _exclude: Optional[List[Tuple[str, str, str]]] = None,
    ) -> List[str]:
        """Filter signatures to remove.

        Currently always returns an empty list because cosign signature removal
        is disabled.

        Args:
            signatures (List[Tuple[str, str, str]]): List of (digest, tag, repository)
                tuples of signatures to remove.
            _exclude (Optional[List[Tuple[str, str, str]]]): List of
                (digest, tag, repository) tuples of signatures to keep.
        """
        # TODO: Uncomment when cosign signature removal is enabled
        # signatures_to_remove = [(x[2], x[0]) for x in signatures]
        # signatures_to_exclude = [(x[2], x[0]) for x in _exclude or []]
        # existing_signatures = set(
        #     sum(
        #         run_in_parallel(
        #             self._list_signatures,
        #             [FData(args=digest_tag) for digest_tag in signatures_to_remove],
        #         ).values(),
        #         [],
        #     )
        # )
        #
        # to_remove = []
        # for existing_signature in existing_signatures:
        #     if (
        #         existing_signature in signatures_to_remove
        #         and existing_signature not in signatures_to_exclude
        #     ):
        #         to_remove.append(existing_signature)
        #         LOG.debug(
        #             f"Removing signature "
        #             f"Repository: {existing_signature[0]}, "
        #             f"Digest: {existing_signature[1]}, "
        #         )
        # return to_remove
        return []  # pragma: no cover

    def _run_remove_signatures(self, signatures_to_remove: List[Tuple[str, str]]) -> None:
        """Remove signatures from the sigstore.

        Currently a no-op because cosign signature removal is disabled.

        Args:
            signatures_to_remove (List[Tuple(str, str)]): List of signatures to remove.
        """
        # TODO: Uncomment when cosign signature removal is enabled
        # qc = QuayApiClient(self.settings["dest_quay_api_token"], host=self.settings["quay_host"])
        # for sig_to_remove in signatures_to_remove:
        #     ref = self.settings["quay_namespace"] + "/" + sig_to_remove[0].replace("/", "----")
        #     sig_tag = sig_to_remove[1].replace(":", "-") + ".sig"
        #     qc.delete_tag(ref, sig_tag)
        pass  # pragma: no cover

    def remove_signatures(
        self,
        signatures: List[Tuple[str, str, str]],
        _exclude: Optional[List[Tuple[str, str, str]]] = None,
    ) -> None:
        """Remove signatures from sigstore.

        Currently returns immediately without removing anything.

        Args:
            signatures (list): List of tuples containing (digest, reference, repository)
                of signatures to remove.
            _exclude (Optional[List[Tuple[str, str, str]]]): List of
                (digest, tag, repository) tuples of signatures to keep.
        """
        # XXX: Removal of signatures in cosign is currently disabled. Once cosign will be able to
        # remove specific annotation from a signatures, this method will be updated and enabled.
        return
        # to_remove = self._filter_to_remove(signatures, _exclude=_exclude)
        # self._remove_signatures(to_remove)


# Maps the ``label`` of every concrete wrapper defined in this module namespace to
# its class: plain classes that subclass SignerWrapper, excluding the base itself.
SIGNER_BY_LABEL = {
    candidate.label: candidate
    for candidate in locals().values()
    if type(candidate) is type
    and issubclass(candidate, SignerWrapper)
    and candidate != SignerWrapper
}