Created
March 24, 2018 14:02
-
-
Save yoonbae81/1909ba16c29daada0c41d30b5a9678b2 to your computer and use it in GitHub Desktop.
A prototype monitor module with ZeroMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from logger import get_logger | |
import asyncio | |
import json | |
import numpy | |
import os | |
import time | |
import sys | |
import zmq | |
from collections import defaultdict | |
def main():
    """Run the monitor process until it fails or is interrupted.

    Loads ``config.json`` from the current working directory, connects a SUB
    socket to the port in ``config["fetcher"]["port"]`` and a PUB socket to
    the port in ``config["riskmanager"]["port"]`` (both on 127.0.0.1), then
    drives ``endless_task()`` on a fresh event loop.

    The parsed config and both sockets are stored in the module globals
    ``config``, ``socket_prev`` and ``socket_next`` because the coroutines in
    this module read them from there.

    Raises:
        Exception: any error other than ``KeyboardInterrupt`` raised while
            connecting or running the loop is logged and re-raised.
    """
    global socket_prev
    global socket_next
    global config

    get_logger().info("Starting {} process (pid:{})".format(os.path.basename(__file__), os.getpid()))

    with open("config.json") as file:
        config = json.load(file)

    # Create the loop explicitly: relying on get_event_loop() to create one
    # implicitly is deprecated and fails on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # One shared context owns both sockets so they can be released together.
    context = zmq.Context()
    try:
        get_logger().debug("Connecting a socket to subscribe price message on port {}".format(config["fetcher"]["port"]))
        socket_prev = context.socket(zmq.SUB)
        socket_prev.setsockopt_string(zmq.SUBSCRIBE, '')  # empty prefix = receive everything
        socket_prev.connect("tcp://127.0.0.1:{}".format(config["fetcher"]["port"]))

        socket_next = context.socket(zmq.PUB)
        socket_next.connect("tcp://127.0.0.1:{}".format(config["riskmanager"]["port"]))

        get_logger().debug('Entering an event loop')
        loop.run_until_complete(endless_task())
    except KeyboardInterrupt:
        # run_until_complete() has already returned here, so the loop is not
        # running any more and there is nothing left to stop.
        get_logger().debug('Stopping the event loop by keyboard interrupt')
    except Exception as e:
        get_logger().warning(repr(e))
        raise
    finally:
        get_logger().debug('Closing the event loop')
        loop.close()
        # Close both sockets and terminate the context; linger=0 keeps
        # shutdown from blocking on messages that were never delivered.
        context.destroy(linger=0)
async def endless_task():
    """Receive messages forever and schedule analysis for the watched symbol.

    Each message is expected to be two whitespace-separated fields, the first
    being the symbol. Messages whose symbol equals ``sys.argv[1]`` are handed
    to ``analyze()`` as a background task; all others are ignored. Messages
    that do not have exactly two fields are logged and skipped instead of
    terminating the loop.

    Raises:
        IndexError: if the script was started without a symbol argument.
    """
    watched_symbol = sys.argv[1]  # read once; fails fast when the argument is missing
    loop = asyncio.get_event_loop()

    # The event loop keeps only weak references to tasks, so hold strong
    # references here until each task finishes.
    running = set()

    while True:
        message = await receive()

        fields = message.split()
        if len(fields) != 2:
            get_logger().warning("Ignoring malformed message {!r}".format(message))
            continue

        if fields[0] == watched_symbol:
            task = loop.create_task(analyze(message))
            running.add(task)
            task.add_done_callback(running.discard)
async def receive():
    """Return the next message from ``socket_prev`` as a string.

    The socket is polled without blocking so the event loop stays free to
    run other tasks while no message is available. A blocking ``recv`` here
    would freeze every scheduled ``analyze`` task until the next message
    arrived.

    Returns:
        The received message decoded as a string.
    """
    while True:
        # Always yield first so already-scheduled analyze tasks get a turn,
        # even when messages arrive back to back.
        await asyncio.sleep(0.001)
        try:
            return socket_prev.recv_string(flags=zmq.NOBLOCK)
        except zmq.Again:
            # Nothing queued yet; yield again and retry.
            continue
async def analyze(message):
    """Analyze one message and forward it when the price is below 40000.

    ``message`` must consist of exactly two whitespace-separated fields, the
    second of which is parsed as an integer price.

    Raises:
        ValueError: if the message does not have exactly two fields or the
            price field is not an integer.
    """
    _symbol, raw_price = message.split()
    quote = int(raw_price)

    get_logger().debug("analyzed {}".format(message))
    await asyncio.sleep(0.7)  # the analysis takes a while

    if quote >= 40000:
        return
    await send(message)
async def send(message):
    """Log ``message`` and publish it on the ``socket_next`` socket."""
    log = get_logger()
    log.debug("sending to riskmanager {}".format(message))
    # socket_next is a module global assigned in main(); it is only read here.
    socket_next.send_string(message)
# Script entry point: start the monitor only when this file is run directly,
# not when it is imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment