Created
January 16, 2021 12:42
-
-
Save dogwood008/fda98e9645b0f1f82fc9c034b809874d to your computer and use it in GitHub Desktop.
「株のシステムトレードをしよう - 1から始める株自動取引システムの作り方」で使用しているバックテスト https://bit.ly/3qvOehD
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""read_kabu_plus_csv_on_backtrader | |
Automatically generated by Colaboratory. | |
Original file is located at | |
https://colab.research.google.com/drive/1eHqULDGmmM6bQLaxJyt8DSB_qcwsTkk- | |
## Pip install | |
""" | |
#!pip install git+https://github.com/dogwood008/backtrader.git@bugfix_order_day_doesnt_works backtrader_plotting | |
!pip install backtrader==1.9.76.123 backtrader_plotting | |
"""## Consts""" | |
# Selects the plotting path: True imports and uses backtrader_plotting's Bokeh,
# False uses the plain ``cerebro.plot`` call.
USE_BOKEH = False
"""## Load CSV"""
# Location of the KABU+ price CSV; it is previewed here and also handed to the
# data feed as ``dataname``.
path_to_csv = '/content/drive/MyDrive/Project/kabu-plus/japan-stock-prices-2_2020_9143_adjc.csv'
import pandas as pd
import backtrader as bt
# The preview frame is deliberately NOT called ``csv``: the stdlib ``csv``
# module is imported further down and used by the data feed's ``start``
# (``csv.reader``). Rebinding ``csv`` to a DataFrame (e.g. by re-running this
# cell in a notebook) would break the feed.
csv_df = pd.read_csv(path_to_csv)
pd.set_option('display.max_rows', 500)
csv_df
"""## FeedsData""" | |
############################################################# | |
# Copyright (C) 2020 dogwood008 (original author: Daniel Rodriguez; https://github.com/mementum/backtraders) | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <https://www.gnu.org/licenses/>. | |
############################################################# | |
import csv | |
import itertools | |
import io | |
import pytz | |
from datetime import date, datetime | |
from backtrader.utils import date2num | |
from typing import Any | |
class KabuPlusJPCSVData(bt.feeds.YahooFinanceCSVData):
    '''
    Parses pre-downloaded KABU+ CSV data feeds.

    Columns are located by header name rather than by fixed position: the
    first line of the file is read as the header row, and ``header_names``
    maps each internal key (``date``, ``open``, ...) to the column title used
    in the CSV.

    Specific parameters:

      - ``dataname``: The filename to parse or a file-like object

      - ``reverse`` (default: ``True``)

        NOTE(review): ``start`` below bypasses the parent ``start``
        implementations, so this option looks unused in this class - confirm
        against backtrader's ``YahooFinanceCSVData.start``.

      - ``round`` (default: ``True``)

        Whether to round open/high/low/close to ``decimals`` after the
        adjustment has been applied

      - ``roundvolume`` (default: ``False``, which rounds to 0 decimals)

        Number of decimals the adjusted volume is rounded to

      - ``decimals`` (default: ``2``)

        Number of decimals to round prices to

      - ``swapcloses`` (default: ``False``)

        Swap the raw close and the adjusted close columns before adjusting

      - ``headers`` (default: ``True``)

        The header row is only read when both ``headers`` and
        ``header_names`` are truthy; column lookups depend on it

      - ``header_names``

        Dictionary translating internal keys into CSV column titles

      - ``tz`` (default: ``Asia/Tokyo``)
    '''
    # Internal keys used to look columns up through ``header_names``.
    DATE = 'date'
    OPEN = 'open'
    HIGH = 'high'
    LOW = 'low'
    CLOSE = 'close'
    VOLUME = 'volume'
    ADJUSTED_CLOSE = 'adjusted_close'

    params = (
        ('reverse', True),
        ('round', True),
        ('decimals', 2),
        ('roundvolume', False),
        ('swapcloses', False),
        ('headers', True),
        ('header_names', {  # maps internal keys to the CSV column titles
            DATE: 'date',
            OPEN: 'open',
            HIGH: 'high',
            LOW: 'low',
            CLOSE: 'close',
            VOLUME: 'volumes',
            ADJUSTED_CLOSE: 'adj_close',
        }),
        ('tz', pytz.timezone('Asia/Tokyo'))
    )

    def _fetch_value(self, values: list, column_name: str) -> Any:
        '''
        Return the token of ``values`` that belongs to ``column_name``.

        ``column_name`` is one of the internal keys (``DATE``, ``OPEN``, ...);
        it is translated to the CSV column title through ``header_names`` and
        then to a position through the header row.

        Raises ``KeyError`` for a key missing from ``header_names`` and
        ``ValueError`` when the CSV header has no such column.
        '''
        index = self._column_index(self.p.header_names[column_name])
        return values[index]

    def _column_index(self, column_name: str) -> int:
        '''
        Return the position of ``column_name`` in the CSV header row.

        Raises ``ValueError`` when the column is not present.
        '''
        return self._csv_headers.index(column_name)

    # copied from https://github.com/mementum/backtrader/blob/0426c777b0abdfafbb0988f5c31347553256a2de/backtrader/feed.py#L666-L679
    def start(self):
        '''
        Open the data source and read the header row.

        The ``super`` call starts the lookup *after* ``CSVDataBase``, so the
        ``start`` methods of ``CSVDataBase`` and of the Yahoo parent are
        skipped and their file handling is reproduced here with the header
        row parsed instead of discarded.
        '''
        super(bt.feed.CSVDataBase, self).start()
        if self.f is None:
            if hasattr(self.p.dataname, 'readline'):
                # Already a file-like object
                self.f = self.p.dataname
            else:
                # Let an exception propagate to let the caller know
                self.f = io.open(self.p.dataname, 'r')
        if self.p.headers and self.p.header_names:
            # Parse the first line with the csv module to get column titles
            _csv_reader = csv.reader([self.f.readline()])
            self._csv_headers = next(_csv_reader)
        self.separator = self.p.separator

    def _loadline(self, linetokens):
        '''
        Fill the current bar from one tokenised CSV row.

        Rows holding the literal ``null`` in any column after the first are
        skipped by fetching the next line. Returns ``False`` when no further
        line can be fetched while skipping, ``True`` once the bar is stored.
        '''
        while 'null' in linetokens[1:]:
            linetokens = self._getnextline()  # refetch tokens
            if not linetokens:
                return False  # cannot fetch, go away

        dttxt = self._fetch_value(linetokens, self.DATE)
        # Year, month and day are taken from fixed character positions
        # (0-3, 5-6, 8-9) of the date text.
        dt = date(int(dttxt[0:4]), int(dttxt[5:7]), int(dttxt[8:10]))
        self.lines.datetime[0] = date2num(datetime.combine(dt, self.p.sessionend))

        open_ = float(self._fetch_value(linetokens, self.OPEN))
        high = float(self._fetch_value(linetokens, self.HIGH))
        low = float(self._fetch_value(linetokens, self.LOW))
        rawclose = float(self._fetch_value(linetokens, self.CLOSE))
        self.lines.openinterest[0] = 0.0
        adjustedclose = float(self._fetch_value(linetokens, self.ADJUSTED_CLOSE))
        volume = float(self._fetch_value(linetokens, self.VOLUME))

        if self.p.swapcloses:  # swap closing prices if requested
            rawclose, adjustedclose = adjustedclose, rawclose

        # Scale prices down and volume up by the raw/adjusted close ratio.
        # A zero raw or adjusted close raises ZeroDivisionError here.
        adjfactor = rawclose / adjustedclose
        open_ /= adjfactor
        high /= adjfactor
        low /= adjfactor
        volume *= adjfactor
        # The bar's close is the adjusted close, like the other adjusted prices
        close = adjustedclose

        if self.p.round:
            decimals = self.p.decimals
            open_ = round(open_, decimals)
            high = round(high, decimals)
            low = round(low, decimals)
            # Round the value that is actually emitted as the close
            close = round(close, decimals)
        volume = round(volume, self.p.roundvolume)

        self.lines.open[0] = open_
        self.lines.high[0] = high
        self.lines.low[0] = low
        self.lines.close[0] = close
        # Populate the inherited ``adjclose`` line as well; the strategy in
        # this file reads ``datas[0].adjclose``.
        self.lines.adjclose[0] = adjustedclose
        self.lines.volume[0] = volume
        return True
"""## Sizer""" | |
# import backtrader as bt | |
# from bt.comminfo import CommInfoBase | |
# from bt.feeds import AbstractDataBase | |
# | |
# class RangeSizer(bt.Sizer): | |
# ''' | |
# 決められた注文額に納まるように、注文数を調整する。 | |
# ''' | |
# | |
# params = ( | |
# ('min_order_price', 10 * 10000), # 最低購入金額(円) | |
# ('max_order_price', 50 * 10000), # 最高購入金額(円) | |
# ) | |
# | |
# def _getsizing(self, comminfo: CommInfoBase, | |
# cash: float, data: AbstractDataBase, isbuy: bool) -> int: | |
# ''' | |
# base: https://github.com/mementum/backtrader/blob/master/backtrader/sizer.py | |
# Parameters | |
# ---------------------- | |
# comminfo : bt.comminfo.CommInfoBase | |
# The CommissionInfo instance that contains | |
# information about the commission for the data and allows | |
# calculation of position value, operation cost, commision for the | |
# operation | |
# | |
# cash : float | |
# current available cash in the *broker* | |
# | |
# data : Any | |
# target of the operation | |
# | |
# isbuy : bool | |
# will be `True` for *buy* operations and `False` | |
# for *sell* operations | |
# | |
# | |
# Returns | |
# ----------------------- | |
# size : int | |
# The size of an order | |
# ''' | |
# | |
# issell = not isbuy | |
# | |
# if issell: | |
# # 売り注文なら、全量を指定 | |
# position = self.broker.getposition(data) | |
# if not position.size: | |
# return 0 # do not sell if nothing is open | |
# else: | |
# return position.size | |
# | |
# else: | |
# # TODO | |
# | |
"""## StrategyFetcher""" | |
# # https://www.backtrader.com/blog/posts/2017-05-16-stsel-revisited/stsel-revisited/ | |
# class StFetcher(object): | |
# _STRATS = [] | |
# | |
# @classmethod | |
# def register(cls, target): | |
# cls._STRATS.append(target) | |
# | |
# @classmethod | |
# def COUNT(cls): | |
# return range(len(cls._STRATS)) | |
# | |
# def __new__(cls, *args, **kwargs): | |
# idx = kwargs.pop('idx') | |
# | |
# obj = cls._STRATS[idx](*args, **kwargs) | |
# return obj | |
"""## Strategy""" | |
import backtrader as bt | |
from logging import getLogger, StreamHandler, Formatter, DEBUG, INFO, WARN | |
# The Bokeh plotting helpers are only imported when that backend is selected.
if USE_BOKEH:
    from backtrader_plotting import Bokeh
    from backtrader_plotting.schemes import Tradimo

# for jupyter: forget a strategy class left over from an earlier cell run,
# if there is one, before it is defined again below.
globals().pop('PercentageBuySellStrategyWithLogger', None)
from typing import Callable, Union, Optional | |
# A zero-argument callable that builds a log message on demand. The logging
# helpers invoke it as ``txt()`` and callers pass ``lambda: ...``, so it takes
# no parameters.
LazyString = Callable[[], str]
# Either a ready-made message or a LazyString that produces one.
LazyableString = Union[LazyString, str]
# Create a Stratey | |
# @StFetcher.register | |
class PercentageBuySellStrategyWithLogger(bt.Strategy):
    '''
    Strategy that compares today's open with the previous close and places
    day-valid limit orders at the open price.

    A buy is created when ``_is_to_buy`` holds, otherwise the position is
    closed when ``_is_to_close`` holds. Decisions are made in ``next_open``,
    so the strategy is meant to run with cheat-on-open enabled.

    The constructor takes the logging level used for the strategy's logger.
    '''
    params = (
        ('default_unit_size', 100),  # default number of shares in one trading unit
        ('buy_under_percentage', 5),  # buy when today's open is x% below the previous close
        ('sell_over_percentage', 5),  # sell when today's open is y% above the previous close
        ('min_order_price', 10 * 10000),  # minimum order amount (JPY)
        ('max_order_price', 50 * 10000),  # maximum order amount (JPY)
        ('smaperiod', 5),
    )

    def __init__(self, loglevel):
        '''
        Keep references to the data lines and configure the logger.

        Parameters
        ------------------
        loglevel
            Level applied to both the logger and its stream handler
        '''
        # Keep references to the lines of the data[0] dataseries
        self._dataopen = self.datas[0].open
        self._datahigh = self.datas[0].high
        self._datalow = self.datas[0].low
        self._dataclose = self.datas[0].close
        self._dataadjclose = self.datas[0].adjclose
        self._datavolume = self.datas[0].volume

        self._logger = getLogger(__name__)
        # ``getLogger(__name__)`` returns the same logger for every instance.
        # Drop handlers attached by earlier instances so that repeated runs
        # do not print every message several times.
        for stale_handler in list(self._logger.handlers):
            self._logger.removeHandler(stale_handler)
        self.handler = StreamHandler()
        self.handler.setLevel(loglevel)
        self.handler.setFormatter(
            Formatter('[%(levelname)s] %(message)s'))
        self._logger.setLevel(loglevel)
        self._logger.addHandler(self.handler)
        self._logger.propagate = False

        self.sma = bt.indicators.SimpleMovingAverage(
            self.datas[0], period=self.params.smaperiod)

    def notify_order(self, order):
        '''
        Log order status changes.

        Submitted/Accepted are ignored, Completed is logged at INFO level and
        every other status at DEBUG level.
        '''
        if order.status in [order.Submitted, order.Accepted]:
            return

        if order.status == order.Completed:
            if order.isbuy():
                buy_or_sell = 'BUY'
            elif order.issell():
                buy_or_sell = 'SELL'
            else:
                buy_or_sell = 'UNDEFINED'
            self._log(lambda: '{b_s:4s} EXECUTED, {price:7.2f}'.format(
                b_s=buy_or_sell,
                price=order.executed.price))
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            self._debug(lambda: 'Order Canceled/Margin/Rejected')
        elif order.status == order.Expired:
            self._debug(lambda: 'Expired: {b_s:s} ¥{sum:,d} (@{price:.2f} * {unit:4d}), valid: ({valid:s}, {valid_raw:f})'.format(
                sum=int(order.price * order.size),
                b_s=order.ordtypename(), price=order.price, unit=order.size,
                valid=str(bt.utils.dateintern.num2date(order.valid, tz=pytz.timezone('Asia/Tokyo'))),
                valid_raw=order.valid)
            )
        else:
            self._debug(order.getstatusname())

    def _log(self, txt: LazyableString, loglevel=INFO, dt=None):
        '''
        Logging function for this strategy.

        Parameters
        ------------------
        txt : LazyableString
            The message, or a zero-argument callable returning it
        loglevel
            Logging level of the record (default: INFO)
        dt
            Date prefixed to the message; defaults to the current bar's date
        '''
        if not self._logger.isEnabledFor(loglevel):
            # Nothing would be emitted: skip building a lazy message
            return
        dt = dt or self.datas[0].datetime.date(0)
        logtext = txt() if callable(txt) else str(txt)
        self._logger.log(loglevel, '%s, %s', dt.isoformat(), logtext)

    def _debug(self, txt: LazyableString, dt=None):
        ''' Log ``txt`` at DEBUG level. '''
        self._log(txt, DEBUG, dt)

    def _info(self, txt: LazyableString, dt=None):
        ''' Log ``txt`` at INFO level. '''
        self._log(txt, INFO, dt)

    def _size(self, unit_price: float, is_buy: bool) -> Optional[int]:
        '''
        Choose the number of shares for an order.

        Params
        ------------------
        unit_price : float
            Price of one share
        is_buy : bool
            True for a buy order, False for a sell order

        Returns
        ------------------
        ``None`` for a sell order (the whole position is closed), otherwise
        the smallest multiple of ``default_unit_size`` (up to 2000 shares)
        whose order amount lies within ``min_order_price`` and
        ``max_order_price``, or ``0`` when no such size exists.
        '''
        if not is_buy:
            # A sell order always targets the whole position
            return None

        unit_size = self.p.default_unit_size
        max_size = 2000
        for unit in range(unit_size, max_size + 1, unit_size):
            order_value = unit_price * unit
            if order_value < self.p.min_order_price:
                # Too cheap: try a larger quantity
                continue
            if order_value <= self.p.max_order_price:
                return unit
            # Too expensive to buy: do not buy
            return 0
        # Even the largest quantity stays below the minimum amount. Return an
        # explicit 0 so that ``buy`` does not fall back to the default sizer.
        return 0

    def _is_to_buy(self, open_today: float, close_yesterday: float, high_today: Optional[float] = None) -> bool:
        '''
        Tell whether a buy order should be created.

        ``high_today`` is only available in a backtest; pass ``None`` when
        running in real time.
        '''
        # today's open * threshold% <= previous close
        # NOTE(review): as written this also holds when the open is above the
        # previous close (up to close / (1 - x/100)). The parameter
        # description suggests ``open_today <= close_yesterday * (100 - x) / 100``
        # may have been intended - confirm before changing.
        to_buy_condition: bool = open_today * (100.0 - self.p.buy_under_percentage) / 100.0 <= close_yesterday
        if high_today is None:
            # Real-time operation: no high is known yet
            return to_buy_condition
        # In a backtest, also check that the high is not below the open, to
        # confirm the order would be workable when run in real time.
        return to_buy_condition and open_today <= high_today

    def _is_to_close(self, open_today: float, close_yesterday: float, low_today: Optional[float] = None) -> bool:
        '''
        Tell whether the position should be closed (sold).

        ``low_today`` is only available in a backtest; pass ``None`` when
        running in real time.
        '''
        # previous close * threshold% <= today's open
        to_sell_condition: bool = close_yesterday * (100.0 + self.p.sell_over_percentage) / 100.0 <= open_today
        if low_today is None:
            # Real-time operation: no low is known yet
            return to_sell_condition
        # In a backtest, also check that the low is not above the open, to
        # confirm the order would be workable when run in real time.
        return to_sell_condition and low_today <= open_today

    def next(self):
        ''' Intentionally empty: decisions are taken in ``next_open``. '''
        return

    def next_open(self):
        '''
        Decide on today's order while peeking at today's open (cheat-on-open).
        '''
        open_today = self._dataopen[0]
        high_today = self._datahigh[0]
        low_today = self._datalow[0]
        close_yesterday = self._dataclose[-1]

        if self._is_to_buy(open_today, close_yesterday, high_today):
            size = self._size(unit_price=open_today, is_buy=True)
            if not size:
                # No quantity fits the configured order amount range
                self._debug(lambda: 'BUY SKIPPED @{price:.2f}, no size fits the order amount range'.format(
                    price=open_today))
                return
            self._info(lambda: 'BUY CREATE @{price:.2f}, #{unit:4d} (open_today, close_yesterday)=({open_today:f}, {close_yesterday:f})'.format(
                price=open_today, unit=size, open_today=open_today, close_yesterday=close_yesterday)
            )
            self._debug(lambda: '(o, h, l, c) = ({o:}, {h:}, {l:}, {c:})'.format(
                o=self._dataopen[0], h=self._datahigh[0], l=self._datalow[0], c=self._dataclose[0]))
            self.buy(size=size, price=open_today, exectype=bt.Order.Limit, valid=bt.Order.DAY)
        elif self._is_to_close(open_today, close_yesterday, low_today):
            size = self._size(unit_price=open_today, is_buy=False)
            self._info(lambda: 'CLOSE (SELL) CREATE @{price:.2f}, #all (open_today, close_yesterday)=({open_today:f}, {close_yesterday:f})'.format(
                price=open_today, open_today=open_today, close_yesterday=close_yesterday)
            )
            self.close(size=size, price=open_today, exectype=bt.Order.Limit, valid=bt.Order.DAY)
# With the Bokeh backend, discard the strategy class defined in this file and
# load the one from the ``percentage_buy_sell_strategy_with_logger`` module.
# NOTE(review): presumably backtrader_plotting needs the strategy to live in
# an importable module - confirm.
if USE_BOKEH:
    del PercentageBuySellStrategyWithLogger
    from percentage_buy_sell_strategy_with_logger import PercentageBuySellStrategyWithLogger
"""## main""" | |
import backtrader.analyzers as btanalyzers | |
class BackTest:
    '''
    Wires a Cerebro engine with the KABU+ data feed, one strategy and the
    DrawDown / AnnualReturn analyzers, then runs and plots the backtest.
    '''

    def __init__(self, strategy: type, cheat_on_open=True, in_development=False):
        '''
        Parameters
        ------------------
        strategy : type
            The strategy *class* to add; it is instantiated by Cerebro with
            the log level as its positional argument
        cheat_on_open : bool
            Passed to ``bt.Cerebro`` (default: True)
        in_development : bool
            When True the strategy logs from DEBUG upwards, otherwise only
            WARN and above (default: False)
        '''
        self.cerebro = bt.Cerebro(tz='Asia/Tokyo', cheat_on_open=cheat_on_open)

        data = KabuPlusJPCSVData(
            dataname=path_to_csv,
            fromdate=datetime(2020, 1, 1),
            todate=datetime(2020, 11, 30),
            reverse=False)
        self.cerebro.adddata(data)

        # The flag switches the log level so that only WARN and above is
        # logged outside development.
        loglevel = DEBUG if in_development else WARN

        self.cerebro.broker.setcash(100 * 10000 * 3)  # margin trading, hence tripled
        # Add a strategy
        self.cerebro.addstrategy(strategy, loglevel)
        self.cerebro.addanalyzer(btanalyzers.DrawDown, _name='drawdown')
        self.cerebro.addanalyzer(btanalyzers.AnnualReturn, _name='annualreturn')

    def run(self, save_file=False):
        '''
        Run the backtest, print a summary and return the plot result.

        Parameters
        ------------------
        save_file : bool
            Only used with the Bokeh backend: when True the chart is saved to
            ``chart.html`` instead of being shown inline (default: False)

        Returns
        ------------------
        Whatever ``cerebro.plot`` returns for the selected backend
        '''
        initial_cash = self.cerebro.broker.getvalue()
        thestrats = self.cerebro.run()
        thestrat = thestrats[0]

        print('Initial Portfolio Value: {val:,}'.format(val=initial_cash))
        print('Final Portfolio Value: {val:,}'.format(val=int(self.cerebro.broker.getvalue())))
        print('DrawDown:', thestrat.analyzers.drawdown.get_analysis())
        print('Annual Return:', thestrat.analyzers.annualreturn.get_analysis())

        if not USE_BOKEH:
            return self.cerebro.plot(use='agg')

        if save_file:
            b = Bokeh(style='bar', plot_mode='single', scheme=Tradimo(), output_mode='save', filename='chart.html')
        else:
            b = Bokeh(style='bar', plot_mode='single', scheme=Tradimo())
        return self.cerebro.plot(b, iplot=not save_file)
# def num2date(val: float, tz=pytz.timezone('Asia/Tokyo')): | |
# return bt.utils.dateintern.num2date(val, tz=tz) | |
# | |
# def p_order(order: bt.Order): | |
# print(str(order)) | |
"""## [Order Execution Logic](https://www.backtrader.com/docu/order-creation-execution/order-creation-execution/) | |
- https://www.backtrader.com/blog/posts/2017-05-01-cheat-on-open/cheat-on-open/ | |
""" | |
# Script entry point: build the backtest for the percentage strategy, run it
# and show the resulting chart in the notebook.
if __name__ == '__main__':
    backtest = BackTest(strategy=PercentageBuySellStrategyWithLogger)
    chart = backtest.run()
    # Imported here because it is only needed when displaying the chart
    from IPython.display import display
    # The plot result is indexed twice to reach the object to display
    display(chart[0][0])
    # NOTE(review): presumably empties the registered strategies so that a
    # re-run in the same session does not accumulate them - confirm.
    backtest.cerebro.strats = []
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment