Created
September 11, 2024 22:33
-
-
Save luisdelatorre012/f22b170dcaee8d7174154c1c757d68a6 to your computer and use it in GitHub Desktop.
motored mongo db
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import motor.motor_asyncio | |
import asyncio | |
from typing import List, Dict, Any | |
import logging | |
from urllib.parse import quote_plus | |
from contextlib import asynccontextmanager | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class MongoDBClient: | |
def __init__(self, uri: str, db_name: str, collection_name: str): | |
self.client = motor.motor_asyncio.AsyncIOMotorClient(uri) | |
self.db = self.client[db_name] | |
self.collection = self.db[collection_name] | |
async def get_messages(self, message_filter: Dict[str, Any], batch_size: int = 1000) -> List[Dict[str, Any]]: | |
messages = [] | |
try: | |
# Use batch_size to optimize network round trips | |
cursor = self.collection.find(message_filter).batch_size(batch_size) | |
async for message in cursor: | |
offset = int(message["a"]) | |
partition = int(message["b"]) | |
# Create a copy to avoid modifying the original document | |
message_copy = message.copy() | |
message_copy.pop("_id", None) | |
message_copy.pop("a", None) | |
message_copy.pop("b", None) | |
messages.append({ | |
"partition": partition, | |
"offset": offset, | |
"message": message_copy | |
}) | |
logger.info(f"Retrieved {len(messages)} messages") | |
return messages | |
except Exception as e: | |
logger.error(f"An error occurred while retrieving messages: {e}") | |
raise | |
async def close(self): | |
self.client.close() | |
@asynccontextmanager | |
async def get_mongo_client(uri: str, db_name: str, collection_name: str): | |
client = MongoDBClient(uri, db_name, collection_name) | |
try: | |
yield client | |
finally: | |
await client.close() | |
async def main(): | |
# Configuration | |
username = quote_plus("your_username") | |
password = quote_plus("your_password") | |
host = "localhost" | |
port = 27017 | |
db_name = "your_database_name" | |
collection_name = "your_collection_name" | |
uri = f"mongodb://{username}:{password}@{host}:{port}/{db_name}" | |
# Use context manager for MongoDB client | |
async with get_mongo_client(uri, db_name, collection_name) as mongo_client: | |
try: | |
# Define your filter | |
message_filter = {} # Add your filter criteria here | |
# Get messages | |
messages = await mongo_client.get_messages(message_filter) | |
# Process messages | |
for message in messages: | |
print(f"Partition: {message['partition']}, Offset: {message['offset']}") | |
# Add any additional processing here | |
except Exception as e: | |
logger.error(f"An error occurred in the main function: {e}") | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment