Skip to content

Instantly share code, notes, and snippets.

@anandadake
Created July 19, 2024 06:59
Show Gist options
  • Select an option

  • Save anandadake/ca58ba8927c78487589c3d5676b86ac7 to your computer and use it in GitHub Desktop.

Select an option

Save anandadake/ca58ba8927c78487589c3d5676b86ac7 to your computer and use it in GitHub Desktop.
pymongo CRUD operations for files
# -*- encoding: utf-8 -*-
"""
Python Application
"""
__author__ = "[email protected]"
import os
import json
import pathlib
import pymongo
import certifi
import numpy as np
from pathlib import Path
from datetime import datetime
def create_nested_dir_for_mongodb(dir_path):
    """
    Create ``dir_path`` and any missing parents, unless the path already exists.

    A notice is printed only when a directory is actually created. If anything
    (directory or file) already exists at ``dir_path``, nothing happens.

    Args:
        dir_path: directory path to create.
    Returns:
        None
    """
    if not os.path.exists(dir_path):
        print('creating directory:', dir_path)
        Path(dir_path).mkdir(parents=True, exist_ok=True)
def read_json_file(json_file):
    """
    Read and parse a JSON file.

    Args:
        json_file: path to the JSON file (``str`` or ``os.PathLike``).
    Returns:
        The decoded JSON value (a ``dict`` when the file holds a JSON object).
    Raises:
        OSError: if the file cannot be opened; a diagnostic is printed first.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    try:
        # JSON exchanged between systems must be UTF-8 (RFC 8259), so do not
        # depend on the platform's locale encoding.
        with open(json_file, encoding='utf-8') as file_handle:
            return json.load(file_handle)
    except OSError:
        # Format with an f-string rather than ``+`` so a pathlib.Path argument
        # cannot raise TypeError here and hide the original error.
        print(f"Couldn't find json file:{json_file}")
        raise
def pandas_datatype_converter(obj):
    """
    Convert NumPy/pandas values to native Python types for JSON serialization.

    Intended as the ``default=`` hook of ``json.dump``/``json.dumps``, which call
    it for any object they cannot serialize natively (e.g. ``numpy.int64``).

    Args:
        obj: the object the ``json`` encoder could not serialize.
    Returns:
        An ``int``, ``float``, ``bool``, ``list`` or ``str`` equivalent of ``obj``.
    Raises:
        TypeError: for unsupported types, as the ``json`` module requires of a
            ``default`` hook, instead of silently encoding the value as ``null``.
    """
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    # numpy.bool_ is neither np.integer nor a Python bool, so json rejects it.
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    # Also covers datetime subclasses such as pandas.Timestamp.
    if isinstance(obj, datetime):
        return str(obj)
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')
def write_json_file(json_content, json_file):
    """
    Serialize ``json_content`` into ``json_file`` as JSON indented by 4 spaces.

    The destination file is created or truncated. Values the ``json`` module
    cannot encode natively are passed to ``pandas_datatype_converter``.

    Args:
        json_content: the value (typically a dict) to write.
        json_file: destination file path.
    Returns:
        None
    """
    dump_options = dict(indent=4, default=pandas_datatype_converter)
    with open(json_file, mode='w+') as out_stream:
        json.dump(json_content, out_stream, **dump_options)
def convert_to_datetime(date_str, pattern='yyyy-mm-dd'):
    """
    Parse a date string into a naive ``datetime`` at midnight.

    Args:
        date_str: the date text, e.g. '2024-07-19' or '20240719'.
        pattern: either 'yyyy-mm-dd' (split on '-'; parts beyond the third are
            ignored) or 'yyyymmdd' (fixed-width slices).
    Returns:
        The parsed ``datetime``, or ``None`` after printing
        'date pattern mismatch' when ``pattern`` is not recognised.
    Raises:
        ValueError: if a component is not an integer or the date is invalid.
        IndexError: if a 'yyyy-mm-dd' string has fewer than three parts.
    """
    if pattern == 'yyyy-mm-dd':
        fields = [int(piece) for piece in date_str.split('-')]
    elif pattern == 'yyyymmdd':
        fields = [int(date_str[0:4]), int(date_str[4:6]), int(date_str[6:])]
    else:
        print('date pattern mismatch')
        return None
    year, month, day = fields[0], fields[1], fields[2]
    return datetime(year, month, day)
class PyMongoDBDatabaseConnector:
    """
    Mirror a run's step files to and from a MongoDB collection.

    Every file is stored as one document identified by:

    - the run metadata from ``run_info`` ('user', 'furnace', 'mode', 'date',
      'run_name');
    - the step name;
    - the file's directory relative to ``<run_dir_path>/<step_name>``
      ('step_sub_dir');
    - its base name ('file_name').

    The payload lives in 'file_content'. For ``.json`` files it is the parsed
    JSON; otherwise, or when storing the parsed JSON raised an error, it is
    the raw text.

    Usable as a context manager; the client is closed when the block exits.
    """

    # Environment variable that overrides the built-in connection string, so
    # real credentials do not have to be committed to source control.
    _CONNECTION_STRING_ENV = 'MONGODB_CONNECTION_STRING'

    def __init__(self, run_info, step_name=None, run_dir_path=None,
                 connection_string=None, db_name='TestingDB',
                 collection_name='db_collection_name'):
        """
        Create the MongoDB client and select the target collection.

        Args:
            run_info: mapping with at least 'user', 'furnace', 'mode',
                'date' (as 'yyyy-mm-dd') and 'run_name'.
                ``add_input_to_mongo_db`` also reads 'final_finalRun'.
            step_name: name of the step directory under ``run_dir_path``.
            run_dir_path: root directory of the run on local disk.
            connection_string: MongoDB URI. When omitted, the
                ``MONGODB_CONNECTION_STRING`` environment variable is used,
                then the legacy built-in URI.
            db_name: database name.
            collection_name: collection name.
        """
        if connection_string is None:
            # TODO(review): the fallback URI embeds credentials; remove it once
            # every deployment sets the environment variable.
            connection_string = os.environ.get(
                self._CONNECTION_STRING_ENV,
                "mongodb+srv://application_db:[email protected]",
            )
        self.connection_string = connection_string
        self.run_info = run_info
        self.step_name = step_name
        self.run_dir_path = run_dir_path
        self.connection = pymongo.MongoClient(self.connection_string, tlsCAFile=certifi.where())
        self.db_connection = self.connection[db_name]
        self.db_collection = self.db_connection[collection_name]
        print('MongoDB Connection established')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release the client; returning None lets exceptions propagate.
        self.close_connection()

    def close_connection(self):
        """Close the MongoDB client if one was created."""
        if self.connection is not None:
            self.connection.close()

    def insert(self):
        """Placeholder; not implemented."""
        pass

    def update(self):
        """Placeholder; not implemented."""
        pass

    def delete(self):
        """Placeholder; not implemented."""
        pass

    def _file_query(self, file_path):
        """Build the filter identifying ``file_path``'s document in the collection."""
        step_dir = os.path.join(self.run_dir_path, self.step_name)
        return {
            "user": self.run_info['user'],
            "furnace": self.run_info["furnace"],
            "mode": self.run_info['mode'],
            "date": convert_to_datetime(self.run_info['date']),
            "run_name": self.run_info['run_name'],
            "step": self.step_name,
            'step_sub_dir': os.path.relpath(os.path.dirname(file_path), step_dir),
            'file_name': os.path.basename(file_path),
        }

    @staticmethod
    def _read_text(file_path):
        """Return the full text of ``file_path`` (platform default encoding)."""
        with open(file_path) as handle:
            return handle.read()

    def _upsert_content(self, query, content):
        """Set 'file_content' on the document matching ``query``, creating it if absent."""
        # One upsert replaces the former count_documents + insert/update pair:
        # one round trip instead of two, and no duplicate insert if two
        # writers race on the same file.
        self.db_collection.update_one(query, {"$set": {'file_content': content}}, upsert=True)

    def add_input_to_mongo_db(self):
        """
        Upload every regular file in ``<run_dir_path>/<step_name>/input<final_finalRun>``.

        Sub-directories are skipped; opening them as files would raise.
        """
        input_dir_path = os.path.join(
            self.run_dir_path, self.step_name, 'input' + str(self.run_info['final_finalRun'])
        )
        for entry in os.listdir(input_dir_path):
            file_path = os.path.join(input_dir_path, entry)
            if os.path.isfile(file_path):
                self.add_file_to_mongo_db(file_path)

    def add_file_to_mongo_db(self, file_path):
        """
        Insert or overwrite the document holding ``file_path``'s content.

        ``.json`` files are stored as parsed JSON and anything else as text.
        If the write raises, the error is printed and the file is stored again
        as raw text. An error from that second attempt propagates.
        """
        query = self._file_query(file_path)
        if pathlib.Path(file_path).suffix == '.json':
            content = read_json_file(file_path)
        else:
            content = self._read_text(file_path)
        try:
            self._upsert_content(query, content)
        except Exception as e:
            print('Error in file:', file_path, 'Error Message:', e, 'Error Action: Hence adding this file as text file!')
            self._upsert_content(query, self._read_text(file_path))
        print('added {} step data to mongoDb'.format(query['step_sub_dir']))

    def delete_file_from_mongo_db(self, file_path):
        """
        Delete the document holding ``file_path``.

        Best effort: database errors are printed, not raised.
        """
        query = self._file_query(file_path)
        location = '{}/{}/{}'.format(query['step'], query['step_sub_dir'], query['file_name'])
        try:
            result = self.db_collection.delete_one(query)
        except Exception as e:
            print(e)
            return
        # delete_one succeeds even when nothing matched; report that case honestly.
        if result.deleted_count:
            print('Deleted {} file from mongoDb'.format(location))
        else:
            print('No {} file found in mongoDb; nothing deleted'.format(location))

    def fetch_from_mongo_db(self, run_info):
        """
        Restore every stored file of a run under ``self.run_dir_path``.

        Documents are matched on ``run_info``'s 'user', 'mode', 'date' and
        'run_name' only (not 'furnace' or step). Each document is written to
        ``<run_dir_path>/<step>/<step_sub_dir>/<file_name>``.

        Returns:
            dict: ``{step: {step_sub_dir: {file_stem: content}}}`` for ``.json``
            files only. ``content`` is the decoded JSON value.
        """
        query = {
            "user": run_info['user'],
            "mode": run_info['mode'],
            "date": convert_to_datetime(run_info['date']),
            "run_name": run_info['run_name'],
        }
        case_data = {}
        for doc in self.db_collection.find(query):
            sub_dir_path = os.path.join(self.run_dir_path, doc['step'], doc['step_sub_dir'])
            create_nested_dir_for_mongodb(sub_dir_path)
            target_path = os.path.join(sub_dir_path, doc['file_name'])
            content = doc['file_content']

            if pathlib.Path(doc['file_name']).suffix != '.json':
                with open(target_path, mode='w') as fw:
                    fw.write(content)
                continue

            # add_file_to_mongo_db stores a .json file as raw text when storing
            # the parsed form failed. Such text parses as JSON; a JSON file
            # whose root value is a plain string normally does not.
            stored_as_text = False
            if isinstance(content, str):
                try:
                    parsed = json.loads(content)
                    stored_as_text = True
                except ValueError:
                    pass  # a genuine JSON string value; re-encode it below

            if stored_as_text:
                # Write the original text verbatim rather than json-encoding it
                # into a quoted string.
                with open(target_path, mode='w') as fw:
                    fw.write(content)
                content = parsed
            else:
                write_json_file(content, target_path)

            file_stem = os.path.splitext(doc['file_name'])[0]
            case_data.setdefault(doc['step'], {}).setdefault(doc['step_sub_dir'], {})[file_stem] = content
        return case_data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment