Last active
January 20, 2025 08:28
-
-
Save deepanshumehtaa/d745e2a6929d69f7b8064bac7da5a402 to your computer and use it in GitHub Desktop.
Efleet_gsheet_script.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Author: Deepanshu Mehta

Script to fetch data from the EV Google Sheets and populate it into the database.
It requires `car-master-sheet.json`.

python -m venv env_ev_allocation
source env_ev_allocation/bin/activate
pip3 install -r requirements.txt
python3 EV_allocation_data_dump.py

To check for psql:
pip3 install psycopg2
pip3 install psycopg2-binary

import psycopg2
connection = psycopg2.connect(database="quiz", user="dee", password="deed", host="localhost", port="5432")
cursor = connection.cursor()
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
connection.close()
""" | |
import logging | |
import os | |
import sys | |
import datetime | |
import pygsheets | |
# sys.path.append(os.path.dirname(os.path.realpath('/home/scriptuser/scripts'))) | |
# from scripts import admin_credentials | |
# import pandas as pd | |
from typing import List, Dict | |
import mysql.connector | |
class SQLHelper(object):
    """Small helper around a ``mysql.connector`` connection.

    Every public operation opens its own connection via ``get_db_conn`` and
    closes it again (also when the statement raises), so an instance never
    keeps a connection open between calls.
    """

    def __init__(self):
        # Both are populated by get_db_conn() and reset by close_conn().
        self.conn = None
        self.cursor = None

    def get_db_conn(self):
        """Open a new MySQL connection and cursor and store them on ``self``."""
        # NOTE(review): credentials are hard-coded here; the commented-out
        # lines suggest they are meant to come from `admin_credentials` --
        # confirm before deploying.
        self.conn = mysql.connector.connect(
            host='localhost',
            port='3306',
            database='test_db',
            user='deepanshu',
            password='deepanshu'
            # host = admin_credentials.localhost,
            # port='3306',
            # user = admin_credentials.pythonuser,
            # password = admin_credentials.pythonpwd123,
            # database = admin_credentials.database,
        )
        self.cursor = self.conn.cursor()

    def execute_and_commit(self, sql_query: str):
        """Run a single statement and commit it.

        :param sql_query: complete SQL statement to execute.
        :raises: whatever the driver raises; the connection is closed first.
        """
        self.get_db_conn()
        try:
            self.cursor.execute(sql_query)
            self.conn.commit()
        finally:
            # Always release the connection, even when the statement failed.
            self.close_conn()

    def get_execute_data(self, sql_query) -> List[tuple]:
        """Run a query and return every row from ``cursor.fetchall()``.

        :param sql_query: complete SQL query to execute.
        :return: list of rows exactly as produced by the cursor.
        """
        self.get_db_conn()
        try:
            self.cursor.execute(sql_query)
            return self.cursor.fetchall()
        finally:
            self.close_conn()

    def close_conn(self):
        """Close the cursor and the connection if they are open."""
        try:
            if self.cursor is not None:
                self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self.cursor = None
            if self.conn is not None:
                try:
                    self.conn.close()
                finally:
                    self.conn = None

    def bulk_insert(self, sql, sql_parameters, batch_size=1000):
        """Insert many rows with ``executemany``, committing after each batch.

        :param sql: "INSERT INTO user (name, email) VALUES (%s, %s)"
        :param sql_parameters: values --> [['name1', 'name1@example.com'], ]
        :param batch_size: maximum number of rows sent per ``executemany`` call.
        :raises ValueError: if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        if sql_parameters:
            # One connection for the whole insert instead of one (never
            # closed) connection per batch.
            self.get_db_conn()
            try:
                for start in range(0, len(sql_parameters), batch_size):
                    batch = sql_parameters[start:start + batch_size]
                    self.cursor.executemany(sql, batch)
                    self.conn.commit()
            finally:
                self.close_conn()

        print(f"New {len(sql_parameters)} Entries dumped !!")
class Run(object):
    """Copy one day's active allocation rows from Google Sheets into MySQL.

    For every configured spreadsheet the `Allocation_Responses` worksheet is
    read, rows marked 'Active' whose timestamp falls on ``self.today`` are
    kept, and the result is bulk-inserted into ``self.db_table``.
    """

    def __init__(self):
        self.db_table = "ev_allocation_data"
        # self.secret_file_path = r"/home/scriptuser/scripts/client_secret.json"
        self.secret_file_path = r"my-sheet-key.json"
        self.gc = pygsheets.authorize(service_file=self.secret_file_path)  # Replace with your credentials file path

        # One entry per source spreadsheet. `column_names` must list five
        # columns in this order: active flag, timestamp, driver, car number,
        # ETM id -- read_and_dump() relies on that order.
        self.spreadsheet_data_list: List[Dict] = [
            {
                "spreadsheet_key": "abc",
                "city": "Ghatkopar",
                "city_code": "GHAT",
                "gid_num": 258635378,
                "column_names": ['Active or not', 'Timestamp', 'Driver + ETM', 'Car Number', "ETM"]
            },
            {
                "spreadsheet_key": "xyz",
                "city": "kolkata",
                "city_code": "KOLK",
                "gid_num": 856648286,
                "column_names": ['Active or not', 'Timestamp', 'Driver + ETK', 'Car Number', "ETK"]
            },
        ]
        self.sheet_name = "Allocation_Responses"

        # Midnight of the current day; execute() may replace it with a
        # user-supplied date.
        self.today = datetime.datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
        self.now = datetime.datetime.now()
        self.net_count = 0  # total number of entries dump into table

        # Set per spreadsheet by execute() before read_and_dump() is called.
        self.column_names = []
        self.city = None

    def read_and_dump(self, spreadsheet_key):
        """
        Read the worksheet of one spreadsheet, keep today's active rows and
        hand them to dump_data_to_db().

        :param spreadsheet_key: key of the Google spreadsheet to open.
        :raises ValueError: if a column from ``self.column_names`` is missing
            from the worksheet's header row.
        """
        spreadsheet = self.gc.open_by_key(spreadsheet_key)
        worksheet = spreadsheet.worksheet_by_title(self.sheet_name)
        print(f"Reading WorkSheet :: {spreadsheet.title}")

        header_range = set(worksheet.get_row(1))
        missing_col = [col for col in self.column_names if col not in header_range]
        if missing_col:
            raise ValueError(
                f"`{missing_col}` is not in WorkSheet :: {worksheet.title} :: Options are {header_range}"
            )

        # Fetch the sheet as a DataFrame restricted to the configured columns.
        sheet_df = worksheet.get_as_df(empty_value=None).loc[:, self.column_names]

        # Keep only rows flagged as active ...
        sheet_df = sheet_df[sheet_df['Active or not'] == 'Active']

        # ... whose timestamp falls on the target day. The parsed dates are
        # kept in a separate Series so the frame (and the row lists derived
        # from it) still contains only the configured columns.
        row_dates = sheet_df['Timestamp'].apply(lambda dt: self.date_to_datetime(dt, True))
        sheet_df = sheet_df[row_dates == self.today.date()]
        sheet_rows = sheet_df.values.tolist()

        # Resolve the fleet_car foreign keys for all car numbers in one query.
        car_number_id_map = self.get_fk_car_id([row[3] for row in sheet_rows])

        filtered_data = []
        for row in sheet_rows:
            _active, timestamp, driver, car_number, etm_id = row[:5]
            if not car_number:
                # Rows without a car number cannot be stored.
                continue
            filtered_data.append([
                # date
                self.date_to_datetime(timestamp, True),
                # car_number, full_name (text before the first "-"), etmid
                car_number,
                None if driver is None else driver.split("-")[0],
                etm_id,
                # car_id (None when the car is unknown in fleet_car)
                car_number_id_map.get(car_number, None),
                # is_active, is_deleted, updated_at, created_at
                1, 0, self.now, self.now
            ])

        self.dump_data_to_db(filtered_data)

    def cleanup_old_today_data(self):
        """Delete every row of ``self.db_table`` whose date equals ``self.today``."""
        sql_helper_obj = SQLHelper()
        # Both interpolated values are produced internally (fixed table name
        # and a strftime-formatted date), not taken from sheet content.
        query = f"DELETE FROM {self.db_table} WHERE date = '{self.today.strftime('%Y-%m-%d')}'"
        sql_helper_obj.execute_and_commit(query)

    def dump_data_to_db(self, data):
        """Bulk-insert prepared rows into ``self.db_table``.

        :param data: list of 9-item rows in the column order of the INSERT.
        """
        sql_helper_obj = SQLHelper()
        # Use the same table that cleanup_old_today_data() clears instead of
        # a second hard-coded copy of its name.
        query = (
            f"INSERT INTO {self.db_table} "
            "(date, car_number, full_name, etmid, car_id, is_active, is_deleted, updated_at, created_at) "
            "VALUES "
            "(%s, %s, %s, %s, %s, %s, %s, %s, %s)"
        )
        sql_helper_obj.bulk_insert(query, data)

    def execute(self):
        """
        This is the Driver Function: ask for the target date, clear that
        day's rows and reload them from every configured spreadsheet.
        """
        in_date_ = input(
            f"Enter Date as `{self.today.strftime('%Y-%m-%d')}` or press enter for today's date: "
        ).strip()
        if in_date_:
            try:
                self.today = datetime.datetime.strptime(in_date_, '%Y-%m-%d')
            except ValueError as e:
                print(f"Error in converting input Date {e}")
                return

        print("Starting EV Allocation Script !!")

        # NOTE(review): the day's rows are deleted before any sheet is read,
        # so a failure while reading leaves the table without that day's data.
        self.cleanup_old_today_data()

        for data_dict in self.spreadsheet_data_list:
            spreadsheet_key = data_dict["spreadsheet_key"]
            gid_num = data_dict["gid_num"]  # can be used later, don't remove
            self.column_names = data_dict["column_names"]
            self.city = data_dict["city"]
            self.read_and_dump(spreadsheet_key)

        print("All Done :: Process Exit")

    def date_to_datetime(self, dt, date_only=False):
        """Parse a 'dd/mm/YYYY HH:MM:SS' string.

        :param dt: timestamp string; any falsy value yields ``None``.
        :param date_only: when ``True`` return a ``date`` instead of a ``datetime``.
        :return: ``datetime``, ``date`` or ``None``.
        :raises ValueError: if ``dt`` does not match the expected format.
        """
        if not dt:
            return None
        parsed = datetime.datetime.strptime(dt, '%d/%m/%Y %H:%M:%S')
        if date_only is True:
            return parsed.date()
        return parsed

    def get_fk_car_id(self, car_plate_numbers: List) -> dict:
        """Look up ``fleet_car.id`` for the given car numbers.

        :param car_plate_numbers: car numbers; empty values and duplicates
            are ignored.
        :return: ``{car_number: id}`` for every car number found. An empty
            dict is returned when nothing is requested or the query fails
            (the failure is logged).
        """
        # Drop empty cells and duplicates while preserving order.
        plates = list(dict.fromkeys(plate for plate in car_plate_numbers if plate))
        if not plates:
            return {}

        # Values are passed as parameters instead of being formatted into the
        # SQL text, so quotes in a sheet cell cannot alter the query.
        placeholders = ", ".join(["%s"] * len(plates))
        query = f"SELECT id, car_number FROM fleet_car WHERE car_number IN ({placeholders});"

        query_result = []
        try:
            sql_helper_obj = SQLHelper()
            sql_helper_obj.get_db_conn()
            try:
                sql_helper_obj.cursor.execute(query, tuple(plates))
                query_result = sql_helper_obj.cursor.fetchall()
            finally:
                sql_helper_obj.close_conn()
        except Exception as e:
            # Best effort: rows are still stored, just without a car_id.
            logging.error("ERROR executing sql %s Error: %s", query, e)

        return {car_number: car_id for car_id, car_number in query_result}
# Run the dump only when the file is executed as a script, so importing the
# module does not prompt for input or touch the database.
if __name__ == "__main__":
    obj = Run()
    obj.execute()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment