Skip to content

Instantly share code, notes, and snippets.

@dogwood008
Last active March 5, 2021 13:52
Show Gist options
  • Save dogwood008/d26627648b5a670a46f979557a6d1e9d to your computer and use it in GitHub Desktop.
Save dogwood008/d26627648b5a670a46f979557a6d1e9d to your computer and use it in GitHub Desktop.
kabu STATION API を backtraderから使う https://how-to-make-stock-trading-system.dogwood008.com/
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
#!/usr/bin/env python
# coding: utf-8
# ## Utilities
# In[1]:
def is_in_jupyter() -> bool:
'''
Determine wheather is the environment Jupyter Notebook
https://blog.amedama.jp/entry/detect-jupyter-env
'''
if 'get_ipython' not in globals():
# Python shell
return False
env_name = get_ipython().__class__.__name__
if env_name == 'TerminalInteractiveShell':
# IPython shell
return False
# Jupyter Notebook
return True
print(is_in_jupyter())
# In[3]:
# https://recruit-tech.co.jp/blog/2018/10/16/jupyter_notebook_tips/
if is_in_jupyter():
def set_stylesheet():
from IPython.display import display, HTML
css = get_ipython().getoutput('wget https://raw.githubusercontent.com/lapis-zero09/jupyter_notebook_tips/master/css/jupyter_notebook/monokai.css -q -O -')
css = "\n".join(css)
display(HTML('<style type="text/css">%s</style>'%css))
set_stylesheet()
# ## Main
# In[ ]:
#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
###############################################################################
#
# Copyright (C) 2015-2020 Daniel Rodriguez
# Copyright (C) 2021 dogwood008 (modified)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import collections
from datetime import datetime, timedelta
import time as _time
import json
import threading
# import oandapy
# import requests # oandapy depdendency
import backtrader as bt
from backtrader.metabase import MetaParams
from backtrader.utils.py3 import queue, with_metaclass
from backtrader.utils import AutoDict
import kabusapi
# In[ ]:
# Extend the exceptions to support extra cases
# '''
# class OandaRequestError(oandapy.OandaError):
# def __init__(self):
# er = dict(code=599, message='Request Error', description='')
# super(self.__class__, self).__init__(er)
#
#
# class OandaStreamError(oandapy.OandaError):
# def __init__(self, content=''):
# er = dict(code=598, message='Failed Streaming', description=content)
# super(self.__class__, self).__init__(er)
#
#
# class OandaTimeFrameError(oandapy.OandaError):
# def __init__(self, content):
# er = dict(code=597, message='Not supported TimeFrame', description='')
# super(self.__class__, self).__init__(er)
#
#
# class OandaNetworkError(oandapy.OandaError):
# def __init__(self):
# er = dict(code=596, message='Network Error', description='')
# super(self.__class__, self).__init__(er)
# '''
# class API(oandapy.API):
# def request(self, endpoint, method='GET', params=None):
# # Overriden to make something sensible out of a
# # request.RequestException rather than simply issuing a print(str(e))
# url = '%s/%s' % (self.api_url, endpoint)
#
# method = method.lower()
# params = params or {}
#
# func = getattr(self.client, method)
#
# request_args = {}
# if method == 'get':
# request_args['params'] = params
# else:
# request_args['data'] = params
#
# # Added the try block
# try:
# response = func(url, **request_args)
# except requests.RequestException as e:
# return OandaRequestError().error_response
#
# content = response.content.decode('utf-8')
# content = json.loads(content)
#
# # error message
# if response.status_code >= 400:
# # changed from raise to return
# return oandapy.OandaError(content).error_response
#
# return content
# In[ ]:
#FIXME
# class Streamer(oandapy.Streamer):
# def __init__(self, q, headers=None, *args, **kwargs):
# # Override to provide headers, which is in the standard API interface
# super(Streamer, self).__init__(*args, **kwargs)
#
# if headers:
# self.client.headers.update(headers)
#
# self.q = q
#
# def run(self, endpoint, params=None):
# # Override to better manage exceptions.
# # Kept as much as possible close to the original
# self.connected = True
#
# params = params or {}
#
# ignore_heartbeat = None
# if 'ignore_heartbeat' in params:
# ignore_heartbeat = params['ignore_heartbeat']
#
# request_args = {}
# request_args['params'] = params
#
# url = '%s/%s' % (self.api_url, endpoint)
#
# while self.connected:
# # Added exception control here
# try:
# response = self.client.get(url, **request_args)
# except requests.RequestException as e:
# self.q.put(OandaRequestError().error_response)
# break
#
# if response.status_code != 200:
# self.on_error(response.content)
# break # added break here
#
# # Changed chunk_size 90 -> None
# try:
# for line in response.iter_lines(chunk_size=None):
# if not self.connected:
# break
#
# if line:
# data = json.loads(line.decode('utf-8'))
# if not (ignore_heartbeat and 'heartbeat' in data):
# self.on_success(data)
#
# except: # socket.error has been seen
# self.q.put(OandaStreamError().error_response)
# break
#
# def on_success(self, data):
# if 'tick' in data:
# self.q.put(data['tick'])
# elif 'transaction' in data:
# self.q.put(data['transaction'])
#
# def on_error(self, data):
# self.disconnect()
# self.q.put(OandaStreamError(data).error_response)
# In[ ]:
from enum import Enum
class KabusAPIEnv(Enum):
DEV = 'dev'
PROD = 'prod'
# In[ ]:
class MetaSingleton(MetaParams):
'''Metaclass to make a metaclassed class a singleton'''
def __init__(cls, name, bases, dct):
super(MetaSingleton, cls).__init__(name, bases, dct)
cls._singleton = None
def __call__(cls, *args, **kwargs):
if cls._singleton is None:
cls._singleton = (
super(MetaSingleton, cls).__call__(*args, **kwargs))
return cls._singleton
class KabuSAPIStore(with_metaclass(MetaSingleton, object)):
'''Singleton class wrapping to control the connections to Kabu STATION API.
Params:
- ``token`` (default:``None``): API access token
- ``account`` (default: ``None``): account id
- ``practice`` (default: ``False``): use the test environment
- ``account_tmout`` (default: ``10.0``): refresh period for account
value/cash refresh
'''
BrokerCls = None # broker class will autoregister
DataCls = None # data class will auto register
params = (
('url', 'localhost'),
('env', KabuSAPIEnv.DEV),
('port', None),
('password', None),
)
# _DTEPOCH = datetime(1970, 1, 1)
# _ENVPRACTICE = 'practice'
# _ENVLIVE = 'live'
@classmethod
def getdata(cls, *args, **kwargs):
'''Returns ``DataCls`` with args, kwargs'''
return cls.DataCls(*args, **kwargs)
@classmethod
def getbroker(cls, *args, **kwargs):
'''Returns broker with *args, **kwargs from registered ``BrokerCls``'''
return cls.BrokerCls(*args, **kwargs)
def __init__(self):
def _getport() -> int:
if self.p.port:
return port
return 18081 if self.p.env == KabuSAPIEnv.DEV else 18080
def _init_kabusapi_client(self) -> kabusapiapi.Context:
url = self.p.url
port = self.p.get('port', _getport())
password = self.p.password
token = kabusapi.Context(url, port, password).token
self.kapi = kabusapi.Context(url, port, token=token)
super(KabuSAPIStore, self).__init__()
self.notifs = collections.deque() # store notifications for cerebro
self._env = None # reference to cerebro for general notifications
self.broker = None # broker instance
self.datas = list() # datas that have registered over start
self._orders = collections.OrderedDict() # map order.ref to oid
self._ordersrev = collections.OrderedDict() # map oid to order.ref
self._transpend = collections.defaultdict(collections.deque)
_init_kabusapi_client()
self._cash = 0.0
self._value = 0.0
self._evt_acct = threading.Event()
def start(self, data=None, broker=None):
# Datas require some processing to kickstart data reception
if data is None and broker is None:
self.cash = None
return
if data is not None:
self._env = data._env
# For datas simulate a queue with None to kickstart co
self.datas.append(data)
if self.broker is not None:
self.broker.data_started(data)
elif broker is not None:
self.broker = broker
self.streaming_events()
self.broker_threads()
def stop(self):
# signal end of thread
if self.broker is not None:
self.q_ordercreate.put(None)
self.q_orderclose.put(None)
self.q_account.put(None)
def put_notification(self, msg, *args, **kwargs):
self.notifs.append((msg, args, kwargs))
def get_notifications(self):
'''Return the pending "store" notifications'''
self.notifs.append(None) # put a mark / threads could still append
return [x for x in iter(self.notifs.popleft, None)]
# Oanda supported granularities
# _GRANULARITIES = {
# (bt.TimeFrame.Seconds, 5): 'S5',
# (bt.TimeFrame.Seconds, 10): 'S10',
# (bt.TimeFrame.Seconds, 15): 'S15',
# (bt.TimeFrame.Seconds, 30): 'S30',
# (bt.TimeFrame.Minutes, 1): 'M1',
# (bt.TimeFrame.Minutes, 2): 'M3',
# (bt.TimeFrame.Minutes, 3): 'M3',
# (bt.TimeFrame.Minutes, 4): 'M4',
# (bt.TimeFrame.Minutes, 5): 'M5',
# (bt.TimeFrame.Minutes, 10): 'M5',
# (bt.TimeFrame.Minutes, 15): 'M5',
# (bt.TimeFrame.Minutes, 30): 'M5',
# (bt.TimeFrame.Minutes, 60): 'H1',
# (bt.TimeFrame.Minutes, 120): 'H2',
# (bt.TimeFrame.Minutes, 180): 'H3',
# (bt.TimeFrame.Minutes, 240): 'H4',
# (bt.TimeFrame.Minutes, 360): 'H6',
# (bt.TimeFrame.Minutes, 480): 'H8',
# (bt.TimeFrame.Days, 1): 'D',
# (bt.TimeFrame.Weeks, 1): 'W',
# (bt.TimeFrame.Months, 1): 'M',
# }
def get_positions(self):
try:
positions = self.oapi.get_positions(self.p.account)
except (oandapy.OandaError, OandaRequestError,):
return None
poslist = positions.get('positions', [])
return poslist
#def get_granularity(self, timeframe, compression):
# return self._GRANULARITIES.get((timeframe, compression), None)
def get_instrument(self, dataname):
try:
insts = self.oapi.get_instruments(self.p.account,
instruments=dataname)
except (oandapy.OandaError, OandaRequestError,):
return None
i = insts.get('instruments', [{}])
return i[0] or None
def streaming_events(self, tmout=None):
q = queue.Queue()
kwargs = {'q': q, 'tmout': tmout}
t = threading.Thread(target=self._t_streaming_listener, kwargs=kwargs)
t.daemon = True
t.start()
t = threading.Thread(target=self._t_streaming_events, kwargs=kwargs) # FIXME: _t_streaming_events
t.daemon = True
t.start()
return q
def _t_streaming_listener(self, q, tmout=None):
while True:
trans = q.get()
self._transaction(trans)
# FIXME: Streamer
def _t_streaming_events(self, q, tmout=None):
if tmout is not None:
_time.sleep(tmout)
# FIXME: oandapy.Streamer
# streamer = Streamer(q,
# environment=self._oenv,
# access_token=self.p.token,
# headers={'X-Accept-Datetime-Format': 'UNIX'})
#
# streamer.events(ignore_heartbeat=False)
def candles(self, dataname, dtbegin, dtend, timeframe, compression,
candleFormat, includeFirst):
kwargs = locals().copy()
kwargs.pop('self')
kwargs['q'] = q = queue.Queue()
t = threading.Thread(target=self._t_candles, kwargs=kwargs) # FIXME: _t_candles
t.daemon = True
t.start()
return q
def _t_candles(self, dataname, dtbegin, dtend, timeframe, compression,
candleFormat, includeFirst, q):
# FIXME: granularity = self.get_granularity(timeframe, compression)
if granularity is None:
e = OandaTimeFrameError()
q.put(e.error_response)
return
dtkwargs = {}
if dtbegin is not None:
dtkwargs['start'] = int((dtbegin - self._DTEPOCH).total_seconds()) # FIXME: _DTEPOCH
if dtend is not None:
dtkwargs['end'] = int((dtend - self._DTEPOCH).total_seconds()) # FIXME: _DTEPOCH
try:
# FIXME: granularity
# response = self.oapi.get_history(instrument=dataname,
# granularity=granularity,
# candleFormat=candleFormat,
# **dtkwargs)
except oandapy.OandaError as e:
q.put(e.error_response)
q.put(None)
return
# FIXME
for candle in response.get('candles', []):
q.put(candle)
q.put({}) # end of transmission
def streaming_prices(self, dataname, tmout=None):
q = queue.Queue()
kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout}
t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs) # FIXME: _t_streaming_prices
t.daemon = True
t.start()
return q
# FIXME: Streamer
def _t_streaming_prices(self, dataname, q, tmout):
if tmout is not None:
_time.sleep(tmout)
# FIXME: Streamer
# FIXME streamer = Streamer(q, environment=self._oenv,
# FIXME access_token=self.p.token,
# FIXME headers={'X-Accept-Datetime-Format': 'UNIX'})
# FIXME streamer.rates(self.p.account, instruments=dataname)
def get_cash(self):
return self._cash
def get_value(self):
return self._value
_ORDEREXECS = {
bt.Order.Market: 'market',
bt.Order.Limit: 'limit',
bt.Order.Stop: 'stop',
bt.Order.StopLimit: 'stop',
}
def broker_threads(self):
self.q_account = queue.Queue()
self.q_account.put(True) # force an immediate update
t = threading.Thread(target=self._t_account)
t.daemon = True
t.start()
self.q_ordercreate = queue.Queue()
t = threading.Thread(target=self._t_order_create)
t.daemon = True
t.start()
self.q_orderclose = queue.Queue()
t = threading.Thread(target=self._t_order_cancel)
t.daemon = True
t.start()
# Wait once for the values to be set
self._evt_acct.wait(self.p.account_tmout)
def _t_account(self):
while True:
try:
msg = self.q_account.get(timeout=self.p.account_tmout)
if msg is None:
break # end of thread
except queue.Empty: # tmout -> time to refresh
pass
try:
accinfo = self.oapi.get_account(self.p.account)
except Exception as e:
self.put_notification(e)
continue
try:
self._cash = accinfo['marginAvail']
self._value = accinfo['balance']
except KeyError:
pass
self._evt_acct.set()
def order_create(self, order, stopside=None, takeside=None, **kwargs):
okwargs = dict()
okwargs['instrument'] = order.data._dataname
okwargs['units'] = abs(order.created.size)
okwargs['side'] = 'buy' if order.isbuy() else 'sell'
okwargs['type'] = self._ORDEREXECS[order.exectype]
if order.exectype != bt.Order.Market:
okwargs['price'] = order.created.price
if order.valid is None:
# 1 year and datetime.max fail ... 1 month works
valid = datetime.utcnow() + timedelta(days=30)
else:
valid = order.data.num2date(order.valid)
# To timestamp with seconds precision
okwargs['expiry'] = int((valid - self._DTEPOCH).total_seconds()) # FIXME: _DTEPOCH
if order.exectype == bt.Order.StopLimit:
okwargs['lowerBound'] = order.created.pricelimit
okwargs['upperBound'] = order.created.pricelimit
if order.exectype == bt.Order.StopTrail:
okwargs['trailingStop'] = order.trailamount
if stopside is not None:
okwargs['stopLoss'] = stopside.price
if takeside is not None:
okwargs['takeProfit'] = takeside.price
okwargs.update(**kwargs) # anything from the user
self.q_ordercreate.put((order.ref, okwargs,))
return order
_OIDSINGLE = ['orderOpened', 'tradeOpened', 'tradeReduced']
_OIDMULTIPLE = ['tradesClosed']
def _t_order_create(self):
while True:
msg = self.q_ordercreate.get()
if msg is None:
break
oref, okwargs = msg
try:
o = self.oapi.create_order(self.p.account, **okwargs)
except Exception as e:
self.put_notification(e)
self.broker._reject(oref)
return
# Ids are delivered in different fields and all must be fetched to
# match them (as executions) to the order generated here
oids = list()
for oidfield in self._OIDSINGLE:
if oidfield in o and 'id' in o[oidfield]:
oids.append(o[oidfield]['id'])
for oidfield in self._OIDMULTIPLE:
if oidfield in o:
for suboidfield in o[oidfield]:
oids.append(suboidfield['id'])
if not oids:
self.broker._reject(oref)
return
self._orders[oref] = oids[0]
self.broker._submit(oref)
if okwargs['type'] == 'market':
self.broker._accept(oref) # taken immediately
for oid in oids:
self._ordersrev[oid] = oref # maps ids to backtrader order
# An transaction may have happened and was stored
tpending = self._transpend[oid]
tpending.append(None) # eom marker
while True:
trans = tpending.popleft()
if trans is None:
break
self._process_transaction(oid, trans)
def order_cancel(self, order):
self.q_orderclose.put(order.ref)
return order
def _t_order_cancel(self):
while True:
oref = self.q_orderclose.get()
if oref is None:
break
oid = self._orders.get(oref, None)
if oid is None:
continue # the order is no longer there
try:
o = self.oapi.close_order(self.p.account, oid)
except Exception as e:
continue # not cancelled - FIXME: notify
self.broker._cancel(oref)
_X_ORDER_CREATE = ('STOP_ORDER_CREATE',
'LIMIT_ORDER_CREATE', 'MARKET_IF_TOUCHED_ORDER_CREATE',)
def _transaction(self, trans):
# Invoked from Streaming Events. May actually receive an event for an
# oid which has not yet been returned after creating an order. Hence
# store if not yet seen, else forward to processer
ttype = trans['type']
if ttype == 'MARKET_ORDER_CREATE':
try:
oid = trans['tradeReduced']['id']
except KeyError:
try:
oid = trans['tradeOpened']['id']
except KeyError:
return # cannot do anything else
elif ttype in self._X_ORDER_CREATE:
oid = trans['id']
elif ttype == 'ORDER_FILLED':
oid = trans['orderId']
elif ttype == 'ORDER_CANCEL':
oid = trans['orderId']
elif ttype == 'TRADE_CLOSE':
oid = trans['id']
pid = trans['tradeId']
if pid in self._orders and False: # Know nothing about trade
return # can do nothing
# Skip above - at the moment do nothing
# Received directly from an event in the WebGUI for example which
# closes an existing position related to order with id -> pid
# COULD BE DONE: Generate a fake counter order to gracefully
# close the existing position
msg = ('Received TRADE_CLOSE for unknown order, possibly generated'
' over a different client or GUI')
self.put_notification(msg, trans)
return
else: # Go aways gracefully
try:
oid = trans['id']
except KeyError:
oid = 'None'
msg = 'Received {} with oid {}. Unknown situation'
msg = msg.format(ttype, oid)
self.put_notification(msg, trans)
return
try:
oref = self._ordersrev[oid]
self._process_transaction(oid, trans)
except KeyError: # not yet seen, keep as pending
self._transpend[oid].append(trans)
_X_ORDER_FILLED = ('MARKET_ORDER_CREATE',
'ORDER_FILLED', 'TAKE_PROFIT_FILLED',
'STOP_LOSS_FILLED', 'TRAILING_STOP_FILLED',)
def _process_transaction(self, oid, trans):
try:
oref = self._ordersrev.pop(oid)
except KeyError:
return
ttype = trans['type']
if ttype in self._X_ORDER_FILLED:
size = trans['units']
if trans['side'] == 'sell':
size = -size
price = trans['price']
self.broker._fill(oref, size, price, ttype=ttype)
elif ttype in self._X_ORDER_CREATE:
self.broker._accept(oref)
self._ordersrev[oid] = oref
elif ttype in 'ORDER_CANCEL':
reason = trans['reason']
if reason == 'ORDER_FILLED':
pass # individual execs have done the job
elif reason == 'TIME_IN_FORCE_EXPIRED':
self.broker._expire(oref)
elif reason == 'CLIENT_REQUEST':
self.broker._cancel(oref)
else: # default action ... if nothing else
self.broker._reject(oref)
# In[ ]:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment