@csghone
Created May 7, 2019 10:31
Selenium Wrapper Class for Python
#!/usr/bin/env python3
# Setup and Usage:
# - sudo -H pip3 install selenium pytz retrying pandas bs4
# - Download geckodriver and copy to same directory as this file
# - python3 get_ransquawk_data.py
import os
import sys
import traceback
import argparse
import datetime
import time
import json
import subprocess
import logging
import logging.handlers
import pytz
import pandas
import retrying
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.support.ui import Select
from selenium.webdriver.common.keys import Keys
from bs4 import BeautifulSoup
# Use these two lines in all files
logger = logging.getLogger(__name__)
logger.propagate = True
# Call setup_logging() only in file with def main()
# LOG_FORMATTER and def setup_logging() can be moved to a common file for reuse.
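# Example reuse (hypothetical layout, a sketch): move LOG_FORMATTER and
# setup_logging() into e.g. common_logging.py; every other module then only needs
#     logger = logging.getLogger(__name__)
#     logger.propagate = True
# and the entry-point script calls common_logging.setup_logging(__file__) once from main().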
LOG_FORMATTER = logging.Formatter(
    "%(asctime)s.%(msecs)03d - %(name)s - %(levelname)s - " +
    "%(lineno)s - %(funcName)s - " +
    "%(message)s",
    "%Y%m%d %H:%M:%S")
def setup_logging(inp_file, level=logging.INFO, enable_console=True):
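    """Attach a rotating file handler (and optionally a console handler) to the root logger."""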
    file_log_handler = logging.handlers.RotatingFileHandler(
        "__" + os.path.basename(inp_file) + ".main__" + ".log",
        maxBytes=1000000,
        backupCount=5)
    console_log_handler = logging.StreamHandler()
    root_logger = logging.getLogger()
    root_logger.addHandler(file_log_handler)
    if enable_console:
        root_logger.addHandler(console_log_handler)
    root_logger.setLevel(level)
    for handler in logger.root.handlers:
        handler.setFormatter(fmt=LOG_FORMATTER)
class SeleniumWrapper:
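    """Thin wrapper around a Firefox webdriver with retrying element lookups,
    a download-friendly profile and simple login/click/type helpers."""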
    def __init__(self, gecko_path=None):
        options = Options()
        if logging.getLogger().level > logging.DEBUG:
            options.add_argument("--headless")
        download_folder = os.path.join(os.environ.get("HOME"), "Downloads")
        if gecko_path is None:
            gecko_path = os.path.join(
                os.path.realpath(os.path.dirname(__file__)),
                "geckodriver"
            )
        fp = webdriver.FirefoxProfile()
        fp.set_preference("browser.download.folderList", 2)
        fp.set_preference("browser.helperApps.alwaysAsk.force", False)
        fp.set_preference("browser.download.manager.showWhenStarting", False)
        fp.set_preference("browser.download.dir", download_folder)
        fp.set_preference("browser.download.downloadDir", download_folder)
        fp.set_preference("browser.download.defaultFolder", download_folder)
        fp.set_preference("browser.download.forbid_open_with", True)
        fp.set_preference(
            "browser.helperApps.neverAsk.saveToDisk",
            "application/text, application/csv, text/csv, application/vnd.ms-excel")
        self.fp = fp
        driver = webdriver.Firefox(
            executable_path=gecko_path,
            firefox_profile=fp, options=options)
        self.driver = driver
    def get_url(self, url):
        self.driver.get(url)
    def get_driver_fxn(self, element_type="id"):
        driver = self.driver
        if element_type == "class":
            driver_fxn = driver.find_elements_by_class_name
        elif element_type == "id":
            driver_fxn = driver.find_elements_by_id
        else:
            assert False, "Unsupported element_type: %s" % element_type
        return driver_fxn
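    # retry_on_result=lambda x: x is None makes retrying call get_element() again
    # (up to 10 times, 1 second apart) while the element has not appeared yet.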
    @retrying.retry(wait_fixed=1000,
                    stop_max_attempt_number=10,
                    retry_on_result=lambda x: x is None)
    def get_element(self, element_id, element_type="id", element_index=0):
        driver_fxn = self.get_driver_fxn(element_type=element_type)
        try:
            element = driver_fxn(element_id)[element_index]
        except IndexError:
            logger.error("Cannot find element - Type: %s, Name: %s",
                         element_type, element_id)
            return None
        return element
    def get_displayed_element(self, element_id, element_type="id"):
        driver_fxn = self.get_driver_fxn(element_type=element_type)
        elements = driver_fxn(element_id)
        for element in elements:
            if element.is_displayed():
                return element
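    # login() expects credentials_json to point at a JSON file shaped like
    # (illustrative values): {"username": "user@example.com", "password": "secret"}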
    def login(self, **kwargs):
        if kwargs.get("url") is not None:
            self.get_url(kwargs["url"])
        with open(kwargs["credentials_json"]) as creds_file:
            creds = json.load(creds_file)
        login_element = self.get_displayed_element(kwargs["login_element"])
        login_element.clear()
        login_element.send_keys(creds["username"])
        passwd_element = self.get_displayed_element(kwargs["password_element"])
        passwd_element.clear()
        passwd_element.send_keys(creds["password"] + Keys.ENTER)
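    # retry_on_result=lambda x: x != 0 makes retrying call click_element() again
    # until it returns 0, i.e. the element exists, matches validate_text and was clicked.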
    @retrying.retry(wait_fixed=1000,
                    stop_max_attempt_number=10,
                    retry_on_result=lambda x: x != 0)
    def click_element(self, element_id, element_type="id", element_index=0, validate_text=None):
        element = self.get_element(
            element_id=element_id, element_type=element_type,
            element_index=element_index)
        if validate_text is not None:
            if element.text != validate_text:
                return -1
        element.click()
        return 0
    def type_in_element(self, element_id, input_text, element_type="id"):
        element = self.get_element(element_id, element_type=element_type)
        element.clear()
        element.send_keys(input_text)
    def get_page_source(self):
        return self.driver.page_source
    def __del__(self):
        if logging.getLogger().level > logging.DEBUG:
            self.driver.close()
        else:
            logger.warning("Driver not closed. Use 'killall geckodriver'")
def get_cur_utc_date():
    cur_time_epoch = time.time()
    cur_utc_dt_no_tz = datetime.datetime.utcfromtimestamp(cur_time_epoch)
    cur_utc_dt = pytz.utc.localize(cur_utc_dt_no_tz)
    return cur_utc_dt
def process(**kwargs):
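    """Log in, navigate to the data table for kwargs["inp_date"] and save it as a CSV."""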
inp_dt = kwargs["inp_date"]
out_file = "{}-WEBDATA.csv".format(inp_dt.strftime("%Y%m%d"))
wrapper = SeleniumWrapper()
wrapper.get_url("https://somewebsite.com/users/sign_in")
wrapper.login(
login_element="user_email",
password_element="user_password",
credentials_json=os.path.join(os.path.dirname(__file__), "creds.json")
)
    # Click on elements - retry until they are available and validate their text before clicking.
    wrapper.click_element("skipbutton", element_type="class", validate_text="CheckThis")
    wrapper.click_element("menu_button", element_type="class", validate_text="CheckThis")
    # Type in text-boxes
    wrapper.type_in_element(
        "date",
        inp_dt.strftime("%Y-%m-%d") + Keys.ENTER)
    # Parse a TABLE and save it as CSV
    # get_element() is used here only to wait until the table container has loaded.
    table_container = wrapper.get_element(
        "table_container", element_type="class")
    soup = BeautifulSoup(wrapper.get_page_source(), 'lxml')
    table = soup.find_all('table')[0]
    calendar = pandas.read_html(str(table), header=0)[0]
    calendar.to_csv(out_file)
    logger.info("CSV generated: %s", out_file)
    return 0
def main():
    parser = argparse.ArgumentParser(description="Web Downloader")
    parser.add_argument(
        "-d",
        "--inp_date",
        dest="inp_date",
        help="Input date in %%Y%%m%%d",
        type=lambda d: datetime.datetime.strptime(d, "%Y%m%d"),
        default=get_cur_utc_date()
    )
    myargs = parser.parse_args()
    return process(**vars(myargs))
if __name__ == "__main__":
setup_logging(__file__, level=logging.INFO)
try:
sys.exit(main()) # Ensure return value is passed to shell
except Exception as error: # pylint: disable=W0702, W0703
exc_mesg = traceback.format_exc()
logger.error("\n%s", exc_mesg)
logger.error("Error: %s", error)
error_message = str(exc_mesg) + "\n" + str(error)
sys.exit(-1)