-
-
Save jhodges10/86e80656ec6bdc09f108e799d9b183e6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import json
import os
import re
import smtplib
import subprocess
import sys
import time
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from smtplib import SMTP
from time import mktime

import openpyxl
import psycopg2
import redis
import requests
import selenium
from openpyxl import load_workbook
from PyPDF2 import PdfFileWriter, PdfFileReader
from selenium import webdriver
class Useful_Functions:
    """Stateless placeholder class whose constructor does nothing."""

    # NOTE(review): the functions that follow in this file take no `self`
    # and call each other by bare name, so they are assumed to be
    # module-level rather than members of this class - confirm.

    def __init__(self):
        """Create an instance; there is no state to set up."""
def check_cache(cache_key):
    """
    Checks the redis cache to see if a key exists

    The connection is built from the REDIS_URL environment variable.

    param cache_key: A string representing a key
    type cache_key: string
    return: True when the key is present in the cache, otherwise False
    """
    db = redis.from_url(os.environ.get("REDIS_URL"))
    # EXISTS only asks about presence: a key holding an empty value still
    # counts as cached, and the stored payload is not transferred.
    if db.exists(cache_key):
        print("Found cache data, continuing and then comparing against this.")
        return True
    return False
def write_cache(data, cache_key):
    """
    Write data with a key of cache_key to the redis cache

    The data is stored as a JSON string. The connection is built from the
    REDIS_URL environment variable.

    param data: A python dictionary holding the data you want cached
    param cache_key: A string representing a key
    type data: dict
    type cache_key: string
    return: True when the value was stored, False when serialising or
            writing it failed
    """
    db = redis.from_url(os.environ.get("REDIS_URL"))
    try:
        db.set(cache_key, json.dumps(data))
    except Exception as e:
        # Caching is best-effort: report the problem and signal failure
        # to the caller instead of propagating the exception.
        print(e)
        print("Failed to write to cache")
        return False
    return True
def read_cache(cache_key):
    """
    Read data with a key of cache_key from the redis cache

    The stored value is expected to be a JSON string and is decoded before
    being returned. The connection is built from the REDIS_URL environment
    variable.

    param cache_key: A string representing a key
    type cache_key: string
    return: The decoded data, or None when the key is not in the cache
    """
    db = redis.from_url(os.environ.get("REDIS_URL"), decode_responses=True)
    raw = db.get(cache_key)
    if raw is None:
        # A missing key comes back as None, which json.loads cannot parse.
        return None
    return json.loads(raw)
def send_slack_message(**kwargs):
    """
    Sends a message to Slack via a Webhook

    All arguments are passed by keyword and are optional. Arguments that are
    not supplied are left out of the payload.

    param webhook_url: URL to post to; falls back to the SLACK_WEBHOOK_URL
                       environment variable when omitted or empty
    param msg_username: Name to post as
    param msg_text: Body of the message
    param msg_channel: Channel to post to
    param icon_url: URL of the icon shown next to the message
    type webhook_url: string
    type msg_username: string
    type msg_text: string
    type msg_channel: string
    type icon_url: string
    return: True when the webhook answers with a 2xx status, otherwise False
    """
    post_url = kwargs.get("webhook_url") or os.environ.get("SLACK_WEBHOOK_URL")
    fields = {
        "text": kwargs.get("msg_text"),
        "username": kwargs.get("msg_username"),
        "channel": kwargs.get("msg_channel"),
        "icon_url": kwargs.get("icon_url"),
    }
    # Leave out anything the caller did not provide rather than sending nulls.
    payload = {key: value for key, value in fields.items() if value is not None}
    response = requests.post(post_url, json=payload)
    # Accept any 2xx status; the earlier check only accepted 201.
    if 200 <= response.status_code < 300:
        print("Message sent to Slack")
        return True
    return False
def postgres_ref_db_connection():
    """
    Open a new postgres connection and hand it back to the caller

    The connection settings are the literal values listed below.

    return: The object returned by psycopg2.connect
    """
    connection_settings = {
        'host': 'HOST',
        'user': 'USERNAME',
        'password': 'PASSWORD',
        'database': 'DB_NAME',
        'port': 5432,
    }
    return psycopg2.connect(**connection_settings)
def insert_records(insert_statement, values):
    """
    Inserts a list of values into a database

    Nothing is done when values is empty or None. Otherwise a connection is
    opened, all rows are inserted with executemany and the transaction is
    committed. On any error the exception is printed and the transaction is
    rolled back; the error is not re-raised.

    param insert_statement: The SQL query used to insert a row of data
    type insert_statement: string representing a SQL query
    param values: the list of tuples to insert into the database
    type values: list containing a tuple
    return: None
    """
    if not values:
        return
    db = postgres_ref_db_connection()
    cur = None
    try:
        # The cursor is created inside the try so the connection is still
        # closed by the finally clause if cursor creation itself fails.
        cur = db.cursor()
        cur.executemany(insert_statement, values)
        db.commit()
        print("INSERTED INTO DB")
    except Exception as err:
        print("EXCEPTION")
        print(err)
        db.rollback()
    finally:
        if cur is not None:
            cur.close()
        db.close()
def create_firefox_driver(path_to_profile):
    """
    Build a Firefox webdriver that uses the profile stored at the given path

    param path_to_profile: The filepath of the profile
    type path_to_profile: String representing a filepath
    return: The webdriver.Firefox instance created with that profile
    """
    return webdriver.Firefox(webdriver.FirefoxProfile(path_to_profile))
def make_pdf_from_xlsx(filename, path_to_edit, path_to_completed, pdf_ready):
    """
    Converts XLSX to PDF

    Runs headless LibreOffice to convert the file, retrying up to five times
    when a conversion attempt exceeds 45 seconds, then passes the resulting
    PDF through edit_pdf and crop_pdf.

    param filename: The name of the xlsx file to edit
    type filename: String representing a file name
    param path_to_edit: The location of the folder containing a file to edit
    type path_to_edit: String representing a filepath
    param path_to_completed: The location of the folder to place a completed file
    type path_to_completed: String representing a filepath
    param pdf_ready: The folder LibreOffice writes the converted PDF to
    type pdf_ready: String representing a filepath
    return: path_to_completed + the PDF name on success, None on any failure
    """
    soffice_dir = '/Applications/LibreOffice.app/Contents/MacOS'
    # Swap only the extension; a plain replace('xlsx', 'pdf') would also
    # rewrite an 'xlsx' that appears elsewhere in the name.
    pdf_name = os.path.splitext(filename)[0] + '.pdf'
    # Argument list instead of a shell string, so paths containing spaces or
    # shell metacharacters are passed through untouched.
    convert_command = [
        os.path.join(soffice_dir, 'soffice'),
        '--headless',
        '--convert-to', 'pdf',
        '--outdir', pdf_ready,
        filename,
    ]
    try:
        for _ in range(5):
            try:
                # cwd matches the directory the command previously cd'ed
                # into, so relative paths resolve the same way.
                subprocess.check_output(convert_command, cwd=soffice_dir, timeout=45)
                break
            except subprocess.TimeoutExpired:
                # Clear out hung LibreOffice processes before retrying. Each
                # kill is best-effort and independent of the others.
                for process_name in ('soffice', 'libreoffice', 'libre'):
                    subprocess.call(['killall', process_name],
                                    stdout=subprocess.DEVNULL,
                                    stderr=subprocess.DEVNULL)
                time.sleep(5)
        else:
            # Every attempt timed out, so there is no PDF to edit.
            print('Fucked up converting to pdf')
            return None
    except Exception as e:
        print(e)
        print('Fucked up converting to pdf')
        return None
    try:
        to_edit = path_to_edit + pdf_name
        edited = path_to_completed + pdf_name
        # NOTE(review): edit_pdf and crop_pdf are not defined in the visible
        # part of this file - confirm where they come from.
        edit_pdf(to_edit)
        crop_pdf(to_edit, edited)
        return edited
    except Exception as e:
        print(e)
        print('Fucked up editing the PDF')
        return None
def _attach_file(msg, path):
    """Attach the file at path to msg, named after the file's base name."""
    with open(str(path), "rb") as handle:
        part = MIMEApplication(handle.read())
    part.add_header('Content-Disposition', 'attachment',
                    filename=os.path.basename(str(path)))
    msg.attach(part)


def send_gmail(attachment, attachment2, recipient, message_text, subject, email_from, reply_to, username, password):
    """
    Sends an email from gmail with up to 2 attachments

    An attachment argument that is None or empty is skipped.

    param attachment: The first attachment to the email
    type attachment: String representing a file path
    param attachment2: The second attachment to the email
    type attachment2: String representing a file path
    param recipient: The recipient's email address; several addresses may be
                     given separated by commas
    type recipient: String representing an email address
    param message_text: The body text of the email
    type message_text: String representing a message
    param subject: The subject of the email
    type subject: String representing a subject line
    param email_from: The sender email address (your email)
    type email_from: String representing an email address
    param reply_to: The email address that will receive a reply
    type reply_to: String representing an email address
    param username: Your gmail login
    type username: String representing an email address
    param password: Your gmail password
    type password: String representing a password
    """
    # sendmail needs a flat list of address strings.
    emaillist = [address.strip() for address in recipient.split(',') if address.strip()]

    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = email_from
    msg['To'] = ', '.join(emaillist)
    msg['Reply-to'] = reply_to
    msg.preamble = 'Multipart massage.\n'
    msg.attach(MIMEText(message_text))

    for path in (attachment, attachment2):
        if path:
            _attach_file(msg, path)

    # The context manager closes the SMTP session even if sending fails.
    with smtplib.SMTP("smtp.gmail.com", 587) as server:
        server.ehlo()
        server.starttls()
        server.login(username, password)
        server.sendmail(email_from, emaillist, msg.as_string())
def clean_text(text):
    """
    Clean up text and remove the nonsense

    Replaces @mentions, URLs of the form scheme://..., and every character
    that is not an ASCII letter, digit, space or tab with a space, then
    collapses all runs of whitespace into single spaces.

    param text: review text
    type text: string
    return: The cleaned text, with no leading or trailing whitespace
    """
    # Raw string: the pattern contains regex escapes (\w, \S, \/) that are
    # not valid escapes in an ordinary string literal.
    noise_pattern = r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)"
    return ' '.join(re.sub(noise_pattern, " ", text).split())
def get_sentiment(review):
    """
    Classify the sentiment of a review from its polarity score

    The review is passed through clean_text before being analysed.

    param review: An Amazon product review
    type review: string
    return: A (label, polarity) tuple where label is 'positive' for a
            polarity above zero, 'neutral' for exactly zero and 'negative'
            otherwise
    """
    # NOTE(review): TextBlob is not imported in the visible part of this
    # file - confirm where it is brought into scope.
    polarity = TextBlob(clean_text(review)).sentiment.polarity
    if polarity > 0:
        label = 'positive'
    elif polarity == 0:
        label = 'neutral'
    else:
        label = 'negative'
    return label, polarity
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment