Last active
January 10, 2024 03:22
-
-
Save loopyd/60c21f09ca3e2007d00681e5554e3103 to your computer and use it in GitHub Desktop.
[py] Pydantic environment (.env) file parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import re
import sys
from typing import Any, Callable, Type, TypeVar, Union

from pydantic import BaseModel, Field
# Default ".env" location: next to this source file. Built with os.path so the
# result is correct regardless of the platform's path separator.
_DEFAULT_ENV_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")

# One "KEY=value" assignment. The value is either single-quoted, double-quoted
# or a bare token that ends at the first whitespace or '#'. Lines that do not
# match (blank lines, comments, anything without '=') are skipped by the parser.
_ENV_LINE_RE = re.compile(
    r"(?P<key>[A-Za-z0-9_\-]+)\s*=[ \t]*"
    r"(?P<value>'[^']*'|\"[^\"]*\"|[^#\s]*)"
)

# Spellings accepted as boolean true (case-insensitive); anything else is false.
_TRUE_WORDS = frozenset({"true", "1", "yes", "on"})


def _unquote(text: str) -> str:
    """Strip one pair of matching surrounding quotes, if and only if present."""
    if len(text) >= 2 and text[0] == text[-1] and text[0] in "\"'":
        return text[1:-1]
    return text


def _split_items(text: str) -> list:
    """Split a comma-separated payload; an empty payload is an empty list."""
    return text.split(",") if text else []


def _format_env_value(val: Any) -> str:
    """Render a single field value as the right-hand side of an env line."""
    if val is None:
        return "null"
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(val, bool):
        return "true" if val else "false"
    if isinstance(val, str):
        return f"\"{val}\""
    if isinstance(val, (int, float, complex)):
        return f"{val}"
    if isinstance(val, dict):
        return "\"" + ",".join(f"{k}:{v}" for k, v in val.items()) + "\""
    if isinstance(val, (set, frozenset)):
        # Sets have no defined order; sort so the output is reproducible.
        return "\"" + ",".join(sorted(str(item) for item in val)) + "\""
    if isinstance(val, (list, tuple)):
        # str() each item so non-string elements do not break the join.
        return "\"" + ",".join(str(item) for item in val) + "\""
    raise TypeError(f"Cannot serialize value of type {type(val)} to env.")


def _parse_env_value(raw: str, current: Any, line: str) -> Any:
    """Convert the raw text of an env value to the type of ``current``.

    ``current`` is the value the attribute holds right now; its type selects
    the conversion. Container items are returned as strings.
    """
    if raw == "null":
        # Only the bare token means None; a quoted "null" stays a string.
        return None
    text = _unquote(raw)
    if current is None:
        # No type to go by: hand over the text as-is. NOTE(review): this relies
        # on the target model validating/coercing on assignment - confirm the
        # model is configured with validate_assignment.
        return text
    if isinstance(current, bool):
        return text.lower() in _TRUE_WORDS
    if isinstance(current, str):
        return text
    if isinstance(current, int):
        return int(text)
    if isinstance(current, float):
        return float(text)
    if isinstance(current, complex):
        return complex(text)
    if isinstance(current, dict):
        # partition() splits on the first ':' only, so values may contain ':'.
        # An item without ':' becomes a key with an empty-string value.
        return {k: v for k, _, v in (item.partition(":") for item in _split_items(text))}
    if isinstance(current, frozenset):
        return frozenset(_split_items(text))
    if isinstance(current, set):
        return set(_split_items(text))
    if isinstance(current, tuple):
        return tuple(_split_items(text))
    if isinstance(current, list):
        return _split_items(text)
    raise TypeError(f"Cannot deserialize {line} for attribute of type {type(current)}")


def _assign_env_value(obj: Any, key: str, raw: str, line: str) -> None:
    """Set every field of ``obj`` (and of nested models) whose name matches ``key``.

    Matching is case-insensitive. Fields that currently hold a pydantic model
    are descended into rather than assigned. Keys that match nothing are
    ignored.
    """
    wanted = key.lower()
    for name in type(obj).model_fields:
        if not hasattr(obj, name):
            continue
        current = getattr(obj, name)
        if isinstance(current, BaseModel):
            _assign_env_value(current, key, raw, line)
            continue
        if name.lower() == wanted:
            setattr(obj, name, _parse_env_value(raw, current, line))


class EnvFileModel(BaseModel):
    """Pydantic model that can be written to and read from a ``.env`` file.

    Each field becomes one ``KEY=value`` line, where ``KEY`` is the upper-cased
    field name. Fields holding nested pydantic models are flattened into the
    same namespace (no prefix is added, so equally named fields collide).
    Strings and containers are wrapped in double quotes without escaping, so
    values containing ``"`` do not round-trip.
    """

    class Config:
        validate_assignment = True
        extra = "forbid"
        arbitrary_types_allowed = True

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseModel.__init__``."""
        super().__init__(*args, **kwargs)

    def save_env(self, env_file: str = _DEFAULT_ENV_FILE) -> None:
        """
        Saves the pydantic model to an .env file

        An existing regular file at ``env_file`` is removed and rewritten.

        Raises:
            OSError: if ``env_file`` exists but is not a regular file.
        """
        env = self.serialize_env()
        if os.path.exists(env_file):
            if not os.path.isfile(env_file):
                raise OSError(f"Cannot write to {env_file}, it is not a file.")
            os.remove(env_file)
        with open(env_file, "w", encoding="utf-8") as f:
            f.write(env)

    def serialize_env(self, obj: Union[Any, None] = None, env: str = "") -> str:
        """
        Serializes a pydantic model to an environment file string

        Args:
            obj: model instance to serialize; defaults to ``self``.
            env: text to prepend to the generated lines.

        Returns:
            ``env`` followed by one ``KEY=value`` line per field, with nested
            models expanded in place.

        Raises:
            TypeError: if ``obj`` is not a pydantic model instance, or a field
                holds a value of an unsupported type.
        """
        if obj is None:
            obj = self
        elif not isinstance(obj, BaseModel):
            raise TypeError(f"Cannot serialize {type(obj)} to env, it is not a pydantic model.")
        parts = [env]
        for name in type(obj).model_fields:
            if not hasattr(obj, name):
                continue
            val = getattr(obj, name)
            if isinstance(val, BaseModel):
                # Recurse with an empty prefix; the result is appended once.
                parts.append(self.serialize_env(obj=val))
            else:
                parts.append(f"{name.upper()}={_format_env_value(val)}\n")
        return "".join(parts)

    def load_env(self, env_file: str = _DEFAULT_ENV_FILE) -> None:
        """
        Loads matching attributes from an .env file

        Does nothing when ``env_file`` does not exist or is not a regular file.
        """
        if not os.path.isfile(env_file):
            return
        with open(env_file, "r", encoding="utf-8") as f:
            env = f.read()
        self.deserialize_env(env=env)

    def deserialize_env(self, env: str, obj: Union[Any, None] = None) -> None:
        """
        Deserializes an environment file string to matching pydantic model attributes

        Each ``KEY=value`` line is matched case-insensitively against the field
        names of ``obj`` and of any nested models it holds; the value is
        converted to the type of the attribute's current value. The bare token
        ``null`` assigns ``None``. Blank lines, comments, lines without ``=``
        and unknown keys are ignored.

        Args:
            env: contents of an env file.
            obj: model instance to populate; defaults to ``self``.

        Raises:
            TypeError: if ``obj`` is not a pydantic model instance, or the
                target attribute holds a value of an unsupported type.
            ValueError: if a numeric value cannot be parsed.
        """
        if not env:
            return
        if obj is None:
            obj = self
        elif not isinstance(obj, BaseModel):
            raise TypeError(f"Cannot deserialize {type(obj)}, it is not a compatible pydantic model.")
        # Iterate instead of recursing per line so long files cannot exhaust
        # the interpreter's recursion limit.
        for raw_line in env.splitlines():
            line = raw_line.strip()
            found = _ENV_LINE_RE.match(line)
            if found is None:
                continue
            _assign_env_value(obj, found["key"], found["value"], line)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
DeiEnv - EnvFileModel for pydantic
Advantage over
pydotenv
`str`, `int`, `bool`, `dict`, `list`, `tuple`, `float`, and `complex`, as well as recursion into nested pydantic `BaseModel` classes designated as custom fields to look for matching keys.
`from deienv import EnvFileModel`
source
ed