Created
September 25, 2019 14:48
-
-
Save arsaboo/cd95f306a4b99ed0892938c9038dc329 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Aesop interface.""" | |
import datetime | |
import time | |
import json | |
import logging | |
import os.path | |
import pickle | |
import re | |
from bs4 import BeautifulSoup | |
from dateutil.parser import parse | |
import requests | |
from requests.auth import AuthBase | |
import requests_cache | |
from selenium import webdriver | |
from selenium.common.exceptions import TimeoutException, WebDriverException | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.firefox.options import Options | |
from slimit import ast | |
from slimit.parser import Parser | |
from slimit.visitors import nodevisitor | |
# Module logger; its level is forced to DEBUG so every record emitted by this
# module is passed on to whatever handlers are installed.
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG)
# NOTE: logging through the root logger while it has no handlers makes the
# logging module call basicConfig() implicitly, which installs a stderr
# handler at import time.
logging.debug("test")

# Parser name handed to BeautifulSoup.
HTML_PARSER = 'html.parser'
# Page used both for the browser login and for the profile requests.
LOGIN_URL = 'https://sub.aesoponline.com/Substitute/Home'
# Passed to WebDriverWait as the timeout while waiting for the login result.
LOGIN_TIMEOUT = 10
# Default locations of the pickled cookie jar and of the response cache.
COOKIE_PATH = './aesop_cookies.pickle'
CACHE_PATH = './aesop_cache'
# User agent sent by the requests session and by headless Chrome.
USER_AGENT = (
    'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/41.0.2228.0 Safari/537.36'
)
# Command-line switches added to the Chrome options in _get_driver.
CHROME_WEBDRIVER_ARGS = [
    '--headless',
    '--user-agent=' + USER_AGENT,
    '--disable-extensions',
    '--disable-gpu',
    '--no-sandbox',
]
# Machine-specific location of the chromedriver executable.
CHROMEDRIVER_PATH = 'C:/Users/asaboo/Downloads/chromedriver_76/chromedriver'
# Firefox options object built once at import time; _get_driver hands it to
# the Firefox webdriver. The only switch set is headless mode.
FIREFOXOPTIONS = Options()
FIREFOXOPTIONS.add_argument('--headless')
class AESOPError(Exception):
    """Error raised by this module.

    Used for unsupported or unavailable webdrivers, failed logins and
    expired sessions.
    """
def _save_cookies(requests_cookiejar, filename):
    """Pickle ``requests_cookiejar`` into the file at ``filename``.

    The file is opened in binary write mode, so any existing content is
    replaced.
    """
    with open(filename, 'wb') as cookie_file:
        pickle.Pickler(cookie_file).dump(requests_cookiejar)
def _load_cookies(filename):
    """Return the object unpickled from the file at ``filename``.

    NOTE: unpickling can execute arbitrary code, so ``filename`` must only
    ever point at a file this module wrote itself.
    """
    with open(filename, 'rb') as cookie_file:
        return pickle.Unpickler(cookie_file).load()
def _get_primary_status(row):
    """Return the string of the ``pack_h3`` div found in ``row``.

    ``row`` is searched with ``row.find('div', {'class': 'pack_h3'})``.
    ``None`` is returned when the lookup raises ``AttributeError``, for
    instance because no such div was found.
    """
    try:
        status = row.find('div', {'class': 'pack_h3'}).string
    except AttributeError:
        status = None
    return status
def _get_driver(driver_type):
    """Create the Selenium webdriver named by ``driver_type``.

    Supported values are ``'phantomjs'``, ``'firefox'`` and ``'chrome'``.
    Firefox is started with the module-level ``FIREFOXOPTIONS``; Chrome is
    started from ``CHROMEDRIVER_PATH`` with every switch listed in
    ``CHROME_WEBDRIVER_ARGS``.

    Raises:
        AESOPError: if ``driver_type`` is not one of the supported values.
    """
    if driver_type == 'phantomjs':
        return webdriver.PhantomJS(service_log_path=os.path.devnull)
    if driver_type == 'firefox':
        # ``options`` replaces the deprecated ``firefox_options`` keyword.
        return webdriver.Firefox(options=FIREFOXOPTIONS)
    if driver_type == 'chrome':
        chrome_options = webdriver.ChromeOptions()
        for arg in CHROME_WEBDRIVER_ARGS:
            chrome_options.add_argument(arg)
        # ``options`` replaces the deprecated ``chrome_options`` keyword.
        return webdriver.Chrome(CHROMEDRIVER_PATH, options=chrome_options)
    raise AESOPError('{} not supported'.format(driver_type))
def _login(session):
    """Log in through a browser and copy its cookies into ``session``.

    Use Selenium webdriver to login. AESOP authenticates users
    in part by a key generated by complex, obfuscated client-side
    Javascript, which can't (easily) be replicated in Python.
    Invokes the webdriver once to perform login, then uses the
    resulting session cookies with a standard Python `requests`
    session.

    The credentials, driver name and cookie path are read from
    ``session.auth``. After a successful login the session's cookies are
    also saved to ``session.auth.cookie_path``.

    Raises:
        AESOPError: if the webdriver cannot be created or the post-login
            page element does not appear within ``LOGIN_TIMEOUT`` seconds.
    """
    _LOGGER.debug("attempting login")
    session.cookies.clear()
    try:
        session.remove_expired_responses()
    except AttributeError:
        # Best effort: not every session object provides this method.
        pass
    try:
        driver = _get_driver(session.auth.driver)
    except WebDriverException as exception:
        raise AESOPError(str(exception)) from exception
    try:
        driver.get(LOGIN_URL)
        # Fixed pause kept from the original flow so the login page can
        # finish loading before the form is filled in.
        time.sleep(5)
        _LOGGER.debug(driver.page_source)
        driver.find_element(By.ID, 'Username').send_keys(
            session.auth.username)
        driver.find_element(By.ID, 'Password').send_keys(
            session.auth.password)
        driver.find_element(By.ID, 'qa-button-login').click()
        try:
            WebDriverWait(driver, LOGIN_TIMEOUT).until(
                EC.presence_of_element_located((By.ID, "accountBox")))
        except TimeoutException as exception:
            raise AESOPError('login failed') from exception
        browser_cookies = driver.get_cookies()
    finally:
        # Always shut the browser down; otherwise each login attempt leaves
        # a browser process running.
        driver.quit()
    for cookie in browser_cookies:
        session.cookies.set(name=cookie['name'], value=cookie['value'])
    _save_cookies(session.cookies, session.auth.cookie_path)
def authenticated(function):
    """Decorate ``function`` so it logs in again when the session expired.

    The wrapped function is called once. If it raises ``AESOPError``, a
    fresh login is performed on the session and the call is repeated one
    more time; a second failure propagates to the caller.

    The session is taken from the first positional argument, or from the
    ``session`` keyword argument when no positional arguments were given.
    """
    # Local import keeps this block independent of the module's import list.
    import functools

    @functools.wraps(function)
    def wrapped(*args, **kwargs):
        """Wrap function."""
        try:
            return function(*args, **kwargs)
        except AESOPError:
            _LOGGER.info("attempted to access page before login")
            session = args[0] if args else kwargs['session']
            _login(session)
            return function(*args, **kwargs)
    return wrapped
@authenticated
def get_profile(session):
    """Return the ``pageVars`` matches found in the text of the home page.

    Requests ``LOGIN_URL`` without following redirects and returns the list
    produced by ``findall`` for the ``var pageVars = ...`` pattern (an empty
    list when nothing matches).

    Raises:
        AESOPError: if the server answers with a 302 redirect, which is
            treated as an expired session.
    """
    response = session.get(LOGIN_URL, allow_redirects=False)
    if response.status_code == 302:
        raise AESOPError('expired session')
    soup = BeautifulSoup(response.text, HTML_PARSER)
    _LOGGER.debug(soup)
    # Raw string: the previous plain literal relied on the invalid escape
    # sequence ``\/``. The compiled pattern matches exactly the same text.
    pattern = re.compile(r'(?<=var pageVars = ).*(?=<\/script>\n<style>?)')
    # NOTE(review): the pattern looks for markup (``</script>``) but is run
    # against ``soup.text`` - confirm this can match on a live page.
    search = pattern.findall(soup.text)
    _LOGGER.debug(search)
    return search
@authenticated
def get_profile_js(session):
    """Return the home page's ``pageVars`` data decoded from JSON.

    Requests ``LOGIN_URL`` without following redirects, takes the first
    ``<script>`` element of the page, extracts the text following
    ``var pageVars = `` and decodes it (wrapped in braces) with
    ``json.loads``.

    Raises:
        AESOPError: if the server answers with a 302 redirect, which is
            treated as an expired session.
        ValueError: if the ``pageVars`` assignment is not found or its
            content is not valid JSON. ``AESOPError`` is deliberately not
            used here so that a parse failure does not trigger a re-login.
    """
    response = session.get(LOGIN_URL, allow_redirects=False)
    if response.status_code == 302:
        raise AESOPError('expired session')
    soup = BeautifulSoup(response.text, 'lxml')
    _LOGGER.debug(soup)
    # NOTE(review): the lookahead expects a ``<style>`` tag after the
    # closing ``</script>``, which the string form of a single script
    # element would not appear to contain - confirm against a live page.
    match = re.search(r'(?<=var pageVars = ).*(?=<\/script>\n<style>?)',
                      str(soup.find('script')), flags=re.DOTALL)
    if match is None:
        raise ValueError('pageVars not found in profile page')
    # The pattern defines no capturing group, so the whole match (group 0)
    # is the extracted text.
    json_data = json.loads('{' + match.group(0) + '}')
    _LOGGER.debug(json_data)
    return json_data
# pylint: disable=too-many-arguments
def get_session(username, password, cookie_path=COOKIE_PATH, cache=True,
                cache_expiry=300, cache_path=CACHE_PATH, driver='chrome'):
    """Get session, existing or new.

    Args:
        username: account name typed into the login form.
        password: password typed into the login form.
        cookie_path: file the session cookies are loaded from and saved to.
        cache: when true, use a ``requests_cache`` cached session instead
            of a plain ``requests`` session.
        cache_expiry: value passed as ``expire_after`` to the cached session.
        cache_path: value passed as ``cache_name`` to the cached session.
        driver: webdriver name used for browser logins.

    Returns:
        A session carrying the credentials in ``session.auth``, the module's
        user agent header and the login cookies.

    Cookies are loaded from ``cookie_path`` when that file exists;
    otherwise, or when the file cannot be read back, a browser login is
    performed.
    """
    class AESOPAuth(AuthBase):  # pylint: disable=too-few-public-methods
        """AESOP authorization storage."""

        def __init__(self, username, password, cookie_path, driver):
            """Store the credentials and login settings."""
            self.username = username
            self.password = password
            self.cookie_path = cookie_path
            self.driver = driver

        def __call__(self, r):
            """Return the request unchanged; this object only stores data."""
            return r

    # Create exactly one session object instead of building a plain session
    # that is discarded again when caching is enabled.
    if cache:
        session = requests_cache.CachedSession(cache_name=cache_path,
                                               expire_after=cache_expiry)
    else:
        session = requests.Session()
    session.auth = AESOPAuth(username, password, cookie_path, driver)
    session.headers.update({'User-Agent': USER_AGENT})
    if os.path.exists(cookie_path):
        _LOGGER.debug("cookie found at: %s", cookie_path)
        try:
            session.cookies = _load_cookies(cookie_path)
        except (OSError, EOFError, pickle.UnpicklingError):
            # A truncated or unreadable cookie file would otherwise break
            # every call until it is deleted by hand; log in again instead.
            _LOGGER.warning("could not load cookies from %s, logging in",
                            cookie_path)
            _login(session)
    else:
        _login(session)
    return session
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment