Last active
December 4, 2018 11:33
-
-
Save kizernis/5c6df7b3c8a371937b9277d8356def4c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load the run configuration.
#
# "settings_csv.txt" layout, as consumed below:
#   * lines 2-8  : "key = value" settings (spaces inside keys are dropped);
#   * lines 11+  : time-range boundaries, read as consecutive (from, to) pairs.
# "data_codes.txt" holds one data code per line.
settings = {}
with open('settings_csv.txt') as f:
    lines = f.read().splitlines()

for line in lines[1:7 + 1]:
    key, separator, value = line.partition('=')
    if not separator:
        # Fail with a readable message instead of an opaque unpacking error.
        raise ValueError(f'settings_csv.txt: expected "key = value", got {line!r}')
    settings[key.strip().replace(' ', '')] = value.strip()

# Boolean flags are spelled "true"/"false" (case-insensitive) in the file.
settings['firefox'] = settings['firefox'].lower() == 'true'
settings['headless'] = settings['headless'].lower() == 'true'
# Given in milliseconds in the file; kept in seconds (it is passed to time.sleep later).
settings['additional_pause'] = int(settings['additional_pause']) / 1000

# Normalise the time boundaries: '.', '_' and '-' are all accepted as
# separators and converted to ':'; spaces are removed; blank lines are skipped.
times = [
    entry.strip().replace('.', ':').replace('_', ':').replace('-', ':').replace(' ', '')
    for entry in lines[10:]
    if entry.strip() != ''
]
if len(times) % 2 != 0:
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    raise ValueError(
        f'settings_csv.txt: expected an even number of time boundaries (from/to pairs), got {len(times)}'
    )
# Left-pad every component to two characters, e.g. "9:30:0" -> "09:30:00".
times = [':'.join(f'{s:0>2}' for s in entry.split(':')) for entry in times]

with open('data_codes.txt') as f:
    # '/' is replaced with '.', spaces are removed, blank lines are skipped.
    data_codes = [
        s.strip().replace('/', '.').replace(' ', '')
        for s in f.read().splitlines()
        if s.strip() != ''
    ]
import os | |
import time | |
from datetime import datetime | |
from glob import glob | |
from tqdm import tqdm | |
from selenium import webdriver | |
from selenium.webdriver.common.keys import Keys | |
from selenium.webdriver.support.ui import WebDriverWait | |
import selenium.webdriver.support.expected_conditions as EC | |
from selenium.webdriver.common.by import By | |
from selenium.common.exceptions import NoSuchElementException | |
# Create the browser driver selected in the settings (Firefox or Chrome),
# set up so that CSV exports are saved into settings['output_path'].
if settings['firefox']:
    o = webdriver.FirefoxOptions()
    if settings['headless']:
        o.add_argument('--headless')

    # Download-related profile preferences (target folder, 'text/csv' saved to disk).
    p = webdriver.FirefoxProfile()
    for preference_name, preference_value in (
        ('browser.download.folderList', 2),
        ('browser.download.manager.showWhenStarting', False),
        ('browser.helperApps.neverAsk.saveToDisk', 'text/csv'),
        ('browser.download.dir', settings['output_path']),
    ):
        p.set_preference(preference_name, preference_value)

    driver = webdriver.Firefox(options=o, firefox_profile=p, service_log_path=os.devnull)
    if not settings['headless']:
        driver.maximize_window()
else:
    o = webdriver.ChromeOptions()
    if settings['headless']:
        o.add_argument('--headless')

    o.add_experimental_option('prefs', {
        'download.default_directory': settings['output_path'],
        'download.prompt_for_download': False,
        'download.directory_upgrade': True,
        'safebrowsing.disable_download_protection': True,
    })
    o.add_argument('--log-level=3')
    o.add_argument('--disable-infobars')
    o.add_argument('--disable-gpu' if settings['headless'] else '--start-maximized')

    driver = webdriver.Chrome(options=o)
    if settings['headless']:
        # Register a custom "send_command" endpoint and use it to issue
        # Page.setDownloadBehavior so downloads are allowed into output_path.
        # NOTE(review): presumably needed because headless Chrome ignores the
        # download prefs above - confirm.
        driver.command_executor._commands['send_command'] = ('POST', '/session/$sessionId/chromium/send_command')
        driver.execute('send_command', {
            'cmd': 'Page.setDownloadBehavior',
            'params': {'behavior': 'allow', 'downloadPath': settings['output_path']},
        })

# A headless browser cannot be maximised, so it gets a fixed window size instead.
if settings['headless']:
    driver.set_window_size(1440, 900)
# Login: open the configured URL, fill in the credentials and submit with RETURN.
# `find_element(By.XPATH, ...)` is used instead of `find_element_by_xpath`,
# which no longer exists in Selenium 4.3+; this form works on Selenium 3 as well.
driver.get(settings['url'])
driver.find_element(By.XPATH, '//input[@name="username"]').send_keys(settings['login'])
driver.find_element(By.XPATH, '//input[@name="password"]').send_keys(settings['password'], Keys.RETURN)

# Select "Trade Tape" tab: wait (up to 300 s) for the currently selected tab to
# become visible, and switch only when it is not already "Trade Tape".
e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//li[@class="DragTabs__tab__3a1AS DragTabs__tabSelected__2D4o4"]')))
if e.text != 'Trade Tape':
    driver.find_element(By.XPATH, '//span[@class="TabbedView__tabTitle__3I2Fo" and text()="Trade Tape"]').click()
def wait_for_new_grid_rows(old_grid_rows):
    """Block until the trade-tape grid shows a new, non-empty set of rows.

    The page is polled every 50 ms in two phases:

    1. until the list of row elements differs from ``old_grid_rows``;
    2. until that list contains at least one row.

    Args:
        old_grid_rows: The row elements returned by the previous call, or
            ``None`` on the first call (any list differs from ``None``, so
            phase 1 then ends after the first poll).

    Returns:
        The list of current row elements, or ``None`` if the page shows the
        "No data available." placeholder while waiting.

    Note:
        There is no timeout: the function keeps polling for as long as neither
        new rows nor the placeholder appear.
    """
    rows_xpath = '//div[@data-qa="tradeTapeRow"]'
    no_data_xpath = '//div[@class="EmptyGridView__centered__1IvU4" and text()="No data available."]'

    # `find_elements(By.XPATH, ...)` replaces `find_elements_by_xpath`, which
    # was removed in Selenium 4.3; this spelling is valid on Selenium 3 too.

    # Phase 1: wait for the grid content to change.
    while True:
        time.sleep(0.05)
        grid_rows = driver.find_elements(By.XPATH, rows_xpath)
        if grid_rows != old_grid_rows:
            break
        if driver.find_elements(By.XPATH, no_data_xpath):
            return None

    # Phase 2: the grid may have changed to "empty" first; wait for actual rows.
    while not grid_rows:
        if driver.find_elements(By.XPATH, no_data_xpath):
            return None
        time.sleep(0.05)
        grid_rows = driver.find_elements(By.XPATH, rows_xpath)
    return grid_rows
# Select number of rows: the grid must end up showing "10000 rows".
grid_rows = wait_for_new_grid_rows(None)
e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-placeholder" and contains(@title, "00 rows")]')))
if e.text == '10000 rows':
    # Already on 10000: switch to 1000 first and re-select 10000 below.
    # NOTE(review): presumably so the re-selection reloads the grid and the
    # wait_for_new_grid_rows() call that follows sees new rows - confirm.
    e.click()
    WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-option" and @data-label="1000 rows"]'))).click()
    e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-placeholder" and @title="1000 rows"]')))
e.click()
WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-option" and @data-label="10000 rows"]'))).click()
grid_rows = wait_for_new_grid_rows(grid_rows)

# Select default view (only when another view is active, since only then does the grid change).
# `find_element(By.XPATH, ...)` replaces `find_element_by_xpath`, removed in Selenium 4.3.
e = driver.find_element(By.XPATH, '//span[@class="QuickViewsDropdown__listItemName__2nHlD"]')
if e.text != 'Default View':
    e.click()
    WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="QuickViewsDropdown__listItem__1NmP0" and text()="Default View"]'))).click()
    grid_rows = wait_for_new_grid_rows(grid_rows)
def touch(fname, times=None):
    """Create ``fname`` if it does not exist and set its access/modified times.

    Args:
        fname: Path of the file to create or update.
        times: Value handed to ``os.utime`` as its ``times`` argument;
            ``None`` (the default) means the current time.
    """
    # Append mode creates a missing file and never truncates an existing one.
    with open(fname, mode='a'):
        os.utime(fname, times=times)
# We can't have any "trades_*.csv" files in the output folder: the export step
# below recognises a finished download by globbing for exactly that pattern.
for erroneous_file_path in glob(f'{settings["output_path"]}/trades_*.csv'):
    os.unlink(erroneous_file_path)

# Date stamp used in output file names; the "csv_script_launch_date"
# environment variable overrides today's date.
script_launch_date = os.getenv('csv_script_launch_date', f'{datetime.now():%m.%d.%Y}')
time_range_count = len(times) // 2
progress_bar = tqdm(total=time_range_count * len(data_codes))


def _skip_data_code(marker_file_path):
    """Create an empty marker file and advance the progress bar past every time range of one data code."""
    touch(marker_file_path)
    progress_bar.update(time_range_count)


# `find_element(s)(By.XPATH, ...)` is used throughout instead of the
# `find_element(s)_by_xpath` helpers, which were removed in Selenium 4.3.
# The try/finally guarantees the browser is shut down even when a step fails.
try:
    for data_code in data_codes:
        try:
            # Remove all filters
            driver.find_element(By.XPATH, '//div[@data-qa="tradeTapeToolbarResetFilters"]').click()
            grid_rows = wait_for_new_grid_rows(grid_rows)
        except NoSuchElementException:
            # The reset control was not found; continue with the grid as it is.
            pass

        # Enter data code (skipped when the filter label already shows it)
        input_data_code = driver.find_element(By.XPATH, '//input[@data-qa="symbolNameInput"]')
        label_data_code = driver.find_element(By.XPATH, '//div[@class="FilterPreview__value__3bDRJ"]')
        expected_label = f'== {data_code.lower()}'
        if label_data_code.text.lower() != expected_label:
            input_data_code.send_keys(data_code)

            # Handle invalid data codes
            e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="SymbolSelector__autocomplete__EF_vQ"]')))
            if e.find_elements(By.XPATH, './/span[@data-qa="matchingSymbolsList-noMatches"]'):
                input_data_code.send_keys(Keys.ESCAPE)
                _skip_data_code(f'{settings["output_path"]}/{data_code} {script_launch_date} INVALID SYMBOL.csv')
                continue

            # Wait for the autocomplete to highlight the code (case-insensitive) before submitting.
            WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, f'//mark[@class="SymbolSelector__searching__1234y " and translate(text(),"ABCDEFGHIJKLMNOPQRSTUVWXYZ","abcdefghijklmnopqrstuvwxyz")="{data_code.lower()}"]')))
            input_data_code.submit()
            grid_rows = wait_for_new_grid_rows(grid_rows)

            # Sometimes it doesn't say "No matches" but the data code is still invalid
            if label_data_code.text.lower() != expected_label:
                _skip_data_code(f'{settings["output_path"]}/{data_code} {script_launch_date} INVALID SYMBOL.csv')
                continue

        # No rows at all for this code even before any time filter is applied.
        if grid_rows is None:
            _skip_data_code(f'{settings["output_path"]}/{data_code} {script_launch_date} (no filters).csv')
            continue

        # Choose time filter: `times` holds consecutive (from, to) pairs.
        for range_index, (time_from, time_to) in enumerate(zip(times[0::2], times[1::2])):
            progress_bar.update()
            output_file_path = f'{settings["output_path"]}/{data_code} {script_launch_date} {time_from.replace(":", ".")}-{time_to.replace(":", ".")}.csv'

            # Delete the output file if it already exists
            if os.path.isfile(output_file_path):
                os.unlink(output_file_path)

            driver.find_element(By.XPATH, '//div[@data-qa="TradeTimeColumnFilter"]').click()
            if range_index == 0:
                # Only the first range of a data code has to switch the filter type from "No filter" to "in".
                WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@title="No filter" and text()="No filter"]'))).click()
                WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="Dropdown-option" and text()="in"]'))).click()
            input_field_min = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//input[@data-qa="filterMinValue"]')))
            input_field_max = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//input[@data-qa="filterMaxValue"]')))
            # NOTE(review): the leading '0' is typed before the digits on purpose in the
            # original script; presumably the input field needs it - confirm.
            input_field_min.clear()
            input_field_min.send_keys('0' + time_from.replace(':', ''))
            input_field_max.clear()
            input_field_max.send_keys('0' + time_to.replace(':', ''))
            driver.find_element(By.XPATH, '//button[@data-qa="Apply"]').click()
            grid_rows = wait_for_new_grid_rows(grid_rows)
            if grid_rows is None:
                # Nothing to export for this range: leave an empty file as a marker.
                touch(output_file_path)
                continue

            # Export data to CSV
            driver.find_element(By.XPATH, '//span[@class="DropdownButton-placeholder" and text()="Export"]').click()
            WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="DropdownButton-option" and text()="to CSV"]'))).click()

            # Poll until the browser's "trades_*.csv" download shows up, then rename it.
            while True:
                default_files_list = glob(f'{settings["output_path"]}/trades_*.csv')
                if default_files_list:
                    if len(default_files_list) != 1:
                        # Explicit error instead of `assert`: renaming an arbitrary
                        # file would silently mislabel the exported data.
                        raise RuntimeError(f'expected exactly one downloaded "trades_*.csv" file, found {len(default_files_list)}')
                    os.rename(default_files_list[0], output_file_path)
                    break
                time.sleep(0.05)
            time.sleep(settings['additional_pause'])
finally:
    progress_bar.close()
    driver.quit()
# input("\nPress Enter to finish...")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment