from __future__ import division
from concurrent.futures import as_completed
from functools import partial
import math
import threading
from typing import Callable, Any, Optional, Union
from more_executors import Executors
from requests.exceptions import HTTPError
from requests import Response
from .constants import DEFAULT_REQUEST_THREADS_LIMIT
from .pyxis_session import PyxisSession
from .pyxis_authentication import PyxisAuth
[docs]
class PyxisClient:
"""Pyxis requests wrapper."""
[docs]
def __init__(
self,
hostname: str,
retries: int = 5,
auth: Optional[PyxisAuth] = None,
backoff_factor: int = 5,
verify: bool = True,
threads: int = DEFAULT_REQUEST_THREADS_LIMIT,
) -> None:
"""
Initialize.
Args:
hostname (str)
Pyxis service hostname.
retries (int)
number of http retries for Pyxis requests.
auth (PyxisAuth)
PyxisAuth subclass instance.
backoff_factor (int)
backoff factor to apply between attempts after the second try.
verify (bool)
enable/disable SSL CA verification.
threads (int)
the number of threads to use for parallel requests.
"""
self.thread_local = threading.local()
self._session_factory = partial(
PyxisSession,
hostname,
retries=retries,
backoff_factor=backoff_factor,
verify=verify,
)
self._auth = auth
self.threads_limit = threads
@property
def pyxis_session(self) -> Union[PyxisSession, Any]:
"""
Return a thread-local session for Pyxis requests.
If a session did not exist for current thread, it is initialized and
cached.
"""
if not hasattr(self.thread_local, "pyxis_session"):
self.thread_local.pyxis_session = self._make_session()
return self.thread_local.pyxis_session
def _make_session(self) -> PyxisSession:
session = self._session_factory()
if self._auth:
self._auth.apply_to_session(session)
return session
[docs]
def get_operator_indices(
self, ocp_versions_range: str, organization: Optional[str] = None
) -> Union[list[str], Any]:
"""Get a list of index images satisfying versioning and organization conditions.
Args:
ocp_versions_range (str)
Supported OCP versions range.
organization (str)
Organization understood by IIB.
Returns:
list: List of index images satisfying the conditions.
"""
params = {"ocp_versions_range": ocp_versions_range}
if organization:
params["organization"] = organization
resp = self.pyxis_session.get("operators/indices", params=params)
resp.raise_for_status()
return resp.json()["data"]
[docs]
def upload_signatures(self, signatures: list[str]) -> list[Any]:
"""
Upload signatures from given JSON string.
Args:
signatures [str]
JSON with signatures to upload. See Pyxis API for details.
Returns:
list: List of uploaded signatures including auto-populated fields.
"""
def _send_post_request(data: dict[Any, Any]) -> Response:
response = self.pyxis_session.post("signatures", json=data)
# SEE CLOUDDST-9698
# Pyxis returns 500 error due to a potential sidecar config issue
# As a workaround to that, it was suggested to clear the session
# establish a new connection and again retry
# After creating a new session and retrying, the request should succeed
if response.status_code == 500:
self._clear_session()
response = self.pyxis_session.post("signatures", json=data)
return response
return self._do_parallel_requests(_send_post_request, signatures)
def _clear_session(self) -> None:
self.thread_local.pyxis_session.close()
delattr(self.thread_local, "pyxis_session")
[docs]
def _do_parallel_requests(
self, make_request: Callable[[Any], Any], data_items: list[Any]
) -> Union[list[Any], Any]:
"""
Call given function with given data items in parallel, collect responses.
Args:
make_request (function): a function that does the actual request.
Must accept a single argument: a data item.
Must return a `requests.models.Response` object.
data_items (list): a list of arbitrary objects to be passed
individually to `make_request()`.
The number of parallel requests is defined by
`DEFAULT_REQUEST_THREADS_LIMIT` (can be overridden by the user) and
of course by the number of actually available threads.
If a response fails consistently (see `PyxisSession` for retry policy),
the execution is terminated and an informative error is raised.
See `PyxisClient._handle_json_response()` for details.
Returns:
list(dict): list of dictionaries extracted from responses.
"""
with Executors.thread_pool(max_workers=self.threads_limit).with_map(
self._handle_json_response
) as executor:
futures = [executor.submit(make_request, data) for data in data_items]
return [f.result() for f in as_completed(futures)]
[docs]
def _handle_json_response(self, response: Response) -> Union[dict[Any, Any], Any]:
"""
Get JSON from given response or raise an informative exception.
Uses `requests.raise_for_status()` but tries to extract more details.
"""
# the response may contain useful JSON even in case of 40x/50x
# but it can't be guaranteed
try:
data = response.json()
except ValueError: # Python 2.x compat
data = {}
tolerate_codes = []
# Uploaded data already exists in Pyxis
if response.request.method == "POST":
tolerate_codes = [409]
# Data already removed from Pyxis
elif response.request.method == "DELETE":
tolerate_codes = [404]
if response.status_code in tolerate_codes:
return data
try:
response.raise_for_status()
except HTTPError as e:
# We've got an error, but by default the exception contains only
# minimal information: status code (400) and reason (Client error).
# If the JSON was successfully parsed earlier, it may contain
# important details (e.g. "Maximum of 100 signatures are allowed").
extra_msg = data["detail"] if "detail" in data else response.text
# re-raise the exception with an extra message
raise HTTPError("{0}\n{1}".format(e, extra_msg), response=response)
return data
[docs]
def get_container_signatures(
self, manifest_digests: Optional[str] = None, references: Optional[str] = None
) -> list[str]:
"""Get a list of signature metadata matching given fields.
Args:
manifest_digests (comma separated str)
manifest_digest used for searching in signatures.
references (comma separated str)
pull reference for image of signature stored.
Returns:
list: List of signature metadata matching given fields.
"""
signatures_endpoint = "signatures"
filter_criteria = []
if manifest_digests:
filter_criteria.append("manifest_digest=in=({0}),".format(manifest_digests))
if references:
filter_criteria.append("reference=in=({0}),".format(references))
signatures_endpoint = "{0}{1}{2}".format(
signatures_endpoint, "?filter=", "".join(filter_criteria)
)
signatures_endpoint = signatures_endpoint[0:-1]
resp = self._get_items_from_all_pages(signatures_endpoint)
return resp
[docs]
def _get_items_from_all_pages(self, endpoint: str, **kwargs: Any) -> list[Any]:
"""
Get response from all pages of pyxis.
Args:
endpoint (str): Endpoint of the request.
**kwargs: Additional arguments to add to the requests method.
Returns:
list: list of all data records returned from pyxis
"""
all_resp = []
first_resp = self.pyxis_session.get(endpoint, **kwargs)
first_resp.raise_for_status()
first_resp_json = first_resp.json()
all_resp.extend(first_resp_json["data"])
# if total data is greater than data returned in first page,
# calculate number of pages and then consequently get response from each page
if len(first_resp_json["data"]) < first_resp_json["total"]:
total_pages = int(
math.ceil(first_resp_json["total"] / first_resp_json["page_size"])
)
for page in range(1, total_pages):
params = {"page": page}
resp = self.pyxis_session.get(endpoint, params=params)
resp.raise_for_status()
all_resp.extend(resp.json()["data"])
return all_resp
[docs]
def delete_container_signatures(self, signature_ids: list[str]) -> list[Any]:
"""Delete signatures matching given fields.
Args:
signature_ids ([str])
Internal Pyxis signature IDs of signatures which should be removed.
"""
def _send_delete_request(signature_id: str) -> Response:
delete_endpoint = "signatures/id/{id}"
resp = self.pyxis_session.delete(delete_endpoint.format(id=signature_id))
return resp
return self._do_parallel_requests(_send_delete_request, signature_ids)