"""HashiCorp Vault secret store provider."""
# pyright: reportPossiblyUnboundVariable=false
from typing import Any, Dict, List, Optional
from confii.secret_stores.base import (
SecretAccessError,
SecretNotFoundError,
SecretStore,
SecretStoreError,
)
try:
import hvac
from hvac.exceptions import Forbidden, InvalidPath, VaultError
HVAC_AVAILABLE = True
except ImportError:
HVAC_AVAILABLE = False
[docs]
class HashiCorpVault(SecretStore):
"""HashiCorp Vault secret store provider.
This provider integrates with HashiCorp Vault to retrieve secrets from:
- KV v1 and KV v2 secrets engines
- Dynamic secrets
- Custom secrets engines
Prerequisites:
pip install hvac
Example:
>>> from confii import Config
>>> from confii.secret_stores import HashiCorpVault, SecretResolver
>>>
>>> # Initialize with token authentication
>>> store = HashiCorpVault(
... url="http://localhost:8200", token="your-vault-token"
... )
>>>
>>> # Or with AppRole authentication
>>> store = HashiCorpVault(
... url="http://localhost:8200",
... role_id="your-role-id",
... secret_id="your-secret-id",
... )
>>>
>>> # Use with Config
>>> config = Config(secret_resolver=SecretResolver(store))
>>>
>>> # In config file:
>>> # database.password = "${secret:secret/data/db/password:password}"
>>> # For KV v2: secret/data/myapp/db
>>> # For KV v1: secret/myapp/db
"""
[docs]
def __init__(
self,
url: str = "http://127.0.0.1:8200",
token: Optional[str] = None,
role_id: Optional[str] = None,
secret_id: Optional[str] = None,
auth_method: Optional[Any] = None,
namespace: Optional[str] = None,
mount_point: str = "secret",
kv_version: int = 2,
verify: bool = True,
) -> None:
"""Initialize HashiCorp Vault client.
Args:
url: Vault server URL (default: 'http://127.0.0.1:8200').
token: Vault authentication token (for token auth). Deprecated: use auth_method.
role_id: Role ID for AppRole authentication. Deprecated: use auth_method.
secret_id: Secret ID for AppRole authentication. Deprecated: use auth_method.
auth_method: VaultAuthMethod instance (recommended). Supports:
- TokenAuth, AppRoleAuth, OIDCAuth, KerberosAuth, LDAPAuth
- JWTAuth, KubernetesAuth, AWSAuth, AzureAuth, GCPAuth
namespace: Vault namespace (Vault Enterprise feature).
mount_point: KV secrets engine mount point (default: 'secret').
kv_version: KV secrets engine version, 1 or 2 (default: 2).
verify: Verify SSL certificates (default: True).
Raises:
ImportError: If hvac is not installed.
SecretAccessError: If authentication fails.
Example:
>>> # Token auth (legacy)
>>> store = HashiCorpVault(
... url="https://vault.example.com",
... token="s.1234567890",
... mount_point="myapp",
... kv_version=2,
... )
>>>
>>> # AppRole auth (legacy)
>>> store = HashiCorpVault(
... url="https://vault.example.com",
... role_id="role-id-here",
... secret_id="secret-id-here",
... )
>>>
>>> # OIDC with Kerberos (recommended)
>>> from confii.secret_stores.vault_auth import OIDCAuth
>>> auth = OIDCAuth(role="myapp-role", use_kerberos=True)
>>> store = HashiCorpVault(
... url="https://vault.example.com", auth_method=auth
... )
>>>
>>> # LDAP with PIN+Token
>>> from confii.secret_stores.vault_auth import LDAPAuth
>>> def get_pin_token():
... pin = getpass.getpass("PIN: ")
... token = getpass.getpass("Token: ")
... return pin + token
>>> auth = LDAPAuth(username="user", password_provider=get_pin_token)
>>> store = HashiCorpVault(
... url="https://vault.example.com", auth_method=auth
... )
"""
if not HVAC_AVAILABLE:
raise ImportError(
"hvac is required for HashiCorpVault. Install it with: pip install hvac"
)
self.url = url
self.mount_point = mount_point
self.kv_version = kv_version
self.namespace = namespace
try:
self.client = hvac.Client(url=url, verify=verify, namespace=namespace)
# Use new auth_method system if provided
if auth_method:
self.client.token = auth_method.authenticate(self.client)
# Fall back to legacy authentication
elif token:
self.client.token = token
elif role_id and secret_id:
auth_response = self.client.auth.approle.login(
role_id=role_id, secret_id=secret_id
)
self.client.token = auth_response["auth"]["client_token"]
else:
raise ValueError(
"Either 'auth_method' or 'token' or both 'role_id' and 'secret_id' must be provided"
)
# Verify authentication
if not self.client.is_authenticated():
raise SecretAccessError("Failed to authenticate with Vault")
except (SecretAccessError, ValueError):
raise
except Exception as e:
raise SecretAccessError(f"Failed to initialize Vault client: {e}") from e
[docs]
def get_secret(self, key: str, version: Optional[str] = None, **kwargs) -> Any:
"""Retrieve a secret from Vault.
Args:
key: The secret path. For KV v2, use format: "path/to/secret:field"
to extract a specific field. For example:
- "myapp/database:password" extracts the 'password' field
- "myapp/database" returns the entire secret dict
version: For KV v2, specify version number. If None, gets latest.
**kwargs: Additional parameters for read operation.
Returns:
The secret value or dict of values.
Raises:
SecretNotFoundError: If the secret doesn't exist.
SecretAccessError: If there's a permission error.
SecretStoreError: For other Vault errors.
Example:
>>> store = HashiCorpVault(url="http://localhost:8200", token="...")
>>>
>>> # Get entire secret
>>> secret = store.get_secret("myapp/database")
>>> # Returns: {"host": "localhost", "password": "secret"}
>>>
>>> # Get specific field
>>> password = store.get_secret("myapp/database:password")
>>> # Returns: "secret"
>>>
>>> # Get specific version (KV v2)
>>> old_secret = store.get_secret("myapp/database", version="2")
"""
# Parse key and field from format "path:field"
if ":" in key:
path, field = key.rsplit(":", 1)
else:
path = key
field = None
try:
if self.kv_version == 2:
# KV v2 API
read_params: Dict[str, Any] = {
"path": path,
"mount_point": self.mount_point,
}
if version:
read_params["version"] = int(version)
read_params.update(kwargs)
response = self.client.secrets.kv.v2.read_secret_version(**read_params)
data = response.get("data", {}).get("data", {})
else:
# KV v1 API
read_params = {"path": path, "mount_point": self.mount_point}
read_params.update(kwargs)
response = self.client.secrets.kv.v1.read_secret(**read_params)
data = response.get("data", {})
if not data:
raise SecretNotFoundError(f"Secret '{path}' exists but has no data")
# Extract specific field if requested
if field:
if field in data:
return data[field]
else:
raise SecretNotFoundError(
f"Field '{field}' not found in secret '{path}'. "
f"Available fields: {list(data.keys())}"
)
return data
except InvalidPath:
raise SecretNotFoundError(
f"Secret '{path}' not found in Vault "
f"(mount: {self.mount_point}, version: {self.kv_version})"
)
except Forbidden as e:
raise SecretAccessError(f"Access denied to secret '{path}': {e}")
except VaultError as e:
raise SecretStoreError(f"Vault error for '{path}': {e}")
except Exception as e:
raise SecretStoreError(f"Unexpected error accessing '{path}': {e}")
[docs]
def set_secret(self, key: str, value: Any, **kwargs) -> None:
"""Store a secret in Vault.
Args:
key: The secret path.
value: The secret value. For KV engines, should be a dict.
If a string/other type is provided, it will be wrapped as {"value": value}.
**kwargs: Additional parameters for create/update operation.
Raises:
SecretAccessError: If there's a permission error.
SecretStoreError: For other Vault errors.
Example:
>>> store = HashiCorpVault(url="http://localhost:8200", token="...")
>>>
>>> # Store dict secret
>>> store.set_secret(
... "myapp/database",
... {"host": "localhost", "port": 5432, "password": "secret"},
... )
>>>
>>> # Store simple value
>>> store.set_secret("myapp/api-key", "abc123")
>>> # Stored as: {"value": "abc123"}
"""
# Ensure value is a dict for KV engines
if not isinstance(value, dict):
secret_data = {"value": value}
else:
secret_data = value
try:
if self.kv_version == 2:
# KV v2 API
self.client.secrets.kv.v2.create_or_update_secret(
path=key, secret=secret_data, mount_point=self.mount_point, **kwargs
)
else:
# KV v1 API
self.client.secrets.kv.v1.create_or_update_secret(
path=key, secret=secret_data, mount_point=self.mount_point, **kwargs
)
except Forbidden as e:
raise SecretAccessError(f"Access denied writing secret '{key}': {e}")
except VaultError as e:
raise SecretStoreError(f"Vault error writing '{key}': {e}")
except Exception as e:
raise SecretStoreError(f"Unexpected error writing '{key}': {e}")
[docs]
def delete_secret(self, key: str, **kwargs) -> None:
"""Delete a secret from Vault.
For KV v2, this performs a soft delete (can be recovered).
Use destroy_secret() for permanent deletion.
Args:
key: The secret path to delete.
**kwargs: Additional parameters:
- versions: For KV v2, list of versions to delete (default: [latest])
Raises:
SecretNotFoundError: If the secret doesn't exist.
SecretAccessError: If there's a permission error.
SecretStoreError: For other Vault errors.
Example:
>>> store = HashiCorpVault(url="http://localhost:8200", token="...")
>>>
>>> # Delete latest version
>>> store.delete_secret("myapp/old-key")
>>>
>>> # Delete specific versions (KV v2)
>>> store.delete_secret("myapp/secret", versions=[1, 2, 3])
"""
try:
if self.kv_version == 2:
# KV v2 soft delete
versions = kwargs.get("versions")
if versions is None:
# Delete latest version - first read to get current version
metadata = self.client.secrets.kv.v2.read_secret_metadata(
path=key, mount_point=self.mount_point
)
current_version = metadata["data"]["current_version"]
versions = [current_version]
self.client.secrets.kv.v2.delete_secret_versions(
path=key, versions=versions, mount_point=self.mount_point
)
else:
# KV v1 delete
self.client.secrets.kv.v1.delete_secret(
path=key, mount_point=self.mount_point
)
except InvalidPath:
raise SecretNotFoundError(f"Secret '{key}' not found")
except Forbidden as e:
raise SecretAccessError(f"Access denied deleting secret '{key}': {e}")
except VaultError as e:
raise SecretStoreError(f"Vault error deleting '{key}': {e}")
[docs]
def list_secrets(self, prefix: Optional[str] = None, **kwargs) -> List[str]:
"""List secrets in Vault.
Args:
prefix: Optional path prefix to list from.
**kwargs: Additional parameters for list operation.
Returns:
List of secret paths.
Raises:
SecretAccessError: If there's a permission error.
SecretStoreError: For other Vault errors.
Example:
>>> store = HashiCorpVault(url="http://localhost:8200", token="...")
>>>
>>> # List all secrets at root
>>> all_secrets = store.list_secrets()
>>>
>>> # List secrets under a path
>>> app_secrets = store.list_secrets(prefix="myapp/")
"""
path = prefix if prefix else ""
try:
if self.kv_version == 2:
response = self.client.secrets.kv.v2.list_secrets(
path=path, mount_point=self.mount_point
)
else:
response = self.client.secrets.kv.v1.list_secrets(
path=path, mount_point=self.mount_point
)
keys = response.get("data", {}).get("keys", [])
return keys
except InvalidPath:
# Path doesn't exist or is empty
return []
except Forbidden as e:
raise SecretAccessError(f"Access denied listing secrets at '{path}': {e}")
except VaultError as e:
raise SecretStoreError(f"Vault error listing secrets at '{path}': {e}")
[docs]
def __repr__(self) -> str:
"""String representation of the store."""
return (
f"HashiCorpVault(url='{self.url}', "
f"mount='{self.mount_point}', "
f"kv_version={self.kv_version})"
)