Created
February 10, 2025 19:57
-
-
Save stivio00/d4a95e9eb81333c55a1bf6248ee97505 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Usage: db-tool-py test.yaml

Runs each query with its corresponding db and sql; when a query does not provide one, the global value is used.

To install deps:
    $ pip install pydantic pyyaml pandas typer sqlalchemy pyodbc jinja2

Simple case, query the same db with two queries:
    scan:
      db: sqlite:///test.db  # global db connection
      queries:
        - sql: select * from t1;
        - sql: select * from t2;

To query different databases and echo the SQL in the console:
    scan:
      echo: true  # echo the sql in the console
      queries:
        - sql: select * from t1;
          db: sqlite:///test1.db
        - sql: select * from t2;
          db: sqlite:///test2.db

To write the output to separate CSV files:
    scan:
      output: csv  # separate files
      sql: select * from t1;  # global sql applied to all queries
      queries:
        - db: sqlite:///test1.db
        - db: sqlite:///test2.db

To use jinja2 templates:
    scan:
      db: sqlite:///test1.db  # global db used in all queries
      queries:
        - sql: select * from {{table}}  # jinja2 template
          values: { table: table1 }  # context
        - sql: select * from {{table}}
          values: { table: table2 }

It is also possible to use global templates:
    scan:
      sql: select * from {{table}}
      db: sqlite:///{{db_name}}.db
      queries:
        - values: { table: table1, db_name: test1 }  # use the global db and global sql
        - sql: select * from table2;  # override the global sql
          values: { db_name: test2 }

Some DB connection string examples:
    sqlite3: sqlite:///scan_results.db
    postgres: postgresql://user:password@localhost/dbname
    mysql: mysql+pymysql://user:password@localhost/dbname
    mssql: mssql+pyodbc://user:password@server/dbname?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=true
"""
import sqlite3
from enum import Enum
from typing import Any

import pandas as pd
import typer
import yaml
from jinja2 import Template
from pydantic import BaseModel, field_validator
from sqlalchemy import create_engine, text
# Typer application object; CLI commands are registered on it via @app.command().
app = typer.Typer()
class OutputType(Enum):
    """Supported layouts for the CSV output of a scan."""

    ONECSV = 1  # every query result combined into a single CSV file
    CSV = 2  # one CSV file per query
class TableConfig(BaseModel):
    """Settings for one entry of the ``queries`` list.

    Every field is optional. ``db`` and ``sql`` may contain Jinja2
    placeholders that are filled from ``values``.
    """

    # Optional label for the query.
    name: str | None = None
    # Per-query connection URL.
    db: str | None = None
    # Per-query SQL text.
    sql: str | None = None
    # Jinja2 context for this query. Typed as Any because YAML scalars such as
    # ``limit: 10`` or ``active: true`` load as int/bool, which a
    # ``dict[str, str]`` field rejects under pydantic v2.
    values: dict[str, Any] = {}
class ScanConfig(BaseModel):
    """The ``scan`` section: global defaults plus the list of queries."""

    # How results are written; defaults to a single combined CSV.
    output: OutputType = OutputType.ONECSV
    # Queries to execute (required).
    queries: list[TableConfig]
    # Global connection URL.
    db: str | None = None
    # Global SQL text.
    sql: str | None = None
    # Engine echo flag.
    echo: bool = False

    @field_validator("output", mode="before")
    @classmethod
    def validate_output(cls, v):
        """Coerce the raw ``output`` setting into an ``OutputType`` member.

        Accepts an existing ``OutputType`` member, a member name in any
        letter case (e.g. ``"csv"``), or a member's integer value.

        Raises:
            ValueError: if ``v`` matches none of the above.
        """
        # Already an enum member (programmatic construction): pass it through.
        # Without this, ScanConfig(output=OutputType.CSV) was rejected.
        if isinstance(v, OutputType):
            return v
        if isinstance(v, str) and v.upper() in OutputType.__members__:
            return OutputType[v.upper()]  # name -> member
        if isinstance(v, int) and v in {member.value for member in OutputType}:
            return OutputType(v)  # integer value -> member
        raise ValueError(f"Invalid output type: {v}. Allowed: {', '.join(OutputType.__members__)}")
class Config(BaseModel):
    """Top-level model of the YAML document."""

    # Required ``scan`` section.
    scan: ScanConfig
    # Optional mapping of string entries; empty when the key is absent.
    templates: dict[str, str] = {}
def render_text(template_str, values):
    """Render ``template_str`` with ``values`` in two Jinja2 passes.

    The output of the first pass is compiled as a template again and
    rendered with the same context, so a placeholder whose value itself
    contains placeholders is expanded one further level.
    """
    first_pass = Template(template_str).render(**values)
    second_pass = Template(first_pass).render(**values)
    return second_pass
def load_yaml(file_path: str) -> Config:
    """Load a YAML file and validate it into a ``Config`` object.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        The validated configuration.

    Raises:
        ValueError: if the document is empty or its top level is not a
            mapping. Validation failures from pydantic propagate unchanged.
    """
    # Explicit encoding so the result does not depend on the platform locale.
    with open(file_path, "r", encoding="utf-8") as file:
        data = yaml.safe_load(file)
    # An empty file loads as None; fail with a clear message instead of the
    # opaque TypeError that unpacking a non-mapping would raise.
    if not isinstance(data, dict):
        raise ValueError(
            f"{file_path}: expected a YAML mapping at the top level, "
            f"got {type(data).__name__}"
        )
    return Config.model_validate(data)
def execute_query(db_file: str, query: str, echo: bool):
    """Execute a SQL query and return the result rows as a DataFrame.

    Args:
        db_file: SQLAlchemy connection URL.
        query: SQL text to execute.
        echo: Forwarded to ``create_engine(echo=...)``.

    Returns:
        A DataFrame with one column per result key, or an empty DataFrame
        (no columns) when the query returns no rows.
    """
    engine = create_engine(db_file, echo=echo)
    try:
        # The context manager closes the connection even if execution fails.
        with engine.connect() as conn:
            result = conn.execute(text(query))
            columns = list(result.keys())
            rows = result.fetchall()
    finally:
        # A fresh engine is created per call, so release its pool here;
        # otherwise every query leaks a connection.
        engine.dispose()
    if rows:
        return pd.DataFrame(rows, columns=columns)
    return pd.DataFrame()
def save_results_to_csv(df: pd.DataFrame, output_file: str):
    """Write ``df`` to ``output_file`` as CSV (no index column) and report it."""
    df.to_csv(path_or_buf=output_file, index=False)
    confirmation = f"✅ Saved results to {output_file}"
    print(confirmation)
def run_queries(config: Config, output_file: str):
    """Run every configured query and write the results as CSV.

    Args:
        config: Validated configuration.
        output_file: Destination used when the output type is ``ONECSV``.
            In ``CSV`` mode each query is written to ``<name or index>.csv``.

    Raises:
        ValueError: if a query has neither its own nor a global ``db``/``sql``.
    """
    scan = config.scan
    all_results = []
    for query in scan.queries:
        print(f"⚡ Quering {query.name or query.sql}")
        # Per-query settings take priority over the scan-level ones.
        db_template = query.db or scan.db
        sql_template = query.sql or scan.sql
        if not db_template:
            raise ValueError("No 'db' set for the query and no global 'db' under 'scan'.")
        if not sql_template:
            raise ValueError("No 'sql' set for the query and no global 'sql' under 'scan'.")
        # On a key collision the entry from config.templates wins (right operand of |).
        context = query.values | config.templates
        db = render_text(db_template, context)
        sql = render_text(sql_template, context)
        all_results.append(execute_query(db, sql, scan.echo))

    if scan.output == OutputType.ONECSV:
        # pd.concat raises on an empty list, so fall back to an empty frame.
        final_df = pd.concat(all_results) if all_results else pd.DataFrame()
        save_results_to_csv(final_df, output_file)
    elif scan.output == OutputType.CSV:  # separate files
        for i, (query, table) in enumerate(zip(scan.queries, all_results)):
            table_file = f"{query.name or i }.csv"
            save_results_to_csv(table, table_file)
    else:
        # Report only the output type; the full config holds connection strings.
        print(f"💣 Output format {scan.output} not soported.")
@app.command()
def process_yaml(yaml_file: str, output: str = "queries.csv"):
    """CLI tool to run queries against different databases."""
    run_queries(load_yaml(yaml_file), output)
# Start the Typer CLI only when this file is executed as a script.
if __name__ == "__main__":
    app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment