# SPDX-License-Identifier: GPL-3.0-or-later
import json
import logging
from copy import deepcopy
from typing import Dict, List, Optional
from boto3.session import Session
from tenacity import RetryError, Retrying
from tenacity.retry import retry_if_result
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed
from cloudpub.aws.utils import (
convert_error_list_str,
create_version_tree,
get_restricted_major_versions,
get_restricted_minor_versions,
get_restricted_patch_versions,
get_text_url,
is_str_url,
pprint_debug_logging,
)
from cloudpub.common import BaseService, PublishingMetadata
from cloudpub.error import InvalidStateError, NotFoundError, Timeout
from cloudpub.models.aws import (
ChangeSetResponse,
DeliveryOption,
DescribeChangeSetReponse,
DescribeEntityResponse,
GroupedVersions,
ListChangeSet,
ListChangeSetsResponse,
ListEntitiesResponse,
ProductDetailResponse,
ProductVersionsResponse,
ProductVersionsVirtualizationSource,
VersionMapping,
)
log = logging.getLogger(__name__)
[docs]
class AWSProductService(BaseService[AWSVersionMetadata]):
"""Create a new service provider for AWS using Boto3."""
# Boto3 docs
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html
[docs]
def __init__(
self,
access_id: str,
secret_key: str,
region: str = "us-east-1",
attempts: int = 288,
interval: int = 600,
) -> None:
"""
AWS cloud provider service.
Args:
access_id (str)
AWS account access ID
secret_key (str)
AWS account secret access key
region (str, optional)
AWS region for compute operations
This defaults to 'us-east-1'
attempts (int, optional)
Max number of times to poll
while waiting for changeset
Defaults to 288
interval (int, optional)
Seconds between polling
while waiting for changeset
Defaults to 600
"""
self.session = Session(
aws_access_key_id=access_id,
aws_secret_access_key=secret_key,
region_name=region,
)
self.marketplace = self.session.client("marketplace-catalog")
self.wait_for_changeset_attempts = attempts
self.wait_for_changeset_interval = interval
super(AWSProductService, self).__init__()
def _check_product_versions(self, details: ProductDetailResponse) -> None:
if not details.versions:
pprint_debug_logging(log, details.to_json(), "The details from the response are: ")
self._raise_error(NotFoundError, "This product has no versions")
[docs]
def get_product_by_id(self, entity_id: str) -> ProductDetailResponse:
"""
Get a product detail by it's id.
Args:
entity_id (str)
Entity id to get details from. If not set will default to
class setting for EntityId.
Returns:
ProductDetailResponse: The details for a product
Raises:
NotFoundError when the product is not found.
"""
rsp = DescribeEntityResponse.from_json(
self.marketplace.describe_entity(Catalog="AWSMarketplace", EntityId=entity_id)
)
if not rsp.details_document:
pprint_debug_logging(log, rsp)
self._raise_error(NotFoundError, f"No such product with EntityId: \"{entity_id}\"")
return rsp.details_document
[docs]
def get_product_by_name(
self, marketplace_entity_type: str, product_name: str
) -> ProductDetailResponse:
"""
Get a product detail by it's name.
Args:
marketplace_entity_type (str)
Product type of the AWS product
Example: AmiProduct
product_name (str)
Name of a product
Returns:
str: A dict of details for the first response of a product
Raises:
NotFoundError when the product is not found.
InvalidStateError when more than one product is found.
"""
filter_list = [{"Name": "Name", "ValueList": [product_name]}]
entity_rsp = ListEntitiesResponse.from_json(
self.marketplace.list_entities(
Catalog="AWSMarketplace",
EntityType=marketplace_entity_type,
FilterList=filter_list,
)
)
if len(entity_rsp.entity_summary_list) == 0:
pprint_debug_logging(log, entity_rsp)
self._raise_error(NotFoundError, f"No such product with name \"{product_name}\"")
elif len(entity_rsp.entity_summary_list) > 1:
pprint_debug_logging(log, entity_rsp)
self._raise_error(InvalidStateError, f"Multiple responses found for \"{product_name}\"")
# We should only get one response based on filtering
elif hasattr(entity_rsp.entity_summary_list[0], "entity_id"):
return self.get_product_by_id(entity_rsp.entity_summary_list[0].entity_id)
self._raise_error(NotFoundError, f"No such product with name \"{product_name}\"")
[docs]
def get_product_version_details(
self, entity_id: str, version_id: str
) -> ProductVersionsResponse:
"""
Get a product detail by it's name.
Args:
entity_id (str)
The Id of the entity to get version details from
version_id (str)
The version id of a product to get the details of
Returns:
ProductVersionsResponse: The details for the first response of a product
Raises:
NotFoundError when the product is not found.
"""
details = self.get_product_by_id(entity_id)
self._check_product_versions(details)
for version in details.versions:
for delivery_option in version.delivery_options:
if delivery_option.id == version_id:
return version
self._raise_error(NotFoundError, f"No such version with id \"{version_id}\"")
[docs]
def get_product_versions(self, entity_id: str) -> Dict[str, GroupedVersions]:
"""
Get the titles, ids, and date created of all the versions of a product.
Args:
entity_id (str)
The Id of the entity to get versions from
Returns:
Dict[str, GroupedVersions]: A dictionary of versions
Raises:
NotFoundError when the product is not found.
"""
details = self.get_product_by_id(entity_id)
self._check_product_versions(details)
version_ids: Dict[str, GroupedVersions] = {}
for v in details.versions:
delivery_options_list = []
ami_id_list = []
for delivery_option in v.delivery_options:
delivery_options_list.append(delivery_option)
for source in v.sources:
if isinstance(source, ProductVersionsVirtualizationSource):
ami_id_list.append(source.image)
delivery_options: GroupedVersions = {
"delivery_options": delivery_options_list,
"created_date": v.creation_date, # type: ignore
"ami_ids": ami_id_list,
}
version_ids[v.version_title] = delivery_options # type: ignore
return version_ids
[docs]
def get_product_version_by_name(self, entity_id: str, version_name: str) -> DeliveryOption:
"""
Get a version detail by it's name.
Args:
entity_id (str)
The Id of the entity to get version by name from
version_name (str)
A version title to get details of
Returns:
DeliveryOption: The delivery options of a version
Raises:
NotFoundError when the product is not found.
"""
details = self.get_product_by_id(entity_id)
self._check_product_versions(details)
for version in details.versions:
if version.version_title == version_name:
# ATM we're not batching Delivery options so
# the first one should be the one we want.
return version.delivery_options[0]
self._raise_error(NotFoundError, f"No such version with name \"{version_name}\"")
[docs]
def get_product_active_changesets(self, entity_id: str) -> List[ListChangeSet]:
"""
Get the active changesets for a product.
Args:
entity_id (str)
The Id of the entity to get active changesets from
Returns:
str: A change set id
"""
filter_list = [
{"Name": "EntityId", "ValueList": [entity_id]},
{"Name": "Status", "ValueList": ["APPLYING", "PREPARING"]},
]
changeset_list = ListChangeSetsResponse.from_json(
self.marketplace.list_change_sets(Catalog="AWSMarketplace", FilterList=filter_list)
)
return changeset_list.change_set_list
[docs]
def wait_active_changesets(self, entity_id: str) -> None:
"""
Get the first active changeset, if there is one, and wait for it to finish.
Args:
entity_id (str)
The Id of the entity to wait for active changesets
"""
def changeset_not_complete(change_set_list: List[ListChangeSet]) -> bool:
if change_set_list:
self.wait_for_changeset(change_set_list[0].id)
return True
else:
return False
r = Retrying(
stop=stop_after_attempt(self.wait_for_changeset_attempts),
retry=retry_if_result(changeset_not_complete),
)
try:
r(self.get_product_active_changesets, entity_id)
except RetryError:
self._raise_error(Timeout, f"Timed out waiting for {entity_id} to be unlocked")
[docs]
def set_restrict_versions(
self, entity_id: str, marketplace_entity_type: str, delivery_option_ids: List[str]
) -> str:
"""
Restrict version(s) of a product by their id.
Args:
entity_id (str)
The Id of the entity to edit
marketplace_entity_type (str)
Product type of the AWS product
Example: AmiProduct
delivery_option_ids (List)
A list of strs of delivery options to restrict. Normally version Ids.
Returns:
str: A change set id
"""
change_details = {"DeliveryOptionIds": delivery_option_ids}
rsp: ChangeSetResponse = self.marketplace.start_change_set(
Catalog="AWSMarketplace",
ChangeSet=[
{
"ChangeType": "RestrictDeliveryOptions",
"Entity": {
"Type": marketplace_entity_type + "@1.0",
"Identifier": entity_id,
},
"Details": json.dumps(change_details),
},
],
)
pprint_debug_logging(log, rsp, "The response from the restrict version was: ")
return rsp["ChangeSetId"]
[docs]
def cancel_change_set(self, change_set_id: str) -> str:
"""
Cancel the publish of a new version in progress.
Args:
change_set_id (str)
A change set id to cancel
Returns:
str: A change set id
"""
rsp: ChangeSetResponse = self.marketplace.cancel_change_set(
Catalog="AWSMarketplace", ChangeSetId=change_set_id
)
pprint_debug_logging(log, rsp, "The response from cancelling a changeset was: ")
return rsp["ChangeSetId"]
[docs]
def check_publish_status(self, change_set_id: str) -> str:
"""
Check the status of a change set.
Args:
change_set_id (str)
A change set id to check the status of
Returns:
str: Status of the publish
Raises:
InvalidStateError if the job failed
"""
rsp = DescribeChangeSetReponse.from_json(
self.marketplace.describe_change_set(
Catalog="AWSMarketplace", ChangeSetId=change_set_id
)
)
status = rsp.status
log.info("Current change status is %s.", status.lower())
if status.lower() == "failed":
failure_code = rsp.failure_code
# ATM we're not batching changesets so
# the first one should be the one we want.
failure_list = rsp.change_set[0].error_details
pprint_debug_logging(log, rsp, "The response from the status was: ")
# Check if message is a URL and download the message.
added_messages = []
for failure in failure_list:
if is_str_url(failure.message):
added_messages.extend(get_text_url(failure.message))
failure_list.extend(added_messages)
error_list_str = convert_error_list_str(failure_list)
error_message = (
f"Changeset {change_set_id} failed with code {failure_code}: \n {error_list_str}"
)
self._raise_error(InvalidStateError, error_message)
return rsp.status
[docs]
def wait_for_changeset(self, change_set_id: str) -> None:
"""
Wait until ChangeSet is complete.
Args:
change_set_id (str)
Id for the change set to wait on
Raises:
Timeout when the status doesn't change to either
'Succeeded' or 'Failed' within the set retry time.
"""
def changeset_not_complete(status: str) -> bool:
if status.lower() == "succeeded":
return False
else:
return True
r = Retrying(
wait=wait_fixed(self.wait_for_changeset_interval),
stop=stop_after_attempt(self.wait_for_changeset_attempts),
retry=retry_if_result(changeset_not_complete),
)
try:
r(self.check_publish_status, change_set_id)
except RetryError:
self._raise_error(Timeout, f"Timed out waiting for {change_set_id} to finish")
[docs]
def restrict_versions(
self,
entity_id: str,
marketplace_entity_type: str,
restrict_major: Optional[int] = None,
restrict_minor: Optional[int] = 1,
) -> List[str]:
"""
Restrict the old versions of a release.
Args:
entity_id (str)
The entity id to modifiy.
marketplace_entity_type (str)
Product type of the AWS product
Example: AmiProduct
restrict_major (optional int)
How many major versions are allowed
Example: 3
restrict_minor (optional int)
how many minor versions are allowed
Example: 3
Returns:
List[str]: List of AMI ids of restricted versions
"""
versions = self.get_product_versions(entity_id)
version_tree = create_version_tree(versions)
restrict_delivery_ids = []
restrict_ami_ids = []
if restrict_major and len(version_tree) > restrict_major:
major_delivery_ids, major_ami_ids, version_tree = get_restricted_major_versions(
version_tree, restrict_major
)
restrict_delivery_ids.extend(major_delivery_ids)
restrict_ami_ids.extend(major_ami_ids)
if restrict_minor:
minor_delivery_ids, minor_ami_ids, version_tree = get_restricted_minor_versions(
version_tree, restrict_minor
)
restrict_delivery_ids.extend(minor_delivery_ids)
restrict_ami_ids.extend(minor_ami_ids)
patch_delivery_ids, patch_ami_ids = get_restricted_patch_versions(version_tree)
restrict_delivery_ids.extend(patch_delivery_ids)
restrict_ami_ids.extend(patch_ami_ids)
if restrict_delivery_ids:
log.debug(f"Restricting these minor version(s) with id(s): {restrict_delivery_ids}")
change_id = self.set_restrict_versions(
entity_id, marketplace_entity_type, restrict_delivery_ids
)
self.wait_for_changeset(change_id)
return restrict_ami_ids
[docs]
def publish(self, metadata: AWSVersionMetadata) -> None:
"""
Add new version to an existing product.
Args:
new_version_details (VersionMapping): A model of the version mapping
"""
change_set = {
"ChangeType": "AddDeliveryOptions",
"Entity": {
"Type": f"{metadata.marketplace_entity_type}@1.0",
"Identifier": metadata.destination,
},
# AWS accepts 'Details' as a JSON string.
# So we convert it here.
"DetailsDocument": metadata.version_mapping.to_json(),
}
if metadata.overwrite:
# Make a copy of the original Version Mapping to avoid overwriting settings
json_mapping = deepcopy(metadata.version_mapping)
org_version_details = self.get_product_version_by_name(
metadata.destination, metadata.version_mapping.version.version_title
)
# ATM we're not batching Delivery options so
# the first one should be the one we want.
json_mapping.delivery_options[0].id = org_version_details.id
change_set["ChangeType"] = "UpdateDeliveryOptions"
change_set["DetailsDocument"] = json_mapping.to_json()
if metadata.keepdraft:
log.info("Sending draft version to %s.", metadata.marketplace_entity_type)
rsp: ChangeSetResponse = self.marketplace.start_change_set(
Catalog="AWSMarketplace", ChangeSet=[change_set], Intent="VALIDATE"
)
else:
log.info("Publishing new version in %s.", metadata.marketplace_entity_type)
rsp = self.marketplace.start_change_set(
Catalog="AWSMarketplace", ChangeSet=[change_set], Intent="APPLY"
)
pprint_debug_logging(log, rsp, "The response from publishing was: ")
self.wait_for_changeset(rsp["ChangeSetId"])