Created
August 31, 2015 13:48
-
-
Save raddy/4f6ab203bf744600dde5 to your computer and use it in GitHub Desktop.
secrets secrets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import numpy.random as rnd | |
from cvxopt import matrix | |
from cvxopt import blas | |
from cvxopt import solvers | |
from cvxopt.modeling import op | |
class IndexReplication(object): | |
def __init__(self): | |
self.G = None # number of groups undergoes mutation | |
self.K = None # number of stocks of IndexReplicationting portfolio | |
self.N = None # total number of constituent stocks | |
self.MF = None # mutation factor, ratio of genes being mutated | |
self.GroupList = None # a dynamic list for storing the results | |
solvers.options['show_progress'] = False # shut down the screen output of cvxopt | |
def build_group(self): | |
idx = zip(rnd.rand(self.N), self.I) | |
idx.sort(key=lambda x:x[0]) | |
idx = np.array([i[1] for i in idx[:self.K]]) | |
group = np.zeros(self.N).astype('bool') | |
group[idx] = True | |
return group | |
def init_groups(self): | |
groups = [self.build_group() for i in xrange(self.G)] | |
self.GroupList = [(self.minimize_tracking_error(g)[0], g) for g in groups] | |
self.GroupList.sort(key=lambda x:x[0]) | |
def minimize_tracking_error(self, g): | |
""" | |
Tracking Error = (Rw-S)'(Rw-S) = w'R'Rw + 2(-R'S)'w + S'S | |
G = R'R | |
a = -R'S | |
minimize 1/2*w'*G*w + a'*w | |
subject to Ci * w <= bi | |
Ce * w = be | |
""" | |
G = matrix(self.GM[:,g][g,:]) | |
a = matrix(self.av[g]) | |
#output = solvers.qp(G, a, self.Ci, self.bi, self.Ce, self.be) | |
output = solvers.qp(G, a, None,None, self.Ce, self.be) | |
w = output['x'] | |
#'primal objective': (1/2)*w'*G*w + a'*w. | |
TrackErr = ((output['primal objective'] * 2 + self.S2) / len(self.Rs)) ** 0.5 | |
return [TrackErr, output['x']] | |
def prep_data(self, index_to_rep,rep_instruments): | |
self.Rs = index_to_rep.values # stock index daily return time series : T x 1 | |
self.RC = rep_instruments.values # constituent stocks time series : T x N | |
self.S2 = np.dot(self.Rs, self.Rs) | |
# G matrix | |
self.GM = np.dot(self.RC.T, self.RC); | |
# a vector | |
self.av = -np.dot(self.RC.T, self.Rs); | |
#total # of constituent stocks | |
self.N = len(self.av) | |
#index of stocks | |
self.I = np.arange(self.N) | |
#inequality constraints | |
# w_i >= 0, long only portfolio | |
self.Ci = matrix(-np.eye(self.K)) | |
self.bi = matrix(np.zeros([self.K, 1])) | |
#eqality contraints | |
# sum(w_i) = 1, weights sum to 1 | |
self.Ce = matrix(1.0, (1, self.K)) | |
self.be = matrix(0.0) | |
def mutate(self): | |
""" | |
mutate one generation by a simple genetic algorithm: | |
1. firstly, identical genes from both parents are inherited | |
2. secondly, different genes are inherited by 50% probability | |
3. finally, all genes mutate by a probability of MF | |
""" | |
while 1: | |
idx = rnd.random_integers(0, self.G - 1, 2) | |
if idx[0] != idx[1]: break | |
ga = self.GroupList[idx[0]][1] | |
gb = self.GroupList[idx[1]][1] | |
g_and = ga & gb | |
g_or = ga | gb | |
g_out = g_or - g_and | |
for i in range(self.N): | |
g_out[i] = g_out[i] and rnd.random_integers(0, 1) | |
g_new = g_and + g_out | |
mut = rnd.rand(self.N) | |
mut[mut < (1 - self.MF)] = 0 | |
mut.astype('bool') | |
child = np.logical_xor(g_new, mut) | |
count = len(child[child]) | |
while count < self.K: | |
idx = rnd.random_integers(0, self.N - count - 1) | |
while child[idx] == True: idx += 1 | |
child[idx] = True | |
count += 1 | |
while count > self.K: | |
idx = rnd.random_integers(0, count - 1) | |
while child[idx] == False: idx += 1 | |
child[idx] = False | |
count -= 1 | |
node = (self.minimize_tracking_error(child)[0], child) | |
if node[0] < self.GroupList[-1][0]: | |
self.GroupList[-1] = node | |
self.GroupList.sort(key=lambda x:x[0]) | |
def find_portfolio(self,index_to_rep,rep_instruments,G, K, MF, N,verbose=True): | |
self.G = G | |
self.K = K | |
self.MF = MF | |
self.prep_data(index_to_rep,rep_instruments) | |
self.init_groups() | |
n = 1; | |
stepper = int(N * .1) | |
while n < N:# number of generations being mutated | |
self.mutate() | |
if verbose and n % stepper == 0: | |
print 'After', n, 'mutations, Tracking_Error = %.8f' % self.GroupList[0][0] | |
n += 1 | |
group = np.arange(0, self.N)[self.GroupList[0][1]] | |
final_weights = np.asarray(self.minimize_tracking_error(group)[1]) | |
return pd.DataFrame(final_weights,index=rep_instruments.columns[group],columns=[index_to_rep.index[0]]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment