Skip to content

Instantly share code, notes, and snippets.

@deepanshumehtaa
Last active January 20, 2025 08:28
Show Gist options
  • Save deepanshumehtaa/d745e2a6929d69f7b8064bac7da5a402 to your computer and use it in GitHub Desktop.
Save deepanshumehtaa/d745e2a6929d69f7b8064bac7da5a402 to your computer and use it in GitHub Desktop.
Efleet_gsheet_script.py
"""
Author: Deepanshu Mehta
Script to Fetch data from EV Gsheets and Populate them into db
it requires `car-master-sheet.json`
python -m venv env_ev_allocation
source env_ev_allocation/bin/activate
pip3 install -r requirements.txt
python3 EV_allocation_data_dump.py
to check for psql:
pip3 install psycopg2
pip3 install psycopg2-binary
import psycopg2
connection = psycopg2.connect(database="quiz", user="dee", password="deed", host="localhost", port="5432")
cursor = connection.cursor()
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
connection.close()
"""
import logging
import os
import sys
import datetime
import pygsheets
# sys.path.append(os.path.dirname(os.path.realpath('/home/scriptuser/scripts')))
# from scripts import admin_credentials
# import pandas as pd
from typing import List, Dict
import mysql.connector
class SQLHelper(object):
    """Small wrapper around ``mysql.connector`` that opens a connection per call.

    Every public method opens its own connection, does its work and closes the
    connection again, even when the query raises.

    NOTE(review): the connection settings below are hard-coded; the
    commented-out lines suggest they were meant to come from an
    ``admin_credentials`` module -- confirm before deploying.
    """

    def __init__(self):
        # Both stay ``None`` until ``get_db_conn`` is called.
        self.conn = None
        self.cursor = None

    def get_db_conn(self):
        """Open a MySQL connection and a cursor, stored on ``self``."""
        self.conn = mysql.connector.connect(
            host='localhost',
            port='3306',
            database='test_db',
            user='deepanshu',
            password='deepanshu'
            # host = admin_credentials.localhost,
            # port='3306',
            # user = admin_credentials.pythonuser,
            # password = admin_credentials.pythonpwd123,
            # database = admin_credentials.database,
        )
        self.cursor = self.conn.cursor()

    def execute_and_commit(self, sql_query: str):
        """Run a single statement and commit it.

        :param sql_query: complete SQL statement to execute.
        :raises: whatever the connector raises; the connection is closed first.
        """
        self.get_db_conn()
        try:
            self.cursor.execute(sql_query)
            self.conn.commit()
        finally:
            # Close even when execute/commit fails so connections never leak.
            self.close_conn()

    def get_execute_data(self, sql_query) -> List[tuple]:
        """Run a query and return every row from ``cursor.fetchall()``.

        :param sql_query: complete SQL query to execute.
        :return: list of result rows.
        """
        self.get_db_conn()
        try:
            self.cursor.execute(sql_query)
            return self.cursor.fetchall()
        finally:
            self.close_conn()

    def close_conn(self):
        """Close the cursor and the connection if they are open.

        Safe to call on a helper that never connected, and safe to call twice.
        """
        cursor, conn = self.cursor, self.conn
        # Drop the references first so a failing close() cannot leave stale
        # handles behind for a later call.
        self.cursor = None
        self.conn = None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()

    def bulk_insert(self, sql, sql_parameters, batch_size=1000):
        """Insert many rows with ``executemany``, committing after each batch.

        :param sql: "INSERT INTO user (name, email) VALUES (%s, %s)"
        :param sql_parameters: values --> [['name1', 'name1@example.com'], ]
        :param batch_size: maximum number of rows sent per ``executemany`` call.
        """
        if sql_parameters:
            # One connection for the whole insert instead of one per batch.
            self.get_db_conn()
            try:
                for start in range(0, len(sql_parameters), batch_size):
                    batch = sql_parameters[start:start + batch_size]
                    self.cursor.executemany(sql, batch)
                    self.conn.commit()
            finally:
                self.close_conn()
        print(f"New {len(sql_parameters)} Entries dumped !!")
class Run(object):
    """Copy the selected day's active EV allocation rows from Google Sheets to MySQL.

    For each configured spreadsheet the ``Allocation_Responses`` worksheet is
    read, rows marked ``Active`` whose timestamp falls on ``self.today`` are
    kept, the matching ``fleet_car`` ids are looked up and the rows are
    inserted into ``self.db_table``.
    """

    # Format of the sheet's ``Timestamp`` column, e.g. "20/01/2025 08:28:00".
    TIMESTAMP_FORMAT = '%d/%m/%Y %H:%M:%S'

    def __init__(self):
        self.db_table = "ev_allocation_data"
        # self.secret_file_path = r"/home/scriptuser/scripts/client_secret.json"
        self.secret_file_path = r"my-sheet-key.json"
        self.gc = pygsheets.authorize(service_file=self.secret_file_path)  # Replace with your credentials file path
        # One entry per spreadsheet. ``column_names`` order matters: rows are
        # later read by position (1=timestamp, 2=driver, 3=car number, 4=ETM id).
        self.spreadsheet_data_list: List[Dict] = [
            {
                "spreadsheet_key": "abc",
                "city": "Ghatkopar",
                "city_code": "GHAT",
                "gid_num": 258635378,
                "column_names": ['Active or not', 'Timestamp', 'Driver + ETM', 'Car Number', "ETM"]
            },
            {
                "spreadsheet_key": "xyz",
                "city": "kolkata",
                "city_code": "KOLK",
                "gid_num": 856648286,
                "column_names": ['Active or not', 'Timestamp', 'Driver + ETK', 'Car Number', "ETK"]
            },
        ]
        self.sheet_name = "Allocation_Responses"
        self.today = datetime.datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
        self.now = datetime.datetime.now()
        self.net_count = 0  # total number of entries dump into table
        self.column_names = []
        self.city = None

    def read_and_dump(self, spreadsheet_key):
        """
        Read one spreadsheet, filter its rows and hand them to ``dump_data_to_db``.

        :param spreadsheet_key: key of the Google spreadsheet to open.
        :raises Exception: when any of ``self.column_names`` is missing from
            the worksheet's first row.
        """
        spreadsheet = self.gc.open_by_key(spreadsheet_key)
        worksheet = spreadsheet.worksheet_by_title(self.sheet_name)
        print(f"Reading WorkSheet :: {spreadsheet.title}")

        header_range = set(worksheet.get_row(1))
        missing_col = [col for col in self.column_names if col not in header_range]
        if missing_col:
            raise Exception(
                f"`{missing_col}` is not in WorkSheet :: {worksheet.title} :: Options are {header_range}"
            )

        # Only the configured columns, in the configured order.
        sheet_df = worksheet.get_as_df(empty_value=None).loc[:, self.column_names]
        active_df = sheet_df[sheet_df['Active or not'] == 'Active']
        # Build the date mask as a separate Series rather than writing a new
        # column into a filtered slice of the frame.
        row_dates = active_df['Timestamp'].apply(lambda ts: self.date_to_datetime(ts, True))
        sheet_rows = active_df[row_dates == self.today.date()].values.tolist()

        # fetching FK
        car_number_id_map = self.get_fk_car_id([row[3] for row in sheet_rows])

        # dumping data to db
        self.dump_data_to_db(self._build_rows(sheet_rows, car_number_id_map))

    def _build_rows(self, sheet_rows, car_number_id_map):
        """Turn positional sheet rows into parameter lists for the INSERT."""
        db_rows = []
        for row in sheet_rows:
            timestamp, driver, car_number, etm_id = row[1], row[2], row[3], row[4]
            if not car_number:
                # Rows without a car number are skipped.
                continue
            db_rows.append([
                # date
                self.date_to_datetime(timestamp, True),
                # car_number, full_name (text before the first "-"), etmid
                car_number,
                None if driver is None else driver.split("-")[0],
                etm_id,
                # car_id (None when the plate is not in fleet_car)
                car_number_id_map.get(car_number),
                # is_active, is_deleted, updated_at, created_at
                1, 0, self.now, self.now,
            ])
        return db_rows

    def cleanup_old_today_data(self):
        """Delete the rows already stored for ``self.today`` so a re-run does not duplicate them."""
        sql_helper_obj = SQLHelper()
        # Both interpolated values are produced by this class (table name and
        # a strftime-formatted date), not taken from the sheet.
        query = f"DELETE FROM {self.db_table} WHERE date = '{self.today.strftime('%Y-%m-%d')}'"
        sql_helper_obj.execute_and_commit(query)

    def dump_data_to_db(self, data):
        """Bulk-insert prepared rows into ``self.db_table``.

        :param data: list of 9-item parameter lists matching the column list below.
        """
        sql_helper_obj = SQLHelper()
        # Same table as cleanup_old_today_data, so the two cannot drift apart.
        query = f"""INSERT INTO {self.db_table}
        (date, car_number, full_name, etmid, car_id, is_active, is_deleted, updated_at, created_at)
        VALUES
        (%s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        sql_helper_obj.bulk_insert(query, data)
        self.net_count += len(data)

    def execute(self):
        """
        This is the Driver Function

        Prompts for an optional ``YYYY-MM-DD`` date, clears the rows stored
        for that date and then loads every configured spreadsheet.
        """
        in_date_ = input(f"Enter Date as `{self.today.strftime('%Y-%m-%d')}` or press enter for today's date: ")
        if in_date_:
            try:
                self.today = datetime.datetime.strptime(in_date_, '%Y-%m-%d')
            except ValueError as e:
                print(f"Error in converting input Date {e}")
                return

        print("Starting EV Allocation Script !!")
        self.cleanup_old_today_data()
        for data_dict in self.spreadsheet_data_list:
            spreadsheet_key = data_dict["spreadsheet_key"]
            gid_num = data_dict["gid_num"]  # can be used later, don't remove
            self.column_names = data_dict["column_names"]
            self.city = data_dict["city"]
            self.read_and_dump(spreadsheet_key)
        print("All Done :: Process Exit")

    def date_to_datetime(self, dt, date_only=False):
        """Parse a ``dd/mm/YYYY HH:MM:SS`` string.

        :param dt: timestamp text; any falsy value yields ``None``.
        :param date_only: when ``True`` return a ``date`` instead of a ``datetime``.
        :raises ValueError: when ``dt`` does not match the expected format.
        """
        if not dt:
            return None
        parsed = datetime.datetime.strptime(dt, self.TIMESTAMP_FORMAT)
        return parsed.date() if date_only is True else parsed

    def get_fk_car_id(self, car_plate_numbers: List) -> dict:
        """
        Map car numbers to their ``fleet_car`` ids: {'car_number': car_id, ...}

        Empty/None plates are ignored and duplicates are queried once. The
        lookup is best-effort: a database error is logged and an empty map is
        returned.
        """
        # dict.fromkeys de-duplicates while keeping the first-seen order.
        plates = list(dict.fromkeys(plate for plate in car_plate_numbers if plate))
        if not plates:
            return {}

        # Plate numbers come from the sheet, so bind them as parameters
        # instead of formatting them into the SQL text.
        placeholders = ", ".join(["%s"] * len(plates))
        query = f"SELECT id, car_number FROM fleet_car WHERE car_number IN ({placeholders});"

        sql_helper_obj = SQLHelper()
        query_result = []
        try:
            sql_helper_obj.get_db_conn()
            try:
                sql_helper_obj.cursor.execute(query, tuple(plates))
                query_result = sql_helper_obj.cursor.fetchall()
            finally:
                sql_helper_obj.close_conn()
        except Exception as e:
            logging.error(f"ERROR executing sql {query} Error: {e}")

        return {car_number: car_id for car_id, car_number in query_result}
if __name__ == "__main__":
    # Run only when executed as a script, so that importing this module does
    # not prompt for input or touch the database.
    obj = Run()
    obj.execute()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment