Last active
December 4, 2018 11:41
-
-
Save kizernis/2fa5168887e34141992de072dea5993d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ---------------------------------------------------------------------------
# Configuration
#
# "settings_csv2.txt" is read with blank lines dropped, after which the
# layout is positional:
#   lines[0]      '[main settings]'
#   lines[1:8]    seven "key = value" pairs
#   lines[8]      '[dates]'
#   lines[9:11]   two dates (any order; ':', '_', '-' or '.' as separator)
#   lines[11]     '[time ranges]'
#   lines[12:]    an even number of times (start/end pairs)
# "data_codes.txt" holds one data code per non-blank line.
# ---------------------------------------------------------------------------
settings = {}
with open('settings_csv2.txt') as f:
    lines = [s.strip() for s in f.read().splitlines() if s.strip() != '']
assert lines[0] == '[main settings]'
assert lines[8] == '[dates]'
assert lines[11] == '[time ranges]'

# Keys are stored with all spaces removed; values are split on the first '='
# only, so a value may itself contain '='.
for line in lines[1:7+1]:
    x, y = line.split('=', 1)
    settings[x.strip().replace(' ', '')] = y.strip()
settings['firefox'] = 'true' == settings['firefox'].lower()
settings['headless'] = 'true' == settings['headless'].lower()
# The file gives the pause in milliseconds; time.sleep() takes seconds.
settings['additional_pause'] = int(settings['additional_pause']) / 1000


def _date_sort_key(date_str):
    """Return a chronological (year, month, day) key for a 'MM.DD.YYYY' string."""
    month, day, year = date_str.split('.')
    return year, month, day


# Normalise both dates to zero-padded 'MM.DD.YYYY'.
dates = lines[9:10+1]
dates = [d.strip().replace(':', '.').replace('_', '.').replace('-', '.').replace(' ', '') for d in dates]
dates = ['{:0>2}.{:0>2}.{:0>4}'.format(*(d.split('.'))) for d in dates]
# Newest date first. A plain string sort compares the month before the year
# and therefore misorders ranges that cross a year boundary
# ('12.31.2017' > '01.05.2018'), so sort on (year, month, day) instead.
dates.sort(key=_date_sort_key, reverse=True)

# Normalise times to colon-separated, zero-padded fields (e.g. '9.5' -> '09:05').
times = lines[12:]
assert len(times) % 2 == 0
times = [t.strip().replace('.', ':').replace('_', ':').replace('-', ':').replace(' ', '') for t in times]
times = [':'.join(f'{s:0>2}' for s in t.split(':')) for t in times]

with open('data_codes.txt') as f:
    data_codes = [s.strip().replace('/', '.').replace(' ', '') for s in f.read().splitlines() if s.strip() != '']
import os | |
import time | |
from datetime import datetime, timedelta | |
from glob import glob | |
from tqdm import tqdm | |
from selenium import webdriver | |
from selenium.webdriver.common.keys import Keys | |
from selenium.webdriver.support.ui import WebDriverWait | |
import selenium.webdriver.support.expected_conditions as EC | |
from selenium.webdriver.common.by import By | |
from selenium.common.exceptions import NoSuchElementException | |
# Start the browser selected in the settings (Firefox or Chrome), configured
# to save downloads into settings['output_path'].
use_firefox = settings['firefox']
run_headless = settings['headless']
download_dir = settings['output_path']

browser_options = webdriver.FirefoxOptions() if use_firefox else webdriver.ChromeOptions()
if run_headless:
    browser_options.add_argument('--headless')

if use_firefox:
    browser_profile = webdriver.FirefoxProfile()
    for pref_name, pref_value in (
        ('browser.download.folderList', 2),
        ('browser.download.manager.showWhenStarting', False),
        ('browser.helperApps.neverAsk.saveToDisk', 'text/csv'),
        ('browser.download.dir', download_dir),
    ):
        browser_profile.set_preference(pref_name, pref_value)
    driver = webdriver.Firefox(options=browser_options, firefox_profile=browser_profile, service_log_path=os.devnull)
    if not run_headless:
        driver.maximize_window()
else:
    chrome_download_prefs = {
        'download.default_directory': download_dir,
        'download.prompt_for_download': False,
        'download.directory_upgrade': True,
        'safebrowsing.disable_download_protection': True,
    }
    browser_options.add_experimental_option('prefs', chrome_download_prefs)
    window_flag = '--disable-gpu' if run_headless else '--start-maximized'
    for chrome_flag in ('--log-level=3', '--disable-infobars', window_flag):
        browser_options.add_argument(chrome_flag)
    driver = webdriver.Chrome(options=browser_options)
    if run_headless:
        # Register a custom "send_command" endpoint and use it to issue
        # Page.setDownloadBehavior with behavior "allow" for the output folder.
        # NOTE(review): presumably required because headless Chrome refuses
        # downloads by default -- confirm against the Chrome version in use.
        driver.command_executor._commands['send_command'] = ('POST', '/session/$sessionId/chromium/send_command')
        download_behavior = {'behavior': 'allow', 'downloadPath': download_dir}
        driver.execute('send_command', {'cmd': 'Page.setDownloadBehavior', 'params': download_behavior})

# A headless browser cannot be maximised, so give it a fixed window size.
if run_headless:
    driver.set_window_size(1440, 900)
# Login: open the configured URL, fill in the credentials and submit the
# form by sending RETURN after the password.
driver.get(settings['url'])
username_input = driver.find_element_by_xpath('//input[@name="username"]')
username_input.send_keys(settings['login'])
password_input = driver.find_element_by_xpath('//input[@name="password"]')
password_input.send_keys(settings['password'], Keys.RETURN)

# Select "Time And Sales" tab: wait (up to 300 s) for the currently selected
# tab to become visible, and switch only if it is a different tab.
selected_tab_locator = (By.XPATH, '//li[@class="DragTabs__tab__3a1AS DragTabs__tabSelected__2D4o4"]')
selected_tab = WebDriverWait(driver, 300).until(EC.visibility_of_element_located(selected_tab_locator))
if selected_tab.text != 'Time And Sales':
    time_and_sales_tab = driver.find_element_by_xpath('//span[@class="TabbedView__tabTitle__3I2Fo" and text()="Time And Sales"]')
    time_and_sales_tab.click()
def wait_for_new_grid_rows(old_grid_rows):
    """Block until the trades grid has been refreshed and return its rows.

    Polls the page every 50 ms. First waits until the list of grid row
    elements differs from ``old_grid_rows`` (pass ``None`` to accept any
    list), then waits until that list is non-empty.

    Returns the list of row elements, or ``None`` as soon as the page shows
    the "No trades data available" placeholder. There is no timeout: the
    call keeps polling until one of those two outcomes occurs.
    """
    rows_xpath = '//div[@data-qa="timeAndSalesGridRow"]'
    no_data_xpath = '//div[@class="EmptyGridView__centered__1IvU4"]/span[text()="No trades data available"]'

    # Phase 1: wait for the row list to change compared with the old one.
    while True:
        time.sleep(0.05)
        current_rows = driver.find_elements_by_xpath(rows_xpath)
        if current_rows != old_grid_rows:
            break
        if driver.find_elements_by_xpath(no_data_xpath):
            return None

    # Phase 2: the list changed but may still be empty while data loads.
    while not current_rows:
        if driver.find_elements_by_xpath(no_data_xpath):
            return None
        time.sleep(0.05)
        current_rows = driver.find_elements_by_xpath(rows_xpath)
    return current_rows
def _wait_until_visible(xpath):
    """Wait up to 300 s for the element at *xpath* to be visible and return it."""
    return WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, xpath)))


# Select "Trades Only" tab
grid_rows = wait_for_new_grid_rows(None)
trades_only_radio = driver.find_element_by_xpath('//input[@type="radio" and @value="Trades Only"]')
if trades_only_radio.is_selected():
    # Already selected: switch to "Trades & Quotes" first, so that the click
    # on "Trades Only" below changes the grid (wait_for_new_grid_rows keeps
    # polling until the rows differ from the previous ones).
    trades_and_quotes_label = driver.find_element_by_xpath('//input[@type="radio" and @value="Trades & Quotes"]/..')
    trades_and_quotes_label.click()
    grid_rows = wait_for_new_grid_rows(grid_rows)
trades_only_label = trades_only_radio.find_element_by_xpath('..')
trades_only_label.click()
grid_rows = wait_for_new_grid_rows(grid_rows)

# Select number of rows
rows_dropdown = _wait_until_visible('//div[@class="Dropdown-placeholder" and contains(@title, "00 rows")]')
if rows_dropdown.text == '10000 rows':
    # Same trick as above: drop to 1000 rows first so that choosing 10000
    # rows afterwards is an actual change.
    rows_dropdown.click()
    _wait_until_visible('//div[@class="Dropdown-option" and @data-label="1000 rows"]').click()
    rows_dropdown = _wait_until_visible('//div[@class="Dropdown-placeholder" and @title="1000 rows"]')
rows_dropdown.click()
_wait_until_visible('//div[@class="Dropdown-option" and @data-label="10000 rows"]').click()
grid_rows = wait_for_new_grid_rows(grid_rows)
def touch(fname, times=None):
    """Create *fname* if it does not exist and set its access/modification times.

    The file is opened in append mode, so existing content is preserved.
    ``times`` is passed straight to ``os.utime``: an ``(atime, mtime)`` tuple,
    or ``None`` to use the current time.
    """
    handle = open(fname, 'a')
    try:
        os.utime(fname, times)
    finally:
        handle.close()
# ---------------------------------------------------------------------------
# Main export loop: for every data code, walk the days from the newest date
# down to the oldest one and, for every configured time range, filter the
# grid and export it to CSV. Empty marker files are written (via touch) for
# invalid symbols and for days / time ranges without trades.
# ---------------------------------------------------------------------------

# We can't have any "trades_*.csv" files in the output folder
# (the download step below treats any file matching that pattern as the
# export that has just finished and asserts that there is exactly one).
for erroneous_file_path in glob(f'{settings["output_path"]}/trades_*.csv'):
    os.unlink(erroneous_file_path)

# Launch date used in output file names; can be overridden through the
# "csv_script_launch_date" environment variable, defaults to today.
script_launch_date = os.getenv('csv_script_launch_date', f'{datetime.now():%m.%d.%Y}')
# dates[0] is expected to be the newer date and dates[1] the older one.
date_first = datetime.strptime(dates[0], '%m.%d.%Y')
date_last = datetime.strptime(dates[1], '%m.%d.%Y')
days_total = (date_first - date_last).days + 1  # inclusive range
time_ranges_total = len(times) // 2  # times holds start/end pairs
# One progress step per (data code, day, time range) combination.
progress_bar = tqdm(total=len(data_codes) * days_total * time_ranges_total)

for data_code in data_codes:
    # Enter data code
    input_data_code = driver.find_element_by_xpath('//input[@data-qa="symbolNameInput"]')
    label_data_code = driver.find_element_by_xpath('//div[@class="SymbolSummary__symbolName__1ev5O"]')
    # Only type the code when the page is not already showing this symbol.
    if label_data_code.text.lower() != data_code.lower():
        input_data_code.send_keys(data_code)
        # Handle invalid data codes
        e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="SymbolSelector__autocomplete__EF_vQ"]')))
        if len(e.find_elements_by_xpath('.//span[@data-qa="matchingSymbolsList-noMatches"]')):
            input_data_code.send_keys(Keys.ESCAPE)
            touch(f'{settings["output_path"]}/{data_code} {script_launch_date} INVALID SYMBOL.csv')
            # Account for all the steps this symbol would have taken.
            for _ in range(days_total * time_ranges_total):
                progress_bar.update()
            continue
        # Wait until the autocomplete highlights an exact (case-insensitive)
        # match for the typed code, then submit it.
        WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, f'//mark[@class="SymbolSelector__searching__1234y " and translate(text(),"ABCDEFGHIJKLMNOPQRSTUVWXYZ","abcdefghijklmnopqrstuvwxyz")="{data_code.lower()}"]')))
        input_data_code.submit()
        grid_rows = wait_for_new_grid_rows(grid_rows)
        # Sometimes it doesn't say "No matches" but the data code is still invalid
        if label_data_code.text.lower() != data_code.lower():
            touch(f'{settings["output_path"]}/{data_code} {script_launch_date} INVALID SYMBOL.csv')
            for _ in range(days_total * time_ranges_total):
                progress_bar.update()
            continue

    # Walk backwards in time, one day per iteration.
    date_current = date_first
    while True:
        if date_current < date_last:
            break
        # Enter date
        date_pick_button = driver.find_element_by_xpath('//div[@data-qa="datePickerWrapper"]')
        picked_date_str = date_pick_button.find_element_by_xpath('input[@data-qa="dateString"]').get_attribute('value')
        # The date input holds its value as YYYY-MM-DD; skip the picker when
        # the wanted day is already selected.
        if picked_date_str != f'{date_current:%Y-%m-%d}':
            date_pick_button.click()
            date_picker = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="react-datepicker"]')))
            select_year = date_picker.find_element_by_xpath('//select[@class="react-datepicker__year-select"]')
            select_month = date_picker.find_element_by_xpath('//select[@class="react-datepicker__month-select"]')
            while True:
                select_year.find_element_by_xpath(f'option[@value="{date_current.year}"]').click()
                # Month <option> values are looked up zero-based (month - 1).
                select_month.find_element_by_xpath(f'option[@value="{date_current.month - 1}"]').click()
                try:
                    date_picker.find_element_by_xpath(f'.//div[@class="react-datepicker__day" and text()="{date_current.day}"]').click()
                    grid_rows = wait_for_new_grid_rows(grid_rows)
                    break
                except NoSuchElementException:
                    # No day cell with exactly this class and text was found
                    # (NOTE(review): presumably a disabled / non-trading day
                    # whose cell carries extra classes -- confirm). Skip to
                    # the previous day and try again.
                    date_current -= timedelta(days=1)
                    if date_current < date_last:
                        break
                    for _ in range(time_ranges_total):
                        progress_bar.update()
            # The retry loop may have run past the end of the date range.
            if date_current < date_last:
                break
        try:
            # Remove all filters
            driver.find_element_by_xpath('//div[@data-qa="timeAndSalesToolbarResetFilters"]').click()
            grid_rows = wait_for_new_grid_rows(grid_rows)
        except NoSuchElementException:
            # No reset button found: nothing to reset, carry on.
            pass
        # grid_rows is None when the page reported "No trades data available".
        if grid_rows is None:
            touch(f'{settings["output_path"]}/{data_code} {script_launch_date} {date_current:%m.%d.%Y} (no filters).csv')
            for _ in range(time_ranges_total):
                progress_bar.update()
            date_current -= timedelta(days=1)
            continue
        # times is a flat list of start/end pairs: (times[i], times[i + 1]).
        for i in range(0, len(times), 2):
            progress_bar.update()
            output_file_path = f'{settings["output_path"]}/{data_code} {script_launch_date} {date_current:%m.%d.%Y} {times[i].replace(":", ".")}-{times[i + 1].replace(":", ".")}.csv'
            # Remove a leftover file from a previous run so the rename below cannot collide with it.
            if os.path.isfile(output_file_path):
                os.unlink(output_file_path)
            # Enter next time range
            driver.find_element_by_xpath('//div[@data-qa="timeColumnFilter"]').click()
            filter_container = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@data-qa="filter-container"]')))
            input_field_min = filter_container.find_element_by_xpath('.//input[@data-qa="filterMinValue"]')
            input_field_max = filter_container.find_element_by_xpath('.//input[@data-qa="filterMaxValue"]')
            # The time is typed as digits only, prefixed with an extra '0'.
            # NOTE(review): the leading '0' is presumably required by the
            # input's mask / caret handling -- confirm.
            input_field_min.clear()
            input_field_min.send_keys('0' + times[i].replace(':', ''))
            input_field_max.clear()
            input_field_max.send_keys('0' + times[i + 1].replace(':', ''))
            filter_container.find_element_by_xpath('.//button[@data-qa="Apply"]').click()
            grid_rows = wait_for_new_grid_rows(grid_rows)
            if grid_rows is None:
                # No trades in this time range: leave an empty marker file.
                touch(output_file_path)
                continue
            # Export data to CSV
            driver.find_element_by_xpath('//span[@class="DropdownButton-placeholder" and text()="Export"]').click()
            WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="DropdownButton-option" and text()="to CSV"]'))).click()
            # Poll (without timeout) until the exported "trades_*.csv" file
            # shows up in the output folder, then give it its final name.
            while True:
                default_files_list = glob(f'{settings["output_path"]}/trades_*.csv')
                if len(default_files_list):
                    assert len(default_files_list) == 1
                    os.rename(default_files_list[0], output_file_path)
                    break
                time.sleep(0.05)
        date_current -= timedelta(days=1)
        # NOTE(review): nesting reconstructed -- this pause is assumed to run
        # once per processed day (it could also be once per data code); confirm.
        time.sleep(settings['additional_pause'])
progress_bar.close()
driver.quit()
# input("\nPress Enter to finish...")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment