Created
January 6, 2019 16:09
-
-
Save distagon/6ce5c152d27151367a5062f615c78529 to your computer and use it in GitHub Desktop.
群益證券報價 API 實驗程式
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
import math
import os
import signal
import threading
import time
import traceback

# Third-party imports are kept in their original relative order.
import pythoncom
import comtypes.client
import comtypes.gen.SKCOMLib as sk
# Vendor documentation section 4-1, p. 16
skC = comtypes.client.CreateObject(sk.SKCenterLib, interface=sk.ISKCenterLib)

# Vendor documentation section 4-4, p. 77
skQ = comtypes.client.CreateObject(sk.SKQuoteLib, interface=sk.ISKQuoteLib)

# Shutdown flag: set to True by the Ctrl+C handler and polled by the loops
# in the main thread and in the receiver thread.
app_done = False
class QuoteReceiver(threading.Thread):
    """Background thread that logs in and subscribes to stock quotes.

    Settings are read from ``quicksk.json`` in the current working
    directory. The keys used are ``log_path``, ``account``, ``password`` and
    ``products`` (an iterable of strings that is joined with commas).

    The instance itself is handed to ``comtypes.client.GetEvents`` as the
    event sink for the module-level ``skC`` and ``skQ`` objects, so the
    ``On*`` methods below serve as the event callbacks.
    """

    # nKind value that OnConnection treats as "quote monitor is ready".
    # NOTE(review): presumably the vendor's "stocks ready" connection code;
    # confirm against the SKCOM documentation.
    _KIND_READY = 3003

    # Maximum number of SKQuoteLib_EnterMonitor attempts.
    _MAX_ENTER_ATTEMPTS = 3

    def __init__(self):
        """Load ``quicksk.json`` and resolve the configured log directory.

        Raises:
            OSError: if ``quicksk.json`` cannot be opened.
            json.JSONDecodeError: if the file is not valid JSON.
            KeyError: if ``log_path`` is missing from the configuration.
        """
        super().__init__()
        self.ready = False
        with open('quicksk.json', 'r') as cfgfile:
            self.config = json.load(cfgfile)
        self.log_path = os.path.realpath(self.config['log_path'])

    async def wait_for_ready(self):
        """Poll until ``self.ready`` is set or shutdown has been requested.

        ``self.ready`` is set by :meth:`OnConnection`. The module-level
        ``app_done`` flag is checked as well so that Ctrl+C pressed before
        the ready notification arrives does not leave this coroutine
        spinning forever.
        """
        while not self.ready and not app_done:
            # Yield to the event loop instead of blocking it with time.sleep.
            await asyncio.sleep(0.25)

    async def monitor_quote(self):
        """Log in, enter the quote monitor, subscribe, then idle until shutdown.

        Steps, in order:

        1. Set the log path and log in with the configured credentials.
        2. On successful login, call ``SKQuoteLib_EnterMonitor`` up to
           ``_MAX_ENTER_ATTEMPTS`` times until it returns 0.
        3. If the monitor was entered, wait for the ready flag and request
           quotes for the configured products.
        4. Sleep in one-second steps until ``app_done`` becomes true.

        Any exception is reported (message plus traceback) and ends the
        coroutine; nothing is re-raised.
        """
        try:
            # skC.SKCenterLib_ResetServer('morder1.capital.com.tw')
            skC.SKCenterLib_SetLogPath(self.log_path)

            print('登入', flush=True)
            retq = -1  # stays non-zero when login fails or every attempt fails
            retc = skC.SKCenterLib_Login(self.config['account'], self.config['password'])
            if retc == 0:
                print('啟動行情監控', flush=True)
                for attempt in range(self._MAX_ENTER_ATTEMPTS):
                    if attempt > 0:
                        print('嘗試再啟動 {}'.format(attempt))
                    retq = skQ.SKQuoteLib_EnterMonitor()
                    if retq == 0:
                        break
            else:
                msg = skC.SKCenterLib_GetReturnCodeMessage(retc)
                print('登入失敗: #{} {}'.format(retc, msg))

            if retq == 0:
                print('等待行情監控器啟動完成')
                await self.wait_for_ready()
                # wait_for_ready also returns on shutdown; only subscribe
                # when the monitor actually reported ready.
                if self.ready:
                    print('設定商品', flush=True)
                    list_with_comma = ','.join(self.config['products'])
                    skQ.SKQuoteLib_RequestStocks(0, list_with_comma)
            else:
                print('無法監看報價: #{}'.format(retq))

            # Keep the thread (and therefore the event sinks) alive until
            # the Ctrl+C handler raises the shutdown flag.
            while not app_done:
                await asyncio.sleep(1)
        except Exception:
            print('init() 發生不預期狀況', flush=True)
            # Show what actually went wrong instead of hiding it.
            traceback.print_exc()

    def run(self):
        """Thread entry point: hook up the event sinks and run the monitor."""
        # The returned connection objects must stay referenced for as long
        # as events are wanted, hence the otherwise unused local names.
        ehC = comtypes.client.GetEvents(skC, self)
        ehQ = comtypes.client.GetEvents(skQ, self)
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self.monitor_quote())
        finally:
            # Release the loop's resources once the coroutine has finished.
            loop.close()

    def OnTimer(self, nTime):
        """Event callback: print the received timer value."""
        print('OnTimer(): {}'.format(nTime), flush=True)

    def OnConnection(self, nKind, nCode):
        """Event callback: print the connection event and detect readiness.

        Sets ``self.ready`` when ``nKind`` equals ``_KIND_READY``.
        """
        print('OnConnection(): nKind={}, nCode={}'.format(nKind, nCode), flush=True)
        if nKind == self._KIND_READY:
            self.ready = True

    def OnNotifyQuote(self, sMarketNo, sStockidx):
        """Event callback: fetch the quoted stock and print a summary line.

        The record is looked up with ``SKQuoteLib_GetStockByIndex``. The
        printed close value is ``nClose`` divided by ``10 ** sDecimal``.
        """
        pStock = sk.SKSTOCK()
        skQ.SKQuoteLib_GetStockByIndex(sMarketNo, sStockidx, pStock)
        msg = '{} {} 總量: {} 收盤: {}'.format(
            pStock.bstrStockNo,
            pStock.bstrStockName,
            pStock.nTQty,
            pStock.nClose / math.pow(10, pStock.sDecimal)
        )
        print(msg)
def ctrl_c(sig, frm):
    """SIGINT handler: leave the quote monitor and request shutdown.

    Args:
        sig: Signal number passed by the ``signal`` module (unused).
        frm: Current stack frame passed by the ``signal`` module (unused).
    """
    global app_done
    print('Ctrl+C detected.')
    try:
        skQ.SKQuoteLib_LeaveMonitor()
    finally:
        # Raise the flag even if leaving the monitor fails; otherwise the
        # polling loops never stop and the program cannot exit.
        app_done = True
def main():
    """Install the Ctrl+C handler, start the receiver and pump messages.

    The main thread calls ``pythoncom.PumpWaitingMessages()`` and then
    sleeps five seconds, repeating until ``app_done`` is set, so pending
    messages are processed at most once per five-second cycle.
    """
    signal.signal(signal.SIGINT, ctrl_c)
    qrcv = QuoteReceiver()
    qrcv.start()
    print('Main thread: #{}'.format(threading.get_ident()), flush=True)
    print('Receiver thread: #{}'.format(qrcv.ident), flush=True)
    while not app_done:
        pythoncom.PumpWaitingMessages()
        time.sleep(5)
    # Wait for the receiver to notice the shutdown flag and finish, making
    # the shutdown order explicit rather than relying on interpreter exit.
    qrcv.join()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment