Last active
September 30, 2020 11:51
-
-
Save sabuhish/bef11d68ffb23e632e11e27227c47cfb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from bson.objectid import ObjectId | |
| from bson.json_util import loads as bson_loads, dumps as bson_dumps | |
| import asyncio | |
| import pprint | |
| from pymongo import ReturnDocument | |
| import json | |
# Module-level event loop shared by the demo code at the bottom of the file.
# asyncio.get_event_loop() no longer creates a loop implicitly on newer
# Python versions (it raises RuntimeError when no current loop is set), so
# fall back to creating and registering one explicitly. Wherever the original
# call succeeds, the behaviour is unchanged.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
# NOTE: before you start, make sure the MongoDB user and database have been created; if not, you can follow the guide in this answer:
# https://stackoverflow.com/questions/45732838/authentication-failed-to-connect-to-mongodb-using-pymongo/61107727#61107727
# Installation: pip install motor pymongo
class MongoDB:
    """Small async helper around one MongoDB collection, built on Motor.

    The constructor opens an ``AsyncIOMotorClient`` for the given database and
    every method operates on the single collection named at construction
    time. Query results are round-tripped through ``bson.json_util.dumps`` and
    ``json.loads`` so that callers receive plain JSON-compatible Python values.
    """

    def __init__(
        self,
        user: str,
        password: str,
        collection: str,
        database: str,
        host: str = None,
        port: str = None
    ):
        """Store the connection settings and create the Motor client.

        Args:
            user: User name, given raw (NOT percent-escaped).
            password: Password, given raw (NOT percent-escaped).
            collection: Name of the collection all methods operate on.
            database: Database name; also placed in the URI path.
            host: Server host; a falsy value selects ``"localhost"``.
            port: Server port (int or str); a falsy value selects ``27017``.
        """
        self.user = user
        self.password = password
        self.database = database
        self.collection = collection
        self.port = port if port else 27017
        self.host = host if host else "localhost"
        self.client = AsyncIOMotorClient(
            self._build_uri(self.user, self.password, self.host, self.port, self.database)
        )
        self.db = self.client[self.database]

    @staticmethod
    def _build_uri(user, password, host, port, database) -> str:
        """Return the ``mongodb://`` URI with user and password percent-escaped.

        Reserved characters such as ``@``, ``:``, ``/`` or ``%`` in the
        credentials would otherwise corrupt the URI or make it unparsable.
        """
        # Local import: keeps this class self-contained without touching the
        # file-level import block.
        from urllib.parse import quote_plus

        return (
            f"mongodb://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{database}"
        )

    @staticmethod
    def _to_json(value):
        """Convert a BSON-aware value into plain JSON-compatible Python data."""
        return json.loads(bson_dumps(value))

    @property
    def _coll(self):
        """Collection handle, looked up on every access from the current attributes."""
        return self.db[self.collection]

    async def insert(
        self,
        document: dict
    ) -> dict:
        """Insert ``document`` and return its ``inserted_id`` in JSON form.

        The id is serialised with ``bson.json_util``, so a generated ObjectId
        is returned as a dict (Extended JSON), not as a bare string.
        """
        result = await self._coll.insert_one(document)
        return self._to_json(result.inserted_id)

    async def find_obj(
        self,
        obj_id: str
    ) -> dict:
        """Return the document whose ``_id`` equals ``ObjectId(obj_id)``.

        ``obj_id`` is converted with ``ObjectId(...)``, so it must be a valid
        ObjectId string. The ``find_one`` result is returned in JSON form.
        """
        document = await self._coll.find_one({'_id': ObjectId(obj_id)})
        return self._to_json(document)

    async def search(
        self,
        params: dict
    ) -> dict:
        """Return the first document matching the filter ``params``, in JSON form.

        Example filter: ``{"name": "Sebuhi"}``.
        """
        document = await self._coll.find_one(params)
        return self._to_json(document)

    async def find_all(
        self,
    ) -> list:
        """Return every document of the collection as a list of JSON-form dicts."""
        return [self._to_json(doc) async for doc in self._coll.find()]

    async def clear_data(
        self,
        obj_id: str
    ) -> bool:
        """Delete the document whose ``_id`` equals ``ObjectId(obj_id)``.

        Prints the number of deleted documents. Always returns ``True``, even
        when nothing was deleted (kept for backward compatibility).
        """
        result = await self._coll.delete_one({'_id': ObjectId(obj_id)})
        print(result.deleted_count)
        return True

    async def delete_one_document(
        self,
        query: dict
    ) -> int:
        """Delete one document matching ``query`` and return how many remain.

        Prints the number of matching documents before the deletion, removes a
        single match, then returns the count of documents still matching.
        """
        print(await self._coll.count_documents(query))
        await self._coll.find_one_and_delete(query)
        return await self._coll.count_documents(query)

    async def replace_one_document(
        self,
        query: dict,
        params: dict
    ) -> dict:
        """Replace one document matching ``query`` with ``params``.

        Returns the document as it looks AFTER the replacement
        (``ReturnDocument.AFTER``), in JSON form.
        """
        replaced_doc = await self._coll.find_one_and_replace(
            query, params, return_document=ReturnDocument.AFTER
        )
        return self._to_json(replaced_doc)

    async def update(
        self,
        query: dict,
        new_value: dict
    ) -> tuple:
        """Apply ``new_value`` to one document matching ``query``.

        ``new_value`` is passed straight to ``update_one``, so it must be an
        update-operator document such as ``{"$set": {...}}``. No upsert is
        requested: when nothing matches, nothing is written.

        Returns:
            ``(matched_count, modified_count)`` from the update result.
        """
        result = await self._coll.update_one(query, new_value)
        print('matched %d, modified %d' % (result.matched_count, result.modified_count))
        print(result)
        return (result.matched_count, result.modified_count)
# Demo: connect with sample credentials and insert one document.
# This runs at import time and needs a reachable MongoDB server.
obj = MongoDB(
    user="test_user",
    password="myuser123",
    database="test_database",
    collection="hello",
    host="127.0.0.1",
)
print(
    loop.run_until_complete(
        obj.insert({"hello": "my commit"})
    )
)

# Further usage examples ("some_id" stands for a real ObjectId string):
# loop.run_until_complete(obj.find_obj("some_id"))
# loop.run_until_complete(obj.search({"params": "params"}))
# loop.run_until_complete(obj.find_all())
# loop.run_until_complete(obj.clear_data("some_id"))
# loop.run_until_complete(obj.delete_one_document({"query": "query"}))
# loop.run_until_complete(obj.replace_one_document({"query": "query"}, {"params": "params"}))
# loop.run_until_complete(obj.update({"query": "query"}, {"$set": {"params": "params"}}))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment