Skip to content

Instantly share code, notes, and snippets.

@yoonbae81
Created March 24, 2018 14:02
Show Gist options
  • Save yoonbae81/1909ba16c29daada0c41d30b5a9678b2 to your computer and use it in GitHub Desktop.
Save yoonbae81/1909ba16c29daada0c41d30b5a9678b2 to your computer and use it in GitHub Desktop.
A prototype monitor module with ZeroMQ
#!/usr/bin/python3
from logger import get_logger
import asyncio
import json
import numpy
import os
import time
import sys
import zmq
from collections import defaultdict
def main():
get_logger().info("Starting {} process (pid:{})".format(os.path.basename(__file__), os.getpid()))
global socket_prev
global socket_next
global config
with open("config.json") as file:
config = json.load(file)
loop = asyncio.get_event_loop()
try:
get_logger().debug("Connecting a socket to subscribe price message on port {}".format(config["fetcher"]["port"]))
socket_prev = zmq.Context().socket(zmq.SUB)
socket_prev.setsockopt_string(zmq.SUBSCRIBE, '')
socket_prev.connect("tcp://127.0.0.1:{}".format(config["fetcher"]["port"]))
socket_next = zmq.Context().socket(zmq.PUB)
socket_next.connect("tcp://127.0.0.1:{}".format(config["riskmanager"]["port"]))
get_logger().debug('Entering an event loop')
loop.run_until_complete(endless_task())
except KeyboardInterrupt:
get_logger().debug('Stopping the event loop by keyboard interrupt')
loop.stop()
except Exception as e:
get_logger().warn(repr(e))
raise
finally:
get_logger().debug('Closing the event loop')
loop.close()
async def endless_task():
while True:
message = await receive()
#get_logger().debug("message received {}".format(message))
symbol, _ = message.split()
if symbol == sys.argv[1]:
asyncio.get_event_loop().create_task(analyze(message))
async def receive():
await asyncio.sleep(0.001) # analyze가 끼어들 틈을 주기 위함
return socket_prev.recv_string()
async def analyze(message):
global config
symbol, price = message.split()
price = int(price)
get_logger().debug("analyzed {}".format(message))
await asyncio.sleep(0.7) # 분석에 시간이 걸림
if price < 40000:
await send(message)
async def send(message):
global socket_next
get_logger().debug("sending to riskmanager {}".format(message))
socket_next.send_string(message)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment