FurAffinity -> Pushbullet Notifications
The config.json template:
{
    "username": "ninji-vahran",
    "database": "notifier.db",
    "cookies": {
        "__cfduid": "REDACTED",
        "a": "REDACTED",
        "b": "REDACTED",
        "folder": "inbox"
    },
    "pushbullet_key": "REDACTED",
    "headers": {
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36"
    },
    "log_errors": true
}
The notifier script:
import requests
import time
import random
import re
import sqlite3
import json
import traceback
import sys
import calendar
import os
from bs4 import BeautifulSoup

# FurAffinity -> Pushbullet Notifications
# Script by Ninji Vahran
# https://twitter.com/_Ninji
# https://furaffinity.net/user/Ninji-Vahran
# Use at your own risk ;)
#
# Compatible with Python 2 or 3
# Requires requests and BeautifulSoup4
#
# Currently supported notification types:
#   Watches, Journals, Notes, Shouts, Favourites, Comments
#
# To use, create a config.json file based on the template, and place it in the
# same directory as this script.
#
# Last updated: 7th March 2016
# Should work with both beta and classic FA layouts. Hopefully.

FA_BASE = 'https://furaffinity.net'

SUB_URL_REGEX = re.compile('^/view/')
USER_URL_REGEX = re.compile('^/user/')
JOURNAL_URL_REGEX = re.compile('^/journal/')

def safe_print(s):
    try:
        print(s)
    except UnicodeEncodeError:
        print(s.encode('ascii', 'replace').decode('ascii'))

############################################################
# Date Parsing

MONTHS = ('Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec')

SHORT_REGEX = re.compile(r'^([a-z]{3}) (\d+)[a-z][a-z], (\d{4}) (\d\d):(\d\d) ([AP])M$', re.I)
LONG_REGEX = re.compile(r'^on ([a-z]{3})[a-z]* (\d+)[a-z][a-z], (\d{4}) (\d\d):(\d\d) ([AP])M$', re.I)

def parse_date(text, regex):
    '''Extract a date from a text timestamp'''
    month, day, year, hour, minute, meridian = regex.match(text).groups()
    year = int(year)
    month = MONTHS.index(month) + 1
    day = int(day)
    hour = int(hour) % 12
    if meridian == 'P': hour += 12
    minute = int(minute)
    return calendar.timegm((year, month, day, hour, minute, 0))

def parse_short_date(text):
    '''Parse one of FurAffinity's date formats'''
    return parse_date(text, SHORT_REGEX)

def parse_long_date(text):
    '''Parse FurAffinity's other date format'''
    return parse_date(text, LONG_REGEX)

def parse_popup_date(element):
    '''Parse a date which may be either fuzzy or non-fuzzy by
    default, depending on the user's settings'''
    try:
        return parse_short_date(element.text)
    except:
        try:
            return parse_long_date(element.text)
        except:
            try:
                return parse_short_date(element.attrs['title'])
            except:
                return parse_long_date(element.attrs['title'])

############################################################
# Notification Utility Functions

def pushbullet(key, **data):
    '''Send a notification to Pushbullet'''
    if key == 'test':
        safe_print('[[ TEST NOTIFICATION: %r ]]' % data)
    else:
        requests.post(
            'https://api.pushbullet.com/v2/pushes',
            auth=(key, ''),
            data=json.dumps(data),
            headers={'content-type': 'application/json'}
        )

############################################################
# FurAffinity Page Scraping

def scrape_messages_html(html):
    '''Extract all messages present in a FA /msg/others/ page.
    Returns a dict containing a list of messages and a note count.'''
    soup = BeautifulSoup(html)
    msgs = []

    # fetch watches
    watch_set = soup.find(id='messages-watches')
    if watch_set:
        for li in watch_set.find_all('li', class_=None):
            link = li.find('td', class_='avatar').find('a')
            if link is None:  # removed by the user, probably
                continue
            watch = dict(
                type='watch',
                name=li.find('div', class_='info').find('span').text,
                url=link.attrs['href'],
                timestamp=parse_popup_date(li.find('span', class_='popup_date')),
                eid=li.find('input', type='checkbox').attrs['value'],
            )
            msgs.append(watch)

    # fetch journals
    journal_set = soup.find(id='messages-journals')
    if journal_set:
        journal_url_re = re.compile('^/journal/')
        user_url_re = re.compile('^/user/')
        for li in journal_set.find_all('li', class_=None):
            link = li.find('a', href=journal_url_re)
            if link is None:
                continue
            journal = dict(
                type='journal',
                name=link.text,
                url=link.attrs['href'],
                author=li.find('a', href=user_url_re).text,
                timestamp=parse_popup_date(li.find('span', class_='popup_date')),
                eid=li.find('input', type='checkbox').attrs['value'],
            )
            msgs.append(journal)

    # fetch shouts
    shout_set = soup.find(id='messages-shouts')
    if shout_set:
        for li in shout_set.find_all('li', class_=None):
            # can shouts be 'removed'? I don't think so...
            shout = dict(
                type='shout',
                author=li.find('a').text,
                timestamp=parse_popup_date(li.find('span', class_='popup_date')),
                eid=li.find('input', type='checkbox').attrs['value'],
            )
            msgs.append(shout)

    # fetch favourites
    fav_set = soup.find(id='messages-favorites')
    if fav_set:
        for li in fav_set.find_all('li', class_=None):
            link = li.find('a', href=SUB_URL_REGEX)
            if link is None:  # removed by the user, probably
                continue
            fav = dict(
                type='fav',
                sub_name=link.text,
                sub_url=link.attrs['href'],
                username=li.find('a', href=USER_URL_REGEX).text,
                timestamp=parse_popup_date(li.find('span', class_='popup_date')),
                eid=li.find('input', type='checkbox').attrs['value'],
            )
            msgs.append(fav)

    # fetch journal and submission comments
    cmt_sets = (
        ('s_comment', 'messages-comments-submission', SUB_URL_REGEX),
        ('j_comment', 'messages-comments-journal', JOURNAL_URL_REGEX),
    )
    for msg_type, set_id, url_regex in cmt_sets:
        cmt_set = soup.find(id=set_id)
        if cmt_set:
            for li in cmt_set.find_all('li', class_=None):
                link = li.find('a', href=url_regex)
                if link is None:  # removed by the user, probably
                    continue
                # use the module-level USER_URL_REGEX here; the local
                # user_url_re defined in the journals block above may not
                # exist when no journal notifications are present
                user_link = li.find('a', href=USER_URL_REGEX)
                if user_link is None:
                    uname = 'No User??'
                else:
                    uname = user_link.text
                    user_link.clear()
                popup_date = li.find('span', class_='popup_date')
                ts = parse_popup_date(popup_date)
                popup_date.clear()
                cmt = dict(
                    type=msg_type,
                    name=uname,
                    description=li.text.strip(),
                    url=link.attrs['href'],
                    timestamp=ts,
                    eid=li.find('input', type='checkbox').attrs['value'],
                )
                msgs.append(cmt)

    result = {}
    result['messages'] = msgs

    # extract note count
    result['note_count'] = 0
    notes = soup.find('a', href='/msg/pms/', string=re.compile('[0-9]+N'))
    if notes and notes.text:
        result['note_count'] = int(notes.text.replace('N', ''))

    return result

class Notifier(object):
    def __init__(self, config):
        self.username = config['username']
        self.pushbullet_key = config['pushbullet_key']
        self.request_params = dict(cookies=config['cookies'], headers=config['headers'])
        self.log_errors = config.get('log_errors', False)
        self.seen_cache = set()
        self.db = sqlite3.connect(config['database'])
        self.setup_db()

    def setup_db(self):
        '''Initialise the SQLite database by creating tables that don't exist'''
        c = self.db.cursor()
        c.execute('CREATE TABLE IF NOT EXISTS seen_notifs (eid INTEGER, type STRING)')
        c.close()

    def db_has_seen_message(self, msg):
        '''Check whether a particular message has already been seen'''
        type = msg['type']
        eid = msg['eid']
        cache_key = (type, eid)

        if cache_key in self.seen_cache:
            return True

        c = self.db.cursor()
        c.execute('SELECT eid FROM seen_notifs WHERE type = ? AND eid = ?', (type, eid))
        result = (c.fetchone() is not None)
        c.close()

        if result:
            self.seen_cache.add(cache_key)
        return result

    def db_mark_message_as_seen(self, msg):
        '''Mark a message as one we've already seen'''
        type = msg['type']
        eid = msg['eid']
        cache_key = (type, eid)

        c = self.db.cursor()
        c.execute('INSERT INTO seen_notifs (type, eid) VALUES (?, ?)', (type, eid))
        c.close()

        self.seen_cache.add(cache_key)

    def pushbullet_message(self, msg):
        '''Send a Pushbullet link containing the given FA message'''
        if msg['type'] == 'watch':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Watch] %s' % msg['name'],
                body='New watch!',
                url=FA_BASE + msg['url'],
            )
        elif msg['type'] == 'journal':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Journal] %s' % msg['author'],
                body=msg['name'],
                url=FA_BASE + msg['url'],
            )
        elif msg['type'] == 'shout':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Shout] %s' % msg['author'],
                body='New shout!',
                url='%s/user/%s/' % (FA_BASE, self.username),
            )
        elif msg['type'] == 'fav':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Fav] %s' % msg['username'],
                body=msg['sub_name'],
                url=FA_BASE + msg['sub_url'],
            )
        elif msg['type'] == 's_comment' or msg['type'] == 'j_comment':
            pushbullet(self.pushbullet_key,
                type='link',
                title='FA [Comment] %s' % msg['name'],
                body=msg['description'],
                url=FA_BASE + msg['url'],
            )

    def pushbullet_note(self, note_count):
        plural = '' if note_count == 1 else 's'
        pushbullet(self.pushbullet_key,
            type='link',
            title='FA: %d new note%s' % (note_count, plural),
            body='Tap me!',
            url=FA_BASE + '/msg/pms/',
        )

    def get_messages(self):
        '''Get the current FA messages, write out an error if appropriate'''
        html = 'None'
        url = FA_BASE + '/msg/others/'

        try:
            html = requests.get(url, **self.request_params).text
            #with open('notifier_debug/dicks.html', 'wb') as f:
            #    f.write(html.encode('utf-8'))
            #with open('notifier_debug/dicks.html', 'rb') as f:
            #    html = f.read().decode('utf-8')
            return scrape_messages_html(html)
        except Exception as e:
            # Failed!
            if self.log_errors:
                info = sys.exc_info()
                try:
                    os.mkdir('notifier_debug')
                except:
                    pass

                stamp = time.time()
                with open('notifier_debug/%r.html' % stamp, 'wb') as f:
                    f.write(html.encode('utf-8'))
                with open('notifier_debug/%r.exc' % stamp, 'w') as f:
                    traceback.print_exception(info[0], info[1], info[2], None, f)

            return None

    def execute(self):
        iteration = 0
        last_note_count = None

        while True:
            iteration += 1
            print('[%d] Polling...' % iteration)

            result = self.get_messages()
            if result is None:
                print('[%d] Failed, trying again soon.' % iteration)
                time.sleep(60)
                continue

            print('[%d] %d message(s) returned, %d unread note(s)' % (iteration, len(result['messages']), result['note_count']))

            # check notes
            if last_note_count is not None and result['note_count'] > last_note_count:
                print('[%d] New notes!' % iteration)
                self.pushbullet_note(result['note_count'])
            last_note_count = result['note_count']

            # check messages
            new_count = 0
            too_old_count = 0
            old_threshold = time.time() - (86400 * 2)

            for msg in result['messages']:
                if self.db_has_seen_message(msg):
                    continue

                safe_print('%s - %s' % (time.strftime('%c', time.gmtime(msg['timestamp'])), repr(msg)))

                if msg['timestamp'] > old_threshold:
                    self.pushbullet_message(msg)
                    new_count += 1
                else:
                    too_old_count += 1

                self.db_mark_message_as_seen(msg)

            if new_count > 0 or too_old_count > 0:
                self.db.commit()

            print('[%d] %d new message(s) pushed, %d held back due to age' % (iteration, new_count, too_old_count))

            # delay until the next round!
            delay = random.randint(240, 300)
            print('[%d] Waiting for %d seconds' % (iteration, delay))
            time.sleep(delay)

def main():
    # Obtain and read the configuration file
    if len(sys.argv) <= 1:
        config_path = 'config.json'
        print('Configuration file not specified, defaulting to ./config.json')
    elif len(sys.argv) == 2:
        config_path = sys.argv[1]
        print('Reading configuration from %s' % config_path)
    else:
        print('Usage: python %s [config.json]' % sys.argv[0])
        return

    with open(config_path, 'r') as f:
        raw_config = f.read()

    try:
        config = json.loads(raw_config)
    except ValueError:
        print('JSON parsing error while reading configuration!')
        raise

    # Work on it!
    n = Notifier(config)
    n.execute()

if __name__ == '__main__':
    main()
A comment from the gist's discussion thread: if you import Python's calendar module, you can build the month lookup on line 48 from it rather than spelling the month names out by hand. Writing it out by hand is fine, but it always just looks cleaner to let something that already did the work for you continue to do said work.
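
The exact snippet that accompanied the comment is not preserved above. The following is only a sketch of the likely idea, assuming calendar.month_abbr yields the English abbreviations (true under the default C locale); month_number is a hypothetical helper, not part of the original script:

import calendar

# calendar.month_abbr maps 0 -> '' and 1..12 -> 'Jan'..'Dec', so a month
# number can be looked up directly instead of keeping a hand-written MONTHS
# tuple and adding 1 to its index, as parse_date() does in the script.
def month_number(abbr):
    """Return 1-12 for a three-letter month abbreviation such as 'Mar'."""
    return list(calendar.month_abbr).index(abbr.title())

For example, month_number('Mar') returns 3, the same value as MONTHS.index('Mar') + 1 in the script above.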