pymongo CRUD for files
# -*- encoding: utf-8 -*-
"""
Python Application
"""
__author__ = "[email protected]"

import os
import json
import pathlib
import pymongo
import certifi
import numpy as np
from pathlib import Path
from datetime import datetime

def create_nested_dir_for_mongodb(dir_path):
    if os.path.exists(dir_path):
        return
    print('creating directory:', dir_path)
    Path(dir_path).mkdir(parents=True, exist_ok=True)

def read_json_file(json_file):
    """
    Read JSON file
    Args:
        json_file: file name
    Returns:
        dict
    """
    try:
        # print('Reading json file:' + json_file)
        with open(json_file) as file_path:
            return json.load(file_path)
    except IOError:
        print("Couldn't find json file: " + json_file)
        raise

def pandas_datatype_converter(obj):
    """
    Convert numpy/pandas data types to native Python data types.
    It is mainly used to avoid "TypeError: Object of type 'int64' is not JSON serializable"
    when dumping pandas-derived values with json.dump.
    Args:
        obj: object passed automatically by json.dump for values it cannot serialize
    Returns:
        The converted native Python value
    """
    if isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, datetime):
        return str(obj)

def write_json_file(json_content, json_file):
    """
    Write json_content into the file.
    Args:
        json_content: dict
        json_file: file name
    Returns:
        None
    """
    with open(json_file, 'w+') as file_path:
        json.dump(json_content, file_path, indent=4, default=pandas_datatype_converter)

def convert_to_datetime(date_str, pattern='yyyy-mm-dd'):
    if pattern == 'yyyy-mm-dd':
        date_int = list(map(int, date_str.split('-')))
    elif pattern == 'yyyymmdd':
        date_int = [int(date_str[:4]), int(date_str[4:6]), int(date_str[6:])]
    else:
        print('unsupported date pattern:', pattern)
        return
    _date = datetime(date_int[0], date_int[1], date_int[2])
    return _date

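# Illustrative example (not part of the original gist): both supported patterns
# produce the same result.
#   convert_to_datetime('2024-07-19')                    -> datetime(2024, 7, 19, 0, 0)
#   convert_to_datetime('20240719', pattern='yyyymmdd')  -> datetime(2024, 7, 19, 0, 0)
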
class PyMongoDBDatabaseConnector:

    def __init__(self, run_info, step_name=None, run_dir_path=None):
        db_connection_string = "mongodb+srv://application_db:[email protected]"
        db_name = "TestingDB"
        db_collection = 'db_collection_name'
        self.connection_string = db_connection_string
        self.connection = None
        self.db_connection = None
        self.db_collection = None
        self.run_info = run_info
        self.step_name = step_name
        self.run_dir_path = run_dir_path
        self.connection = pymongo.MongoClient(self.connection_string, tlsCAFile=certifi.where())
        self.db_connection = self.connection[db_name]
        self.db_collection = self.db_connection[db_collection]
        print('MongoDB Connection established')

    def __enter__(self):
        # print('MongoDBDatabaseConnector: __enter__')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # print('MongoDBDatabaseConnector: __exit__')
        self.close_connection()

    def close_connection(self):
        if self.connection:
            self.connection.close()

    def insert(self):
        pass

    def update(self):
        pass

    def delete(self):
        pass

    def add_input_to_mongo_db(self):
        input_dir_path = os.path.join(self.run_dir_path, self.step_name, 'input' + str(self.run_info['final_finalRun']))
        for file in os.listdir(input_dir_path):
            file_path = os.path.join(input_dir_path, file)
            self.add_file_to_mongo_db(file_path)

    def add_file_to_mongo_db(self, file_path):
        mydict = {
            "user": self.run_info['user'],
            "furnace": self.run_info["furnace"],
            "mode": self.run_info['mode'],
            "date": convert_to_datetime(self.run_info['date']),
            "run_name": self.run_info['run_name'],
            "step": self.step_name,
            'step_sub_dir': os.path.relpath(os.path.dirname(file_path), os.path.join(self.run_dir_path, self.step_name)),
            'file_name': os.path.basename(file_path)
        }
        if pathlib.Path(file_path).suffix == '.json':
            lines = read_json_file(file_path)
        else:
            with open(file_path) as fr:
                lines = fr.read()
        if self.db_collection.count_documents(mydict) > 0:
            # a matching document already exists: update its file_content
            update_query = {"$set": {'file_content': lines}}
            try:
                x = self.db_collection.update_one(mydict, update_query)
            except Exception as e:
                print('Error in file:', file_path, 'Error message:', e, 'Action: storing the file content as plain text instead')
                with open(file_path) as fr:
                    lines = fr.read()
                update_query = {"$set": {'file_content': lines}}
                x = self.db_collection.update_one(mydict, update_query)
        else:
            # no matching document: insert a new one
            mydict['file_content'] = lines
            try:
                x = self.db_collection.insert_one(mydict)
            except Exception as e:
                print('Error in file:', file_path, 'Error message:', e, 'Action: storing the file content as plain text instead')
                with open(file_path) as fr:
                    lines = fr.read()
                mydict['file_content'] = lines
                x = self.db_collection.insert_one(mydict)
        print('added {} step data to mongoDb'.format(mydict['step_sub_dir']))

    def delete_file_from_mongo_db(self, file_path):
        # if not self.sync_to_MongoDB:
        #     print('sync_to_MongoDB is False. Hence not deleting from MongoDB database')
        #     return None
        mydict = {
            "user": self.run_info['user'],
            "furnace": self.run_info["furnace"],
            "mode": self.run_info['mode'],
            "date": convert_to_datetime(self.run_info['date']),
            "run_name": self.run_info['run_name'],
            "step": self.step_name,
            'step_sub_dir': os.path.relpath(os.path.dirname(file_path), os.path.join(self.run_dir_path, self.step_name)),
            'file_name': os.path.basename(file_path)
        }
        try:
            x = self.db_collection.delete_one(mydict)
            print('Deleted {}/{}/{} file from mongoDb'.format(mydict['step'], mydict['step_sub_dir'], mydict['file_name']))
        except Exception as e:
            print(e)

    def fetch_from_mongo_db(self, run_info):
        # if not self.sync_to_MongoDB:
        #     print('sync_to_MongoDB is False. Hence not adding to MongoDB database')
        #     return None
        query = {
            "user": run_info['user'],
            "mode": run_info['mode'],
            "date": convert_to_datetime(run_info['date']),
            "run_name": run_info['run_name']
        }
        case_data = {}
        my_doc = self.db_collection.find(query)
        for x in my_doc:
            # rebuild the on-disk directory layout for each fetched document
            sub_dir_path = os.path.join(self.run_dir_path, x['step'], x['step_sub_dir'])
            create_nested_dir_for_mongodb(sub_dir_path)
            if pathlib.Path(x['file_name']).suffix == '.json':
                # JSON files are written back to disk and also collected in case_data
                if not case_data.get(x['step']):
                    case_data[x['step']] = {}
                if not case_data.get(x['step']).get(x['step_sub_dir']):
                    case_data[x['step']][x['step_sub_dir']] = {}
                file_name = os.path.splitext(x['file_name'])[0]
                json_file = os.path.join(sub_dir_path, x['file_name'])
                write_json_file(x['file_content'], json_file)
                case_data[x['step']][x['step_sub_dir']][file_name] = x['file_content']
            else:
                # other files are written back to disk as plain text
                with open(os.path.join(sub_dir_path, x['file_name']), mode='w+') as fw:
                    fw.write(x['file_content'])
        return case_data
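

# Minimal usage sketch (illustrative only, not part of the original gist): the
# run_info values, step name, directory layout, and file path below are
# hypothetical placeholders, and the redacted connection string above must be
# replaced with a real one before this will connect.
if __name__ == '__main__':
    run_info = {
        'user': 'example_user',      # hypothetical
        'furnace': 'furnace_1',      # hypothetical
        'mode': 'test',              # hypothetical
        'date': '2024-07-19',        # parsed by convert_to_datetime
        'run_name': 'run_001',       # hypothetical
        'final_finalRun': 1,         # used by add_input_to_mongo_db
    }
    # The class implements __enter__/__exit__, so it can be used as a context manager.
    with PyMongoDBDatabaseConnector(run_info, step_name='step_1', run_dir_path='/tmp/runs/run_001') as connector:
        # push one file (hypothetical path), then pull everything for this run back to disk
        connector.add_file_to_mongo_db('/tmp/runs/run_001/step_1/input1/config.json')
        case_data = connector.fetch_from_mongo_db(run_info)
        print(list(case_data.keys()))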