Source code for pubtools.pulplib._impl.criteria

import collections
import re
import warnings

from collections.abc import Iterable

from frozenlist2 import frozenlist

from pubtools.pulplib._impl import compat_attr as attr

from .model.unit import type_ids_for_class
from .model.attr import PULP2_FIELD


# Pairs the name of a field on the model with the name of the Pulp field used
# when querying it; Criteria.with_unit_type builds these when the caller asks
# for a specific set of unit fields.
FieldNamePair = collections.namedtuple(
    "FieldNamePair",
    ("model_field_name", "pulp_field_name"),
)


class Criteria(object):
    """Represents a Pulp search criteria.

    This is an opaque class which is not intended to be created
    or used directly. Instances of this class should be obtained and
    composed by calls to the documented class methods.

    Example - searching a repository:
        .. code-block:: python

            # With Pulp 2.x / mongo, this is roughly equivalent
            # to search fragment:
            #
            #  {"notes.my-field": {"$exists": True},
            #   "notes.other-field": {"$eq": ["a", "b", "c"]}}
            #
            crit = Criteria.and_(
                Criteria.with_field('notes.my-field', Matcher.exists()),
                Criteria.with_field('notes.other-field', ["a", "b", "c"])
            )

            # criteria may now be used with client to execute a search
            repos = client.search_repository(crit)

    Example - searching across all repos for a specific content type:
        .. code-block:: python

            crit = Criteria.and_(
                Criteria.with_unit_type(RpmUnit),
                Criteria.with_field("sha256sum", Matcher.in_([
                    "49ae93732fcf8d63fe1cce759664982dbd5b23161f007dba8561862adc96d063",
                    "6b30e91df993d96df0bef0f9d232d1068fa2f7055f13650208d77b43cd7c99f6"]))
            )

            # Will find RpmUnit instances with above sums
            units = client.search_content(crit)
    """

    exists = object()
    # exists is undocumented and deprecated, use Matcher.exists() instead

    @classmethod
    def with_id(cls, ids):
        """Args:
            ids (str, list[str])
                An id or list of ids

        Returns:
            Criteria
                criteria for finding objects matching the given ID(s)
        """
        # A single string is matched by equality; anything else is treated
        # as a collection of candidate ids.
        if isinstance(ids, str):
            return cls.with_field("id", ids)
        return cls.with_field("id", Matcher.in_(ids))

    @classmethod
    def with_field(cls, field_name, field_value):
        """Args:
            field_name (str)
                The name of a field.

                Supported field names include both model fields
                and Pulp fields. See :ref:`model_fields` for information
                about these two types of fields.

                When Pulp fields are used, field names may contain a "." to
                indicate nesting, such as ``notes.created``.

            field_value
                :class:`Matcher`
                    A matcher to be applied against the field.

                object
                    Any value, to be matched against the field via
                    :meth:`Matcher.equals`.

        Returns:
            Criteria
                criteria for finding objects where ``field_name`` is present and
                matches ``field_value``.
        """
        return FieldMatchCriteria(field_name, field_value)

    @classmethod
    def with_unit_type(cls, unit_type, **kwargs):
        """Args:
            unit_type (class)
                A subclass of :class:`~pubtools.pulplib.Unit`.

            unit_fields (Iterable[str])
                Names of the desired field(s) to include in response to a search using this
                criteria. If omitted, all fields are included.

                Must be passed by keyword.

                Some fields will always be included even when not requested.

        Returns:
            Criteria
                criteria for finding units of type ``unit_type`` populated with
                (at least) the fields from ``unit_fields``.

        Raises:
            TypeError
                If no content type IDs are known for ``unit_type``.

        .. versionadded:: 2.14.0

        .. versionchanged:: 2.33.0
            Introduced ``unit_fields``.
        """
        # This is mainly a thin wrapper for searching on content_type_id which allows
        # the caller to avoid having to handle the (unit class <=> type id) mapping.
        type_ids = type_ids_for_class(unit_type)
        if not type_ids:
            raise TypeError("Expected a Unit type, got: %s" % repr(unit_type))

        # Only "unit_fields" is read from kwargs; any other keyword is ignored.
        unit_fields = kwargs.pop("unit_fields", None)
        if unit_fields is not None:
            # We have some non-default set of fields to query.
            # We must do the following:
            # - ensure that we also include all the mandatory fields for this model
            # - generate pairs of (model field, pulp field) names, as we'll need both
            model_fields_dict = attr.fields_dict(unit_type)

            model_field_names = set(unit_fields)
            # No default => it's mandatory to use this field
            model_field_names.update(
                field.name
                for field in model_fields_dict.values()
                if field.default is attr.NOTHING
            )

            # Now build up (model, pulp) pairs
            pairs = set()
            for model_field_name in model_field_names:
                # The default/fallback is to assume that the pulp field is named the
                # same as the model field...
                pulp_field_name = model_field_name

                # ... but if the field exists and declares an explicit PULP2_FIELD then we
                # use that instead
                field = model_fields_dict.get(model_field_name)
                if field and field.metadata.get(PULP2_FIELD):
                    # We have a defined pulp field, so use it.
                    # Note we only care about the first component of the field, because that's
                    # all the granularity supported by Pulp search API (e.g. if actual field
                    # is pulp_user_metadata.description, we need to query pulp_user_metadata).
                    pulp_field_name = field.metadata[PULP2_FIELD].split(".")[0]

                pairs.add(FieldNamePair(model_field_name, pulp_field_name))

            # Sorted so that the resulting criteria is deterministic.
            unit_fields = tuple(sorted(pairs))

        return UnitTypeMatchCriteria(
            "content_type_id", Matcher.in_(type_ids), unit_fields
        )

    @classmethod
    def with_field_in(cls, field_name, field_value):
        # Deprecated alias for with_field(field_name, Matcher.in_(field_value)).
        # stacklevel=2 attributes the warning to the caller of this method
        # rather than to this module.
        warnings.warn(
            "with_field_in is deprecated, use Matcher.in_() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return cls.with_field(field_name, Matcher.in_(field_value))

    @classmethod
    def and_(cls, *criteria):
        """Args:
            criteria (list[Criteria])
                Any number of criteria.

        Returns:
            :class:`Criteria`
                criteria for finding objects which satisfy all of the input ``criteria``.
        """
        return AndCriteria(criteria)

    @classmethod
    def or_(cls, *criteria):
        """Args:
            criteria (list[Criteria])
                Any number of criteria.

        Returns:
            Criteria
                criteria for finding objects which satisfy any of the input ``criteria``.
        """
        return OrCriteria(criteria)

    @classmethod
    def true(cls):
        """
        Returns:
            Criteria
                a criteria which always matches any object.
        """
        return TrueCriteria()
class Matcher(object):
    """Factory methods producing matchers for fields in a Pulp search query.

    Matchers are obtained through the documented class methods below and
    are meant to be passed to :class:`Criteria` methods such as
    :meth:`Criteria.with_field`.

    .. versionadded:: 1.1.0
    """

    @classmethod
    def equals(cls, value):
        """
        Matcher for a field whose value must be exactly equal to ``value``.

        Arguments:
            value (object)
                An object to match against a field.
        """
        return EqMatcher(value)

    @classmethod
    def regex(cls, pattern):
        """
        Matcher for a string field which must match a regular expression.

        Arguments:
            pattern (str)
                A regular expression to match against the field.
                The expression is not implicitly anchored.

                .. warning::

                    It is not defined which specific regular expression syntax
                    is supported. For portable code, callers are recommended to
                    use only the common subset of PCRE-compatible and
                    Python-compatible regular expressions.

        Raises:
            :class:`re.error`
                If the given pattern is not a valid regular expression.

        Example:

            .. code-block:: python

                # Would match any Repository where notes.my-field starts
                # with "abc"
                crit = Criteria.with_field('notes.my-field', Matcher.regex("^abc"))
        """
        return RegexMatcher(pattern)

    @classmethod
    def exists(cls):
        """
        Matcher for a field which only has to exist; its value is not checked.

        Example:

            .. code-block:: python

                # Would match any Repository where notes.my-field exists
                crit = Criteria.with_field('notes.my-field', Matcher.exists())
        """
        return ExistsMatcher()

    @classmethod
    def in_(cls, values):
        """
        Matcher for a field whose value equals any one of the given values.

        Arguments:
            values (iterable)
                An iterable of values used to match a field.

        Example:

            .. code-block:: python

                # Would match any Repository where notes.my-field is "a", "b" or "c"
                crit = Criteria.with_field(
                    'notes.my-field',
                    Matcher.in_(["a", "b", "c"])
                )
        """
        return InMatcher(values)

    @classmethod
    def less_than(cls, value):
        """
        Matcher for a field whose value is less than the given value.

        Arguments:
            value (object)
                An object to match against the field

        Example:

            .. code-block:: python

                # would match where last_publish is before "2019-08-27T00:00:00Z"
                # date comparison requires a datetime.datetime object
                crit = Criteria.with_field(
                    'last_publish',
                    Matcher.less_than(datetime.datetime(2019, 8, 27, 0, 0, 0))
                )

        .. versionadded:: 2.1.0
        """
        return LessThanMatcher(value)

    def _map(self, _fn):
        # Internal-only hook: produce a matcher whose matched value has been
        # passed through the given function. The base implementation has no
        # value to convert, so it hands back the same instance; subclasses
        # override this to support field conversions between Pulp and Python.
        return self
@attr.s(frozen=True)
class RegexMatcher(Matcher):
    """Matcher requiring a string field to match a regular expression."""

    _pattern = attr.ib()

    @_pattern.validator
    def _check_pattern(self, _, pattern):
        # re.compile happily accepts an already-compiled regex object, so the
        # string requirement has to be enforced explicitly before compiling.
        if not isinstance(pattern, str):
            raise TypeError("Regex matcher expected string, got: %s" % repr(pattern))

        # Compiling is done purely for validation: an invalid expression
        # raises here; the compiled object itself is discarded.
        re.compile(pattern)

    # No _map override: a regex is defined only in terms of strings, so there
    # is no meaningful value conversion to apply.

    def __str__(self):
        return "=~/%s/" % (self._pattern,)


@attr.s(frozen=True)
class EqMatcher(Matcher):
    """Matcher requiring a field to equal a value exactly."""

    _value = attr.ib()

    def _map(self, fn):
        # The private attribute "_value" is addressed as "value" when evolving.
        converted = fn(self._value)
        return attr.evolve(self, value=converted)

    def __str__(self):
        return "==" + repr(self._value)


def iterable_nonstr_then_frozenlist(values):
    """Convert a non-string iterable into a frozenlist.

    Raises ValueError for a string or for anything which is not iterable.
    """
    if isinstance(values, str) or not isinstance(values, Iterable):
        raise ValueError("Must be an iterable: %s" % repr(values))
    return frozenlist(values)


@attr.s(frozen=True)
class InMatcher(Matcher):
    """Matcher requiring a field to equal one of several values."""

    _values = attr.ib(converter=iterable_nonstr_then_frozenlist)

    def _map(self, fn):
        converted = [fn(item) for item in self._values]
        return attr.evolve(self, values=converted)

    def __str__(self):
        return " IN " + repr(self._values)


@attr.s(frozen=True)
class ExistsMatcher(Matcher):
    """Matcher requiring only that a field exists."""

    def __str__(self):
        return " EXISTS"


@attr.s(frozen=True)
class LessThanMatcher(Matcher):
    """Matcher requiring a field to be less than a value."""

    _value = attr.ib()

    def _map(self, fn):
        converted = fn(self._value)
        return attr.evolve(self, value=converted)

    def __str__(self):
        return "<" + repr(self._value)


def coerce_to_matcher(value):
    """Return a Matcher for ``value``.

    Matchers pass through untouched, the deprecated ``Criteria.exists``
    sentinel becomes an ExistsMatcher (with a DeprecationWarning), and any
    other object is wrapped in an EqMatcher.
    """
    if isinstance(value, Matcher):
        return value

    if value is not Criteria.exists:
        return EqMatcher(value)

    warnings.warn(
        "Criteria.exists is deprecated, use Matcher.exists() instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return ExistsMatcher()


@attr.s(frozen=True)
class FieldMatchCriteria(Criteria):
    """Criteria applying a single matcher to a single named field."""

    _field = attr.ib()
    _matcher = attr.ib(converter=coerce_to_matcher)

    def __str__(self):
        matcher_text = str(self._matcher)
        rendered = "%s%s" % (self._field, matcher_text)
        # Matchers whose text contains a space (e.g. " IN ...", " EXISTS")
        # are wrapped in parentheses.
        if " " in matcher_text:
            return "(%s)" % rendered
        return rendered


@attr.s(frozen=True)
class UnitTypeMatchCriteria(FieldMatchCriteria):
    # Specialized FieldMatchCriteria used for matching on unit types; it
    # additionally remembers which fields the user is interested in.
    _unit_fields = attr.ib()


def _render_operands(operands, keyword, empty_text):
    # Shared string rendering for the AND / OR criteria: a placeholder when
    # there are no operands, the bare operand when there is exactly one, and
    # otherwise a parenthesized list joined by the keyword.
    if not operands:
        return empty_text
    if len(operands) == 1:
        return str(operands[0])
    separator = " %s " % keyword
    return "(" + separator.join([str(operand) for operand in operands]) + ")"


@attr.s(frozen=True)
class AndCriteria(Criteria):
    """Criteria satisfied when all operands are satisfied."""

    _operands = attr.ib()

    def __str__(self):
        return _render_operands(self._operands, "AND", "<empty AND>")


@attr.s(frozen=True)
class OrCriteria(Criteria):
    """Criteria satisfied when any operand is satisfied."""

    _operands = attr.ib()

    def __str__(self):
        return _render_operands(self._operands, "OR", "<empty OR>")


class TrueCriteria(Criteria):
    """Criteria which matches any object; it carries no state."""

    def __str__(self):
        return "TRUE"