Skip to content

Instantly share code, notes, and snippets.

@lbr77
Created November 18, 2024 07:44
Show Gist options
  • Save lbr77/cb030e04d94d56dace2d4038f87c4999 to your computer and use it in GitHub Desktop.
Save lbr77/cb030e04d94d56dace2d4038f87c4999 to your computer and use it in GitHub Desktop.
Martingale Strategy on OKX api

A simple martingale strategy for the OKX market.

[trade]
coin_pair = "DOGE-USDT-SWAP"
start_amount = 1
[log]
level = "debug"
[proxy]
# https = "http://localhost:1080"
[telegram]
telegram_url = "https://api.telegram.org"
bot_token = "xxxxx"
chat_id = "xxxxx"
# [[accounts]]
[okx]
# api_key = 'xxxxx'
# api_secret = 'xxxxx'
# passphrase = 'xxxxx'
rest_url="https://aws.okx.com"
# rest_url="https://www.okx.com"
ws_url="wss://wsaws.okx.com:8443/"
# ws_url="wss://wspap.okx.com:8443/"
test = "0"
# test = "1"
import asyncio,signal,toml,json,uuid,logging,aiohttp
from decimal import Decimal
from okx.websocket.WsPrivateAsync import WsPrivateAsync
from okx.PublicData import PublicAPI
from okx.Account import AccountAPI
# Position top-up (averaging-down) ladder. Each row is
#   [price_drop_percent, quantity_multiplier, level_index]
# - price_drop_percent: the limit buy is placed this many percent below the
#   current average entry price.
# - quantity_multiplier: the previous level's size is multiplied by this
#   (the chain starts from the first filled buy size).
# - level_index: unique per row; it becomes part of the order's clOrdId.
# The "xxx" values are placeholders and must be replaced with real numbers
# before running.
strategy_config = [
    [Decimal("xxx"), Decimal("xxx"), 1],
    # ...
]
# Take-profit margin over the average entry price (0.8 %).
profit_rate = Decimal('0.8') / Decimal('100')
# Raise the threshold of the "WebSocketFactory" and "WsPrivate" loggers
# (presumably the OKX SDK's own loggers) so only CRITICAL records get through.
logging.getLogger("WebSocketFactory").setLevel(logging.CRITICAL)
logging.getLogger("WsPrivate").setLevel(logging.CRITICAL)
logger = logging.getLogger("okx_trade")
# logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)

# ANSI escape sequences used to colour a record by its level name.
COLOR_CODES = {
    'DEBUG': "\033[36m",       # cyan
    'INFO': "\033[32m",        # green
    'WARNING': "\033[33m",     # yellow
    'ERROR': "\033[31m",       # red
    'CRITICAL': "\033[1;31m",  # bold red
}
RESET_CODE = "\033[0m"


class ColorFormatter(logging.Formatter):
    """Formatter that wraps the message in the ANSI colour of its level."""

    def format(self, record):
        """Return the formatted record with its message colourised.

        The record's ``msg`` is swapped only for the duration of the call and
        restored afterwards, so formatting the same record again (another
        handler, a retry) does not stack colour codes on top of each other.
        """
        color = COLOR_CODES.get(record.levelname, RESET_CODE)
        original_msg = record.msg
        record.msg = f"{color}{original_msg}{RESET_CODE}"
        try:
            return super().format(record)
        finally:
            record.msg = original_msg


handler = logging.StreamHandler()
handler.setFormatter(ColorFormatter("[%(asctime)s][%(levelname)s] %(message)s"))
# Replace any existing handlers and stop propagation so each record is printed once.
logger.handlers = [handler]
logger.propagate = False
class OkxTrade:
    """Martingale bot for a single OKX instrument, driven by the private websocket.

    Flow, as implemented below: place a market buy; when it fills, place a
    take-profit limit sell and a ladder of limit buys taken from the
    module-level ``strategy_config``; when a ladder buy fills, amend the
    take-profit; when the take-profit fills, cancel the ladder and start over.
    Progress messages are queued and forwarded to a Telegram chat.
    """

    def __init__(self,symbol,api_key,api_secret,passphrase,start_amount,bot_token,chat_id,ws_url,rest_url,test="1"):
        """Create the API clients and fetch instrument data.

        Must be called while an event loop is running (it creates asyncio
        primitives). Performs blocking REST calls: it reads the instrument's
        ``ctVal`` and switches the account to ``net_mode``.

        Args:
            symbol: Instrument id, e.g. ``"DOGE-USDT-SWAP"``.
            api_key, api_secret, passphrase: OKX API credentials.
            start_amount: Size of the first market buy.
            bot_token, chat_id: Telegram bot token and target chat.
            ws_url: Websocket base URL; ``ws/v5/private`` is appended to it.
            rest_url: REST base URL.
            test: Passed to the REST clients as ``flag``.
        """
        self.symbol = symbol
        self.fixed_precision = Decimal('0')
        self.stoking = Decimal('0')  # position size currently held
        self.cost = Decimal('0')  # accumulated cost of the position
        # Go through str() so a float from the TOML file (e.g. 0.1) does not
        # carry its binary representation error into the order size string.
        self.start_amount = Decimal(str(start_amount))  # size of the first buy
        self.avgPrice = Decimal('0')
        self.first_buy_amount = Decimal('0')
        # Suffix shared by the clOrdIds of one buy/take-profit cycle; empty
        # until the first buy has been sent.
        self.currentHash = ""
        self._background_tasks = set()  # strong refs so tasks are not GC'd mid-run
        self._loop = None  # set in start(); used by the signal handler
        self._shutting_down = False
        self.ws_client = WsPrivateAsync(apiKey=api_key,
                                        secretKey=api_secret,
                                        passphrase=passphrase,
                                        url=f"{ws_url}ws/v5/private",
                                        useServerTime=False)
        self.public_api = PublicAPI(
            api_key=api_key,
            api_secret_key=api_secret,
            passphrase=passphrase,
            domain=f"{rest_url}", flag=test)
        self.account_api = AccountAPI(
            api_key=api_key,
            api_secret_key=api_secret,
            passphrase=passphrase,
            domain=f"{rest_url}", flag=test)
        self.start_event = asyncio.Event()
        self.stop_event = asyncio.Event()
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.message_queue = asyncio.Queue()
        self.ct_Val = Decimal(self.public_api.get_instruments(instType="SWAP", instId=self.symbol)["data"][0]["ctVal"])
        self.account_api.set_position_mode(posMode="net_mode")
        logger.info("OkxTrade initialized!")
        logger.info(f"Ws url: {ws_url}ws/v5/private")
        logger.info(f"Rest url: {rest_url}")
        self.start_event.set()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _precision_from_price(price_text):
        """Return the quantum (e.g. ``0.001``) matching the fractional digits of ``price_text``.

        A price without a decimal point yields ``Decimal('1')``.
        """
        _, _, fraction = str(price_text).partition(".")
        return Decimal(1).scaleb(-len(fraction))

    @staticmethod
    def _iter_levels(config):
        """Yield ``(price_mult, quant_mult, idx)`` for every ladder row in ``config``.

        Rows may be ``[price_mult, quant_mult]`` or ``[price_mult, quant_mult, idx]``;
        when ``idx`` is absent the row's position in ``config`` is used.
        """
        for position, row in enumerate(config):
            price_mult, quant_mult, *rest = row
            yield price_mult, quant_mult, (rest[0] if rest else position)

    def _spawn(self, coro):
        """Schedule ``coro`` as a task and keep a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        """Drop the finished task and log its exception, if any."""
        self._background_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.error(f"Background task failed: {task.exception()!r}")

    async def _send_request(self, op, args, log_name):
        """Serialise one websocket request with a fresh id and send it."""
        payload = json.dumps({
            "id": uuid.uuid4().hex,
            "op": op,
            "args": args
        })
        logger.debug(f"{log_name}: {payload}")
        await self.ws_client.websocket.send(payload)

    # ----------------------------------------------------------------- shutdown

    async def _shutdown(self):
        """Notify Telegram, cancel ladder orders, close clients, then release start()."""
        try:
            await self._send_telegram_message("Stopping bot...")
            if self.currentHash and self.ws_client.websocket is not None:
                await self.cancel_potential_buys()  # cancel pending ladder orders
        except Exception as e:
            logger.error(f"Failed to cancel pending orders on shutdown: {e}")
        try:
            # NOTE(review): confirm that ws_client.stop() does not stop the
            # event loop itself in the installed SDK version.
            await self.ws_client.stop()
            self.public_api.close()
        except Exception as e:
            logger.error(f"Failed to close clients on shutdown: {e}")
        finally:
            self.stop_event.set()

    def signal_handler(self,message,_):
        """Handle SIGINT/SIGTERM by starting an orderly shutdown.

        The cleanup runs on the event loop; ``start()`` returns once it is
        done. A second signal while shutdown is in progress, or a signal that
        arrives when no loop has been recorded, exits immediately.

        Args:
            message: Signal number (unused).
            _: Current stack frame (unused).
        """
        logger.info("Signal handler called!")
        loop = self._loop
        if self._shutting_down or loop is None or loop.is_closed():
            self.stop_event.set()
            raise SystemExit(0)
        self._shutting_down = True
        # call_soon_threadsafe also wakes the loop if it is blocked waiting for I/O.
        loop.call_soon_threadsafe(lambda: self._spawn(self._shutdown()))

    def round_to_precision(self,value:Decimal):
        """Return ``value`` quantised to ``self.fixed_precision`` (rounding up) as a string."""
        return str(value.quantize(self.fixed_precision, rounding="ROUND_UP"))  # TODO: rounding error?

    # ------------------------------------------------------------------ runtime

    async def start(self):
        """Install signal handlers, connect, subscribe to ``orders``, buy, and wait for stop."""
        self._loop = asyncio.get_running_loop()
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        self._spawn(self.telegram_worker())
        await self.ws_client.start()
        while self.ws_client.websocket is None:
            await asyncio.sleep(1)
            logger.debug("Connecting...")
        await self.ws_client.subscribe([{
            "channel": "orders",
            "instType": "ANY",
        }], self.callback_wrapper())
        await self.first_buy()
        await self.stop_event.wait()

    async def place_order(self,args):
        """Send a single ``order`` request; ``args`` is one order dict."""
        await self._send_request("order", [args], "place_order")

    async def place_batch_orders(self,args):
        """Send a ``batch-orders`` request; ``args`` is a list of order dicts."""
        await self._send_request("batch-orders", args, "place_batch_orders")

    async def cancel_order(self,args):
        """Send a single ``cancel-order`` request."""
        await self._send_request("cancel-order", [args], "cancel_order")

    async def cancel_batch_orders(self,args):
        """Send a ``batch-cancel-orders`` request; ``args`` is a list of dicts."""
        await self._send_request("batch-cancel-orders", args, "cancel_batch_orders")

    async def modify_order(self,args):
        """Send a single ``amend-order`` request."""
        await self._send_request("amend-order", [args], "modify_order")

    async def first_buy(self):
        """Start a new cycle: pick a fresh clOrdId suffix and place the market buy."""
        self.currentHash = uuid.uuid4().hex[:10]
        await self.place_order({
            "instId": self.symbol,
            "tdMode": "isolated",
            "side": "buy",
            "ordType": "market",
            "posSide": "net",
            "ccy": "USDT",
            "sz": str(self.start_amount),
            "clOrdId": "firstBuy"+self.currentHash
        })

    async def set_take_profit(self,mod=False):
        """Place (or, with ``mod=True``, amend) the take-profit limit sell.

        The price is the average entry price, ``cost / (stoking * ct_Val)``,
        raised by the module-level ``profit_rate``; the size is the whole position.
        """
        v = "Modify" if mod else "Place"
        price = self.cost / (self.stoking * self.ct_Val)
        price = self.round_to_precision(price * (Decimal('1') + profit_rate))
        logger.info(f"{v} take profit order at {price} with size {self.stoking} per {self.ct_Val}")
        if mod:
            await self.modify_order({
                "instId": self.symbol,
                "newSz": str(self.stoking),
                "newPx": price,
                "clOrdId": "takeProfit"+self.currentHash
            })
        else:
            await self.place_order({
                "instId": self.symbol,
                "tdMode": "isolated",
                "side": "sell",
                "ordType": "limit",
                "ccy": "USDT",
                "sz": str(self.stoking),
                "px": price,
                "clOrdId": "takeProfit"+self.currentHash
            })

    async def set_potential_buys(self):
        """Place the ladder of limit buys described by ``strategy_config`` in one batch.

        Each level's price is the average entry price lowered by the level's
        percentage; each level's size is the previous size times the level's
        multiplier, rounded to an integer.
        """
        args = []
        price = self.cost / (self.stoking * self.ct_Val)
        amount_now = self.first_buy_amount
        for price_mult, quant_mult, idx in self._iter_levels(strategy_config):
            amount_now = round(amount_now * quant_mult)
            price_now = price * (Decimal('1') - price_mult / Decimal('100'))
            price_now_rounded = self.round_to_precision(price_now)
            logger.info(f"Next buy at {price_now_rounded} with amount {amount_now}")
            args.append({
                "instId": self.symbol,
                "tdMode": "isolated",
                "side": "buy",
                "ordType": "limit",
                "ccy": "USDT",
                "posSide": "net",
                "sz": str(amount_now),
                "px": price_now_rounded,
                "clOrdId": f"addPosition{idx}"+self.currentHash
            })
        await self.place_batch_orders(args)

    async def cancel_potential_buys(self):
        """Cancel every ladder order of the current cycle in one batch."""
        args = [
            {
                "instId": self.symbol,
                "clOrdId": f"addPosition{idx}"+self.currentHash
            }
            for _, _, idx in self._iter_levels(strategy_config)
        ]
        await self.cancel_batch_orders(args)

    # --------------------------------------------------------- message handling

    async def _on_first_buy_filled(self, order):
        """First buy filled: record the position, then place take-profit and ladder."""
        self.stoking = Decimal(order["accFillSz"])
        self.first_buy_amount = Decimal(order["accFillSz"])
        self.cost = Decimal(order["fillNotionalUsd"])
        # Price precision is taken from the number of decimals in avgPx.
        self.fixed_precision = self._precision_from_price(order["avgPx"])
        logger.debug(f"Fixed precision: {self.fixed_precision}")
        logger.info(f"Bought at {order['avgPx']}, stoking: {self.stoking}, cost: {self.cost}")
        await self.send(f"Bought at {order['avgPx']}, stoking: {self.stoking}, cost: {self.cost}")
        await self.set_take_profit(mod=False)
        await self.set_potential_buys()

    async def _on_take_profit_filled(self, order):
        """Take-profit filled: report profit, cancel the ladder and buy again."""
        profit = Decimal(order["fillNotionalUsd"]) - self.cost
        # Guard against a zero cost (no first buy recorded in this process).
        rate = profit / self.cost if self.cost else Decimal('0')
        logger.info(f"Take profit order filled! With profit: {profit}, profit rate: {rate}")
        await self.send(f"Take profit order filled! With profit: {profit}, profit rate: {rate}")
        await self.cancel_potential_buys()
        await self.first_buy()  # start the next cycle

    async def _on_add_position_filled(self, order):
        """Ladder buy filled: grow position and cost, then amend the take-profit."""
        self.stoking += Decimal(order["sz"])
        self.cost += Decimal(order["sz"]) * Decimal(order["avgPx"]) * self.ct_Val
        logger.info(f"Add position at {order['avgPx']}, stoking: {self.stoking}, cost: {self.cost}")
        await self.send(f"Add position at {order['avgPx']}, stoking: {self.stoking}, cost: {self.cost}")
        await self.set_take_profit(mod=True)

    async def _on_message(self, raw):
        """Decode one websocket frame and dispatch on channel and order id prefix."""
        message = json.loads(raw)
        if "code" in message and int(message["code"]) != 0:
            logger.error(f"Error: {message}")
        if "arg" not in message:
            return
        channel = message["arg"]["channel"]
        if channel == "tickers":
            self.avgPrice = Decimal(message["data"][0]["avp"])
        if channel != "orders":
            return
        logger.debug(f"Orders channel recieved: {message}")
        orders = message.get("data") if "data" in message else None
        if not orders:
            return
        order = orders[0]  # only the first entry of the push is processed
        if order["state"] != "filled":  # act only on fully filled orders
            return
        order_id = order["clOrdId"]
        if "firstBuy" in order_id:
            await self._on_first_buy_filled(order)
        elif "takeProfit" in order_id:
            await self._on_take_profit_filled(order)
        elif "addPosition" in order_id:
            await self._on_add_position_filled(order)

    def callback_wrapper(self):
        """Return a plain callable that schedules message handling on the loop."""
        def wrapper(message:str):
            self._spawn(self._on_message(message))
        return wrapper

    # ----------------------------------------------------------------- telegram

    async def _send_telegram_message(self,message):
        """POST ``message`` to the Telegram ``sendMessage`` endpoint; failures are only logged."""
        import aiohttp  # already imported at file level; repeated here so the method stands alone
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(url, json={"chat_id": self.chat_id, "text": message}) as response:
                    if response.status != 200:
                        logger.error(f"Failed to send message to telegram: {response.status}")
            except Exception as e:
                logger.error(f"Failed to send message to telegram: {e}")

    async def send(self,message):
        """Queue ``message`` for delivery by ``telegram_worker``."""
        await self.message_queue.put(message)

    async def telegram_worker(self):
        """Forward queued messages to Telegram until ``stop_event`` is set."""
        while not self.stop_event.is_set():
            message = await self.message_queue.get()
            await self._send_telegram_message(message)
            self.message_queue.task_done()
def resolve_log_level(name):
    """Translate a configured level name into a ``logging`` level.

    Matching ignores letter case and surrounding whitespace, so the
    ``level = "debug"`` form used in the sample config is honoured.

    Returns:
        The numeric level, or ``None`` when ``name`` is not one of
        DEBUG / INFO / WARNING / ERROR / CRITICAL.
    """
    levels = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }
    return levels.get(str(name).strip().upper())


async def main():
    """Load ``config.toml``, apply the log level, build the bot and run it."""
    config = toml.load("config.toml")
    level = resolve_log_level(config["log"]["level"])
    if level is not None:  # unknown names leave the current level untouched
        logger.setLevel(level)
    okx_trade = OkxTrade(
        symbol=config["trade"]["coin_pair"],
        api_key=config["okx"]["api_key"],
        api_secret=config["okx"]["api_secret"],
        passphrase=config["okx"]["passphrase"],
        start_amount=config["trade"]["start_amount"],
        bot_token=config["telegram"]["bot_token"],
        chat_id=config["telegram"]["chat_id"],
        ws_url=config["okx"]["ws_url"],
        rest_url=config["okx"]["rest_url"],
        test=config["okx"]["test"],
    )
    await okx_trade.start_event.wait()
    await okx_trade.start()


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment