Created
October 25, 2020 23:27
-
-
Save JosXa/b430af408573fa5a64dcc2630d00a574 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from typing import AbstractSet, Any, Dict, List, Optional, Union | |
from azure.appconfiguration import AzureAppConfigurationClient | |
from decouple import config | |
from pydantic.fields import ModelField | |
from pydantic.main import BaseModel, Extra | |
from pydantic.typing import display_as_type | |
from pydantic.utils import deep_update, sequence_like | |
# String form of a throwaway `object()` (its default repr), available as a
# sentinel value. NOTE(review): nothing in the visible part of this module
# references it — confirm whether it is still needed.
app_config_sentinel = str(object())
class ConfigError(ValueError):
    """Raised when a retrieved configuration value cannot be parsed as JSON."""
class AppConfig(BaseModel):
    """
    Base class for declarative definition of configuration variables retrieved
    from Azure App Configuration.

    Subclasses declare ordinary pydantic fields. On instantiation the settings
    are listed from the service (optionally restricted by ``Config.key_prefix``
    and ``Config.label_prefix``), matched to fields by key, and merged with any
    keyword arguments passed to the constructor (keyword arguments win).

    A field is matched against its name by default; a different key, or several
    candidate keys, can be supplied through the ``key`` extra of the field info.
    Keys are compared *after* the configured key prefix has been stripped from
    the retrieved setting keys.
    """

    def __init__(
        __pydantic_self__,
        _client: Optional[AzureAppConfigurationClient] = None,
        _connection_string: Optional[str] = None,
        **values: Any,
    ) -> None:
        """
        Create the model from Azure App Configuration plus explicit overrides.

        :param _client: an already instantiated client to read settings from.
        :param _connection_string: connection string used to create a client.
        :param values: explicit field values; they override retrieved settings.
        :raises ValueError: if both `_client` and `_connection_string` are
            given, or if no client/connection string can be found at all.
        """
        # Uses something other than `self` as the first arg to allow "self" as a settable attribute
        if _client and _connection_string:
            raise ValueError(
                "The `_client` and `_connection_string` parameters are mutually exclusive."
            )
        elif _connection_string:
            _client = AzureAppConfigurationClient.from_connection_string(
                _connection_string
            )
        super().__init__(**__pydantic_self__._build_values(values, _client=_client))

    def _build_values(
        self,
        init_kwargs: Dict[str, Any],
        _client: Optional[AzureAppConfigurationClient] = None,
    ) -> Dict[str, Any]:
        """Return the retrieved settings deep-updated with `init_kwargs`."""
        return deep_update(self._build_environ(_client), init_kwargs)

    def _resolve_client(
        self,
        _client: Optional[AzureAppConfigurationClient] = None,
    ) -> AzureAppConfigurationClient:
        """
        Pick the client to use: the explicit argument, then the one configured
        on `Config.app_config_client`, then one built from the first non-empty
        connection string found through `decouple.config`.

        :raises ValueError: if none of the above yields a client.
        """
        if _client:
            return _client
        if self.__config__.app_config_client:
            return self.__config__.app_config_client

        # Try to find connection string in environment
        for env_key in (
            "APP_CONFIG_CONNECTION",
            "APP_CONFIG_CONNECTION_STRING",
            "APP_CONFIGURATION_CONNECTION",
            "APP_CONFIGURATION_CONNECTION_STRING",
        ):
            env_value = config(env_key, default=None)
            if env_value:
                # Use the value that was actually found; re-reading a fixed
                # variable name would fail for every other accepted name.
                return AzureAppConfigurationClient.from_connection_string(env_value)

        raise ValueError(
            "Could not find connection string or instantiated client for Azure App Configuration."
        )

    @staticmethod
    def _prefix_filter(prefix: Optional[str]) -> Optional[str]:
        """
        Turn a configured prefix into a ``<prefix>:*`` filter expression.

        Returns None (no filtering) for an empty prefix or a bare wildcard.
        """
        if not prefix or prefix in ("*", ":*"):
            return None
        # Strip any trailing ':' / '*' characters so the suffix is not doubled.
        return prefix.rstrip(":*") + ":*"

    def _build_environ(
        self,
        _client: Optional[AzureAppConfigurationClient] = None,
    ) -> Dict[str, Optional[str]]:
        """
        Build environment variables suitable for passing to the Model.

        Lists the settings from the service, strips the key prefix from each
        retrieved key, and returns a mapping of field alias to the value of the
        first matching key. Values of complex fields are decoded with the
        config's `json_loads`.

        :raises ValueError: if no client can be resolved.
        :raises ConfigError: if the value of a complex field is not valid JSON.
        """
        client = self._resolve_client(_client)

        key_filter = self._prefix_filter(self.__config__.key_prefix)
        label_filter = self._prefix_filter(self.__config__.label_prefix)

        retrieved_settings = client.list_configuration_settings(
            key_filter=key_filter, label_filter=label_filter
        )

        case_sensitive = self.__config__.case_sensitive
        # "<prefix>:" as it appears at the start of every filtered key.
        leading = (key_filter.rstrip(":*") + ":") if key_filter else ""

        def format_key(key: str) -> str:
            # Only a *leading* prefix is removed; occurrences of the same text
            # further inside the key are part of the setting's name.
            if leading and key.startswith(leading):
                key = key[len(leading):]
            return key if case_sensitive else key.lower()

        settings = {format_key(s.key): s.value for s in retrieved_settings}

        d: Dict[str, Optional[str]] = {}
        for field in self.__fields__.values():
            env_val: Optional[str] = None
            # First candidate key with a value wins.
            for env_name in field.field_info.extra["keys"]:
                env_val = settings.get(env_name)
                if env_val is not None:
                    break

            if env_val is None:
                continue

            if field.is_complex():
                try:
                    # noinspection PyUnresolvedReferences
                    env_val = self.__config__.json_loads(env_val)  # type: ignore
                except ValueError as e:
                    raise ConfigError(f'error parsing JSON for "{env_name}"') from e
            d[field.alias] = env_val
        return d

    class Config:
        # Prefix turned into the "<prefix>:*" key filter sent to the service;
        # it is stripped from retrieved keys before they are matched to fields.
        key_prefix = ""
        # Prefix turned into the "<prefix>:*" label filter sent to the service.
        label_prefix = ""
        # Client used when none is passed to the model constructor.
        app_config_client = None
        validate_all = True
        extra = Extra.forbid
        arbitrary_types_allowed = True
        # When False, retrieved keys and field keys are lowercased before matching.
        case_sensitive = False

        @classmethod
        def prepare_field(cls, field: ModelField) -> None:
            """
            Store the candidate setting keys for `field` under the ``keys``
            extra of its field info.

            The candidates come from the field's ``key`` extra (a string, a
            set, or another sequence); without it the field name is used.

            :raises TypeError: if ``key`` is of an unsupported type.
            """
            keys: Union[List[str], AbstractSet[str]]
            key = field.field_info.extra.get("key")
            if key is None:
                # Retrieved keys have the key prefix stripped before lookup, so
                # the default candidate must be the bare field name; prepending
                # the prefix here would make it impossible to match.
                keys = {field.name}
            elif isinstance(key, str):
                keys = {key}
            elif isinstance(key, (set, frozenset)):
                keys = key
            elif sequence_like(key):
                keys = list(key)
            else:
                raise TypeError(
                    f"invalid field env: {key!r} ({display_as_type(key)}); should be string, list or set"
                )
            if not cls.case_sensitive:
                # Rebuild with the same container type (set, frozenset or list).
                keys = keys.__class__(n.lower() for n in keys)
            field.field_info.extra["keys"] = keys

    __config__: Config  # type: ignore
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment