Skip to content

Instantly share code, notes, and snippets.

@kizernis
Last active December 4, 2018 11:41
Show Gist options
  • Save kizernis/2fa5168887e34141992de072dea5993d to your computer and use it in GitHub Desktop.
Save kizernis/2fa5168887e34141992de072dea5993d to your computer and use it in GitHub Desktop.
# Load the scraper configuration from 'settings_csv2.txt' and 'data_codes.txt'.
#
# Expected layout of 'settings_csv2.txt' once blank lines are dropped:
#   line 0      "[main settings]"
#   lines 1-7   seven "key = value" pairs (spaces inside keys are removed)
#   line 8      "[dates]"
#   lines 9-10  two dates written as month, day, year
#   line 11     "[time ranges]"
#   lines 12+   an even number of times
settings = {}
with open('settings_csv2.txt') as f:
    lines = [s.strip() for s in f.read().splitlines() if s.strip() != '']

# Explicit checks rather than `assert`, which is stripped under `python -O`.
for header_index, header in ((0, '[main settings]'), (8, '[dates]'), (11, '[time ranges]')):
    if len(lines) <= header_index or lines[header_index] != header:
        raise ValueError(f'settings_csv2.txt: expected {header!r} as non-blank line {header_index + 1}')

for line in lines[1:8]:
    key, value = line.split('=', 1)
    settings[key.strip().replace(' ', '')] = value.strip()
settings['firefox'] = settings['firefox'].lower() == 'true'
settings['headless'] = settings['headless'].lower() == 'true'
# Given in milliseconds in the file; divided by 1000 for later use as seconds.
settings['additional_pause'] = int(settings['additional_pause']) / 1000


def _date_sort_key(date_str):
    """Return (year, month, day) for a zero-padded 'MM.DD.YYYY' string."""
    month, day, year = date_str.split('.')
    return year, month, day


# Accept ':', '_' or '-' as date separators, drop spaces, zero-pad to MM.DD.YYYY.
_date_cleanup = str.maketrans(':_-', '...', ' ')
dates = [d.translate(_date_cleanup) for d in lines[9:11]]
dates = ['{:0>2}.{:0>2}.{:0>4}'.format(*d.split('.')) for d in dates]
# Latest date first. A plain string sort of MM.DD.YYYY compares the month
# before the year and mis-orders dates from different years, so sort on
# (year, month, day) instead.
dates.sort(key=_date_sort_key, reverse=True)

times = lines[12:]
if len(times) % 2 != 0:
    raise ValueError('settings_csv2.txt: [time ranges] must contain an even number of times')
# Accept '.', '_' or '-' as time separators, drop spaces, zero-pad each field.
_time_cleanup = str.maketrans('._-', ':::', ' ')
times = [t.translate(_time_cleanup) for t in times]
times = [':'.join(f'{part:0>2}' for part in t.split(':')) for t in times]

# One data code per non-blank line; '/' becomes '.' and spaces are removed.
with open('data_codes.txt') as f:
    data_codes = [s.strip().replace('/', '.').replace(' ', '') for s in f.read().splitlines() if s.strip() != '']
import os
import time
from datetime import datetime, timedelta
from glob import glob
from tqdm import tqdm
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
import selenium.webdriver.support.expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
# Start the browser selected in the settings (Firefox or Chrome), configured
# to save downloads into settings['output_path'].
headless = settings['headless']

if settings['firefox']:
    options = webdriver.FirefoxOptions()
    if headless:
        options.add_argument('--headless')
    profile = webdriver.FirefoxProfile()
    firefox_preferences = (
        ('browser.download.folderList', 2),
        ('browser.download.manager.showWhenStarting', False),
        ('browser.helperApps.neverAsk.saveToDisk', 'text/csv'),
        ('browser.download.dir', settings['output_path']),
    )
    for preference_name, preference_value in firefox_preferences:
        profile.set_preference(preference_name, preference_value)
    driver = webdriver.Firefox(options=options, firefox_profile=profile, service_log_path=os.devnull)
    if headless:
        driver.set_window_size(1440, 900)
    else:
        driver.maximize_window()
else:
    options = webdriver.ChromeOptions()
    if headless:
        options.add_argument('--headless')
    chrome_download_prefs = {
        'download.default_directory': settings['output_path'],
        'download.prompt_for_download': False,
        'download.directory_upgrade': True,
        'safebrowsing.disable_download_protection': True,
    }
    options.add_experimental_option('prefs', chrome_download_prefs)
    window_flag = '--disable-gpu' if headless else '--start-maximized'
    for chrome_flag in ('--log-level=3', '--disable-infobars', window_flag):
        options.add_argument(chrome_flag)
    driver = webdriver.Chrome(options=options)
    if headless:
        # In headless mode the download directory is additionally set through
        # the DevTools command Page.setDownloadBehavior; the command endpoint
        # is registered on the executor first.
        driver.command_executor._commands['send_command'] = ('POST', '/session/$sessionId/chromium/send_command')
        download_behavior = {'behavior': 'allow', 'downloadPath': settings['output_path']}
        driver.execute('send_command', {'cmd': 'Page.setDownloadBehavior', 'params': download_behavior})
        driver.set_window_size(1440, 900)
# Login: open the configured URL, type the credentials and submit with Enter.
driver.get(settings['url'])
username_field = driver.find_element_by_xpath('//input[@name="username"]')
username_field.send_keys(settings['login'])
password_field = driver.find_element_by_xpath('//input[@name="password"]')
password_field.send_keys(settings['password'], Keys.RETURN)

# Select "Time And Sales" tab: wait (up to 300 s) for the currently selected
# tab to become visible and switch only when it is a different tab.
selected_tab_locator = (By.XPATH, '//li[@class="DragTabs__tab__3a1AS DragTabs__tabSelected__2D4o4"]')
selected_tab = WebDriverWait(driver, 300).until(EC.visibility_of_element_located(selected_tab_locator))
if selected_tab.text != 'Time And Sales':
    time_and_sales_title = driver.find_element_by_xpath('//span[@class="TabbedView__tabTitle__3I2Fo" and text()="Time And Sales"]')
    time_and_sales_title.click()
def wait_for_new_grid_rows(old_grid_rows):
    """Block until the Time And Sales grid shows a fresh, non-empty set of rows.

    Polls the page every 0.05 s. First waits until the list of row elements
    differs from ``old_grid_rows``, then waits until that list is non-empty.

    Args:
        old_grid_rows: The row elements seen before the page was asked to
            refresh, or ``None`` when there is no previous state.

    Returns:
        The new list of row elements, or ``None`` as soon as the page shows
        the "No trades data available" placeholder.
    """
    rows_xpath = '//div[@data-qa="timeAndSalesGridRow"]'
    no_data_xpath = '//div[@class="EmptyGridView__centered__1IvU4"]/span[text()="No trades data available"]'

    # Phase 1: wait for the row list to change.
    time.sleep(0.05)
    current_rows = driver.find_elements_by_xpath(rows_xpath)
    while current_rows == old_grid_rows:
        if driver.find_elements_by_xpath(no_data_xpath):
            return None
        time.sleep(0.05)
        current_rows = driver.find_elements_by_xpath(rows_xpath)

    # Phase 2: the list changed but may still be empty while the grid loads.
    while not current_rows:
        if driver.find_elements_by_xpath(no_data_xpath):
            return None
        time.sleep(0.05)
        current_rows = driver.find_elements_by_xpath(rows_xpath)
    return current_rows
def _wait_visible(xpath):
    """Wait up to 300 s for the element at *xpath* to be visible and return it."""
    return WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, xpath)))


# Select "Trades Only" tab
grid_rows = wait_for_new_grid_rows(None)
trades_only_radio = driver.find_element_by_xpath('//input[@type="radio" and @value="Trades Only"]')
if trades_only_radio.is_selected():
    # Already selected: switch to "Trades & Quotes" first, so that the click
    # on "Trades Only" below produces a new set of grid rows to wait for.
    trades_and_quotes_label = driver.find_element_by_xpath('//input[@type="radio" and @value="Trades & Quotes"]/..')
    trades_and_quotes_label.click()
    grid_rows = wait_for_new_grid_rows(grid_rows)
trades_only_label = trades_only_radio.find_element_by_xpath('..')
trades_only_label.click()
grid_rows = wait_for_new_grid_rows(grid_rows)

# Select number of rows
rows_dropdown = _wait_visible('//div[@class="Dropdown-placeholder" and contains(@title, "00 rows")]')
if rows_dropdown.text == '10000 rows':
    # Already at 10000: drop to 1000 first, so that choosing 10000 below
    # produces a new set of grid rows to wait for.
    rows_dropdown.click()
    _wait_visible('//div[@class="Dropdown-option" and @data-label="1000 rows"]').click()
    rows_dropdown = _wait_visible('//div[@class="Dropdown-placeholder" and @title="1000 rows"]')
rows_dropdown.click()
_wait_visible('//div[@class="Dropdown-option" and @data-label="10000 rows"]').click()
grid_rows = wait_for_new_grid_rows(grid_rows)
def touch(fname, times=None):
    """Create *fname* if it does not exist and update its timestamps.

    Args:
        fname: Path of the file to create or update.
        times: Optional ``(atime, mtime)`` pair passed to ``os.utime``;
            ``None`` sets both to the current time.
    """
    # Append mode creates a missing file and never truncates an existing one.
    with open(fname, mode='a') as _:
        os.utime(fname, times=times)
# Main export loop: for every data code, every day in the configured range
# and every configured time range, filter the grid and export it to a CSV
# file in settings["output_path"].
# NOTE(review): indentation was lost in this copy of the script; the comments
# below describe the apparent intent of each step — confirm the exact nesting
# against the original file.
# We can't have any "trades_*.csv" files in the output folder
# (the export step further down finds the downloaded file by globbing for
# "trades_*.csv" and asserts that exactly one file matches).
for erroneous_file_path in glob(f'{settings["output_path"]}/trades_*.csv'):
os.unlink(erroneous_file_path)
# Date stamp embedded in every output file name: taken from the
# "csv_script_launch_date" environment variable when set, otherwise today's
# date formatted as MM.DD.YYYY.
script_launch_date = os.getenv('csv_script_launch_date', f'{datetime.now():%m.%d.%Y}')
# The loop below starts at dates[0] and steps back one day at a time until it
# passes dates[1], so dates[0] is expected to be the later of the two dates.
date_first = datetime.strptime(dates[0], '%m.%d.%Y')
date_last = datetime.strptime(dates[1], '%m.%d.%Y')
# Number of calendar days in the range, both ends included.
days_total = (date_first - date_last).days + 1
# `times` is consumed as consecutive (start, end) pairs.
time_ranges_total = len(times) // 2
# One progress tick per (data code, day, time range) combination.
progress_bar = tqdm(total=len(data_codes) * days_total * time_ranges_total)
for data_code in data_codes:
# Enter data code
input_data_code = driver.find_element_by_xpath('//input[@data-qa="symbolNameInput"]')
label_data_code = driver.find_element_by_xpath('//div[@class="SymbolSummary__symbolName__1ev5O"]')
# Type the symbol only when the page is not already showing it
# (case-insensitive comparison).
if label_data_code.text.lower() != data_code.lower():
input_data_code.send_keys(data_code)
# Handle invalid data codes
# When the autocomplete reports no matches, an "INVALID SYMBOL" marker file is
# written and all progress ticks belonging to this data code are consumed.
e = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="SymbolSelector__autocomplete__EF_vQ"]')))
if len(e.find_elements_by_xpath('.//span[@data-qa="matchingSymbolsList-noMatches"]')):
input_data_code.send_keys(Keys.ESCAPE)
touch(f'{settings["output_path"]}/{data_code} {script_launch_date} INVALID SYMBOL.csv')
for _ in range(days_total * time_ranges_total):
progress_bar.update()
continue
# Wait until the autocomplete highlights an entry whose text equals the data
# code; XPath translate() lower-cases the text for a case-insensitive match.
WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, f'//mark[@class="SymbolSelector__searching__1234y " and translate(text(),"ABCDEFGHIJKLMNOPQRSTUVWXYZ","abcdefghijklmnopqrstuvwxyz")="{data_code.lower()}"]')))
input_data_code.submit()
grid_rows = wait_for_new_grid_rows(grid_rows)
# Sometimes it doesn't say "No matches" but the data code is still invalid
if label_data_code.text.lower() != data_code.lower():
touch(f'{settings["output_path"]}/{data_code} {script_launch_date} INVALID SYMBOL.csv')
for _ in range(days_total * time_ranges_total):
progress_bar.update()
continue
# Walk the date range backwards, from date_first down to date_last.
date_current = date_first
while True:
if date_current < date_last:
break
# Enter date
# The date input's value is compared against YYYY-MM-DD; the picker is opened
# only when a different day has to be selected.
date_pick_button = driver.find_element_by_xpath('//div[@data-qa="datePickerWrapper"]')
picked_date_str = date_pick_button.find_element_by_xpath('input[@data-qa="dateString"]').get_attribute('value')
if picked_date_str != f'{date_current:%Y-%m-%d}':
date_pick_button.click()
date_picker = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="react-datepicker"]')))
# NOTE(review): these two XPaths start with '//' and therefore search the
# whole document rather than only inside date_picker.
select_year = date_picker.find_element_by_xpath('//select[@class="react-datepicker__year-select"]')
select_month = date_picker.find_element_by_xpath('//select[@class="react-datepicker__month-select"]')
while True:
select_year.find_element_by_xpath(f'option[@value="{date_current.year}"]').click()
# Month options are addressed by month - 1.
select_month.find_element_by_xpath(f'option[@value="{date_current.month - 1}"]').click()
try:
# Only a cell whose class is exactly "react-datepicker__day" matches;
# otherwise NoSuchElementException is raised and handled below.
date_picker.find_element_by_xpath(f'.//div[@class="react-datepicker__day" and text()="{date_current.day}"]').click()
grid_rows = wait_for_new_grid_rows(grid_rows)
break
except NoSuchElementException:
# No matching day cell: fall back to the previous day and try again.
# NOTE(review): the nesting of the range checks and of the progress update
# that follow is ambiguous without indentation — confirm.
date_current -= timedelta(days=1)
if date_current < date_last:
break
for _ in range(time_ranges_total):
progress_bar.update()
if date_current < date_last:
break
# The reset control may be absent; NoSuchElementException is ignored then.
try:
# Remove all filters
driver.find_element_by_xpath('//div[@data-qa="timeAndSalesToolbarResetFilters"]').click()
grid_rows = wait_for_new_grid_rows(grid_rows)
except NoSuchElementException:
pass
# wait_for_new_grid_rows returns None when the page shows "No trades data
# available": write a "(no filters)" marker file for the day, consume the
# day's progress ticks and move on to the previous day.
if grid_rows is None:
touch(f'{settings["output_path"]}/{data_code} {script_launch_date} {date_current:%m.%d.%Y} (no filters).csv')
for _ in range(time_ranges_total):
progress_bar.update()
date_current -= timedelta(days=1)
continue
# Process each (start, end) pair from `times`.
for i in range(0, len(times), 2):
progress_bar.update()
output_file_path = f'{settings["output_path"]}/{data_code} {script_launch_date} {date_current:%m.%d.%Y} {times[i].replace(":", ".")}-{times[i + 1].replace(":", ".")}.csv'
# Remove any earlier export with the same name.
if os.path.isfile(output_file_path):
os.unlink(output_file_path)
# Enter next time range
driver.find_element_by_xpath('//div[@data-qa="timeColumnFilter"]').click()
filter_container = WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@data-qa="filter-container"]')))
input_field_min = filter_container.find_element_by_xpath('.//input[@data-qa="filterMinValue"]')
input_field_max = filter_container.find_element_by_xpath('.//input[@data-qa="filterMaxValue"]')
# Colons are stripped and a '0' is prepended before typing.
# NOTE(review): presumably required by the input's mask — confirm.
input_field_min.clear()
input_field_min.send_keys('0' + times[i].replace(':', ''))
input_field_max.clear()
input_field_max.send_keys('0' + times[i + 1].replace(':', ''))
filter_container.find_element_by_xpath('.//button[@data-qa="Apply"]').click()
grid_rows = wait_for_new_grid_rows(grid_rows)
# No trades in this time range: leave a marker file under the final name.
if grid_rows is None:
touch(output_file_path)
continue
# Export data to CSV
driver.find_element_by_xpath('//span[@class="DropdownButton-placeholder" and text()="Export"]').click()
WebDriverWait(driver, 300).until(EC.visibility_of_element_located((By.XPATH, '//div[@class="DropdownButton-option" and text()="to CSV"]'))).click()
# Poll the output folder every 0.05 s until the downloaded "trades_*.csv"
# file appears, then rename it to the final name.
while True:
default_files_list = glob(f'{settings["output_path"]}/trades_*.csv')
if len(default_files_list):
assert len(default_files_list) == 1
os.rename(default_files_list[0], output_file_path)
break
time.sleep(0.05)
# Step to the previous day; sleep for the extra pause taken from the settings.
date_current -= timedelta(days=1)
time.sleep(settings['additional_pause'])
progress_bar.close()
driver.quit()
# input("\nPress Enter to finish...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment