Source code for hvac.v1

import os

from hvac import adapters, api, exceptions, utils
from hvac.constants.client import (
    DEFAULT_URL,
    DEPRECATED_PROPERTIES,
    VAULT_CACERT,
    VAULT_CAPATH,
    VAULT_CLIENT_CERT,
    VAULT_CLIENT_KEY,
)

try:
    import hcl

    has_hcl_parser = True
except ImportError:
    has_hcl_parser = False


[docs]class Client: """The hvac Client class for HashiCorp's Vault."""
[docs] def __init__( self, url=None, token=None, cert=None, verify=None, timeout=30, proxies=None, allow_redirects=True, session=None, adapter=adapters.JSONAdapter, namespace=None, **kwargs, ): """Creates a new hvac client instance. :param url: Base URL for the Vault instance being addressed. :type url: str :param token: Authentication token to include in requests sent to Vault. :type token: str :param cert: Certificates for use in requests sent to the Vault instance. This should be a tuple with the certificate and then key. :type cert: tuple :param verify: Either a boolean to indicate whether TLS verification should be performed when sending requests to Vault, or a string pointing at the CA bundle to use for verification. See http://docs.python-requests.org/en/master/user/advanced/#ssl-cert-verification. :type verify: Union[bool,str] :param timeout: The timeout value for requests sent to Vault. :type timeout: int :param proxies: Proxies to use when performing requests. See: http://docs.python-requests.org/en/master/user/advanced/#proxies :type proxies: dict :param allow_redirects: Whether to follow redirects when sending requests to Vault. :type allow_redirects: bool :param session: Optional session object to use when performing request. :type session: request.Session :param adapter: Optional class to be used for performing requests. If none is provided, defaults to hvac.adapters.JSONRequest :type adapter: hvac.adapters.Adapter :param kwargs: Additional parameters to pass to the adapter constructor. :type kwargs: dict :param namespace: Optional Vault Namespace. :type namespace: str """ token = token if token is not None else utils.get_token_from_env() url = url if url else os.getenv("VAULT_ADDR", DEFAULT_URL) if cert is not None and VAULT_CLIENT_CERT: cert = "\n".join( [ VAULT_CLIENT_CERT, VAULT_CLIENT_KEY, ] ) # Consider related CA env vars _only if_ no argument is passed in under the # `verify` parameter. if verify is None: # Reference: https://www.vaultproject.io/docs/commands#vault_cacert # Note: "[VAULT_CACERT] takes precedence over VAULT_CAPATH." and thus we # check for VAULT_CAPATH _first_. if VAULT_CAPATH: verify = VAULT_CAPATH if VAULT_CACERT: verify = VAULT_CACERT if not verify: # default to verifying certificates if the above aren't defined verify = True self._adapter = adapter( base_uri=url, token=token, cert=cert, verify=verify, timeout=timeout, proxies=proxies, allow_redirects=allow_redirects, session=session, namespace=namespace, **kwargs, ) # Instantiate API classes to be exposed as properties on this class starting with auth method classes. self._auth = api.AuthMethods(adapter=self._adapter) self._secrets = api.SecretsEngines(adapter=self._adapter) self._sys = api.SystemBackend(adapter=self._adapter)
def __getattr__(self, name): return utils.getattr_with_deprecated_properties( obj=self, item=name, deprecated_properties=DEPRECATED_PROPERTIES ) @property def adapter(self): return self._adapter @adapter.setter def adapter(self, adapter): self._adapter = adapter @property def url(self): return self._adapter.base_uri @url.setter def url(self, url): self._adapter.base_uri = url @property def token(self): return self._adapter.token @token.setter def token(self, token): self._adapter.token = token @property def session(self): return self._adapter.session @session.setter def session(self, session): self._adapter.session = session @property def allow_redirects(self): return self._adapter.allow_redirects @allow_redirects.setter def allow_redirects(self, allow_redirects): self._adapter.allow_redirects = allow_redirects @property def auth(self): """Accessor for the Client instance's auth methods. Provided via the :py:class:`hvac.api.AuthMethods` class. :return: This Client instance's associated Auth instance. :rtype: hvac.api.AuthMethods """ return self._auth @property def secrets(self): """Accessor for the Client instance's secrets engines. Provided via the :py:class:`hvac.api.SecretsEngines` class. :return: This Client instance's associated SecretsEngines instance. :rtype: hvac.api.SecretsEngines """ return self._secrets @property def sys(self): """Accessor for the Client instance's system backend methods. Provided via the :py:class:`hvac.api.SystemBackend` class. :return: This Client instance's associated SystemBackend instance. :rtype: hvac.api.SystemBackend """ return self._sys @property def generate_root_status(self): return self.sys.read_root_generation_progress() @property def key_status(self): """GET /sys/key-status :return: Information about the current encryption key used by Vault. :rtype: dict """ return self.sys.get_encryption_key_status()["data"] @property def rekey_status(self): return self.sys.read_rekey_progress() @property def ha_status(self): """Read the high availability status and current leader instance of Vault. :return: The JSON response returned by read_leader_status() :rtype: dict """ return self.sys.read_leader_status() @property def seal_status(self): """Read the seal status of the Vault. This is an unauthenticated endpoint. Supported methods: GET: /sys/seal-status. Produces: 200 application/json :return: The JSON response of the request. :rtype: dict """ return self.sys.read_seal_status()
[docs] def read(self, path, wrap_ttl=None): """GET /<path> :param path: :type path: :param wrap_ttl: :type wrap_ttl: :return: :rtype: """ try: return self._adapter.get(f"/v1/{path}", wrap_ttl=wrap_ttl) except exceptions.InvalidPath: return None
[docs] def list(self, path): """GET /<path>?list=true :param path: :type path: :return: :rtype: """ try: payload = {"list": True} return self._adapter.get(f"/v1/{path}", params=payload) except exceptions.InvalidPath: return None
[docs] def write(self, path, wrap_ttl=None, **kwargs): """POST /<path> :param path: :type path: :param wrap_ttl: :type wrap_ttl: :param kwargs: :type kwargs: :return: :rtype: """ return self._adapter.post(f"/v1/{path}", json=kwargs, wrap_ttl=wrap_ttl)
[docs] def delete(self, path): """DELETE /<path> :param path: :type path: :return: :rtype: """ self._adapter.delete(f"/v1/{path}")
[docs] def get_policy(self, name, parse=False): """Retrieve the policy body for the named policy. :param name: The name of the policy to retrieve. :type name: str | unicode :param parse: Specifies whether to parse the policy body using pyhcl or not. :type parse: bool :return: The (optionally parsed) policy body for the specified policy. :rtype: str | dict """ try: policy = self.sys.read_policy(name=name)["data"]["rules"] except exceptions.InvalidPath: return None if parse: if not has_hcl_parser: raise ImportError("pyhcl is required for policy parsing") policy = hcl.loads(policy) return policy
[docs] def lookup_token(self, token=None, accessor=False, wrap_ttl=None): """GET /auth/token/lookup/<token> GET /auth/token/lookup-accessor/<token-accessor> GET /auth/token/lookup-self :param token: :type token: str. :param accessor: :type accessor: str. :param wrap_ttl: :type wrap_ttl: int. :return: :rtype: """ token_param = { "token": token, } accessor_param = { "accessor": token, } if token: if accessor: path = "/v1/auth/token/lookup-accessor" return self._adapter.post(path, json=accessor_param, wrap_ttl=wrap_ttl) else: path = "/v1/auth/token/lookup" return self._adapter.post(path, json=token_param) else: path = "/v1/auth/token/lookup-self" return self._adapter.get(path, wrap_ttl=wrap_ttl)
[docs] def revoke_token(self, token, orphan=False, accessor=False): """POST /auth/token/revoke POST /auth/token/revoke-orphan POST /auth/token/revoke-accessor :param token: :type token: :param orphan: :type orphan: :param accessor: :type accessor: :return: :rtype: """ if accessor and orphan: msg = "revoke_token does not support 'orphan' and 'accessor' flags together" raise exceptions.InvalidRequest(msg) elif accessor: params = {"accessor": token} self._adapter.post("/v1/auth/token/revoke-accessor", json=params) elif orphan: params = {"token": token} self._adapter.post("/v1/auth/token/revoke-orphan", json=params) else: params = {"token": token} self._adapter.post("/v1/auth/token/revoke", json=params)
[docs] def renew_token(self, token, increment=None, wrap_ttl=None): """POST /auth/token/renew POST /auth/token/renew-self :param token: :type token: :param increment: :type increment: :param wrap_ttl: :type wrap_ttl: :return: :rtype: For calls expecting to hit the renew-self endpoint please use the "renew_self_token" method instead """ params = { "increment": increment, } params["token"] = token return self._adapter.post( "/v1/auth/token/renew", json=params, wrap_ttl=wrap_ttl )
[docs] def logout(self, revoke_token=False): """Clears the token used for authentication, optionally revoking it before doing so. :param revoke_token: :type revoke_token: :return: :rtype: """ if revoke_token: self.auth.token.revoke_self() self.token = None
[docs] def is_authenticated(self): """Helper method which returns the authentication status of the client :return: :rtype: """ if not self.token: return False try: self.lookup_token() return True except exceptions.Forbidden: return False except exceptions.InvalidPath: return False except exceptions.InvalidRequest: return False
[docs] def auth_cubbyhole(self, token): """Perform a login request with a wrapped token. Stores the unwrapped token in the resulting Vault response for use by the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute. :param token: Wrapped token :type token: str | unicode :return: The (JSON decoded) response of the auth request :rtype: dict """ self.token = token return self.login("/v1/sys/wrapping/unwrap")
[docs] def login(self, url, use_token=True, **kwargs): """Perform a login request. Associated request is typically to a path prefixed with "/v1/auth") and optionally stores the client token sent in the resulting Vault response for use by the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute. :param url: Path to send the authentication request to. :type url: str | unicode :param use_token: if True, uses the token in the response received from the auth request to set the "token" attribute on the the :py:meth:`hvac.adapters.Adapter` instance under the _adapater Client attribute. :type use_token: bool :param kwargs: Additional keyword arguments to include in the params sent with the request. :type kwargs: dict :return: The response of the auth request. :rtype: requests.Response """ return self._adapter.login(url=url, use_token=use_token, **kwargs)