import configparser
import os
import re
from collections.abc import Iterable
from dataclasses import dataclass
from enum import Enum
from typing import Any
from fastapi import HTTPException
from pydantic_settings import BaseSettings, SettingsConfigDict
def split_ini_list(raw: str | None) -> list[str]:
# Given a string value from an .ini file, splits it into multiple
# strings over line boundaries, using the typical form supported
# in .ini files.
# e.g.
#
# [section]
# my-setting=
# foo
# bar
#
# => returns ["foo", "bar"]
if not raw:
return []
return [elem.strip() for elem in raw.split("\n") if elem.strip()]
@dataclass
class CacheFlushRule:
    """A named cache flush rule: which templates to flush, for which paths."""

    name: str
    """Name of this rule (from the config file)."""

    templates: list[str]
    """List of URL/ARL templates.

    Each template may be either:

    - a base URL, e.g. "https://cdn.example.com/cdn-root"
    - an ARL template, e.g. "S/=/123/22334455/{ttl}/cdn1.example.com/{path}"

    Templates may contain 'ttl' and 'path' placeholders to be substituted
    when calculating cache keys for flush.

    When there is no 'path' in a template, the path will instead be
    appended.
    """

    includes: list[re.Pattern[str]]
    """List of patterns applied to decide whether this rule is
    applicable to any given path.

    Patterns are non-anchored regular expressions.
    A path must match at least one pattern in order for cache flush
    to occur for that path.

    There is a default pattern of ".*", meaning that all paths will
    be included by default.

    Note that these includes are evaluated *after* the set of paths
    for flush have already been filtered to include only entry points
    (e.g. repomd.xml and other mutable paths). It is not possible to
    use this mechanism to enable cache flushing of non-entry-point
    paths.
    """

    excludes: list[re.Pattern[str]]
    """List of patterns applied to decide whether this rule should
    be skipped for any given path.

    Patterns are non-anchored regular expressions.
    If a path matches any pattern, cache flush won't occur.

    excludes are applied after includes.
    """

    def matches(self, path: str) -> bool:
        """True if this rule matches the given path.

        A path matches when at least one 'includes' pattern is found in it
        and no 'excludes' pattern is found in it.
        """
        # We always match against absolute paths with a leading /,
        # regardless of how the input was formatted.
        absolute = "/" + path.removeprefix("/")

        # Must match at least one 'includes'...
        if not any(pattern.search(absolute) for pattern in self.includes):
            return False

        # ...and must not match any 'excludes'.
        return not any(pattern.search(absolute) for pattern in self.excludes)

    @classmethod
    def load_all(
        cls: type["CacheFlushRule"],
        config: configparser.ConfigParser,
        env_section: str,
        names: Iterable[str],
    ) -> list["CacheFlushRule"]:
        """Load cache flush rules for one environment from parsed config.

        Arguments:
            config: Parsed .ini configuration.
            env_section: Name of the environment's own section; only
                consulted for the legacy fallback described below.
            names: Rule names; each one is read from the section
                "cache_flush.<name>". May be any iterable, including a
                one-shot generator.

        Returns:
            One rule per name, in order. When no names are given but the
            environment section has old-style 'cache_flush_urls' and/or
            'cache_flush_arl_templates' options, a single rule named
            "<env_section>-legacy" built from those options.

        Raises:
            configparser.NoSectionError, configparser.NoOptionError: if a
                named rule has no section or no 'templates' option.
            re.error: if an 'includes'/'excludes' value is not a valid
                regular expression.
        """
        # Materialize once. 'names' is both iterated and tested for
        # emptiness; a generator is always truthy and can only be consumed
        # once, so testing the raw iterable would wrongly skip the legacy
        # fallback.
        rule_names = list(names)

        out: list[CacheFlushRule] = []

        for rule_name in rule_names:
            section_name = f"cache_flush.{rule_name}"

            # 'templates' is mandatory: no fallback, so a missing option
            # raises rather than producing a rule which flushes nothing.
            templates = split_ini_list(config.get(section_name, "templates"))
            includes = [
                re.compile(s)
                for s in split_ini_list(
                    config.get(section_name, "includes", fallback=".*")
                )
            ]
            excludes = [
                re.compile(s)
                for s in split_ini_list(
                    config.get(section_name, "excludes", fallback=None)
                )
            ]
            out.append(
                cls(
                    name=rule_name,
                    templates=templates,
                    includes=includes,
                    excludes=excludes,
                )
            )

        # backwards-compatibility: if no rules were defined, but old-style
        # cache flush config was specified, read it into a rule with default
        # 'includes' and 'excludes'.
        if not rule_names and (
            config.has_option(env_section, "cache_flush_urls")
            or config.has_option(env_section, "cache_flush_arl_templates")
        ):
            legacy_urls = split_ini_list(
                config.get(env_section, "cache_flush_urls", fallback=None)
            )
            legacy_arls = split_ini_list(
                config.get(
                    env_section, "cache_flush_arl_templates", fallback=None
                )
            )
            out.append(
                cls(
                    name=f"{env_section}-legacy",
                    templates=legacy_urls + legacy_arls,
                    includes=[re.compile(r".*")],
                    excludes=[],
                )
            )

        return out
class Environment(object):
def __init__(
self,
name,
aws_profile,
bucket,
table,
config_table,
cdn_url,
cdn_key_id,
cache_flush_rules=None,
):
self.name = name
self.aws_profile = aws_profile
self.bucket = bucket
self.table = table
self.config_table = config_table
self.cdn_url = cdn_url
self.cdn_key_id = cdn_key_id
self.cache_flush_rules: list[CacheFlushRule] = cache_flush_rules or []
@property
def cdn_private_key(self):
return os.getenv("EXODUS_GW_CDN_PRIVATE_KEY_%s" % self.name.upper())
@property
def fastpurge_enabled(self) -> bool:
"""True if this environment has fastpurge-based cache flushing enabled.
When True, it is guaranteed that all needed credentials for fastpurge
are available for this environment.
"""
return (
# There must be at least one cache flush rule in config...
bool(self.cache_flush_rules)
# ... and *all* fastpurge credentials must be set
and self.fastpurge_access_token
and self.fastpurge_client_secret
and self.fastpurge_client_token
and self.fastpurge_host
)
@property
def fastpurge_client_secret(self):
return os.getenv(
"EXODUS_GW_FASTPURGE_CLIENT_SECRET_%s" % self.name.upper()
)
@property
def fastpurge_host(self):
return os.getenv("EXODUS_GW_FASTPURGE_HOST_%s" % self.name.upper())
@property
def fastpurge_access_token(self):
return os.getenv(
"EXODUS_GW_FASTPURGE_ACCESS_TOKEN_%s" % self.name.upper()
)
@property
def fastpurge_client_token(self):
return os.getenv(
"EXODUS_GW_FASTPURGE_CLIENT_TOKEN_%s" % self.name.upper()
)
class MigrationMode(str, Enum):
    """Selectable DB migration behaviors; each member's value is its own name.

    Members are also plain strings, so they compare equal to their value.
    """

    # Migrate the DB to the configured revision at startup.
    upgrade = "upgrade"

    # Skip migrations; initialize the DB from the current sqlalchemy model.
    model = "model"

    # Perform no DB initialization at all.
    none = "none"
[docs]
class Settings(BaseSettings):
    # Server-wide settings.
    #
    # Most fields declared here can be overridden by an environment variable
    # of the same name, prefixed with "EXODUS_GW_". Doc strings belong only on
    # those fields; other, computed fields (such as 'environments') are
    # described with plain comments instead.

    call_context_header: str = "X-RhApiPlatform-CallContext"
    """Name of the header from which call context is extracted (for
    authentication and authorization).
    """

    upload_meta_fields: dict[str, str] = {}
    """Metadata field names permitted for s3 uploads, each mapped to the regex
    used to validate its value. E.g., "exodus-migration-md5": "^[0-9a-f]{32}$"
    """

    publish_paths: dict[str, dict[str, list[str]]] = {}
    """A set of user or service accounts which are only authorized to publish to a
    particular set of path(s) in a given CDN environment and the regex(es) describing
    the paths to which the user or service account is authorized to publish. The user or
    service account will be prevented from publishing to any paths that do not match the
    defined regular expression(s).

    E.g., '{"pre": {"fake-user":
    ["^(/content)?/origin/files/sha256/[0-f]{2}/[0-f]{64}/[^/]{1,300}$"]}}'

    Any user or service account not included in this configuration is considered to have
    unrestricted publish access (i.e., can publish to any path).
    """

    log_config: dict[str, Any] = {
        "version": 1,
        "incremental": True,
        "disable_existing_loggers": False,
    }
    """Logging configuration, following the dictConfig schema."""

    ini_path: str | None = None
    """Path to an exodus-gw.ini config file holding additional settings."""

    environments: list[Environment] = []
    # Environment objects derived from exodus-gw.ini.

    db_service_user: str = "exodus-gw"
    """db service user name"""

    db_service_pass: str = "exodus-gw"
    """db service user password"""

    db_service_host: str = "exodus-gw-db"
    """db service host"""

    db_service_port: str = "5432"
    """db service port"""

    db_url: str | None = None
    """Connection string for database. If set, overrides the ``db_service_*`` settings."""

    db_reset: bool = False
    """If set to True, drop all DB tables during startup.

    This setting is intended for use during development.
    """

    db_migration_mode: MigrationMode = MigrationMode.upgrade
    """Adjusts the DB migration behavior when the exodus-gw service starts.

    Valid values are:

        upgrade (default)
            Migrate the DB to ``db_migration_revision`` (default latest) when
            the service starts up.

            This is the default setting and should be left enabled for typical
            production use.

        model
            Don't use migrations. Instead, attempt to initialize the database
            from the current version of the internal sqlalchemy model.

            This is intended for use during development while prototyping
            schema changes.

        none
            Don't perform any DB initialization at all.
    """

    db_migration_revision: str = "head"
    """If ``db_migration_mode`` is ``upgrade``, this setting can be used to override
    the target revision when migrating the DB.
    """

    db_session_max_tries: int = 3
    """The maximum number of attempts to recreate a DB session within a request."""

    item_yield_size: int = 5000
    """Number of publish items to load from the service DB at one time."""

    write_batch_size: int = 25
    """Maximum number of items to write to the DynamoDB table at one time."""

    write_max_tries: int = 20
    """Maximum write attempts to the DynamoDB table."""

    write_max_workers: int = 10
    """Maximum number of worker threads used in the DynamoDB batch writes."""

    write_queue_size: int = 1000
    """Maximum number of items the queue can hold at one time."""

    write_queue_timeout: int = 60 * 10
    """Maximum amount of time (in seconds) to wait for queue items.
    Defaults to 10 minutes.
    """

    publish_timeout: int = 24
    """Maximum amount of time (in hours) between updates to a pending publish before
    it will be considered abandoned. Defaults to one day.
    """

    history_timeout: int = 24 * 14
    """Maximum amount of time (in hours) to retain historical data for publishes and
    tasks. Publishes and tasks in a terminal state will be erased after this time has
    passed. Defaults to two weeks.
    """

    path_history_timeout: int = 700
    """Maximum amount of time (in days) to retain data on published paths for
    the purpose of cache flushing.
    """

    task_deadline: int = 2
    """Maximum amount of time (in hours) a task should remain viable. Defaults to two
    hours.
    """

    actor_time_limit: int = 30 * 60000
    """Maximum amount of time (in milliseconds) actors may run. Defaults to 30
    minutes.
    """

    actor_max_backoff: int = 5 * 60000
    """Maximum amount of time (in milliseconds) actors may use while backing
    off retries. Defaults to five (5) minutes.
    """

    entry_point_files: list[str] = [
        "repomd.xml",
        "repomd.xml.asc",
        "PULP_MANIFEST",
        "PULP_MANIFEST.asc",
        "treeinfo",
        "extra_files.json",
    ]
    """List of file names that should be saved for last when publishing."""

    phase2_patterns: list[re.Pattern[str]] = [
        # kickstart repos; note the logic here matches
        # the manual workaround RHELDST-27642
        re.compile(r"/kickstart/.*(?<!\.rpm)$"),
    ]
    """List of patterns which, if any have matched, force a path to
    be handled during phase 2 of commit.

    These patterns are intended for use with repositories not cleanly
    separated between mutable entry points and immutable content.

    For example, in-place updates to kickstart repositories may not
    only modify entry points such as extra_files.json but also
    arbitrary files referenced by that entry point, all of which should
    be processed during phase 2 of commit in order for updates to
    appear atomic.
    """

    autoindex_filename: str = ".__exodus_autoindex"
    """Filename for indexes automatically generated during publish.

    Can be set to an empty string to disable generation of indexes.
    """

    autoindex_partial_excludes: list[str] = ["/kickstart/"]
    """Background processing of autoindexes will be disabled for paths matching
    any of these values.
    """

    config_cache_ttl: int = 2
    """Time (in minutes) config is expected to live in components that consume it.

    Determines the delay for deployment task completion to allow for
    existing caches to expire and the newly deployed config to take effect.
    """

    worker_health_filepath: str = (
        "/tmp/exodus-gw-last-healthy"  # nosec - Bandit doesn't like that /tmp is used.
    )
    """The path to a file used to verify healthiness of a worker. Intended to be used by OCP"""

    worker_keepalive_timeout: int = 60 * 5
    """Background worker keepalive timeout, in seconds. If a worker fails to update its
    status within this time period, it is assumed dead.

    This setting affects how quickly the system can recover from issues such as a worker
    process being killed unexpectedly.
    """

    worker_keepalive_interval: int = 60
    """How often, in seconds, should background workers update their status."""

    cron_cleanup: str = "0 */12 * * *"
    """cron-style schedule for cleanup task.

    exodus-gw will run a cleanup task approximately according to this schedule, removing old
    data from the system."""

    scheduler_interval: int = 15
    """How often, in minutes, exodus-gw should check if a scheduled task is ready to run.

    Note that the cron rules applied to each scheduled task are only as accurate as this
    interval allows, i.e. each rule may be triggered up to ``scheduler_interval`` minutes late.
    """

    scheduler_delay: int = 5
    """Delay, in minutes, after exodus-gw workers start up before any scheduled tasks
    should run."""

    cdn_flush_on_commit: bool = True
    """Whether 'commit' tasks should automatically flush CDN cache for
    affected URLs.

    Only takes effect for environments where cache flush credentials/settings
    have been configured.
    """

    cdn_listing_flush: bool = True
    """Whether listing paths in the config should be flushed while deploying the
    config."""

    cdn_cookie_ttl: int = 60 * 720
    """Time (in seconds) cookies generated by ``cdn-redirect`` remain valid."""

    cdn_signature_timeout: int = 60 * 30
    """Time (in seconds) signed URLs remain valid."""

    cdn_max_expire_days: int = 365
    """Maximum permitted value for ``expire_days`` option on
    ``cdn-access`` endpoint.

    Clients obtaining signed cookies for CDN using ``cdn-access`` will be
    forced to renew their cookies at least this frequently.
    """

    s3_pool_size: int = 3
    """Number of S3 clients to cache"""

    model_config = SettingsConfigDict(env_prefix="exodus_gw_")
def load_settings() -> Settings:
    """Return the currently active settings for the server.

    This function will load settings from config files and environment
    variables. It is intended to be called once at application startup.
    Request handler functions should access settings via ``app.state.settings``.

    On top of the values held by ``Settings`` itself, the ini file(s)
    contribute:

    - a ``[loglevels]`` section, mapping logger names (or ``root``) to
      levels, merged into ``settings.log_config``;
    - one ``[env.<name>]`` section per environment, each appended to
      ``settings.environments`` as an ``Environment`` named ``<name>``.
    """
    settings = Settings()
    config = configparser.ConfigParser()

    # Try to find config here by default...
    filenames = [
        os.path.join(os.path.dirname(__file__), "../exodus-gw.ini"),
        "/opt/app/config/exodus-gw.ini",
    ]

    # ...but also allow pointing at a specific config file if this path
    # has been set. Note that putting this at the end gives it the highest
    # precedence, as the behavior is to load all the existing files in
    # order with each one potentially overriding settings from the prior.
    if settings.ini_path:
        filenames.append(settings.ini_path)

    config.read(filenames)

    if "loglevels" in config:
        log_config = settings.log_config
        for logger in config["loglevels"]:
            # Created lazily so that 'loggers' only appears in the logging
            # config when at least one level was actually configured.
            log_config.setdefault("loggers", {})

            # The root logger is configured at the top level of the logging
            # config; every other logger goes under 'loggers'.
            dest = log_config if logger == "root" else log_config["loggers"]
            dest.update({logger: {"level": config.get("loglevels", logger)}})

    env_prefix = "env."
    for section in config.sections():
        if not section.startswith(env_prefix):
            continue

        cache_flush_rule_names = split_ini_list(
            config.get(section, "cache_flush_rules", fallback=None)
        )
        cache_flush_rules = CacheFlushRule.load_all(
            config, section, cache_flush_rule_names
        )

        settings.environments.append(
            Environment(
                # Strip only the leading "env."; a plain str.replace would
                # also mangle names which contain "env." further along
                # (e.g. "env.my-env.test").
                name=section.removeprefix(env_prefix),
                aws_profile=config.get(section, "aws_profile", fallback=None),
                bucket=config.get(section, "bucket", fallback=None),
                table=config.get(section, "table", fallback=None),
                config_table=config.get(
                    section, "config_table", fallback=None
                ),
                cdn_url=config.get(section, "cdn_url", fallback=None),
                cdn_key_id=config.get(section, "cdn_key_id", fallback=None),
                cache_flush_rules=cache_flush_rules,
            )
        )

    return settings
def get_environment(env: str, settings: Settings | None = None):
    """Look up an environment object by name.

    Arguments:
        env: Name of the wanted environment.
        settings: Settings to search. When omitted (or falsy), settings
            are loaded afresh with ``load_settings()``.

    Returns:
        The first environment in ``settings.environments`` whose name
        equals ``env``.

    Raises:
        HTTPException: with status 404 if no environment has that name.
    """
    active_settings = settings or load_settings()

    found = next(
        (
            candidate
            for candidate in active_settings.environments
            if candidate.name == env
        ),
        None,
    )
    if found is None:
        raise HTTPException(
            status_code=404, detail="Invalid environment=%s" % repr(env)
        )

    return found