Skip to content

Instantly share code, notes, and snippets.

@ingoogni
Last active June 6, 2024 17:17
Show Gist options
  • Save ingoogni/a7f7c0fed9e96b3545e9a8c6139d80b0 to your computer and use it in GitHub Desktop.
Save ingoogni/a7f7c0fed9e96b3545e9a8c6139d80b0 to your computer and use it in GitHub Desktop.
CherryPy SSE / Server-Sent Events, Eventsource helpers to publish data from cherrypy's pub/sub bus
import threading
import cherrypy
from cherrypy.process import plugins
class Portier(threading.Thread):
"""
The Doorman (Portier) detects changes of message by listening to the
subscribed channel, opens 'the door' as a message appears, yield it
and closes the door once trough.
channel: the cherrypy bus channel to listen to.
heartbeat: start a default heartbeat on the channel, every 30 s a blank SSE
comment line.
interval: set interval of heartbeat, in s.
hbmsg: set message for the heartbeat, mostley usefull for debugging.
"""
def __init__(self, channel, heartbeat=None, interval=None, hbmsg=None,group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
super().__init__(group=group, target=target, name=name,
daemon=daemon)
#self.daemon = True
self.channel = channel
self.e = threading.Event()
self.name = 'Portier-'+self.name
cherrypy.engine.subscribe(channel, self._msgs)
if heartbeat:
beatmsg = {'channel': self.channel,
'listner': threading.current_thread()}
if interval:
beatmsg['interval'] = interval
if hbmsg:
beatmsg['message'] = hbmsg
cherrypy.engine.publish('startbeat', beatmsg)
@property
def message(self):
"""contains the last message published to the bus channel"""
return self._message
@message.setter
def message(self, msg):
"""Sets the latest message and triggers the 'door' to open"""
self.e.set()
self._message = msg
def messages(self):
"""
The Doorman's door, yields the messages as they appear on
the bus channel.
"""
while True:
self.e.wait()
yield self._message
self.e.clear()
def _msgs(self, message):
"""Receives the messages from the bus"""
self.message = message
def unsubscribe(self):
"""
Unsubscribe from the message stream, signals to remove the thread
from the heartbet stream
"""
cherrypy.engine.unsubscribe(self.channel, self._msgs)
cherrypy.engine.publish('stopbeat',
{'channel': self.channel,
'listner': threading.current_thread()
})
class Beats(threading.Thread):
"""
publishes a heartbeat message on the cherrypy bus in a steady rithm.
interval: in seconds
channel: channel to publish the heartbeat message to
message: the heartbeat message to be published
"""
def __init__(self, interval, channel, message):
threading.Thread.__init__(self)
self.daemon = True
self.event = threading.Event()
self.interval = interval
self.channel = channel
self.message = message # ": \n\n"
self.start()
def run(self):
"""publishes the messages"""
while not self.event.is_set():
cherrypy.engine.publish(self.channel, self.message)
self.event.wait(self.interval)
def backbeat(self):
"""'awakens the thread, paused by 'breakbeat'"""
self.event.clear()
def breakbeat(self):
"""'pauses' the beats until awakend by 'backbeat'"""
self.event.is_set()
class Beatmanager():
"""
Instantiates the Beats classes, one per channel. Registers the channel,
the listners of a channel and pauses the heartbeat when there are no
listeners left. Communication with the Beatmaster is through the
Heartbeat class that listens to the channels 'startbeat' and 'stopbeat'.
"""
def __init__(self):
self.threads = {}
self.listners = {}
def start(self, msg):
"""
Starts or resumes a Beats thread.
msg: requires a dictionary as message
{'channel': channel name, required
'listner': listner name (can be treadID), required
'interval': interval in seconds, optional, default 30
'message': optional, default, a blank SSE comment
}
"""
channel = msg['channel']
listner = msg['listner']
interval = msg.setdefault('interval', 30)
message = msg.setdefault('message', ": \n\n")
if channel not in self.threads:
self.threads[channel] = Beats(interval, channel, message)
self.listners[channel] = [listner]
elif len(self.listners[channel])==0:
self.listners[channel] = [listner]
self.threads[channel].backbeat()
else:
self.listners[channel].append(listner)
def stop(self, msg):
"""
Listens to the 'stopbeat' channel, removes listeners, pauses the
Beats thread when there are no listeners left.
msg: requires a dictionary as message
{'channel': channel name, 'listener': listnername}
"""
channel = msg['channel']
listner = msg['listner']
if listner in self.listners[channel]:
self.listners[channel].remove(listner)
if len(self.listners[channel])==0:
self.threads[channel].breakbeat()
class Heartbeat(plugins.SimplePlugin):
"""
Accesspoint to the Beatmanager and thus Beats classes. Listens to
the startbeat and stopbeat channels and passes the data on. Registers
both to the bus.
"""
def __init__(self, bus, Beatmanager):
plugins.SimplePlugin.__init__(self, bus)
self.heartbeat = Beatmanager()
self.bus.subscribe('startbeat', self.heartbeat.start)
self.bus.subscribe('stopbeat', self.heartbeat.stop)
def start(self):
self.bus.log('Starting up Heartbeat')
def stop(self):
self.bus.unsubscribe('startbeat', self.heartbeat.start)
self.bus.unsubscribe('stopbeat', self.heartbeat.stop)
self.bus.log('Shut down Heartbeat')
Heartbeat(cherrypy.engine, Beatmanager).subscribe()
CherryPy as a web framework is build on a pub/sub process bus. It is rather easy to push
data onto that bus, as well from background processes as from web apps / POST methods.
Getting data from that bus and post them to the web proved to be a bit harder. The bus in
my eyes is the ideal mechanism to push data through and then publish them by means of
Server Sent Events, eventually I figured out how it could be done, hence the Portier class.
The Portier class is used to subscribe to a message channel and monitors changes to the
message (@message.setter). On a change it 'opens the door' (Portier.messages), yields the
message to the receiving publishing method (in this case Root.pubcpu) and closes the door.
import cherrypy
import time
import random
import requests
def rndtobus(channel):
"""
generate randomly intervalled <2s, random int 0-100,
push the messages directly onto the cherrypy bus.
Simulate for example the listen/notify from Postgresql putting
the latest transaction on the bus.
"""
time.sleep(2)
while True:
time.sleep(random.random()*2)
msg = "event: time\ndata: {}\n\n".format(str(random.randint(0,100)))
cherrypy.engine.publish(channel, msg)
def rndtoport():
"""
generate randomly intervalled <2s, random int 0-100,
post the data to the sensor method
Simulate for example an ESP8266 posting data to a IP over WiFi.
"""
time.sleep(2)
headers = {"Content-Type": "application/json"}
url = 'http://10.0.0.4:8080/sensor/'
while True:
time.sleep(random.random()*2)
payload={'cpu': random.randint(0,100)}
r = requests.post(url, headers=headers, json=payload)
<!doctype html>
<html>
<head>
<title>Line</title>
<!-- <script src="/static/Chart.min.js"></script> -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.4.0/Chart.min.js"></script>
</head>
<body >
<div style="width:75%; box-shadow: 10px 10px 5px #999; margin:auto">
<canvas id="canvas"></canvas>
</div>
<script>
var processor = {
type: 'line',
data: {
labels: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
datasets: [{
data: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],
label: "CPU", fill: false, pointRadius: 4, tension:0
}]
},
options: {
responsive: true,
title:{display: false, text: 'CPU'},
legend:{display: false, text: 'CPU'}
}
};
window.onload = function() {
var ctx = document.getElementById("canvas").getContext("2d");
window.myLine = new Chart(ctx, processor);
};
document.addEventListener('DOMContentLoaded', function () {
var source = new EventSource('pubcpu');
source.addEventListener('time', function (event) {
var yy = event.data.split(",");
console.log('SSE YY:', yy);
processor.data.datasets.forEach(function(dataset, index) {
dataset.data.push(yy[index]);
dataset.data.shift();
});
console.log('array shift:', processor.data.datasets[0].data);
window.myLine.update();
});
source.addEventListener('error', function (event){
console.log('SSE error:', event);
console.log('SSE state:', source.readyState);
});
}, false);
</script>
</body>
</html>
import os
import time
import threading
import cherrypy
import cherrypy_SSE
import datapusher
class Root():
@cherrypy.expose
@cherrypy.tools.json_in()
def sensor(self, **kwargs):
"""
receives POSTed data, converts to SSE ready message and
pushes it onto the cherrypy bus
"""
input_json = cherrypy.request.json
msg = "event: time\ndata: {}\n\n".format(str(input_json['cpu']))
cherrypy.engine.publish("cpu", msg)
@cherrypy.expose
def pubcpu(self):
"""
publishes data from the subscribed channel...
"""
channel = 'cpu'
#doorman = Portier(channel, heartbeat=True)
#to show the heartbeat in the graph:
doorman = cherrypy_SSE.Portier(channel,
heartbeat=True,
interval=5, hbmsg="event: time\ndata: 999\n\n")
cherrypy.response.headers["Content-Type"] = "text/event-stream"
def pub():
for message in doorman.messages():
try:
yield message
except GeneratorExit:
# cherrypy shuts down the generator when the client
# disconnects. Catch disconnect and unsubscribe to clean up
doorman.unsubscribe()
return
return pub()
pubcpu._cp_config = {'response.stream': True}
@cherrypy.expose
def index(self):
return open('index.html')
if __name__ == '__main__':
rnd_bus = threading.Thread(target=datapusher.rndtobus, args=('cpu',))
rnd_bus.daemon = True
rnd_bus.start()
rnd_port = threading.Thread(target=datapusher.rndtoport)
rnd_port.daemon = True
#rnd_port.start()
conf = {
'global': {
'server.socket_host': "10.0.0.4",
'server.socket_port': 8080,
'server.thread_pool': 25,
'server.socket_queue_size': 10,
'engine.autoreload.on': False,
}
}
cherrypy.quickstart(Root(), config=conf)
@grohom
Copy link

grohom commented Apr 24, 2020

Hi there! I was wondering, this script solves the problem I described here?

@ingoogni
Copy link
Author

ingoogni commented Apr 25, 2020

Hi there! I was wondering, this script solves the problem I described here?

Yes. Your lengthy_operation or processes called by it could publish messages directly on the bus that then are published by SSE. If lengthy_operations is a separate process it could POST data that are received and the put on the bus to be published. datapusher.py is an example of both.

Yet another option is to write plugins that receive data and push them on the bus.
https://gist.github.com/ingoogni/416caa7ed0e8b79391f1899ed9de15b5
https://gist.github.com/ingoogni/169a2b8b1e484c71c95c55a2da2a65f0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment