Last active
January 16, 2022 07:03
-
-
Save raposatech/d3f10df41c8745b00cb608bd590a986d to your computer and use it in GitHub Desktop.
Starter System with Multiple Instruments
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DiversifiedStarterSystem(MultiSignalStarterSystem): | |
''' | |
Carver's Starter System without stop losses, multiple entry rules, | |
a forecast for position sizing and rebalancing, and multiple instruments. | |
Adapted from Rob Carver's Leveraged Trading: https://amzn.to/3C1owYn | |
Code for MultiSignalStarterSystem available here: | |
https://gist.github.com/raposatech/2d9f309e2a54fc9545d44eda821e29ad | |
''' | |
def __init__(self, tickers: list, signals: dict, target_risk: float = 0.12, | |
starting_capital: float = 1000, margin_cost: float = 0.04, | |
short_cost: float = 0.001, interest_on_balance: float = 0.0, | |
start: str = '2000-01-01', end: str = '2020-12-31', | |
shorts: bool = True, weights: list = [], | |
max_forecast: float = 2, min_forecast: float = -2, | |
exposure_drift: float = 0.1, | |
*args, **kwargs): | |
self.tickers = tickers | |
self.n_instruments = len(tickers) | |
self.signals = signals | |
self.target_risk = target_risk | |
self.starting_capital = starting_capital | |
self.shorts = shorts | |
self.start = start | |
self.end = end | |
self.margin_cost = margin_cost | |
self.short_cost = short_cost | |
self.interest_on_balance = interest_on_balance | |
self.daily_iob = (1 + self.interest_on_balance) ** (1 / 252) | |
self.daily_margin_cost = (1 + self.margin_cost) ** (1 / 252) | |
self.daily_short_cost = self.short_cost / 360 | |
self.max_forecast = max_forecast | |
self.min_forecast = min_forecast | |
self.max_leverage = 3 | |
self.exposure_drift = exposure_drift | |
self.signal_names = [] | |
self.weights = weights | |
self.idm_dict = { | |
1: 1, | |
2: 1.15, | |
3: 1.22, | |
4: 1.27, | |
5: 1.29, | |
6: 1.31, | |
7: 1.32, | |
8: 1.34, | |
15: 1.36, | |
25: 1.38, | |
30: 1.4 | |
} | |
self._getData() | |
self._calcSignals() | |
self._setWeights() | |
self._calcTotalSignal() | |
self._setIDM() | |
def _getData(self): | |
yfObj = yf.Tickers(self.tickers) | |
df = yfObj.history(start=self.start, end=self.end) | |
df.drop(['High', 'Open', 'Stock Splits', 'Volume', 'Low'], | |
axis=1, inplace=True) | |
# Drop rows where all closing prices are NaN | |
df = df.iloc[df['Close'].apply( | |
lambda x: all(~np.isnan(x)), axis=1).values] | |
df.columns = df.columns.swaplevel() | |
df = df.fillna(0) | |
self.data = df | |
def _setIDM(self): | |
keys = np.array(list(self.idm_dict.keys())) | |
idm_idx = keys[np.where(keys<=self.n_instruments)].max() | |
self.idm = self.idm_dict[idm_idx] | |
def _clipForecast(self, signal): | |
return signal.clip(upper=self.max_forecast, lower=self.min_forecast) | |
def _calcMAC(self, fast, slow, scale): | |
name = f'MAC{self.n_sigs}' | |
close = self.data.loc[:, (slice(None), 'Close')] | |
sma_f = close.rolling(fast).mean() | |
sma_s = close.rolling(slow).mean() | |
risk_units = close * self.data.loc[:, (slice(None), 'STD')].values | |
sig = sma_f - sma_s | |
sig = sig.ffill().fillna(0) / risk_units * scale | |
self.signal_names.append(name) | |
return self._clipForecast(sig).rename(columns={'Close': name}) | |
def _calcMBO(self, periods, scale): | |
name = f'MBO{self.n_sigs}' | |
close = self.data.loc[:, (slice(None), 'Close')] | |
ul = close.rolling(periods).max().values | |
ll = close.rolling(periods).min().values | |
mean = close.rolling(periods).mean() | |
sprice = (close - mean) / (ul - ll) | |
sig = sprice.ffill().fillna(0) * scale | |
self.signal_names.append(name) | |
return self._clipForecast(sig).rename(columns={'Close': name}) | |
def _calcCarry(self, scale): | |
name = f'Carry{self.n_sigs}' | |
ttm_div = self.data.loc[:, (slice(None), 'Dividends')].rolling(252).sum() | |
div_yield = ttm_div / self.data.loc[:, (slice(None), 'Close')].values | |
net_long = div_yield - self.margin_cost | |
net_short = self.interest_on_balance - self.short_cost - div_yield | |
net_return = (net_long - net_short) / 2 | |
sig = net_return / self.data.loc[:, (slice(None), 'STD')].values * scale | |
self.signal_names.append(name) | |
return self._clipForecast(sig).rename(columns={'Dividends': name}) | |
def _calcSignals(self): | |
std = self.data.loc[:, (slice(None), 'Close')].pct_change().rolling(252).std() \ | |
* np.sqrt(252) | |
self.data = pd.concat([self.data, | |
std.rename(columns={'Close': 'STD'})], axis=1) | |
self.n_sigs = 0 | |
for k, v in self.signals.items(): | |
if k == 'MAC': | |
for v1 in v.values(): | |
sig = self._calcMAC(v1['fast'], v1['slow'], v1['scale']) | |
self.data = pd.concat([self.data, sig], axis=1) | |
self.n_sigs += 1 | |
elif k == 'MBO': | |
for v1 in v.values(): | |
sig = self._calcMBO(v1['N'], v1['scale']) | |
self.data = pd.concat([self.data, sig], axis=1) | |
self.n_sigs += 1 | |
elif k == 'CAR': | |
for v1 in v.values(): | |
if v1['status']: | |
sig = self._calcCarry(v1['scale']) | |
self.data = pd.concat([self.data, sig], axis=1) | |
self.n_sigs += 1 | |
def _calcTotalSignal(self): | |
sigs = self.data.groupby(level=0, axis=1).apply( | |
lambda x: x[x.name].apply( | |
lambda x: np.dot(x[self.signal_names].values, | |
self.signal_weights), axis=1)) | |
sigs = sigs.fillna(0) | |
midx = pd.MultiIndex.from_arrays([self.tickers, len(self.tickers)*['signal']]) | |
sigs.columns = midx | |
self.data = pd.concat([self.data, sigs], axis=1) | |
def _sizePositions(self, cash, price, instrument_risk, signal, positions, index): | |
shares = np.zeros(self.n_instruments) | |
if cash <= 0: | |
return shares | |
sig_sub = signal[index] | |
ir_sub = instrument_risk[index] | |
capital = (cash + np.dot(price, positions)) / self.n_instruments | |
exposure = self.target_risk * self.idm * capital * sig_sub / ir_sub | |
shares[index] += np.floor(exposure / price[index]) | |
insuff_cash = np.where(shares * price > | |
(cash * self.max_leverage) / self.n_instruments)[0] | |
if len(insuff_cash) > 0: | |
shares[insuff_cash] = np.floor( | |
(cash * self.max_leverage / self.n_instruments) / price[insuff_cash]) | |
return shares | |
def _getExposureDrift(self, cash, position, price, signal, instrument_risk): | |
if position.sum() == 0: | |
return np.zeros(self.n_instruments), np.zeros(self.n_instruments) | |
capital = (cash + price * position) / self.n_instruments | |
exposure = self.target_risk * self.idm * capital * signal / instrument_risk | |
cur_exposure = price * position | |
avg_exposure = self.target_risk * self.idm * capital / instrument_risk * np.sign(signal) | |
# Cap exposure leverage | |
avg_exposure = np.minimum(avg_exposure, self.max_leverage * capital) | |
return (exposure - cur_exposure) / avg_exposure, avg_exposure | |
def _calcCash(self, cash_balance, positions, dividends): | |
cash = cash_balance * self.daily_iob if cash_balance > 0 else \ | |
cash_balance * self.daily_margin_cost | |
long_idx = np.where(positions>0)[0] | |
short_idx = np.where(positions<0)[0] | |
if len(long_idx) > 0: | |
cash += np.dot(positions[long_idx], dividends[long_idx]) | |
if len(short_idx) > 0: | |
cash += np.dot(positions[short_idx], dividends[short_idx]) | |
return cash | |
def _logData(self, positions, cash, rebalance, exp_delta): | |
# Log data - probably a better way to go about this | |
self.data['cash'] = cash | |
df0 = pd.DataFrame(positions, | |
columns=self.tickers, index=self.data.index) | |
midx0 = pd.MultiIndex.from_arrays( | |
[self.tickers, len(self.tickers)*['position']]) | |
df0.columns = midx0 | |
df1 = pd.DataFrame(rebalance, | |
columns=self.tickers, index=self.data.index) | |
midx1 = pd.MultiIndex.from_arrays( | |
[self.tickers, len(self.tickers)*['rebalance']]) | |
df1.columns = midx1 | |
df2 = pd.DataFrame(exp_delta, | |
columns=self.tickers, index=self.data.index) | |
midx2 = pd.MultiIndex.from_arrays( | |
[self.tickers, len(self.tickers)*['exposure_drift']]) | |
df2.columns = midx2 | |
self.data = pd.concat([self.data, df0, df1, df2], axis=1) | |
portfolio = np.sum( | |
self.data.loc[:, (slice(None), 'Close')].values * df0.values, | |
axis=1) + cash | |
self.data['portfolio'] = portfolio | |
def _processBar(self, prices, sigs, stds, pos, cash): | |
open_long = np.where((pos<=0) & (sigs>0))[0] | |
if len(open_long) > 0: | |
# Short positions turned to long | |
lprices = prices[open_long] | |
cash += np.dot(pos[open_long], lprices) | |
pos[open_long] = 0 | |
pos += self._sizePositions(cash, | |
prices, stds, sigs, | |
pos, open_long) | |
cash -= np.dot(pos[open_long], lprices) | |
open_short = np.where((pos>=0) & (sigs<0))[0] | |
if len(open_short) > 0: | |
# Close long position and open short | |
sprices = prices[open_short] | |
cash += np.dot(pos[open_short], sprices) | |
pos[open_short] = 0 | |
if self.shorts: | |
pos -= self._sizePositions(cash, | |
prices, stds, sigs, | |
pos, open_short) | |
cash -= np.dot(pos[open_short], sprices) | |
neutral = np.where((pos!=0) & (sigs==0))[0] | |
if len(neutral) > 0: | |
cash += np.dot(pos[neutral], | |
prices[neutral]) | |
pos[neutral] = 0 | |
# Rebalance existing positions | |
delta_exposure, avg_exposure = self._getExposureDrift( | |
cash, pos, prices, sigs, stds) | |
drift_idx = np.where(np.abs(delta_exposure) >= self.exposure_drift)[0] | |
reb_shares = np.zeros(self.n_instruments) | |
if len(drift_idx) > 0: | |
reb_shares[drift_idx] = np.round( | |
delta_exposure * avg_exposure / prices)[drift_idx] | |
cash -= np.dot(reb_shares, prices) | |
pos += reb_shares | |
return pos, cash, reb_shares, delta_exposure | |
def run(self): | |
positions = np.zeros((self.data.shape[0], len(self.tickers))) | |
exp_delta = positions.copy() | |
rebalance = positions.copy() | |
cash = np.zeros(self.data.shape[0]) | |
for i, (ts, row) in enumerate(self.data.iterrows()): | |
prices = row.loc[(slice(None), 'Close')].values | |
divs = row.loc[(slice(None), 'Dividends')].values | |
sigs = row.loc[(slice(None), 'signal')].values | |
stds = row.loc[(slice(None), 'STD')].values | |
pos = positions[i-1].copy() | |
cash_t = self._calcCash(cash[i-1], positions[i], divs) \ | |
if i > 0 else self.starting_capital | |
pos, cash_t, shares, delta_exp = self._processBar( | |
prices, sigs, stds, pos, cash_t) | |
positions[i] = pos | |
cash[i] = cash_t | |
rebalance[i] = shares | |
exp_delta[i] = delta_exp | |
self._logData(positions, cash, rebalance, exp_delta) | |
self._calcReturns() | |
def _calcReturns(self): | |
self.data['strat_log_returns'] = np.log( | |
self.data['portfolio'] / self.data['portfolio'].shift(1)) | |
self.data['strat_cum_returns'] = np.exp( | |
self.data['strat_log_returns'].cumsum()) - 1 | |
self.data['strat_peak'] = self.data['strat_cum_returns'].cummax() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment