Created
April 12, 2025 03:00
-
-
Save devraj/b828cd3342b79d260b34c85dab90eb18 to your computer and use it in GitHub Desktop.
Pydantic AWS Secrets Manager data source - reference implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" AWS Secrets Manager Source

This is a custom source for pydantic settings that reads from
AWS Secrets Manager. There are some particulars that make the
scenario a little more complex than a simple environment variable.

S3 bucket names have to be globally unique, so we append a hash to
each bucket name; this makes it impossible for us to know the bucket
name at build time.

RDS will only provision an admin user, however the application should
not use these credentials and AWS will not make these credentials available
to the EKS cluster. We create a user and password which will end up being
the credentials used by the application; the user is created by another
process after the infrastructure has been deployed.

Finally, to reduce the number of secrets written (AWS charges per secret),
related values are written into the same secret. This means that the source
has to unpack them and make them available as if they were environment
variables.

Secrets Manager also recommends using a caching layer so we aren't hitting
the service on every request. This is achieved using the official caching
library maintained by AWS (aws-secretsmanager-caching).
"""
from typing import ( | |
Any, | |
Type, | |
Tuple, | |
Dict | |
) | |
from datetime import timedelta | |
import json | |
import logging | |
import botocore | |
import botocore.session | |
from aws_secretsmanager_caching import ( | |
SecretCache, | |
SecretCacheConfig | |
) | |
from pydantic_settings import ( | |
BaseSettings, | |
PydanticBaseSettingsSource, | |
) | |
from pydantic.fields import ( | |
FieldInfo | |
) | |
# Used to parse the AMQP DSN from secrets manager | |
from pydantic.networks import AmqpDsn | |
# This is how you list the names of the secrets in Secrets Manager.
# It is useful when you don't know a secret's name in advance, e.g. when
# RDS provisions the master user and stores its credentials in a secret.
#
# cache_config = SecretCacheConfig()  # See below for defaults
# cache = SecretCache(config=cache_config, client=client)
# paginator = client.get_paginator('list_secrets')
# for page in paginator.paginate():
#     for secret in page['SecretList']:
#         # print(cache.get_secret_string(secret['Name']))
#         print(secret['Name'])
class AWSSecretsManagerSource(PydanticBaseSettingsSource):
    """ A pydantic settings source that reads from AWS Secrets Manager

    This source reads from AWS Secrets Manager and makes the values
    available as if they were read from the environment.

    To keep Secrets Manager costs low, related values are grouped into a
    single JSON secret per service; this source unpacks those secrets into
    individual settings fields. The secret read is chosen by the
    ``env_prefix`` of the settings class this source is attached to:

        REDIS_    -> {KEY_PREFIX}_redis_cluster_endpoint
        POSTGRES_ -> {KEY_PREFIX}_rds_aurora_access_details
        AMQP_     -> {KEY_PREFIX}_mq_broker_access_details
        S3_       -> {KEY_PREFIX}_s3_buckets

    Any other prefix yields no values.

    These values are unknown to the application as they are references to
    resources created by tools like Terraform (e.g. S3 bucket names carry a
    generated hash suffix because bucket names must be globally unique).

    Note: aws_secretsmanager_caching
    https://github.com/aws/aws-secretsmanager-caching-python
    is used to minimise the number of calls to the AWS API. The client and
    cache are created once per process and shared by every instance of
    this source, so constructing settings objects repeatedly does not start
    from an empty cache each time.

    Warning: the caching client does not return objects, it can only
    cache values as binary or string; secret strings are decoded from JSON
    by this source.

    Failures (no credentials, AWS errors, malformed secrets) are logged and
    the affected fields are left unset, so other sources or field defaults
    can still satisfy them.
    """

    # TODO: make this configurable
    KEY_PREFIX = "ef2test"

    # Settings field name -> key inside the corresponding JSON secret.
    _REDIS_FIELDS = {"host": "cluster_endpoint"}
    _RDS_FIELDS = {
        "host": "cluster_endpoint",
        "user": "username",
        "password": "password",
    }
    _AMQP_CREDENTIAL_FIELDS = {"user": "username", "password": "password"}
    # AMQP fields derived by parsing the broker "endpoint" DSN.
    _AMQP_DSN_FIELDS = ("host", "port", "protocol")
    _S3_FIELDS = {
        "bucket_ugc": "s3_bucket_ugc",
        "bucket_media": "s3_bucket_media",
    }

    # Process-wide botocore client and secret cache, created lazily by
    # _get_shared_cache() and reused by every instance of this source.
    _shared_client: Any = None
    _shared_cache: Any = None

    def __init__(
        self,
        settings_cls: type[BaseSettings],
    ) -> None:
        """ Attach the shared Secrets Manager client and cache.

        If the client cannot be created, the failure is logged and both
        ``secretsmanager_client`` and ``secrets_cache`` are ``None``; the
        source then yields no values instead of breaking settings
        construction.
        """
        self.secretsmanager_client, self.secrets_cache = (
            self._get_shared_cache()
        )
        super().__init__(settings_cls)

    @classmethod
    def _get_shared_cache(cls) -> Tuple[Any, Any]:
        """ Return ``(client, cache)``, creating them on first use.

        Returns ``(None, None)`` if creation fails. A failure is not
        remembered, so a later instance will try again.
        """
        if cls._shared_cache is not None:
            return cls._shared_client, cls._shared_cache
        # TODO: make this configurable
        region_name = 'ap-southeast-2'
        try:
            session = botocore.session.get_session()
            client = session.create_client(
                'secretsmanager',
                region_name=region_name,
            )
            # Cache configuration, see following for options
            # https://github.com/aws/aws-secretsmanager-caching-python#cache-configuration
            cache_config = SecretCacheConfig(
                secret_refresh_interval=int(
                    timedelta(hours=1).total_seconds()
                )
            )
            cache = SecretCache(
                config=cache_config,
                client=client
            )
        except Exception as e:
            # Best effort: fall back to the other settings sources.
            logging.error("Failed to connect to secrets manager")
            logging.error(e)
            return None, None
        cls._shared_client, cls._shared_cache = client, cache
        return client, cache

    def prepare_field_value(
        self, field_name: str,
        field: FieldInfo, value: Any,
        value_is_complex: bool
    ) -> Any:
        """ Values from this source are returned unchanged. """
        return value

    def get_field_value(
        self,
        field: FieldInfo,
        field_name: str
    ) -> Tuple[Any, str, bool]:
        """ Look up one field in the secret selected by ``env_prefix``.

        Returns ``(value, field_name, value_is_complex)``. ``value`` is
        ``None`` when the cache is unavailable, the prefix is not
        recognised, or the secret does not provide this field.
        ``value_is_complex`` is always ``False``.
        """
        if self.secrets_cache is None:
            # Secrets manager failed to initialise
            return None, field_name, False

        # Get the prefix from the settings class configuration
        env_prefix = self.settings_cls.model_config.get('env_prefix', '')
        getters = {
            "REDIS_": self._get_redis_value,
            "POSTGRES_": self._get_rds_value,
            "AMQP_": self._get_amqp_value,
            "S3_": self._get_s3_value,
        }
        getter = getters.get(env_prefix)
        field_value = getter(field_name) if getter is not None else None
        return field_value, field_name, False

    def __call__(self) -> Dict[str, Any]:
        """ PydanticBaseSettingsSource is an abstract class that requires
        the __call__ method to be implemented. It builds a dictionary of
        the values this source can satisfy; fields it cannot provide are
        omitted.
        """
        d: Dict[str, Any] = {}
        for field_name, field in self.settings_cls.model_fields.items():
            field_value, field_key, value_is_complex = self.get_field_value(
                field, field_name
            )
            field_value = self.prepare_field_value(
                field_name, field, field_value, value_is_complex
            )
            if field_value is not None:
                d[field_key] = field_value
        return d

    def _fetch_secret_json(self, secret_suffix: str) -> Dict[str, Any]:
        """ Fetch ``{KEY_PREFIX}_{secret_suffix}`` and decode it as JSON.

        Returns the decoded JSON object. On any failure (no credentials,
        AWS error, secret not a JSON object) the problem is logged and an
        empty dict is returned, so callers simply find no value.
        """
        secret_id = f'{self.KEY_PREFIX}_{secret_suffix}'
        try:
            secret_str = self.secrets_cache.get_secret_string(secret_id)
        except botocore.exceptions.NoCredentialsError as e:
            logging.error("No AWS credentials found, skipping secrets manager")
            logging.error(e)
            return {}
        except (
            botocore.exceptions.ClientError,
            botocore.exceptions.BotoCoreError,
        ) as e:
            logging.error("Error fetching value from secrets manager")
            logging.error(e)
            return {}
        try:
            # TypeError covers a secret that has no string value (None).
            secrets = json.loads(secret_str)
        except (TypeError, ValueError):
            # Deliberately not logging the content: it may be sensitive.
            logging.error("Secret %s is not valid JSON", secret_id)
            return {}
        if not isinstance(secrets, dict):
            logging.error("Secret %s is not a JSON object", secret_id)
            return {}
        return secrets

    def _get_mapped_value(
        self,
        secret_suffix: str,
        field_map: Dict[str, str],
        field_name: str,
    ) -> Any:
        """ Return the secret value that ``field_map`` maps ``field_name`` to.

        The secret is only fetched when ``field_name`` is in ``field_map``,
        so unrelated fields never trigger a Secrets Manager lookup. Returns
        ``None`` if the field is unmapped or the key is missing.
        """
        secret_key = field_map.get(field_name)
        if secret_key is None:
            return None
        return self._fetch_secret_json(secret_suffix).get(secret_key)

    def _get_redis_value(self, field_name: str):
        """ Gets the redis cluster related values from secrets manager

        redis values are stored as a single secret which looks like:
            kubernetescluster_redis_cluster_endpoint
        where kubernetescluster_ is the prefix for the application; accessing
        it yields a json object that looks like:
            {
                "cluster_endpoint": "kubernetescluster-redis.....amazonaws.com"
            }

        Provides the ``host`` field only.
        """
        return self._get_mapped_value(
            'redis_cluster_endpoint', self._REDIS_FIELDS, field_name
        )

    def _get_rds_value(self, field_name: str):
        """ Get details of the postgres RDS instance from secrets manager

        aurora details are stored as a single secret which looks like:
            kubernetescluster_rds_aurora_access_details
        where kubernetescluster_ is the prefix for the application; accessing
        it yields a json object that looks like:
            {
                "cluster_endpoint": "kubernetescluster......ap-southeast-2.rds.amazonaws.com",
                "username": "kubernetescluster",
                "password": "randomstring"
            }

        Provides the ``host``, ``user`` and ``password`` fields.
        """
        return self._get_mapped_value(
            'rds_aurora_access_details', self._RDS_FIELDS, field_name
        )

    def _get_amqp_value(self, field_name: str):
        """ Get the AMQP details from secrets manager

        AMQP details are stored as a single secret which looks like:
            kubernetescluster_mq_broker_access_details
        where kubernetescluster_ is the prefix for the application; accessing
        it yields a json object that looks like:
            {
                "username": "ExampleUser",
                "password": "randomstring",
                "endpoint": "amqps://b-40a3474a-...-1.mq.ap-southeast-2.amazonaws.com:5671"
            }

        Provides ``user`` and ``password`` directly, and ``host``, ``port``
        and ``protocol`` by parsing ``endpoint`` with pydantic's AmqpDsn.
        Note: we are using the AMQPS protocol, which is the secure version.
        An endpoint that fails to parse is logged and yields ``None``.
        """
        if field_name in self._AMQP_CREDENTIAL_FIELDS:
            return self._get_mapped_value(
                'mq_broker_access_details',
                self._AMQP_CREDENTIAL_FIELDS,
                field_name,
            )
        if field_name not in self._AMQP_DSN_FIELDS:
            return None

        endpoint = self._fetch_secret_json(
            'mq_broker_access_details'
        ).get('endpoint')
        if endpoint is None:
            return None
        try:
            # Use pydantic to split the DSN into scheme, host and port
            dsn = AmqpDsn(endpoint)
        except ValueError:
            # pydantic's ValidationError is a ValueError subclass
            logging.error("Invalid AMQP endpoint in secrets manager")
            return None
        if field_name == "host":
            return dsn.host
        if field_name == "port":
            return dsn.port
        return dsn.scheme

    def _get_s3_value(self, field_name: str):
        """ Get the S3 bucket names from secrets manager

        Bucket names are stored as a single secret which looks like:
            kubernetescluster_s3_buckets
        where kubernetescluster_ is the prefix for the application; accessing
        it yields a json object that looks like:
            {
                "s3_bucket_media": "kubernetescluster.media.b4ea4.......",
                "s3_bucket_ugc": "kubernetescluster.ugc.b4ea46e........"
            }

        Provides the ``bucket_ugc`` and ``bucket_media`` fields.
        """
        return self._get_mapped_value(
            's3_buckets', self._S3_FIELDS, field_name
        )
class BaseSettingsWithAWSSecretManagerSource(BaseSettings):
    """ Base settings class that adds AWS Secrets Manager as a source

    Subclass this instead of ``BaseSettings`` to let the subclass's fields
    be populated from AWS Secrets Manager via AWSSecretsManagerSource.
    The subclass's ``env_prefix`` selects which secret is read.

    The sources are returned in priority order: init arguments,
    environment variables, the dotenv file, file secrets, and finally
    AWS Secrets Manager. pydantic-settings gives earlier sources
    precedence, so environment variables can be used in development
    while Secrets Manager supplies the values in production.

    When in production, ensure that the variable is not set in the
    environment; otherwise the environment value overrides the one
    from Secrets Manager.

    NOTE(review): pydantic-settings appears to call every source when
    building settings, so Secrets Manager may still be contacted even if
    the environment already satisfies every field. Precedence only
    decides which value wins.
    """
    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: Type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> Tuple[PydanticBaseSettingsSource, ...]:
        """ Return the default sources followed by AWSSecretsManagerSource,
        which therefore has the lowest priority.
        """
        return (
            init_settings,
            env_settings,
            dotenv_settings,
            file_secret_settings,
            AWSSecretsManagerSource(settings_cls),
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment