Skip to content

Instantly share code, notes, and snippets.

@stivio00
Created February 10, 2025 19:57
Show Gist options
  • Save stivio00/d4a95e9eb81333c55a1bf6248ee97505 to your computer and use it in GitHub Desktop.
Save stivio00/d4a95e9eb81333c55a1bf6248ee97505 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Usage: db-tool-py test.yaml
Runs each query against its corresponding db and sql; when a query does not provide one, the global value is used.
To install the dependencies:
$ pip install pydantic pyyaml pandas typer sqlalchemy pyodbc jinja2
Simple case: query the same db with two queries:
scan:
  db: sqlite:///test.db # global db connection
  queries:
    - sql: select * from t1;
    - sql: select * from t2;
To query different databases and echo the sql in the console:
scan:
  echo: true # echo the sql in the console
  queries:
    - sql: select * from t1;
      db: sqlite:///test1.db
    - sql: select * from t2;
      db: sqlite:///test2.db
To write the output to separate csv files (one per query, named after the query name or its index):
scan:
  output: csv # separate files
  sql: select * from t1; # here is a global sql applied to all queries
  queries:
    - db: sqlite:///test1.db
    - db: sqlite:///test2.db
To use jinja2 templates:
scan:
  db: sqlite:///test1.db # global db used in all queries
  queries:
    - sql: select * from {{table}} # jinja2 template
      values: { table: table1 } # context
    - sql: select * from {{table}}
      values: { table: table2 }
It is possible to use global templates:
scan:
  sql: select * from {{table}}
  db: sqlite:///{{db_name}}.db
  queries:
    - values: { table: table1, db_name: test1 } # use the global db and global sql
    - sql: select * from table2; # override the global sql
      values: { db_name: test2 }
Some DB connection string examples:
sqlite3: sqlite:///scan_results.db
postgres: postgresql://user:password@localhost/dbname
mysql: mysql+pymysql://user:password@localhost/dbname
mssql: mssql+pyodbc://user:password@server/dbname?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=true
"""
import typer, yaml, sqlite3
from enum import Enum
from pydantic import BaseModel, field_validator
import pandas as pd
from sqlalchemy import create_engine, text
from jinja2 import Template
# Typer application object; CLI commands are registered on it via @app.command().
app = typer.Typer()
class OutputType(Enum):
    """Supported ways of writing query results to disk."""

    # All result sets are concatenated and written to a single CSV file.
    ONECSV = 1

    # Every query result is written to its own CSV file.
    CSV = 2
class TableConfig(BaseModel):
    """A single entry of the ``queries`` list in the YAML file.

    ``db`` and ``sql`` are optional here: when one is missing, the
    scan-level value of the same name is used instead.
    """

    # Optional label; shown in progress output and used as the file name
    # when every query is written to its own CSV.
    name: str | None = None

    # SQLAlchemy connection URL (may contain jinja2 placeholders).
    db: str | None = None

    # SQL text (may contain jinja2 placeholders).
    sql: str | None = None

    # Context used to render the jinja2 placeholders of this query.
    values: dict[str, str] = {}
class ScanConfig(BaseModel):
    """The ``scan`` section of the YAML file: global settings plus the queries.

    ``db`` and ``sql`` act as defaults for queries that do not define their own.
    """

    # How results are written: one combined CSV (default) or one CSV per query.
    output: OutputType = OutputType.ONECSV

    # Queries to execute, in order.
    queries: list[TableConfig]

    # Default connection URL / SQL text used by queries that omit them.
    db: str | None = None
    sql: str | None = None

    # Passed to SQLAlchemy's create_engine(echo=...) to log the executed SQL.
    echo: bool = False

    @field_validator("output", mode="before")
    @classmethod
    def validate_output(cls, v):
        """Convert the raw ``output`` value into an ``OutputType`` member.

        Accepted inputs:
          * an ``OutputType`` member (returned unchanged, so the model can
            also be built programmatically),
          * a member name in any letter case, e.g. ``"csv"``,
          * a member's integer value, e.g. ``2``.

        Raises:
            ValueError: if the value matches none of the above.
        """
        # Runs before pydantic's own enum validation, so an already-converted
        # member must be let through explicitly.
        if isinstance(v, OutputType):
            return v
        if isinstance(v, str) and v.upper() in OutputType.__members__:
            return OutputType[v.upper()]  # Convert name to enum
        if isinstance(v, int) and v in OutputType._value2member_map_:
            return OutputType(v)  # Allow direct int values
        raise ValueError(f"Invalid output type: {v}. Allowed: {', '.join(OutputType.__members__)}")
class Config(BaseModel):
    """Root object of the YAML configuration file."""

    # The scan definition: global settings and the list of queries.
    scan: ScanConfig

    # Template variables shared by all queries. When rendering, they are
    # merged over each query's own ``values`` and win on key clashes.
    templates: dict[str, str] = {}
def render_text(template_str, values):
    """Render ``template_str`` as a jinja2 template, twice, with ``values``.

    The output of the first pass is rendered again with the same context,
    so a value that itself contains placeholders is expanded as well.
    """
    first_pass = Template(template_str).render(**values)
    second_pass = Template(first_pass).render(**values)
    return second_pass
def load_yaml(file_path: str) -> Config:
    """Load a YAML file and validate it into a ``Config`` object.

    Args:
        file_path: Path of the YAML configuration file.

    Returns:
        The validated configuration.

    Raises:
        ValueError: if the document is empty or its top level is not a mapping.
        pydantic.ValidationError: if the mapping does not match the schema.
    """
    # Read as UTF-8 explicitly instead of relying on the platform default.
    with open(file_path, "r", encoding="utf-8") as file:
        data = yaml.safe_load(file)

    # safe_load returns None for an empty file and may return a list or a
    # scalar; fail with a readable message instead of a TypeError from `**`.
    if not isinstance(data, dict):
        raise ValueError(
            f"{file_path}: expected a YAML mapping with a top-level 'scan' key, "
            f"got {type(data).__name__}"
        )
    return Config(**data)
def execute_query(db_file: str, query: str, echo: bool) -> pd.DataFrame:
    """Execute a SQL query and return its rows as a DataFrame.

    Args:
        db_file: SQLAlchemy connection URL of the database to query.
        query: SQL text to execute.
        echo: Forwarded to ``create_engine(echo=...)``.

    Returns:
        A DataFrame with one column per result column, or a completely
        empty DataFrame when the query returns no rows.
    """
    engine = create_engine(db_file, echo=echo)
    try:
        # The context manager closes the connection even if the query fails.
        with engine.connect() as conn:
            result = conn.execute(text(query))
            columns = list(result.keys())
            rows = result.fetchall()
    finally:
        # A new engine is created on every call; release its pool so that
        # connections do not pile up across queries.
        engine.dispose()

    if rows:
        return pd.DataFrame(rows, columns=columns)  # Convert to DataFrame
    return pd.DataFrame()
def save_results_to_csv(df: pd.DataFrame, output_file: str):
    """Write ``df`` to ``output_file`` as CSV (index omitted) and report it on stdout."""
    df.to_csv(path_or_buf=output_file, index=False)
    print(f"✅ Saved results to {output_file}")
def run_queries(config: Config, output_file: str):
    """Run every configured query and write the results as CSV.

    For each query, the connection URL and the SQL text are taken from the
    query itself or, when missing, from the scan-level defaults. Both are
    rendered as jinja2 templates with the query's ``values`` merged with the
    global ``templates`` (the global entries win on key clashes).

    Args:
        config: The validated configuration.
        output_file: Target file when all results go into one CSV. Ignored
            when each query is written to its own ``<name or index>.csv``.

    Raises:
        ValueError: if a query has no db or no sql and no global default exists.
    """
    scan = config.scan
    if not scan.queries:
        # pd.concat() raises on an empty list; there is nothing to do anyway.
        print("No queries defined; nothing to run.")
        return

    all_results = []
    for index, query in enumerate(scan.queries):
        db_template = query.db or scan.db
        sql_template = query.sql or scan.sql
        label = query.name or index
        if not db_template:
            raise ValueError(f"Query {label!r} has no 'db' and no global 'db' is defined.")
        if not sql_template:
            raise ValueError(f"Query {label!r} has no 'sql' and no global 'sql' is defined.")

        # Show the SQL that will actually run, even when it is the global one.
        print(f"⚡ Quering {query.name or sql_template}")
        context = query.values | config.templates
        db = render_text(db_template, context)
        sql = render_text(sql_template, context)
        all_results.append(execute_query(db, sql, scan.echo))

    if scan.output == OutputType.ONECSV:
        save_results_to_csv(pd.concat(all_results), output_file)
    elif scan.output == OutputType.CSV:  # separate files
        for index, (query, table) in enumerate(zip(scan.queries, all_results)):
            save_results_to_csv(table, f"{query.name or index}.csv")
    else:
        print(f"💣 Output format {scan.output} not supported.")
@app.command()
def process_yaml(yaml_file: str, output: str = "queries.csv") -> None:
    """Run the queries defined in YAML_FILE against their databases and save the results as CSV."""
    # NOTE: Typer shows the docstring above as the command's --help text,
    # so it is kept to a single user-facing sentence.
    # `output` is only used when all results are written to one CSV file.
    config = load_yaml(yaml_file)
    run_queries(config, output)
# Start the Typer CLI only when the file is executed as a script.
if __name__ == "__main__":
    app()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment