Skip to content

Instantly share code, notes, and snippets.

@normanlmfung
Last active October 10, 2021 01:00
Show Gist options
  • Save normanlmfung/770640edb2b3a1b732850aab8a8ff5c6 to your computer and use it in GitHub Desktop.
Save normanlmfung/770640edb2b3a1b732850aab8a8ff5c6 to your computer and use it in GitHub Desktop.
Generic position and PnL (profit and loss) calculation
import sys
import time
import traceback
from datetime import datetime
from typing import Any, Dict, List
from gizmo.datetime_gizmo import timer
class PosPnl:
    """Running position and per-date PnL snapshots for a single ticker.

    Attributes:
        ticker: Identifier this record belongs to (stored as given).
        trade_count: Counter initialised to 0; updated by the code that
            feeds trades into this record.
        running_pos: Signed running position, initialised to 0.
        pos_hist: Maps a date to a snapshot dict. ``__str__`` expects each
            snapshot to carry the keys ``position``, ``avg_filled_price``,
            ``unrealized``, ``realized`` and ``total_pnl``.
    """

    def __init__(self, ticker) -> None:
        super().__init__()
        self.ticker = ticker
        self.trade_count = 0
        self.running_pos = 0
        self.pos_hist: Dict[datetime, Dict] = {}

    def __str__(self) -> str:
        """Return a one-line summary followed by every snapshot in ``pos_hist``."""
        summary = f"ticker: {self.ticker}, trade_count: {self.trade_count}, running_pos: {self.running_pos}. "
        # Snapshots are rendered in dict insertion order; join avoids
        # repeated string concatenation.
        snapshots = "".join(
            f" {dt} position: {snap['position']}, avg_filled_price: {snap['avg_filled_price']}, unrealized: {snap['unrealized']}, realized: {snap['realized']}, total_pnl: {snap['total_pnl']}"
            for dt, snap in self.pos_hist.items()
        )
        return summary + snapshots

    def get_last_pos_date(self):
        """Return the greatest key of ``pos_hist``, or ``None`` when it is empty."""
        return max(self.pos_hist) if self.pos_hist else None

    def get_last_pos(self):
        """Return the snapshot stored under the latest date, or ``None`` if there is none."""
        last_pos_date = self.get_last_pos_date()
        if last_pos_date is None:
            return None
        return self.pos_hist[last_pos_date]
@timer
def calc_pos_pnl(
    trades : List[Dict[Any, Any]],
    prices : Dict[str, List[Dict[str, Any]]], # key = ticker, value = historical prices, each entry read via its 'date' and 'price' keys
    ticker_field_name : str,
    side_field_name : str,
    filled_price_field_name : str,
    amount_field_name : str,
    contract_size_field_name : str,
    trade_date_field_name : str
):
    """Build per-ticker position / PnL history from a flat list of trades.

    Trades are grouped by ticker and processed in ascending trade-date order.
    Each trade produces a snapshot stored in ``PosPnl.pos_hist`` under the
    trade's calendar day (time of day dropped), so several trades on one day
    leave a single end-of-day snapshot.

    Args:
        trades: Trade records; fields are read through the ``*_field_name``
            arguments below.
        prices: Per-ticker price entries, each exposing ``'date'`` and
            ``'price'``. A trade is marked at the entry whose date equals the
            trade day, otherwise at the entry with the greatest date. This
            list is not modified.
        ticker_field_name: Key of the ticker in a trade record.
        side_field_name: Key of the side; compared upper-cased against
            ``'BUY'`` / ``'SELL'``.
        filled_price_field_name: Key of the fill price.
        amount_field_name: Key of the traded amount.
        contract_size_field_name: Key of the contract size multiplier.
        trade_date_field_name: Key of the trade timestamp (must expose
            ``year``, ``month`` and ``day``).

    Returns:
        ``(tickers_pospnl, errors)``: a ``PosPnl`` per ticker seen in
        ``trades``, and an error message per ticker whose processing raised.
        A ticker listed in ``errors`` still has a (possibly partial)
        ``PosPnl`` entry.

    Raises:
        KeyError: If a trade lacks ``ticker_field_name`` (raised before the
            per-ticker error handling starts).
    """
    tickers_pospnl : Dict[str, PosPnl] = {}
    errors : Dict[str, str] = {}

    # Group once instead of re-filtering the full trade list for every ticker.
    trades_by_ticker : Dict[Any, List[Dict[Any, Any]]] = {}
    for trade in trades:
        trades_by_ticker.setdefault(trade[ticker_field_name], []).append(trade)

    for ticker, ticker_trades in trades_by_ticker.items():
        pospnl = PosPnl(ticker)
        tickers_pospnl[ticker] = pospnl
        try:
            ticker_trades.sort(key=lambda trade : trade[trade_date_field_name])

            # A ticker missing from `prices` raises KeyError here and is
            # reported through `errors`.
            ticker_prices = prices[ticker]
            # Index prices by date (first entry wins on duplicates) and find
            # the latest entry without sorting the caller's list in place.
            price_by_date : Dict[Any, Dict[str, Any]] = {}
            for px in ticker_prices:
                price_by_date.setdefault(px['date'], px)
            latest_price = max(ticker_prices, key=lambda px : px['date']) if ticker_prices else None

            for trade in ticker_trades:
                raw_trade_date = trade[trade_date_field_name]
                trade_date = datetime(raw_trade_date.year, raw_trade_date.month, raw_trade_date.day)
                trade_side = trade[side_field_name].upper()
                trade_filled_price = trade[filled_price_field_name]
                trade_amount = trade[amount_field_name]
                trade_contract_size = trade[contract_size_field_name]
                # Any side other than 'BUY' counts as a negative quantity.
                trade_notional = (trade_amount * trade_contract_size) if trade_side == 'BUY' else (-1 * trade_amount * trade_contract_size)

                # Snapshot from before this trade (None for the first trade).
                last_pos = pospnl.get_last_pos()

                pospnl.trade_count = pospnl.trade_count + 1
                pospnl.running_pos = pospnl.running_pos + trade_notional

                # NOTE(review): the average is re-weighted on every trade,
                # including trades that reduce the position; the self-check at
                # the bottom of the file pins this behaviour, so it is kept.
                avg_filled_price = trade_filled_price
                if last_pos is not None and last_pos['position'] != 0:
                    new_position = trade_notional + last_pos['position']
                    if new_position != 0:
                        avg_filled_price = (
                            trade_filled_price * trade_notional
                            + last_pos['avg_filled_price'] * last_pos['position']
                        ) / new_position
                    # else: the trade flattens the position exactly. The
                    # weighted average would divide by zero, so fall back to
                    # the fill price (unrealized is 0 at a flat position).

                entry = {
                    'position' : pospnl.running_pos,
                    'avg_filled_price' : avg_filled_price,
                    'unrealized' : 0,
                    'realized' : last_pos['realized'] if last_pos else 0
                }

                # Mark at the trade day's price, else the latest known price.
                marked_price = price_by_date.get(trade_date, latest_price)
                if marked_price:
                    entry['unrealized'] = (marked_price['price'] - entry['avg_filled_price']) * entry['position']
                else:
                    # No price at all for this ticker: carry the previous value.
                    entry['unrealized'] = last_pos['unrealized'] if last_pos else 0

                if last_pos:
                    # NOTE(review): realized PnL uses the full traded quantity
                    # even when the trade is larger than the open position
                    # (a flip through zero) - confirm whether that is intended.
                    if last_pos['position']>0 and trade_side=='SELL':
                        # Selling off long position
                        entry['realized'] = entry['realized'] + (trade_filled_price - last_pos['avg_filled_price']) * trade_amount * trade_contract_size
                    elif last_pos['position']<0 and trade_side=='BUY':
                        # Buying with short position
                        entry['realized'] = entry['realized'] + (last_pos['avg_filled_price'] - trade_filled_price) * trade_amount * trade_contract_size

                entry['total_pnl'] = entry['unrealized'] + entry['realized']
                pospnl.pos_hist[trade_date] = entry
        except Exception:
            # Best effort per ticker: record the failure and move on, but let
            # KeyboardInterrupt / SystemExit propagate.
            err_msg = f"Error calculating pnl for {ticker}: {str(sys.exc_info()[0])} {str(sys.exc_info()[1])} {traceback.format_exc()}"
            errors[ticker] = err_msg
    return (tickers_pospnl, errors)
if __name__ == "__main__":
    # Self-check: run calc_pos_pnl on a small hand-computed scenario and
    # assert the resulting per-day snapshots.
    import math  # only needed by this self-check

    def _close(actual, expected):
        """Compare floats with a tolerance instead of exact equality."""
        return math.isclose(actual, expected, rel_tol=1e-9, abs_tol=1e-9)

    prices : Dict[str, List[Dict[str, Any]]] = {}
    adausd_211231_prices = [
        { 'date' : datetime.strptime('2021-10-09 00:00:00', '%Y-%m-%d %H:%M:%S'), 'price' : 2.3 },
        { 'date' : datetime.strptime('2021-10-08 00:00:00', '%Y-%m-%d %H:%M:%S'), 'price' : 2.35 },
        { 'date' : datetime.strptime('2021-10-07 00:00:00', '%Y-%m-%d %H:%M:%S'), 'price' : 2.25 },
        # for simplest case only one line for a ticker in prices: latest prices only (for case where you dont have historical prices)
    ]
    prices['ADAUSD_211231'] = adausd_211231_prices

    # 'amount': for futures, this is number of contracts.
    # 'contract_size': for spot and linears, this is 1. For inverses, this is different.
    # For "amount" and "contract_size": https://norman-lm-fung.medium.com/trading-binance-perpetuals-and-coin-m-futures-3f391a5a4d65
    # The trades are deliberately listed out of chronological order.
    trades = [
        {
            'symbol' : 'ADAUSD_211231',
            'filled_price' : 2.31,
            'side' : 'BUY',
            'amount' : 3,
            'contract_size' : 10,
            'trade_date' : datetime.strptime('2021-10-09 00:18:08', '%Y-%m-%d %H:%M:%S')
        },
        {
            'symbol' : 'ADAUSD_211231',
            'filled_price' : 2.41,
            'side' : 'SELL',
            'amount' : 8,
            'contract_size' : 10,
            'trade_date' : datetime.strptime('2021-10-08 00:17:33', '%Y-%m-%d %H:%M:%S')
        },
        {
            'symbol' : 'ADAUSD_211231',
            'filled_price' : 2.31,
            'side' : 'BUY',
            'amount' : 5,
            'contract_size' : 10,
            'trade_date' : datetime.strptime('2021-10-07 00:11:28', '%Y-%m-%d %H:%M:%S')
        },
        {
            'symbol' : 'ADAUSD_211231',
            'filled_price' : 2.21,
            'side' : 'BUY',
            'amount' : 15,
            'contract_size' : 10,
            'trade_date' : datetime.strptime('2021-10-07 00:09:08', '%Y-%m-%d %H:%M:%S')
        }
    ]

    (tickers_pnl, errors) = calc_pos_pnl(
        trades=trades,
        prices=prices,
        ticker_field_name='symbol',
        side_field_name='side',
        filled_price_field_name='filled_price',
        amount_field_name='amount',
        contract_size_field_name='contract_size',
        trade_date_field_name='trade_date'
    )

    # Expected snapshots, worked out from the formulas asserted below
    # (values rounded; floating-point noise omitted):
    #   2021-10-07: position 200, avg_filled_price 2.235,   unrealized  3.0, realized  0,   total_pnl  3.0
    #   2021-10-08: position 120, avg_filled_price ~2.1183, unrealized 27.8, realized 14.0, total_pnl 41.8
    #   2021-10-09: position 150, avg_filled_price ~2.1567, unrealized 21.5, realized 14.0, total_pnl 35.5
    pnlpos_20211007 = tickers_pnl['ADAUSD_211231'].pos_hist[datetime(2021, 10, 7)]
    pnlpos_20211008 = tickers_pnl['ADAUSD_211231'].pos_hist[datetime(2021, 10, 8)]
    pnlpos_20211009 = tickers_pnl['ADAUSD_211231'].pos_hist[datetime(2021, 10, 9)]

    assert pnlpos_20211007['position'] == 200, "(15+5) x contract_size"
    assert _close(pnlpos_20211007['avg_filled_price'], (2.21 * 15 + 2.31 * 5)/20), "(2.21x15 + 2.31x5)/20"
    assert _close(pnlpos_20211007['unrealized'], (2.25 - pnlpos_20211007['avg_filled_price']) * pnlpos_20211007['position']), "Weighted average over two buy's"
    assert pnlpos_20211007['realized'] == 0, "Haven't sold anything yet!"

    assert pnlpos_20211008['position'] == 120, "(15+5 -8) x contract_size"
    assert _close(pnlpos_20211008['avg_filled_price'], (2.41 * (-8) + 2.235 * 20)/(20-8)), "(2.41x(-8) + 2.235x20)/(20-8)"
    assert _close(pnlpos_20211008['unrealized'], (2.35 - pnlpos_20211008['avg_filled_price'] ) * pnlpos_20211008['position']), "Weighted average over two buy's, then one sell"
    assert _close(pnlpos_20211008['realized'], (2.41 - 2.235) * 8 * 10), "(sell price - 20211007 avg filled price) x trade amount x contract size"

    assert pnlpos_20211009['position'] == 150, "(15+5 -8 +3) x contract_size"
    assert _close(pnlpos_20211009['realized'], pnlpos_20211008['realized']), "Carry forward from 20211008"
    assert _close(pnlpos_20211009['unrealized'], (2.3 - pnlpos_20211009['avg_filled_price'] ) * pnlpos_20211009['position']), "Weighted average over two buy's, then one sell, then one last buy"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment