A simple script that pulls data from an S3 bucket and a Redis cache and compares the destinations from both data sources.
import datetime
import json
from collections import Counter

import boto3
import redis


def get_unique_destinations(data, key: str):
    unique_destinations = set()
    all_destinations = []
    for item in data:
        if key in item:
            unique_destinations.add(item[key])
            all_destinations.append(item[key])
    # Count occurrences once so finding duplicates is O(n) instead of O(n^2);
    # the result is the same: every occurrence of a repeated value is kept
    counts = Counter(all_destinations)
    duplicates = [item for item in all_destinations if counts[item] > 1]
    return list(unique_destinations), all_destinations, duplicates
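
# A minimal usage sketch (the records below are hypothetical, not from the
# source data):
#
#   rows = [{"to": "+15550001"}, {"to": "+15550002"}, {"to": "+15550001"}]
#   unique, everything, dupes = get_unique_destinations(rows, "to")
#   # unique     -> ["+15550001", "+15550002"] (set order is not guaranteed)
#   # everything -> ["+15550001", "+15550002", "+15550001"]
#   # dupes      -> ["+15550001", "+15550001"]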


class S3Client:
    def __init__(self, bucket_name: str):
        self.bucket_name = bucket_name
        self.s3 = boto3.client('s3')

    def read_from_s3(self, key: str):
        obj = self.s3.get_object(Bucket=self.bucket_name, Key=key)
        json_string = obj['Body'].read().decode('utf-8')
        return json.loads(json_string)
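
    # Each page file in the folder is assumed (inferred from the fields read
    # below; the literal values here are hypothetical) to look like:
    #
    #   {
    #       "org_id": "org_123",
    #       "country_id": "NG",
    #       "comm_channel": "sms",
    #       "data": [{"destination": "+15550001", ...}],
    #       "unresolved": [...],
    #       "resolved_customer_size": 10,
    #       "unresolved_customer_size": 2,
    #       "total_customer_size": 12,
    #       "page_number": 1,
    #       "run_id": "run_abc"
    #   }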

    def read_all_files_from_s3_folder(self, folder_name):
        # List all files in the specified S3 bucket and folder
        # (note: list_objects_v2 returns at most 1,000 keys per call)
        listing = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=folder_name)
        # Extract file names from the response
        file_names = [content['Key'] for content in listing.get('Contents', [])]
        # Filter out the folder name itself from the file names
        file_names = [file_name for file_name in file_names if file_name != folder_name]
        print(f"Found {len(file_names)} files in the folder: {folder_name}")
        for file_name in file_names:
            print(f"File: {file_name}")
        # Accumulator that merges every page file into a single response
        response = {
            "org_id": "",
            "country_id": "",
            "data": [],
            "unresolved": [],
            "comm_channel": "",
            "resolved_customer_size": 0,
            "unresolved_customer_size": 0,
            "page_number": [],
            "total_customer_size": 0,
            "run_id": "",
        }
        for file_name in file_names:
            now = datetime.datetime.now()
            json_data = self.read_from_s3(file_name)
            response["org_id"] = json_data["org_id"]
            response["country_id"] = json_data["country_id"]
            response["comm_channel"] = json_data["comm_channel"]
            if json_data.get("data") is not None:
                current_data = json_data["data"]
                if isinstance(current_data, list):
                    response["data"] = response["data"] + current_data
                else:
                    print(f"Data is not a list for file: {file_name} with page number: {json_data['page_number']}, customer size: {json_data['total_customer_size']}")
            else:
                print(f"Data is None for file: {file_name} with page number: {json_data['page_number']}, customer size: {json_data['total_customer_size']}")
            if json_data.get("unresolved") is not None:
                current_unresolved = json_data["unresolved"]
                if isinstance(current_unresolved, list):
                    response["unresolved"] = response["unresolved"] + current_unresolved
                else:
                    print(f"Unresolved is not a list for file: {file_name} with page number: {json_data['page_number']}, customer size: {json_data['total_customer_size']}")
            else:
                print(f"Unresolved is None for file: {file_name} with page number: {json_data['page_number']}, customer size: {json_data['total_customer_size']}")
            # Treat missing counters as zero so a partial page does not break the totals
            response["resolved_customer_size"] += json_data.get("resolved_customer_size") or 0
            response["unresolved_customer_size"] += json_data.get("unresolved_customer_size") or 0
            response["total_customer_size"] += json_data.get("total_customer_size") or 0
            response["run_id"] = json_data["run_id"]
            end = datetime.datetime.now()
            print(f"Processed file: {file_name} with page number: {json_data.get('page_number')}, customer size: {json_data.get('total_customer_size')} in {(end - now).total_seconds():.2f} seconds")
        return response

    def process_from_s3(self):
        folder_name = input("Enter the folder name: ")
        # Read and merge all files from S3
        full_json = self.read_all_files_from_s3_folder(folder_name)
        # Get unique destinations
        unique_destinations, all_destinations, duplicate_destinations = get_unique_destinations(full_json["data"], "destination")
        print(f"Unique destination count: {len(unique_destinations)}")
        print(f"All destination count: {len(all_destinations)}")
        print(f"Duplicate destination count: {len(duplicate_destinations)}")
        print(f"Total resolved customer size: {full_json['resolved_customer_size']}")
        print(f"Total unresolved customer size: {full_json['unresolved_customer_size']}")
        print(f"Total customer size: {full_json['total_customer_size']}")
        print(f"Total data size: {len(full_json['data'])}")
        print(f"Total unresolved size: {len(full_json['unresolved'])}")
        # Save the summary to a file called s3_response.json
        response = {
            "unique_destinations": unique_destinations,
            "all_destinations": all_destinations,
            "duplicates": duplicate_destinations,
            "resolved_customer_size": full_json["resolved_customer_size"],
            "unresolved_customer_size": full_json["unresolved_customer_size"],
            "total_customer_size": full_json["total_customer_size"],
            "total_data_size": len(full_json["data"]),
            "total_unresolved_size": len(full_json["unresolved"]),
        }
        filename = "s3_response.json"
        with open(filename, "w") as outfile:
            json.dump(response, outfile, indent=4)
        return filename


class RedisClient:
    def __init__(self, key: str, host: str, port: int, db: int, password: str):
        self.key = key
        self.redis = redis.Redis(host=host, port=port, db=db, password=password)

    def read_list_from_redis(self):
        # Read the full list stored at the SMS key, then derive the matching
        # email key by swapping "sms" for "email" and read that list too
        sms_data = self.redis.lrange(self.key, 0, -1)
        email_key = self.key.replace("sms", "email")
        email_data = self.redis.lrange(email_key, 0, -1)
        # Each list item is a JSON string stored as bytes: decode, then parse
        data = [json.loads(item.decode('utf-8')) for item in sms_data + email_data]
        return data
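
    # Each stored entry is assumed (from the "to" lookup below; the value here
    # is hypothetical) to be a JSON string like: '{"to": "+15550001", ...}'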

    def process_from_redis(self):
        data = self.read_list_from_redis()
        unique_destinations, all_destinations, duplicates = get_unique_destinations(data, 'to')
        print(f"Unique destination count: {len(unique_destinations)}")
        print(f"All destination count: {len(all_destinations)}")
        print(f"Duplicate destination count: {len(duplicates)}")
        response = {
            "unique_destinations": unique_destinations,
            "all_destinations": all_destinations,
            "duplicates": duplicates
        }
        # Save the summary to a file called cache_response.json
        filename = "cache_response.json"
        with open(filename, "w") as outfile:
            json.dump(response, outfile, indent=4)
        print(f"Response saved to {filename}")
        return filename


# Find all destinations that are in S3 but not in Redis,
# and all destinations that are in Redis but not in S3
def get_missing_destinations(s3_destinations, redis_destinations):
    # Use sets so the difference in each direction is a single operation
    s3_set = set(s3_destinations)
    redis_set = set(redis_destinations)
    missing_in_redis = s3_set - redis_set
    missing_in_s3 = redis_set - s3_set
    return list(missing_in_redis), list(missing_in_s3)
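
# For example (hypothetical values):
#   get_missing_destinations(["a", "b"], ["b", "c"]) -> (["a"], ["c"])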


def get_missing_destinations_from_s3_and_redis(s3_filename: str, redis_filename: str):
    # Read the saved S3 summary
    with open(s3_filename, "r") as s3_file:
        s3_data = json.load(s3_file)
    # Read the saved Redis summary
    with open(redis_filename, "r") as redis_file:
        redis_data = json.load(redis_file)
    # Compare the unique destinations from the two sources
    missing_in_redis, missing_in_s3 = get_missing_destinations(s3_data["unique_destinations"], redis_data["unique_destinations"])
    print(f"Missing in redis: {len(missing_in_redis)}")
    print(f"Missing in s3: {len(missing_in_s3)}")


def main():
    try:
        key = input("Enter the key: ")
        host = input("Enter the host: ")
        port = int(input("Enter the port: "))
        db = int(input("Enter the db: "))
        password = input("Enter the password: ")
        bucket_name = input("Enter the bucket name: ")
    except ValueError as e:
        # Return here as well, otherwise the code below would hit a NameError
        # on the variables that were never assigned
        print(f"Invalid input: {e}")
        return
    except Exception as e:
        print(f"An error occurred: {e}")
        return
    print(f"Reading from S3 bucket: {bucket_name}")
    s3_client = S3Client(bucket_name)
    s3_processed_file = s3_client.process_from_s3()
    print()
    print()
    print(f"Reading from Redis key: {key}")
    redis_client = RedisClient(key, host, port, db, password)
    redis_processed_file = redis_client.process_from_redis()
    get_missing_destinations_from_s3_and_redis(s3_processed_file, redis_processed_file)


if __name__ == "__main__":
    main()
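
# An example session might look like this (script name and all values are
# hypothetical):
#
#   $ python compare_s3_and_redis.py
#   Enter the key: notifications:sms:pending
#   Enter the host: localhost
#   Enter the port: 6379
#   Enter the db: 0
#   ...
#
# The script then writes s3_response.json and cache_response.json and prints
# how many destinations are missing from each side.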