Source code for confii.async_config

"""Async/await support for Confii.

This module provides async versions of Config and loaders for use in
asynchronous Python applications.
"""

import asyncio
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from confii.exceptions import ConfigLoadError

if TYPE_CHECKING:
    from confii.secret_stores.resolver import SecretResolver

logger = logging.getLogger(__name__)


[docs] class AsyncLoader: """Abstract base class for async configuration loaders. All async loaders must inherit from this class and implement the ``load()`` method as a coroutine. Use async loaders when your configuration sources involve I/O that benefits from non-blocking execution (remote APIs, cloud storage, etc.). Example: >>> class AsyncRedisLoader(AsyncLoader): ... async def load(self): ... import aioredis ... ... r = await aioredis.from_url(self.source) ... return await r.hgetall("config") """
[docs] def __init__(self, source: str) -> None: """Initialize the async loader. Args: source: The source identifier (file path, URL, prefix, etc.) """ self.source: str = source self.config: Dict[str, Any] = {}
[docs] async def load(self) -> Optional[Dict[str, Any]]: """Load configuration from the source (async). This method must be implemented by subclasses. Returns: Dictionary containing the loaded configuration, or None if the source doesn't exist or couldn't be loaded. Raises: ConfigLoadError: If loading fails due to an error """ raise NotImplementedError("Async load method must be implemented by subclasses")
[docs] class AsyncYamlLoader(AsyncLoader): """Async YAML configuration loader. Reads a YAML file using a thread-pool executor so the event loop is never blocked by disk I/O. Suitable for ``asyncio``-based applications that load configuration at startup. Example: >>> loader = AsyncYamlLoader("config.yaml") >>> config = await loader.load() >>> print(config["database"]["host"]) """
[docs] async def load(self) -> Optional[Dict[str, Any]]: """Load configuration from YAML file (async). Returns: Loaded configuration dictionary, or None if file doesn't exist Raises: ConfigLoadError: If loading fails """ try: import yaml # Read file in thread pool to avoid blocking loop = asyncio.get_running_loop() content = await loop.run_in_executor( None, self._read_file_sync, self.source ) self.config = yaml.safe_load(content) return self.config except FileNotFoundError: return None except Exception as e: raise ConfigLoadError( f"Failed to load YAML configuration from {self.source}", source=self.source, loader_type=self.__class__.__name__, original_error=e, ) from e
def _read_file_sync(self, source: str) -> str: """Synchronous file read (executed in thread pool).""" with open(source, encoding="utf-8") as f: return f.read()
[docs] class AsyncHTTPLoader(AsyncLoader): """Async HTTP configuration loader. Fetches configuration from a remote HTTP/HTTPS endpoint using ``aiohttp``. The response format (JSON or YAML) is auto-detected from the ``Content-Type`` header or the URL file extension. Requires the ``aiohttp`` package (``pip install aiohttp``). Example: >>> loader = AsyncHTTPLoader("https://cfg.example.com/app.json", timeout=10) >>> config = await loader.load() """
[docs] def __init__(self, url: str, timeout: int = 30) -> None: """Initialize async HTTP loader. Args: url: HTTP/HTTPS URL to load configuration from timeout: Request timeout in seconds """ super().__init__(url) self.timeout = timeout
[docs] async def load(self) -> Dict[str, Any]: """Load configuration from HTTP endpoint (async). Returns: Loaded configuration dictionary Raises: ConfigLoadError: If loading fails """ try: import aiohttp except ImportError: raise ImportError( "aiohttp is required for async HTTP loading. Install with: pip install aiohttp" ) try: async with ( aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=self.timeout) ) as session, session.get(self.source) as response, ): response.raise_for_status() content = await response.text() # Detect format from content-type or URL content_type = response.headers.get("content-type", "") if "json" in content_type or self.source.endswith(".json"): import json self.config = json.loads(content) elif "yaml" in content_type or self.source.endswith((".yaml", ".yml")): import yaml self.config = yaml.safe_load(content) else: # Try JSON as default import json self.config = json.loads(content) return self.config except Exception as e: raise ConfigLoadError( f"Failed to load HTTP configuration from {self.source}", source=self.source, loader_type=self.__class__.__name__, original_error=e, ) from e
[docs] class AsyncConfig: """Async version of Config for use in async applications. This class provides async methods for loading and managing configuration in asynchronous Python applications. Example: >>> async def main(): ... loader = AsyncYamlLoader("config.yaml") ... config = await AsyncConfig.create(loaders=[loader]) ... value = await config.get_async("database.host") """
[docs] def __init__( self, env: Optional[str] = None, loaders: Optional[Sequence[AsyncLoader]] = None, use_env_expander: bool = True, use_type_casting: bool = True, debug_mode: bool = False, deep_merge: bool = True, secret_resolver: Optional["SecretResolver"] = None, ) -> None: """Initialize AsyncConfig instance. Args: env: Environment name loaders: List of AsyncLoader instances use_env_expander: Enable environment variable expansion use_type_casting: Enable automatic type casting debug_mode: Enable debug mode deep_merge: Enable deep merging secret_resolver: Optional secret resolver """ self.env = env or "development" self.use_env_expander = use_env_expander self.use_type_casting = use_type_casting self.debug_mode = debug_mode self.deep_merge = deep_merge self.secret_resolver = secret_resolver self._loaders = loaders or [] self._config: Optional[Dict[str, Any]] = None self._merged_config: Optional[Dict[str, Any]] = None
[docs] @classmethod async def create( cls, env: Optional[str] = None, loaders: Optional[Sequence[AsyncLoader]] = None, **kwargs: Any, ) -> "AsyncConfig": """Create and initialize AsyncConfig asynchronously. This is the preferred way to create an AsyncConfig instance as it properly handles async initialization. Args: env: Environment name loaders: List of AsyncLoader instances **kwargs: Additional arguments passed to __init__ Returns: Initialized AsyncConfig instance Example: >>> config = await AsyncConfig.create( ... env="production", loaders=[AsyncYamlLoader("config.yaml")] ... ) """ instance = cls(env=env, loaders=loaders, **kwargs) await instance.load() return instance
[docs] async def load(self) -> None: """Load configuration from all async loaders. This method must be called before accessing configuration values. Raises: ConfigLoadError: If loading fails """ configs: List[Dict[str, Any]] = [] # Load from all async loaders in parallel tasks = [loader.load() for loader in self._loaders] results = await asyncio.gather(*tasks, return_exceptions=True) errors = [] for result in results: if isinstance(result, BaseException): logger.warning(f"Failed to load configuration: {result}") if isinstance(result, Exception): errors.append(result) continue if result is not None: configs.append(result) # If ALL loaders failed, raise instead of silently returning empty config if errors and not configs: raise ConfigLoadError( f"All {len(errors)} configuration loaders failed", source="async_loaders", original_error=errors[0], ) # Merge configurations from confii.config_merger import ConfigMerger config_tuples = [ (config, f"async_source_{i}") for i, config in enumerate(configs) ] self._merged_config = ConfigMerger.merge_configs( config_tuples, deep_merge=self.deep_merge ) self._config = self._merged_config
[docs] async def get_async(self, key_path: str, default: Any = None) -> Any: """Get configuration value asynchronously. Args: key_path: Dot-separated key path default: Default value if key not found Returns: Configuration value or default """ if self._config is None: await self.load() from confii.config_introspection import get_nested_value return get_nested_value(self._config or {}, key_path, default)
[docs] async def reload(self) -> None: """Reload configuration asynchronously. This method reloads all configurations from their sources. """ self._config = None self._merged_config = None await self.load()
[docs] def to_dict(self) -> Dict[str, Any]: """Export configuration as dictionary. Returns: Configuration dictionary """ return self._config or {}
[docs] async def validate_async(self, schema: Optional[Any] = None) -> bool: """Validate configuration asynchronously. Args: schema: Optional schema to validate against Returns: True if valid, False otherwise """ if self._config is None: await self.load() # Validation is typically CPU-bound, so run in executor loop = asyncio.get_running_loop() return await loop.run_in_executor( None, self._validate_sync, self._config or {}, schema )
def _validate_sync(self, config: Dict[str, Any], schema: Any) -> bool: """Synchronous validation (run in executor).""" if not schema: return bool(config) # Use Pydantic validation if schema is a Pydantic model is_pydantic = False try: from pydantic import BaseModel is_pydantic = isinstance(schema, type) and issubclass(schema, BaseModel) except ImportError: pass if is_pydantic: from confii.validators.pydantic_validator import PydanticValidator validator = PydanticValidator(schema) try: validator.validate(config) return True except Exception: return False return bool(config)