Last active
January 23, 2018 15:40
-
-
Save hanhha/a0b6f9a6fa87c9843de70d8e5f18415f to your computer and use it in GitHub Desktop.
Demo of Bokeh programmatic server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
from datetime import datetime | |
from time import sleep | |
from bokeh.plotting import figure | |
from bokeh.application import Application | |
from bokeh.application.handlers.function import FunctionHandler | |
from bokeh.document import without_document_lock | |
from bokeh.models import ColumnDataSource | |
from bokeh.models.glyphs import Line | |
from bokeh.server.server import Server | |
from tornado import gen, ioloop | |
from threading import Thread, Lock | |
class BPA(object):
    """Base Processing Agent: a minimal observer/observable node.

    An agent can be chained behind an optional upstream ``source`` agent.
    When a source is given, this agent's ``CallBack`` is registered on it via
    ``source.Bind`` so that everything the source broadcasts is delivered
    here. An agent can in turn have its own observers registered with
    ``Bind`` and notify them with ``BroadCast``.
    """

    def __init__(self, source=None):
        """Create the agent and, if ``source`` is given, subscribe to it.

        Args:
            source: upstream object exposing ``Bind(cb)``, or ``None`` for an
                agent with no upstream.
        """
        self._source = source
        self._observer = list()
        if self._source is not None:
            self._source.Bind(self.CallBack)

    def Bind(self, cb):
        """Register ``cb`` to be called with each broadcast payload."""
        self._observer.append(cb)

    def CallBack(self, data):
        """Handle a payload broadcast by the upstream source.

        Subclasses that are constructed with a source must override this.

        Raises:
            NotImplementedError: always, in the base class.
        """
        # The previous `assert "<message>", 0` tested a non-empty string and
        # therefore never fired; raise explicitly so a missing override is
        # noticed.
        raise NotImplementedError("Must be overridden")

    def BroadCast(self, data):
        """Call every registered observer with ``data``, in binding order."""
        for cb in self._observer:
            cb(data)
class DataGen(BPA):
    """Demo data source that feeds two alternating streams to its observers."""

    def start(self):
        """Broadcast one sample per second until interrupted with Ctrl-C.

        Samples alternate between stream ``0`` and stream ``1``. Each payload
        has the shape ``{stream: {'x': idx, 'y': y}}`` where ``idx`` is a
        counter shared by both streams and ``y`` counts the samples already
        sent on that particular stream. This call blocks; it returns only
        after a ``KeyboardInterrupt``.
        """
        per_stream_y = [0, 0]  # next y value for stream 0 and stream 1
        stream = 0             # stream 0 goes first
        idx = 0
        try:
            while True:
                y = per_stream_y[stream]
                self.BroadCast({stream: {'x': idx, 'y': y}})
                print("New data for stream {s}: {x} - {y}".format(s=stream, x=idx, y=y))
                idx += 1
                per_stream_y[stream] += 1
                stream = 1 - stream  # hand the turn to the other stream
                sleep(1)
        except KeyboardInterrupt:
            print("\rUser interrupted.")
class Chart(BPA):
    """Bokeh server that plots the two streams received from ``source``.

    Incoming samples arrive through ``CallBack`` (called on the producer's
    thread) and are buffered in ``_new_data``. A periodic callback on the
    Bokeh document drains that buffer into the plot's ColumnDataSources.

    Two locks are used:
      * ``_new_data_lock`` guards the ``_new_data`` buffer.
      * ``_cb_lock`` is taken by ``update_data`` and released by
        ``real_update_data`` so that at most one flush is pending at a time;
        ``modify_document`` also holds it while it (re)builds the document.
    """

    def __init__(self, source):
        """Set up buffers, locks and server settings, then subscribe to ``source``."""
        self._new_data = dict()   # {stream key: {column name: [values]}} awaiting flush
        self._sources = dict()    # {stream key: ColumnDataSource}
        self.port = 8888
        self.allow_websocket_origin = ['localhost:8888']
        self._cb_lock = Lock()
        self._new_data_lock = Lock()
        self._exec_t = None       # thread running the server's IO loop
        self.server = None        # created in start()
        self.doc = None           # most recently built document
        # Subscribe last: once bound, CallBack may be invoked and needs the
        # attributes above to exist.
        super().__init__(source)

    @gen.coroutine
    def real_update_data(self):
        """Flush buffered samples into the plot sources.

        Scheduled by ``update_data`` through ``add_next_tick_callback``.
        Always releases ``_cb_lock`` (taken by ``update_data``), even if
        streaming fails, so later periodic updates are not skipped forever.
        """
        try:
            # Swap the buffer out under the lock so the producer thread can
            # keep appending while the data is streamed.
            with self._new_data_lock:
                pending, self._new_data = self._new_data, dict()
            for key, columns in pending.items():
                target = self._sources.get(key)
                if target is not None:
                    # rollover bounds the number of rows kept per source.
                    target.stream(columns, rollover=288)
        finally:
            self._cb_lock.release()

    @gen.coroutine
    @without_document_lock
    def update_data(self):
        """Periodic callback: schedule a flush unless one is already pending.

        Declared ``without_document_lock``, so it does not touch the document
        itself; it only queues ``real_update_data`` as a next-tick callback.
        """
        if self._cb_lock.acquire(False):
            try:
                self.doc.add_next_tick_callback(self.real_update_data)
            except Exception:
                # Scheduling failed, so real_update_data will never run to
                # release the lock; release it here and re-raise.
                self._cb_lock.release()
                raise

    def CallBack(self, data):
        """Buffer a payload of the form ``{stream key: {column: value}}``.

        Each value is appended to the pending list for its stream and column.
        """
        with self._new_data_lock:
            for key, point in data.items():
                columns = self._new_data.setdefault(key, {})
                for column, value in point.items():
                    columns.setdefault(column, []).append(value)

    def modify_document(self, doc):
        """Build the plot in ``doc``; used as the Bokeh ``FunctionHandler``.

        Args:
            doc: the Bokeh document created for a new session.

        Returns:
            The same document, with the figure added as a root and
            ``update_data`` installed as a 1000 ms periodic callback.
        """
        # NOTE(review): this is a blocking acquire. If it runs on the IO loop
        # while a flush is pending it could stall the loop - confirm.
        with self._cb_lock:
            self.doc = doc
            p = figure(sizing_mode='scale_width', plot_height=200)
            p.grid.grid_line_alpha = 0.6
            if self._sources:
                # Rebuild every source from a copy of its current data so the
                # new document gets its own ColumnDataSource objects
                # (presumably because a source cannot be shared by documents).
                self._sources = {
                    key: ColumnDataSource(old.to_df().to_dict(orient='list'))
                    for key, old in self._sources.items()
                }
            else:
                self._sources[0] = ColumnDataSource({'x': [], 'y': []})
                self._sources[1] = ColumnDataSource({'x': [], 'y': []})
            g0 = Line(x='x', y='y', line_color='blue')
            g1 = Line(x='x', y='y', line_color='red')
            p.add_glyph(self._sources[0], g0)
            p.add_glyph(self._sources[1], g1)
            self.doc.add_root(p)
            self.doc.add_periodic_callback(self.update_data, 1000)
        return self.doc

    def start(self):
        """Create the Bokeh server and run its IO loop on a background thread."""
        app = {'/': Application(FunctionHandler(self.modify_document))}
        self.server = Server(app, port=self.port,
                             allow_websocket_origin=self.allow_websocket_origin)
        self.server.start()
        self._exec_t = Thread(name="bokeh_chart", target=self.server.io_loop.start)
        self._exec_t.start()

    def stop(self):
        """Stop the IO loop and wait for the server thread to finish.

        Does nothing if ``start`` has not been called.
        """
        if self.server is None or self._exec_t is None:
            return
        loop = self.server.io_loop
        # stop() is called from a different thread than the one running the
        # loop; add_callback is the thread-safe way to hand work to an IOLoop.
        loop.add_callback(loop.stop)
        self._exec_t.join()
if __name__ == '__main__':
    # Wire the generator into the chart, serve the chart in the background,
    # and produce data in the foreground until the user presses Ctrl-C.
    datagen = DataGen()
    chart = Chart(datagen)
    chart.start()
    try:
        datagen.start()
    finally:
        # Always stop the server thread, even if the generator fails with an
        # unexpected error; otherwise the process would never exit.
        chart.stop()
    # Equivalent to exit(0) but does not rely on the `site`-provided helper.
    raise SystemExit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment