from collections import namedtuple
from copy import deepcopy
import hashlib
import json
import logging
import urllib3
from typing import Any, cast, Optional, Union, Tuple, List, Dict
import requests
from .command_executor import LocalExecutor
from .exceptions import (
BadPushItem,
InvalidTargetSettings,
)
from .utils.misc import (
get_internal_container_repo_name,
set_aws_kms_environment_variables,
)
from .quay_client import QuayClient
from .container_image_pusher import ContainerImagePusher
from .manifest_list_merger import ManifestListMerger
from .untag_images import untag_images
from .push_docker import PushDocker
from .signer_wrapper import SIGNER_BY_LABEL
from .item_processor import item_processor_for_internal_data, SignEntry
from .command_executor import Executor
from .types import Manifest, ManifestList
# TODO: do we want this, or should I remove it?
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
LOG = logging.getLogger("pubtools.quay")
[docs]class TagDocker:
"""Handle full tag-docker workflow."""
ImageDetails = namedtuple("ImageDetails", ["reference", "manifest", "manifest_type", "digest"])
MANIFEST_LIST_TYPE = "application/vnd.docker.distribution.manifest.list.v2+json"
MANIFEST_V2S2_TYPE = "application/vnd.docker.distribution.manifest.v2+json"
MANIFEST_OCI_LIST_TYPE = "application/vnd.oci.image.index.v1+json"
MANIFEST_OCI_V2S2_TYPE = "application/vnd.oci.image.manifest.v1+json"
[docs] def __init__(
self,
push_items: List[Any],
hub: Any,
task_id: str,
target_name: str,
target_settings: Dict[str, Any],
) -> None:
"""
Initialize.
Args:
push_items ([_PushItem]):
List of push items.
hub (HubProxy):
Instance of XMLRPC pub-hub proxy.
task_id (str):
task id
target_name (str):
Name of the target.
target_settings (dict):
Target settings.
"""
self.push_items = push_items
self.hub = hub
self.task_id = task_id
self.target_name = target_name
self.target_settings = target_settings
self._quay_client: Optional[QuayClient] = None
self.quay_host = self.target_settings.get("quay_host", "quay.io").rstrip("/")
self.dest_registries = self.target_settings["docker_settings"]["docker_reference_registry"]
self.dest_registries = (
self.dest_registries
if isinstance(self.dest_registries, list)
else [self.dest_registries]
)
self.verify_target_settings()
self.verify_input_data()
@property
def quay_client(self) -> QuayClient:
"""Create and access QuayClient for source and dest images."""
if self._quay_client is None:
self._quay_client = QuayClient(
self.target_settings["dest_quay_user"],
self.target_settings["dest_quay_password"],
self.quay_host,
)
return self._quay_client
[docs] def verify_target_settings(self) -> None:
"""Verify that target settings contains all the necessary data."""
LOG.info("Verifying the necessary target settings")
required_settings = [
"source_quay_user",
"source_quay_password",
"dest_quay_user",
"dest_quay_password",
"dest_quay_api_token",
"pyxis_server",
"quay_namespace",
"iib_index_image",
"iib_krb_principal",
"quay_operator_repository",
]
for setting in required_settings:
if setting not in self.target_settings:
raise InvalidTargetSettings(
"'{0}' must be present in the target settings.".format(setting)
)
required_docker_settings = ["umb_urls", "docker_reference_registry"]
for setting in required_docker_settings:
if setting not in self.target_settings["docker_settings"]:
raise InvalidTargetSettings(
"'{0}' must be present in the docker settings.".format(setting)
)
[docs] def get_image_details(self, reference: str, executor: Executor) -> Optional[ImageDetails]:
"""
Create an ImageDetails namedtuple for the given image reference.
Args:
reference (str):
Image reference.
executor (Executor):
Instance of Executor subclass used for skopeo inspect.
Returns (Optional[ImageDetails]):
Namedtuple filled with images data, or None if image doesn't exist.
"""
LOG.info("Getting image details of {0}".format(reference))
try:
manifest = cast(Union[Manifest, ManifestList], self.quay_client.get_manifest(reference))
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404 or e.response.status_code == 401:
LOG.info("Image '{0}' doesn't exist".format(reference))
return None
else:
raise
manifest_type = manifest["mediaType"]
if manifest_type not in [
TagDocker.MANIFEST_V2S2_TYPE,
TagDocker.MANIFEST_LIST_TYPE,
TagDocker.MANIFEST_OCI_V2S2_TYPE,
TagDocker.MANIFEST_OCI_LIST_TYPE,
]:
raise BadPushItem(
"Image {0} has manifest type different than V2S2 or manifest list".format(reference)
)
# Check arch if the image is V2S2 manifest
if manifest["mediaType"] in (
TagDocker.MANIFEST_V2S2_TYPE,
TagDocker.MANIFEST_OCI_V2S2_TYPE,
):
arch = executor.skopeo_inspect(reference)["Architecture"]
# Arch check is not a great way to verify that this is a source image, but there are
# no better options without having build details
if arch != "amd64":
raise BadPushItem(
"Image {0} has V2S2 manifest and contains an architecture {1}. Only source "
"images are supported, which have arch 'amd64'.".format(reference, arch)
)
digest = self.quay_client.get_manifest_digest(reference)
return TagDocker.ImageDetails(reference, manifest, manifest["mediaType"], digest)
[docs] def is_arch_relevant(self, push_item: Any, arch: str) -> bool:
"""
Find out if an operation should be performed on a given architecture.
Uses values of 'archs' and 'exclude_archs' in push item's metadata.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
arch (str):
Arch to investigate.
Returns (bool):
True if an operation should be performed on a given arch, False otherwise.
"""
if push_item.metadata["exclude_archs"]:
return arch not in push_item.metadata["archs"]
else:
return arch in push_item.metadata["archs"]
[docs] def tag_remove_calculate_archs(
self, push_item: Any, tag: str, executor: Executor
) -> Tuple[List[str], List[str]]:
"""
Calculate which architectures would be removed, and which would remain from a given tag.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
tag (str):
Tag, for which a 'remove' operation will be performed.
executor (Executor):
Instance of Executor subclass used for skopeo inspect.
Returns ([str], [str]):
Tuple where first element contains archs that will be removed, and second element
contains archs that will remain.
"""
full_repo_schema = "{host}/{namespace}/{repo}"
namespace = self.target_settings["quay_namespace"]
internal_repo = get_internal_container_repo_name(list(push_item.repos.keys())[0])
full_repo = full_repo_schema.format(
host=self.quay_host, namespace=namespace, repo=internal_repo
)
if push_item.metadata["tag_source"]:
source_image = "{0}:{1}".format(full_repo, push_item.metadata["tag_source"])
source_details = self.get_image_details(source_image, executor)
else:
source_details = None
dest_image = "{0}:{1}".format(full_repo, tag)
dest_details = self.get_image_details(dest_image, executor)
if dest_details is None:
LOG.warning("Tag '{0}' already doesn't exist, no removal necessary".format(tag))
return ([], [])
if source_details and source_details.manifest_type != dest_details.manifest_type:
raise BadPushItem(
"Mismatch between manifest types of source {0}:{1} and tag {2}:{3}".format(
push_item.metadata["tag_source"],
source_details.manifest_type,
tag,
dest_details.manifest_type,
)
)
# Scenario 1: source image
if dest_details.manifest_type in (
TagDocker.MANIFEST_V2S2_TYPE,
TagDocker.MANIFEST_OCI_V2S2_TYPE,
):
return self.tag_remove_calculate_archs_source_image(
push_item, source_details, dest_details
)
# Scenario 2: multiarch image
if dest_details.manifest_type in (
TagDocker.MANIFEST_LIST_TYPE,
TagDocker.MANIFEST_OCI_LIST_TYPE,
):
return self.tag_remove_calculate_archs_multiarch_image(
push_item, source_details, dest_details
)
# this should never happen
return ([], []) # pragma: no cover
[docs] def tag_remove_calculate_archs_source_image(
self, push_item: Any, source_details: Optional[ImageDetails], dest_details: ImageDetails
) -> Tuple[List[str], List[str]]:
"""
Calculate which archs would be removed if the specified images were source images.
This method is a sub-step of the 'tag_remove_calculate_archs' method.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
source_details (Optional[ImageDetails]):
ImageDetails of source image, or None if it wasn't specified.
dest_details (ImageDetails):
ImageDetails of destination image.
Returns ([str], [str]):
Tuple where first element contains archs that will be removed, and second element
contains archs that will remain.
"""
# Option A: arch is relevant, source is specified and digests correspond -> remove
if (
self.is_arch_relevant(push_item, "amd64")
and source_details is not None
and source_details.digest == dest_details.digest
):
return (["amd64"], [])
# Option B: arch is relevant, source is specified, but digests don't correspond -> keep
elif (
self.is_arch_relevant(push_item, "amd64")
and source_details is not None
and source_details.digest != dest_details.digest
):
return ([], ["amd64"])
# Option C: arch is relevant, source is not specified (no digest check) -> remove
elif self.is_arch_relevant(push_item, "amd64") and source_details is None:
return (["amd64"], [])
# Option D: arch is not relevant -> keep
else:
return ([], ["amd64"])
[docs] def tag_remove_calculate_archs_multiarch_image(
self, push_item: Any, source_details: Optional[ImageDetails], dest_details: ImageDetails
) -> Tuple[List[str], List[str]]:
"""
Calculate which archs would be removed if the specified images were multiarch images.
This method is a sub-step of the 'tag_remove_calculate_archs' method.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
source_details (Optional[ImageDetails]):
ImageDetails of source image, or None if it wasn't specified.
dest_details (ImageDetails):
ImageDetails of destination image.
Returns ([str], [str]):
Tuple where first element contains archs that will be removed, and second element
contains archs that will remain.
"""
remove_archs = []
keep_archs = []
dest_manifest_data = [
(m["digest"], m["platform"]["architecture"]) for m in dest_details.manifest["manifests"]
]
source_manifest_data = (
[
(m["digest"], m["platform"]["architecture"])
for m in source_details.manifest["manifests"]
]
if source_details
else []
)
for dest_digest, dest_arch in dest_manifest_data:
# Option A: arch is relevant, src exists, digest matches in src and dest -> remove
if (
self.is_arch_relevant(push_item, dest_arch)
and source_details is not None
and (dest_digest, dest_arch) in source_manifest_data
):
remove_archs.append(dest_arch)
# Option B: arch is relevant, src exists, no digest match in src and dest -> keep
elif (
self.is_arch_relevant(push_item, dest_arch)
and source_details is not None
and (dest_digest, dest_arch) not in source_manifest_data
):
keep_archs.append(dest_arch)
# Option C: arch is relevant, src doesn't exist (digest match not possible)-> remove
elif self.is_arch_relevant(push_item, dest_arch) and source_details is None:
remove_archs.append(dest_arch)
# Option D: arch is not relevant -> keep
else:
keep_archs.append(dest_arch)
return (remove_archs, keep_archs)
[docs] def tag_add_calculate_archs(
self, push_item: Any, tag: str, executor: Executor
) -> Optional[List[str]]:
"""
Calculate which architectures are present in a given tag, and which ones would be added.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
tag (str):
Tag, for which an 'add' operation will be performed.
executor (Executor):
Instance of Executor subclass used for skopeo inspect.
Returns (Optional[List[str]]):
In case of multiarch image, arches which would be copied to the destination. In case
of a source image, None if the copy operation is relevant or [] otherwise.
"""
full_repo_schema = "{host}/{namespace}/{repo}"
namespace = self.target_settings["quay_namespace"]
internal_repo = get_internal_container_repo_name(list(push_item.repos.keys())[0])
full_repo = full_repo_schema.format(
host=self.quay_host, namespace=namespace, repo=internal_repo
)
source_image = "{0}:{1}".format(full_repo, push_item.metadata["tag_source"])
dest_image = "{0}:{1}".format(full_repo, tag)
source_details = self.get_image_details(source_image, executor)
dest_details = self.get_image_details(dest_image, executor)
if source_details is None:
raise BadPushItem("Source image must be specified if add operation was requested")
if dest_details and source_details.manifest_type != dest_details.manifest_type:
raise BadPushItem(
"Mismatch between manifest types of source {0}:{1} and tag {2}:{3}".format(
push_item.metadata["tag_source"],
source_details.manifest_type,
tag,
dest_details.manifest_type,
)
)
# Scenario 1: source image
if source_details.manifest_type in (
TagDocker.MANIFEST_V2S2_TYPE,
TagDocker.MANIFEST_OCI_V2S2_TYPE,
):
# source arch is relevant, proceed with copying the source image
if self.is_arch_relevant(push_item, "amd64"):
return None
# arch is irrelevant we want no-op
else:
return []
# Scenario 2: multiarch image
if source_details.manifest_type in (
TagDocker.MANIFEST_LIST_TYPE,
TagDocker.MANIFEST_OCI_LIST_TYPE,
):
add_archs = [
m["platform"]["architecture"]
for m in source_details.manifest["manifests"]
if self.is_arch_relevant(push_item, m["platform"]["architecture"])
]
return add_archs
# this should never happen
return [] # pragma: no cover
[docs] def copy_tag_sign_images(self, push_item: Any, tag: str, executor: Executor) -> None:
"""
Copy image from source to the destination tag and sign new manifest claims.
If destination tag already contains a manifest, it will be overwritten.
This workflow is expected to use on single-arch source images.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
tag (str):
Tag, which acts as a destination to the copy operation.
executor (Executor):
Instance of Executor subclass used for skopeo inspect.
"""
full_repo_schema = "{host}/{namespace}/{repo}"
external_image_schema = "{host}/{repo}:{tag}"
namespace = self.target_settings["quay_namespace"]
repo = list(push_item.repos.keys())[0]
internal_repo = get_internal_container_repo_name(repo)
full_repo = full_repo_schema.format(
host=self.quay_host, namespace=namespace, repo=internal_repo
)
source_image = "{0}:{1}".format(full_repo, push_item.metadata["tag_source"])
dest_image = "{0}:{1}".format(full_repo, tag)
LOG.info(
"Source image tag '{0}' will be copied to destination '{1}'".format(
push_item.metadata["tag_source"], tag
)
)
to_sign_entries = []
to_sign_entries_internal = []
current_signatures: List[Any] = []
details = self.get_image_details(source_image, executor)
if not details:
raise BadPushItem("Source image must be specified if add operation was requested")
registries = self.target_settings["docker_settings"]["docker_reference_registry"]
if details.manifest_type in (
TagDocker.MANIFEST_LIST_TYPE,
TagDocker.MANIFEST_OCI_LIST_TYPE,
):
raise ValueError("Tagging workflow is not supported for multiarch images")
if push_item.claims_signing_key:
for registry in registries:
# add entries in internal format for cosign
to_sign_entries_internal.append(
SignEntry(
repo=repo,
pub_reference=f"{registry}/{repo}:{tag}",
reference="quay.io/"
+ self.target_settings["quay_namespace"]
+ "/"
+ internal_repo
+ ":"
+ tag,
digest=details.digest,
signing_key=push_item.claims_signing_key,
arch="amd64",
)
)
reference = external_image_schema.format(host=registry, repo=repo, tag=tag)
to_sign_entries.append(
SignEntry(
repo=repo,
pub_reference="",
reference=reference,
digest=details.digest,
signing_key=push_item.claims_signing_key,
arch="amd64",
)
)
item_processor = item_processor_for_internal_data(
self.quay_client,
self.target_settings["quay_host"].rstrip("/"),
self.dest_registries,
self.target_settings.get("retry_sleep_time", 5),
self.target_settings["quay_namespace"],
)
outdated_manifests = []
existing_manifests = item_processor.generate_existing_manifests_metadata(push_item)
for repo, _tag, mad in existing_manifests:
if not mad:
continue
if _tag == tag:
outdated_manifests.append((mad.digest, tag, repo))
set_aws_kms_environment_variables(self.target_settings, "cosign_signer")
for _signer in self.target_settings["signing"]:
if _signer["enabled"]:
signercls = SIGNER_BY_LABEL[_signer["label"]]
signer = signercls(
config_file=_signer["config_file"], settings=self.target_settings
)
# exclude should be bool, and outdated manifests should be list?
if outdated_manifests:
signer.remove_signatures(outdated_manifests, _exclude=current_signatures)
if SIGNER_BY_LABEL[_signer["label"]].pre_push:
signer.sign_containers(
to_sign_entries,
task_id=self.task_id,
)
ContainerImagePusher.run_tag_images(source_image, [dest_image], True, self.target_settings)
if push_item.claims_signing_key:
set_aws_kms_environment_variables(self.target_settings, "cosign_signer")
for _signer in self.target_settings["signing"]:
if _signer["enabled"] and not SIGNER_BY_LABEL[_signer["label"]].pre_push:
signercls = SIGNER_BY_LABEL[_signer["label"]]
signer = signercls(
config_file=_signer["config_file"], settings=self.target_settings
)
signer.sign_containers(
to_sign_entries_internal,
task_id=self.task_id,
)
[docs] def merge_manifest_lists_sign_images(
self, push_item: Any, tag: str, add_archs: List[str]
) -> None:
"""
Merge manifest lists between source and destination tag and sign manifest claims.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
tag (str):
Tag, which acts as a destination to the merge operation.
add_archs ([str]):
Architectures which should be copied to the existing manifest list.
"""
LOG.info(
"Architectures {0} of tag '{1}' will be copied to destination tag '{2}'".format(
add_archs, push_item.metadata["tag_source"], tag
)
)
full_repo_schema = "{host}/{namespace}/{repo}"
external_image_schema = "{host}/{repo}:{tag}"
namespace = self.target_settings["quay_namespace"]
repo = list(push_item.repos.keys())[0]
internal_repo = get_internal_container_repo_name(repo)
full_repo = full_repo_schema.format(
host=self.quay_host, namespace=namespace, repo=internal_repo
)
source_image = "{0}:{1}".format(full_repo, push_item.metadata["tag_source"])
dest_image = "{0}:{1}".format(full_repo, tag)
# NOTE: Arch images don't need to be copied, since they already exist in the same repo
merger = ManifestListMerger(source_image, dest_image)
merger.set_quay_clients(self.quay_client, self.quay_client)
new_manifest_list = merger.merge_manifest_lists_selected_architectures(add_archs)
dest_registries = self.target_settings["docker_settings"]["docker_reference_registry"]
current_signatures: list[Any] = []
outdated_manifests = []
if push_item.claims_signing_key:
to_sign_entries = []
to_sign_entries_internal = []
for manifest in new_manifest_list["manifests"]:
for registry in dest_registries:
to_sign_entries_internal.append(
SignEntry(
repo=repo,
pub_reference=f"{registry}/{repo}:{tag}",
reference="quay.io/"
+ self.target_settings["quay_namespace"]
+ "/"
+ internal_repo
+ ":"
+ tag,
digest=manifest["digest"],
arch=manifest["platform"]["architecture"],
signing_key=push_item.claims_signing_key,
)
)
reference = external_image_schema.format(host=registry, repo=repo, tag=tag)
to_sign_entries.append(
SignEntry(
repo=repo,
pub_reference="",
reference=reference,
digest=manifest["digest"],
arch=manifest["platform"]["architecture"],
signing_key=push_item.claims_signing_key,
)
)
current_signatures.append(
(reference, manifest["digest"], push_item.claims_signing_key)
)
item_processor = item_processor_for_internal_data(
self.quay_client,
self.target_settings["quay_host"].rstrip("/"),
self.dest_registries,
self.target_settings.get("retry_sleep_time", 5),
self.target_settings["quay_namespace"],
)
for repo, _tag, mad in item_processor.generate_existing_manifests_metadata(push_item):
if not mad:
continue
if (
mad.digest
not in [manifest["digest"] for manifest in new_manifest_list["manifests"]]
and tag == _tag
):
outdated_manifests.append((mad.digest, tag, repo))
for signer in self.target_settings["signing"]:
if signer["enabled"] and SIGNER_BY_LABEL[signer["label"]].pre_push:
signercls = SIGNER_BY_LABEL[signer["label"]]
signer = signercls(
config_file=signer["config_file"], settings=self.target_settings
)
if outdated_manifests:
signer.remove_signatures(outdated_manifests, _exclude=current_signatures)
signer.sign_containers(
to_sign_entries,
task_id=self.task_id,
)
raw_src_manifest = cast(str, self.quay_client.get_manifest(source_image, raw=True))
# Special case: if the source manifest and the merged manifest are the same, upload the
# raw source manifest. The reason is that otherwise the digests of the copied manifests
# will not correspond due to Python dicitonaries randomizing the element order
if sorted(
new_manifest_list["manifests"], key=lambda manifest: manifest["digest"]
) == sorted(
json.loads(raw_src_manifest)["manifests"], key=lambda manifest: manifest["digest"]
):
ml_to_sign = raw_src_manifest
self.quay_client.upload_manifest(raw_src_manifest, dest_image, raw=True)
else:
ml_to_sign = json.dumps(new_manifest_list, sort_keys=True, indent=4)
self.quay_client.upload_manifest(new_manifest_list, dest_image)
if push_item.claims_signing_key:
# for cosign sign also manifest list
digest = "sha256:" + hashlib.sha256(ml_to_sign.encode("utf-8")).hexdigest()
for pub_registry in dest_registries:
to_sign_entries_internal.append(
SignEntry(
repo=repo,
pub_reference=f"{pub_registry}/{repo}:{tag}",
reference="quay.io/"
+ self.target_settings["quay_namespace"]
+ "/"
+ internal_repo
+ ":"
+ tag,
digest=digest,
arch="",
signing_key=push_item.claims_signing_key,
)
)
set_aws_kms_environment_variables(self.target_settings, "cosign_signer")
for signer in self.target_settings["signing"]:
if signer["enabled"] and not SIGNER_BY_LABEL[signer["label"]].pre_push:
signercls = SIGNER_BY_LABEL[signer["label"]]
signer = signercls(
config_file=signer["config_file"], settings=self.target_settings
)
signer.remove_signatures(outdated_manifests, _exclude=current_signatures)
signer.sign_containers(
to_sign_entries_internal,
task_id=self.task_id,
)
[docs] @classmethod
def run_untag_images(
cls, references: List[str], remove_last: bool, target_settings: Dict[str, Any]
) -> None:
"""
Prepare the "untag images" entrypoint with all the necessary arguments and run it.
Args:
references ([str]):
Image references which should be untagged.
remove_last (bool):
Whether to remove a tag when it's the last reference of an image (in that repo).
target_settings (dict):
Settings used for setting the value of untag parameters.
"""
untag_images(
references=references,
quay_api_token=target_settings["dest_quay_api_token"],
remove_last=remove_last,
quay_user=target_settings["dest_quay_user"],
quay_password=target_settings["dest_quay_password"],
)
[docs] def untag_image(self, push_item: Any, tag: str) -> None:
"""
Untag image specified by tag.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
tag (str):
Tag which should be removed.
"""
LOG.info("Tag '{0}' will be removed".format(tag))
full_repo_schema = "{host}/{namespace}/{repo}"
namespace = self.target_settings["quay_namespace"]
item_processor = item_processor_for_internal_data(
self.quay_client,
self.target_settings["quay_host"].rstrip("/"),
self.dest_registries,
self.target_settings.get("retry_sleep_time", 5),
self.target_settings["quay_namespace"],
)
to_unsign_entries = []
for to_unsign_entry in item_processor.generate_to_unsign(push_item):
to_unsign_entries.append((to_unsign_entry["digest"], tag, to_unsign_entry["repo"]))
internal_repo = get_internal_container_repo_name(list(push_item.repos.keys())[0])
full_repo = full_repo_schema.format(
host=self.quay_host, namespace=namespace, repo=internal_repo
)
dest_image = "{0}:{1}".format(full_repo, tag)
for signer in self.target_settings["signing"]:
if signer["enabled"]:
signercls = SIGNER_BY_LABEL[signer["label"]]
signer = signercls(config_file=signer["config_file"], settings=self.target_settings)
signer.remove_signatures(to_unsign_entries, _exclude=[])
self.run_untag_images([dest_image], True, self.target_settings)
[docs] def manifest_list_remove_archs(self, push_item: Any, tag: str, remove_archs: List[str]) -> None:
"""
Remove specified archs from a manifest list and upload a new manifest list to Quay.
Args:
push_item (ContainerPushItem):
Push item to perform the workflow with.
tag (str):
Tag whose manifest's archs will be removed.
remove_archs ([str]):
Architectures to remove from the manifest list.
"""
LOG.info("Architectures {0} of tag '{1}' will be removed".format(remove_archs, tag))
full_repo_schema = "{host}/{namespace}/{repo}"
namespace = self.target_settings["quay_namespace"]
internal_repo = get_internal_container_repo_name(list(push_item.repos.keys())[0])
full_repo = full_repo_schema.format(
host=self.quay_host, namespace=namespace, repo=internal_repo
)
dest_image = "{0}:{1}".format(full_repo, tag)
manifest_list = cast(ManifestList, self.quay_client.get_manifest(dest_image))
keep_manifests = []
remove_manifest_sigs = []
for manifest in manifest_list["manifests"]:
if manifest["platform"]["architecture"] not in remove_archs:
keep_manifests.append(deepcopy(manifest))
else:
remove_manifest_sigs.append(manifest)
new_manifest_list = deepcopy(manifest_list)
new_manifest_list["manifests"] = keep_manifests
to_remove_sig_entries = []
for to_remove_man in remove_manifest_sigs:
to_remove_sig_entries.append(
(to_remove_man["digest"], tag, list(push_item.repos.keys())[0])
)
for signer in self.target_settings["signing"]:
if signer["enabled"]:
signercls = SIGNER_BY_LABEL[signer["label"]]
signer = signercls(config_file=signer["config_file"], settings=self.target_settings)
signer.remove_signatures(to_remove_sig_entries, _exclude=[])
self.quay_client.upload_manifest(new_manifest_list, dest_image)
[docs] def run(self) -> None:
"""
Perform the full tag-docker workflow.
The workflow adds or removes images to/from given tags. It's possible to specify only
certain architectures to be added or removed. Based on the task arguments, five different
scenarios may occur:
- Copying whole image to tag (if tag is unused, or all its archs will be overwritten)
- Copying only certain archs and adding them to the destination manifest list
- Removing certain archs from a tag
- Removing a tag (if all archs are to be removed from it)
- No operation (to-be-removed tag already doesn't exist, or all archs are restricted)
The workflow may be summarized as:
- Verify that all repos may be worked with (same conditions are in PushDocker)
- Evaluate which archs are to be added/removed from a given tag
- If new images were created, perform signing workflow on them
- Perform the appropriate add/remove/merge operation
"""
# Validate repos, same as in PushDocker
PushDocker.check_repos_validity(self.push_items, self.hub, self.target_settings)
# perform tag-docker-specific checks
self.check_input_validity()
with LocalExecutor() as executor:
executor.skopeo_login(
self.quay_host,
self.target_settings["dest_quay_user"],
self.target_settings["dest_quay_password"],
)
for item in self.push_items:
for tag in item.metadata["add_tags"]:
LOG.info("Processing add tag '{0}'".format(tag))
add_archs = self.tag_add_calculate_archs(item, tag, executor)
# If all archs were somehow excluded from being added, no-op
if add_archs == []:
LOG.warning("No archs can be added to tag '{0}', skipping".format(tag))
continue
# If None, we're dealing with a source image and we want to copy to destination
elif add_archs is None:
self.copy_tag_sign_images(item, tag, executor)
# Otherwise, merge relevant archs of source and dest
else:
self.merge_manifest_lists_sign_images(item, tag, add_archs)
for tag in item.metadata["remove_tags"]:
LOG.info("Processing remove tag '{0}'".format(tag))
remove_archs, keep_archs = self.tag_remove_calculate_archs(item, tag, executor)
# If all archs were somehow excluded from removal, no-op
if not remove_archs:
LOG.warning("No archs can be removed from tag '{0}', skipping".format(tag))
continue
# If no archs will remain after removal, just perform untagging
elif not keep_archs:
self.untag_image(item, tag)
# if some archs will be removed and some will remain, create new manifest list
else:
self.manifest_list_remove_archs(item, tag, remove_archs)
def mod_entry_point(
push_items: List[Any], hub: Any, task_id: str, target_name: str, target_settings: Dict[str, Any]
) -> None:
"""Entry point for use in another python code."""
tag_docker = TagDocker(push_items, hub, task_id, target_name, target_settings)
return tag_docker.run()