Dump the last 90 days of documents from a MongoDB collection and send the dump to a bucket on GCP. (MongoDB runs as a StatefulSet on k8s.)
import gzip
import json
import os
from datetime import datetime, timedelta

import bson
from google.cloud import storage
from pymongo import MongoClient
def get_date_90_days_ago_ms():
    days = int(os.environ.get('DAYS', 90))  # Use 90 as the default value if 'DAYS' is not set
    # Calculate the cutoff timestamp (DAYS days ago) in milliseconds since the epoch.
    cutoff = datetime.now() - timedelta(days=days)
    return int(cutoff.timestamp() * 1000)


milliseconds = get_date_90_days_ago_ms()
print(f"Cutoff timestamp (90 days ago by default): {milliseconds}")
# Define the credentials and connection details (placeholder values)
mongo_user = "root"
mongo_pass = "passwd"
host = "mongodb-novo-0.mongodb-novo-headless"
port = 27017
replica_set = "rs0"
auth_source = "admin"
mongo_db_pipeline = "pipelinecore"

connection_uri = f"mongodb://{mongo_user}:{mongo_pass}@{host}:{port}/?replicaSet={replica_set}&authSource={auth_source}"
print(f"Connection URI: {connection_uri}")

# Connect to the MongoDB replica set and verify the connection with a ping
client = MongoClient(connection_uri)
client.admin.command("ping")
db = client[mongo_db_pipeline]
print(f"Connection successful! You can access database: {mongo_db_pipeline}")
# Dump settings
collection_name = "auditing"
query = {"dateStart": {"$gte": milliseconds}}  # Filter: only documents newer than the cutoff
output_dir = "/tmp"
print(f"Query: {query}")
# Report how many documents exist and how many match the query
print("Found %d documents in auditing" % db[collection_name].count_documents({}))
print(
    "Found %d documents in auditing matching query %s"
    % (db[collection_name].count_documents(query), json.dumps(query))
)
# Dump the matching documents, gzip-compressed, in BSON format
try:
    with gzip.open(f"{output_dir}/auditing-90-days.bson.gz", "wb") as dump_file:
        # Use the query with find() and serialize each document to BSON
        for doc in db[collection_name].find(query):
            dump_file.write(bson.encode(doc))
    print(f"Data dumped to {output_dir}/auditing-90-days.bson.gz successfully...")
except Exception as e:
    print(f"Error during data dump: {e}")
finally:
    # Close the MongoDB connection
    client.close()
# Upload the dump to a Google Cloud Storage bucket
storage_client = storage.Client()
current_datetime = datetime.now()
timestamp = current_datetime.strftime("%Y_%m_%d__%H:%M")

# Define the source file path and the destination bucket and blob names
source_file_path = f"{output_dir}/auditing-90-days.bson.gz"
destination_bucket_name = "my-bucket"
destination_blob_name = f"{timestamp}__auditing-90-days.bson.gz"

# Get the destination bucket and create a blob object in it
bucket = storage_client.get_bucket(destination_bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to the destination blob
blob.upload_from_filename(source_file_path)
print(f"File {source_file_path} uploaded to gs://{destination_bucket_name}/{destination_blob_name} successfully!")
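
To sanity-check the result, the gzipped BSON dump can be read back and counted with PyMongo's bson helpers. This is a minimal sketch, assuming the file is still available locally at /tmp/auditing-90-days.bson.gz (the same object could first be fetched from the bucket with blob.download_to_filename); it is only a verification step, not part of the dump script above.

import gzip
import bson

dump_path = "/tmp/auditing-90-days.bson.gz"
with gzip.open(dump_path, "rb") as fh:
    # decode_file_iter streams one decoded document at a time from a BSON file object
    count = sum(1 for _ in bson.decode_file_iter(fh))
print(f"{dump_path} contains {count} documents")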