Last active
February 28, 2017 05:18
-
-
Save dat-boris/75c7006f8c103a60aed35ed880860f9d to your computer and use it in GitHub Desktop.
Quantopian tool for researching pair trading algorithm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A simple tool for researching Pairs based on: | |
https://www.quantopian.com/clone_notebook?id=57ed7c41144f8837290000da | |
""" | |
from datetime import date, timedelta | |
import matplotlib.pyplot as plt | |
from statsmodels.tsa.stattools import coint | |
from zipline import TradingAlgorithm | |
import pyfolio as pf | |
# CROSS_VALIDATION_BOUNDARY = [[start1, end1]] | |
# STOCK_PAIRS = [] | |
LOOKBACK_WINDOW = timedelta(days=60) | |
class Pair(object): | |
def __init__(self, s1, s2, validation_segment=0): | |
self.s1 = s1 | |
self.s2 = s2 | |
self.validation_segment = validation_segment | |
self.start_date, self.end_date = CROSS_VALIDATION_BOUNDARY[self.validation_segment] | |
self.pricing = [None] * len(CROSS_VALIDATION_BOUNDARY) | |
def get_pricing(self): | |
self.pricing[self.validation_segment] = get_pricing( | |
[self.s1, self.s2], | |
fields=PRICE_USED, | |
start_date=self.start_date - LOOKBACK_WINDOW, | |
# ensure that we dont fall on holiday | |
end_date=self.end_date | |
).fillna(method='backfill') | |
return self | |
def plot(self): | |
self.pricing[self.validation_segment].plot() | |
def coint_test(self, plot=True): | |
""" | |
For a good pair, we should able to see a very low co-integration value | |
(i.e. < 0.05) | |
""" | |
pricing_data = self.pricing[self.validation_segment] | |
X = pricing_data[self.s1] | |
Y = pricing_data[self.s2] | |
if plot: | |
(Y - X).plot() # Plot the spread | |
plt.axhline((Y - X).mean(), color='red', linestyle='--') # Add the mean | |
plt.xlabel('Time') | |
plt.legend(['Price Spread', 'Mean']); | |
# compute the p-value of the cointegration test | |
# will inform us as to whether the spread between the 2 timeseries is stationary | |
# around its mean | |
score, pvalue, _ = coint(X,Y) | |
print "Coint ({} to {}) {} vs {}: {}".format( | |
self.start_date.date(), self.end_date.date(), | |
self.s1, self.s2, pvalue) | |
return pvalue | |
def test_trading(self): | |
algo_initialize = get_backtest_algo(self.s1, self.s2) | |
# see https://www.quantopian.com/research/notebooks/201609-pead-reversion/Value-v-Glamour%20stock.ipynb for setup | |
#http://www.zipline.io/appendix.html | |
algo_obj = TradingAlgorithm( | |
initialize=algo_initialize, | |
#before_trading_start=check_pair_status, | |
start=self.start_date, | |
data_frequency='daily', | |
end=self.end_date, | |
) | |
# Run algorithms | |
pricing_data = self.pricing[self.validation_segment] | |
results = algo_obj.run( | |
pricing_data, #.transpose(2,1,0), | |
overwrite_sim_params=False | |
) | |
sharpe = (results.returns.mean()*252)/(results.returns.std() * np.sqrt(252)) | |
print "The Sharpe ratio is %0.6f" % sharpe | |
self.results = results | |
self.sharpe = sharpe | |
return results, sharpe | |
def tearsheet_from_results(self, simple=True): | |
results = self.results | |
algo_returns, positions, algo_transactions, gross_lev = pf.utils.extract_rets_pos_txn_from_zipline(results) | |
if simple: | |
pf.create_returns_tear_sheet(algo_returns) | |
else: | |
pf.create_full_tear_sheet(algo_returns, positions=positions, | |
transactions=algo_transactions, | |
gross_lev=gross_lev | |
) | |
# test_pair = Pair(*STOCK_PAIRS[1]) | |
# test_pair.get_pricing() | |
# #test_pair.plot() | |
# # should see a low value | |
# #test_pair.coint_test() | |
# results, sharpe = test_pair.test_trading() | |
# source from https://www.quantopian.com/lectures#Example:-Pairs-Trading-Algorithm | |
import numpy as np | |
import statsmodels.api as sm | |
import pandas as pd | |
from zipline.utils import tradingcalendar | |
import pytz | |
import itertools | |
from zipline.api import ( | |
schedule_function, date_rules, time_rules, sid, symbol, | |
set_slippage, slippage, set_commission, commission, | |
get_datetime, order_target_percent, record, attach_pipeline, | |
order_target, get_open_orders, history | |
) | |
def get_backtest_algo(s1, s2): | |
""" | |
Setting up the algorithm for real testing | |
""" | |
ALGO_STOCK_PAIRS = [[s1,s2]] | |
#BENCHMARK = symbols('SPY') | |
#UNIVERSE = list(itertools.chain([BENCHMARK], *ALGO_STOCK_PAIRS)) | |
UNIVERSE = list(itertools.chain(*ALGO_STOCK_PAIRS)) | |
def initialize(context): | |
# Quantopian backtester specific variables | |
#set_slippage(slippage.FixedSlippage(spread=0)) | |
#set_commission(commission.PerTrade(cost=1)) | |
#set_symbol_lookup_date('2014-01-01') | |
context.stock_pairs = ALGO_STOCK_PAIRS | |
context.universe = UNIVERSE | |
# set_benchmark(context.y) | |
context.num_pairs = len(context.stock_pairs) | |
# strategy specific variables | |
context.lookback = 20 # used for regression | |
context.z_window = 20 # used for zscore calculation, must be <= lookback | |
context.spread = np.ndarray((context.num_pairs, 0)) | |
# context.hedgeRatioTS = np.ndarray((context.num_pairs, 0)) | |
context.inLong = [False] * context.num_pairs | |
context.inShort = [False] * context.num_pairs | |
# Only do work 30 minutes before close | |
schedule_function(func=check_pair_status, date_rule=date_rules.every_day(), time_rule=time_rules.market_close(minutes=30)) | |
# Will be called on every trade event for the securities you specify. | |
def handle_data(context, data): | |
# Our work is now scheduled in check_pair_status | |
pass | |
def check_pair_status(context, data): | |
if get_open_orders(): | |
return | |
prices = data.history(context.universe, 'price', 35, '1d').iloc[-context.lookback::] | |
new_spreads = np.ndarray((context.num_pairs, 1)) | |
for i in range(context.num_pairs): | |
(stock_y, stock_x) = context.stock_pairs[i] | |
Y = prices[stock_y] | |
X = prices[stock_x] | |
try: | |
hedge = hedge_ratio(Y, X, add_const=True) | |
except ValueError as e: | |
log.debug(e) | |
return | |
# context.hedgeRatioTS = np.append(context.hedgeRatioTS, hedge) | |
new_spreads[i, :] = Y[-1] - hedge * X[-1] | |
if context.spread.shape[1] > context.z_window: | |
# Keep only the z-score lookback period | |
spreads = context.spread[i, -context.z_window:] | |
zscore = (spreads[-1] - spreads.mean()) / spreads.std() | |
if context.inShort[i] and zscore < 0.0: | |
order_target(stock_y, 0) | |
order_target(stock_x, 0) | |
context.inShort[i] = False | |
context.inLong[i] = False | |
record(X_pct=0, Y_pct=0) | |
return | |
if context.inLong[i] and zscore > 0.0: | |
order_target(stock_y, 0) | |
order_target(stock_x, 0) | |
context.inShort[i] = False | |
context.inLong[i] = False | |
record(X_pct=0, Y_pct=0) | |
return | |
if zscore < -1.0 and (not context.inLong[i]): | |
# Only trade if NOT already in a trade | |
y_target_shares = 1 | |
X_target_shares = -hedge | |
context.inLong[i] = True | |
context.inShort[i] = False | |
(y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares,X_target_shares, Y[-1], X[-1] ) | |
order_target_percent( stock_y, y_target_pct * (1.0/context.num_pairs) / float(context.num_pairs) ) | |
order_target_percent( stock_x, x_target_pct * (1.0/context.num_pairs) / float(context.num_pairs) ) | |
record(Y_pct=y_target_pct, X_pct=x_target_pct) | |
return | |
if zscore > 1.0 and (not context.inShort[i]): | |
# Only trade if NOT already in a trade | |
y_target_shares = -1 | |
X_target_shares = hedge | |
context.inShort[i] = True | |
context.inLong[i] = False | |
(y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares, X_target_shares, Y[-1], X[-1] ) | |
order_target_percent( stock_y, y_target_pct * (1.0/context.num_pairs) / float(context.num_pairs) ) | |
order_target_percent( stock_x, x_target_pct * (1.0/context.num_pairs) / float(context.num_pairs) ) | |
record(Y_pct=y_target_pct, X_pct=x_target_pct) | |
context.spread = np.hstack([context.spread, new_spreads]) | |
def hedge_ratio(Y, X, add_const=True): | |
if add_const: | |
X = sm.add_constant(X) | |
model = sm.OLS(Y, X).fit() | |
return model.params[1] | |
model = sm.OLS(Y, X).fit() | |
return model.params.values | |
def computeHoldingsPct(yShares, xShares, yPrice, xPrice): | |
yDol = yShares * yPrice | |
xDol = xShares * xPrice | |
notionalDol = abs(yDol) + abs(xDol) | |
y_target_pct = yDol / notionalDol | |
x_target_pct = xDol / notionalDol | |
return (y_target_pct, x_target_pct) | |
return initialize | |
#get_backtest_algo(*STOCK_PAIRS[0]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment