A simple martingale strategy for the OKX market
Created
November 18, 2024 07:44
-
-
Save lbr77/cb030e04d94d56dace2d4038f87c4999 to your computer and use it in GitHub Desktop.
Martingale strategy on the OKX API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[trade]
coin_pair = "DOGE-USDT-SWAP"
start_amount = 1
[log]
level = "debug"
[proxy]
# https = "http://localhost:1080"
[telegram]
telegram_url = "https://api.telegram.org"
bot_token = "xxxxx"
chat_id = "xxxxx"
# [[accounts]]
[okx]
# api_key = 'xxxxx'
# api_secret = 'xxxxx'
# passphrase = 'xxxxx'
rest_url="https://aws.okx.com"
# rest_url="https://www.okx.com"
ws_url="wss://wsaws.okx.com:8443/"
# ws_url="wss://wspap.okx.com:8443/"
test = "0"
# test = "1"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio,signal,toml,json,uuid,logging,aiohttp | |
from decimal import Decimal | |
from okx.websocket.WsPrivateAsync import WsPrivateAsync | |
from okx.PublicData import PublicAPI | |
from okx.Account import AccountAPI | |
# Averaging-down ladder (position top-up strategy).
# Each entry is [price_mult, quant_mult, idx]:
#   price_mult - how many percent below the average entry price the limit buy
#                is placed (the consumer computes price * (1 - price_mult / 100));
#   quant_mult - factor applied to the previous order size to get this one;
#   idx        - number embedded in the "addPosition{idx}" client order id.
# The "xxx" values are placeholders: Decimal("xxx") raises InvalidOperation,
# so they MUST be replaced with real numbers before the bot can start.
strategy_config = [
    [Decimal("xxx"), Decimal("xxx"), 0],
    # ...
]

# Take-profit ratio: 0.8 % above the average entry price.
profit_rate = Decimal('0.8') / Decimal('100')
# Only CRITICAL records pass for the "WebSocketFactory" and "WsPrivate" loggers
# (presumably the loggers of the OKX websocket client - confirm against the library).
for _quiet_name in ("WebSocketFactory", "WsPrivate"):
    logging.getLogger(_quiet_name).setLevel(logging.CRITICAL)

# Application logger; DEBUG by default (switch to logging.INFO for less output).
logger = logging.getLogger("okx_trade")
logger.setLevel(logging.DEBUG)
# ANSI escape sequence used for each log level name.
COLOR_CODES = {
    'DEBUG': "\033[36m",      # cyan
    'INFO': "\033[32m",       # green
    'WARNING': "\033[33m",    # yellow
    'ERROR': "\033[31m",      # red
    'CRITICAL': "\033[1;31m"  # bold red
}
# ANSI sequence that restores the terminal's default colour.
RESET_CODE = "\033[0m"


class ColorFormatter(logging.Formatter):
    """Formatter that wraps the message part of a record in an ANSI colour.

    The colour is looked up in ``COLOR_CODES`` by ``record.levelname``;
    unknown level names fall back to ``RESET_CODE`` (no colour).
    """

    def format(self, record):
        """Return the formatted record with a coloured message.

        ``record.msg`` is restored afterwards, so formatting the same record
        again (or by another handler) does not stack colour codes.
        """
        color = COLOR_CODES.get(record.levelname, RESET_CODE)
        original_msg = record.msg
        record.msg = f"{color}{original_msg}{RESET_CODE}"
        try:
            return super().format(record)
        finally:
            # The record object is shared between handlers; leave it untouched.
            record.msg = original_msg
# Single console handler with coloured output; it replaces any handlers already
# attached to the application logger, and records are not passed on to
# ancestor loggers.
handler = logging.StreamHandler()
handler.setFormatter(
    ColorFormatter(fmt="[%(asctime)s][%(levelname)s] %(message)s")
)
logger.propagate = False
logger.handlers = [handler]
class OkxTrade:
    """Martingale (averaging-down) trading bot for a single OKX instrument.

    Life cycle, as implemented below:

    1. ``first_buy`` sends a market buy for ``start_amount``.
    2. When that order is reported as filled, a take-profit limit sell and a
       ladder of limit buys (from the module-level ``strategy_config``) are
       placed.
    3. Every filled ladder buy grows the position and amends the take-profit
       order.
    4. When the take-profit order fills, the remaining ladder buys are
       cancelled and the cycle restarts with a new ``first_buy``.

    Orders are sent as raw JSON over the private websocket; fills arrive
    through the ``orders`` channel subscription made in ``start``.
    """

    def __init__(self, symbol, api_key, api_secret, passphrase, start_amount, bot_token, chat_id, ws_url, rest_url, test="1"):
        """Create the API clients and fetch instrument data.

        Args:
            symbol: Instrument id, e.g. ``"DOGE-USDT-SWAP"``.
            api_key, api_secret, passphrase: OKX API credentials.
            start_amount: Size of the first market buy of each cycle.
            bot_token, chat_id: Telegram bot token and target chat.
            ws_url: Websocket base URL; ``"ws/v5/private"`` is appended.
            rest_url: REST base URL.
            test: Passed as ``flag`` to the REST clients.

        Note: performs blocking REST calls (instrument lookup, position mode).
        """
        self.symbol = symbol
        self.fixed_precision = Decimal('0')
        self.stoking = Decimal('0')  # current position size
        self.cost = Decimal('0')  # accumulated cost of the position
        # Go through str() so a float from the config keeps its written value.
        self.start_amount = Decimal(str(start_amount))  # size of the first buy
        self.avgPrice = Decimal('0')
        self.first_buy_amount = Decimal('0')
        self.currentHash = ""  # suffix of the client order ids of the current cycle
        self._loop = None  # event loop running start(); used by signal_handler
        self._background_tasks = set()  # strong references to fire-and-forget tasks
        self.ws_client = WsPrivateAsync(apiKey=api_key,
                                        secretKey=api_secret,
                                        passphrase=passphrase,
                                        url=f"{ws_url}ws/v5/private",
                                        useServerTime=False)
        self.public_api = PublicAPI(
            api_key=api_key,
            api_secret_key=api_secret,
            passphrase=passphrase,
            domain=f"{rest_url}", flag=test)
        self.account_api = AccountAPI(
            api_key=api_key,
            api_secret_key=api_secret,
            passphrase=passphrase,
            domain=f"{rest_url}", flag=test)
        self.start_event = asyncio.Event()
        self.stop_event = asyncio.Event()
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.message_queue = asyncio.Queue()
        # "ctVal" of the instrument; multiplied with size and price to get cost.
        self.ct_Val = Decimal(self.public_api.get_instruments(instType="SWAP", instId=self.symbol)["data"][0]["ctVal"])
        self.account_api.set_position_mode(posMode="net_mode")
        logger.info("OkxTrade initialized!")
        logger.info(f"Ws url: {ws_url}ws/v5/private")
        logger.info(f"Rest url: {rest_url}")
        self.start_event.set()

    # ------------------------------------------------------------------ helpers

    def _spawn(self, coro):
        """Schedule *coro* as a task, keep a reference to it and log its failure."""
        task = asyncio.create_task(coro)
        # asyncio only keeps weak references to tasks; hold one until it is done.
        self._background_tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        """Drop the finished task and report an exception it may have raised."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(f"Background task failed: {error!r}")

    @staticmethod
    def _build_request(op, args):
        """Return the JSON text of a websocket request with a fresh request id."""
        return json.dumps({
            "id": uuid.uuid4().hex,
            "op": op,
            "args": args
        })

    @staticmethod
    def _precision_from_price(price_text):
        """Return the quantum (e.g. ``Decimal('0.001')``) matching the number of
        decimals in *price_text*; prices without a fractional part yield 1."""
        _, _, fraction = price_text.partition(".")
        return Decimal("1") / Decimal("10") ** len(fraction)

    @staticmethod
    def _iter_levels(config):
        """Yield ``(price_mult, quant_mult, idx)`` for every ladder entry.

        Entries may be ``[price_mult, quant_mult]`` (idx defaults to the entry's
        position in *config*) or ``[price_mult, quant_mult, idx]``.
        """
        for position, entry in enumerate(config):
            price_mult, quant_mult, *rest = entry
            yield price_mult, quant_mult, (rest[0] if rest else position)

    def _average_entry_price(self):
        """Average entry price derived from cost, position size and ctVal."""
        return self.cost / (self.stoking * self.ct_Val)

    async def _send_request(self, label, op, args):
        """Log and send one websocket request; *label* prefixes the debug line."""
        payload = self._build_request(op, args)
        logger.debug(f"{label}: {payload}")
        await self.ws_client.websocket.send(payload)

    # ---------------------------------------------------------------- lifecycle

    def signal_handler(self, message, _):
        """Handler registered with ``signal.signal``: request a graceful stop.

        Only sets ``stop_event``; the actual cleanup runs inside ``start`` on
        the event loop, where the coroutines it needs can really be awaited.
        """
        logger.info("Signal handler called!")
        loop = self._loop
        if loop is not None and loop.is_running():
            # Thread-safe variant also wakes a loop that is idle in select().
            loop.call_soon_threadsafe(self.stop_event.set)
        else:
            self.stop_event.set()

    def round_to_precision(self, value: Decimal):
        """Return *value* quantized to ``fixed_precision`` (rounding away from
        zero) as a string."""
        return str(value.quantize(self.fixed_precision, rounding="ROUND_UP"))  # TODO: rounding error?

    async def start(self):
        """Connect, subscribe to order updates, open the first position and run
        until a stop is requested; then clean up."""
        self._loop = asyncio.get_running_loop()
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        worker = self._spawn(self.telegram_worker())
        await self.ws_client.start()
        while self.ws_client.websocket is None and not self.stop_event.is_set():
            await asyncio.sleep(1)
            logger.debug("Connecting...")
        if not self.stop_event.is_set():
            await self.ws_client.subscribe([{
                "channel": "orders",
                "instType": "ANY",
            }], self.callback_wrapper())
            await self.first_buy()
            await self.stop_event.wait()
        await self._shutdown(worker)

    async def _shutdown(self, worker):
        """Cancel pending ladder buys, flush Telegram messages, close clients."""
        await self.send("Stopping bot...")
        if self.currentHash and self.ws_client.websocket is not None:
            try:
                await self.cancel_potential_buys()  # cancel pending orders
            except Exception as e:
                logger.error(f"Failed to cancel pending orders: {e}")
        try:
            # Give the worker a bounded amount of time to deliver what is queued.
            await asyncio.wait_for(self.message_queue.join(), timeout=10)
        except asyncio.TimeoutError:
            logger.error("Timed out while flushing telegram messages")
        worker.cancel()
        self.public_api.close()
        # NOTE(review): some python-okx releases appear to stop the event loop
        # inside WsPrivateAsync.stop(); keep this the last awaited call - confirm.
        try:
            await self.ws_client.stop()
        except Exception as e:
            logger.error(f"Failed to stop websocket client: {e}")

    # ------------------------------------------------------- websocket requests

    async def place_order(self, args):
        """Send an ``order`` request for the single order described by *args*."""
        await self._send_request("place_order", "order", [args])

    async def place_batch_orders(self, args):
        """Send a ``batch-orders`` request; *args* is a list of orders."""
        await self._send_request("place_batch_orders", "batch-orders", args)

    async def cancel_order(self, args):
        """Send a ``cancel-order`` request for the order described by *args*."""
        await self._send_request("cancel_order", "cancel-order", [args])

    async def cancel_batch_orders(self, args):
        """Send a ``batch-cancel-orders`` request; *args* is a list of orders."""
        await self._send_request("cancel_batch_orders", "batch-cancel-orders", args)

    async def modify_order(self, args):
        """Send an ``amend-order`` request for the order described by *args*."""
        await self._send_request("modify_order", "amend-order", [args])

    # ----------------------------------------------------------------- strategy

    async def first_buy(self):
        """Start a new cycle: pick a fresh id suffix and market-buy ``start_amount``."""
        # self.start_amount = Decimal(self.account_api.get_account_balance()["data"][0]["details"]["eq"]) * (Decimal("1.74") / Decimal("100"))
        self.currentHash = uuid.uuid4().hex[:10]
        await self.place_order({
            "instId": self.symbol,
            "tdMode": "isolated",
            "side": "buy",
            "ordType": "market",
            "posSide": "net",
            "ccy": "USDT",
            # "tgtCcy": "base_ccy",
            "sz": str(self.start_amount),
            "clOrdId": "firstBuy" + self.currentHash
        })

    async def set_take_profit(self, mod=False):
        """Place (or, with ``mod=True``, amend) the take-profit limit sell.

        The price is the average entry price raised by ``profit_rate``; the
        size is the whole current position.
        """
        v = "Modify" if mod else "Place"
        price = self.round_to_precision(self._average_entry_price() * (Decimal('1') + profit_rate))
        logger.info(f"{v} take profit order at {price} with size {self.stoking} per {self.ct_Val}")
        if mod:
            await self.modify_order({
                "instId": self.symbol,
                "newSz": str(self.stoking),
                "newPx": price,
                "clOrdId": "takeProfit" + self.currentHash
            })
        else:
            await self.place_order({
                "instId": self.symbol,
                "tdMode": "isolated",
                "side": "sell",
                "ordType": "limit",
                # "posSide": "net",  # sell
                "ccy": "USDT",
                "sz": str(self.stoking),
                "px": price,
                "clOrdId": "takeProfit" + self.currentHash
            })

    async def set_potential_buys(self):
        """Place the whole ladder of limit buys below the average entry price.

        Sizes compound: each level multiplies the previous level's (rounded)
        size, starting from the size of the first buy.
        """
        args = []
        price = self._average_entry_price()
        amount_now = self.first_buy_amount
        for price_mult, quant_mult, idx in self._iter_levels(strategy_config):
            amount_now = round(amount_now * quant_mult)
            price_now = price * (Decimal('1') - price_mult / Decimal('100'))
            price_now_rounded = self.round_to_precision(price_now)
            logger.info(f"Next buy at {price_now_rounded} with amount {amount_now}")
            args.append({
                "instId": self.symbol,
                "tdMode": "isolated",
                "side": "buy",
                "ordType": "limit",
                "ccy": "USDT",
                "posSide": "net",
                "sz": str(amount_now),
                "px": price_now_rounded,
                "clOrdId": f"addPosition{idx}" + self.currentHash
            })
        await self.place_batch_orders(args)

    async def cancel_potential_buys(self):
        """Request cancellation of every ladder buy of the current cycle."""
        args = [
            {
                "instId": self.symbol,
                "clOrdId": f"addPosition{idx}" + self.currentHash
            }
            for _, _, idx in self._iter_levels(strategy_config)
        ]
        await self.cancel_batch_orders(args)

    # ------------------------------------------------------ websocket callbacks

    def callback_wrapper(self):
        """Return the synchronous callback handed to the websocket client.

        Each incoming message is processed in its own task so the callback
        itself never blocks.
        """
        def wrapper(message: str):
            self._spawn(self._handle_message(message))
        return wrapper

    async def _handle_message(self, message: str):
        """Decode one websocket message and dispatch filled-order updates."""
        message = json.loads(message)
        if "code" in message and int(message["code"]) != 0:
            logger.error(f"Error: {message}")
        if "arg" not in message:
            return
        channel = message["arg"]["channel"]
        if channel == "tickers":
            self.avgPrice = Decimal(message["data"][0]["avp"])
        if channel != "orders":
            return
        logger.debug(f"Orders channel recieved: {message}")
        # A push may carry several orders; handle each of them.
        for order in message.get("data") or []:
            if order["state"] == "filled":  # order completely filled
                await self._on_order_filled(order)

    async def _on_order_filled(self, order):
        """Advance the strategy according to which of our orders was filled."""
        client_id = order["clOrdId"]
        if "firstBuy" in client_id:  # first buy: set take profit and ladder buys
            self.stoking = Decimal(order["accFillSz"])
            self.first_buy_amount = Decimal(order["accFillSz"])
            # self.cost = Decimal(order["sz"]) * Decimal(order["avgPx"])
            self.cost = Decimal(order["fillNotionalUsd"])
            # Later prices are rounded to as many decimals as the fill price shows.
            self.fixed_precision = self._precision_from_price(order["avgPx"])
            logger.debug(f"Fixed precision: {self.fixed_precision}")
            text = f"Bought at {order['avgPx']}, stoking: {self.stoking}, cost: {self.cost}"
            logger.info(text)
            await self.send(text)
            await self.set_take_profit(mod=False)
            await self.set_potential_buys()
        elif "takeProfit" in client_id:  # take profit reached
            profit = Decimal(order["fillNotionalUsd"]) - self.cost
            text = f"Take profit order filled! With profit: {profit}, profit rate: {profit/self.cost}"
            logger.info(text)
            await self.send(text)
            # Cancel before first_buy(): it replaces currentHash, which the
            # cancel request needs to address the old ladder orders.
            await self.cancel_potential_buys()
            await self.first_buy()  # start the next cycle
        elif "addPosition" in client_id:  # ladder buy filled: position grows
            self.stoking += Decimal(order["sz"])
            self.cost += Decimal(order["sz"]) * Decimal(order["avgPx"]) * self.ct_Val
            text = f"Add position at {order['avgPx']}, stoking: {self.stoking}, cost: {self.cost}"
            logger.info(text)
            await self.send(text)
            await self.set_take_profit(mod=True)

    # ----------------------------------------------------------------- telegram

    async def _send_telegram_message(self, message):
        """POST *message* to the Telegram ``sendMessage`` endpoint; failures are
        logged, never raised."""
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(url, json={"chat_id": self.chat_id, "text": message}) as response:
                    if response.status != 200:
                        logger.error(f"Failed to send message to telegram: {response.status}")
            except Exception as e:
                logger.error(f"Failed to send message to telegram: {e}")

    async def send(self, message):
        """Queue *message* for delivery by ``telegram_worker``."""
        await self.message_queue.put(message)

    async def telegram_worker(self):
        """Deliver queued messages one by one until the task is cancelled."""
        while True:
            message = await self.message_queue.get()
            try:
                await self._send_telegram_message(message)
            finally:
                # Always acknowledge, otherwise message_queue.join() never returns.
                self.message_queue.task_done()
def resolve_log_level(name):
    """Translate a level name from the config into a ``logging`` constant.

    The comparison ignores letter case and surrounding whitespace, so the
    ``level = "debug"`` shipped in the sample config is honoured.

    Returns:
        The matching ``logging`` level, or ``None`` for an unknown name.
    """
    known_levels = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }
    return known_levels.get(str(name).strip().upper())


async def main():
    """Load ``config.toml``, configure logging, build the bot and run it."""
    config = toml.load("config.toml")
    level = resolve_log_level(config["log"]["level"])
    if level is not None:
        # Unknown names leave the logger at its module-level default.
        logger.setLevel(level)
    okx_trade = OkxTrade(
        symbol=config["trade"]["coin_pair"],
        api_key=config["okx"]["api_key"],
        api_secret=config["okx"]["api_secret"],
        passphrase=config["okx"]["passphrase"],
        start_amount=config["trade"]["start_amount"],
        bot_token=config["telegram"]["bot_token"],
        chat_id=config["telegram"]["chat_id"],
        ws_url=config["okx"]["ws_url"],
        rest_url=config["okx"]["rest_url"],
        test=config["okx"]["test"]
    )
    await okx_trade.start_event.wait()
    await okx_trade.start()


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment