Last active
July 5, 2017 03:29
-
-
Save jashsu/22d0f896b2f07b28793f94ad3eb0fd7f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import requests, json, smtplib, sys | |
from lxml import etree | |
from StringIO import StringIO | |
from time import sleep, ctime | |
from random import random | |
from email.MIMEMultipart import MIMEMultipart | |
from email.MIMEText import MIMEText | |
from getpass import getpass | |
class NoPledgesError(Exception): | |
pass | |
class InvalidEmailError(Exception): | |
pass | |
class InvalidURLError(Exception): | |
pass | |
class InvalidChoiceError(Exception): | |
pass | |
def validate_email(email): | |
try: | |
email_split = email.split('@') | |
assert len(email_split) == 2 | |
assert len(email_split[1].split('.')) == 2 | |
except AssertionError as e: | |
print "Error: Invalid email." | |
raise InvalidEmailError | |
return | |
def setup(): | |
#prompt user for setup info | |
print "Please enter configuration or press enter to load config file" | |
print '--------------------------------------------------------------------------------' | |
email_recipient = raw_input('Email recipient: ') | |
if email_recipient == '': | |
try: | |
pass | |
#read from file | |
except IOError as e: | |
print "Error: Failed to read configuration file." | |
raise | |
else: | |
validate_email(email_recipient) | |
email_user = raw_input('Email server user (default: same): ') or email_recipient | |
email_pswd = getpass('Email server pswd: ') | |
email_addr = raw_input('Email server addr (default: smtp.gmail.com): ') or 'smtp.gmail.com' | |
assert type(email_addr) == str | |
email_port = raw_input('Email server port (default: 587): ') or 587 | |
assert type(email_port) == int | |
assert 1 <= email_port <= 65536 | |
url = raw_input('Enter project URL: ') | |
return setup_part2(email_recipient, email_user, email_pswd, email_addr, email_port, url, None) | |
def setup_part2(email_recipient, email_user, email_pswd, email_addr, email_port, url, choice): | |
max_reward_desc = 60 | |
print 'Fetching data...' | |
#validate | |
try: | |
split_url = url.split('projects/') | |
assert len(split_url) == 2 | |
assert split_url[0] in ('https://www.kickstarter.com/', 'https://www.kickstarter.com/') | |
assert len(split_url[1].split('/')) > 1 | |
except AssertionError as e: | |
print "Error: Invalid URL." | |
raise InvalidURLError | |
#test cases: | |
#https://www.kickstarter.com/projects/2093152267/losswords-a-game-of-literary-portions/comments | |
#https://www.kickstarter.com/projects/2093152267/losswords-a-game-of-literary-portions?ref=discover_potd | |
#https://www.kickstarter.com/projects/2093152267/losswords-a-game-of-literary-portions/pledge/new?clicked_reward=false&ref=discover_potd | |
#clean the base url | |
url = '/'.join(url.split('/')[:6]) | |
url = url.split('?')[0] | |
pledge_url = url + '/pledge/new' | |
#get reward levels | |
session = requests.Session() | |
r = session.get(pledge_url) | |
session.close() | |
rt = etree.parse(StringIO(r.text), etree.HTMLParser(encoding = 'UTF-8')) | |
project_title = rt.xpath('//*[contains(@class, "project-context__title")]')[0].text.\ | |
replace(u"\u2018", "'").replace(u"\u2019", "'").\ | |
replace(u"\u2122", "tm") | |
reward_data = [json.loads(i.attrib['data-reward']) for i in rt.xpath('//*[contains(concat(" ", @class, " "), " pledge-selectable ")]')] | |
if len(reward_data) == 0: | |
print 'Error: Project has no active pledge levels.' | |
raise NoPledgesException | |
if choice is None: | |
#prompt user to select reward level | |
print '--------------------------------------------------------------------------------' | |
for x in xrange(len(reward_data)): | |
print '[{index: >2}] ${min: <5}: {desc}'.format(index = x+1,\ | |
min = int(reward_data[x].get('minimum')),\ | |
desc = ''.join(reward_data[x].get('reward').\ | |
replace(u"\u2018", "'").replace(u"\u2019", "'"). | |
replace(u"\u2122", "tm").\ | |
splitlines())[0:max_reward_desc]) | |
print '--------------------------------------------------------------------------------' | |
choice = raw_input('Choose reward (default: exit): ') or None | |
if choice == None: | |
sys.exit() | |
#validate | |
try: | |
choice = int(choice) | |
#fixup zero indexing | |
choice -= 1 | |
assert 0 <= choice <= 99 | |
except: | |
print "Error: Invalid choice." | |
raise InvalidChoiceError | |
#clean reward | |
reward = reward_data[choice] | |
reward['reward'] = reward['reward'].\ | |
replace(u"\u2018", "'").replace(u"\u2019", "'").\ | |
replace(u"\u2122", "tm") | |
#done | |
return {'pledge_url': pledge_url,\ | |
'project_title': project_title,\ | |
'reward': reward_data[choice],\ | |
'email_recipient': email_recipient,\ | |
'email_user': email_user,\ | |
'email_pswd': email_pswd,\ | |
'email_addr': email_addr,\ | |
'email_port': email_port} | |
def monitor(config): | |
reward = config['reward'] | |
sleep_time = 8 | |
sleep_rand = 4 | |
session = requests.Session() | |
print 'Monitoring... (sleep {sleep_time}+/-{sleep_rand}sec)'.format(sleep_time = sleep_time, sleep_rand = sleep_rand) | |
while True: | |
url = reward['urls']['api']['reward'] | |
sig = url.split("signature=",1)[1].split('.') | |
print(ctime(float(sig[0])) + " hash:" + sig[1]) | |
r = session.get(url) | |
reward = json.loads(r.text) | |
remaining = reward['remaining'] | |
if remaining > 0: | |
break | |
else: | |
sleep(sleep_time + random() * sleep_rand) | |
def send_email(config): | |
max_reward_desc = 40 | |
print('Composing email...') | |
message = MIMEMultipart() | |
message['From'] = 'noreply' | |
message['To'] = config['email_recipient'] | |
message['Subject'] = 'KSMONITOR ALERT - [{project_title}]'.format(project_title = config['project_title']) | |
html = """<html><head></head><body><p>The requested reward is available!<br><br><a href="{pledge_url}">{desc}</a></p></body></html>""".\ | |
format(pledge_url = config['pledge_url'], desc = ''.join(config['reward'].get('reward').splitlines())) | |
message.attach(MIMEText(html, 'html')) | |
print('Sending email...') | |
smtp = smtplib.SMTP() | |
smtp.connect(config['email_addr'], config['email_port']) | |
smtp.ehlo() | |
smtp.starttls() | |
smtp.login(config['email_user'], config['email_pswd']) | |
smtp.sendmail('noreply', config['email_recipient'], message.as_string()) | |
smtp.close() | |
print('Email sent!') | |
def main(args): | |
print '' | |
#print len(args) | |
#print args | |
if len(args) > 0: | |
config = setup_part2(*args[1:]) | |
else: | |
config = setup() | |
monitor(config) | |
send_email(config) | |
if __name__ == '__main__': | |
sys.exit(main(sys.argv)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment