Created
March 24, 2018 13:47
-
-
Save yoonbae81/89de4661c2eba4a25f5ab139de6b1e00 to your computer and use it in GitHub Desktop.
yQuant prototype using ZeroMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import datetime | |
import json | |
import time | |
import zmq | |
import random | |
import operator | |
import re | |
import time | |
import sys | |
import pymysql | |
from collections import defaultdict | |
from os import getpid | |
from threading import Thread | |
from multiprocessing import Process, Pool | |
from urllib.request import urlopen | |
from urllib.error import URLError | |
from logger import get_logger | |
def feed_dummy_price(start_date, end_date): | |
""" | |
for Backtesting | |
""" | |
# SELECT * FROM price_intraday WHERE datetime BTEWEEN start_date AND | |
# end_date | |
pass | |
def feed_price(port): | |
get_logger().debug( | |
"Process[feed_price] is started (pid:{})".format(getpid())) | |
def parse(URL, socket): | |
# 정규표현식으로 파싱(키값에 따옴표가 없어서 JSON 파싱 불가) | |
# example : , {code:"095570",name :"AJ네트웍스",cost :"34,650",updn | |
# :"▲100",rate :"+0.29%"} | |
pattern = "code:\"(.+)\",name :\"(.+)\",cost :\"(.+)\",updn" | |
rep = re.compile(pattern) | |
start = time.time() | |
response = urlopen(URL, timeout=TIMEOUT).read().decode('UTF-8') | |
get_logger().debug( | |
'웹페이지 수신 ({0:.5g} seconds)'.format(time.time() - start)) | |
start = time.time() | |
for line in response.splitlines(): | |
if '장종료' in line: | |
is_market_finished = True | |
# 장이 종료되었으면 마지막 한 번은 저장 | |
if 'code' not in line: | |
continue | |
# print(line) | |
match = rep.search(line) | |
symbol = match.group(1) | |
name = match.group(2) | |
price = int(match.group(3).replace(',', '')) | |
get_logger().debug( | |
"종목코드/가격 파싱 ({0:.5g} seconds)".format(time.time() - start)) | |
# response = urlopen(URL, timeout=1).read().decode('UTF-8') | |
# for line in response.splitlines(): | |
symbol = random.choice(['삼성전자', '한국전력', '현대차']) | |
price = random.randrange(10000, 90000, 1000) | |
socket.send_string("{}={}".format(symbol, price)) | |
get_logger().debug( | |
"feed_price sent a message ({}={})".format(symbol, price)) | |
context = zmq.Context() | |
socket = context.socket(zmq.PUB) | |
socket.bind("tcp://127.0.0.1:{}".format(port)) | |
TIMEOUT = 1 | |
INTERVAL = 3 | |
URLs = ('http://finance.daum.net/xml/xmlallpanel.daum?stype=P&type=S', | |
'http://finance.daum.net/xml/xmlallpanel.daum?stype=Q&type=S') | |
try: | |
while True: | |
for URL in URLs: | |
# 반드시 Thread (socket 전달과 main에서 모든 프로세스 종료 목적) | |
# 측정결과 parse는 Thread로 하여도 Process 대비 성능저하 없음 | |
job = Thread(target=parse, args=(URL, socket)) | |
job.start() | |
jobs.append(job) | |
time.sleep(INTERVAL) | |
except URLError: | |
get_logger().debug('웹페이지 수신 {}초내 수신 실패'.format(TIMEOUT)) | |
return | |
except KeyboardInterrupt: | |
for job in jobs: | |
job.terminate() | |
finally: | |
socket.close() | |
context.term() | |
def store_price(port): | |
get_logger().debug( | |
"Process[store_price] is started (pid:{})".format(getpid())) | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.setsockopt_string(zmq.SUBSCRIBE, '') | |
socket.connect("tcp://127.0.0.1:{}".format(port)) | |
with open('config.json') as file: | |
config = json.load(file)['mysql'] | |
prices = defaultdict(lambda: 0) | |
try: | |
start = time.time() | |
conn = pymysql.connect(host=config['host'], | |
port=config['port'], | |
user=config['user'], | |
password=config['password'], | |
db=config['db']) | |
cursor = conn.cursor() | |
get_logger().debug( | |
'Database 연결 ({0:.5g} seconds)'.format(time.time() - start)) | |
while True: | |
message = socket.recv_string() | |
get_logger().debug( | |
"store_price recevied a message ({})".format(message)) | |
start = time.time() | |
affected_rows = 0 | |
insert_values.append("('{}', {})".format(symbol, price)) | |
if(len(insert_values) > 0): | |
cursor.execute( | |
"INSERT INTO price_intraday (symbol, price) VALUES " + ", ".join(insert_values)) | |
affected_rows = cursor.rowcount | |
return affected_rows | |
# INSERT INTO price_intraday | |
except pymysql.err.OperationalError: | |
get_logger().warn('Database 연결 실패') | |
raise | |
finally: | |
socket.close() | |
context.term() | |
conn.commit() | |
conn.close() | |
get_logger().debug("Database {0}건 저장 ({1:.5g} seconds)".format( | |
affected_rows, time.time() - start)) | |
def monitor_price(port, symbol, price, op=operator.lt): | |
get_logger().debug( | |
"Process[monitor_price] started (pid:{})".format(getpid())) | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.setsockopt_string(zmq.SUBSCRIBE, symbol) | |
socket.connect("tcp://127.0.0.1:{}".format(port)) | |
try: | |
while True: | |
message = socket.recv_string() | |
get_logger().debug( | |
"monitor_price recevied a message ({})".format(message)) | |
symbol = message.split('=')[0] | |
current_price = int(message.split('=')[1]) | |
if op(price, current_price): | |
get_logger().debug("{} reached more than the monitoring price, {} (current: {})".format( | |
symbol, price, current_price)) | |
# Send a order message to portfolio or broker process | |
finally: | |
socket.close() | |
context.term() | |
def main(): | |
get_logger().debug("Process[main] is started (pid:{})".format(getpid())) | |
pool = Pool() | |
try: | |
pool.apply_async(feed_price, args=(50001,)) | |
pool.apply_async(store_price, args=(50001,)) | |
#pool.apply_async(monitor_price, args=(50001, '현대차', 65000)) | |
except KeyboardInterrupt: | |
pool.terminate() | |
else: | |
pool.close() | |
finally: | |
pool.join() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment