Created January 14, 2018 21:34
Lenovo Outlet Scanner
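# Polls the Lenovo US outlet laptop listings for ThinkPads that match the
# criteria defined in main(), logs every hit, and opens matching item pages
# in a browser tab. Written for Python 2 (HTMLParser, urllib, xrange) and
# shells out to curl for all HTTP requests (see the urllib2 note below).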
import urllib
import re
import logging
import sys
import time
import webbrowser
import pprint
import argparse
import subprocess
import shlex
from HTMLParser import HTMLParser
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
IGNORED_ITEMS_FILE = 'lenovo_outlet_scanner_ignored_items'
INFO_FIELD_DEF = (
    (r'(ThinkPad .+) - (.+)', ('model', 'condition')),
    (r'Part number: (.+)', ('pn', )),
    (r'(Limited quantity available|Out of stock|In stock)', ('inventory', )),
    (r'List price: (.+)', ('price', )),
    (r'Processor: (.+)', ('processor', )),
    (r'Operating system: (.+)', ('os', )),
    (r'Display: (.+)', ('display', )),
    (r'Graphics: (.+)', ('graphics', )),
    (r'Memory: (.+)', ('memory', )),
    (r'Hard Drive: (.+)', ('hdd', )),
    (r'Optical Drive: (.+)', ('optical', ))
)
UA_HDR = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36'
URL = 'http://outlet.lenovo.com/outlet_us/laptops/'
PN_URL = 'http://outlet.lenovo.com/outlet_us/itemdetails/%s/445'
DEF_INTERVAL = 180
NUM_ATTEMPTS = 3
# Using urllib2 would cause HTTP 403 errors.
def notify(title, subtitle, info_text, delay=0, sound=False, userInfo={}):
    if sys.platform != 'darwin':
        return
    import Foundation
    import objc
    import AppKit
    NSUserNotification = objc.lookUpClass('NSUserNotification')
    NSUserNotificationCenter = objc.lookUpClass('NSUserNotificationCenter')
    notification = NSUserNotification.alloc().init()
    notification.setTitle_(title)
    notification.setSubtitle_(subtitle)
    notification.setInformativeText_(info_text)
    notification.setUserInfo_(userInfo)
    if sound:
        notification.setSoundName_("NSUserNotificationDefaultSoundName")
    notification.setDeliveryDate_(Foundation.NSDate.dateWithTimeInterval_sinceDate_(delay, Foundation.NSDate.date()))
    NSUserNotificationCenter.defaultUserNotificationCenter().scheduleNotification_(notification)
def progressbar(it, prefix='', length=40, dismiss=True, percentage=True):
    count = float(len(it))
    def _display(index):
        progress = int(length * index / count)
        if percentage:
            sys.stderr.write(
                u'\r{}|{}{}| {:.2f}% '.format(
                    prefix, u'\u2588' * progress, u' ' * (length - progress), index / count * 100
                )
            )
        else:
            sys.stderr.write(
                u'\r{}|{}{}| '.format(
                    prefix, u'\u2588' * progress, u' ' * (length - progress)
                )
            )
    _display(0)
    for index, item in enumerate(it):
        yield item
        _display(index + 1)
    if dismiss:
        sys.stderr.write('\x1b[2K\r')
    else:
        sys.stderr.write('\n')
class ItemPageParser(HTMLParser):
    def __init__(self):
        HTMLParser.__init__(self)
        self.data = ''
    def handle_data(self, data):
        self.data += data
    def parse(self):
        self.data = re.sub(r'\t', r'', self.data)
        self.data = re.sub(r'((\r\n)+|\n+)', r'\n', self.data)
        self.data = re.sub(r':\n+', r': ', self.data)
        self.data = re.sub(r'List price: (.+?)\n(.+?)\n', r'List price: \2\n', self.data)
def match(info, criteria):
    try:
        item = criteria[info['model']]
    except KeyError:
        return False
    for key, val in item.items():
        if key == 'price' and float(re.sub(r'[$,]', r'', info[key])) > val:
            return False
        if key in ('processor', 'os', 'inventory', 'condition') and \
                not [v for v in val if v in unicode(info[key], 'utf-8')]:
            return False
    return True
def parse_item_details(page):
    parser = ItemPageParser()
    parser.feed(page)
    parser.parse()
    info = {}
    for line in parser.data.splitlines():
        for regex, fields in INFO_FIELD_DEF:
            found = re.search(regex, line)
            if found is None:
                continue
            for index, field in enumerate(fields):
                info[field] = found.group(index + 1)
            break
    try:
        logging.info(
            '%s, %s, %s, %s, P/N URL: %s',
            info['model'], info['condition'], info['price'], info['inventory'],
            PN_URL % info['pn']
        )
    except KeyError as ex:
        logging.error('Page: %s\n, missing: %s', parser.data, ex)
        return None
    return info
def fetch_outlet_info():
    cmd = 'curl -s -k -H \'User-Agent: %s\' -X GET %s' % (UA_HDR, URL)
    proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    page, error = proc.communicate()
    if proc.returncode != 0:
        logging.error(error)
        raise RuntimeError('Failed to GET: %r' % cmd)
    category_id = re.findall(r'var f_categoryid = "(.+?)";', page, re.DOTALL)
    if category_id:
        category_id = category_id[0]
        logging.debug('Category ID: %s', category_id)
    else:
        raise RuntimeError('Could not find category ID in the page')
    result_url = re.findall(r'var f_resultsUrl = "(.+?)";', page, re.DOTALL)
    if result_url:
        result_url = 'http:' + result_url[0]
        logging.debug('Result URL: %s', result_url)
    else:
        raise RuntimeError('Could not find result URL in the page')
    return category_id, result_url
def fetch_items(result_url, facet_data):
    cmd = 'curl -s -k -H \'User-Agent: %s\' -X POST %s -d %s' % (UA_HDR, result_url, urllib.urlencode(facet_data))
    proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    page, error = proc.communicate()
    if proc.returncode != 0:
        logging.error(error)
        raise RuntimeError('Failed to POST: %r' % cmd)
    items = re.findall(r'var fitems = \[\s*(.+?)\s*\];', page, re.DOTALL)
    if not items or not items[0].strip():
        items = []
    else:
        items = [x.strip("'") for x in re.split(r',\s+', items[0])]
    item_url_prefix = re.findall(r'var infiniteUrl = \'(.+?)\';', page, re.DOTALL)
    if item_url_prefix:
        item_url_prefix = 'http:' + item_url_prefix[0]
        logging.debug('Item URL prefix: %s', item_url_prefix)
    else:
        raise RuntimeError('Could not find item URL prefix in the page')
    return item_url_prefix, items
def ignored(item):
    items = []
    try:
        with open(IGNORED_ITEMS_FILE, 'r') as fptr:
            for line in fptr:
                items.extend(re.split(r',\s*', line.strip()))
    except (OSError, IOError):
        pass
    return item in items
def check_item(item, item_url, criteria):
    cmd = 'curl -s -k -H \'User-Agent: %s\' -H \'Referer: %s\' -X GET %s' % (UA_HDR, URL, item_url)
    proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    page, error = proc.communicate()
    if proc.returncode != 0:
        logging.error(error)
        raise RuntimeError('Failed to GET: %r' % cmd)
    if 'HTTP/1.1 429 Too Many Requests' in page:
        raise RuntimeError('HTTP/1.1 429 Too Many Requests')
    if 'Our system has identified this session as possible bot. ' \
            'You are being redirected to requested page. Wait a second.' in page:
        raise RuntimeError('Triggered anti-bot detection')
    info = parse_item_details(page)
    if info is None:
        logging.error('Failed to parse item page\n%s', page)
        raise RuntimeError('Failed to parse item page')
    if match(info, criteria):
        logging.info('Item %s matched: \n%s', item, pprint.pformat(info))
        pn_url = PN_URL % info['pn']
        if ignored(item):
            logging.info('Item %s is ignored. P/N URL: %s', item, pn_url)
            return
        if sys.platform == 'darwin':
            time.sleep(1)
        try:
            webbrowser.open_new_tab(pn_url)
        except webbrowser.Error as ex:
            logging.error(ex)
        # notify('Lenovo Outlet Scanner', info['model'], 'Price: ' + info['price'], sound=True)
def check(criteria):
    for attempt in xrange(1, NUM_ATTEMPTS + 1):
        try:
            category_id, result_url = fetch_outlet_info()
        except RuntimeError as ex:
            logging.error('Failed to parse page content, %s, attempt # %d', ex, attempt)
        else:
            break
    else:
        return
    data = {
        'T series': {
            'category-id': category_id,
            'sort-criteria': '1',
            'page-size': '100',
            'facet-1': '1',   # New
            'facet-2': '1',   # ThinkPad
            'facet-3': '14',  # T series
            'facet-5': '4',   # 14 inch screen
            # 'facet-11': '2'  # no touchscreen
        },
        # 'P series': {
        #     'category-id': category_id,
        #     'sort-criteria': '2',
        #     'page-size': '100',
        #     'facet-1': '1',      # New
        #     'facet-2': '1',      # ThinkPad
        #     'facet-3': '10',     # P series
        #     'facet-4': '1,2,3',  # price < 1199
        #     'facet-5': '5',      # 15 inch screen
        #     'facet-11': '2'      # no touchscreen
        # }
    }
    items = []
    for series, data in data.items():
        for attempt in xrange(1, NUM_ATTEMPTS + 1):
            try:
                item_url_prefix, series_items = fetch_items(result_url, data)
                logging.info('%d item(s) found for %s', len(series_items), series)
                items.extend(series_items)
            except RuntimeError as ex:
                logging.error('Failed to parse page content, %s, attempt # %d', ex, attempt)
            else:
                break
        else:
            continue
    if not items:
        return
    for item in items:
        item_url = item_url_prefix + '&page=1&itemid=' + item
        logging.debug('Fetch item URL: %s', item_url)
        for attempt in xrange(1, NUM_ATTEMPTS + 1):
            try:
                check_item(item, item_url, criteria)
            except RuntimeError as ex:
                logging.error('Failed to parse page content, %s, attempt # %d', ex, attempt)
                time.sleep(3)
            else:
                break
        else:
            logging.error('Failed to check item: %s', item)
            time.sleep(3)
        time.sleep(0.5)
def args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--interval', dest='interval', type=int, default=DEF_INTERVAL)
    return parser.parse_args()
def main():
    criteria = {
        'ThinkPad T460s': {
            'inventory': ('Limited quantity available', 'In stock'),
            'condition': ('New', ),
            'processor': ('6300', '6600'),
            'os': ('Professional', 'Pro', 'professional', 'pro'),
            'price': 800
        },
        'ThinkPad P50': {
            'inventory': ('Limited quantity available', 'In stock'),
            'condition': ('New', ),
            'price': 1000
        }
    }
    interval = args().interval
    if interval < 0:
        logging.info('Adjusted polling interval to %d seconds',
                     DEF_INTERVAL)
        interval = DEF_INTERVAL
    while True:
        check(criteria)
        for _ in progressbar(xrange(interval)):
            time.sleep(1)
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        logging.info('Quit')
        sys.exit(0)
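The script is Python 2 only, expects curl on PATH, and accepts a single --interval flag in seconds (default 180); pyobjc is needed only if the commented-out notify() call is re-enabled. As a small illustration of how the criteria dict in main() is evaluated by match(), here is a sketch with invented item values (field names mirror INFO_FIELD_DEF, and lenovo_outlet_scanner is an assumed filename for this gist):

from lenovo_outlet_scanner import match  # assumed filename for the gist

sample_item = {
    'model': 'ThinkPad T460s',
    'condition': 'New',
    'inventory': 'In stock',
    'processor': 'Intel Core i5-6300U Processor',
    'os': 'Windows 10 Pro 64',
    'price': '$749.00',
}
criteria = {
    'ThinkPad T460s': {
        'inventory': ('Limited quantity available', 'In stock'),
        'condition': ('New', ),
        'processor': ('6300', '6600'),
        'os': ('Professional', 'Pro', 'professional', 'pro'),
        'price': 800
    }
}
# Prints True: new, in stock, i5-6300, Pro OS, and under the $800 price cap.
print match(sample_item, criteria)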
Not working
It needs markupbase, but when I try to pip install it, pip says there is no matching distribution found for markupbase.
Any update?
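The markupbase error is consistent with running this Python 2 script under Python 3 (an assumption, since no traceback is posted): Python 2's HTMLParser module imports markupbase, which is a standard-library internal rather than something pip can install. In Python 3 the parser moved to html.parser and markupbase became _markupbase, so the simplest fix is to run the script with Python 2.7. A guarded import would look roughly like this:

try:
    from HTMLParser import HTMLParser   # Python 2
except ImportError:
    from html.parser import HTMLParser  # Python 3; markupbase is _markupbase here

Even with that import, the rest of the script still uses Python 2-only names (xrange, unicode, urllib.urlencode) and would need further porting to run on Python 3.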