Created
April 10, 2025 23:59
-
-
Save pedrovasconcellos/a5d80d2b3853aa59dd8cc543ba67d2d2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pymongo import MongoClient | |
import json | |
from datetime import datetime | |
from urllib.parse import quote_plus | |
# Connection settings. The user name and both passwords are percent-encoded
# because they are interpolated into the MongoDB connection URIs below.
USER = quote_plus("master")
PASSWORD = quote_plus("password")  # encodes the password
PASSWORD_PROD = quote_plus("password_prod")
HOST = "localhost"
HOST_PROD = "cluster.com"
AUTH_DB = "admin"
DATABASE = "database_name"
# Documents are copied in chunks of this size so that a large collection is
# never held in memory all at once.
BATCH_SIZE = 1000

# builds the URIs with escaped credentials
SOURCE_URI = f"mongodb+srv://{USER}:{PASSWORD_PROD}@{HOST_PROD}/?appName=ClusterDatabaseName"
DESTINATION_URI = f"mongodb://{USER}:{PASSWORD}@{HOST}:27021/?authSource={AUTH_DB}"


def _store_batch(batch, destination_collection, backup_file):
    """Insert one batch of documents into the destination and append it to the backup file."""
    destination_collection.insert_many(batch)
    # One JSON document per line; default=str stringifies values json cannot
    # encode natively.
    backup_file.writelines(
        json.dumps(doc, default=str, ensure_ascii=False) + "\n" for doc in batch
    )


# 🧠 Connecting to databases (both clients are closed when the block exits,
# including when an error interrupts the copy)
with MongoClient(SOURCE_URI) as source_client, MongoClient(DESTINATION_URI) as destination_client:
    db_source = source_client[DATABASE]
    db_destination = destination_client[DATABASE]

    # 📁 Text backup
    backup_filename = f"backup_mongodb_{DATABASE}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    with open(backup_filename, "w", encoding="utf-8") as backup_file:
        for collection_name in db_source.list_collection_names():
            print(f"📦 Copying collection: {collection_name}")
            source_collection = db_source[collection_name]
            destination_collection = db_destination[collection_name]

            # Copy indexes (except _id_)
            for index in source_collection.list_indexes():
                if index["name"] != "_id_":
                    destination_collection.create_index(
                        # Materialize the (field, direction) pairs: a dict view
                        # is neither a list nor a tuple, which is the key
                        # specification create_index documents.
                        list(index["key"].items()),
                        **{k: v for k, v in index.items() if k not in ("key", "ns", "v", "name")}
                    )

            # Copy documents, streaming them batch by batch to both the
            # destination collection and the backup .txt
            backup_file.write(f"--- Collection: {collection_name} ---\n")
            batch = []
            for doc in source_collection.find():
                batch.append(doc)
                if len(batch) >= BATCH_SIZE:
                    _store_batch(batch, destination_collection, backup_file)
                    batch = []
            if batch:
                # Remainder smaller than BATCH_SIZE; insert_many must never be
                # called with an empty list, hence the guard.
                _store_batch(batch, destination_collection, backup_file)
            backup_file.write("\n")

print(f"✅ Backup completed. File saved as: {backup_filename}")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example of execution via terminal:
#
# python backup_script.py \
#   --password minha_senha_local \
#   --password-prod senha_prod_secreta \
#   --host-prod meucluster.mongodb.net \
#   --database meu_banco
import argparse | |
import json | |
from datetime import datetime | |
from urllib.parse import quote_plus | |
from pymongo import MongoClient | |
def _copy_indexes(source_collection, destination_collection):
    """Recreate every index of the source collection, except ``_id_``, on the destination."""
    for index in source_collection.list_indexes():
        if index["name"] == "_id_":
            continue
        # Everything but the key spec and the bookkeeping fields is forwarded
        # as an index option (the index name is not carried over).
        options = {k: v for k, v in index.items() if k not in ("key", "ns", "v", "name")}
        # Materialize the (field, direction) pairs: a dict view is neither a
        # list nor a tuple, which is the key specification create_index documents.
        destination_collection.create_index(list(index["key"].items()), **options)


def _copy_documents(source_collection, destination_collection, backup_file, batch_size=1000):
    """Stream all documents to the destination and the backup file, ``batch_size`` at a time."""

    def store(batch):
        destination_collection.insert_many(batch)
        # One JSON document per line; default=str stringifies values json
        # cannot encode natively.
        backup_file.writelines(
            json.dumps(doc, default=str, ensure_ascii=False) + "\n" for doc in batch
        )

    batch = []
    for doc in source_collection.find():
        batch.append(doc)
        if len(batch) >= batch_size:
            store(batch)
            batch = []
    if batch:
        # Remainder smaller than batch_size; insert_many must never be called
        # with an empty list, hence the guard.
        store(batch)


def run_backup(user, password, password_prod, host, host_prod, auth_db, database):
    """Copy ``database`` from the production cluster to the local server and dump it to a text file.

    For every collection of the source database the indexes (except ``_id_``)
    are recreated on the destination, the documents are inserted into the
    destination collection, and each document is appended as one JSON line to
    ``backup_mongodb_<database>_<timestamp>.txt`` in the current directory.

    Args:
        user: User name used for both connections.
        password: Password for the destination (local) server.
        password_prod: Password for the source (production) cluster.
        host: Destination host; the port is fixed to 27021.
        host_prod: Source host, reached through a ``mongodb+srv://`` URI.
        auth_db: ``authSource`` database for the destination connection.
        database: Name of the database to copy (same name on both sides).

    Errors raised by PyMongo or by file I/O are not caught here and propagate
    to the caller; both clients and the backup file are closed in that case.
    """
    # Credentials are percent-encoded because they are embedded in the URIs;
    # the user name needs this just as much as the passwords.
    user = quote_plus(user)
    password = quote_plus(password)
    password_prod = quote_plus(password_prod)

    source_uri = f"mongodb+srv://{user}:{password_prod}@{host_prod}/?appName=ClusterDatabaseName"
    destination_uri = f"mongodb://{user}:{password}@{host}:27021/?authSource={auth_db}"

    # The context managers close both clients even if the copy fails midway.
    with MongoClient(source_uri) as source_client, MongoClient(destination_uri) as destination_client:
        db_source = source_client[database]
        db_destination = destination_client[database]

        backup_filename = f"backup_mongodb_{database}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(backup_filename, "w", encoding="utf-8") as backup_file:
            for collection_name in db_source.list_collection_names():
                print(f"📦 Copying collection: {collection_name}")
                source_collection = db_source[collection_name]
                destination_collection = db_destination[collection_name]

                # Copy indexes (except _id_)
                _copy_indexes(source_collection, destination_collection)

                # Copy documents and write them to the backup file
                backup_file.write(f"--- Collection: {collection_name} ---\n")
                _copy_documents(source_collection, destination_collection, backup_file)
                backup_file.write("\n")

    print(f"✅ Backup completed. File saved as: {backup_filename}")
if __name__ == "__main__":
    # Command-line entry point: each flag maps one-to-one onto a run_backup
    # parameter (argparse turns "--password-prod" into "password_prod", etc.).
    cli = argparse.ArgumentParser(description="Backup MongoDB from Atlas to local")
    option_table = (
        ("--user", {"default": "master", "help": "MongoDB username"}),
        ("--password", {"required": True, "help": "MongoDB password for local"}),
        ("--password-prod", {"required": True, "help": "MongoDB password for production"}),
        ("--host", {"default": "localhost", "help": "Local MongoDB host"}),
        ("--host-prod", {"default": "cluster.com", "help": "Production MongoDB host"}),
        ("--auth-db", {"default": "admin", "help": "Authentication database"}),
        ("--database", {"default": "database_name", "help": "Target database name"}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    # The parsed namespace holds exactly the keyword arguments run_backup takes.
    run_backup(**vars(cli.parse_args()))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example of execution via terminal:
#
# python backup_script.py --password minha_senha --password-prod senha_prod --collections pessoas enderecos
import argparse | |
import json | |
import gzip | |
import logging | |
from datetime import datetime | |
from urllib.parse import quote_plus | |
from pymongo import MongoClient | |
# 🧾 Root logger configuration: INFO and above, each record prefixed with its
# timestamp and level name.
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)
def _copy_indexes(source_collection, destination_collection):
    """Recreate every index of the source collection, except ``_id_``, on the destination."""
    for index in source_collection.list_indexes():
        if index["name"] == "_id_":
            continue
        # Everything but the key spec and the bookkeeping fields is forwarded
        # as an index option (the index name is not carried over).
        options = {k: v for k, v in index.items() if k not in ("key", "ns", "v", "name")}
        # Materialize the (field, direction) pairs: a dict view is neither a
        # list nor a tuple, which is the key specification create_index documents.
        destination_collection.create_index(list(index["key"].items()), **options)


def _copy_documents(source_collection, destination_collection, backup_file, batch_size=1000):
    """Stream all documents to the destination and the backup file, ``batch_size`` at a time."""

    def store(batch):
        destination_collection.insert_many(batch)
        # One JSON document per line; default=str stringifies values json
        # cannot encode natively.
        backup_file.writelines(
            json.dumps(doc, default=str, ensure_ascii=False) + "\n" for doc in batch
        )

    batch = []
    for doc in source_collection.find():
        batch.append(doc)
        if len(batch) >= batch_size:
            store(batch)
            batch = []
    if batch:
        # Remainder smaller than batch_size; insert_many must never be called
        # with an empty list, hence the guard.
        store(batch)


def run_backup(user, password, password_prod, host, host_prod, auth_db, database, collections=None):
    """Copy ``database`` from the production cluster to the local server and dump it to a gzip file.

    For every selected collection the indexes (except ``_id_``) are recreated
    on the destination, the documents are inserted into the destination
    collection, and each document is appended as one JSON line to the
    gzip-compressed text file ``backup_mongodb_<database>_<timestamp>.txt.gz``
    in the current directory.

    Args:
        user: User name used for both connections.
        password: Password for the destination (local) server.
        password_prod: Password for the source (production) cluster.
        host: Destination host; the port is fixed to 27021.
        host_prod: Source host, reached through a ``mongodb+srv://`` URI.
        auth_db: ``authSource`` database for the destination connection.
        database: Name of the database to copy (same name on both sides).
        collections: Optional list of collection names to copy. When it is
            ``None`` or empty, every collection of the source database is copied.

    Errors raised by PyMongo or by file I/O are not caught here and propagate
    to the caller; both clients and the backup file are closed in that case.
    """
    # Credentials are percent-encoded because they are embedded in the URIs;
    # the user name needs this just as much as the passwords.
    user = quote_plus(user)
    password = quote_plus(password)
    password_prod = quote_plus(password_prod)

    source_uri = f"mongodb+srv://{user}:{password_prod}@{host_prod}/?appName=ClusterDatabaseName"
    destination_uri = f"mongodb://{user}:{password}@{host}:27021/?authSource={auth_db}"

    # The context managers close both clients even if the copy fails midway.
    with MongoClient(source_uri) as source_client, MongoClient(destination_uri) as destination_client:
        db_source = source_client[database]
        db_destination = destination_client[database]

        backup_filename = f"backup_mongodb_{database}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt.gz"
        with gzip.open(backup_filename, "wt", encoding="utf-8") as backup_file:
            all_collections = collections or db_source.list_collection_names()
            for collection_name in all_collections:
                logging.info(f"📦 Copying collection: {collection_name}")
                source_collection = db_source[collection_name]
                destination_collection = db_destination[collection_name]

                # Copy indexes (excluding _id_)
                _copy_indexes(source_collection, destination_collection)

                # Copy documents and write them to the GZip file
                backup_file.write(f"--- Collection: {collection_name} ---\n")
                _copy_documents(source_collection, destination_collection, backup_file)
                backup_file.write("\n")

    logging.info(f"✅ Backup completed. File saved as: {backup_filename}")
if __name__ == "__main__":
    # Command-line entry point: each flag maps one-to-one onto a run_backup
    # parameter (argparse turns "--password-prod" into "password_prod", etc.).
    cli = argparse.ArgumentParser(description="Backup MongoDB from Atlas to local")
    option_table = (
        ("--user", {"default": "master", "help": "MongoDB username"}),
        ("--password", {"required": True, "help": "MongoDB password for local"}),
        ("--password-prod", {"required": True, "help": "MongoDB password for production"}),
        ("--host", {"default": "localhost", "help": "Local MongoDB host"}),
        ("--host-prod", {"default": "cluster.com", "help": "Production MongoDB host"}),
        ("--auth-db", {"default": "admin", "help": "Authentication database"}),
        ("--database", {"default": "database_name", "help": "Target database name"}),
        ("--collections", {"nargs": "+", "help": "Specific collections to backup (space separated)"}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    # The parsed namespace holds exactly the keyword arguments run_backup takes.
    run_backup(**vars(cli.parse_args()))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment