Skip to content

Instantly share code, notes, and snippets.

@SnowyPainter
Created February 28, 2024 16:21
Show Gist options
  • Save SnowyPainter/0e37f4570f88a3740f4f08b6fb82d95f to your computer and use it in GitHub Desktop.
Save SnowyPainter/0e37f4570f88a3740f4f08b6fb82d95f to your computer and use it in GitHub Desktop.
import mojito
import websockets
import json
import requests
import os
import asyncio
import time
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from base64 import b64decode
key_bytes = 32
# AES256 DECODE
def aes_cbc_base64_dec(key, iv, cipher_text):
"""
:param key: str type AES256 secret key value
:param iv: str type AES256 Initialize Vector
:param cipher_text: Base64 encoded AES256 str
:return: Base64-AES256 decodec str
"""
cipher = AES.new(key.encode('utf-8'), AES.MODE_CBC, iv.encode('utf-8'))
return bytes.decode(unpad(cipher.decrypt(b64decode(cipher_text)), AES.block_size))
# 웹소켓 접속키 발급
def get_approval(key, secret):
url = 'https://openapivts.koreainvestment.com:29443' # 모의투자계좌
#url = 'https://openapi.koreainvestment.com:9443' # 실전투자계좌
headers = {"content-type": "application/json"}
body = {"grant_type": "client_credentials",
"appkey": key,
"secretkey": secret}
PATH = "oauth2/Approval"
URL = f"{url}/{PATH}"
res = requests.post(URL, headers=headers, data=json.dumps(body))
approval_key = res.json()["approval_key"]
return approval_key
KOREA_ASKINGPRICE_ID = 'H0STASP0'
KOREA_TRANSCATION_NOTICE_ID = 'H0STCNI0' #모의투자 H0STCNI9, tr_key = HTS ID
NYSE_ASKINGPRICE_ID = 'HDFSASP0'
NYSE_TRANSCATION_NOTICE_ID = 'H0GSCNI0' # tr_key = HTS ID
def get_tr_key_by_symbol(symbol):
return f"DNAS{symbol}"
def create_websocket_data(approval_key, tr_id, tr_key):
return '{"header" : {"approval_key": "%s", "custtype" : "P", "tr_type" : "1", "content-type" : "utf-8" }, "body" : { "input":{ "tr_id" : "%s", "tr_key" : "%s"} } }'%(approval_key,tr_id, tr_key)
async def connect(appkey, appsecret, hts_id):
actions = [[KOREA_ASKINGPRICE_ID, '005930'], [KOREA_TRANSCATION_NOTICE_ID, hts_id]
[NYSE_ASKINGPRICE_ID, get_tr_key_by_symbol('NVDA')], [NYSE_TRANSCATION_NOTICE_ID, hts_id]]
approval_key = get_approval(appkey, appsecret)
send_data_list = []
for action in actions:
send_data_list.append(create_websocket_data(approval_key, action[0], action[1]))
#url = 'ws://ops.koreainvestment.com:21000' # 실전투자계좌
url = 'ws://ops.koreainvestment.com:31000' # 모의투자계좌
async with websockets.connect(url, ping_interval=None) as websocket:
for data in send_data_list:
await websocket.send(data)
await asyncio.sleep(0.5)
while True:
try:
data = await websocket.recv()
if data[0] == '0':
recvstr = data.split('|')
trid0 = recvstr[1]
if trid0 == KOREA_ASKINGPRICE_ID:
print("#### 주식호가 ####")
stockhoka_domestic(recvstr[3])
await asyncio.sleep(0.2)
elif trid0 == NYSE_ASKINGPRICE_ID:
print("#### 해외(미국)주식호가 ####")
stockhoka_overseas(recvstr[3])
await asyncio.sleep(0.2)
elif data[0] == '1':
recvstr = data.split('|')
trid0 = recvstr[1]
# 주실체결 통보 처리
if trid0 == KOREA_TRANSCATION_NOTICE_ID or trid0 == "H0STCNI9":
stocksigningnotice_domestic(recvstr[3], aes_key, aes_iv)
await asyncio.sleep(0.2)
# 해외주실체결 통보 처리
elif trid0 == NYSE_TRANSCATION_NOTICE_ID:
stocksigningnotice_overseas(recvstr[3], aes_key, aes_iv)
await asyncio.sleep(0.2)
else:
jsonObject = json.loads(data)
trid = jsonObject["header"]["tr_id"]
if trid != "PINGPONG":
rt_cd = jsonObject["body"]["rt_cd"]
if rt_cd == '1': # 에러일 경우 처리
if jsonObject["body"]["msg1"] != 'ALREADY IN SUBSCRIBE':
print("### ERROR RETURN CODE [ %s ][ %s ] MSG [ %s ]" % (jsonObject["header"]["tr_key"], rt_cd, jsonObject["body"]["msg1"]))
break
elif rt_cd == '0': # 정상일 경우 처리
print("### RETURN CODE [ %s ][ %s ] MSG [ %s ]" % (jsonObject["header"]["tr_key"], rt_cd, jsonObject["body"]["msg1"]))
# 국내주식
if trid == KOREA_TRANSCATION_NOTICE_ID or trid == "H0STCNI9":
aes_key = jsonObject["body"]["output"]["key"]
aes_iv = jsonObject["body"]["output"]["iv"]
print("### TRID [%s] KEY[%s] IV[%s]" % (trid, aes_key, aes_iv))
# 해외주식
elif trid == NYSE_TRANSCATION_NOTICE_ID:
aes_key = jsonObject["body"]["output"]["key"]
aes_iv = jsonObject["body"]["output"]["iv"]
print("### TRID [%s] KEY[%s] IV[%s]" % (trid, aes_key, aes_iv))
elif trid == "PINGPONG":
print("### RECV [PINGPONG] [%s]" % (data))
await websocket.pong(data)
print("### SEND [PINGPONG] [%s]" % (data))
except websockets.ConnectionClosed:
continue
# 주식호가 출력라이브러리
def stockhoka_domestic(data):
""" 넘겨받는데이터가 정상인지 확인
print("stockhoka[%s]"%(data))
"""
recvvalue = data.split('^')
print("유가증권 단축 종목코드 [" + recvvalue[0] + "]")
print("영업시간 [" + recvvalue[1] + "]" + "시간구분코드 [" + recvvalue[2] + "]")
print("======================================")
print("매도호가10 [%s] 잔량10 [%s]" % (recvvalue[12], recvvalue[32]))
print("매도호가09 [%s] 잔량09 [%s]" % (recvvalue[11], recvvalue[31]))
print("매도호가08 [%s] 잔량08 [%s]" % (recvvalue[10], recvvalue[30]))
print("매도호가07 [%s] 잔량07 [%s]" % (recvvalue[9], recvvalue[29]))
print("매도호가06 [%s] 잔량06 [%s]" % (recvvalue[8], recvvalue[28]))
print("매도호가05 [%s] 잔량05 [%s]" % (recvvalue[7], recvvalue[27]))
print("매도호가04 [%s] 잔량04 [%s]" % (recvvalue[6], recvvalue[26]))
print("매도호가03 [%s] 잔량03 [%s]" % (recvvalue[5], recvvalue[25]))
print("매도호가02 [%s] 잔량02 [%s]" % (recvvalue[4], recvvalue[24]))
print("매도호가01 [%s] 잔량01 [%s]" % (recvvalue[3], recvvalue[23]))
print("--------------------------------------")
print("매수호가01 [%s] 잔량01 [%s]" % (recvvalue[13], recvvalue[33]))
print("매수호가02 [%s] 잔량02 [%s]" % (recvvalue[14], recvvalue[34]))
print("매수호가03 [%s] 잔량03 [%s]" % (recvvalue[15], recvvalue[35]))
print("매수호가04 [%s] 잔량04 [%s]" % (recvvalue[16], recvvalue[36]))
print("매수호가05 [%s] 잔량05 [%s]" % (recvvalue[17], recvvalue[37]))
print("매수호가06 [%s] 잔량06 [%s]" % (recvvalue[18], recvvalue[38]))
print("매수호가07 [%s] 잔량07 [%s]" % (recvvalue[19], recvvalue[39]))
print("매수호가08 [%s] 잔량08 [%s]" % (recvvalue[20], recvvalue[40]))
print("매수호가09 [%s] 잔량09 [%s]" % (recvvalue[21], recvvalue[41]))
print("매수호가10 [%s] 잔량10 [%s]" % (recvvalue[22], recvvalue[42]))
print("======================================")
print("총매도호가 잔량 [%s]" % (recvvalue[43]))
print("총매도호가 잔량 증감 [%s]" % (recvvalue[54]))
print("총매수호가 잔량 [%s]" % (recvvalue[44]))
print("총매수호가 잔량 증감 [%s]" % (recvvalue[55]))
print("시간외 총매도호가 잔량 [%s]" % (recvvalue[45]))
print("시간외 총매수호가 증감 [%s]" % (recvvalue[46]))
print("시간외 총매도호가 잔량 [%s]" % (recvvalue[56]))
print("시간외 총매수호가 증감 [%s]" % (recvvalue[57]))
print("예상 체결가 [%s]" % (recvvalue[47]))
print("예상 체결량 [%s]" % (recvvalue[48]))
print("예상 거래량 [%s]" % (recvvalue[49]))
print("예상체결 대비 [%s]" % (recvvalue[50]))
print("부호 [%s]" % (recvvalue[51]))
print("예상체결 전일대비율 [%s]" % (recvvalue[52]))
print("누적거래량 [%s]" % (recvvalue[53]))
print("주식매매 구분코드 [%s]" % (recvvalue[58]))
# 해외주식호가 출력라이브러리
def stockhoka_overseas(data):
""" 넘겨받는데이터가 정상인지 확인
print("stockhoka[%s]"%(data))
"""
recvvalue = data.split('^') # 수신데이터를 split '^'
print("실시간종목코드 [" + recvvalue[0] + "]" + ", 종목코드 [" + recvvalue[1] + "]")
print("소숫점자리수 [" + recvvalue[2] + "]")
print("현지일자 [" + recvvalue[3] + "]" + ", 현지시간 [" + recvvalue[4] + "]")
print("한국일자 [" + recvvalue[5] + "]" + ", 한국시간 [" + recvvalue[6] + "]")
print("======================================")
print("매수총 잔량 [%s]" % (recvvalue[7]))
print("매수총잔량대비 [%s]" % (recvvalue[9]))
print("매도총 잔량 [%s]" % (recvvalue[8]))
print("매도총잔략대비 [%s]" % (recvvalue[10]))
print("매수호가 [%s]" % (recvvalue[11]))
print("매도호가 [%s]" % (recvvalue[12]))
print("매수잔량 [%s]" % (recvvalue[13]))
print("매도잔량 [%s]" % (recvvalue[14]))
print("매수잔량대비 [%s]" % (recvvalue[15]))
print("매도잔량대비 [%s]" % (recvvalue[16]))
# 국내주식체결통보 출력라이브러리
def stocksigningnotice_domestic(data, key, iv):
# AES256 처리 단계
aes_dec_str = aes_cbc_base64_dec(key, iv, data)
pValue = aes_dec_str.split('^')
if pValue[13] == '2': # 체결통보
print("#### 국내주식 체결 통보 ####")
menulist = "고객ID|계좌번호|주문번호|원주문번호|매도매수구분|정정구분|주문종류|주문조건|주식단축종목코드|체결수량|체결단가|주식체결시간|거부여부|체결여부|접수여부|지점번호|주문수량|계좌명|체결종목명|신용구분|신용대출일자|체결종목명40|주문가격"
menustr1 = menulist.split('|')
else:
print("#### 국내주식 주문·정정·취소·거부 접수 통보 ####")
menulist = "고객ID|계좌번호|주문번호|원주문번호|매도매수구분|정정구분|주문종류|주문조건|주식단축종목코드|주문수량|주문가격|주식체결시간|거부여부|체결여부|접수여부|지점번호|주문수량|계좌명|주문종목명|신용구분|신용대출일자|체결종목명40|체결단가"
menustr1 = menulist.split('|')
i = 0
for menu in menustr1:
print("%s [%s]" % (menu, pValue[i]))
i += 1
# 해외주식체결통보 출력라이브러리
def stocksigningnotice_overseas(data, key, iv):
menulist = "고객 ID|계좌번호|주문번호|원주문번호|매도매수구분|정정구분|주문종류2|단축종목코드|주문수량|체결단가|체결시간|거부여부|체결여부|접수여부|지점번호|체결수량|계좌명|체결종목명|해외종목구분|담보유형코드|담보대출일자"
menustr1 = menulist.split('|')
# AES256 처리 단계
aes_dec_str = aes_cbc_base64_dec(key, iv, data)
pValue = aes_dec_str.split('^')
if pValue[12] == '2': # 체결통보
print("#### 해외주식 체결 통보 ####")
else:
print("#### 해외주식 주문·정정·취소·거부 접수 통보 ####")
i = 0
for menu in menustr1:
print("%s [%s]" % (menu, pValue[i]))
i += 1
def create_broker(api_key, api_secret, acc_no):
broker = mojito.KoreaInvestment(
api_key=api_key,
api_secret=api_secret,
acc_no=acc_no,
exchange='NASD'
)
return broker
def get_balance(broker):
return broker.fetch_present_balance()
def get_stock_price(broker, symbol):
return broker.fetch_price(symbol)
def place_buy_order_limits(broker, symbol, price, qty):
return broker.create_limit_buy_order(
symbol=symbol,
price=price,
quantity=qty
)
def place_sell_order_limits(broker, symbol, price, qty):
return broker.create_limit_sell_order(
symbol=symbol,
price=price,
quantity=qty
)
def cancel_order(broker, KRX_FWDG_ORD_ORGNO, ODNO, qty, cancel_all=False):
return broker.cancel_order(org_no=KRX_FWDG_ORD_ORGNO, order_no=ODNO, quantity=qty, total=cancel_all)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment