@cheeyeo
Created March 17, 2026 15:36
MongoDB change stream example using PyMongo
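from pymongo import MongoClient
from server.config import get_settings

# Change streams need a replica set (or sharded cluster); the three hosts are assumed to be its members.
settings = get_settings()
DATABASE_URL = (
    f"mongodb://{settings.mongo_db_root_username}:{settings.mongo_db_root_password}"
    "@mongo1:27017,mongo2:27017,mongo3:27017/"
)
client = MongoClient(DATABASE_URL)
db = client.get_database("teststreams")

# Pre-images must be enabled on the collection for full_document_before_change to work.
# Note: create_collection raises CollectionInvalid if "users" already exists.
users = db.create_collection("users", changeStreamPreAndPostImages={"enabled": True})

# Only report data-modifying events.
pipeline = [
    {"$match": {"operationType": {"$in": ["insert", "update", "replace", "delete"]}}}
]

# "updateLookup" attaches the current document to update events;
# "required" makes the stream fail if a pre-image is unavailable.
with users.watch(
    pipeline,
    full_document="updateLookup",
    full_document_before_change="required",
) as stream:
    for change in stream:
        print(change)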
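
The loop above prints each raw change event. As a rough sketch of what a handler might do with one instead, the helper below pulls out the fields that the watch options populate. The name `summarize_change` and the shape of the summary dict are illustrative assumptions, not part of the gist or of PyMongo. The event keys it reads (`operationType`, `documentKey`, `fullDocument`, `fullDocumentBeforeChange`, `updateDescription`) are the standard change event fields.

```python
from typing import Any, Mapping


def summarize_change(change: Mapping[str, Any]) -> dict:
    """Reduce a change event to a small dict (hypothetical helper)."""
    op = change.get("operationType")
    summary = {
        "operation": op,
        # documentKey carries the _id of the affected document.
        "id": change.get("documentKey", {}).get("_id"),
        # Pre-image: missing on inserts, since no earlier version exists.
        "before": change.get("fullDocumentBeforeChange"),
        # Post-image: missing on deletes.
        "after": change.get("fullDocument"),
    }
    if op == "update":
        # updateDescription is only present on update events.
        desc = change.get("updateDescription", {})
        summary["updated_fields"] = desc.get("updatedFields", {})
        summary["removed_fields"] = desc.get("removedFields", [])
    return summary


if __name__ == "__main__":
    # Hand-written sample event, shaped like an update notification.
    sample = {
        "operationType": "update",
        "documentKey": {"_id": 1},
        "fullDocumentBeforeChange": {"_id": 1, "name": "old"},
        "fullDocument": {"_id": 1, "name": "new"},
        "updateDescription": {"updatedFields": {"name": "new"}, "removedFields": []},
    }
    print(summarize_change(sample))
```

Inside the stream loop, `print(change)` could be swapped for `print(summarize_change(change))` to compare the document before and after each write.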