Last active
June 6, 2024 17:17
-
-
Save ingoogni/a7f7c0fed9e96b3545e9a8c6139d80b0 to your computer and use it in GitHub Desktop.
CherryPy SSE / Server-Sent Events, Eventsource helpers to publish data from cherrypy's pub/sub bus
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import cherrypy | |
from cherrypy.process import plugins | |
class Portier(threading.Thread):
    """
    The Doorman (Portier) listens to a cherrypy bus channel, 'opens the
    door' as soon as a message appears, yields it and closes the door once
    the message is through.

    channel: the cherrypy bus channel to listen to.
    heartbeat: when truthy, ask the heartbeat plugin (via the 'startbeat'
               channel) to publish a heartbeat on the channel; by default
               every 30 s a blank SSE comment line.
    interval: set interval of the heartbeat, in s.
    hbmsg: set message for the heartbeat, mostly useful for debugging.

    The remaining arguments (group, target, name, args, kwargs, daemon) are
    forwarded unchanged to threading.Thread.

    Messages are buffered between two reads of `messages()`, so a message
    that arrives while the consumer is busy is delivered on the next read
    instead of being dropped. The buffer is unbounded: call `unsubscribe()`
    when the consumer goes away.
    """

    def __init__(self, channel, heartbeat=None, interval=None, hbmsg=None,
                 group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         args=args, kwargs=kwargs, daemon=daemon)
        self.channel = channel
        self.e = threading.Event()
        # Last message seen; initialised so `message` is readable before
        # anything has been published.
        self._message = None
        # Messages received but not yet yielded by `messages()`.
        self._pending = []
        # Guards _message, _pending and the state of `e` as one unit.
        self._door_lock = threading.Lock()
        self.name = 'Portier-' + self.name
        # Token identifying this listener to the heartbeat manager. It is
        # captured once so that 'startbeat' and 'stopbeat' always carry the
        # same value, whichever thread calls unsubscribe().
        self._listener = threading.current_thread()
        self._heartbeat = bool(heartbeat)
        cherrypy.engine.subscribe(channel, self._msgs)
        if heartbeat:
            beatmsg = {'channel': self.channel,
                       'listner': self._listener}
            if interval:
                beatmsg['interval'] = interval
            if hbmsg:
                beatmsg['message'] = hbmsg
            cherrypy.engine.publish('startbeat', beatmsg)

    @property
    def message(self):
        """contains the last message published to the bus channel"""
        return self._message

    @message.setter
    def message(self, msg):
        """Stores the latest message and triggers the 'door' to open"""
        with self._door_lock:
            # Store first, signal last: a woken consumer must always find
            # the message that caused the wake-up.
            self._message = msg
            self._pending.append(msg)
            self.e.set()

    def messages(self):
        """
        The Doorman's door, yields the messages as they appear on
        the bus channel. Blocks while no message is waiting.
        """
        while True:
            self.e.wait()
            with self._door_lock:
                # Take everything that is waiting and close the door
                # *before* yielding, so a message published while the
                # consumer is suspended re-opens the door.
                batch, self._pending = self._pending, []
                self.e.clear()
            yield from batch

    def _msgs(self, message):
        """Receives the messages from the bus"""
        self.message = message

    def unsubscribe(self):
        """
        Unsubscribe from the message stream and, when a heartbeat was
        requested at construction, signal the heartbeat manager to remove
        this listener from the heartbeat stream.
        """
        cherrypy.engine.unsubscribe(self.channel, self._msgs)
        if self._heartbeat:
            cherrypy.engine.publish('stopbeat',
                                    {'channel': self.channel,
                                     'listner': self._listener})
class Beats(threading.Thread):
    """
    Publishes a heartbeat message on the cherrypy bus in a steady rhythm.

    interval: in seconds
    channel: channel to publish the heartbeat message to
    message: the heartbeat message to be published

    The thread is a daemon and starts itself on construction. It can be
    paused with `breakbeat()` and resumed with `backbeat()`.
    """

    def __init__(self, interval, channel, message):
        threading.Thread.__init__(self)
        self.daemon = True
        # Set while the beat is paused; also used as an interruptible sleep.
        self.event = threading.Event()
        # Set by backbeat() to wake the thread out of a pause.
        self._resume = threading.Event()
        # Keeps the two events consistent when pause/resume race.
        self._state_lock = threading.Lock()
        self.interval = interval
        self.channel = channel
        self.message = message  # ": \n\n"
        self.start()

    def run(self):
        """publishes the messages, idling while paused"""
        while True:
            if self.event.is_set():
                # Paused: block until backbeat() and then re-check, rather
                # than leaving run() (a finished thread cannot be resumed).
                self._resume.wait()
                continue
            cherrypy.engine.publish(self.channel, self.message)
            # Sleeps for one interval, but returns early when paused.
            self.event.wait(self.interval)

    def backbeat(self):
        """awakens the thread, paused by 'breakbeat'"""
        with self._state_lock:
            self.event.clear()
            self._resume.set()

    def breakbeat(self):
        """'pauses' the beats until awakened by 'backbeat'"""
        with self._state_lock:
            self._resume.clear()
            self.event.set()
class Beatmanager():
    """
    Instantiates the Beats classes, one per channel. Registers the channel,
    the listeners of a channel and pauses the heartbeat when there are no
    listeners left. Communication with the Beatmanager is through the
    Heartbeat class that listens to the channels 'startbeat' and 'stopbeat'.
    """

    def __init__(self):
        # channel name -> Beats thread publishing on that channel
        self.threads = {}
        # channel name -> list of listeners currently registered
        self.listners = {}

    def start(self, msg):
        """
        Starts or resumes a Beats thread.

        msg: requires a dictionary as message
            {'channel': channel name, required
             'listner': listener name (can be thread ID), required
             'interval': interval in seconds, optional, default 30
             'message': optional, default, a blank SSE comment
            }

        'interval' and 'message' are only used when the first listener
        creates the Beats thread for a channel. The dictionary is not
        modified.
        """
        channel = msg['channel']
        listner = msg['listner']
        if channel not in self.threads:
            # .get instead of .setdefault: the dict is a published bus
            # message and should not be altered for other subscribers.
            interval = msg.get('interval', 30)
            message = msg.get('message', ": \n\n")
            self.threads[channel] = Beats(interval, channel, message)
            self.listners[channel] = [listner]
            return
        registered = self.listners.setdefault(channel, [])
        was_idle = not registered
        registered.append(listner)
        if was_idle:
            # First listener after a pause: wake the existing thread.
            self.threads[channel].backbeat()

    def stop(self, msg):
        """
        Listens to the 'stopbeat' channel, removes listeners, pauses the
        Beats thread when there are no listeners left.

        msg: requires a dictionary as message
            {'channel': channel name, 'listner': listener name}

        A channel that was never started is ignored.
        """
        channel = msg['channel']
        listner = msg['listner']
        registered = self.listners.get(channel)
        if registered is None:
            # Nothing was ever started on this channel.
            return
        if listner in registered:
            registered.remove(listner)
        if not registered:
            self.threads[channel].breakbeat()
class Heartbeat(plugins.SimplePlugin):
    """
    Access point to the Beatmanager and thus Beats classes. Listens to
    the startbeat and stopbeat channels and passes the data on. Registers
    both to the bus.

    bus: the cherrypy process bus to attach to.
    Beatmanager: the manager class (not an instance); it is instantiated
                 here without arguments.
    """

    def __init__(self, bus, Beatmanager):
        plugins.SimplePlugin.__init__(self, bus)
        self.heartbeat = Beatmanager()
        self._subscribe_beats()

    def _subscribe_beats(self):
        """Attach the manager's handlers to the two heartbeat channels."""
        self.bus.subscribe('startbeat', self.heartbeat.start)
        self.bus.subscribe('stopbeat', self.heartbeat.stop)

    def start(self):
        # stop() detaches the handlers, so attach them again here; without
        # this the heartbeat channels stay dead after a bus stop/start cycle.
        self._subscribe_beats()
        self.bus.log('Starting up Heartbeat')

    def stop(self):
        self.bus.unsubscribe('startbeat', self.heartbeat.start)
        self.bus.unsubscribe('stopbeat', self.heartbeat.stop)
        self.bus.log('Shut down Heartbeat')
# Create the heartbeat plugin on cherrypy's engine bus and hook its
# start/stop handlers into the bus at import time.
_heartbeat_plugin = Heartbeat(cherrypy.engine, Beatmanager)
_heartbeat_plugin.subscribe()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CherryPy as a web framework is built on a pub/sub process bus. It is rather easy to push
data onto that bus, both from background processes and from web apps / POST methods.
Getting data from that bus and posting them to the web proved to be a bit harder. The bus,
in my eyes, is the ideal mechanism to push data through and then publish them by means of
Server-Sent Events. Eventually I figured out how it could be done, hence the Portier class.
The Portier class is used to subscribe to a message channel and monitors changes to the | |
message (@message.setter). On a change it 'opens the door' (Portier.messages), yields the | |
message to the receiving publishing method (in this case Root.pubcpu) and closes the door. | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cherrypy | |
import time | |
import random | |
import requests | |
def rndtobus(channel):
    """
    Endless producer: after an initial 2 s delay, sleeps a random time
    below 2 s, draws a random int in 0-100 and publishes it as an SSE
    'time' event directly on the given cherrypy bus channel.

    Simulates, for example, the listen/notify from Postgresql putting
    the latest transaction on the bus. Never returns.
    """
    sse_template = "event: time\ndata: {}\n\n"
    time.sleep(2)
    while True:
        pause = random.random() * 2
        time.sleep(pause)
        value = random.randint(0, 100)
        event = sse_template.format(str(value))
        cherrypy.engine.publish(channel, event)
def rndtoport(url='http://10.0.0.4:8080/sensor/', timeout=5):
    """
    Endless producer: after an initial 2 s delay, sleeps a random time
    below 2 s, draws a random int in 0-100 and POSTs it as JSON
    ({'cpu': value}) to the sensor method.

    Simulates, for example, an ESP8266 posting data to an IP over WiFi.

    url: address of the sensor endpoint; defaults to the address used by
         the example server.
    timeout: seconds to wait for the server before giving up on a single
             request, so an unresponsive server cannot block this loop
             forever.

    Failed requests are logged and the loop carries on. Never returns.
    """
    time.sleep(2)
    headers = {"Content-Type": "application/json"}
    while True:
        time.sleep(random.random() * 2)
        payload = {'cpu': random.randint(0, 100)}
        try:
            requests.post(url, headers=headers, json=payload,
                          timeout=timeout)
        except requests.RequestException as exc:
            # Keep simulating: one failed POST should not end the thread.
            cherrypy.log('rndtoport: POST to {} failed: {}'.format(url, exc))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!doctype html>
<html>
<head>
    <title>Line</title>
    <!-- <script src="/static/Chart.min.js"></script> -->
    <script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.4.0/Chart.min.js"></script>
</head>
<body >
    <div style="width:75%; box-shadow: 10px 10px 5px #999; margin:auto">
        <canvas id="canvas"></canvas>
    </div>
    <script>
        // Chart.js configuration: one line dataset holding a sliding
        // window of the last 20 values.
        var processor = {
            type: 'line',
            data: {
                labels: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                datasets: [{
                    data: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
                    label: "CPU", fill: false, pointRadius: 4, tension:0
                }]
            },
            options: {
                responsive: true,
                title:{display: false, text: 'CPU'},
                legend:{display: false, text: 'CPU'}
            }
        };
        document.addEventListener('DOMContentLoaded', function () {
            // Create the chart before opening the event stream, in the same
            // handler, so window.myLine always exists when an SSE event is
            // processed (window.onload fires later than DOMContentLoaded).
            var ctx = document.getElementById("canvas").getContext("2d");
            window.myLine = new Chart(ctx, processor);

            var source = new EventSource('pubcpu');
            // Each 'time' event carries comma-separated values, one per
            // dataset; push the new value and drop the oldest.
            source.addEventListener('time', function (event) {
                var yy = event.data.split(",");
                console.log('SSE YY:', yy);
                processor.data.datasets.forEach(function(dataset, index) {
                    dataset.data.push(yy[index]);
                    dataset.data.shift();
                });
                console.log('array shift:', processor.data.datasets[0].data);
                window.myLine.update();
            });
            source.addEventListener('error', function (event){
                console.log('SSE error:', event);
                console.log('SSE state:', source.readyState);
            });
        }, false);
    </script>
</body>
</html>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import threading | |
import cherrypy | |
import cherrypy_SSE | |
import datapusher | |
class Root():
    """
    Example application: receives sensor values by POST, streams them to
    browsers as Server-Sent Events and serves the page that plots them.
    """

    @cherrypy.expose
    @cherrypy.tools.json_in()
    def sensor(self, **kwargs):
        """
        receives POSTed data, converts to SSE ready message and
        pushes it onto the cherrypy bus

        Expects a JSON object with a 'cpu' key; answers 400 otherwise.
        """
        input_json = cherrypy.request.json
        try:
            cpu = input_json['cpu']
        except (KeyError, TypeError):
            # Missing key, or a JSON body that is not an object: this is a
            # client error, not a server failure.
            raise cherrypy.HTTPError(
                400, "JSON body must be an object with a 'cpu' key"
            ) from None
        msg = "event: time\ndata: {}\n\n".format(str(cpu))
        cherrypy.engine.publish("cpu", msg)

    @cherrypy.expose
    def pubcpu(self):
        """
        publishes data from the subscribed channel as a text/event-stream
        response that stays open until the client disconnects.
        """
        channel = 'cpu'
        # Plain heartbeat variant:
        #   doorman = cherrypy_SSE.Portier(channel, heartbeat=True)
        # The variant below makes the heartbeat visible in the graph:
        doorman = cherrypy_SSE.Portier(channel,
                                       heartbeat=True,
                                       interval=5,
                                       hbmsg="event: time\ndata: 999\n\n")
        cherrypy.response.headers["Content-Type"] = "text/event-stream"

        def pub():
            try:
                for message in doorman.messages():
                    yield message
            finally:
                # cherrypy closes the generator when the client
                # disconnects (GeneratorExit at the yield). Using finally
                # also cleans up when the stream ends through any other
                # exception.
                doorman.unsubscribe()
        return pub()
    pubcpu._cp_config = {'response.stream': True}

    @cherrypy.expose
    def index(self):
        """Serves index.html, read from the current working directory."""
        # Read and close the file here instead of handing an open file
        # object to cherrypy.
        with open('index.html', 'rb') as page:
            return page.read()
if __name__ == '__main__':
    # Simulator that publishes random values straight onto the 'cpu' channel.
    rnd_bus = threading.Thread(
        target=datapusher.rndtobus,
        args=('cpu',),
        daemon=True,
    )
    rnd_bus.start()

    # Simulator that POSTs random values to /sensor/; prepared but not
    # started (enable by calling rnd_port.start()).
    rnd_port = threading.Thread(target=datapusher.rndtoport, daemon=True)
    #rnd_port.start()

    global_settings = {
        'server.socket_host': "10.0.0.4",
        'server.socket_port': 8080,
        'server.thread_pool': 25,
        'server.socket_queue_size': 10,
        'engine.autoreload.on': False,
    }
    conf = {'global': global_settings}
    cherrypy.quickstart(Root(), config=conf)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Yes. Your lengthy_operation, or processes called by it, could publish messages directly on the bus, which are then published by SSE. If lengthy_operation is a separate process, it could POST data that are received and then put on the bus to be published. datapusher.py is an example of both. Yet another option is to write plugins that receive data and push them onto the bus.
https://gist.github.com/ingoogni/416caa7ed0e8b79391f1899ed9de15b5
https://gist.github.com/ingoogni/169a2b8b1e484c71c95c55a2da2a65f0