Skip to content

Instantly share code, notes, and snippets.

@adiralashiva8
Created September 9, 2025 03:44
Show Gist options
  • Save adiralashiva8/d26c1ba6671b0d9c274fc418c5c1d974 to your computer and use it in GitHub Desktop.
Save adiralashiva8/d26c1ba6671b0d9c274fc418c5c1d974 to your computer and use it in GitHub Desktop.
import re
import pysnow
import logging
import threading
from deepdiff import DeepDiff
from datetime import datetime
class DataCompare:
    """Compare sys_dictionary field records of a table between two ServiceNow
    instances and log the differences to a per-table ``<table>_fields.log`` file."""
    delimiter = "|"  # column separator used in every log line
    source = "dev"
    target = "qa"
    user_name = "abc"
    user_password = "123456"  # NOTE(review): hard-coded credentials — move to env/config
    # NOTE(review): source_instance is built from `target` and target_instance
    # from `source` — looks inverted; confirm the comparison direction is intended.
    source_instance = f"{target}.service-now.com"
    target_instance = f"{source}.service-now.com"
    fields = ['sys_id', 'element']  # sys_dictionary columns fetched per record
    rest_max_records = 10000 # pysnow max records limit per hit - don't change
    max_records_to_validate = 10000  # NOTE(review): unused — the capped loop is commented out below
    sys_dict_table = "sys_dictionary"
    # DeepDiff keys whose changes are expected between instances, so not logged
    keys_to_ignore = ["root['sys_updated_on']", "root['sys_mod_count']", "root['sizeclass']", "root['text_index']", "root['sys_updated_by']", "root['sys_update_name']"]
def __init__(self) -> None:
    """Open authenticated pysnow clients for both instances and reset the
    per-run accumulator lists (one DataCompare instance per table)."""
    self.source_session = pysnow.Client(host=self.source_instance, user=self.user_name, password=self.user_password)
    self.target_session = pysnow.Client(host=self.target_instance, user=self.user_name, password=self.user_password)
    # Return raw sys_id strings instead of {'link': ..., 'value': ...} dicts.
    self.source_session.parameters.exclude_reference_link = True
    self.target_session.parameters.exclude_reference_link = True
    # Filled by _compare_records_in_table: ids and element names per side.
    self.source_field_records_sys_id = []
    self.target_field_records_sys_id = []
    self.source_field_records_elements = []
    self.target_field_records_elements = []
def start_logging(self, my_table_name):
self.script_start_time = datetime.now()
name = f"{my_table_name}.log"
# Configure a logger for each table
logger = logging.getLogger(my_table_name)
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(message)s')
file_handler = logging.FileHandler(name, mode='w')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# log data
_table_name = my_table_name.replace("_fields",'')
logger.info(f"Table Name {self.delimiter} {_table_name}")
logger.info(f"Source {self.delimiter} {self.source_instance}")
logger.info(f"Target {self.delimiter} {self.target_instance}")
logger.info("")
def end_logging(self, my_table_name) :
logger = logging.getLogger(my_table_name)
time_elapsed = datetime.now() - self.script_start_time
_table_name = my_table_name.replace("_fields",'')
print(f'Script execution time for {_table_name} (hh:mm:ss.ms) {self.delimiter} {time_elapsed}')
# Remove the file handler to close the log file
for handler in logger.handlers:
handler.close()
logger.removeHandler(handler)
def replace_newlines_with_space(self, input_string):
    """Collapse every run of newline characters in *input_string* into a
    single space, so multi-line field values fit on one log line."""
    return re.sub(r'\n+', ' ', input_string)
def _get_list_of_field_type_records_in_table(self, session, table_name, cut_off_date=None) -> list:
    """Fetch sys_dictionary rows for *table_name* created on/before the
    cut-off date, paging `rest_max_records` at a time.

    Returns the accumulated list of records (only `self.fields` columns).
    *cut_off_date* defaults to 2025-04-12 23:59:59, preserving the original
    hard-coded behavior while letting callers supply another date.
    """
    if cut_off_date is None:
        cut_off_date = datetime(2025, 4, 12, 23, 59, 59)
    current_table = session.resource(api_path=f'/table/{self.sys_dict_table}', chunk_size=None)
    page_size = self.rest_max_records
    offset = 0
    total_field_results_sys_id = []
    # Query is loop-invariant: build it once instead of once per page.
    qb = (
        pysnow.QueryBuilder()
        .field('name').equals(table_name)
        .AND()
        .field('sys_created_on').less_than_or_equal(cut_off_date)
        .AND()
        .field('sys_created_on').order_descending()
    )
    while True:
        response = current_table.get(query=qb, fields=self.fields, limit=page_size, offset=offset)
        try:
            records = response.all()
        except Exception:  # narrowed from bare except; keep the best-effort paging
            records = []
        total_field_results_sys_id.extend(records)
        # A short page means we've drained the result set.
        if len(records) < page_size:
            break
        offset += page_size
    return total_field_results_sys_id
def _compare_records_in_table(self, source_records_list, target_records_list, table_name, logger):
    """Log count and element-name differences between the two instances'
    sys_dictionary records, then deep-compare every record whose sys_id
    exists on both sides."""
    # Accumulate ids and element names on the instance (one instance per table).
    for record in source_records_list:
        self.source_field_records_sys_id.append(record['sys_id'])
        self.source_field_records_elements.append(record['element'])
    for record in target_records_list:
        self.target_field_records_sys_id.append(record['sys_id'])
        self.target_field_records_elements.append(record['element'])
    # size and differences
    source_fields_sys_id_size = len(self.source_field_records_sys_id)
    target_fields_sys_id_size = len(self.target_field_records_sys_id)
    difference = source_fields_sys_id_size - target_fields_sys_id_size
    status = difference != 0  # idiomatic; same value as the old ternary
    logger.info(f"Source Fields Count {self.delimiter} {source_fields_sys_id_size}")
    logger.info(f"Target Fields Count {self.delimiter} {target_fields_sys_id_size}")
    logger.info(f"Found Difference? {self.delimiter} {status}")
    logger.info(f"Difference Count {self.delimiter} {difference}")
    logger.info("")
    # element names present on only one side
    extra_sys_id_in_source = set(self.source_field_records_elements) - set(self.target_field_records_elements)
    extra_sys_id_in_target = set(self.target_field_records_elements) - set(self.source_field_records_elements)
    source_sys_id_diff = extra_sys_id_in_source if extra_sys_id_in_source else ''
    target_sys_id_diff = extra_sys_id_in_target if extra_sys_id_in_target else ''
    logger.info(f"Fields Only In Source {self.delimiter} {source_sys_id_diff}")
    logger.info(f"Fields Only In Target {self.delimiter} {target_sys_id_diff}")
    logger.info("")
    # Set intersection is O(n + m); the original list-membership scan was
    # O(n * m), and it wrapped the result in set() twice.
    sys_ids_in_both_instance = set(self.source_field_records_sys_id) & set(self.target_field_records_sys_id)
    logger.info(f"Common Records Count {self.delimiter} {len(sys_ids_in_both_instance)}")
    logger.info("")
    logger.info(f"SYS ID {self.delimiter} Key {self.delimiter} Source {self.delimiter} Target")
    source_table = self.source_session.resource(api_path=f'/table/{self.sys_dict_table}', chunk_size=None)
    target_table = self.target_session.resource(api_path=f'/table/{self.sys_dict_table}', chunk_size=None)
    # NOTE(review): max_records_to_validate is unused here — the capped loop
    # was commented out in the original; every common record is compared.
    for sys_id in sys_ids_in_both_instance:
        self._compare_record_value(sys_id, source_table, target_table, logger)
def _compare_record_value(self, sys_id, source_table, target_table, logger):
    """Fetch the record *sys_id* from both instances in parallel, DeepDiff the
    two payloads, and log every changed value not in `keys_to_ignore`."""
    results = {}
    def get_source_data():
        results['source_result'] = self._get_record_data(source_table, sys_id)
    def get_target_data():
        results['target_result'] = self._get_record_data(target_table, sys_id)
    # One thread per instance so the two REST round-trips overlap.
    source_thread = threading.Thread(target=get_source_data)
    target_thread = threading.Thread(target=get_target_data)
    source_thread.start()
    target_thread.start()
    source_thread.join()
    target_thread.join()
    source_response = results.get('source_result')
    target_response = results.get('target_result')
    # truncate_datetime='hour' ignores sub-hour timestamp drift between instances.
    diff = dict(DeepDiff(source_response, target_response, truncate_datetime='hour'))
    # Direct lookup: the original looped over every diff key just to find
    # 'values_changed'. Missing key means no value-level changes.
    values_changed_dict = diff.get('values_changed', {})
    for change_item, change in values_changed_dict.items():
        if change_item in self.keys_to_ignore:
            continue
        local_dict = dict(change)
        source_value = self.replace_newlines_with_space(str(local_dict['old_value']))
        target_value = self.replace_newlines_with_space(str(local_dict['new_value']))
        logger.info(f"""{sys_id} {self.delimiter} {change_item} {self.delimiter} {source_value} {self.delimiter} {target_value}""")
def _get_record_data(self, session, sys_id):
return session.get(query={'sys_id':sys_id},stream=True).first()
def compare_table_fields(self, obj, _table):
    """Compare sys_dictionary field records of *_table* between source and
    target, logging into ``<_table>_fields.log``.

    *obj* is the same DataCompare instance; the redundant parameter is kept
    for backward compatibility with the call site
    ``obj.compare_table_fields(obj, table)``.
    """
    table_fields_log_name = str(_table) + "_fields"
    try:
        print(f"Started {_table} fields comparison")
        obj.start_logging(table_fields_log_name)
        logger = logging.getLogger(table_fields_log_name)
        # Fetch both instances' field records on parallel threads.
        results = {}
        def get_source_field_type_data():
            results['source_result'] = self._get_list_of_field_type_records_in_table(self.source_session, _table)
        def get_target_field_type_data():
            results['target_result'] = self._get_list_of_field_type_records_in_table(self.target_session, _table)
        source_thread = threading.Thread(target=get_source_field_type_data)
        target_thread = threading.Thread(target=get_target_field_type_data)
        source_thread.start()
        target_thread.start()
        source_thread.join()
        target_thread.join()
        source_field_records = results.get('source_result')
        target_field_records = results.get('target_result')
        self._compare_records_in_table(source_field_records, target_field_records, _table, logger)
    except Exception as error_message:
        # Bug fix: the original referenced the global `current_table_name`,
        # which raises NameError when this method runs outside the script's
        # __main__ loop; `_table` is the local equivalent.
        print(f"Failed to compare `{_table}` table fields.\nRefer Exception:\n{error_message}")
    finally:
        print(f"Completed {_table} fields comparison")
        obj.end_logging(table_fields_log_name)
if __name__ == "__main__":
    # One table name per line; a fresh DataCompare per table so the
    # per-instance accumulator lists never leak between tables.
    file_path = "prod_scripts/migration/tables/tables.txt"
    with open(file_path, 'r') as file:
        for table in file:
            current_table_name = table.strip()
            if not current_table_name:
                continue  # bug fix: skip blank lines / trailing newline at EOF
            try:
                obj = DataCompare()
                obj.compare_table_fields(obj, current_table_name)
            except Exception as error_message:
                print(f"Failed to compare `{current_table_name}` table fields. Exception: {error_message}")
            finally:
                obj = ''  # drop the reference so the sessions can be collected
import random
import re
import time
import pysnow
import logging
import threading
from deepdiff import DeepDiff
from datetime import datetime, timedelta
import traceback
class DataCompare:
    """Compare a ServiceNow table between two instances: reachability status,
    column list, record counts, and a random sample of record values."""
    delimiter = "|"  # column separator used in every log line
    source = "dev"
    target = "qa"
    user_name = "abc"
    user_password = "123"  # NOTE(review): hard-coded credentials — move to env/config
    # source_instance = f"{source}.service-now.com"
    # target_instance = f"{target}.service-now.com"
    # NOTE(review): the live assignments below are swapped relative to the
    # commented originals (source built from `target`, target from `source`) —
    # confirm the comparison direction is intentional.
    source_instance = f"{target}.service-now.com"
    target_instance = f"{source}.service-now.com"
    fields = ['sys_id']  # columns fetched for record-level queries
    custom_fields = ['element']  # columns fetched for sys_dictionary queries
    rest_max_records = 10000 # pysnow max records limit per hit - don't change
    # days = 100 # last `n` number of days data to fetch
    records_cut_off_count = 500 # max number of records to store from 'days/query' data
    max_records_to_validate = 50 # number of random records to compare
    sys_dict_table = "sys_dictionary"
    # DeepDiff keys whose changes are expected between instances, so not logged
    keys_to_ignore = ["root['sys_updated_on']", "root['sys_mod_count']", "root['calendar_duration']"]
    # long free-text keys: changes are logged as a "Refer ... UI" placeholder
    keys_result_to_truncate = ["root['description']", "root['text']", "root['explanation']",
    "root['u_detailed_description_html']", "root['u_latest_status']", "root['u_instruction']",
    "root['short_description']", "root['u_access_guide']", "root['u_extension_notes']",
    "root['u_comments']", "root['u_notes']", "root['u_order_notes']", "root['u_regulatory_comments']",
    "root['comments']", "root['summary']", "root['sys_archived']",]
    max_retries = 3  # NOTE(review): defined but never used in the visible code
    retry_delay = 10  # NOTE(review): defined but never used in the visible code
def __init__(self) -> None:
    """Open authenticated pysnow clients for both instances and reset the
    per-run accumulator lists (one DataCompare instance per table)."""
    self.source_session = pysnow.Client(host=self.source_instance, user=self.user_name, password=self.user_password)
    self.target_session = pysnow.Client(host=self.target_instance, user=self.user_name, password=self.user_password)
    # Return raw sys_id strings instead of {'link': ..., 'value': ...} dicts.
    self.source_session.parameters.exclude_reference_link = True
    self.target_session.parameters.exclude_reference_link = True
    # Record-level accumulators (filled by _compare_limited_records_in_table).
    self.source_records_sys_id = []
    self.target_records_sys_id = []
    # Column-level accumulators (filled by _compare_columns_in_table; despite
    # the names they hold 'element' column names, not sys_ids).
    self.source_field_records_sys_id = []
    self.target_field_records_sys_id = []
def start_logging(self, my_table_name):
self.script_start_time = datetime.now()
name = f"{my_table_name}.log"
# Configure a logger for each table
logger = logging.getLogger(my_table_name)
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(message)s')
file_handler = logging.FileHandler(name, mode='w')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.info(f"Table Name {self.delimiter} {my_table_name}")
logger.info(f"Source {self.delimiter} {self.source_instance}")
logger.info(f"Target {self.delimiter} {self.target_instance}")
logger.info("")
def end_logging(self, my_table_name) :
logger = logging.getLogger(my_table_name)
time_elapsed = datetime.now() - self.script_start_time
print(f'Script execution time for {my_table_name} (hh:mm:ss.ms) {self.delimiter} {time_elapsed}')
for handler in logger.handlers:
handler.close()
logger.removeHandler(handler)
def replace_newlines_with_space(self, input_string):
    """Collapse every run of newline characters in *input_string* into a
    single space, so multi-line field values fit on one log line."""
    return re.sub(r'\n+', ' ', input_string)
def _get_list_of_field_type_records_in_table(self, session, table_name, cut_off_date=None) -> list:
    """Fetch sys_dictionary rows for *table_name* created on/before the
    cut-off date, paging `rest_max_records` at a time.

    Returns the accumulated list of records (only `self.custom_fields`
    columns, i.e. the 'element' names). *cut_off_date* defaults to
    2024-04-12 23:59:59, preserving the original hard-coded behavior.
    """
    if cut_off_date is None:
        cut_off_date = datetime(2024, 4, 12, 23, 59, 59)
    current_table = session.resource(api_path=f'/table/{self.sys_dict_table}', chunk_size=None)
    page_size = self.rest_max_records
    offset = 0
    total_field_results_sys_id = []
    # Query is loop-invariant: build it once instead of once per page.
    qb = (
        pysnow.QueryBuilder()
        .field('name').equals(table_name)
        .AND()
        .field('sys_created_on').less_than_or_equal(cut_off_date)
        .AND()
        .field('sys_created_on').order_descending()
    )
    while True:
        response = current_table.get(query=qb, fields=self.custom_fields, limit=page_size, offset=offset)
        try:
            records = response.all()
        except Exception:  # narrowed from bare except; keep the best-effort paging
            records = []
        total_field_results_sys_id.extend(records)
        # A short page means we've drained the result set.
        if len(records) < page_size:
            break
        offset += page_size
    return total_field_results_sys_id
def fetch_random_records(self, my_list, limit):
    """Return up to *limit* unique items from *my_list* in random order.

    Fixes vs. the original: the caller's list is no longer shuffled in place,
    the result is always defined even if shuffling raises (the old code hit
    NameError at the return), and de-duplication is O(n) via dict.fromkeys
    instead of an O(n^2) membership scan.
    """
    random_records = []
    try:
        records = list(my_list)  # copy so the caller's list is untouched
        random.shuffle(records)
        # dict.fromkeys keeps the shuffled order while dropping duplicates.
        random_records = list(dict.fromkeys(records))[:limit]
    except Exception:
        print(traceback.format_exc())
    return random_records
def _get_table_status(self, session, table_name) -> str:
    """Probe *table_name* with a single-record query and return "Pass" if the
    REST call succeeds, "Fail" otherwise (the error is printed).

    Removes the original's unused locals (`limited_results_sys_id`,
    `page_size`, `offset`, `records`) — only reachability matters here.
    """
    current_table = session.resource(api_path=f'/table/{table_name}', chunk_size=None)
    cut_off_date = datetime(2024, 4, 12, 23, 59, 59)
    qb = (
        pysnow.QueryBuilder()
        .field('sys_created_on').less_than_or_equal(cut_off_date)  # before date logic
        .AND()
        .field('sys_created_on').order_descending()
    )
    # limit=1: fetch the smallest possible payload for the probe.
    response = current_table.get(query=qb, fields=self.fields, limit=1, offset=0)
    try:
        response.all()
        return "Pass"
    except Exception as error:
        print(error)
        return "Fail"
def _get_limited_records_in_table(self, session, table_name) -> list:
    """Fetch up to `records_cut_off_count` of the most recent sys_ids from
    *table_name* created on/before the cut-off date, paging as needed."""
    current_table = session.resource(api_path=f'/table/{table_name}', chunk_size=None)
    page_size = min(self.rest_max_records, self.records_cut_off_count)
    offset = 0
    limited_results_sys_id = []
    cut_off_date = datetime(2024, 4, 12, 23, 59, 59)
    # Query is loop-invariant: build it once instead of once per page.
    qb = (
        pysnow.QueryBuilder()
        # .field('sys_created_on').greater_than_or_equal(start_date) # after date logic
        # .field('sys_created_on').between(start,end) # b/w logic
        .field('sys_created_on').less_than_or_equal(cut_off_date)  # before date logic
        .AND()
        .field('sys_created_on').order_descending()
    )
    while len(limited_results_sys_id) < self.records_cut_off_count:
        response = current_table.get(query=qb, fields=self.fields, limit=page_size, offset=offset)
        try:
            records = response.all()
        except Exception:  # narrowed from bare except; keep the best-effort paging
            records = []
        limited_results_sys_id.extend(records)
        # A short page means we've drained the result set.
        if len(records) < page_size:
            break
        offset += page_size
    return limited_results_sys_id
def _get_list_of_columns_in_table(self, session, table_name) -> dict:
# fetch_columns_start_time = datetime.now()
current_table = session.resource(api_path=f'/table/{table_name}')
try:
current_table_result = current_table.get(stream=True).first()
except Exception as error_message:
print(traceback.format_exc())
current_table_result = dict()
current_table_keys = current_table_result.keys()
# time_elapsed = datetime.now() - fetch_columns_start_time
# print(f'Elapsed Time to fetch {table_name} columns (hh:mm:ss.ms) > {time_elapsed}')
return current_table_keys
def _compare_limited_records_in_table(self, source_records_list, target_records_list, table_name, logger):
    """Log record-count stats for both instances, then deep-compare a random
    sample (up to `max_records_to_validate`) of records present on both."""
    for record in source_records_list:
        self.source_records_sys_id.append(record['sys_id'])
    for record in target_records_list:
        self.target_records_sys_id.append(record['sys_id'])
    logger.info(f"Filtered Source Records Count {self.delimiter} {len(self.source_records_sys_id)}")
    logger.info(f"Filtered Target Records Count {self.delimiter} {len(self.target_records_sys_id)}")
    # Set membership is O(1) per test; the original scanned the target list
    # for every source id (O(n * m)).
    target_ids = set(self.target_records_sys_id)
    sys_ids_in_both_instance = [sys_id for sys_id in self.source_records_sys_id if sys_id in target_ids]
    sys_ids_in_both_instance_count = len(sys_ids_in_both_instance)
    # fetch random records from the list of common sys_ids
    _records_validate_limit = min(sys_ids_in_both_instance_count, self.max_records_to_validate)
    random_common_sys_ids = self.fetch_random_records(sys_ids_in_both_instance, _records_validate_limit)
    logger.info(f"Common Records Count {self.delimiter} {sys_ids_in_both_instance_count}")
    # Bug fix: the original compared the SET LITERAL {count} to 0, which is
    # never true, so "Common Records Not Found?" always logged False.
    status = sys_ids_in_both_instance_count == 0
    logger.info(f"Common Records Not Found? {self.delimiter} {status}")
    logger.info(f"Random Record Samples Count {self.delimiter} {len(random_common_sys_ids)}")
    logger.info("")
    logger.info(f"SYS ID {self.delimiter} Key {self.delimiter} Source {self.delimiter} Target")
    source_table = self.source_session.resource(api_path=f'/table/{table_name}', chunk_size=None)
    target_table = self.target_session.resource(api_path=f'/table/{table_name}', chunk_size=None)
    for sys_id in random_common_sys_ids:
        self._compare_record_value(sys_id, source_table, target_table, logger)
def _compare_record_value(self, sys_id, source_table, target_table, logger):
    """Fetch the record *sys_id* from both instances in parallel, DeepDiff the
    payloads, and log every changed value not in `keys_to_ignore`; long
    free-text keys in `keys_result_to_truncate` get a placeholder instead."""
    results = {}
    def get_source_data():
        results['source_result'] = self._get_record_data(source_table, sys_id)
    def get_target_data():
        results['target_result'] = self._get_record_data(target_table, sys_id)
    # One thread per instance so the two REST round-trips overlap.
    source_thread = threading.Thread(target=get_source_data)
    target_thread = threading.Thread(target=get_target_data)
    source_thread.start()
    target_thread.start()
    source_thread.join()
    target_thread.join()
    source_response = results.get('source_result')
    target_response = results.get('target_result')
    # truncate_datetime='hour' ignores sub-hour timestamp drift between instances.
    diff = dict(DeepDiff(source_response, target_response, truncate_datetime='hour'))
    # Direct lookup: the original looped over every diff key just to find
    # 'values_changed'. Missing key means no value-level changes.
    values_changed_dict = diff.get('values_changed', {})
    for change_item, change in values_changed_dict.items():
        if change_item in self.keys_to_ignore:
            continue
        if change_item not in self.keys_result_to_truncate:
            # Only format the values when they will actually be logged.
            local_dict = dict(change)
            source_value = self.replace_newlines_with_space(str(local_dict['old_value']))
            target_value = self.replace_newlines_with_space(str(local_dict['new_value']))
            logger.info(f"""{sys_id} {self.delimiter} {change_item} {self.delimiter} {source_value} {self.delimiter} {target_value}""")
        else:
            logger.info(f"""{sys_id} {self.delimiter} {change_item} {self.delimiter} Refer changes in Source UI {self.delimiter} Refer changes in Target UI""")
def _get_record_data(self, session, sys_id):
return session.get(query={'sys_id':sys_id},stream=True).first()
def _compare_columns_in_table(self, source_keys_list, target_keys_list, logger):
for record in source_keys_list:
self.source_field_records_sys_id.append(record['element'])
for record in target_keys_list:
self.target_field_records_sys_id.append(record['element'])
# size and differences
source_fields_sys_id_size = len(self.source_field_records_sys_id)
target_fields_sys_id_size = len(self.target_field_records_sys_id)
difference = source_fields_sys_id_size - target_fields_sys_id_size
status = True if difference != 0 else False
logger.info(f"Source Columns Count {self.delimiter} {source_fields_sys_id_size}")
logger.info(f"Target Columns Count {self.delimiter} {target_fields_sys_id_size}")
logger.info(f"Found Columns Difference? {self.delimiter} {status}")
logger.info(f"Difference Columns Count {self.delimiter} {difference}")
# compare two lists and find diff
extra_sys_id_in_source = set(self.source_field_records_sys_id) - set(self.target_field_records_sys_id)
extra_sys_id_in_target = set(self.target_field_records_sys_id) - set(self.source_field_records_sys_id)
source_sys_id_diff = '' if not extra_sys_id_in_source else extra_sys_id_in_source
target_sys_id_diff = '' if not extra_sys_id_in_target else extra_sys_id_in_target
logger.info(f"Columns Only In Source {self.delimiter} {source_sys_id_diff}")
logger.info(f"Columns Only In Target {self.delimiter} {target_sys_id_diff}")
logger.info("")
def _compare_records_count_in_table(self, source_session, target_session, table_name, logger):
    """Fetch total record counts (up to the cut-off date) from both instances
    in parallel and log the difference.

    NOTE(review): ``Client.query`` is the legacy pysnow API — confirm the
    installed pysnow version still supports it.
    """
    results = {}
    cut_off_date = datetime(2024, 4, 12, 23, 59, 59)
    qb = (
        pysnow.QueryBuilder()
        .field('sys_created_on').less_than_or_equal(cut_off_date)
        .AND()
        .field('sys_created_on').order_descending()
    )
    def get_source_response_data():
        results['source_result'] = source_session.query(table_name, query=qb)
    def get_target_response_data():
        results['target_result'] = target_session.query(table_name, query=qb)
    # One thread per instance so the two REST round-trips overlap.
    source_thread = threading.Thread(target=get_source_response_data)
    target_thread = threading.Thread(target=get_target_response_data)
    source_thread.start()
    target_thread.start()
    source_thread.join()
    target_thread.join()
    source_response = results.get('source_result')
    target_response = results.get('target_result')
    # `.count` can raise; treat a failed side as 0 but print the traceback.
    try:
        source_records_count = source_response.count
    except Exception as error:
        print(type(error).__name__)
        source_records_count = 0
        print(traceback.format_exc())
    try:
        target_records_count = target_response.count
    except Exception as error:
        print(type(error).__name__)
        target_records_count = 0
        print(traceback.format_exc())
    difference = source_records_count - target_records_count
    status = difference != 0  # idiomatic; same value as the old ternary
    logger.info(f"Source Records Count {self.delimiter} {source_records_count}")
    logger.info(f"Target Records Count {self.delimiter} {target_records_count}")
    logger.info(f"Found Records Difference? {self.delimiter} {status}")
    logger.info(f"Difference Records Count {self.delimiter} {difference}")
    logger.info("")
def compare_table(self, _table):
    """Run the full comparison for *_table*: reachability status, column list,
    record counts, and a random-sample deep diff of common records.

    The original repeated the two-thread fetch boilerplate three times; it is
    factored into the private `_run_pair` helper below.
    """
    print(f"Started {_table} records comparison")
    # Logger configured earlier by start_logging(_table).
    logger = logging.getLogger(_table)
    # 1) Can each instance serve the table at all?
    source_status, target_status = self._run_pair(
        lambda: self._get_table_status(self.source_session, _table),
        lambda: self._get_table_status(self.target_session, _table),
    )
    logger.info(f"Source Status {self.delimiter} {source_status}")
    logger.info(f"Target Status {self.delimiter} {target_status}")
    # 2) sys_dictionary field records (column names) from both sides.
    source_field_records, target_field_records = self._run_pair(
        lambda: self._get_list_of_field_type_records_in_table(self.source_session, _table),
        lambda: self._get_list_of_field_type_records_in_table(self.target_session, _table),
    )
    # 3) Most recent record sys_ids (capped) from both sides.
    source_limited_records, target_limited_records = self._run_pair(
        lambda: self._get_limited_records_in_table(self.source_session, _table),
        lambda: self._get_limited_records_in_table(self.target_session, _table),
    )
    # 4) Compare and log.
    self._compare_columns_in_table(source_field_records, target_field_records, logger)
    self._compare_records_count_in_table(self.source_session, self.target_session, _table, logger)
    self._compare_limited_records_in_table(source_limited_records, target_limited_records, _table, logger)
    print(f"Completed {_table} records comparison")

def _run_pair(self, source_call, target_call):
    """Run two zero-argument callables on parallel threads and return their
    results as (source_result, target_result); overlaps the two REST calls."""
    results = {}
    def _source():
        results['source'] = source_call()
    def _target():
        results['target'] = target_call()
    threads = [threading.Thread(target=_source), threading.Thread(target=_target)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results.get('source'), results.get('target')
if __name__ == "__main__":
    # One table name per line; a fresh DataCompare per table so the
    # per-instance accumulator lists never leak between tables.
    file_path = "migration/tables/api_tables.txt"
    with open(file_path, 'r') as file:
        for table in file:
            current_table_name = table.strip()
            if not current_table_name:
                continue  # skip blank lines / trailing newline at EOF
            obj = None
            try:
                obj = DataCompare()
                obj.start_logging(current_table_name)
                obj.compare_table(current_table_name)
            except Exception as error_message:
                print(f"Failed to compare `{current_table_name}` table. Exception: {error_message}")
                print(traceback.format_exc())
            finally:
                # Bug fix: the original called obj.end_logging() unconditionally
                # and rebound obj to '' — if DataCompare() itself failed, the
                # finally block raised NameError/AttributeError and killed the loop.
                if isinstance(obj, DataCompare):
                    obj.end_logging(current_table_name)
                obj = None  # drop the reference so the sessions can be collected
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment