Created
November 8, 2017 03:44
-
-
Save Billcountry/74d7131087ca179bb754da416721f0f7 to your computer and use it in GitHub Desktop.
A collection of functions that I use to speed up my productivity.
Feel free to use them as you wish.
Connect's to a database, provides method to insert table, run sql queries, hash using sha256, generate dates with addation or removal of time, log your errors among others
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import smtplib | |
import os | |
import random | |
import hashlib | |
from datetime import datetime | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
from validate_email import validate_email | |
import sqlalchemy | |
from sqlalchemy import engine | |
from sqlalchemy.sql import schema, text | |
import pendulum | |
class Utilities: | |
error = None | |
status = { | |
"unauthorized": 401, | |
"forbidden": 403, | |
"method_not_allowed": 405, | |
"invalid_data": 433, | |
"not_found": 404, | |
"bad_request": 400, | |
"success": 200, | |
"system_error": 500, | |
"not_implemented": 501, | |
"redirect": 302 | |
} | |
def api_return(self, success, message, status): | |
return { | |
"success": success, | |
"message": message, | |
"status": status | |
} | |
def sha256(self, text: str): | |
hash_object = hashlib.sha256(text.encode()) | |
return hash_object.hexdigest() | |
def db_connect(self): | |
try: | |
url = os.environ.get("DATABASE_URL", "ALTERNATIVE_URL_GOES_HERE_PROBABLY_YOUR_LOCAL_TEST_DB") | |
db_conn: engine = sqlalchemy.create_engine(url) | |
db_meta: schema = sqlalchemy.MetaData(bind=db_conn, reflect=True) | |
except Exception as e: | |
db_conn = None | |
db_meta = None | |
self.log_error("Error connecting to database", str(e)) | |
return db_conn, db_meta | |
def db_insert(self, table: str, data: list): | |
con, meta = self.db_connect() | |
try: | |
if con is not None: | |
con.execute(meta.tables[table].insert(), data) | |
return True | |
else: | |
return False | |
except Exception as e: | |
self.log_error("Error inserting data: ", str(e)) | |
return False | |
def db_execute_query(self, query, params={}): | |
if params is None: | |
params = {} | |
con, meta = self.db_connect() | |
try: | |
if con is not None: | |
stmt = text(query) | |
stmt = stmt.bindparams(**params) | |
return con.execute(stmt) | |
except Exception as e: | |
self.log_error("Error executing query: ", str(e)) | |
return None | |
def send_email(self, recipient: str, recipient_name: str, subject: str, template: str, template_values: dict, | |
text_message="Please use a client that can read HTML email"): | |
success = False | |
status = 500 | |
try: | |
if validate_email(recipient, verify=True): | |
smtp = smtplib.SMTP("smtp.gmail.com", 587) # initialize smtp class | |
smtp.ehlo() | |
smtp.starttls() | |
smtp.ehlo() | |
message = template | |
for key in list(template_values): | |
message = message.replace(str(key), (template_values[key])) | |
smtp.login("[email protected]", "RVxgKpTUktOVYG2L") | |
msg = MIMEMultipart('alternative') | |
msg['Subject'] = subject | |
msg['From'] = 'From: DeMashina <[email protected]>' | |
msg['To'] = recipient_name + ' <' + recipient + '>' | |
plain = MIMEText(text_message, "plain") | |
html = MIMEText(message, "html") | |
msg.attach(plain) | |
msg.attach(html) | |
smtp.sendmail("[email protected]", recipient, msg.as_string()) | |
smtp.close() | |
success = True | |
message = "Your message was sent successfully." | |
else: | |
message = "Invalid or email does not exist" | |
status = 200 | |
except smtplib.SMTPException as e: | |
message = "Error: Unable to send email: \n" + str(e) | |
except (ConnectionRefusedError, ConnectionResetError, ConnectionResetError, ConnectionError) as e: | |
message = "Connection error: Email not sent: \n"+str(e) | |
except Exception as e: | |
message = "Server Error: An error occurred: \n"+str(e) | |
return { | |
"success": success, | |
"message": message, | |
"status": status | |
} | |
def random_string(self, size, | |
characters: str = "abcdefghijklmnopqrstuvwxyz01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%&*()?"): | |
return "".join(random.sample(characters, size)) | |
def validate_phone(self, phone): | |
phone = phone.replace("+", "") | |
phone = phone.replace(" ", "") | |
phone = phone.replace("-", "") | |
if phone.isnumeric() and ( | |
(len(phone) == 9 and phone[0] == "7") or | |
(len(phone) == 10 and phone[0:2] == "07") or | |
(len(phone) == 12 and phone[0:4] == "2547") | |
) and ( | |
(0 <= int(phone[-8] + phone[-7]) < 40) or | |
(60 <= int(phone[-8] + phone[-7]) < 100) | |
): | |
return True, | |
else: | |
return False | |
def human_date(self, timestamp=datetime.now().timestamp(), date_only=False, time_ago=False, hours=0, minutes=0, | |
sec=0, months=0, years=0, days=0): | |
date = pendulum.from_timestamp(timestamp) | |
date = date.add(years=years, months=months, weeks=0, days=days, hours=hours, minutes=minutes, seconds=sec) | |
if date_only: | |
return date.format('DD-MMM-YYYY', formatter='alternative') | |
else: | |
if time_ago: | |
return date.diff_for_humans() | |
return date.format('DD-MMM-YYYY HH:mm:ss', formatter='alternative') | |
def log_error(self, title, data): | |
try: | |
path = os.path.dirname(os.path.realpath(__file__)) | |
file = open(path + "/../logs/errors.md", "a+") | |
file.write(">### "+title+"\n") | |
file.write("*"+str(datetime.now())+"*\n") | |
file.write("```\n"+data+"\n```\n\n") | |
file.close() | |
return True | |
except IOError as e: | |
print(str(e)) | |
return False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment