Skip to content

Instantly share code, notes, and snippets.

@sabuhish
Last active September 30, 2020 11:51
Show Gist options
  • Select an option

  • Save sabuhish/bef11d68ffb23e632e11e27227c47cfb to your computer and use it in GitHub Desktop.

Select an option

Save sabuhish/bef11d68ffb23e632e11e27227c47cfb to your computer and use it in GitHub Desktop.
from motor.motor_asyncio import AsyncIOMotorClient
from bson.objectid import ObjectId
from bson.json_util import loads as bson_loads, dumps as bson_dumps
import asyncio
import pprint
from pymongo import ReturnDocument
import json
# Module-level event loop; the demo calls at the bottom of this file drive
# their coroutines through it with loop.run_until_complete(...).
# NOTE(review): asyncio.get_event_loop() without a running loop is deprecated
# on recent Python versions — confirm against the interpreter you target.
loop = asyncio.get_event_loop()
# NOTE: before you start, make sure you have created the user and the database; if not, you can follow the guide in this answer:
# https://stackoverflow.com/questions/45732838/authentication-failed-to-connect-to-mongodb-using-pymongo/61107727#61107727
# Installation: pip install motor pymongo
class MongoDB:
    """Asynchronous convenience wrapper around one MongoDB collection.

    The instance opens an ``AsyncIOMotorClient`` for the given credentials
    and exposes coroutine helpers that operate on ``self.collection`` inside
    ``self.database``. Documents returned to the caller are round-tripped
    through ``bson.json_util.dumps`` + ``json.loads`` so that BSON-only types
    (for example ``ObjectId``) come back as plain JSON-compatible values.
    """

    def __init__(
        self,
        user: str,
        password: str,
        collection: str,
        database: str,
        host: str = None,
        port: str = None
    ):
        """Store the connection settings and create the Motor client.

        Args:
            user: MongoDB user name (raw, NOT percent-escaped).
            password: MongoDB password (raw, NOT percent-escaped).
            collection: Name of the collection every helper operates on.
            database: Database name; also used as the path of the URI.
            host: Server host; a falsy value falls back to ``"localhost"``.
            port: Server port; a falsy value falls back to ``27017``.
        """
        self.user = user
        self.password = password
        self.database = database
        self.collection = collection
        self.port = port if port else 27017
        self.host = host if host else "localhost"
        self.client = AsyncIOMotorClient(
            self._build_uri(self.user, self.password, self.host, self.port, self.database)
        )
        self.db = self.client[self.database]

    @staticmethod
    def _build_uri(user, password, host, port, database) -> str:
        """Return the ``mongodb://`` URI with percent-escaped credentials."""
        # Imported locally so the module's import block does not need to change.
        from urllib.parse import quote_plus

        # Reserved characters such as '@', ':', '/' or '%' inside the user
        # name or password would otherwise be parsed as URI structure.
        safe_user = quote_plus(str(user))
        safe_password = quote_plus(str(password))
        return f"mongodb://{safe_user}:{safe_password}@{host}:{port}/{database}"

    @staticmethod
    def _to_json(value):
        """Convert a BSON value/document into JSON-compatible Python data."""
        return json.loads(bson_dumps(value))

    @property
    def _coll(self):
        """The Motor collection object all helpers operate on."""
        return self.db[self.collection]

    async def insert(
        self,
        document: dict
    ) -> dict:
        """Insert ``document`` and return its id in JSON-compatible form.

        For a driver-generated ``ObjectId`` the id is serialized by
        ``bson.json_util`` (presumably as ``{"$oid": "<hex>"}``).
        """
        result = await self._coll.insert_one(document)
        return self._to_json(result.inserted_id)

    async def find_obj(
        self,
        obj_id: str
    ) -> dict:
        """Return the document whose ``_id`` is ``ObjectId(obj_id)``.

        Returns ``None`` when no document matches.
        """
        document = await self._coll.find_one({'_id': ObjectId(obj_id)})
        return self._to_json(document)

    async def search(
        self,
        params: dict
    ) -> dict:
        """Return the first document matching the filter ``params``.

        Example filter: ``{"name": "Sebuhi"}``. Returns ``None`` when
        nothing matches.
        """
        document = await self._coll.find_one(params)
        return self._to_json(document)

    async def find_all(
        self,
    ) -> list:
        """Return every document of the collection as a list."""
        return [self._to_json(doc) async for doc in self._coll.find()]

    async def clear_data(
        self,
        obj_id: str
    ) -> int:
        """Delete the document whose ``_id`` is ``ObjectId(obj_id)``.

        Returns:
            The number of deleted documents (``1`` or ``0``), so callers can
            tell whether anything was actually removed.
        """
        result = await self._coll.delete_one({'_id': ObjectId(obj_id)})
        print(result.deleted_count)
        return result.deleted_count

    async def delete_one_document(
        self,
        query: dict
    ) -> int:
        """Delete one document matching ``query``.

        The number of matching documents is printed before the delete.

        Returns:
            How many documents still match ``query`` after the delete.
        """
        print(await self._coll.count_documents(query))
        await self._coll.find_one_and_delete(query)
        return await self._coll.count_documents(query)

    async def replace_one_document(
        self,
        query: dict,
        params: dict
    ) -> dict:
        """Replace the first document matching ``query`` with ``params``.

        Returns:
            The document as stored AFTER the replacement
            (``ReturnDocument.AFTER``), or ``None`` if nothing matched.
        """
        replaced = await self._coll.find_one_and_replace(
            query, params, return_document=ReturnDocument.AFTER
        )
        return self._to_json(replaced)

    async def update(
        self,
        query: dict,
        new_value: dict,
        upsert: bool = False
    ) -> tuple:
        """Apply the update document ``new_value`` to one match of ``query``.

        Args:
            query: Filter selecting the document to update.
            new_value: Update document passed to ``update_one`` (for
                example ``{"$set": {...}}``).
            upsert: When true, a new document is inserted if nothing matches
                ``query``. Defaults to ``False``, in which case a
                non-matching query changes nothing.

        Returns:
            ``(matched_count, modified_count)`` as reported by the driver.
        """
        result = await self._coll.update_one(query, new_value, upsert=upsert)
        print(f"matched {result.matched_count}, modified {result.modified_count}")
        print(result)
        return (result.matched_count, result.modified_count)
# Demo: connect with the test credentials and insert one document, printing
# the id that the insert helper returns.
obj = MongoDB(
    user="test_user",
    password="myuser123",
    collection="hello",
    database="test_database",
    host="127.0.0.1",
)
inserted_id = loop.run_until_complete(obj.insert({"hello": "my commit"}))
print(inserted_id)

# Further examples (uncomment to try; replace "some_id" with a real ObjectId hex string):
# loop.run_until_complete(obj.find_obj("some_id"))
# loop.run_until_complete(obj.search({"params": "params"}))
# loop.run_until_complete(obj.find_all())
# loop.run_until_complete(obj.clear_data("some_id"))
# loop.run_until_complete(obj.delete_one_document({"query": "query"}))
# loop.run_until_complete(obj.replace_one_document({"query": "query"}, {"params": "params"}))
# loop.run_until_complete(obj.update({"query": "query"}, {"$set": {"params": "params"}}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment