A simple script that pulls data from an S3 bucket and a Redis cache and compares the destinations from both data sources.
import datetime
import json
from collections import Counter

import boto3
import redis

def get_unique_destinations(data, key: str):
    unique_destinations = set()
    all_destinations = []
    for item in data:
        if key in item:
            unique_destinations.add(item[key])
            all_destinations.append(item[key])
    # Count each destination once up front, then collect every occurrence of
    # a value that appears more than once; this keeps the scan O(n) instead
    # of calling list.count for every item.
    counts = Counter(all_destinations)
    duplicates = [item for item in all_destinations if counts[item] > 1]
    return list(unique_destinations), all_destinations, duplicates
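
# A quick illustration with made-up records (hypothetical data):
#     get_unique_destinations([{"to": "a"}, {"to": "b"}, {"to": "a"}], "to")
# returns (["a", "b"] in set order, ["a", "b", "a"], ["a", "a"]).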

class S3Client:
    def __init__(self, bucket_name: str):
        self.bucket_name = bucket_name
        self.s3 = boto3.client('s3')

    def read_from_s3(self, key: str):
        # Download a single object and parse its body as JSON.
        obj = self.s3.get_object(Bucket=self.bucket_name, Key=key)
        json_string = obj['Body'].read().decode('utf-8')
        return json.loads(json_string)

    def read_all_files_from_s3_folder(self, folder_name):
        # List all files in the specified S3 bucket and folder.
        response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=folder_name)
        # Extract the object keys and filter out the folder entry itself.
        file_names = [content['Key'] for content in response.get('Contents', [])]
        file_names = [file_name for file_name in file_names if file_name != folder_name]
        print(f"Found {len(file_names)} files in the folder: {folder_name}")
        for file_name in file_names:
            print(f"File: {file_name}")

        # Accumulate every page's payload into one combined response.
        response = {
            "org_id": "",
            "country_id": "",
            "data": [],
            "unresolved": [],
            "comm_channel": "",
            "resolved_customer_size": 0,
            "unresolved_customer_size": 0,
            "page_number": [],
            "total_customer_size": 0,
            "run_id": "",
        }
        for file_name in file_names:
            start = datetime.datetime.now()
            json_data = self.read_from_s3(file_name)
            response["org_id"] = json_data["org_id"]
            response["country_id"] = json_data["country_id"]
            response["comm_channel"] = json_data["comm_channel"]
            current_data = json_data.get("data")
            if isinstance(current_data, list):
                response["data"] += current_data
            elif current_data is None:
                print(f"Data is None for file: {file_name} with page number: {json_data.get('page_number')}, customer size: {json_data.get('total_customer_size')}")
            else:
                print(f"Data is not a list for file: {file_name} with page number: {json_data.get('page_number')}, customer size: {json_data.get('total_customer_size')}")
            current_unresolved = json_data.get("unresolved")
            if isinstance(current_unresolved, list):
                response["unresolved"] += current_unresolved
            elif current_unresolved is None:
                print(f"Unresolved is None for file: {file_name} with page number: {json_data.get('page_number')}, customer size: {json_data.get('total_customer_size')}")
            else:
                print(f"Unresolved is not a list for file: {file_name} with page number: {json_data.get('page_number')}, customer size: {json_data.get('total_customer_size')}")
            # Missing counters are treated as zero so one bad page cannot
            # abort the aggregation.
            response["resolved_customer_size"] += json_data.get("resolved_customer_size") or 0
            response["unresolved_customer_size"] += json_data.get("unresolved_customer_size") or 0
            response["total_customer_size"] += json_data.get("total_customer_size") or 0
            response["run_id"] = json_data["run_id"]
            end = datetime.datetime.now()
            print(f"Processed file: {file_name} with page number: {json_data.get('page_number')}, customer size: {json_data.get('total_customer_size')} in {(end - start).total_seconds():.2f} seconds")
        return response

    def process_from_s3(self):
        folder_name = input("Enter the folder name: ")
        # Read and merge all files from the S3 folder.
        full_json = self.read_all_files_from_s3_folder(folder_name)
        # Summarize the destinations found in the merged data.
        unique_destinations, all_destinations, duplicates_destinations = get_unique_destinations(full_json["data"], "destination")
        print(f"Unique destination count: {len(unique_destinations)}")
        print(f"All destination count: {len(all_destinations)}")
        print(f"Duplicate destination count: {len(duplicates_destinations)}")
        print(f"Total resolved customer size: {full_json['resolved_customer_size']}")
        print(f"Total unresolved customer size: {full_json['unresolved_customer_size']}")
        print(f"Total customer size: {full_json['total_customer_size']}")
        print(f"Total data size: {len(full_json['data'])}")
        print(f"Total unresolved size: {len(full_json['unresolved'])}")
        # Save the summary to a file called s3_response.json.
        response = {
            "unique_destinations": unique_destinations,
            "all_destinations": all_destinations,
            "duplicates": duplicates_destinations,
            "resolved_customer_size": full_json["resolved_customer_size"],
            "unresolved_customer_size": full_json["unresolved_customer_size"],
            "total_customer_size": full_json["total_customer_size"],
            "total_data_size": len(full_json["data"]),
            "total_unresolved_size": len(full_json["unresolved"]),
        }
        filename = "s3_response.json"
        with open(filename, "w") as outfile:
            json.dump(response, outfile, indent=4)
        return filename

class RedisClient:
    def __init__(self, key: str, host: str, port: int, db: int, password: str):
        self.key = key
        self.redis = redis.Redis(host=host, port=port, db=db, password=password)

    def read_list_from_redis(self):
        # Pull the full SMS list, then the sibling email list stored under
        # the matching "email" key.
        sms_data = self.redis.lrange(self.key, 0, -1)
        email_key = self.key.replace("sms", "email")
        email_data = self.redis.lrange(email_key, 0, -1)
        # Each entry is a bytes object holding a JSON string: decode to str,
        # then parse to a dict.
        data = [json.loads(item.decode('utf-8')) for item in sms_data + email_data]
        return data
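
    # Note: constructing the client with redis.Redis(..., decode_responses=True)
    # would return str values directly and make the .decode('utf-8') step
    # above unnecessary; the explicit decode is kept to match the original.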

    def process_from_redis(self):
        data = self.read_list_from_redis()
        unique_destinations, all_destinations, duplicates = get_unique_destinations(data, 'to')
        print(f"Unique destinations count: {len(unique_destinations)}")
        print(f"All destinations count: {len(all_destinations)}")
        print(f"Duplicate destinations count: {len(duplicates)}")
        response = {
            "unique_destinations": unique_destinations,
            "all_destinations": all_destinations,
            "duplicates": duplicates,
        }
        # Save the summary to a file called cache_response.json.
        filename = "cache_response.json"
        with open(filename, "w") as outfile:
            json.dump(response, outfile, indent=4)
        print(f"Response saved to {filename}")
        return filename

# Find destinations that are in S3 but not in Redis, and vice versa.
def get_missing_destinations(s3_destinations, redis_destinations):
    # Set difference in each direction surfaces the one-sided destinations.
    s3_set = set(s3_destinations)
    redis_set = set(redis_destinations)
    missing_in_redis = s3_set - redis_set
    missing_in_s3 = redis_set - s3_set
    return list(missing_in_redis), list(missing_in_s3)
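
# For example (toy values), {"a", "b"} - {"b", "c"} == {"a"}: destinations
# only in S3 land in missing_in_redis, and those only in Redis land in
# missing_in_s3.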

def get_missing_destinations_from_s3_and_redis(s3_filename: str, redis_filename: str):
    # Load the two summary files written earlier.
    with open(s3_filename, "r") as s3_file:
        s3_data = json.load(s3_file)
    with open(redis_filename, "r") as redis_file:
        redis_data = json.load(redis_file)
    # Compare the unique destinations from both sources.
    missing_in_redis, missing_in_s3 = get_missing_destinations(s3_data["unique_destinations"], redis_data["unique_destinations"])
    print(f"Missing in Redis: {len(missing_in_redis)}")
    print(f"Missing in S3: {len(missing_in_s3)}")

def main():
    try:
        key = input("Enter the key: ")
        host = input("Enter the host: ")
        port = int(input("Enter the port: "))
        db = int(input("Enter the db: "))
        password = input("Enter the password: ")
        bucket_name = input("Enter the bucket name: ")
    except ValueError as e:
        # Bail out on bad input; falling through here would raise NameError
        # below because the connection settings were never bound.
        print(f"Invalid input: {e}")
        return
    except Exception as e:
        print(f"An error occurred: {e}")
        return
    print(f"Reading from S3 bucket: {bucket_name}")
    s3_client = S3Client(bucket_name)
    s3_processed_file = s3_client.process_from_s3()
    print()
    print()
    print(f"Reading from Redis key: {key}")
    redis_client = RedisClient(key, host, port, db, password)
    redis_processed_file = redis_client.process_from_redis()
    get_missing_destinations_from_s3_and_redis(s3_processed_file, redis_processed_file)


if __name__ == "__main__":
    main()
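
# Example run (hypothetical values): the script prompts for the Redis key,
# host, port, db, and password plus the S3 bucket and folder name, then
# writes s3_response.json and cache_response.json and prints how many
# destinations are missing from each side.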