Created
September 14, 2018 01:35
-
-
Save matthewoestreich/1801890d56cb8cd7074771277d2f701f to your computer and use it in GitHub Desktop.
python_mongo_with_classes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pymongo import MongoClient | |
# THE NEW CLASS | |
class CtMongoConnectionSettings(object): | |
# This constructor gives the person using this class | |
# the option to connect to a server, db, and coll all at once | |
def __init__(self, mongo_server, mongo_port, mongo_database, mongo_collection): | |
self.mongo_server = mongo_server | |
self.mongo_database = mongo_database | |
self.mongo_port = mongo_port | |
self.mongo_collection = mongo_collection | |
class CtMongoClient(object): | |
def __init__(self, mongo_connection_settings): | |
if not isinstance(mongo_connection_settings, CtMongoConnectionSettings): | |
raise TypeError("mongo_connection_settings must be of type CtMongoConnectionSettings") | |
else: | |
s = mongo_connection_settings.mongo_server | |
p = mongo_connection_settings.mongo_port | |
self.MongoConnectionSettings = mongo_connection_settings | |
self.NicksMongoClient = MongoClient(s, p) | |
def print_all_mongo_docs(self): # doesnt use auth | |
db = self.NicksMongoClient[self.MongoConnectionSettings.mongo_server] | |
collection = db[self.MongoConnectionSettings.mongo_collection] | |
results = collection.find({}) # this is technically a cursor | |
for document in results: | |
print(document) | |
def get_all_mongo_docs(self): # doesnt use auth | |
db = self.NicksMongoClient[self.MongoConnectionSettings.mongo_database] | |
collection = db[self.MongoConnectionSettings.mongo_collection] | |
return collection.find({}) # this is technically a cursor | |
def find_mongo_document(self, mongo_document_id): | |
docs = self.get_all_mongo_docs() | |
found_doc = None | |
for doc in docs: | |
if str(doc["_id"]) == str(mongo_document_id): | |
found_doc = doc | |
if found_doc is not None: | |
return found_doc | |
else: | |
return False | |
def update_mongo_document(self, mongo_document_id, field_name, field_new_value): | |
my_doc = self.find_mongo_document(mongo_document_id) | |
if my_doc is False: | |
raise Exception("Unable to locate doc with ID: %s!" % mongo_document_id) | |
else: | |
db = self.NicksMongoClient[self.MongoConnectionSettings.mongo_database] | |
coll = db[self.MongoConnectionSettings.mongo_collection] | |
my_doc[field_name] = field_new_value | |
# apparently the .save method is depreciated but it works and im tired | |
coll.save(my_doc) | |
# If you wanted to clean it up even further, and get rid of all these messy params | |
# just build a class for them.. | |
ct_mongo_connection_settings = CtMongoConnectionSettings("10.0.0.170", 27017, "Test", "Test") | |
ct_mongo_client = CtMongoClient(ct_mongo_connection_settings) | |
interesting_doc = ct_mongo_client.find_mongo_document("5b7dcdd51913d01a08069e87") | |
print("\r\n~~~Before~~~:\r\n%s\r\n" % interesting_doc) | |
ct_mongo_client.update_mongo_document("5b7dcdd51913d01a08069e87", "Test", "FromOKToNOPEBackToNAY") | |
new_interesting_doc = ct_mongo_client.find_mongo_document("5b7dcdd51913d01a08069e87") | |
print("~~~After~~~:\r\n%s\r\n" % new_interesting_doc) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment