Created
May 12, 2018 04:23
-
-
Save ruoyu0088/ea9a199762993b91f7ca13d0dbdbcdaa to your computer and use it in GitHub Desktop.
a demo for zmq process with bokeh server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import path | |
from bokeh.models import Button, Div | |
from bokeh.layouts import column | |
from bokeh.document import without_document_lock | |
from bokeh.io import curdoc | |
from zmq_subprocess import ZmqSubProcessClient | |
ok_button = Button(label="ok") | |
div = Div() | |
def ok_button_clicked(): | |
subproc.send("reset") | |
ok_button.on_click(ok_button_clicked) | |
def process_message(message, doc=curdoc()): | |
def show(): | |
div.text = str(message) | |
doc.add_next_tick_callback(show) | |
subproc = ZmqSubProcessClient(curdoc()) | |
subproc.start_subprocess(path.join(path.dirname(__file__), "calc_process.py"), (), process_message) | |
curdoc().add_root(column(ok_button, div)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import time | |
import zmq | |
from zmq_subprocess import ZmqSubProcess | |
zsp = ZmqSubProcess() | |
count = 0 | |
while True: | |
if zsp.poll() != 0: | |
message = zsp.recv() | |
if message == "reset": | |
count = 0 | |
print("send", count) | |
zsp.send(count) | |
count += 1 | |
time.sleep(1.0) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import subprocess | |
import zmq | |
import zmq.asyncio | |
from bokeh.document import without_document_lock | |
from bokeh.application.handlers import Handler | |
def on_session_destroyed(self, session_context): | |
if hasattr(session_context, "on_destroyed"): | |
return session_context.on_destroyed(session_context) | |
Handler.on_session_destroyed = on_session_destroyed | |
class ZmqSubProcessClient: | |
def __init__(self, doc, port=0): | |
ctx = zmq.asyncio.Context.instance() | |
self.socket = ctx.socket(zmq.PAIR) | |
if port == 0: | |
port = self.socket.bind_to_random_port("tcp://127.0.0.1") | |
else: | |
addr = "tcp://127.0.0.1:{}".format(port) | |
self.socket.bind(addr) | |
self.port = port | |
self.doc = doc | |
def start_subprocess(self, pyfile, args, message_callback): | |
self.process = subprocess.Popen(["python", pyfile] + [str(self.port)] + list(args)) | |
self.message_callback = message_callback | |
self.doc.add_next_tick_callback(self.message_loop) | |
self.doc.session_context.on_destroyed = self.destroy | |
def destroy(self, session_context): | |
self.process.kill() | |
@without_document_lock | |
async def message_loop(self): | |
while True: | |
message = await self.socket.recv_pyobj() | |
self.message_callback(message) | |
def send(self, message): | |
@without_document_lock | |
async def _send_message(): | |
await self.socket.send_pyobj(message) | |
self.doc.add_next_tick_callback(_send_message) | |
class ZmqSubProcess: | |
def __init__(self, port=None): | |
if port is None: | |
port = int(sys.argv[1]) | |
ctx = zmq.Context.instance() | |
self.socket = ctx.socket(zmq.PAIR) | |
self.socket.connect("tcp://127.0.0.1:{}".format(port)) | |
def send(self, obj): | |
self.socket.send_pyobj(obj) | |
def poll(self): | |
return self.socket.poll(timeout=0) | |
def recv(self): | |
return self.socket.recv_pyobj() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment