Created
September 9, 2025 03:44
-
-
Save adiralashiva8/d26c1ba6671b0d9c274fc418c5c1d974 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import pysnow | |
import logging | |
import threading | |
from deepdiff import DeepDiff | |
from datetime import datetime | |
class DataCompare:
    """Compare `sys_dictionary` field records for a table between two
    ServiceNow instances ("source" vs. "target") and log the differences
    to a per-table `<table>_fields.log` file.

    One instance is expected per table: results accumulate in the
    instance-level lists created in ``__init__``.
    """

    delimiter = "|"                 # column separator in the log output
    source = "dev"
    target = "qa"
    user_name = "abc"
    user_password = "123456"
    # NOTE(review): source_instance is built from `target` and vice versa --
    # looks intentionally swapped; confirm before relying on the labels.
    source_instance = f"{target}.service-now.com"
    target_instance = f"{source}.service-now.com"
    fields = ['sys_id', 'element']
    rest_max_records = 10000  # pysnow max records limit per hit - don't change
    max_records_to_validate = 10000
    sys_dict_table = "sys_dictionary"
    # DeepDiff keys whose changes are expected noise and are not reported.
    keys_to_ignore = ["root['sys_updated_on']", "root['sys_mod_count']", "root['sizeclass']", "root['text_index']", "root['sys_updated_by']", "root['sys_update_name']"]

    def __init__(self) -> None:
        """Open a pysnow client per instance and reset the accumulators."""
        self.source_session = pysnow.Client(host=self.source_instance, user=self.user_name, password=self.user_password)
        self.target_session = pysnow.Client(host=self.target_instance, user=self.user_name, password=self.user_password)
        self.source_session.parameters.exclude_reference_link = True
        self.target_session.parameters.exclude_reference_link = True
        # Per-table accumulators: sys_ids and element names seen on each side.
        self.source_field_records_sys_id = []
        self.target_field_records_sys_id = []
        self.source_field_records_elements = []
        self.target_field_records_elements = []

    def start_logging(self, my_table_name):
        """Create a per-table file logger and write the report header."""
        self.script_start_time = datetime.now()
        name = f"{my_table_name}.log"
        # Configure a logger for each table
        logger = logging.getLogger(my_table_name)
        logger.setLevel(logging.INFO)
        formatter = logging.Formatter('%(message)s')
        file_handler = logging.FileHandler(name, mode='w')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        # log data
        _table_name = my_table_name.replace("_fields", '')
        logger.info(f"Table Name {self.delimiter} {_table_name}")
        logger.info(f"Source {self.delimiter} {self.source_instance}")
        logger.info(f"Target {self.delimiter} {self.target_instance}")
        logger.info("")

    def end_logging(self, my_table_name):
        """Print elapsed time and close/detach the table's log handlers."""
        logger = logging.getLogger(my_table_name)
        time_elapsed = datetime.now() - self.script_start_time
        _table_name = my_table_name.replace("_fields", '')
        print(f'Script execution time for {_table_name} (hh:mm:ss.ms) {self.delimiter} {time_elapsed}')
        # Bug fix: iterate over a copy -- removing handlers while iterating
        # the live list skips every other handler.
        for handler in list(logger.handlers):
            handler.close()
            logger.removeHandler(handler)

    def replace_newlines_with_space(self, input_string):
        """Collapse runs of newlines into a single space so every diff
        occupies exactly one log row."""
        return re.sub(r'\n+', ' ', input_string)

    def _get_list_of_field_type_records_in_table(self, session, table_name) -> list:
        """Page through sys_dictionary rows for `table_name` created on or
        before the cut-off date; return all fetched records."""
        current_table = session.resource(api_path=f'/table/{self.sys_dict_table}', chunk_size=None)
        page_size = self.rest_max_records
        offset = 0
        total_field_results_sys_id = []
        while True:
            cut_off_date = datetime(2025, 4, 12, 23, 59, 59)
            qb = (
                pysnow.QueryBuilder()
                .field('name').equals(table_name)
                .AND()
                .field('sys_created_on').less_than_or_equal(cut_off_date)
                .AND()
                .field('sys_created_on').order_descending()
            )
            response = current_table.get(query=qb, fields=self.fields, limit=page_size, offset=offset)
            try:
                records = response.all()
            except Exception:
                # Best-effort: treat a failed page as empty, don't abort.
                records = []
            total_field_results_sys_id.extend(records)
            # A short page means we are past the last record.
            if len(records) < page_size:
                break
            offset += page_size
        return total_field_results_sys_id

    def _compare_records_in_table(self, source_records_list, target_records_list, table_name, logger):
        """Log count differences, element names unique to either side, then a
        value-level comparison for every sys_id present on both sides."""
        for record in source_records_list:
            self.source_field_records_sys_id.append(record['sys_id'])
            self.source_field_records_elements.append(record['element'])
        for record in target_records_list:
            self.target_field_records_sys_id.append(record['sys_id'])
            self.target_field_records_elements.append(record['element'])
        # size and differences
        source_fields_sys_id_size = len(self.source_field_records_sys_id)
        target_fields_sys_id_size = len(self.target_field_records_sys_id)
        difference = source_fields_sys_id_size - target_fields_sys_id_size
        status = difference != 0
        logger.info(f"Source Fields Count {self.delimiter} {source_fields_sys_id_size}")
        logger.info(f"Target Fields Count {self.delimiter} {target_fields_sys_id_size}")
        logger.info(f"Found Difference? {self.delimiter} {status}")
        logger.info(f"Difference Count {self.delimiter} {difference}")
        logger.info("")
        # Element names present on only one side.
        extra_sys_id_in_source = set(self.source_field_records_elements) - set(self.target_field_records_elements)
        extra_sys_id_in_target = set(self.target_field_records_elements) - set(self.source_field_records_elements)
        source_sys_id_diff = '' if not extra_sys_id_in_source else extra_sys_id_in_source
        target_sys_id_diff = '' if not extra_sys_id_in_target else extra_sys_id_in_target
        logger.info(f"Fields Only In Source {self.delimiter} {source_sys_id_diff}")
        logger.info(f"Fields Only In Target {self.delimiter} {target_sys_id_diff}")
        logger.info("")
        # Set intersection replaces the original O(n^2) list-membership scan.
        sys_ids_in_both_instance = set(self.source_field_records_sys_id) & set(self.target_field_records_sys_id)
        logger.info(f"Common Records Count {self.delimiter} {len(sys_ids_in_both_instance)}")
        logger.info("")
        logger.info(f"SYS ID {self.delimiter} Key {self.delimiter} Source {self.delimiter} Target")
        source_table = self.source_session.resource(api_path=f'/table/{self.sys_dict_table}', chunk_size=None)
        target_table = self.target_session.resource(api_path=f'/table/{self.sys_dict_table}', chunk_size=None)
        for sys_id in sys_ids_in_both_instance:
            self._compare_record_value(sys_id, source_table, target_table, logger)

    def _compare_record_value(self, sys_id, source_table, target_table, logger):
        """Fetch one record from both instances in parallel, DeepDiff them and
        log each changed key (except ignored ones) as a single log row."""
        results = {}

        def get_source_data():
            results['source_result'] = self._get_record_data(source_table, sys_id)

        def get_target_data():
            results['target_result'] = self._get_record_data(target_table, sys_id)

        # One thread per instance so the two HTTP round-trips overlap.
        source_thread = threading.Thread(target=get_source_data)
        target_thread = threading.Thread(target=get_target_data)
        source_thread.start()
        target_thread.start()
        source_thread.join()
        target_thread.join()
        source_response = results.get('source_result')
        target_response = results.get('target_result')
        # Timestamps are truncated to the hour, ignoring sub-hour drift.
        diff = dict(DeepDiff(source_response, target_response, truncate_datetime='hour'))
        values_changed_dict = diff.get('values_changed', {})
        for change_item, change in values_changed_dict.items():
            if change_item in self.keys_to_ignore:
                continue
            local_dict = dict(change)
            source_value = self.replace_newlines_with_space(str(local_dict['old_value']))
            target_value = self.replace_newlines_with_space(str(local_dict['new_value']))
            logger.info(f"""{sys_id} {self.delimiter} {change_item} {self.delimiter} {source_value} {self.delimiter} {target_value}""")

    def _get_record_data(self, session, sys_id):
        """Return the first record matching `sys_id` from the given resource."""
        return session.get(query={'sys_id': sys_id}, stream=True).first()

    def compare_table_fields(self, obj, _table):
        """Drive the full field comparison for one table.

        `obj` is the same DataCompare instance (historical call shape kept
        for caller compatibility); logging goes to `<table>_fields.log`.
        """
        table_fields_log_name = str(_table) + "_fields"
        try:
            print(f"Started {_table} fields comparison")
            obj.start_logging(table_fields_log_name)
            logger = logging.getLogger(table_fields_log_name)
            # Fetch both field lists in parallel.
            results = {}

            def get_source_field_type_data():
                results['source_result'] = self._get_list_of_field_type_records_in_table(self.source_session, _table)

            def get_target_field_type_data():
                results['target_result'] = self._get_list_of_field_type_records_in_table(self.target_session, _table)

            source_thread = threading.Thread(target=get_source_field_type_data)
            target_thread = threading.Thread(target=get_target_field_type_data)
            source_thread.start()
            target_thread.start()
            source_thread.join()
            target_thread.join()
            source_field_records = results.get('source_result')
            target_field_records = results.get('target_result')
            self._compare_records_in_table(source_field_records, target_field_records, _table, logger)
        except Exception as error_message:
            # Bug fix: the original referenced the undefined name
            # `current_table_name` here, raising NameError inside the handler.
            print(f"Failed to compare `{_table}` table fields.\nRefer Exception:\n{error_message}")
        finally:
            print(f"Completed {_table} fields comparison")
            obj.end_logging(table_fields_log_name)
if __name__ == "__main__":
    # Compare the fields of every table listed (one name per line).
    file_path = "prod_scripts/migration/tables/tables.txt"
    with open(file_path, 'r') as table_list:
        for raw_line in table_list:
            current_table_name = raw_line.strip()
            try:
                comparer = DataCompare()
                comparer.compare_table_fields(comparer, current_table_name)
            except Exception as error_message:
                print(f"Failed to compare `{current_table_name}` table fields. Exception: {error_message}")
            finally:
                # Drop the comparer so each table starts from a fresh state.
                comparer = ''
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import re | |
import time | |
import pysnow | |
import logging | |
import threading | |
from deepdiff import DeepDiff | |
from datetime import datetime, timedelta | |
import traceback | |
class DataCompare:
    """Compare one table between two ServiceNow instances ("source" vs.
    "target"): reachability status, column set, record counts, and a random
    sample of common records, logging results to `<table>.log`.

    One instance is expected per table: results accumulate in the
    instance-level lists created in ``__init__``.
    """

    delimiter = "|"                 # column separator in the log output
    source = "dev"
    target = "qa"
    user_name = "abc"
    user_password = "123"
    # NOTE(review): source_instance is built from `target` and vice versa --
    # looks intentionally swapped; confirm before relying on the labels.
    source_instance = f"{target}.service-now.com"
    target_instance = f"{source}.service-now.com"
    fields = ['sys_id']
    custom_fields = ['element']
    rest_max_records = 10000  # pysnow max records limit per hit - don't change
    records_cut_off_count = 500  # max number of records to store from 'days/query' data
    max_records_to_validate = 50  # number of random records to compare
    sys_dict_table = "sys_dictionary"
    # DeepDiff keys whose changes are expected noise and are not reported.
    keys_to_ignore = ["root['sys_updated_on']", "root['sys_mod_count']", "root['calendar_duration']"]
    # Long free-text keys: differences are flagged but values not printed.
    keys_result_to_truncate = ["root['description']", "root['text']", "root['explanation']",
                               "root['u_detailed_description_html']", "root['u_latest_status']", "root['u_instruction']",
                               "root['short_description']", "root['u_access_guide']", "root['u_extension_notes']",
                               "root['u_comments']", "root['u_notes']", "root['u_order_notes']", "root['u_regulatory_comments']",
                               "root['comments']", "root['summary']", "root['sys_archived']",]
    # NOTE(review): retry settings are not referenced by any method below.
    max_retries = 3
    retry_delay = 10

    def __init__(self) -> None:
        """Open a pysnow client per instance and reset the accumulators."""
        self.source_session = pysnow.Client(host=self.source_instance, user=self.user_name, password=self.user_password)
        self.target_session = pysnow.Client(host=self.target_instance, user=self.user_name, password=self.user_password)
        self.source_session.parameters.exclude_reference_link = True
        self.target_session.parameters.exclude_reference_link = True
        # Per-table accumulators for record sys_ids and column names.
        self.source_records_sys_id = []
        self.target_records_sys_id = []
        self.source_field_records_sys_id = []
        self.target_field_records_sys_id = []

    def _run_in_parallel(self, source_call, target_call):
        """Run two zero-argument callables on separate threads and return
        their results as ``(source_result, target_result)``.

        Factors out the thread fan-out/join boilerplate that was repeated
        in four methods of the original.
        """
        results = {}

        def _source():
            results['source_result'] = source_call()

        def _target():
            results['target_result'] = target_call()

        source_thread = threading.Thread(target=_source)
        target_thread = threading.Thread(target=_target)
        source_thread.start()
        target_thread.start()
        source_thread.join()
        target_thread.join()
        return results.get('source_result'), results.get('target_result')

    def start_logging(self, my_table_name):
        """Create a per-table file logger and write the report header."""
        self.script_start_time = datetime.now()
        name = f"{my_table_name}.log"
        # Configure a logger for each table
        logger = logging.getLogger(my_table_name)
        logger.setLevel(logging.INFO)
        formatter = logging.Formatter('%(message)s')
        file_handler = logging.FileHandler(name, mode='w')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.info(f"Table Name {self.delimiter} {my_table_name}")
        logger.info(f"Source {self.delimiter} {self.source_instance}")
        logger.info(f"Target {self.delimiter} {self.target_instance}")
        logger.info("")

    def end_logging(self, my_table_name):
        """Print elapsed time and close/detach the table's log handlers."""
        logger = logging.getLogger(my_table_name)
        time_elapsed = datetime.now() - self.script_start_time
        print(f'Script execution time for {my_table_name} (hh:mm:ss.ms) {self.delimiter} {time_elapsed}')
        # Bug fix: iterate over a copy -- removing handlers while iterating
        # the live list skips every other handler.
        for handler in list(logger.handlers):
            handler.close()
            logger.removeHandler(handler)

    def replace_newlines_with_space(self, input_string):
        """Collapse runs of newlines into a single space so every diff
        occupies exactly one log row."""
        return re.sub(r'\n+', ' ', input_string)

    def _get_list_of_field_type_records_in_table(self, session, table_name) -> list:
        """Page through sys_dictionary rows for `table_name` created on or
        before the cut-off date; return all fetched records."""
        current_table = session.resource(api_path=f'/table/{self.sys_dict_table}', chunk_size=None)
        page_size = self.rest_max_records
        offset = 0
        total_field_results_sys_id = []
        while True:
            cut_off_date = datetime(2024, 4, 12, 23, 59, 59)
            qb = (
                pysnow.QueryBuilder()
                .field('name').equals(table_name)
                .AND()
                .field('sys_created_on').less_than_or_equal(cut_off_date)
                .AND()
                .field('sys_created_on').order_descending()
            )
            response = current_table.get(query=qb, fields=self.custom_fields, limit=page_size, offset=offset)
            try:
                records = response.all()
            except Exception:
                # Best-effort: treat a failed page as empty, don't abort.
                records = []
            total_field_results_sys_id.extend(records)
            # A short page means we are past the last record.
            if len(records) < page_size:
                break
            offset += page_size
        return total_field_results_sys_id

    def fetch_random_records(self, my_list, limit):
        """Return up to `limit` unique items of `my_list` in random order.

        Bug fixes vs. original: works on a copy (no longer reorders the
        caller's list) and always returns a list -- the original returned an
        undefined local (NameError) when an exception occurred. Dedup uses
        dict.fromkeys instead of an O(n^2) membership scan.
        """
        random_records = []
        try:
            pool = list(my_list)
            random.shuffle(pool)
            random_records = list(dict.fromkeys(pool))[:limit]
        except Exception:
            print(traceback.format_exc())
        return random_records

    def _get_table_status(self, session, table_name) -> str:
        """Probe the table with a single-record query; return "Pass" if the
        query succeeds, "Fail" otherwise."""
        current_table = session.resource(api_path=f'/table/{table_name}', chunk_size=None)
        cut_off_date = datetime(2024, 4, 12, 23, 59, 59)
        qb = (
            pysnow.QueryBuilder()
            .field('sys_created_on').less_than_or_equal(cut_off_date)  # before date logic
            .AND()
            .field('sys_created_on').order_descending()
        )
        response = current_table.get(query=qb, fields=self.fields, limit=1, offset=0)
        try:
            response.all()
            return "Pass"
        except Exception as error:
            print(error)
            return "Fail"

    def _get_limited_records_in_table(self, session, table_name) -> list:
        """Fetch up to `records_cut_off_count` of the newest records created
        on or before the cut-off date, paging as needed."""
        current_table = session.resource(api_path=f'/table/{table_name}', chunk_size=None)
        page_size = min(self.rest_max_records, self.records_cut_off_count)
        offset = 0
        limited_results_sys_id = []
        cut_off_date = datetime(2024, 4, 12, 23, 59, 59)
        while len(limited_results_sys_id) < self.records_cut_off_count:
            qb = (
                pysnow.QueryBuilder()
                .field('sys_created_on').less_than_or_equal(cut_off_date)  # before date logic
                .AND()
                .field('sys_created_on').order_descending()
            )
            response = current_table.get(query=qb, fields=self.fields, limit=page_size, offset=offset)
            try:
                records = response.all()
            except Exception:
                records = []
            limited_results_sys_id.extend(records)
            if len(records) < page_size:
                break
            offset += page_size
        return limited_results_sys_id

    def _get_list_of_columns_in_table(self, table_name_session, table_name) -> dict:
        """Return the column names (keys of the first record) of the table;
        empty keys view if the fetch fails."""
        current_table = table_name_session.resource(api_path=f'/table/{table_name}')
        try:
            current_table_result = current_table.get(stream=True).first()
        except Exception:
            print(traceback.format_exc())
            current_table_result = dict()
        return current_table_result.keys()

    def _compare_limited_records_in_table(self, source_records_list, target_records_list, table_name, logger):
        """Log the filtered/common record counts and value-compare a random
        sample of the records found on both instances."""
        for record in source_records_list:
            self.source_records_sys_id.append(record['sys_id'])
        for record in target_records_list:
            self.target_records_sys_id.append(record['sys_id'])
        logger.info(f"Filtered Source Records Count {self.delimiter} {len(self.source_records_sys_id)}")
        logger.info(f"Filtered Target Records Count {self.delimiter} {len(self.target_records_sys_id)}")
        # Common sys_ids: keep source order, use a set for O(1) membership.
        target_ids = set(self.target_records_sys_id)
        sys_ids_in_both_instance = [sys_id for sys_id in self.source_records_sys_id if sys_id in target_ids]
        sys_ids_in_both_instance_count = len(sys_ids_in_both_instance)
        # fetch random records from list of common sys_id's
        _records_validate_limit = min(sys_ids_in_both_instance_count, self.max_records_to_validate)
        random_common_sys_ids = self.fetch_random_records(sys_ids_in_both_instance, _records_validate_limit)
        logger.info(f"Common Records Count {self.delimiter} {sys_ids_in_both_instance_count}")
        # Bug fix: the original compared the set literal {count} to 0, which
        # is never equal, so this flag was always False.
        status = sys_ids_in_both_instance_count == 0
        logger.info(f"Common Records Not Found? {self.delimiter} {status}")
        logger.info(f"Random Record Samples Count {self.delimiter} {len(random_common_sys_ids)}")
        logger.info("")
        logger.info(f"SYS ID {self.delimiter} Key {self.delimiter} Source {self.delimiter} Target")
        source_table = self.source_session.resource(api_path=f'/table/{table_name}', chunk_size=None)
        target_table = self.target_session.resource(api_path=f'/table/{table_name}', chunk_size=None)
        for sys_id in random_common_sys_ids:
            self._compare_record_value(sys_id, source_table, target_table, logger)

    def _compare_record_value(self, sys_id, source_table, target_table, logger):
        """Fetch one record from both instances in parallel, DeepDiff them and
        log each changed key (except ignored ones) as a single log row; values
        of long free-text keys are not printed, only flagged."""
        source_response, target_response = self._run_in_parallel(
            lambda: self._get_record_data(source_table, sys_id),
            lambda: self._get_record_data(target_table, sys_id))
        # Timestamps are truncated to the hour, ignoring sub-hour drift.
        diff = dict(DeepDiff(source_response, target_response, truncate_datetime='hour'))
        values_changed_dict = diff.get('values_changed', {})
        for change_item, change in values_changed_dict.items():
            if change_item in self.keys_to_ignore:
                continue
            local_dict = dict(change)
            source_value = self.replace_newlines_with_space(str(local_dict['old_value']))
            target_value = self.replace_newlines_with_space(str(local_dict['new_value']))
            if change_item not in self.keys_result_to_truncate:
                logger.info(f"""{sys_id} {self.delimiter} {change_item} {self.delimiter} {source_value} {self.delimiter} {target_value}""")
            else:
                logger.info(f"""{sys_id} {self.delimiter} {change_item} {self.delimiter} Refer changes in Source UI {self.delimiter} Refer changes in Target UI""")

    def _get_record_data(self, session, sys_id):
        """Return the first record matching `sys_id` from the given resource."""
        return session.get(query={'sys_id': sys_id}, stream=True).first()

    def _compare_columns_in_table(self, source_keys_list, target_keys_list, logger):
        """Log the column-name counts and the columns unique to either side."""
        for record in source_keys_list:
            self.source_field_records_sys_id.append(record['element'])
        for record in target_keys_list:
            self.target_field_records_sys_id.append(record['element'])
        # size and differences
        source_fields_sys_id_size = len(self.source_field_records_sys_id)
        target_fields_sys_id_size = len(self.target_field_records_sys_id)
        difference = source_fields_sys_id_size - target_fields_sys_id_size
        status = difference != 0
        logger.info(f"Source Columns Count {self.delimiter} {source_fields_sys_id_size}")
        logger.info(f"Target Columns Count {self.delimiter} {target_fields_sys_id_size}")
        logger.info(f"Found Columns Difference? {self.delimiter} {status}")
        logger.info(f"Difference Columns Count {self.delimiter} {difference}")
        # Column names present on only one side.
        extra_sys_id_in_source = set(self.source_field_records_sys_id) - set(self.target_field_records_sys_id)
        extra_sys_id_in_target = set(self.target_field_records_sys_id) - set(self.source_field_records_sys_id)
        source_sys_id_diff = '' if not extra_sys_id_in_source else extra_sys_id_in_source
        target_sys_id_diff = '' if not extra_sys_id_in_target else extra_sys_id_in_target
        logger.info(f"Columns Only In Source {self.delimiter} {source_sys_id_diff}")
        logger.info(f"Columns Only In Target {self.delimiter} {target_sys_id_diff}")
        logger.info("")

    def _compare_records_count_in_table(self, source_session, target_session, table_name, logger):
        """Count records (created on or before the cut-off date) on both
        instances in parallel and log the difference."""
        cut_off_date = datetime(2024, 4, 12, 23, 59, 59)
        qb = (
            pysnow.QueryBuilder()
            .field('sys_created_on').less_than_or_equal(cut_off_date)
            .AND()
            .field('sys_created_on').order_descending()
        )
        source_response, target_response = self._run_in_parallel(
            lambda: source_session.query(table_name, query=qb),
            lambda: target_session.query(table_name, query=qb))
        try:
            source_records_count = source_response.count
        except Exception as error:
            print(type(error).__name__)
            source_records_count = 0
            print(traceback.format_exc())
        try:
            target_records_count = target_response.count
        except Exception as error:
            print(type(error).__name__)
            target_records_count = 0
            print(traceback.format_exc())
        difference = source_records_count - target_records_count
        status = difference != 0
        # log
        logger.info(f"Source Records Count {self.delimiter} {source_records_count}")
        logger.info(f"Target Records Count {self.delimiter} {target_records_count}")
        logger.info(f"Found Records Difference? {self.delimiter} {status}")
        logger.info(f"Difference Records Count {self.delimiter} {difference}")
        logger.info("")

    def compare_table(self, _table):
        """Run the full comparison pipeline for one table: status probe,
        column diff, record-count diff, and sampled record value diff."""
        print(f"Started {_table} records comparison")
        # Create a logger for this table
        logger = logging.getLogger(_table)
        source_status, target_status = self._run_in_parallel(
            lambda: self._get_table_status(self.source_session, _table),
            lambda: self._get_table_status(self.target_session, _table))
        logger.info(f"Source Status {self.delimiter} {source_status}")
        logger.info(f"Target Status {self.delimiter} {target_status}")
        source_field_records, target_field_records = self._run_in_parallel(
            lambda: self._get_list_of_field_type_records_in_table(self.source_session, _table),
            lambda: self._get_list_of_field_type_records_in_table(self.target_session, _table))
        source_limited_records, target_limited_records = self._run_in_parallel(
            lambda: self._get_limited_records_in_table(self.source_session, _table),
            lambda: self._get_limited_records_in_table(self.target_session, _table))
        self._compare_columns_in_table(source_field_records, target_field_records, logger)
        self._compare_records_count_in_table(self.source_session, self.target_session, _table, logger)
        self._compare_limited_records_in_table(source_limited_records, target_limited_records, _table, logger)
        print(f"Completed {_table} records comparison")
if __name__ == "__main__":
    # Compare every table listed (one name per line) against both instances.
    file_path = "migration/tables/api_tables.txt"
    with open(file_path, 'r') as file:
        for table in file:
            current_table_name = table.strip()
            obj = None
            try:
                obj = DataCompare()
                obj.start_logging(current_table_name)
                obj.compare_table(current_table_name)
            except Exception as error_message:
                print(f"Failed to compare `{current_table_name}` table. Exception: {error_message}")
                print(traceback.format_exc())
            finally:
                # Bug fix: the original unconditionally called
                # obj.end_logging(...) here, which itself raised
                # (NameError/AttributeError) when the constructor or
                # start_logging had failed. Only finish logging if it started.
                if isinstance(obj, DataCompare) and hasattr(obj, 'script_start_time'):
                    obj.end_logging(current_table_name)
                obj = ''
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment