@tbernacchi
Created April 25, 2024 21:54
Dump the documents from the last 90 days of a MongoDB collection and send the archive to a bucket on GCP. (MongoDB runs as a StatefulSet on k8s.)
import gzip
import os
from datetime import datetime, timedelta
from pymongo import MongoClient
import bson
import json
from google.cloud import storage
def get_date_90_days_ago_ms():
    # Calculate the cutoff timestamp (default: 90 days ago) in milliseconds since the epoch.
    days = int(os.environ.get("DAYS", 90))  # Use 90 as the default value if 'DAYS' is not set
    cutoff = datetime.now() - timedelta(days=days)
    # timestamp() is portable; strftime("%s") only works on glibc platforms
    return int(cutoff.timestamp() * 1000)
milliseconds = get_date_90_days_ago_ms()
print(f"Timestamp 90 days ago: {milliseconds}")
# Define the credentials and connection details
mongo_user = "root"
mongo_pass = "passwd"
host = "mongodb-novo-0.mongodb-novo-headless"
port = 27017
replica_set = "rs0"
authSource = "admin"
mongo_db_pipeline = "pipelinecore"
connection_uri = f"mongodb://{mongo_user}:{mongo_pass}@{host}:{port}/?replicaSet={replica_set}&authSource={authSource}"
print(f"Connection URI: {connection_uri}")
# Connect to the MongoDB replica set (MongoClient connects lazily, so ping to verify)
client = MongoClient(connection_uri)
client.admin.command("ping")
db = client[mongo_db_pipeline]
print(f"Connection successful! You can access database: {mongo_db_pipeline}")
# Connection details
collection_name = "auditing"
query = { "dateStart": {"$gte": milliseconds }} # Query to filter documents.
output_dir = "/tmp"
print(f"Query: {query}")
# Reuse the connection opened above (no need to create a second MongoClient)
print("Found %d documents in auditing" % db[collection_name].count_documents({}))
print(
    "Found %d documents in auditing matching query %s"
    % (db[collection_name].count_documents(query), json.dumps(query))
)
# Dump the collection data matching the query
try:
    with gzip.open(f"{output_dir}/auditing-90-days.bson.gz", "wb") as dump_file:
        # Stream the matching documents and serialize each one to BSON
        for doc in db[collection_name].find(query):
            dump_file.write(bson.encode(doc))
    print(f"Data dumped to {output_dir}/auditing-90-days.bson.gz successfully...")
except Exception as e:
    print(f"Error during data dump: {e}")
finally:
    # Close the MongoDB connection
    client.close()
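# Optional sanity check (an addition, not in the original gist): re-read the gzip'd
# BSON dump and count the documents to confirm they round-trip before uploading.
with gzip.open(f"{output_dir}/auditing-90-days.bson.gz", "rb") as dump_file:
    restored_count = sum(1 for _ in bson.decode_file_iter(dump_file))
print(f"Sanity check: {restored_count} documents readable from the dump file")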
# Upload the dump to a Google Cloud Storage bucket
storage_client = storage.Client()
current_datetime = datetime.now()
timestamp = current_datetime.strftime("%Y_%m_%d__%H:%M")
# Define the source file path and destination bucket and blob names
source_file_path = f"{output_dir}/auditing-90-days.bson.gz"
destination_bucket_name = "my-bucket"
destination_blob_name = f"{timestamp}__auditing-90-days.bson.gz"
# Get the destination bucket
bucket = storage_client.get_bucket(destination_bucket_name)
# Create a blob object in the destination bucket
blob = bucket.blob(destination_blob_name)
# Upload the file to the destination blob
blob.upload_from_filename(source_file_path)
print(f'File {source_file_path} uploaded to gs://{destination_bucket_name}/{destination_blob_name} successfully!')
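To restore from one of these archives (for example, to verify a backup against a scratch database), a minimal sketch along the same lines is shown below. The bucket, connection string, and database names are carried over from the script above; the object name and the auditing_restored target collection are assumptions to replace with real values.

import gzip

import bson
from google.cloud import storage
from pymongo import MongoClient

# All names here are placeholders/assumptions; substitute the real ones.
bucket_name = "my-bucket"
object_name = "YYYY_mm_dd__HH:MM__auditing-90-days.bson.gz"  # the timestamped object written above
local_path = "/tmp/auditing-restore.bson.gz"

# Download the archive from GCS
storage_client = storage.Client()
storage_client.get_bucket(bucket_name).blob(object_name).download_to_filename(local_path)

# Decode the BSON documents and insert them into a scratch collection
client = MongoClient(
    "mongodb://root:passwd@mongodb-novo-0.mongodb-novo-headless:27017/"
    "?replicaSet=rs0&authSource=admin"
)
collection = client["pipelinecore"]["auditing_restored"]  # hypothetical target collection

with gzip.open(local_path, "rb") as dump_file:
    docs = list(bson.decode_file_iter(dump_file))
if docs:
    collection.insert_many(docs)
print(f"Restored {len(docs)} documents into {collection.full_name}")
client.close()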