Last active
September 16, 2022 15:26
-
-
Save robcarver17/c2e9c594af31894b9942396f719da449 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Implement the handcrafting method | |
This is 'self contained code' which requires wrapping before using in pysystemtrade | |
""" | |
## CAVEATS: | |
## Uses weekly returns (resample needed first) | |
## Doesn't deal with missing assets | |
import numpy as np | |
import pandas as pd | |
import scipy.cluster.hierarchy as sch | |
from scipy.interpolate import interp1d | |
WEEKS_IN_YEAR = 365.25/7.0 | |
MAX_CLUSTER_SIZE = 3 # Do not change | |
WARN_ON_SUBPORTFOLIO_SIZE = 0.2 # change if you like, sensible values are between 0 and 0.5 | |
# Convenience objects | |
NO_SUB_PORTFOLIOS = object() | |
NO_RISK_TARGET = object() | |
NO_TOP_LEVEL_WEIGHTS = object() | |
class diagobject(object): | |
def __init__(self): | |
pass | |
def __repr__(self): | |
return "%s \n %s " % ( self.calcs, self.description) | |
""" | |
In this section we create the candidate matrices and weights | |
""" | |
unsorted_candidate_matrices = [ | |
np.array([[1.,0.,0.],[0.,1.,0.],[0.,0.,1.]]), # equal weights | |
np.array([[1.,0.5,0.5],[0.5,1.,0.5],[0.5,0.5,1.]]), # equal weights | |
np.array([[1., 0.9, 0.9], [0.9, 1., 0.9], [0.9, 0.9, 1.]]), # equal weights | |
np.array([[1., 0.0, 0.5], [0.0, 1., 0.], [0.5,0.,1.]]), # first interesting row of 'ABC' from 'Systematic Trading' table 8 | |
np.array([[1., 0.0, 0.9], [0.0, 1., 0.], [0.9, 0., 1.]]), # 2nd row of 'ABC' from 'Systematic Trading' table 8 | |
np.array([[1., 0.5, 0.], [0.5, 1., 0.5], [0., 0.5, 1.]]), # 3rd row of 'ABC' from 'Systematic Trading' table 8 | |
np.array([[1., 0.0, 0.5], [0.0, 1., 0.9], [0.5, 0.9, 1.]]), # 4th row of 'ABC' from 'Systematic Trading' table 8 | |
np.array([[1., 0.9, 0.0], [0.9, 1., 0.9], [0.0, 0.9, 1.]]), # 5th row of 'ABC' from 'Systematic Trading' table 8 | |
np.array([[1., 0.5, 0.9], [0.5, 1., 0.5], [0.9, 0.5, 1.]]), # 6th row of 'ABC' from 'Systematic Trading' table 8 | |
np.array([[1., 0.9, 0.5], [0.9, 1., 0.9], [0.5, 0.9, 1.]]) # 7th row of 'ABC' from 'Systematic Trading' table 8 | |
] | |
unsorted_candidate_weights = [[0.33333,0.33333,0.33333], # equal weights | |
[0.33333,0.33333,0.33333], # equal weights | |
[0.33333,0.33333,0.33333], # equal weights | |
[0.3, 0.4, 0.3], # first interesting row of 'ABC' from 'Systematic Trading' table 8 | |
[0.27, 0.46, 0.27], # 2nd row of 'ABC' from 'Systematic Trading' table 8 | |
[0.37, 0.26, 0.37], # 3rd row of 'ABC' from 'Systematic Trading' table 8 | |
[0.45, 0.45, 0.1], # 4th row of 'ABC' from 'Systematic Trading' table 8 | |
[0.39, 0.22, 0.39], # 5th row of 'ABC' from 'Systematic Trading' table 8 | |
[0.29, 0.42, 0.29], # 6th row of 'ABC' from 'Systematic Trading' table 8 | |
[0.42, 0.16, 0.42] # 7th row of 'ABC' from 'Systematic Trading' table 8 | |
] | |
def norm_weights(list_of_weights): | |
norm_weights = list(np.array(list_of_weights)/np.sum(list_of_weights)) | |
return norm_weights | |
# To make comparision easier we compare sorted correlations to sorted correlations; otherwise we'd need many more than 10 | |
# candidate matrices to cope with different ordering of the same matrix | |
def get_sorted_order_from_corr_matrix(cmatrix): | |
""" | |
Returns the sort order for a correlation matrix | |
:param cmatrix: NxN np.array | |
:return: np.array you can use to re-order your assets | |
""" | |
corr_sums = cmatrix.sum(axis=0) | |
corr_order = corr_sums.argsort() | |
return corr_order | |
def sort_corr_matrix_and_weights(cmatrix, cweights): | |
""" | |
Sort a correlation matrix and weights | |
:param cmatrix: a NxN np.array | |
:param cweights: an N length list of weights | |
:return: tuple cmatrix, cweights [both reordered] | |
""" | |
corr_order = get_sorted_order_from_corr_matrix(cmatrix) | |
new_cmatrix = cmatrix[np.ix_(corr_order, corr_order)] | |
new_cweights= list(np.array(cweights)[corr_order]) | |
return new_cmatrix, new_cweights | |
## Now build the sorted lists | |
candidate_matrices = [] | |
candidate_weights = [] | |
for cmatrix, cweights in zip(unsorted_candidate_matrices, unsorted_candidate_weights): | |
new_cmatrix, new_cweights = sort_corr_matrix_and_weights(cmatrix, cweights) | |
candidate_matrices.append(new_cmatrix) | |
candidate_weights.append(new_cweights) | |
def distance_between_matrices(matrix1, matrix2): | |
""" | |
Return the euclidian distance between two matrices | |
:param matrix1: NxN np.array | |
:param matrix2: NxN np.array | |
:return: float | |
""" | |
diff_matrix = matrix1 - matrix2 | |
distance_squared = sum(sum(diff_matrix**2)) | |
return distance_squared**.5 | |
def get_weights_using_candidate_method(cmatrix): | |
""" | |
Using interpolation, find the optimal weights from a correlation matrix using the candidate method | |
:return: a list of N weights | |
""" | |
if len(cmatrix)==1: | |
return [1.0] | |
if len(cmatrix)==2: | |
return [0.5, 0.5] | |
if len(cmatrix)>MAX_CLUSTER_SIZE: | |
raise Exception("Cluster too big") | |
# we have to sort first, and then map back to the original weights | |
corr_order = get_sorted_order_from_corr_matrix(cmatrix) | |
sorted_cmatrix = cmatrix[np.ix_(corr_order, corr_order)] | |
# not quite inverse of weighting, in case of divide by zero | |
corr_weightings = [1.0/(0.0001+distance_between_matrices(sorted_cmatrix, candidate_matrix)) for candidate_matrix in candidate_matrices] | |
weighted_weights = np.array([corr_weight_this_candidate*np.array(weightings_for_candidate) | |
for corr_weight_this_candidate, weightings_for_candidate in | |
zip(corr_weightings, candidate_weights)]) | |
weighted_weights = weighted_weights.sum(axis=0) | |
normalised_weights = norm_weights(weighted_weights) | |
# return to original order | |
natural_order_weights = [normalised_weights[idx] for idx in list(corr_order)] | |
return natural_order_weights | |
""" | |
SR adjustment | |
""" | |
relative_SR_adjustment_list= [ | |
[-0.5, 0.65], | |
[-0.4, 0.75], | |
[-0.3, 0.83], | |
[-0.25, 0.85], | |
[-0.2, 0.88], | |
[-0.15, 0.92], | |
[-0.1, 0.95], | |
[-0.05, 0.98], | |
[0, 1], | |
[0.05, 1.03], | |
[0.1, 1.06], | |
[0.15, 1.09], | |
[0.2, 1.13], | |
[0.25, 1.15], | |
[0.3, 1.17], | |
[0.4, 1.25], | |
[0.5, 1.35], | |
] | |
x_values = [i[0] for i in relative_SR_adjustment_list] | |
y_values = [i[1] for i in relative_SR_adjustment_list] | |
multiplier_from_relative_SR=interp1d(x_values, y_values, bounds_error=False, fill_value=(y_values[0], y_values[-1])) | |
def adjust_weights_for_SR(weights, SR_list): | |
""" | |
Adjust weights according to method in table 12 of 'Systematic Trading' | |
:param weights: List of starting weights | |
:param SR_list: np.array of Sharpe Ratios | |
:return: list of adjusted weights | |
""" | |
assert len(weights)==len(SR_list) | |
avg_SR = np.nanmean(SR_list) | |
relative_SR_list = SR_list -avg_SR | |
multipliers = [float(multiplier_from_relative_SR(relative_SR)) for relative_SR in relative_SR_list] | |
new_weights = list(np.array(weights)*np.array(multipliers)) | |
norm_new_weights = norm_weights(new_weights) | |
return norm_new_weights | |
class Portfolio(): | |
""" | |
Portfolios; what do they contain: a list of instruments, return characteristics, [vol weights], [cash weights] | |
can contain sub portfolios | |
they are initially created with some returns | |
""" | |
def __init__(self, instrument_returns, allow_leverage=False, risk_target=NO_RISK_TARGET, use_SR_estimates=True, | |
top_level_weights = NO_TOP_LEVEL_WEIGHTS, log=print): | |
""" | |
:param instrument_returns: A pandas data frame labelled with instrument names, containing weekly instrument_returns | |
:param allow_leverage: bool. Ignored if NO_RISK_TARGET | |
:param risk_target: (optionally) float, annual standard deviation estimate | |
:param use_SR_estimates: bool | |
:param top_level_weights: (optionally) pass a list, same length as top level. Used for partioning to hit risk target. | |
""" | |
self.instrument_returns = instrument_returns | |
self.instruments = list(instrument_returns.columns) | |
self.corr_matrix = instrument_returns.corr() | |
self.vol_vector = np.array(instrument_returns.std() * (WEEKS_IN_YEAR ** .5)) | |
self.returns_vector = np.array(instrument_returns.mean() * WEEKS_IN_YEAR) | |
self.sharpe_ratio = self.returns_vector / self.vol_vector | |
self.allow_leverage = allow_leverage | |
self.risk_target = risk_target | |
self.use_SR_estimates = use_SR_estimates | |
self.top_level_weights = top_level_weights | |
self.log = log | |
def __repr__(self): | |
return "Portfolio with %d instruments" % len(self.instruments) | |
def _cluster_breakdown(self): | |
""" | |
Creates clusters from the portfolio (doesn't create sub portfolios, but tells you which ones to make) | |
Credit to this notebook: https://github.com/TheLoneNut/CorrelationMatrixClustering/blob/master/CorrelationMatrixClustering.ipynb | |
:return: list of int same length as instruments | |
""" | |
X = self.corr_matrix.values | |
d = sch.distance.pdist(X) | |
L = sch.linkage(d, method='complete') | |
ind = sch.fcluster(L, MAX_CLUSTER_SIZE, criterion='maxclust') | |
return list(ind) | |
def _cluster_breakdown_using_risk_partition(self): | |
""" | |
Creates clusters, using a risk partitioning method | |
:return: list of int, same length as instruments | |
""" | |
risk_target = self.risk_target | |
self.log("Partioning into two groups to hit risk target of %f" % risk_target) | |
assert risk_target is not NO_RISK_TARGET | |
vol_vector = self.vol_vector | |
count_is_higher_risk = sum([instrument_vol > risk_target for instrument_vol in vol_vector]) | |
if count_is_higher_risk==0: | |
raise Exception("Risk target greater than vol of any instrument: will be impossible to hit risk target") | |
if count_is_higher_risk<(len(self.instruments)*WARN_ON_SUBPORTFOLIO_SIZE): | |
self.log("Not many instruments have risk higher than target; portfolio will be concentrated to hit risk target") | |
def _cluster_id(instrument_vol, risk_target): | |
# hard coded do not change; high vol is second group | |
if instrument_vol>risk_target: | |
return 2 | |
else: | |
return 1 | |
cluster_list = [_cluster_id(instrument_vol, risk_target) for instrument_vol in vol_vector] | |
return cluster_list | |
def _create_single_subportfolio(self, instrument_list): | |
""" | |
Create a single sub portfolio object | |
:param instrument_list: a subset of the instruments in self.instruments | |
:return: a new Portfolio object | |
""" | |
sub_portfolio_returns = self.instrument_returns[instrument_list] | |
# IMPORTANT NOTE: Sub portfolios don't inherit risk targets or leverage... that is only applied at top level | |
sub_portfolio = Portfolio(sub_portfolio_returns, use_SR_estimates=self.use_SR_estimates) | |
return sub_portfolio | |
def _create_child_subportfolios(self): | |
""" | |
Create sub portfolios. This doesn't create the entire 'tree', just the level below us (our children) | |
:return: a list of new portfolio objects (also modifies self.sub_portfolios) | |
""" | |
# get clusters | |
if len(self.instruments)<=MAX_CLUSTER_SIZE: | |
return NO_SUB_PORTFOLIOS | |
if self._require_partioned_portfolio(): | |
# Break into two groups to hit a risk target | |
self.log("Applying partition to hit risk target") | |
cluster_list = self._cluster_breakdown_using_risk_partition() | |
else: | |
self.log("Natural top level grouping used") | |
cluster_list = self._cluster_breakdown() | |
unique_clusters = list(set(cluster_list)) | |
instruments_by_cluster = [[self.instruments[idx] for idx,i in enumerate(cluster_list) if i==cluster_id] | |
for cluster_id in unique_clusters] | |
sub_portfolios = [self._create_single_subportfolio(instruments_for_this_cluster) | |
for instruments_for_this_cluster in instruments_by_cluster] | |
return sub_portfolios | |
def _require_partioned_portfolio(self): | |
""" | |
If risk_target set and no leverage allowed will be True, | |
OR if top level weights are passed | |
otherwise False | |
:return: bool | |
""" | |
if self.top_level_weights is not NO_TOP_LEVEL_WEIGHTS: | |
# if top level weights are passed we need to partition | |
return True | |
elif (not self.risk_target is NO_RISK_TARGET) and (not self.allow_leverage): | |
# if a risk target is set, but also no leverage allowed, we need to partition | |
return True | |
return False | |
def _create_all_subportfolios(self): | |
""" | |
Decluster the entire portfolio into a tree of subportfolios | |
:return: None [populates self.subportfolios] or NO_SUB_PORTFOLIOS | |
""" | |
## Create the first level of sub portfolios underneath us | |
sub_portfolios = self._create_child_subportfolios() | |
if sub_portfolios is NO_SUB_PORTFOLIOS: | |
# nothing to do | |
return NO_SUB_PORTFOLIOS | |
# Create the rest of the tree | |
for single_sub_portfolio in sub_portfolios: | |
# This will create all nested portfolios | |
single_sub_portfolio._create_all_subportfolios() | |
return sub_portfolios | |
def show_subportfolio_tree(self, prefix=""): | |
""" | |
Display the sub portfolio tree | |
:return: None | |
""" | |
descrlist=[] | |
if self.sub_portfolios is NO_SUB_PORTFOLIOS: | |
descrlist=["%s Contains %s" % (prefix, str(self.instruments))] | |
return descrlist | |
descrlist.append("%s Contains %d sub portfolios" % (prefix, len(self.sub_portfolios))) | |
prefix=prefix+"..." | |
for sub_portfolio in self.sub_portfolios: | |
descrlist.append(sub_portfolio.show_subportfolio_tree(prefix=prefix)) | |
return descrlist | |
def _diags_as_dataframe(self): | |
""" | |
:return: A list of tuples (label, dataframes) showing how the portfolio weights were built up | |
""" | |
diag = diagobject() | |
# not used - make sure everything is available | |
vw = self.volatility_weights | |
if self.sub_portfolios is NO_SUB_PORTFOLIOS: | |
description = "Portfolio containing %s instruments " % (str(self.instruments)) | |
diag.description = description | |
vol_weights = self.volatility_weights | |
raw_weights = self.raw_weights | |
SR = self.sharpe_ratio | |
diagmatrix = pd.DataFrame([raw_weights, vol_weights, list(SR)], columns=self.instruments, | |
index=["Raw vol (no SR adj)", "Vol (with SR adj)", "Sharpe Ratio"]) | |
diag.calcs = diagmatrix | |
diag.cash = "No cash calculated" | |
diag.aggregate = "Not an aggregate portfolio" | |
return diag | |
description = "Portfolio containing %d sub portfolios" % len(self.sub_portfolios) | |
diag.description = description | |
# do instrument level | |
dm_by_instrument_list = self.dm_by_instrument_list | |
instrument_vol_weight_in_sub_list = self.instrument_vol_weight_in_sub_list | |
sub_portfolio_vol_weight_list = self.sub_portfolio_vol_weight_list | |
vol_weights = self.volatility_weights | |
diagmatrix = pd.DataFrame([instrument_vol_weight_in_sub_list, | |
sub_portfolio_vol_weight_list, | |
dm_by_instrument_list, vol_weights], columns=self.instruments, | |
index=["Vol wt in group", | |
"Vol wt. of group", | |
"Div mult of group", "Vol wt."]) | |
diag.calcs = diagmatrix | |
# do aggregate next | |
diag.aggregate=diagobject() | |
diag.aggregate.description = description + " aggregate" | |
vol_weights = self.aggregate_portfolio.volatility_weights | |
raw_weights = self.aggregate_portfolio.raw_weights | |
div_mult = [sub_portfolio.div_mult for sub_portfolio in self.sub_portfolios] | |
sharpe_ratios = list(self.aggregate_portfolio.sharpe_ratio) | |
# unlabelled, sub portfolios don't get names | |
diagmatrix = pd.DataFrame([raw_weights, vol_weights, sharpe_ratios, div_mult], | |
index=["Raw vol (no SR adj or DM)", "Vol (with SR adj no DM)", "SR","Div mult"]) | |
diag.aggregate.calcs = diagmatrix | |
# do cash | |
diag.cash = diagobject() | |
description = "Portfolio containing %d instruments (cash calculations)" % len(self.instruments) | |
diag.cash.description = description | |
vol_weights = self.volatility_weights | |
cash_weights = self.cash_weights | |
vol_vector = list(self.vol_vector) | |
diagmatrix = pd.DataFrame([vol_weights, vol_vector, cash_weights], columns=self.instruments, | |
index=["Vol weights", "Std.", "Cash weights"]) | |
diag.cash.calcs = diagmatrix | |
return diag | |
def _calculate_weights_standalone_portfolio(self): | |
""" | |
For a standalone portfolio, calculates volatility weights | |
Uses the candidate matching method | |
:return: list of weights | |
""" | |
assert len(self.instruments)<=MAX_CLUSTER_SIZE | |
assert self.sub_portfolios is NO_SUB_PORTFOLIOS | |
raw_weights = get_weights_using_candidate_method(self.corr_matrix.values) | |
self.raw_weights = raw_weights | |
use_SR_estimates= self.use_SR_estimates | |
if use_SR_estimates: | |
SR_list = self.sharpe_ratio | |
adjusted_weights = adjust_weights_for_SR(raw_weights, SR_list) | |
else: | |
adjusted_weights = raw_weights | |
return adjusted_weights | |
def _calculate_portfolio_returns(self): | |
""" | |
If we have some weights, calculate the returns of the entire portfolio | |
Needs cash weights | |
:return: pd.Series of returns | |
""" | |
cash_weights = self.cash_weights | |
instrument_returns = self.instrument_returns | |
cash_weights_as_df = pd.DataFrame([cash_weights] * len(instrument_returns.index), instrument_returns.index) | |
cash_weights_as_df.columns = instrument_returns.columns | |
portfolio_returns_df = cash_weights_as_df * instrument_returns | |
portfolio_returns = portfolio_returns_df.sum(axis=1) | |
return portfolio_returns | |
def _calculate_portfolio_returns_std(self): | |
return self.portfolio_returns.std() * (WEEKS_IN_YEAR ** .5) | |
def _calculate_diversification_mult(self): | |
""" | |
Calculates the diversification multiplier for a portfolio | |
:return: float | |
""" | |
corr_matrix = self.corr_matrix.values | |
vol_weights = np.array(self.volatility_weights) | |
div_mult = 1.0/((np.dot(np.dot(vol_weights,corr_matrix), vol_weights.transpose()))**.5) | |
return div_mult | |
def _calculate_sub_portfolio_returns(self): | |
""" | |
Return a matrix of returns with sub portfolios each representing a single asset | |
:return: pd.DataFrame | |
""" | |
assert self.sub_portfolios is not NO_SUB_PORTFOLIOS | |
sub_portfolio_returns = [sub_portfolio.portfolio_returns for sub_portfolio in self.sub_portfolios] | |
sub_portfolio_returns = pd.concat(sub_portfolio_returns, axis=1) | |
return sub_portfolio_returns | |
def _calculate_weights_aggregated_portfolio(self): | |
""" | |
Calculate weights when we have sub portfolios | |
This is done by pulling in the weights from each sub portfolio, giving weights to each sub portfolio, and then getting the product | |
:return: list of weights | |
""" | |
sub_portfolio_returns = self._calculate_sub_portfolio_returns() | |
# create another Portfolio object made up of the sub portfolios | |
aggregate_portfolio = Portfolio(sub_portfolio_returns, use_SR_estimates=self.use_SR_estimates) | |
# store to look at later if you want | |
self.aggregate_portfolio = aggregate_portfolio | |
# calculate the weights- these will be the weight on each sub portfolio | |
if self.top_level_weights is NO_TOP_LEVEL_WEIGHTS: | |
# calculate the weights in the normal way | |
aggregate_weights = aggregate_portfolio.volatility_weights | |
raw_weights = aggregate_portfolio.raw_weights | |
else: | |
# override with top_level_weights - used when risk targeting | |
try: | |
assert len(self.top_level_weights)==len(aggregate_portfolio.instruments) | |
except: | |
raise Exception("Top level weights length %d is different from number of top level groups %d" | |
% (len(self.top_level_weights)==len(self.aggregate_portfolio.instruments))) | |
aggregate_weights = self.top_level_weights | |
raw_weights = aggregate_weights | |
# calculate the product of div_mult, aggregate weights and sub portfolio weights, return as list | |
vol_weights = [] | |
dm_by_instrument_list = [] | |
instrument_vol_weight_in_sub_list = [] | |
sub_portfolio_vol_weight_list =[] | |
for instrument_code in self.instruments: | |
weight = None | |
for sub_portfolio, sub_weight in zip(self.sub_portfolios, aggregate_weights): | |
if instrument_code in sub_portfolio.instruments: | |
if weight is not None: | |
raise Exception("Instrument %s in multiple sub portfolios" % instrument_code) | |
# A weight is the product of: the diversification multiplier for the subportfolio it comes from, | |
# the weight of that instrument within that subportfolio, and | |
# the weight of the subportfolio within the larger portfolio | |
div_mult = sub_portfolio.div_mult | |
instrument_idx = sub_portfolio.instruments.index(instrument_code) | |
instrument_weight = sub_portfolio.volatility_weights[instrument_idx] | |
weight = div_mult * instrument_weight * sub_weight | |
# useful diagnostics | |
dm_by_instrument_list.append(div_mult) | |
instrument_vol_weight_in_sub_list.append(instrument_weight) | |
sub_portfolio_vol_weight_list.append(sub_weight) | |
if weight is None: | |
raise Exception("Instrument %s missing from all sub portfolios" % instrument_code) | |
vol_weights.append(weight) | |
vol_weights = norm_weights(vol_weights) | |
# store diags | |
self.dm_by_instrument_list = dm_by_instrument_list | |
self.instrument_vol_weight_in_sub_list = instrument_vol_weight_in_sub_list | |
self.sub_portfolio_vol_weight_list = sub_portfolio_vol_weight_list | |
self.raw_weights = raw_weights | |
return vol_weights | |
def _calculate_volatility_weights(self): | |
""" | |
Calculates the volatility weights of the portfolio | |
If the portfolio contains sub_portfolios; it will calculate the volatility weights of each sub_portfolio, and then | |
weight towards sub_portfolios, and then calculate the multiplied out weights | |
If the portfolio does not contain sub_portfolios; just calculate the weights | |
:return: volatility weights, also sets self.volatility_weights | |
""" | |
if self.sub_portfolios is NO_SUB_PORTFOLIOS: | |
vol_weights = self._calculate_weights_standalone_portfolio() | |
else: | |
vol_weights = self._calculate_weights_aggregated_portfolio() | |
return vol_weights | |
def _calculate_cash_weights_no_risk_target(self): | |
""" | |
Calculate cash weights without worrying about risk targets | |
:return: list of cash weights | |
""" | |
vol_weights = self.volatility_weights | |
instrument_std = self.vol_vector | |
raw_cash_weights = [vweight / vol for vweight, vol in zip(vol_weights, instrument_std)] | |
raw_cash_weights = norm_weights(raw_cash_weights) | |
return raw_cash_weights | |
def _calculate_cash_weights_with_risk_target_partitioned(self): | |
""" | |
Readjust partitioned top level groups to hit a risk target | |
(https://qoppac.blogspot.com/2018/12/portfolio-construction-through_7.html) | |
:return: list of weights | |
""" | |
assert self._require_partioned_portfolio() | |
assert len(self.sub_portfolios)==2 | |
# hard coded - high vol is second group. Don't change! | |
high_vol_sub_portfolio = self.sub_portfolios[1] | |
low_vol_sub_portfolio = self.sub_portfolios[0] | |
high_vol_std = high_vol_sub_portfolio.portfolio_std | |
low_vol_std = low_vol_sub_portfolio.portfolio_std | |
risk_target_std = self.risk_target | |
assert high_vol_std>low_vol_std | |
# Now for the correlation estimate | |
# first create another Portfolio object made up of the sub portfolios | |
sub_portfolio_returns = self._calculate_sub_portfolio_returns() | |
assert len(sub_portfolio_returns.columns)==2 # should be guaranteed by partioning but just to check | |
correlation = sub_portfolio_returns.corr().values[0][1] # only works for groups of 2 | |
# formula from https://qoppac.blogspot.com/2018/12/portfolio-construction-through_7.html | |
a_value = (high_vol_std**2) + (low_vol_std **2) - (2*high_vol_std *low_vol_std * correlation) | |
b_value = (2*high_vol_std*low_vol_std*correlation) - 2 * (low_vol_std**2) | |
c_value = (low_vol_std**2) - (risk_target_std**2) | |
# standard formula for solving a quadratic | |
high_cash_weight = (-b_value + (((b_value**2) - (4*a_value * c_value))**.5))/(2*a_value) | |
try: | |
assert high_cash_weight>=0.0 | |
except: | |
raise Exception("Something went wrong; cash weight target on high risk portfolio is negative!") | |
try: | |
assert high_cash_weight<=1.0 | |
except: | |
raise Exception("Can't hit risk target of %f - make it lower or include riskier assets!" % risk_target_std) | |
# new_weight is the weight on the HIGH_VOL portfolio | |
low_cash_weight = 1.0 - high_cash_weight | |
# These are cash weights; change to a vol weight | |
high_vol_weight = high_cash_weight * high_vol_std | |
low_vol_weight = low_cash_weight * low_vol_std | |
self.log("Need to limit low cash group to %f (vol) %f (cash) of portfolio to hit risk target of %f" % | |
(low_vol_weight, low_cash_weight, risk_target_std)) | |
# Hard coded - high vol is second group | |
top_level_weights = norm_weights([low_vol_weight, high_vol_weight]) | |
self.top_level_weights = top_level_weights | |
# We create an adjusted portfolio with the required top level weights as constraints | |
# we also need to pass the risk target to get same partitioning | |
# and use_SR_estimates to guarantee weights are the same | |
# | |
adjusted_portfolio = Portfolio(self.instrument_returns, use_SR_estimates=self.use_SR_estimates, | |
top_level_weights=top_level_weights, risk_target=self.risk_target) | |
return adjusted_portfolio.cash_weights | |
def _calculate_cash_weights_with_risk_target(self): | |
""" | |
Calculate cash weights given a risk target | |
:return: list of weights | |
""" | |
target_std = self.risk_target | |
self.log("Calculating weights to hit a risk target of %f" % target_std) | |
# create version without risk target to check natural risk | |
# note all sub portfolios are like this | |
natural_portfolio = Portfolio(self.instrument_returns, risk_target=NO_RISK_TARGET) | |
natural_std = natural_portfolio.portfolio_std | |
natural_cash_weights = natural_portfolio.cash_weights | |
# store for diagnostics | |
self.natural_cash_weights = natural_cash_weights | |
self.natural_std = natural_std | |
if natural_std > target_std: | |
# Too much risk | |
# blend with cash | |
cash_required = (natural_std - target_std) / natural_std | |
portfolio_capital_left = 1.0 - cash_required | |
self.log("Too much risk %f of the portfolio will be cash" % cash_required) | |
cash_weights = list(np.array(natural_cash_weights)*portfolio_capital_left) | |
# stored as diag | |
self.cash_required = cash_required | |
return cash_weights | |
elif natural_std < target_std: | |
# Not enough risk | |
if self.allow_leverage: | |
# calc leverage | |
leverage = target_std / natural_std | |
self.log("Not enough risk leverage factor of %f applied" % leverage) | |
cash_weights = list(np.array(natural_cash_weights)*leverage) | |
# stored as diag | |
self.leverage = leverage | |
return cash_weights | |
else: | |
# no leverage allowed | |
# need to adjust weights | |
self.log("Not enough risk, no leverage allowed, using partition method") | |
return self._calculate_cash_weights_with_risk_target_partitioned() | |
# will only get here if the target and natural std are identical... | |
# unlikely - but! | |
return natural_cash_weights | |
def _calculate_cash_weights(self): | |
""" | |
Calculate cash weights | |
Note - this will apply a risk target if required | |
Note 2 - only top level portfolios have risk targets - sub portfolios don't | |
:return: list of weights | |
""" | |
target_std = self.risk_target | |
if target_std is NO_RISK_TARGET: | |
# no risk target, can use natural weights | |
return self._calculate_cash_weights_no_risk_target() | |
elif self.top_level_weights is not NO_TOP_LEVEL_WEIGHTS: | |
# top level weights passed, use natural weights | |
return self._calculate_cash_weights_no_risk_target() | |
else: | |
# need a risk target | |
return self._calculate_cash_weights_with_risk_target() | |
""" | |
Boilerplate getter functions | |
""" | |
@property | |
def volatility_weights(self): | |
if hasattr(self, "_volatility_weights"): | |
return self._volatility_weights | |
else: | |
weights_vol = self._calculate_volatility_weights() | |
self._volatility_weights = weights_vol | |
return weights_vol | |
@property | |
def cash_weights(self): | |
if hasattr(self, "_cash_weights"): | |
return self._cash_weights | |
else: | |
weights_cash = self._calculate_cash_weights() | |
self._cash_weights = weights_cash | |
return weights_cash | |
@property | |
def sub_portfolios(self): | |
if hasattr(self, "_sub_portfolios"): | |
return self._sub_portfolios | |
else: | |
sub_portfolios = self._create_all_subportfolios() | |
self._sub_portfolios = sub_portfolios | |
return sub_portfolios | |
@property | |
def portfolio_returns(self): | |
if hasattr(self, "_portfolio_returns"): | |
return self._portfolio_returns | |
else: | |
portfolio_returns = self._calculate_portfolio_returns() | |
self._portfolio_returns = portfolio_returns | |
return portfolio_returns | |
@property | |
def portfolio_std(self): | |
if hasattr(self, "_portfolio_returns_std"): | |
return self._portfolio_returns_std | |
else: | |
portfolio_returns_std = self._calculate_portfolio_returns_std() | |
self._portfolio_returns_std = portfolio_returns_std | |
return portfolio_returns_std | |
@property | |
def div_mult(self): | |
if hasattr(self, "_div_mult"): | |
return self._div_mult | |
else: | |
div_mult = self._calculate_diversification_mult() | |
self._div_mult = div_mult | |
return div_mult | |
@property | |
def diags(self): | |
if hasattr(self, "_diags"): | |
return self._diags | |
else: | |
diags = self._diags_as_dataframe() | |
self._diags = diags | |
return diags | |
p=Portfolio(returns) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment