Source code for pubtools._quay.push_docker

from collections import namedtuple
import logging
import sys
import json
from typing import Tuple, List, Dict, Any, cast, Optional

import requests
import urllib3

from .exceptions import BadPushItem, InvalidTargetSettings, InvalidRepository
from .utils.misc import get_internal_container_repo_name, log_step
from .quay_api_client import QuayApiClient
from .quay_client import QuayClient
from .iib_operations import _sign_index_image
from .container_image_pusher import ContainerImagePusher
from .security_manifest_pusher import SecurityManifestPusher

from .operator_pusher import OperatorPusher
from .item_processor import (
    item_processor_for_external_data,
    item_processor_for_internal_data,
    SignEntry,
    ManifestArchDigest,
)
from .utils.misc import parse_index_image
from .signer_wrapper import SIGNER_BY_LABEL

from .utils.misc import (
    get_external_container_repo_name,
    timestamp,
    pyxis_get_repo_metadata,
    set_aws_kms_environment_variables,
    run_in_parallel,
    FData,
)

# TODO: do we want this, or should I remove it?
from urllib3.exceptions import InsecureRequestWarning

urllib3.disable_warnings(InsecureRequestWarning)

LOG = logging.getLogger("pubtools.quay")


[docs]class PushDocker: """Handle full Docker push workflow.""" ImageData = namedtuple( "ImageData", ["repo", "tag", "v2list_digest", "v2s2_digest", "v2s1_digest"] )
[docs] def __init__( self, push_items: List[Any], hub: Any, task_id: str, target_name: str, target_settings: Dict[str, Any], ) -> None: """ Initialize. Args: push_items ([_PushItem]): List of push items. hub (HubProxy): Instance of XMLRPC pub-hub proxy. task_id (str): task id target_name (str): Name of the target. target_settings (dict): Target settings. """ self.push_items = push_items self.hub = hub self.task_id = task_id self.target_name = target_name self.target_settings = target_settings self.verify_target_settings() self.quay_host = self.target_settings.get("quay_host", "quay.io").rstrip("/") self._src_quay_client: Optional[QuayClient] = None self._dest_quay_client: Optional[QuayClient] = None self._dest_operator_quay_client: Optional[QuayClient] = None self._dest_quay_api_client: Optional[QuayApiClient] = None self._index_image_quay_client = None self.dest_registries = self.target_settings["docker_settings"]["docker_reference_registry"] self.dest_registries = ( self.dest_registries if isinstance(self.dest_registries, list) else [self.dest_registries] )
@property def dest_quay_client(self) -> QuayClient: """Create and access QuayClient for dest image.""" if self._dest_quay_client is None: self._dest_quay_client = QuayClient( self.target_settings["dest_quay_user"], self.target_settings["dest_quay_password"], self.quay_host, ) return self._dest_quay_client @property def src_quay_client(self) -> QuayClient: """Create and access QuayClient for source image.""" if self._src_quay_client is None: self._src_quay_client = QuayClient( self.target_settings["source_quay_user"], self.target_settings["source_quay_password"], self.target_settings.get("source_quay_host") or self.quay_host, ) return self._src_quay_client @property def dest_operator_quay_client(self) -> QuayClient: """Create and access QuayClient for dest image.""" if self._dest_operator_quay_client is None: self._dest_operator_quay_client = QuayClient( self.target_settings.get( "index_image_quay_user", self.target_settings["dest_quay_user"] ), self.target_settings.get( "index_image_quay_password", self.target_settings["dest_quay_password"] ), self.quay_host, ) return self._dest_operator_quay_client @property def dest_quay_api_client(self) -> QuayApiClient: """Create and access QuayApiClient for dest image.""" if self._dest_quay_api_client is None: self._dest_quay_api_client = QuayApiClient( self.target_settings["dest_quay_api_token"], self.quay_host ) return self._dest_quay_api_client
[docs] def verify_target_settings(self) -> None: """Verify that target settings contains all the necessary data.""" LOG.info("Verifying the necessary target settings") required_settings = [ "source_quay_user", "source_quay_password", "dest_quay_user", "dest_quay_password", "dest_quay_api_token", "pyxis_server", "quay_namespace", "iib_krb_principal", "iib_organization", "iib_index_image", "quay_operator_repository", "skopeo_image", "docker_settings", ] for setting in required_settings: if setting not in self.target_settings: raise InvalidTargetSettings( "'{0}' must be present in the target settings.".format(setting) ) required_docker_settings = ["umb_urls", "docker_reference_registry"] for setting in required_docker_settings: if setting not in self.target_settings["docker_settings"]: raise InvalidTargetSettings( "'{0}' must be present in the docker settings.".format(setting) )
[docs] @log_step("Get container push items") def get_docker_push_items(self) -> List[Any]: """ Filter push items to only include docker ones. Also, Check the validity of these items and raise an exception in case of incorrect data. For items having the same pull_url, only one of them is returned to avoid duplication. Returns ([ContainerPushItem]): Docker push items. """ docker_push_items = [] url_items: Dict[str, Any] = {} for item in self.push_items: if item.file_type != "docker": LOG.warning("Push item {0} doesn't have 'docker' type, skipping.".format(item)) continue if item.errors: raise BadPushItem("Push item {0} contains errors: {1}".format(item, item.errors)) if not item.metadata.get("pull_data"): raise BadPushItem("Push item {0} doesn't contain pull data.".format(item)) LOG.info("Docker push item found: {0}".format(item)) pull_url = item.metadata["pull_url"] if pull_url in url_items: url_items[pull_url].append(item) else: url_items[pull_url] = [item] for _, items in url_items.items(): non_amd64 = True for item in items: if item.metadata["arch"] in ["amd64", "x86_64"]: docker_push_items.append(item) non_amd64 = False break if non_amd64: docker_push_items.append(items[0]) return docker_push_items
[docs] @log_step("Get operator push items") def get_operator_push_items(self) -> List[Any]: """ Filter out push items to only include operator ones. Also, Check the validity of these items and raise an exception in case of incorrect data. Returns ([ContainerPushItem]): Operator push items. """ operator_push_items = [] for item in self.push_items: if item.file_type != "operator": LOG.warning("Push item {0} doesn't have 'operator' type, skipping.".format(item)) continue if item.errors: raise BadPushItem("Push item {0} contains errors: {1}".format(item, item.errors)) if not item.metadata.get("op_type"): raise BadPushItem("Push item {0} doesn't contain 'op_type'".format(item)) if item.metadata["op_type"] == "appregistry": LOG.warning( "Push item {0} is unsupported legacy (appregistry), skipping".format(item) ) continue if item.metadata["op_type"] != "bundle": message = "Push item {0} has unknown op_type: '{1}'.".format( item, item.metadata["op_type"] ) raise BadPushItem(message) if not item.metadata.get("com.redhat.openshift.versions"): msg = "Push item {0} doesn't specify 'com.redhat.openshift.versions'".format(item) raise BadPushItem(msg) LOG.info("Operator push item found: {0}".format(item)) operator_push_items.append(item) return operator_push_items
[docs] @classmethod def check_repos_validity( cls, push_items: List[Any], hub: Any, target_settings: Dict[str, Any] ) -> None: """ Check if specified repos are valid and pushing to them is allowed. Specifically, this method checks if the repo exists in Comet and if it's not deprecated If pushing to prod, also check if the repo already exists in stage. Args: push_items ([ContainerPushItem]): Container push items containing the repositories. hub (HubProxy): Instance of XMLRPC pub-hub proxy. target_settings (dict): Target settings. """ repos = [] for item in push_items: if item.external_repos: repos += item.external_repos.keys() else: repos += item.repos.keys() repos = sorted(list(set(repos))) repo_schema = "{namespace}/{repo}" # we'll need to get stage namespace from stage target settings if "propagated_from" in target_settings: stage_target_info = hub.worker.get_target_info(target_settings["propagated_from"]) stage_namespace = stage_target_info["settings"]["quay_namespace"] stage_quay_client = QuayClient( stage_target_info["settings"]["dest_quay_user"], stage_target_info["settings"]["dest_quay_password"], ) for repo in repos: # Only check Pyxis if the option is enabled in target settings if target_settings.get("do_repo_deprecation_check", False): LOG.info("Checking validity of repository metadata '{0}'".format(repo)) # Check if repo exists in Pyxis try: metadata = pyxis_get_repo_metadata(repo, target_settings) # Check if repo is not deprecated if "Deprecated" in metadata.get("release_categories", []): raise InvalidRepository("Repository {0} is deprecated".format(repo)) except requests.exceptions.HTTPError as e: if e.response.status_code == 404: LOG.warning( "Metadata of repository '{0}' don't exist in Pyxis".format(repo) ) else: raise # if we're pushing to prod target, check if repo exists on stage as well if "propagated_from" in target_settings: internal_repo = get_internal_container_repo_name(repo) full_repo = repo_schema.format(namespace=stage_namespace, repo=internal_repo) try: stage_quay_client.get_repository_tags(full_repo) except requests.exceptions.HTTPError as e: # strangely, if repo doesn't exist, 401 is returned if a robot account is used if e.response.status_code == 404 or e.response.status_code == 401: raise InvalidRepository( "Repository {0} doesn't exist on stage".format(repo) ) else: raise
[docs] @log_step("Generate backup mapping") def generate_backup_mapping( self, push_items: List[Any], all_arches: bool = False ) -> Tuple[Dict[ImageData, Tuple[str, str]], List[ImageData]]: """ Create resources which will be used for rollback if something goes wrong during the push. Specifically, create two resources: 'backup_tags' and 'rollback_tags'. - 'backup_tags' is a mapping of ImageData->manifest, and consists of images which will be overwritten. During rollback, tag is made to re-reference the old manifest. - 'rollback_tags' is a list of ImageData which don't yet exist. During rollback, they will be removed to preserve pre-push state. If all_arches is set to true, return all arches for v2s2 and v2s1 manifests ImageData is a namedtuple used to assign and access parts of an image in a formatted way. Args: push_items ([ContainerPushItem]): Container push items. all_arches: bool If set to True include all manifests in results. If False only amd64 are included Returns (({ImageData: Tuple[str,str]}, [ImageData])): Tuple of backup_tags and rollback_tags """ backup_tags = {} rollback_tags = [] internal_item_processor = item_processor_for_internal_data( self.dest_quay_client, self.target_settings["quay_host"].rstrip("/"), self.dest_registries, self.target_settings.get("retry_sleep_time", 5), self.target_settings["quay_namespace"], ) external_item_processor = item_processor_for_external_data( self.dest_quay_client, self.dest_registries, self.target_settings.get("retry_sleep_time", 5), ) existing_manifests_for_items = run_in_parallel( internal_item_processor.generate_existing_manifests_map, [FData(args=[item]) for item in push_items], threads=self.target_settings.get("quay_parallelism", 10), ) for item, existing_manifests in zip(push_items, existing_manifests_for_items.values()): destination_tags = external_item_processor.generate_repo_dest_tag_map(item) for registry, repos in existing_manifests.items(): for e_repo, _ in repos.items(): full_repo = internal_item_processor.reference_processor.replace_repo(e_repo) for d_tag in destination_tags[list(destination_tags.keys())[0]][e_repo]: if ( d_tag in existing_manifests[registry][e_repo] and existing_manifests[registry][e_repo][d_tag] ): man_arch_digs = existing_manifests[registry][e_repo][d_tag] if not all_arches: arch_mads = [ m for m in cast(List[ManifestArchDigest], man_arch_digs) if m.arch == "amd64" ] else: arch_mads = cast(List[ManifestArchDigest], man_arch_digs) v2list_mads: List[Optional[ManifestArchDigest]] = cast( List[Optional[ManifestArchDigest]], [ m for m in cast(List[ManifestArchDigest], man_arch_digs) if m.type_ == QuayClient.MANIFEST_LIST_TYPE ] or [None], ) v2s1_mads: List[Optional[ManifestArchDigest]] = cast( List[Optional[ManifestArchDigest]], [m for m in arch_mads if m.type_ == QuayClient.MANIFEST_V2S1_TYPE] or [None], ) v2s2_mads: List[Optional[ManifestArchDigest]] = cast( List[Optional[ManifestArchDigest]], [m for m in arch_mads if m.type_ == QuayClient.MANIFEST_V2S2_TYPE] or [None], ) for mads in (v2s1_mads, v2s2_mads, v2list_mads): for mad in mads: if not mad: continue image_data = PushDocker.ImageData( full_repo, d_tag, ( mad.digest if mad.type_ == QuayClient.MANIFEST_LIST_TYPE else "" ), ( mad.digest if mad.type_ == QuayClient.MANIFEST_V2S2_TYPE else "" ), ( mad.digest if mad.type_ == QuayClient.MANIFEST_V2S1_TYPE else "" ), ) backup_tags[image_data] = ( json.loads(mad.manifest), mad.arch, ) else: rollback_tags.append( PushDocker.ImageData(full_repo, d_tag, None, None, None) ) rollback_tags = sorted(list(set(rollback_tags))) return (backup_tags, rollback_tags)
[docs] @log_step("Perform rollback") def rollback(self, backup_tags: Dict[ImageData, str], rollback_tags: List[ImageData]) -> None: """ Perform a rollback. Args: backup_tags ({ImageData: str}): Dictionary mapping of ImageData and a manifest before it was overwritten. rollback_tags ([ImageData]): List of newly added ImageData. """ # restore overwritten tags to their original values schema = "{host}/{repo}:{tag}" LOG.info("Restoring tags to their original values") for image_data, manifest in sorted(backup_tags.items()): image_ref = schema.format(host=self.quay_host, repo=image_data.repo, tag=image_data.tag) LOG.info("Restoring tag '{0}'".format(image_ref)) self.dest_quay_client.upload_manifest(manifest, image_ref) # delete tags that didn't previously exist LOG.info("Removing newly introduced tags") for image_data in rollback_tags: image_ref = schema.format(host=self.quay_host, repo=image_data.repo, tag=image_data.tag) LOG.info("Removing tag '{0}'".format(image_ref)) try: self.dest_quay_api_client.delete_tag(image_data.repo, image_data.tag) except requests.exceptions.HTTPError as e: # if error occurred before tag was copied, it may not exist # deleting a non-existent tag shouldn't be an error if e.response.status_code != 404 and e.response.status_code != 401: raise
def get_outdated_manifests( self, backup_tags: Dict[ImageData, str] ) -> List[Tuple[str, str, str]]: """Return list of existing manifests which are being replaced with new ones. Args: backup_tags (dict({ImageData: str}): backup tags generated with generate_backup_mapping. Returns: List of tuples containing digest, tag, repo identifying manifests """ outdated_signatures = [] for image_data, manifest in backup_tags.items(): ext_repo = get_external_container_repo_name(image_data.repo.split("/")[1]) if image_data.v2s2_digest: outdated_signatures.append((image_data.v2s2_digest, image_data.tag, ext_repo)) if image_data.v2s1_digest: outdated_signatures.append((image_data.v2s1_digest, image_data.tag, ext_repo)) return outdated_signatures def fetch_missing_push_items_digests( self, push_items: List[Any] ) -> Dict[str, Dict[str, Dict[str, Dict[str, Tuple[str, Any]]]]]: """Fetch digests for media types which weren't originally pushed. In order to be able to sign v2s1 for images which were pushed as v2s2 or to sign v2s2 for images which were pushed as v2s1 fetch digests of those missing media types from quay and set it to item metadata into 'new_digests' mapping. Args: push_items(list): List of push items. target_settings(dict): Target settings. """ item_processor = item_processor_for_internal_data( self.dest_quay_client, self.target_settings["quay_host"].rstrip("/"), self.dest_registries, self.target_settings.get("retry_sleep_time", 5), self.target_settings["quay_namespace"], ) new_digests: Dict[str, Dict[str, Dict[str, Dict[str, Tuple[str, Any]]]]] = {} for item in push_items: missing_media_types = set( [QuayClient.MANIFEST_V2S2_TYPE, QuayClient.MANIFEST_V2S1_TYPE] ) - set(item.metadata["build"]["extra"]["image"]["media_types"]) # Always add v2s1 due to possible digest change missing_media_types.add(QuayClient.MANIFEST_V2S1_TYPE) existing_manifests = item_processor.generate_existing_manifests_map( item, only_media_types=list(missing_media_types) ) for reference, repos in existing_manifests.items(): new_digests.setdefault(reference, {}) for repo, tags in repos.items(): new_digests.setdefault(reference, {}).setdefault(repo, {}) for tag, man_arch_digs in tags.items(): if not man_arch_digs: continue new_digests[reference][repo].setdefault(tag, {}) for mad in man_arch_digs: new_digests[reference][repo][tag][mad.type_] = ( mad.digest, item.claims_signing_key, ) return new_digests def sign_new_manifests(self, docker_push_items: List[Any]) -> List[Tuple[str, str, Any]]: """Sign newly pushed images with signers enabled in target settings. Args: docker_push_items(List[PushItem]): list of docker push items. Returns: List of tuple (reference, digest, key) representing currently signed images. """ current_signatures = [] to_sign_new_entries = self.fetch_missing_push_items_digests(docker_push_items) to_sign_entries = [] to_sign_entries_internal = [] for internal_reg, repo_tags in to_sign_new_entries.items(): for repo, tag_digests in repo_tags.items(): for tag, digests in tag_digests.items(): for type_, digest_key in digests.items(): digest, key = digest_key internal_reference = ( f"{internal_reg}/" + self.target_settings["quay_namespace"] + "/" + get_internal_container_repo_name(repo) + ":" + tag ) for registry in self.dest_registries: pub_reference = f"{registry}/{repo}:{tag}" # add entries in internal format for cosign to_sign_entries_internal.append( SignEntry( reference=internal_reference, pub_reference=pub_reference, repo=repo, digest=digest, signing_key=key, arch="amd64", ) ) reference = f"{registry}/{repo}:{tag}" to_sign_entries.append( SignEntry( reference=reference, pub_reference="", repo=repo, digest=digest, signing_key=key, arch="amd64", ) ) current_signatures.append((reference, digest, key)) for signer in self.target_settings["signing"]: if signer["enabled"]: signercls = SIGNER_BY_LABEL[signer["label"]] signer = signercls(config_file=signer["config_file"], settings=self.target_settings) if signercls.pre_push is True: signer.sign_containers(to_sign_entries, self.task_id) else: signer.sign_containers(to_sign_entries_internal, self.task_id) return current_signatures
[docs] def run(self) -> None: """ Perform the full push-docker workflow. The workflow can be summarized as: - Filter out push items to only include container image items - Check if the destination repos may be pushed to (using Pyxis) - Generate backup mapping that will be used for rollback if something goes wrong. - Sign container images using RADAS and upload signatures to Pyxis - Push container images to their destinations - Generate and push container security manifests for pushed images - Fetch digests for missing media types of pushed items - Sign manifests for missing media types (has to be done after pushing) - Filter out push items to only include operator image items - Add operator bundles to index images by using IIB - Sign index images using RADAS and upload signatures to Pyxis - Push the index images to Quay - Remove outdated container signatures - (in case of failure) Rollback destination repos to the pre-push state """ # TODO: Do we need to manage push item state? # Filter out non-docker push items docker_push_items = self.get_docker_push_items() # Get operator push items (done early so that possible issues are detected) operator_push_items = self.get_operator_push_items() # Check if we may push to destination repos self.check_repos_validity(docker_push_items, self.hub, self.target_settings) # Generate resources for rollback in case there are errors during the push backup_tags, rollback_tags = self.generate_backup_mapping( docker_push_items, all_arches=True ) # Restore manifest list if it exists, otherwise restore v2s2 or v2s1 manifest backup_tags_restore = {} repo_tag_group: dict[Tuple[str, str], dict[str, PushDocker.ImageData]] = {} for image_data in backup_tags.keys(): repo_tag_group.setdefault((image_data.repo, image_data.tag), {}) for t in ["v2list", "v2s2", "v2s1"]: if getattr(image_data, t + "_digest"): repo_tag_group[(image_data.repo, image_data.tag)][t] = image_data break for images in repo_tag_group.values(): for t in ["v2list", "v2s2", "v2s1"]: if t in images.keys(): backup_tags_restore[images[t]] = backup_tags[images[t]][0] break all_backup_tags = {k: bt[0] for k, bt in backup_tags.items()} existing_index_images = [] iib_results = None successful_iib_results = dict() index_stamp = timestamp() item_processor = item_processor_for_external_data( self.src_quay_client, self.dest_registries, self.target_settings.get("retry_sleep_time", 5), ) to_sign_entries = [] current_signatures = [] to_sign_map = run_in_parallel( item_processor.generate_to_sign, [FData(args=(item,), kwargs={}) for item in docker_push_items], ) for _to_sign_entries in to_sign_map.values(): to_sign_entries.extend(_to_sign_entries) for sign_entry in to_sign_entries: current_signatures.append( (sign_entry.reference, sign_entry.digest, sign_entry.signing_key) ) # Sign containers with signers that doesn't need have pushed containers # in destination registry set_aws_kms_environment_variables(self.target_settings, "cosign_signer") for signer in self.target_settings["signing"]: if signer["enabled"] and SIGNER_BY_LABEL[signer["label"]].pre_push: signercls = SIGNER_BY_LABEL[signer["label"]] signer = signercls(config_file=signer["config_file"], settings=self.target_settings) signer.sign_containers(to_sign_entries, self.task_id) # Push container images container_pusher = ContainerImagePusher(docker_push_items, self.target_settings) container_pusher.push_container_images() # Sign containers with signers which requires pushed containers in destination registry to_sign_entries = [] item_processor = item_processor_for_internal_data( self.src_quay_client, self.target_settings["quay_host"].rstrip("/"), self.dest_registries, self.target_settings.get("retry_sleep_time", 5), self.target_settings["quay_namespace"], ) to_sign_map = run_in_parallel( item_processor.generate_to_sign, [ FData(args=(item,), kwargs={"include_manifest_lists": True}) for item in docker_push_items ], ) for _to_sign_entries in to_sign_map.values(): to_sign_entries.extend(_to_sign_entries) for signer in self.target_settings["signing"]: if signer["enabled"] and not SIGNER_BY_LABEL[signer["label"]].pre_push: signercls = SIGNER_BY_LABEL[signer["label"]] signer = signercls(config_file=signer["config_file"], settings=self.target_settings) signer.sign_containers(to_sign_entries, self.task_id) self.sign_new_manifests(docker_push_items) if self.target_settings.get("push_security_manifests_enabled", False): # Generate and push security manifests (if enabled in target settings) set_aws_kms_environment_variables(self.target_settings, "security_manifest_signer") sec_manifest_pusher = SecurityManifestPusher(docker_push_items, self.target_settings) sec_manifest_pusher.push_security_manifests() failed = False if operator_push_items: # Build index images operator_pusher = OperatorPusher( operator_push_items, self.task_id, self.target_settings ) existing_index_images = operator_pusher.get_existing_index_images( self.dest_operator_quay_client ) if operator_pusher.ensure_bundles_present(): bundles_presence_check_failed = False iib_results = operator_pusher.build_index_images() else: bundles_presence_check_failed = True iib_results = {} # Sign operator images failed_items = [item for item in operator_push_items if item.errors] successful_iib_results = dict( [(key, val) for key, val in iib_results.items() if val["iib_result"]] ) image_schema = "{host}/{namespace}/{repo}:{tag}" for version, iib_details in sorted(successful_iib_results.items()): iib_result = iib_details["iib_result"] _, iib_namespace, iib_intermediate_repo = parse_index_image(iib_result) permanent_index_image = image_schema.format( host=self.target_settings.get("quay_host", "quay.io").rstrip("/"), namespace=iib_namespace, repo=iib_intermediate_repo, tag=iib_result.build_tags[0], ) timestamp_tags = [f"{tag}-{index_stamp}" for tag in iib_details["destination_tags"]] current_signatures.extend( _sign_index_image( permanent_index_image, iib_details["destination_tags"] + timestamp_tags, iib_details["signing_keys"], self.task_id, self.target_settings, pre_push=True, ) ) # If there are any failed items, skip pushing if not failed_items: # Push index images to Quay operator_pusher.push_index_images(successful_iib_results, index_stamp) for version, iib_details in sorted(successful_iib_results.items()): iib_result = iib_details["iib_result"] _, iib_namespace, iib_intermediate_repo = parse_index_image(iib_result) permanent_index_image = image_schema.format( host=self.target_settings.get("quay_host", "quay.io").rstrip("/"), namespace=iib_namespace, repo=iib_intermediate_repo, tag=iib_result.build_tags[0], ) timestamp_tags = [f"{tag}-{index_stamp}" for tag in iib_details["destination_tags"]] current_signatures.extend( _sign_index_image( permanent_index_image, iib_details["destination_tags"] + timestamp_tags, iib_details["signing_keys"], self.task_id, self.target_settings, pre_push=False, ) ) # Rollback only when all index image builds fails or there are failed items # Empty iib_results is not an error and shouldn't fail the push. The only exception is # when bundles presence check failed. if ( (iib_results or bundles_presence_check_failed) and (not any([x["iib_result"] for x in iib_results.values()])) or failed_items ): if failed_items: LOG.error("There are failed push items. Cannot continue, running rollback.") else: LOG.error("Push of all index images failed, running rollback.") self.rollback(backup_tags_restore, rollback_tags) sys.exit(1) if successful_iib_results != iib_results: LOG.error("Push of some index images failed") failed = True # Remove old signatures # run generate backup mapping again to fetch new digests of pushed containers backup_tags2, _ = self.generate_backup_mapping(docker_push_items, all_arches=True) # if new backup tag has differnet digest, it means it was overwritten during the push # and old signature should be removed. If the digest is the same it means, same item # was just repushed outdated_tags = {} backup_tags2_shared = {} # Backup tags can contain new tags which were orignally rollback_tags # limit the comparision for outdated manifests to original backup_tags only for bt2 in backup_tags2.items(): if (bt2[0].tag, bt2[0].repo) in [(x.tag, x.repo) for x in all_backup_tags.keys()]: backup_tags2_shared[bt2[0]] = bt2[1] for bt1, bt2 in zip( sorted( backup_tags.items(), key=lambda x: ( x[0].tag, x[0].repo, x[0].v2list_digest or "", x[0].v2s2_digest or "", x[0].v2s1_digest or "", ), ), sorted( backup_tags2_shared.items(), key=lambda x: ( x[0].tag, x[0].repo, x[0].v2list_digest or "", x[0].v2s2_digest or "", x[0].v2s1_digest or "", ), ), ): list_test = ( bt1[0].v2list_digest and bt2[0].v2list_digest and bt1[0].v2list_digest != bt2[0].v2list_digest ) s2_test = ( bt1[0].v2s2_digest and bt2[0].v2s2_digest and bt1[0].v2s2_digest != bt2[0].v2s2_digest ) s1_test = ( bt1[0].v2s1_digest and bt2[0].v2s1_digest and bt1[0].v2s1_digest != bt2[0].v2s1_digest ) outdated = False if list_test: d1 = bt1[0].v2list_digest d2 = bt2[0].v2list_digest name = "manifest list" outdated = True elif s2_test: d1 = bt1[0].v2s2_digest d2 = bt2[0].v2s2_digest name = "v2s2" outdated = True elif s1_test: d1 = bt1[0].v2s1_digest # noqa: F841 d2 = bt2[0].v2s1_digest # noqa: F841 name = "v2s1" # noqa: F841 outdated = True if outdated: LOG.debug( f"Marking manifest {bt1[0].tag} as outdated, {name} " f"digests don't match {d1} != {d2}" ) outdated_tags[bt1[0]] = bt1[1] outdated_manifests = self.get_outdated_manifests(outdated_tags) outdated_manifests.extend(existing_index_images) for signer in self.target_settings["signing"]: if signer["enabled"] and outdated_manifests: signercls = SIGNER_BY_LABEL[signer["label"]] signer = signercls(config_file=signer["config_file"], settings=self.target_settings) signer.remove_signatures(outdated_manifests, _exclude=current_signatures) if failed: # Why??? sys.exit(1)
def mod_entry_point( push_items: List[Any], hub: Any, task_id: str, target_name: str, target_settings: Dict[str, Any] ) -> None: """Entry point for use in another python code.""" push = PushDocker(push_items, hub, task_id, target_name, target_settings) push.run()