Source code for pubtools._quay.container_image_pusher

import functools
import logging
from concurrent import futures
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Any, cast, Dict, List, Optional


import requests

from .exceptions import (
    ManifestTypeError,
)
from .utils.misc import (
    get_internal_container_repo_name,
    log_step,
    run_with_retries,
)
from .quay_client import QuayClient
from .tag_images import tag_images
from .types import ManifestList

from pubtools.tracing import get_trace_wrapper

# Tracing wrapper; its ``instrument_func()`` decorator is applied to the per-item
# push function inside ``ContainerImagePusher.push_container_images``.
tw = get_trace_wrapper()
# Logger obtained under the "pubtools.quay" name; used by ContainerImagePusher.
LOG = logging.getLogger("pubtools.quay")


class ContainerImagePusher:
    """
    Push container images to Quay.

    No validation is performed, push items are expected to be correct.
    """

    def __init__(self, push_items: List[Any], target_settings: Dict[str, Any]) -> None:
        """
        Initialize.

        Args:
            push_items ([ContainerPushItem]):
                List of push items.
            target_settings (dict):
                Target settings.
        """
        self.push_items = push_items
        self.target_settings = target_settings

        # Trailing slashes are stripped so the host can be joined into
        # "{host}/{namespace}/{repo}:{tag}" references without doubling "/".
        self.quay_host = self.target_settings.get("quay_host", "quay.io").rstrip("/")
        # Clients are created lazily by the properties below.
        self._src_quay_client: Optional[QuayClient] = None
        self._dest_quay_client: Optional[QuayClient] = None

    @property
    def src_quay_client(self) -> QuayClient:
        """
        Create and access QuayClient for source image.

        The client is built on first access from the "source_quay_user" and
        "source_quay_password" settings. The host is "source_quay_host" when that
        setting is truthy, otherwise ``self.quay_host``.
        """
        if self._src_quay_client is None:
            self._src_quay_client = QuayClient(
                self.target_settings["source_quay_user"],
                self.target_settings["source_quay_password"],
                self.target_settings.get("source_quay_host") or self.quay_host,
            )
        return self._src_quay_client

    @property
    def dest_quay_client(self) -> QuayClient:
        """
        Create and access QuayClient for dest image.

        The client is built on first access from the "dest_quay_user" and
        "dest_quay_password" settings and ``self.quay_host``.
        """
        if self._dest_quay_client is None:
            self._dest_quay_client = QuayClient(
                self.target_settings["dest_quay_user"],
                self.target_settings["dest_quay_password"],
                self.quay_host,
            )
        return self._dest_quay_client

    @classmethod
    def run_tag_images(
        cls, source_ref: str, dest_refs: List[str], all_arch: bool, target_settings: Dict[str, Any]
    ) -> None:
        """
        Prepare the "tag images" entrypoint with all the necessary arguments and run it.

        NOTE: Tagging operation will run with retries to compensate for transient
        container-related issues.

        Args:
            source_ref (str):
                Source image reference.
            dest_refs ([str]):
                List of destination references.
            all_arch (bool):
                Whether all architectures should be copied.
            target_settings (dict):
                Settings used for setting the values of the function parameters.
                "dest_quay_user", "dest_quay_password" and "skopeo_image" are
                required keys; the remaining ones are optional.
        """
        tag_images_partial = functools.partial(
            tag_images,
            source_ref,
            dest_refs,
            all_arch=all_arch,
            quay_user=target_settings["dest_quay_user"],
            quay_password=target_settings["dest_quay_password"],
            source_quay_host=target_settings.get("source_quay_host"),
            source_quay_user=target_settings.get("source_quay_user"),
            source_quay_password=target_settings.get("source_quay_password"),
            container_exec=target_settings.get("container_exec", True),
            container_image=target_settings["skopeo_image"],
            docker_url=target_settings.get("docker_host") or "unix://var/run/docker.sock",
            docker_timeout=target_settings.get("docker_timeout"),
            docker_verify_tls=target_settings.get("docker_tls_verify") or False,
            docker_cert_path=target_settings.get("docker_cert_path") or None,
            registry_username=target_settings.get("skopeo_executor_username") or None,
            registry_password=target_settings.get("skopeo_executor_password") or None,
        )
        run_with_retries(
            tag_images_partial,
            "Tag images",
            target_settings.get("tag_images_tries", 4),
            target_settings.get("tag_images_wait_time_increase", 10),
        )

    def _prepare_dest_refs(self, push_item: Any) -> List[str]:
        """Prepare destination references for push.

        Construct destination references based on tags and repo of push item.
        Repositories are processed in sorted order; tags keep the order in which
        they are listed for each repository.

        Args:
            push_item (ContainerPushItem):
                Container push item.

        Returns (list(str)):
            List of destination references for the push.
        """
        dest_refs = []
        image_schema = "{host}/{namespace}/{repo}:{tag}"
        namespace = self.target_settings["quay_namespace"]
        for repo, tags in sorted(push_item.metadata["tags"].items()):
            internal_repo = get_internal_container_repo_name(repo)
            for tag in tags:
                dest_ref = image_schema.format(
                    host=self.quay_host,
                    namespace=namespace,
                    repo=internal_repo,
                    tag=tag,
                )
                dest_refs.append(dest_ref)
        return dest_refs

    def copy_source_push_item(self, push_item: Any) -> None:
        """
        Perform the tagging operation for a push item containing a source image.

        Args:
            push_item (ContainerPushItem):
                Source container push item.
        """
        LOG.info("Copying push item '{0}' as a source image".format(push_item))
        source_ref = push_item.metadata["pull_url"]
        dest_refs = self._prepare_dest_refs(push_item)
        self.run_tag_images(source_ref, dest_refs, True, self.target_settings)

    def copy_v1_push_item(self, push_item: Any) -> None:
        """
        Perform the tagging operation for a push item containing a v1 image.

        Args:
            push_item (ContainerPushItem):
                Container push item.
        """
        LOG.info("Copying push item '{0}' as v1 container only".format(push_item))
        source_ref = push_item.metadata["pull_url"]
        dest_refs = self._prepare_dest_refs(push_item)
        self.run_tag_images(source_ref, dest_refs, True, self.target_settings)

    def copy_multiarch_push_item(self, push_item: Any, source_ml: ManifestList) -> None:
        """
        Perform the tagging operation for a push item containing a multiarch image.

        The source image is copied to every unique destination reference of the
        push item. No manifest list merging is performed here. Nothing is copied
        when the push item has no destination tags.

        Args:
            push_item (ContainerPushItem):
                Multiarch container push item.
            source_ml (dict):
                Manifest list of the source image. Not used by this method; kept
                so existing callers do not have to change.
        """
        LOG.info("Copying push item '{0}' as a multiarch image.".format(push_item))
        source_ref = push_item.metadata["pull_url"]
        # dict.fromkeys() drops duplicate references while keeping first-seen
        # order, so the list handed to the tagging step is deterministic.
        simple_dest_refs = list(dict.fromkeys(self._prepare_dest_refs(push_item)))

        if simple_dest_refs:
            LOG.info(
                "Copying image {0} to {1} destinations without merging manifest lists".format(
                    source_ref, len(simple_dest_refs)
                )
            )
            self.run_tag_images(source_ref, simple_dest_refs, True, self.target_settings)

    @log_step("Push images to Quay")
    def push_container_images(self) -> None:
        """
        Push container images to Quay.

        Every push item is handled in a worker thread and classified as follows:

        * source image: the build metadata carries a truthy "sources_for_nvr" value;
        * v1 image: not a source image and no manifest list could be fetched for it;
        * multiarch image: everything else.

        The number of worker threads is taken from the "num_thread_container_push"
        setting (default 5). The first exception encountered among the completed
        workers is re-raised.
        """

        @tw.instrument_func()
        def push_container_image(item: Any) -> None:
            """
            Push a single container image to Quay.

            Args:
                item (ContainerPushItem):
                    Container push item.
            """
            try:
                source_ml = self.src_quay_client.get_manifest(
                    item.metadata["pull_url"], media_type=QuayClient.MANIFEST_LIST_TYPE
                )
            except ManifestTypeError:
                source_ml = None
            # some registries can return 404 instead of v2s2
            except requests.exceptions.HTTPError as e:
                # An HTTPError may carry no response; in that case re-raise it
                # instead of failing with AttributeError on ``status_code``.
                if e.response is not None and e.response.status_code == 404:
                    source_ml = None
                else:
                    raise

            # this metadata field indicates a source image
            sources_for_nvr = (
                item.metadata["build"]
                .get("extra", {})
                .get("image", {})
                .get("sources_for_nvr", None)
            )

            if sources_for_nvr:
                # Source image
                self.copy_source_push_item(item)
            elif not source_ml:
                # v1 image: neither a source image nor a manifest list
                self.copy_v1_push_item(item)
            else:
                # Multiarch image
                self.copy_multiarch_push_item(item, cast(ManifestList, source_ml))

        num_thread_container_push = self.target_settings.get("num_thread_container_push", 5)

        with ThreadPoolExecutor(max_workers=num_thread_container_push) as executor:
            future_results = [
                executor.submit(push_container_image, item) for item in self.push_items
            ]
            for future in futures.as_completed(future_results):
                # result() re-raises the exception raised by the worker, if any.
                future.result()