Created
May 7, 2019 10:31
-
-
Save csghone/835617bc17d45e78267a4cc37291c9c8 to your computer and use it in GitHub Desktop.
Selenium Wrapper Class for Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Setup and Usage: | |
| # - sudo -H pip3 install selenium pytz retrying pandas bs4 lxml | |
| # - Download geckodriver and copy to same directory as this file | |
| # - python3 get_ransquawk_data.py | |
| import os | |
| import sys | |
| import traceback | |
| import argparse | |
| import datetime | |
| import time | |
| import json | |
| import subprocess | |
| import logging | |
| import logging.handlers | |
| import pytz | |
| import pandas | |
| import retrying | |
| from selenium import webdriver | |
| from selenium.webdriver.firefox.options import Options | |
| from selenium.webdriver.support.ui import Select | |
| from selenium.webdriver.common.keys import Keys | |
| from bs4 import BeautifulSoup | |
# Every module should create its own logger with these two lines.
logger = logging.getLogger(__name__)
logger.propagate = True

# setup_logging() should be called only from the file that defines main().
# LOG_FORMATTER and setup_logging() may be moved to a shared module for reuse.
LOG_FORMATTER = logging.Formatter(
    fmt=(
        "%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - "
        "%(lineno)s - %(funcName)s - "
        "%(message)s"
    ),
    datefmt="%Y%m%d %H:%M:%S",
)
def setup_logging(inp_file, level=logging.INFO, enable_console=True):
    """Attach a rotating file handler (and optionally a console handler) to the root logger.

    The log file is created in the current working directory and is named
    ``__<basename of inp_file>.main__.log``; it rotates at 1,000,000 bytes
    and keeps 5 backups.

    Args:
        inp_file: Path whose basename is embedded in the log file name.
        level: Level set on the root logger.
        enable_console: When true, a ``StreamHandler`` is attached as well.
    """
    log_name = "__{}.main__.log".format(os.path.basename(inp_file))
    root = logging.getLogger()

    # The file handler is always installed; the console handler is optional.
    new_handlers = [
        logging.handlers.RotatingFileHandler(
            log_name, maxBytes=1000000, backupCount=5),
    ]
    if enable_console:
        new_handlers.append(logging.StreamHandler())
    for new_handler in new_handlers:
        root.addHandler(new_handler)

    root.setLevel(level)

    # Apply the shared format to every handler on the root logger, including
    # any that were attached before this call.
    for attached in root.handlers:
        attached.setFormatter(fmt=LOG_FORMATTER)
class SeleniumWrapper:
    """Convenience wrapper around a Selenium Firefox (geckodriver) session.

    Attributes:
        fp: The ``FirefoxProfile`` configured for silent downloads.
        driver: The underlying ``webdriver.Firefox`` instance.
    """

    def __init__(self, gecko_path=None):
        """Start Firefox with a profile that saves CSV-like downloads without prompting.

        Args:
            gecko_path: Path to the geckodriver executable. When ``None``, a
                file named ``geckodriver`` in the directory of this source
                file is used.
        """
        options = Options()
        # Run headless unless the root logger is set to DEBUG (or lower).
        if logging.getLogger().level > logging.DEBUG:
            options.add_argument("--headless")

        # expanduser("~") still honours HOME when it is set, but does not
        # crash with a TypeError when the variable is missing.
        download_folder = os.path.join(os.path.expanduser("~"), "Downloads")
        if gecko_path is None:
            gecko_path = os.path.join(
                os.path.realpath(os.path.dirname(__file__)),
                "geckodriver"
            )

        fp = webdriver.FirefoxProfile()
        fp.set_preference("browser.download.folderList", 2)
        fp.set_preference("browser.helperApps.alwaysAsk.force", False)
        fp.set_preference("browser.download.manager.showWhenStarting", False)
        fp.set_preference("browser.download.dir", download_folder)
        fp.set_preference("browser.download.downloadDir", download_folder)
        fp.set_preference("browser.download.defaultFolder", download_folder)
        fp.set_preference("browser.download.forbid_open_with", True)
        fp.set_preference(
            "browser.helperApps.neverAsk.saveToDisk",
            "application/text, application/csv, text/csv, application/vnd.ms-excel")
        self.fp = fp

        self.driver = webdriver.Firefox(
            executable_path=gecko_path,
            firefox_profile=fp, options=options)

    def get_url(self, url):
        """Navigate the browser to ``url``."""
        self.driver.get(url)

    def get_driver_fxn(self, element_type="id"):
        """Return the driver's ``find_elements_by_*`` method for ``element_type``.

        Args:
            element_type: Either ``"id"`` or ``"class"``.

        Returns:
            A bound driver method that takes the id / class name and returns
            a list of matching elements.

        Raises:
            ValueError: If ``element_type`` is not supported. (An ``assert``
                would be stripped when Python runs with ``-O``.)
        """
        driver = self.driver
        if element_type == "class":
            return driver.find_elements_by_class_name
        if element_type == "id":
            return driver.find_elements_by_id
        raise ValueError(
            "Unsupported element_type: {!r} (expected 'id' or 'class')".format(
                element_type))

    @retrying.retry(wait_fixed=1000,
                    stop_max_attempt_number=10,
                    retry_on_result=lambda x: x is None)
    def get_element(self, element_id, element_type="id", element_index=0):
        """Return the ``element_index``-th element matching ``element_id``.

        The call is retried (fixed 1000 ms wait, at most 10 attempts) for as
        long as it returns ``None``, i.e. while the element is not yet present.

        Args:
            element_id: The id or class name to look for.
            element_type: ``"id"`` or ``"class"``; see ``get_driver_fxn``.
            element_index: Position within the list of matches.

        Returns:
            The matching element, or ``None`` for a single failed attempt
            (which triggers the retry).
        """
        driver_fxn = self.get_driver_fxn(element_type=element_type)
        try:
            element = driver_fxn(element_id)[element_index]
        except IndexError:
            logger.error("Cannot find element - Type: %s, Name: %s",
                         element_type, element_id)
            return None
        return element

    def get_displayed_element(self, element_id, element_type="id"):
        """Return the first match for ``element_id`` that is currently displayed.

        Returns:
            The first element whose ``is_displayed()`` is true, or ``None``
            when no match is displayed.
        """
        driver_fxn = self.get_driver_fxn(element_type=element_type)
        for element in driver_fxn(element_id):
            if element.is_displayed():
                return element
        return None

    def login(self, **kwargs):
        """Fill in and submit a login form using credentials from a JSON file.

        Keyword Args:
            url: Optional; when given and not ``None``, the page is loaded first.
            credentials_json: Path to a JSON file with ``"username"`` and
                ``"password"`` keys.
            login_element: Id of the user-name input.
            password_element: Id of the password input.

        The password is typed followed by ENTER, which submits the form.
        """
        if kwargs.get("url") is not None:
            self.get_url(kwargs["url"])

        # Context manager so the credentials file handle is always closed.
        with open(kwargs["credentials_json"], encoding="utf-8") as creds_file:
            creds = json.load(creds_file)

        login_element = self.get_displayed_element(kwargs["login_element"])
        login_element.clear()
        login_element.send_keys(creds["username"])

        passwd_element = self.get_displayed_element(kwargs["password_element"])
        passwd_element.clear()
        passwd_element.send_keys(creds["password"] + Keys.ENTER)

    @retrying.retry(wait_fixed=1000,
                    stop_max_attempt_number=10,
                    retry_on_result=lambda x: x != 0)
    def click_element(self, element_id, element_type="id", element_index=0, validate_text=None):
        """Click an element, optionally only once its text matches.

        The call is retried (fixed 1000 ms wait, at most 10 attempts) for as
        long as it returns a non-zero value.

        Args:
            element_id: The id or class name to look for.
            element_type: ``"id"`` or ``"class"``.
            element_index: Position within the list of matches.
            validate_text: When not ``None``, the element is clicked only if
                its ``text`` equals this value.

        Returns:
            ``0`` after clicking, ``-1`` when the text validation fails.
        """
        # element_index is forwarded so that callers can click a match other
        # than the first one.
        element = self.get_element(
            element_id=element_id,
            element_type=element_type,
            element_index=element_index)
        if validate_text is not None and element.text != validate_text:
            return -1
        element.click()
        return 0

    def type_in_element(self, element_id, input_text, element_type="id"):
        """Clear the element and type ``input_text`` into it."""
        element = self.get_element(element_id, element_type=element_type)
        element.clear()
        element.send_keys(input_text)

    def get_page_source(self):
        """Return the current page source from the driver."""
        return self.driver.page_source

    def __del__(self):
        """Shut the browser down, except in DEBUG mode where it is left open."""
        # __init__ may have failed before the driver was created.
        driver = getattr(self, "driver", None)
        if driver is None:
            return
        if logging.getLogger().level > logging.DEBUG:
            # quit() ends the whole session; close() would only close the
            # current window.
            driver.quit()
        else:
            logger.warning("Driver not closed. Use 'killall geckodriver'")
def get_cur_utc_date():
    """Return the current time as a timezone-aware ``datetime`` in UTC.

    Returns:
        A ``datetime.datetime`` whose ``tzinfo`` is ``pytz.utc``.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; asking for
    # "now" directly in UTC yields the same aware value without the naive
    # intermediate step.
    return datetime.datetime.now(tz=pytz.utc)
def process(**kwargs):
    """Log in to the site, select a date and save the first page table as CSV.

    Keyword Args:
        inp_date: A ``datetime``-like object; used to name the output file
            (``<YYYYMMDD>-WEBDATA.csv``) and typed into the "date" field.

    Returns:
        ``0`` on success.

    Raises:
        IndexError: If the page source contains no ``<table>`` element.
    """
    # Local import keeps this function self-contained; io is standard library.
    import io

    inp_dt = kwargs["inp_date"]
    out_file = "{}-WEBDATA.csv".format(inp_dt.strftime("%Y%m%d"))

    wrapper = SeleniumWrapper()
    wrapper.get_url("https://somewebsite.com/users/sign_in")
    wrapper.login(
        login_element="user_email",
        password_element="user_password",
        credentials_json=os.path.join(os.path.dirname(__file__), "creds.json")
    )

    # Click on elements - loop till they are available and validate the text
    # contents before clicking.
    wrapper.click_element("skipbutton", element_type="class", validate_text="CheckThis")
    wrapper.click_element("menu_button", element_type="class", validate_text="CheckThis")

    # Type in text-boxes
    wrapper.type_in_element(
        "date",
        inp_dt.strftime("%Y-%m-%d") + Keys.ENTER)

    # Parse a TABLE and save it as CSV.
    # The returned element is not needed; the call is made because
    # get_element() retries until the container is present on the page.
    wrapper.get_element("table_container", element_type="class")

    soup = BeautifulSoup(wrapper.get_page_source(), 'lxml')
    tables = soup.find_all('table')
    if not tables:
        raise IndexError("No <table> element found in the page source")

    # Passing literal HTML to read_html() is deprecated since pandas 2.1;
    # a file-like wrapper is accepted by old and new versions alike.
    table_df = pandas.read_html(io.StringIO(str(tables[0])), header=0)[0]
    table_df.to_csv(out_file)
    logger.info("CSV generated: %s", out_file)
    return 0
def main():
    """Parse the command line and hand the parsed options to process().

    The only option is ``-d/--inp_date`` in ``%Y%m%d`` form; when omitted,
    the value of get_cur_utc_date() at parser construction time is used.

    Returns:
        Whatever process() returns.
    """
    arg_parser = argparse.ArgumentParser(description="Web Downloader")
    arg_parser.add_argument(
        "-d", "--inp_date",
        dest="inp_date",
        type=lambda d: datetime.datetime.strptime(d, "%Y%m%d"),
        default=get_cur_utc_date(),
        help="Input date in %%Y%%m%%d",
    )
    options = vars(arg_parser.parse_args())
    return process(**options)
if __name__ == "__main__":
    # Script entry point: configure logging, run main() and turn any
    # unexpected exception into a logged traceback plus exit status -1.
    setup_logging(__file__, level=logging.INFO)
    try:
        sys.exit(main())  # Ensure return value is passed to shell
    except Exception as error:  # pylint: disable=W0702, W0703
        # SystemExit does not derive from Exception, so the sys.exit() above
        # is not intercepted here.
        logger.error("\n%s", traceback.format_exc())
        logger.error("Error: %s", error)
        sys.exit(-1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment