import pytest
import json
from dataconnectors import adls
from utils import pipelineutils, constants
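
# Folders and tables appended to these module-level lists are torn down by
# the autouse cleanup fixture below after each test.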
CLEANUP_TABLES = []
CLEANUP_FOLDERS = []
database = "default"
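

# Autouse fixture: yields first so each test runs, then cleans up the ADLS
# folders and Spark tables the test registered. The adls_connection_client,
# base_path, container_name, azure_credential and synapse_endpoint arguments
# are assumed to be fixtures defined in a conftest.py not shown in this gist.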
@pytest.fixture(autouse=True)
def run_before_and_after_tests(adls_connection_client,
                               base_path: str,
                               container_name: str,
                               azure_credential,
                               synapse_endpoint):
    yield
    print("STARTING TO CLEAN UP .....")
    adls.cleanup_ADLS_files(adls_connection_client,
                            container_name, CLEANUP_FOLDERS)
    print(CLEANUP_TABLES)
    params = {"table_name": CLEANUP_TABLES}
    pipelineutils.run_and_observe_pipeline(
        azure_credential, synapse_endpoint,
        constants.INTEGRATIONTEST_CLEANUP_PIPELINE, params)
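

# End-to-end test: exercises the full source -> raw -> gold flow by running
# the two stage checks below in order against a live Synapse workspace.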
def test_source_to_gold_workflow(azure_credential,
                                 synapse_endpoint: str,
                                 pipeline_name: str,
                                 storage_account_name: str,
                                 container_name: str,
                                 base_path: str,
                                 input_sample_file_name: str,
                                 adls_connection_client,
                                 sql_connection_client):
    source_to_raw_tests(azure_credential,
                        synapse_endpoint,
                        pipeline_name,
                        storage_account_name,
                        container_name,
                        base_path,
                        input_sample_file_name,
                        adls_connection_client)
    raw_to_gold_tests(azure_credential,
                      synapse_endpoint,
                      pipeline_name,
                      sql_connection_client)
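

# Stage 1: upload a sample employee parquet file to the landing zone, trigger
# the master pipeline, and verify the expected rows land in the raw zone.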
def source_to_raw_tests(azure_credential,
                        synapse_endpoint: str,
                        pipeline_name: str,
                        storage_account_name: str,
                        container_name: str,
                        base_path: str,
                        input_sample_file_name: str,
                        adls_connection_client):
    print("STARTING SOURCE TO RAW...\n")
    # Arrange
    target_path = "/integrationtest_employees/raw/"
    table_raw = "integrationtest_employees"
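    # The raw-zone target table spec is handed to the pipeline as a single
    # JSON string parameter.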
    target_table = json.dumps(
        {"name": table_raw, "overwrite": "false", "path": target_path,
         "partition_by": ""})
    CLEANUP_FOLDERS.append(base_path)

    # Act
    # Upload the file to the landing zone
    out_file_name, out_file_path = adls.upload_to_ADLS(
        adls_connection_client, container_name, base_path,
        input_sample_file_name)

    # Trigger the master pipeline for the Landing to Raw zone
    masterpipeline_raw_params = {
        "basePath": base_path,
        "filePath": "employee_*.parquet",
        "targetTable": target_table,
        "badDataTable": "bad_employees",
        "containerName": container_name,
        "archivePath": "archive",
        "storageAccountName": storage_account_name,
        "readFromSparkTables": False,
        "database": database,
    }
    print(f"{pipeline_name} Source to Raw Parameters : {masterpipeline_raw_params}\n")
    CLEANUP_FOLDERS.append(target_path.split('/')[1])
    CLEANUP_TABLES.append(table_raw)
    pipeline_run_result = pipelineutils.run_and_observe_pipeline(
        azure_credential, synapse_endpoint, pipeline_name,
        masterpipeline_raw_params)
    assert pipeline_run_result == constants.PIPELINE_SUCCESS_STATUS

    # Check for data in the raw zone
    parquet_dataframe = adls.read_parquet_file_from_ADLS(
        adls_connection_client, container_name, target_path)
    num_of_rows = len(parquet_dataframe.index)

    # Assert
    print(f"Number of Rows Fetched : {num_of_rows}\n")
    assert num_of_rows == 10
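

# Stage 2: copy the raw table into the dedicated SQL pool (gold zone) via the
# copy pipeline and verify the row count there.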
def raw_to_gold_tests(azure_credential,
                      synapse_endpoint: str,
                      pipeline_name: str,
                      sql_connection_client):
    print("STARTING PROCESSED TO GOLD TEST...\n")
    table_raw = "integrationtest_employees"
    table_gold = "gold_integrationtest_employee"

    # Trigger the master pipeline for the Processed to Gold zone
    masterpipeline_gold_params = {
        "lookUpTables": [{
            "SourceTableSchemaName": "dbo",
            "SourceTableName": table_raw,
            "SinkTableSchemaName": "dbo",
            "SinkTableName": table_gold,
            "HasIncrementalData": "false"
        }],
        "sourceDatabase": database,
    }
    pipeline_name = constants.COPY_TO_DEDICATE_SQL_PIPELINE
    print(f"{pipeline_name} Parameters : {masterpipeline_gold_params}\n")
    pipeline_run_result = pipelineutils.run_and_observe_pipeline(
        azure_credential, synapse_endpoint, pipeline_name,
        masterpipeline_gold_params)
    assert pipeline_run_result == constants.PIPELINE_SUCCESS_STATUS

    # Check for data in the gold zone
    cursor = sql_connection_client.cursor()
    cursor.execute(
        "SELECT COUNT(*) AS COUNT FROM [dbo].[{0}]".format(table_gold))
    row = cursor.fetchone()
    assert row is not None
    assert int(row.COUNT) == 10
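

# ---------------------------------------------------------------------------
# For context, a minimal sketch of the conftest.py fixtures this module
# assumes. The gist does not include them, so the wiring below is
# hypothetical:
#
#     import pytest
#     from azure.identity import DefaultAzureCredential
#     from azure.storage.filedatalake import DataLakeServiceClient
#
#     @pytest.fixture(scope="session")
#     def azure_credential():
#         # Resolves credentials from the environment, a managed identity,
#         # or the Azure CLI.
#         return DefaultAzureCredential()
#
#     @pytest.fixture(scope="session")
#     def adls_connection_client(azure_credential, storage_account_name):
#         return DataLakeServiceClient(
#             account_url=f"https://{storage_account_name}.dfs.core.windows.net",
#             credential=azure_credential)
#
# synapse_endpoint, pipeline_name, base_path, container_name,
# input_sample_file_name, storage_account_name and sql_connection_client
# would be supplied the same way (sql_connection_client as a pyodbc
# connection to the dedicated SQL pool).
# ---------------------------------------------------------------------------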