Skip to content

Instantly share code, notes, and snippets.

@luisdelatorre012
Created September 11, 2024 22:33
Show Gist options
  • Save luisdelatorre012/f22b170dcaee8d7174154c1c757d68a6 to your computer and use it in GitHub Desktop.
Save luisdelatorre012/f22b170dcaee8d7174154c1c757d68a6 to your computer and use it in GitHub Desktop.
motored mongo db
import motor.motor_asyncio
import asyncio
from typing import List, Dict, Any
import logging
from urllib.parse import quote_plus
from contextlib import asynccontextmanager
# Set up logging: configure the root logger at INFO level as an import-time
# side effect, then create the logger used by the rest of this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MongoDBClient:
    """Asynchronous reader for a single MongoDB collection, backed by Motor.

    The constructor opens an ``AsyncIOMotorClient`` for ``uri`` and keeps
    handles to the requested database and collection on ``self.client``,
    ``self.db`` and ``self.collection``.
    """

    def __init__(self, uri: str, db_name: str, collection_name: str):
        """Create the Motor client and select the database and collection.

        Args:
            uri: MongoDB connection string passed to ``AsyncIOMotorClient``.
            db_name: Name of the database to select on the client.
            collection_name: Name of the collection to read from.
        """
        self.client = motor.motor_asyncio.AsyncIOMotorClient(uri)
        self.db = self.client[db_name]
        self.collection = self.db[collection_name]

    @staticmethod
    def _to_record(document: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one stored document into a partition/offset/message record.

        The ``"a"`` field is read as the offset and the ``"b"`` field as the
        partition; both are coerced with ``int``. The remaining fields, minus
        ``"_id"``, ``"a"`` and ``"b"``, become the ``"message"`` payload. The
        input document is not modified.

        Raises:
            KeyError: If the document has no ``"a"`` or ``"b"`` field.
            ValueError, TypeError: If either field cannot be converted by
                ``int``.
        """
        # Offset is read before partition so a document missing both fields
        # reports the missing "a" key first.
        offset = int(document["a"])
        partition = int(document["b"])
        payload = {
            key: value
            for key, value in document.items()
            if key not in ("_id", "a", "b")
        }
        return {
            "partition": partition,
            "offset": offset,
            "message": payload,
        }

    async def get_messages(
        self, message_filter: Dict[str, Any], batch_size: int = 1000
    ) -> List[Dict[str, Any]]:
        """Fetch every document matching ``message_filter`` as a list of records.

        Args:
            message_filter: Query filter passed to ``collection.find``.
            batch_size: Value passed to the cursor's ``batch_size`` to reduce
                network round trips.

        Returns:
            A list of dicts with the keys ``"partition"``, ``"offset"`` and
            ``"message"``, one per matching document. The whole result set is
            held in memory.

        Raises:
            Exception: Any error raised while querying or converting a
                document is logged with its traceback and re-raised unchanged.
        """
        try:
            # Use batch_size to optimize network round trips
            cursor = self.collection.find(message_filter).batch_size(batch_size)
            messages = [self._to_record(document) async for document in cursor]
        except Exception as e:
            # logger.exception keeps the traceback that logger.error would drop.
            logger.exception("An error occurred while retrieving messages: %s", e)
            raise
        logger.info("Retrieved %d messages", len(messages))
        return messages

    async def close(self):
        """Close the underlying Motor client.

        Declared as a coroutine so callers can ``await`` it; the wrapped
        ``client.close()`` call itself is not awaited.
        """
        self.client.close()
@asynccontextmanager
async def get_mongo_client(uri: str, db_name: str, collection_name: str):
    """Async context manager that yields a ``MongoDBClient`` and closes it on exit.

    Args:
        uri: MongoDB connection string forwarded to ``MongoDBClient``.
        db_name: Database name forwarded to ``MongoDBClient``.
        collection_name: Collection name forwarded to ``MongoDBClient``.

    Yields:
        The constructed ``MongoDBClient``. Its ``close()`` coroutine is awaited
        when the ``async with`` body finishes, whether or not it raised.
    """
    mongo = MongoDBClient(uri, db_name, collection_name)
    try:
        yield mongo
    finally:
        # Runs on both normal exit and exceptions from the with-body.
        await mongo.close()
async def main():
    """Example entry point: connect, fetch all messages and print their positions.

    Builds a connection URI from the placeholder settings below, opens a client
    through ``get_mongo_client``, retrieves the messages matching the filter and
    prints each record's partition and offset. Any exception raised while
    fetching or processing is logged with its traceback and not re-raised.
    """
    # Configuration
    # Credentials are percent-escaped with quote_plus before being embedded in
    # the URI.
    username = quote_plus("your_username")
    password = quote_plus("your_password")
    host = "localhost"
    port = 27017
    db_name = "your_database_name"
    collection_name = "your_collection_name"
    uri = f"mongodb://{username}:{password}@{host}:{port}/{db_name}"

    # Use context manager for MongoDB client
    async with get_mongo_client(uri, db_name, collection_name) as mongo_client:
        try:
            # Define your filter
            message_filter = {}  # Add your filter criteria here

            # Get messages
            messages = await mongo_client.get_messages(message_filter)

            # Process messages
            for message in messages:
                print(f"Partition: {message['partition']}, Offset: {message['offset']}")
                # Add any additional processing here
        except Exception as e:
            # Top-level boundary: report the failure with its traceback and
            # let the context manager close the client.
            logger.exception("An error occurred in the main function: %s", e)
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment