Last active
January 13, 2021 05:14
-
-
Save blakehurlburt/834b846178915d63049d6fedd68fb216 to your computer and use it in GitHub Desktop.
Export current Robinhood stock holdings as CSV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.5 | |
import functools
import json
import os
import pickle
import sys

import robin_stocks
# Console trace hook: write_csv echoes the header and every exported row
# through this name, so rebinding it changes where that echo goes.
debug = print
def print_data(data):
    """Dump ``data`` to stdout as JSON with sorted keys and a 4-space indent."""
    rendered = json.dumps(data, indent=4, sort_keys=True)
    print(rendered)
def cache(func):
    """Memoize ``func`` on disk so results persist between runs of the script.

    Results are stored in ``~/.tokens/<script>#<function>.pickle`` as a dict
    keyed by the tuple of positional arguments, so every argument must be
    hashable and picklable. Keyword arguments are not supported by the
    returned wrapper.

    Args:
        func: The function to wrap.

    Returns:
        A wrapper that returns the stored result for argument tuples it has
        seen before, and otherwise calls ``func``, stores the result and
        rewrites the pickle file.
    """
    data_dir = os.path.join(os.path.expanduser("~"), ".tokens")
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(data_dir, exist_ok=True)

    # Only the script's base name goes into the file name: sys.argv[0] may be
    # an absolute or directory-qualified path, and os.path.join would then
    # discard data_dir or point into a directory that does not exist.
    script_name = os.path.basename(sys.argv[0])
    function_filename = "{}#{}.pickle".format(script_name, func.__name__)
    pickle_path = os.path.join(data_dir, function_filename)

    data = {}
    # if file exists already, load cache into memory
    if os.path.isfile(pickle_path):
        try:
            # NOTE: pickle is only safe for files this script wrote itself;
            # never point this at a cache file from an untrusted source.
            with open(pickle_path, 'rb') as f:
                data = pickle.load(f)
        except (EOFError, pickle.UnpicklingError):
            # A truncated or corrupt cache (e.g. an interrupted write) is
            # treated as empty; it is rewritten on the next cache miss.
            data = {}

    @functools.wraps(func)
    def helper(*a):
        if a in data:
            return data[a]
        result = func(*a)
        data[a] = result
        # Persist after every miss so a later crash loses nothing.
        with open(pickle_path, 'wb') as f:
            pickle.dump(data, f)
        return result

    return helper
def apply(data, func):
    """Return a copy of ``data`` with ``func`` applied to every leaf value.

    Dicts (values only), lists and sets are walked recursively; anything
    else, tuples included, is treated as a leaf and passed to ``func``.
    """
    if isinstance(data, dict):
        mapped = {}
        for key, value in data.items():
            mapped[key] = apply(value, func)
        return mapped
    if isinstance(data, list):
        converted = []
        for element in data:
            converted.append(apply(element, func))
        return converted
    if isinstance(data, set):
        return set(apply(member, func) for member in data)
    return func(data)
def make_numbers(data):
    """Convert every number-like leaf of ``data`` to an ``int`` or ``float``.

    The structure is walked with ``apply``. Each leaf is passed to
    ``float()``; whole values come back as ``int``, other numeric values as
    ``float``, and anything ``float()`` rejects is returned unchanged.

    Args:
        data: A value, or nested dicts/lists/sets of values.

    Returns:
        A new structure of the same shape with numeric leaves converted.
    """
    def helper(p):
        try:
            f = float(p)
        except (TypeError, ValueError):
            # ValueError: text that is not a number. TypeError: None or any
            # other object float() cannot accept. Keep both untouched rather
            # than aborting the whole conversion.
            return p
        if f.is_integer():
            return int(f)
        return f
    return apply(data, helper)
def format_numbers(data):
    """Render every leaf of ``data`` as a string via ``apply``.

    A float whose text has exactly one digit after the decimal point gets a
    trailing zero appended (``1.5`` becomes ``'1.50'``).
    """
    def as_text(value):
        text = str(value)
        # pad floats to 2 decimal places
        needs_padding = isinstance(value, float) and text[-2] == '.'
        return text + '0' if needs_padding else text
    return apply(data, as_text)
@cache
def lookup_symbol(instrument):
    """Return what robin_stocks.get_symbol_by_url gives for ``instrument``.

    The ``cache`` decorator stores the result on disk, keyed by the argument.
    """
    symbol = robin_stocks.get_symbol_by_url(instrument)
    return symbol
# Log in, then pull the open stock positions.
robin_stocks.login()
positions_data = robin_stocks.get_open_stock_positions()

# Tag each position with the symbol looked up from its 'instrument' entry,
# collecting the symbols so quotes can be requested in a single call.
symbols = []
for item in positions_data:
    symbol = item['symbol'] = lookup_symbol(item['instrument'])
    symbols.append(symbol)

quotes = robin_stocks.get_quotes(symbols)
# merge two lists of dicts on a shared key
def join_data(key, data1, data2):
    """Merge the dicts of ``data1`` and ``data2`` that share the same ``key`` value.

    Both lists are indexed by ``chunk[key]`` (a later duplicate replaces an
    earlier one). One merged dict is produced per key found in ``data1``; on
    conflicting fields the value from ``data2`` wins. Raises ``KeyError`` if
    a key from ``data1`` has no counterpart in ``data2``. The input dicts are
    not modified.
    """
    left_index = {}
    for chunk in data1:
        left_index[chunk[key]] = chunk

    right_index = {}
    for chunk in data2:
        right_index[chunk[key]] = chunk

    merged = []
    for k, left in left_index.items():
        combined = dict(left)
        combined.update(right_index[k])
        merged.append(combined)
    return merged
# Merge each position with its quote by symbol, order the rows by symbol, and
# convert number-like values to int/float so the arithmetic below works.
data = join_data('symbol', positions_data, quotes)
data = sorted(data, key=lambda value: value['symbol'])
data = make_numbers(data)


def _percent_of(change, base):
    """Return ``change`` as a percentage of ``base``, or 0.0 when ``base`` is zero."""
    # A zero base (e.g. an average buy price of 0) has no meaningful
    # percentage; report 0.0 instead of raising ZeroDivisionError and
    # aborting the whole export.
    if not base:
        return 0.0
    return change * 100.0 / base


# augment data with calculated values
for row in data:
    market_value = row['last_trade_price']
    yesterday = row['adjusted_previous_close']
    day_change = market_value - yesterday
    day_change_percent = _percent_of(day_change, yesterday)

    # make_numbers may have produced an int; force float for consistent output.
    cost_basis = float(row['average_buy_price'])
    total_change = market_value - cost_basis
    total_change_percent = _percent_of(total_change, cost_basis)
    equity = cost_basis * row['quantity']

    row['average_buy_price'] = cost_basis
    row['day change $'] = round(day_change, 2)
    row['day change %'] = round(day_change_percent, 2)
    row['total change $'] = round(total_change, 2)
    row['total change %'] = round(total_change_percent, 2)
    row['equity'] = round(equity, 5)

# Turn every value into display text for the CSV writer.
data = format_numbers(data)
def write_csv(path, data, *columns):
    """Write selected fields of each row in ``data`` to a CSV file at ``path``.

    Each entry of ``columns`` is either a row key, or a ``(key, alias)``
    tuple where ``alias`` is used for the header instead of the key. Header
    text has underscores replaced by spaces and is title-cased. The header
    and every row are also echoed through ``debug``.
    """
    import csv

    def prettify(label):
        return label.replace('_', ' ').title()

    with open(path, 'w', newline='') as f:
        csv_writer = csv.writer(f)

        # split out data keys from column aliases
        column_keys = []
        for spec in columns:
            column_keys.append(spec[0] if isinstance(spec, tuple) else spec)
        column_names = []
        for spec in columns:
            header = spec[1] if isinstance(spec, tuple) else spec
            column_names.append(prettify(header))

        csv_writer.writerow(column_names)
        debug(('%14s ' * len(column_names)) % tuple(column_names), sep='\t\t')

        for row in data:
            filtered_row = []
            for column in column_keys:
                filtered_row.append(row[column])
            csv_writer.writerow(filtered_row)
            debug(('%14s ' * len(filtered_row)) % tuple(filtered_row), sep='\t\t')
# Export the rows to positions.csv. Plain strings are row keys; a
# (key, alias) tuple takes the value from `key` and titles the column `alias`.
output_path = "positions.csv"
write_csv(
    output_path,
    data,
    'symbol',
    'quantity',
    ('average_buy_price', 'cost basis'),
    'equity',
    ('last_trade_price', 'market value'),
    'day change $',
    'day change %',
    'total change $',
    'total change %',
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment