Created
January 8, 2024 22:55
-
-
Save sungchun12/615379b63fad70046051d091a1200980 to your computer and use it in GitHub Desktop.
Demo script to run a Datafold Cloud xdb data diff between Databricks and Snowflake with simple API calls
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Demo script to run an xdb (cross-database) data diff between Databricks and Snowflake with simple API calls.
"""
import os | |
import time | |
from pydantic import BaseModel | |
from typing import Any, List, Optional | |
import requests | |
from tabulate import tabulate | |
from termcolor import colored | |
from halo import Halo | |
# --- Connection settings -----------------------------------------------------
# Datafold host (no scheme). Swap in "staging.datafold.io" or your
# on-premise host URL as needed.
host = "app.datafold.com"

# API key, read from the DATAFOLD_API_KEY environment variable.
# Evaluates to None when the variable is not set.
datafold_api_key = os.environ.get("DATAFOLD_API_KEY")

# --- Data diff settings (TODO: replace with your own) ------------------------
# Datafold IDs of the two data sources to compare.
data_source1_id = 6716
data_source2_id = 4932

# Each table is given as ["database", "schema", "table/view"].
table1 = ["hive_metastore", "dbt_sung", "dim_orgs"]
table2 = ["DEMO", "CORE", "DIM_ORGS"]

# Primary key column(s) for the diff.
pk_columns = ["org_id"]

# Columns whose values are compared (passed as columns_to_compare).
cols = [
    "created_at",
    "num_users",
    "sub_created_at",
    "sub_plan",
    "sub_price",
]
class DataDiffConfigs(BaseModel):
    """Settings for a Datafold data diff, serialized as the create-diff request body."""

    # Datafold IDs of the two data sources being compared.
    data_source1_id: int
    data_source2_id: int
    # Table identifiers, each given as ["database", "schema", "table/view"].
    table1: List[str]
    table2: List[str]
    # Primary key column(s) for the diff.
    pk_columns: List[str]
    # Columns to compare. The explicit None default keeps this field optional
    # under pydantic v2, where Optional[...] alone no longer implies a default
    # (it only makes None an allowed value of a still-required field).
    columns_to_compare: Optional[List[str]] = None
# Bundle the module-level settings into the request model sent to Datafold.
data_diff_configs = DataDiffConfigs(
    data_source1_id=data_source1_id,
    table1=table1,
    data_source2_id=data_source2_id,
    table2=table2,
    pk_columns=pk_columns,
    columns_to_compare=cols,
)
class DataDiff:
    """Minimal client that creates and polls Datafold data diffs over the REST API."""

    def __init__(
        self,
        host: str,
        datafold_api_key: str,
        request_timeout: float = 30.0,
    ):
        """Create an authenticated HTTP session for ``host``.

        Args:
            host: Datafold host name without scheme, e.g. "app.datafold.com".
            datafold_api_key: API key, sent as ``Authorization: Key <key>``.
            request_timeout: Seconds to wait on each HTTP request before
                ``requests`` raises a timeout error.

        Raises:
            ValueError: If ``datafold_api_key`` is None or empty.
        """
        if not datafold_api_key:
            # Fail fast instead of sending "Key None" and getting a 401 later.
            raise ValueError(
                "A Datafold API key is required (e.g. set the DATAFOLD_API_KEY "
                "environment variable)."
            )
        self.session = requests.Session()
        self.host = host
        self.request_timeout = request_timeout
        self.session.headers["Authorization"] = f"Key {datafold_api_key}"

    def _url(self, path: str) -> str:
        """Build an absolute HTTPS URL for ``path`` on the configured host."""
        return f"https://{self.host}{path}"

    @staticmethod
    def _format_table_name(table: List[str]) -> str:
        """Join a ["database", "schema", "table"] path into a dotted name."""
        return ".".join(table)

    def create_diff(self, data_diff_configs: DataDiffConfigs) -> int:
        """Start a new data diff, print its UI link, and return its ID.

        Raises:
            requests.HTTPError: If the API responds with an error status.
        """
        # pydantic v2 renamed .dict() to .model_dump(); support both versions.
        if hasattr(data_diff_configs, "model_dump"):
            payload = data_diff_configs.model_dump()
        else:
            payload = data_diff_configs.dict()
        resp = self.session.post(
            self._url("/api/v1/datadiffs"),
            json=payload,
            timeout=self.request_timeout,
        )
        resp.raise_for_status()
        diff_id = resp.json()["id"]
        url = colored(self._url(f"/datadiffs/{diff_id}"), "blue")
        print(f"Started XDB Diff: {url}")
        return diff_id

    def get_diff_summary(self, id: int) -> dict[str, Any]:
        """Fetch the summary results of the data diff ``id``.

        ``id`` shadows the builtin but is kept so keyword callers keep working.

        Raises:
            requests.HTTPError: If the API responds with an error status.
        """
        resp = self.session.get(
            self._url(f"/api/v1/datadiffs/{id}/summary_results"),
            timeout=self.request_timeout,
        )
        resp.raise_for_status()
        return resp.json()

    def wait_for_results(
        self,
        id: int,
        poll_interval: float = 1.0,
        timeout: Optional[float] = None,
    ) -> dict[str, Any]:
        """Poll the diff summary until its status is "success" or "error".

        Args:
            id: Data diff ID, as returned by ``create_diff``.
            poll_interval: Seconds to sleep between polls.
            timeout: Maximum seconds to wait; None (default) waits indefinitely.

        Returns:
            The final summary payload.

        Raises:
            TimeoutError: If ``timeout`` elapses before a terminal status.
        """
        spinner = Halo(text="Running", spinner="dots", color="green")
        # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
        start_time = time.monotonic()
        spinner.start()
        try:
            while True:
                summary = self.get_diff_summary(id)
                status = summary["status"]
                elapsed = time.monotonic() - start_time
                if status in ("success", "error"):
                    message = (
                        f"Completed with status: {status}. "
                        f"Total run time: {round(elapsed, 2)} seconds"
                    )
                    # An "error" outcome gets a failure marker, not a green check.
                    if status == "success":
                        spinner.succeed(message)
                    else:
                        spinner.fail(message)
                    return summary
                if timeout is not None and elapsed >= timeout:
                    raise TimeoutError(
                        f"Data diff {id} did not finish within {timeout} seconds "
                        f"(last status: {status})"
                    )
                spinner.text = f"Running... {int(elapsed)} seconds"
                time.sleep(poll_interval)
        finally:
            spinner.stop()

    def print_results(
        self, results: dict[str, Any], data_diff_configs: DataDiffConfigs
    ):
        """Print the "pks" section of a diff summary as a grid table.

        Each row is a stat name followed by its value(s); the two value
        columns are headed by the dotted names of table1 and table2.
        """
        pks = results.get("pks")
        if pks is None:
            # NOTE(review): presumably a diff that ended in "error" carries no
            # "pks" section — verify against the API; don't crash either way.
            print(
                "No primary key summary available "
                f"(status: {results.get('status', 'unknown')})."
            )
            return
        headers_pks = [
            "Stats",
            self._format_table_name(data_diff_configs.table1),
            self._format_table_name(data_diff_configs.table2),
        ]
        # Accept scalar values too, not only per-table lists.
        rows_pks = [
            [key, *(value if isinstance(value, (list, tuple)) else [value])]
            for key, value in pks.items()
        ]
        print("Data Diff Primary Keys Summary:")
        print(tabulate(rows_pks, headers=headers_pks, tablefmt="grid"))

    def run_xdb_diff(self, data_diff_configs: DataDiffConfigs) -> dict[str, Any]:
        """Create a diff, wait for it to finish, print and return its summary."""
        diff_id = self.create_diff(data_diff_configs)
        results = self.wait_for_results(diff_id)
        self.print_results(results, data_diff_configs)
        return results
if __name__ == "__main__":
    # Script entry point: build a client from the module-level settings and
    # run the configured diff end to end (skipped when imported as a module).
    datadiff = DataDiff(host=host, datafold_api_key=datafold_api_key)
    datadiff.run_xdb_diff(data_diff_configs=data_diff_configs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Run it with `python3 xdb_diff.py` to get this kind of result.