Source code for pubtools._pyxis.pyxis_authentication

import os
import subprocess
from typing import Optional

from requests_kerberos import HTTPKerberosAuth, OPTIONAL

from .pyxis_session import PyxisSession


class PyxisAuth:
    """Common interface implemented by every Pyxis authentication provider."""

    def __init__(self) -> None:
        """Reject direct construction; concrete providers define their own."""
        raise NotImplementedError()  # pragma: no cover

    def apply_to_session(self, pyxis_session: PyxisSession) -> None:
        """
        Configure the given Pyxis session with this provider's credentials.

        Args:
            pyxis_session (PyxisSession)
                Session to configure.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        raise NotImplementedError()  # pragma: no cover
class PyxisSSLAuth(PyxisAuth):
    """Authentication provider for PyxisClient based on an SSL client certificate."""

    # pylint: disable=super-init-not-called
    def __init__(self, crt_path: str, key_path: str) -> None:
        """
        Remember where the certificate and its private key are stored.

        Args:
            crt_path (str)
                Path to .crt file (signed certificate).
            key_path (str)
                Path to .key file (private key).
        """
        self.crt_path, self.key_path = crt_path, key_path

    def apply_to_session(self, pyxis_session: PyxisSession) -> None:
        """
        Attach the certificate/key pair to the session wrapped by PyxisSession.

        Args:
            pyxis_session (PyxisSession)
                PyxisSession instance.
        """
        # The pair is stored as a (certificate path, key path) tuple.
        cert_pair = (self.crt_path, self.key_path)
        pyxis_session.session.cert = cert_pair
class PyxisKrbAuth(PyxisAuth):
    """Kerberos authentication support for PyxisClient."""

    def __init__(
        self,
        krb_princ: str,
        service: str,
        ccache_file: str,
        ktfile: Optional[str] = None,
    ) -> None:
        """
        Initialize.

        Args:
            krb_princ (str)
                Kerberos principal for obtaining ticket.
            service (str)
                URL of the service to apply the authentication to.
            ccache_file (str)
                Path to a file used for ccache. Only necessary if kinit will be used.
            ktfile (str)
                Kerberos client keytab file.
        """
        self.krb_princ = krb_princ
        self.service = service
        self.ktfile = ktfile
        self.ccache_file = ccache_file

    @staticmethod
    def _run_quiet(cmd: list) -> int:
        """
        Run a command with its output discarded and return its exit status.

        The output is sent to DEVNULL instead of an unread PIPE: waiting on a
        child whose pipes are never drained can block once the OS pipe buffer
        fills, and the pipe file objects would otherwise stay open until
        garbage collection.

        Args:
            cmd (list)
                Program and arguments, executed without a shell.

        Returns:
            int: exit status of the command.
        """
        completed = subprocess.run(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
        return completed.returncode

    def _krb_auth(self) -> HTTPKerberosAuth:
        """
        Make sure a Kerberos ticket is available and build the auth handler.

        "klist -s" is run first. When it exits non-zero, or when a keytab
        file was configured, "kinit" is invoked for the principal with the
        configured ccache file and KRB5CCNAME is pointed at that file.
        The exit status of kinit is not checked, so a failed kinit does not
        raise here.

        Returns:
            HTTPKerberosAuth: handler with optional mutual authentication and
            forced preemptive authentication.
        """
        klist_status = self._run_quiet(["klist", "-s"])

        if klist_status or self.ktfile:
            kinit_cmd = ["kinit", self.krb_princ, "-k"]
            if self.ktfile:
                kinit_cmd += ["-t", self.ktfile]
            # If keytab path wasn't provided, default location will be attempted
            kinit_cmd += ["-c", self.ccache_file]

            self._run_quiet(kinit_cmd)
            os.environ["KRB5CCNAME"] = self.ccache_file

        # preemptive auth is forced to speed up parallel requests
        return HTTPKerberosAuth(
            mutual_authentication=OPTIONAL,
            force_preemptive=True,
        )

    def apply_to_session(self, pyxis_session: PyxisSession) -> None:
        """
        Set up PyxisSession with Kerberos auth.

        Args:
            pyxis_session (PyxisSession)
                PyxisSession instance
        """
        pyxis_session.session.auth = self._krb_auth()