Last active
May 22, 2019 00:13
-
-
Save kizernis/d49da5d98467a9662da845babc77b6f5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Get data from a complex webapp using a headless browser

# --- Settings -----------------------------------------------------------------
# 'settings_scrap.txt' layout as consumed below:
#   * lines[1:10]  -> nine "key = value" pairs (lines[0] is skipped)
#   * lines[12:]   -> exactly two non-blank date lines (month, day, year)
settings = {}
with open('settings_scrap.txt') as f:
    lines = f.read().splitlines()

for line in lines[1:9+1]:
    # Split on the first '=' only, so values (e.g. URLs) may contain '='
    x, y = line.split('=', 1)
    # Keys are normalised by removing every space
    settings[x.strip().replace(' ', '')] = y.strip()

# Convert the textual values to their working types
settings['firefox'] = 'true' == settings['firefox'].lower()
settings['headless'] = 'true' == settings['headless'].lower()
settings['additional_pause'] = int(settings['additional_pause']) / 1000  # divided by 1000 before being passed to time.sleep
settings['ram_threshold'] = float(settings['ram_threshold'])
settings['logging'] = 'true' == settings['logging'].lower()

# --- Date range ---------------------------------------------------------------
# Accept ':', '_' or '-' as separators and ignore blank lines and spaces
dates = [
    date.strip().replace(':', '.').replace('_', '.').replace('-', '.').replace(' ', '')
    for date in lines[12:]
    if date.strip() != ''
]
# Explicit check instead of `assert`, which is stripped when Python runs with -O
if len(dates) != 2:
    raise ValueError(f'settings_scrap.txt must contain exactly 2 dates, found {len(dates)}')
# Zero-pad to the canonical 'MM.DD.YYYY' form
dates = ['{:0>2}.{:0>2}.{:0>4}'.format(*date.split('.')) for date in dates]


def _date_sort_key(date_str):
    """Return a (year, month, day) key for a zero-padded 'MM.DD.YYYY' string."""
    month_part, day_part, year_part = date_str.split('.')
    return (year_part, month_part, day_part)


# Newest date first. A plain string sort would compare the month first and
# mis-order ranges that cross a year boundary (e.g. 12.31.2018 vs 01.05.2019).
dates.sort(key=_date_sort_key, reverse=True)

# TODO: let it have a head some day
settings['headless'] = True

# --- Data codes ---------------------------------------------------------------
# One code per line; '/' is replaced with '.', spaces removed, blank lines skipped
with open('data_codes.txt') as f:
    data_codes = [
        s.strip().replace('/', '.').replace(' ', '')
        for s in f.read().splitlines()
        if s.strip() != ''
    ]
import os | |
import time | |
import csv | |
import gc | |
import psutil | |
from tqdm import tqdm | |
from datetime import datetime, timedelta | |
from bs4 import BeautifulSoup | |
from selenium import webdriver | |
from selenium.webdriver.common.keys import Keys | |
from selenium.webdriver.support.ui import WebDriverWait | |
import selenium.webdriver.support.expected_conditions as EC | |
from selenium.webdriver.common.by import By | |
from selenium.common.exceptions import TimeoutException, NoSuchElementException | |
# Build the browser options object `o` for the configured browser.
# The command-line switches are collected first and applied in one pass.
if settings['firefox']:
    o = webdriver.FirefoxOptions()
    browser_arguments = ['--headless'] if settings['headless'] else []
else:
    o = webdriver.ChromeOptions()
    browser_arguments = ['--log-level=3', '--disable-infobars']
    if settings['headless']:
        browser_arguments += ['--headless', '--disable-gpu']
    else:
        browser_arguments.append('--start-maximized')
for browser_argument in browser_arguments:
    o.add_argument(browser_argument)
# Full month names, indexed by month number - 1
months = (
    'January', 'February', 'March', 'April', 'May', 'June',
    'July', 'August', 'September', 'October', 'November', 'December',
)
# Three-letter month abbreviation -> month number, in calendar order
months_short_numbers = {name[:3]: number for number, name in enumerate(months, start=1)}
# Output CSV header; 'Date' is filled by the script, the remaining columns
# are filled positionally from the cells of each grid row
csv_column_names = (
    'Date', 'Time', 'Root', 'Option', 'Qty', 'Price',
    'Exchange', 'Condition', 'Market', 'Trade IV', 'Underlying Price',
)
def touch(fname, times=None):
    """Create `fname` if it is missing and set its access/modification times.

    The file is opened in append mode, so existing content is preserved.
    `times` is passed straight to `os.utime`; None means "now".
    """
    stream = open(fname, 'a')
    try:
        os.utime(fname, times)
    finally:
        stream.close()
def wait_for_new_grid_rows(old_grid_rows):
    """Poll the page until the grid shows rows that differ from `old_grid_rows`.

    Returns the new, non-empty list of grid-row elements, or None as soon as
    the "No data available." placeholder is found. There is no overall
    timeout: the function polls every 50 ms until one of the two happens.
    """
    rows_xpath = '//div[@data-qa="gridRow"]'
    empty_xpath = '//div[@class="EmptyGridView__centered__1IvU4"]/span[text()="No data available."]'
    poll_interval = 0.05

    # Phase 1: wait until the row list is no longer equal to the old one
    time.sleep(poll_interval)
    current_rows = driver.find_elements_by_xpath(rows_xpath)
    while current_rows == old_grid_rows:
        try:
            if len(driver.find_elements_by_xpath(empty_xpath)):
                return None
        except TimeoutException:
            pass
        time.sleep(poll_interval)
        current_rows = driver.find_elements_by_xpath(rows_xpath)

    # Phase 2: the list changed but may still be empty while the grid renders
    while not len(current_rows) > 0:
        try:
            if len(driver.find_elements_by_xpath(empty_xpath)):
                return None
        except TimeoutException:
            # Retry the placeholder check right away, without sleeping
            continue
        time.sleep(poll_interval)
        current_rows = driver.find_elements_by_xpath(rows_xpath)
    return current_rows
# XPath locators shared by the page helpers and the main loop
# Text input that receives the data code
xpath_input_data_code = '//input[@data-qa="symbolNameInput"]'
# Label whose text is compared against the requested data code
xpath_label_data_code = '//div[@class="DailySnapshot__symbolName__3Nqp2"]'
# Button clicked to step back to the previous day
xpath_button_back = '//button[@data-qa="dailySnapshotPrevDay"]'
# Element whose text holds the currently picked date
xpath_picked_date = '//span[@class="DateChange__customDayPickerInput__3ac_D"]'
def enter_data_code():
    """Make the page show the global `data_code`; return True on success.

    If the symbol label already matches `data_code` (case-insensitively),
    nothing is typed. Otherwise the code is entered into the symbol input and
    submitted. When the code is rejected, an empty
    '<output_path>/<data_code> is INVALID.csv' marker file is touched and
    False is returned.

    Updates the globals `grid_rows`, `label_data_code` and `input_data_code`.
    """
    global grid_rows, label_data_code, input_data_code

    def report_invalid():
        # Leave an empty marker file for the rejected code
        touch(f'{settings["output_path"]}/{data_code} is INVALID.csv')
        return False

    label_data_code = driver.find_element_by_xpath(xpath_label_data_code)
    if label_data_code.text.lower() == data_code.lower():
        # The requested code is already displayed
        return True

    input_data_code = driver.find_element_by_xpath(xpath_input_data_code)
    input_data_code.send_keys(data_code)

    # Handle invalid data codes
    suggestions = WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located((By.XPATH, '//div[@class="SymbolSelector__autocomplete__EF_vQ"]'))
    )
    if len(suggestions.find_elements_by_xpath('.//span[@data-qa="matchingSymbolsList-noMatches"]')):
        input_data_code.send_keys(Keys.ESCAPE)
        return report_invalid()

    # Wait for a highlighted suggestion whose lower-cased text equals the code
    WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located((By.XPATH, f'//mark[@class="SymbolSelector__searching__1234y " and translate(text(),"ABCDEFGHIJKLMNOPQRSTUVWXYZ","abcdefghijklmnopqrstuvwxyz")="{data_code.lower()}"]'))
    )
    input_data_code.submit()
    grid_rows = wait_for_new_grid_rows(grid_rows)

    # Sometimes it doesn't say "No matches" but the data code is still invalid
    if label_data_code.text.lower() != data_code.lower():
        return report_invalid()
    return True
def new_webdriver_session():
    """Start a fresh browser session, log in and open the current data code.

    Any existing session is quit first (and logged when logging is enabled).
    Then a Firefox or Chrome driver is created from the options object `o`,
    the login form is submitted, the "Historical Snapshots" tab is selected
    and the global `data_code` is entered.

    Returns the result of `enter_data_code()`.
    """
    global driver, grid_rows, input_data_code, label_data_code, button_back, picked_date
    global time1, time2

    # Shut down the previous session, if there is one
    if driver is not None:
        if settings['logging']:
            time2 = datetime.now()
            log_file.write(f'{time2 - time1} Restarting session: {datetime.now():%H-%M-%S}\n')
            time1 = time2
        driver.quit()
        driver = None
        gc.collect()

    # Launch the configured browser
    use_firefox = settings['firefox']
    if use_firefox:
        driver = webdriver.Firefox(options=o, service_log_path=os.devnull)
        if not settings['headless']:
            driver.maximize_window()
    else:
        driver = webdriver.Chrome(options=o)
        if settings['headless']:
            driver.set_window_size(1440, 1080)

    # Login
    driver.get(settings['url'])
    username_box = driver.find_element_by_xpath('//input[@name="username"]')
    username_box.send_keys(settings['login'])
    password_box = driver.find_element_by_xpath('//input[@name="password"]')
    password_box.send_keys(settings['password'], Keys.RETURN)

    # Select "Historical Snapshots" tab
    selected_tab = WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located((By.XPATH, '//li[@class="DragTabs__tab__3a1AS DragTabs__tabSelected__2D4o4"]'))
    )
    if selected_tab.text != 'Historical Snapshots':
        driver.find_element_by_xpath('//span[@class="TabbedView__tabTitle__3I2Fo" and text()="Historical Snapshots"]').click()

    # Nothing was displayed before, so any rendered grid counts as new
    grid_rows = wait_for_new_grid_rows(None)
    return enter_data_code()
def enter_first_date(date_current_str):
    """Select the date given as 'MM.DD.YYYY' in the page's date picker.

    Does nothing beyond refreshing the global `picked_date` when the page
    already shows that date. Otherwise it picks year, month and day in the
    date picker and waits for the grid to refresh, updating the global
    `grid_rows`.
    """
    global grid_rows, picked_date

    month_number, day, year = map(int, date_current_str.split('.'))
    month = months[month_number - 1]
    month_short = list(months_short_numbers)[month_number - 1]

    picked_date = driver.find_element_by_xpath(xpath_picked_date)
    if picked_date.text == f'{day:02d} {month_short} {year}':
        # The requested date is already selected
        return

    picked_date.click()
    date_picker_month = WebDriverWait(driver, 60).until(
        EC.visibility_of_element_located((By.XPATH, '//div[@class="react-datepicker__month"]'))
    )
    year_select = driver.find_element_by_xpath('//select[@class="react-datepicker__year-select"]')
    year_select.find_element_by_xpath(f'.//option[text()="{year}"]').click()
    month_select = driver.find_element_by_xpath('//select[@class="react-datepicker__month-select"]')
    month_select.find_element_by_xpath(f'.//option[text()="{month}"]').click()
    date_picker_month.find_element_by_xpath(f'.//div[@class="react-datepicker__day" and text()="{day}"]').click()
    grid_rows = wait_for_new_grid_rows(grid_rows)
# --- Main script ----------------------------------------------------------------
# For every data code: open it in the web app, start at dates[0] and step
# backwards one displayed day at a time until dates[1] is reached (or the
# "previous day" button is disabled), writing each day's grid rows to
# '<output_path>/<data_code>.csv'.
#
# The whole run is wrapped in try/finally so that the browser, the progress
# bar and the log file are released even when a step raises; otherwise a
# failure would leave an orphaned browser process behind.
driver = None
progress_bar = None  # created after the first session has been started
if settings['logging']:
    log_file = open(f'log {datetime.now():%Y-%m-%d %H-%M-%S}.txt', 'w', encoding='utf-8')
    time1 = datetime.now()

try:
    date_last = datetime.strptime(dates[1], '%m.%d.%Y')
    days_total = (datetime.strptime(dates[0], '%m.%d.%Y') - date_last).days + 1

    for data_code in data_codes:
        if driver is None:
            # First data code: start the browser, then show the progress bar
            data_code_is_valid = new_webdriver_session()
            progress_bar = tqdm(total=len(data_codes) * days_total)
        elif psutil.virtual_memory().percent > settings['ram_threshold']:
            # System RAM usage is above the threshold: restart the browser
            data_code_is_valid = new_webdriver_session()
        else:
            data_code_is_valid = enter_data_code()

        if not data_code_is_valid:
            # Account for all the days of the skipped data code
            for _ in range(days_total):
                progress_bar.update()
            continue

        if settings['logging']:
            time2 = datetime.now()
            log_file.write(f'{time2 - time1} Data code: {data_code}\n')
            time1 = time2

        # `with` guarantees the CSV is flushed and closed even on errors
        with open(f'{settings["output_path"]}/{data_code}.csv', 'w', newline='', encoding='utf-8') as output_file:
            writer = csv.DictWriter(output_file, fieldnames=csv_column_names)
            writer.writeheader()

            if settings['logging']:
                time2 = datetime.now()
                log_file.write(f'{time2 - time1} {dates[0]}\n')
                time1 = time2

            date_current_str = dates[0]
            enter_first_date(date_current_str)
            date_current = datetime.strptime(date_current_str, '%m.%d.%Y')
            date_next = date_current - timedelta(days=1)

            while True:
                # date_current_str is None on every pass except the first:
                # it signals that the page must be moved to the previous day
                if date_current_str is None:
                    # Select previous date
                    button_back = driver.find_element_by_xpath(xpath_button_back)
                    button_back.click()
                    grid_rows = wait_for_new_grid_rows(grid_rows)

                    # Read back the date the page actually moved to
                    picked_date = driver.find_element_by_xpath(xpath_picked_date)
                    day_str, month_short, year_str = picked_date.text.split()
                    date_current_str = f'{months_short_numbers[month_short]:02d}.{day_str:0>2}.{year_str:0>4}'
                    date_current = datetime.strptime(date_current_str, '%m.%d.%Y')

                    # Advance the bar for calendar days the page jumped over
                    for _ in range((date_next - date_current).days):
                        progress_bar.update()
                    date_next = date_current - timedelta(days=1)

                    if settings['logging']:
                        time2 = datetime.now()
                        log_file.write(f'{time2 - time1} {date_current_str} {psutil.virtual_memory().percent}%\n')
                        time1 = time2

                progress_bar.update()

                # grid_rows is None when the page reported "No data available."
                if grid_rows is not None:
                    # Save data
                    soup = BeautifulSoup(driver.page_source, 'lxml')
                    for grid_row in soup.find_all('div', attrs={'data-qa': 'gridRow'}):
                        row = {'Date': date_current_str}
                        # Cells map positionally onto the columns after 'Date'
                        for i, cell in enumerate(grid_row.find_all('div', class_='react-grid-Cell__value'), start=1):
                            row[csv_column_names[i]] = cell.get_text().strip()
                        writer.writerow(row)
                    # Drop the parsed tree right away to keep memory usage low
                    soup.decompose()
                    soup = None
                    gc.collect()

                try:
                    button_back = driver.find_element_by_xpath(xpath_button_back)
                except NoSuchElementException:
                    button_back = WebDriverWait(driver, 60).until(EC.visibility_of_element_located((By.XPATH, xpath_button_back)))

                # Stop at the last requested date or when there is no earlier day
                if date_current <= date_last or button_back.get_attribute('disabled') is not None:
                    break

                # Restart WebDriver session if too much RAM consumed
                if psutil.virtual_memory().percent > settings['ram_threshold']:
                    new_webdriver_session()
                    enter_first_date(date_current_str)

                date_current_str = None

        output_file = None
        writer = None
        gc.collect()
        time.sleep(settings['additional_pause'])
finally:
    if settings['logging']:
        log_file.close()
    # Both stay None when data_codes is empty or the first session failed
    if progress_bar is not None:
        progress_bar.close()
    if driver is not None:
        driver.quit()
# input("\nPress Enter to finish...")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment