Skip to content

Instantly share code, notes, and snippets.

@devraj
Created April 12, 2025 03:00
Show Gist options
  • Save devraj/b828cd3342b79d260b34c85dab90eb18 to your computer and use it in GitHub Desktop.
Save devraj/b828cd3342b79d260b34c85dab90eb18 to your computer and use it in GitHub Desktop.
Pydantic AWS Secrets Manager data source - reference implementation
""" AWS Secrets Manager Source
This is a custom source for pydantic settings that will read from
AWS Secrets Manager. There are some particulars that make the
scenario a little more complex than a simple environment variable.
S3 bucket names have to be unique across a region; to facilitate this
we use a hash appended to the bucket name, which makes it impossible
for us to know the bucket name at build time.
RDS will only provision an admin user, however the application should
not use these credentials and AWS will not make these credentials available
to the EKS cluster. We create a user and password which will end up being
the credentials used by the application, the user is created post
infrastructure deployment by another process.
Finally, to reduce the number of secrets written (AWS charges per secret),
several values are written into the same secret; this means that the source
has to unpack them and make them available as if they were environment variables.
Secrets Manager also recommends using a caching layer so we aren't hitting
the service on every request. This is achieved using an official caching
plugin maintained by AWS for Boto3.
"""
from typing import (
Any,
Type,
Tuple,
Dict
)
from datetime import timedelta
import json
import logging
import botocore
import botocore.session
from aws_secretsmanager_caching import (
SecretCache,
SecretCacheConfig
)
from pydantic_settings import (
BaseSettings,
PydanticBaseSettingsSource,
)
from pydantic.fields import (
FieldInfo
)
# Used to parse the AMQP DSN from secrets manager
from pydantic.networks import AmqpDsn
# This is how you get a list of keys in Secrets Manager,
# in case you don't have access to them, e.g. when RDS provisions
# the master user and stores a key
#
# cache_config = SecretCacheConfig() # See below for defaults
# cache = SecretCache(config=cache_config, client=client)
# paginator = client.get_paginator('list_secrets')
# for page in paginator.paginate():
# for secret in page['SecretList']:
# # print(cache.get_secret_string(secret['Name']))
# print(secret['Name'])
class AWSSecretsManagerSource(PydanticBaseSettingsSource):
    """ A pydantic settings source that reads from AWS Secrets Manager

    This source reads from AWS Secrets Manager and makes the values
    available as if they were read from the environment.

    To keep Secrets Manager costs low (AWS charges per secret), several
    related values are bundled into a single JSON secret; each ``_get_*``
    helper fetches one secret and picks the requested field out of it.

    Some values are unknown to the application because they are
    references to resources created by tools like Terraform. For
    example, S3 bucket names must be unique across the region, so a
    hash is appended to each bucket name at provision time.

    Note: aws_secretsmanager_caching
    https://github.com/aws/aws-secretsmanager-caching-python
    is used to minimize the number of calls to the AWS API.

    Warning: the caching client does not return objects, it can only
    cache things as binary or string, so JSON secrets are decoded here.
    """
    # TODO: make this configurable
    KEY_PREFIX = "ef2test"

    def __init__(
        self,
        settings_cls: type[BaseSettings],
    ) -> None:
        # TODO: make this configurable
        region_name = 'ap-southeast-2'
        try:
            session = botocore.session.get_session()
            self.secretsmanager_client = session.create_client(
                'secretsmanager',
                region_name=region_name,
            )
            # Cache configuration with default values, see the following
            # for the available options:
            # https://github.com/aws/aws-secretsmanager-caching-python#cache-configuration
            cache_config = SecretCacheConfig(
                secret_refresh_interval=int(
                    timedelta(hours=1).total_seconds()
                )
            )
            self.secrets_cache = SecretCache(
                config=cache_config,
                client=self.secretsmanager_client,
            )
        except Exception as e:
            # Leave the source in a disabled state rather than crashing
            # settings construction; get_field_value checks for None.
            logging.error("Failed to connect to secrets manager")
            logging.error(e)
            self.secrets_cache = None
            self.secretsmanager_client = None
        super().__init__(settings_cls)

    def prepare_field_value(
        self, field_name: str,
        field: FieldInfo, value: Any,
        value_is_complex: bool
    ) -> Any:
        # No post-processing required; values are returned as-is.
        return value

    def get_field_value(
        self,
        field: FieldInfo,
        field_name: str
    ) -> Tuple[Any, str, bool]:
        """ Resolve a single settings field from Secrets Manager.

        Dispatches on the settings class ``env_prefix`` to the matching
        secret accessor. Returns the pydantic-settings triple
        ``(value, field_key, value_is_complex)``; ``value`` is ``None``
        when this source cannot satisfy the field.
        """
        field_value = None
        if self.secrets_cache is None:
            # Secrets Manager failed to initialise; defer to other sources.
            return (field_value, field_name, False)
        # Use .get so a settings class without an env_prefix configured
        # simply falls through instead of raising KeyError.
        env_prefix = self.settings_cls.model_config.get('env_prefix', '')
        dispatch = {
            "REDIS_": self._get_redis_value,
            "POSTGRES_": self._get_rds_value,
            "AMQP_": self._get_amqp_value,
            "S3_": self._get_s3_value,
        }
        handler = dispatch.get(env_prefix)
        if handler is not None:
            field_value = handler(field_name)
        return (field_value, field_name, False)

    def __call__(self) -> Dict[str, Any]:
        """ PydanticBaseSettingsSource is an abstract class that requires
        the __call__ method to be implemented. The aim of this is to
        construct a dictionary of values that this source can satisfy.
        """
        d: Dict[str, Any] = {}
        for field_name, field in self.settings_cls.model_fields.items():
            field_value, field_key, value_is_complex = self.get_field_value(
                field, field_name
            )
            field_value = self.prepare_field_value(
                field_name, field, field_value, value_is_complex
            )
            if field_value is not None:
                d[field_key] = field_value
        return d

    def _get_secret_json(self, secret_suffix: str):
        """ Fetch ``{KEY_PREFIX}_{secret_suffix}`` and decode it as JSON.

        Shared helper for the ``_get_*`` accessors: centralises the
        cache lookup, the JSON decoding and the botocore error handling
        that was previously duplicated in every accessor.

        Returns the decoded dict, or ``None`` when the secret could not
        be fetched (missing credentials or a client error).
        """
        try:
            secret_str = self.secrets_cache.get_secret_string(
                f'{self.KEY_PREFIX}_{secret_suffix}'
            )
            return json.loads(secret_str)
        except botocore.exceptions.NoCredentialsError as e:
            logging.error("No AWS credentials found, skipping secrets manager")
            logging.error(e)
        except botocore.exceptions.ClientError as e:
            logging.error("Error fetching value from secrets manager")
            logging.error(e)
        return None

    def _get_redis_value(self, field_name: str):
        """ Gets the redis cluster related values from secrets manager

        Redis values are stored as a single secret named
        ``{KEY_PREFIX}_redis_cluster_endpoint``; accessing it yields a
        JSON object that looks like:

        {
            "cluster_endpoint": "kubernetescluster-redis.....amazonaws.com"
        }
        """
        redis_secrets = self._get_secret_json('redis_cluster_endpoint')
        if redis_secrets is not None and field_name == "host":
            return redis_secrets['cluster_endpoint']
        return None

    def _get_rds_value(self, field_name: str):
        """ Get details of the postgres RDS instance from secrets manager

        Aurora details are stored as a single secret named
        ``{KEY_PREFIX}_rds_aurora_access_details``; accessing it yields
        a JSON object that looks like:

        {
            "cluster_endpoint": "kubernetescluster......ap-southeast-2.rds.amazonaws.com",
            "username": "kubernetescluster",
            "password": "randomstring"
        }
        """
        rds_secrets = self._get_secret_json('rds_aurora_access_details')
        if rds_secrets is None:
            return None
        # Map the settings field name onto the key used inside the secret.
        key_map = {
            "host": "cluster_endpoint",
            "user": "username",
            "password": "password",
        }
        secret_key = key_map.get(field_name)
        return rds_secrets[secret_key] if secret_key is not None else None

    def _get_amqp_value(self, field_name: str):
        """ Get the AMQP details from secrets manager

        AMQP details are stored as a single secret named
        ``{KEY_PREFIX}_mq_broker_access_details``; accessing it yields a
        JSON object that looks like:

        {
            "username": "ExampleUser",
            "password": "randomstring",
            "endpoint": "amqps://b-40a3474a-...-1.mq.ap-southeast-2.amazonaws.com:5671"
        }

        Note: the AMQPS protocol (the secure variant) is in use; pydantic
        parses the endpoint into protocol, host and port.
        """
        mq_broker_secrets = self._get_secret_json('mq_broker_access_details')
        if mq_broker_secrets is None:
            return None
        if field_name in ("host", "port", "protocol"):
            # Use pydantic to pull the DSN apart. A malformed endpoint
            # raises pydantic's ValidationError, which subclasses
            # ValueError, so catch that rather than crashing settings
            # construction.
            try:
                dsn = AmqpDsn(mq_broker_secrets['endpoint'])
            except ValueError as e:
                logging.error("Invalid AMQP DSN in secrets manager")
                logging.error(e)
                return None
            if field_name == "host":
                return dsn.host
            elif field_name == "port":
                return dsn.port
            return dsn.scheme
        elif field_name == "user":
            return mq_broker_secrets['username']
        elif field_name == "password":
            return mq_broker_secrets['password']
        return None

    def _get_s3_value(self, field_name: str):
        """ Get the S3 bucket names from secrets manager

        Bucket names are stored as a single secret named
        ``{KEY_PREFIX}_s3_buckets``; accessing it yields a JSON object
        that looks like:

        {
            "s3_bucket_media": "kubernetescluster.media.b4ea4.......",
            "s3_bucket_ugc": "kubernetescluster.ugc.b4ea46e........"
        }
        """
        bucket_secrets = self._get_secret_json('s3_buckets')
        if bucket_secrets is None:
            return None
        if field_name == "bucket_ugc":
            return bucket_secrets['s3_bucket_ugc']
        elif field_name == "bucket_media":
            return bucket_secrets['s3_bucket_media']
        return None
class BaseSettingsWithAWSSecretManagerSource(BaseSettings):
    """ A mixin that enables the Secrets Manager source.

    Be sure to import both the secrets manager source class and this
    mixin before using it in your configuration sources. The
    AWSSecretsManagerSource is registered as the last source in the
    chain, which lets environment variables win while in development
    and Secrets Manager take over in production.

    Secrets Manager is only queried when no earlier source satisfies a
    field; in production, make sure the variable is not set in the
    environment so the Secrets Manager lookup takes effect.
    """
    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: Type[BaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> Tuple[PydanticBaseSettingsSource, ...]:
        # Order matters: earlier sources take precedence, so the AWS
        # source only fills fields the others left unset.
        standard_sources = (
            init_settings,
            env_settings,
            dotenv_settings,
            file_secret_settings,
        )
        return standard_sources + (AWSSecretsManagerSource(settings_cls),)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment