Last active
November 5, 2023 11:33
-
-
Save AstraLuma/bd340e1ba20613ce5c1f1b5ff552ebfb to your computer and use it in GitHub Desktop.
Python Server-Sent Events/EventSource Implementations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# For Django Channels | |
import json | |
from channels.exceptions import StopConsumer | |
from channels.generic.http import AsyncHttpConsumer | |
from django.utils.http import parse_header_parameters | |
from django.utils.http import parse_header_parameters | |
def get_accepts(scope: dict): | |
for header, value in scope['headers']: | |
if header.lower() == b'accept': | |
yield from parse_accept(value.decode('utf-8')) | |
def parse_accept(txt: str): | |
for bit in txt.split(','): | |
yield parse_header_parameters(bit.strip()) | |
class EventsRouter: | |
""" | |
ASGI middleware that seperates EventSource requests from normal ones. | |
Namely, if the request accepts text/event-stream, SSE is used. | |
""" | |
def __init__(self, events, web): | |
self.events_handler = events | |
self.web_handler = web | |
def __call__(self, scope, receive, send): | |
types = [t.lower() for t, _ in get_accepts(scope)] | |
if 'text/event-stream' in types and scope['method'] == 'GET': | |
return self.events_handler(scope, receive, send) | |
else: | |
return self.web_handler(scope, receive, send) | |
class AsyncSSEConsumer(AsyncHttpConsumer): | |
# This mostly overrides AsyncHttpConsumer but it provides some useful utilities | |
@property | |
def last_event_id(self): | |
""" | |
The value of the Last-Event-ID header, or None if not given. | |
Raises an error if the headers haven't been received yet. | |
""" | |
return self.scope['headers'].get('Last-Event-ID', None) | |
# User API | |
async def send_headers(self, *, status=200, headers=None): | |
if status == 200: | |
if headers is None: | |
headers = [ | |
(b'Content-Type', b'text/event-stream'), | |
(b'Cache-Control', b'no-cache'), | |
] | |
elif isinstance(headers, dict): | |
headers[b'Content-Type'] = b'text/event-stream' | |
headers[b'Cache-Control'] = b'no-cache' | |
else: | |
headers += [ | |
(b'Content-Type', b'text/event-stream'), | |
(b'Cache-Control', b'no-cache'), | |
] | |
return await super().send_headers(status=status, headers=headers) | |
async def accept(self): | |
""" | |
Accept the SSE connection. | |
Sends a 200 Ok with the appropriate Content-Type and such. | |
""" | |
await self.send_headers(status=200) | |
# Force sending headers immediately | |
await self.send_body(b"", more_body=True) | |
async def reject(self, *, status, headers=None, body=None): | |
""" | |
Reject the SSE, sending an error and terminating the connection. | |
""" | |
await self.send_headers(status=status, headers=headers) | |
await self.send_body(body, more_body=False) | |
await self.disconnect() | |
raise StopConsumer() | |
async def send_event(self, *, data, **fields): | |
""" | |
Sends an event to the client, with the fields as keyword arguments. | |
The data field is required. | |
Other fields specified in HTML5 (section 9.2): | |
* event: the event type | |
* id: the event ID (used to when reconnecting) | |
* retry: time to wait before reconnecting, in seconds | |
""" | |
fields['data'] = data | |
await self.send_body( | |
b"\n".join( | |
f"{name}: {line}".encode('utf-8') | |
for name, value in fields.items() | |
for line in str(value).replace('\r\n', '\n').replace('\r', '\n').split('\n') | |
) + b"\n\n", | |
more_body=True, | |
) | |
async def send_event_json(self, *, data, **fields): | |
await self.send_event(data=json.dumps(data), **fields) | |
async def terminate(self): | |
""" | |
Kill the connection from the server side. | |
""" | |
await self.send_body(b"", more_body=False) | |
await self.disconnect() | |
raise StopConsumer() | |
# Overridables | |
async def connect(self): | |
""" | |
Called when the SSE is opened | |
""" | |
# Default: Just accept the connection | |
await self.accept() | |
async def disconnect(self): | |
""" | |
Overrideable place to run disconnect handling. Do not send anything | |
from here. | |
""" | |
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Partial implementation of an SSE client on requests | |
# This needs work to be more generic | |
import json | |
import requests | |
import socket | |
import time | |
import urllib3 | |
class HTTPAdapterWithKeepalive(requests.adapters.HTTPAdapter): | |
# Keepalive parameters | |
interval_sec = 30 | |
idle_sec = interval_sec | |
max_fails = 5 | |
def init_poolmanager(self, *args, **kwargs): | |
sockopts = urllib3.connection.HTTPConnection.default_socket_options + [] | |
if hasattr(socket, 'SO_KEEPALIVE'): | |
# Not Windows | |
sockopts += [ | |
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), | |
] | |
if hasattr(socket, 'TCP_KEEPALIVE'): | |
# Mac | |
sockopts += [ | |
(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, self.interval_sec) | |
] | |
if hasattr(socket, 'TCP_KEEPIDLE'): | |
# Linux | |
sockopts += [ | |
(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, self.idle_sec) | |
] | |
if hasattr(socket, 'TCP_KEEPINTVL'): | |
# Linux | |
sockopts += [ | |
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, self.interval_sec) | |
] | |
if hasattr(socket, 'TCP_KEEPCNT'): | |
# Linux | |
sockopts += [ | |
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, self.max_fails) | |
] | |
# Windows: | |
# sock.ioctl(socket.SIO_KEEPALIVE_VALS, (<1 to turn on>, <idle time in ms>, <interval in ms>)) | |
# https://msdn.microsoft.com/en-us/library/dd877220%28v=vs.85%29.aspx | |
super().init_poolmanager(*args, socket_options=sockopts, **kwargs) | |
def get_session(): | |
adapter = HTTPAdapterWithKeepalive() | |
s = requests.session() | |
s.mount("http://", adapter) | |
s.mount("https://", adapter) | |
return s | |
def stream_raw_sse(mkrequest, *pargs, _last_event_id=None, headers=None, **kwargs): | |
""" | |
Streams Server-Sent Events, each event produced as a sequence of | |
(field, value) pairs. | |
Does not handle reconnection, etc. | |
""" | |
if headers is None: | |
headers = {} | |
headers['Accept'] = 'text/event-stream' | |
headers['Cache-Control'] = 'no-cache' | |
# Per https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model | |
if _last_event_id is not None: | |
headers['Last-Event-ID'] = _last_event_id | |
with mkrequest(*pargs, headers=headers, stream=True, **kwargs) as resp: | |
fields = [] | |
for line in resp.iter_lines(decode_unicode=True): | |
# https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation | |
if not line: | |
yield fields | |
fields = [] | |
elif line.startswith(':'): | |
pass | |
elif ':' in line: | |
field, value = line.split(':', 1) | |
if value.startswith(' '): | |
value = value[1:] | |
fields += [(field, value)] | |
else: # Non-blank, without a colon | |
fields += [(line, '')] | |
def stream_sse(mkrequest, *pargs, **kwargs): | |
""" | |
Streams server-sent events, producing a dictionary of the fields. | |
Handles reconnecting, Last-Event-ID, and retry waits. | |
Deviates by spec by passing through unknown fields instead of ignoring them. | |
If an unknown field is given more than once, the last given wins (like | |
event and id). | |
""" | |
retry = 0 | |
last_id = None | |
while True: | |
try: | |
for rawmsg in stream_raw_sse(mkrequest, *pargs, _last_event_id=last_id, **kwargs): | |
msg = {'event': 'message', 'data': ''} | |
# https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation | |
for k, v in rawmsg: | |
if k == 'retry': | |
try: | |
retry = int(v) | |
except ValueError: | |
pass | |
elif k == 'data': | |
if msg['data']: | |
msg['data'] += '\n' + v | |
else: | |
msg['data'] = v | |
else: | |
if k == 'id': | |
last_id = v | |
# Spec says we should ignore unknown fields. We're passing them on. | |
msg[k] = v | |
if not msg['data']: | |
pass | |
yield msg | |
else: | |
raise StopIteration # Really just exists to get caught in the next line | |
except (StopIteration, requests.RequestException, EOFError): | |
# End of stream, try to reconnect | |
# NOTE: GeneratorExit is thrown if the consumer kills us (or we get GC'd) | |
# TODO: Log something? | |
# Wait, fall through, and start at the top | |
time.sleep(retry / 1000) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment