Colby Now script I used
from __future__ import print_function

from bs4 import BeautifulSoup
from googleapiclient.discovery import build
from googleapiclient import errors
from httplib2 import Http
from oauth2client import file, client, tools
import base64
import unicodedata
import re
import csv

# Read-only Gmail scope -- the script only downloads messages, never modifies them.
SCOPES = 'https://www.googleapis.com/auth/gmail.readonly'

# Accumulates (date, category, title, url) rows across all scraped emails.
GLOBAL_DATA = []
class Scraper:
    # Scrapes one day's worth of Colby Now from a single email body.
    def __init__(self, text, date):
        self.soup = BeautifulSoup(text, "html.parser")
        self.date = date
        self.day_dict = {}
        self.get_categories()

    def scrape_category(self, tag):
        # Receives a bs4 tag for one category block and adds a dictionary
        # entry mapping the category name to a list of (title, url) tuples,
        # one per event.
        cat_name = tag.find("a", class_="nav-top-level-title").find("span").get_text()
        items_list = []
        for item in tag.find_all("li", class_="subnav-item"):
            title = item.find("span").get_text()
            url = item.find("a")["href"]
            items_list.append((title, url))
        self.day_dict[cat_name] = items_list

    def get_categories(self):
        lis = self.soup.find("ul", class_="nav-list").find_all(
            "li", attrs={"style": "box-sizing: border-box; margin-bottom: 1.5rem;"})
        for l_tag in lis:
            self.scrape_category(l_tag)

    def get_download(self):
        return self.day_dict, self.date

    def normalize_data(self):
        # Flatten the per-category dictionary into (date, category, title, url) rows.
        normalized_data = []
        for category, events in self.day_dict.items():
            for title, url in events:
                normalized_data.append((self.date, category, title, url))
        return normalized_data
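
# A minimal sketch of the markup Scraper expects, inferred from the selectors
# above (hypothetical example -- real Colby Now emails are larger):
#
#   <ul class="nav-list">
#     <li style="box-sizing: border-box; margin-bottom: 1.5rem;">
#       <a class="nav-top-level-title"><span>Events</span></a>
#       <ul><li class="subnav-item">
#         <a href="https://example.com/talk"><span>Guest Talk</span></a>
#       </li></ul>
#     </li>
#   </ul>
#
# Scraper(body, "Mon, 25 Jan 2021").normalize_data() would then return
# [("Mon, 25 Jan 2021", "Events", "Guest Talk", "https://example.com/talk")].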
class Downloader:
    # Uses the Gmail API client to download all the Colby Now emails in my account.
    def __init__(self):
        # Standard oauth2client flow: reuse a cached token, or run the
        # browser-based consent flow against credentials.json the first time.
        store = file.Storage('token.json')
        creds = store.get()
        if not creds or creds.invalid:
            flow = client.flow_from_clientsecrets('credentials.json', SCOPES)
            creds = tools.run_flow(flow, store)
        self.service = build('gmail', 'v1', http=creds.authorize(Http()))

        COLBY_NOW_QUERY = 'from:[email protected]'
        # List all relevant Colby Now messages, then download each one.
        self.msgs = self.listMessagesMatchingQuery('me', query=COLBY_NOW_QUERY)
        self.msg_list = []
        for i in xrange(len(self.msgs)):
            self.download_one(i)
        for msg_body, msg_date in self.msg_list:
            sc = Scraper(msg_body, msg_date)
            GLOBAL_DATA.extend(sc.normalize_data())
    def download_one(self, count):
        msg_id = self.msgs[count].get('id')
        try:
            dl_msg = self.service.users().messages().get(userId='me', id=msg_id).execute()
            print("downloaded email with id {} -- count {}".format(msg_id, count))
            # The body arrives URL-safe base64 encoded: map '-'/'_' back to
            # '+'/'/', normalize the unicode down to ascii, then decode it.
            raw_all = dl_msg.get("payload")
            raw = raw_all.get("body").get("data").replace('-', '+').replace('_', '/')
            raw = unicodedata.normalize("NFKD", raw).encode('ascii', 'ignore')
            decoded = base64.decodestring(raw)
            # Pull the date out of the "Date" header, e.g. "Mon, 25 Jan 2021 ...".
            headers = raw_all.get("headers")
            found = False
            for val in headers:
                if unicodedata.normalize("NFKD", val["name"]).encode('ascii', 'ignore') == "Date":
                    found = True
                    date = re.search(r'[a-zA-Z]+, \d{1,2} [a-zA-Z]{1,3} \d{4}',
                                     unicodedata.normalize("NFKD", val["value"]).encode('ascii', 'ignore')).group()
                    # Add the date and decoded message as a tuple into the list,
                    # stopping after the first Date header.
                    self.msg_list.append((decoded, date))
                    break
            if not found:
                print("cannot find a date for id", msg_id)
                return
        except errors.HttpError as error:
            print('An error occurred: {}'.format(error))
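
    # For example (hypothetical value): a body string of "PGgxPkNvbGJ5PC9oMT4="
    # decodes to "<h1>Colby</h1>". Real Gmail payloads use URL-safe base64,
    # carrying '-' and '_' where standard base64 uses '+' and '/', which is
    # why download_one maps them back before calling base64.decodestring.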
    # Function adapted from the Google API guide; it pages through
    # messages.list results until nextPageToken runs out.
    def listMessagesMatchingQuery(self, user_id, query=''):
        try:
            response = self.service.users().messages().list(userId=user_id,
                                                            q=query).execute()
            messages = []
            if 'messages' in response:
                messages.extend(response['messages'])
            while 'nextPageToken' in response:
                page_token = response['nextPageToken']
                response = self.service.users().messages().list(userId=user_id, q=query,
                                                                pageToken=page_token).execute()
                messages.extend(response['messages'])
            return messages
        except errors.HttpError as error:
            print('An error occurred: {}'.format(error))
            return []
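
# A sketch of the pages messages.list returns, abbreviated from the Gmail API
# reference (ids are placeholders):
#   {"messages": [{"id": "...", "threadId": "..."}, ...],
#    "nextPageToken": "..."}
# listMessagesMatchingQuery keeps requesting pages until no token remains.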
def main():
    colby_dl = Downloader()
    # Write every scraped row out as UTF-8 encoded CSV (the Python 2 csv
    # module wants byte strings, hence the "wb" mode and explicit encode).
    with open("colby_now.csv", "wb") as f:
        csv_out = csv.writer(f)
        csv_out.writerow(["date", "category", "title", "URL"])
        for row in GLOBAL_DATA:
            csv_out.writerow([unicode(s).encode("utf-8") for s in row])

if __name__ == "__main__":
    main()
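
Running the script produces colby_now.csv with one row per announcement, along the lines of (hypothetical): Mon, 25 Jan 2021,Events,Guest Talk,https://example.com/talk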