There's something called server-sent events which is an alternative to using web sockets if you want to break free from the strict request-response scheme.
I want to explore this technology using Dart.
First, let's create the simplest webserver that could possibly work:
Future<void> main(List<String> arguments) async {
final server = await HttpServer.bind('0.0.0.0', 7007);
await for (final request in server) {
request.response
..writeln('Time is ${DateTime.now()}!')
..close().catchError(print);
}
}
This server performs poorly if compared with other solutions, because it does everything in just one thread, but it should be sufficient for this experiment.
The server will repond to any request to http://localhost:7007/ with the current time stamp using text/plain;charset=utf8
as default content type and text encoding and 200
as default status code.
A browser can process server-sent events by creating an event source and listen to messages like so:
<script>
const source = new EventSource('http://localhost:7007/events');
source.onmessage = ({data}) => console.log(data);
</script>
Let's refactor and change our server to return a static index page if asked for /
which will contain the above JavaScript code. Otherwise we will return a 404 error.
Future<void> main(List<String> arguments) async {
final server = await HttpServer.bind('0.0.0.0', 7007);
await for (final request in server) {
_handle(request).catchError(print);
}
}
Future<void> _handle(HttpRequest request) async {
if (request.uri.path == '/') {
return _html('index.html', request.response);
}
await (request.response..statusCode = 404).close();
}
Future<void> _html(String path, HttpResponse response) async {
await File(path).openRead().pipe(response..headers.contentType = ContentType.html);
}
We need a second endpoint. If asked for /events
, we need to setup a publisher that sends events to the client – which are then printed to the brower's console by the JavaScript contained in index.html
.
Future<void> _handle(HttpRequest request) async {
...
if (request.uri.path == '/events') {
return _publish(request.response);
}
...
}
I want to emit 6 timestamps with a delay of 1 second each.
The _publish
method needs a bit more work. We need to initialize the response with a certain content type, tell the browser to keep the connection alive, disable caching, and activate chunked
encoding. Unfortunately, I don't see a way to chunk the response directly from an HttpResponse
, so I have to do it myself by detaching the socket and construct the encoding manually.
That's easy enough to do because each chunk consists of the chunk length as a hexadecimal (!) number in ascii representation, followed by CR and LF, followed by the chunk, followed by CR and LF again. The last chunk must be of length 0.
Future<void> _publish(HttpResponse response) async {
response.headers
..contentType = ContentType('text', 'event-stream')
..set('transfer-encoding', 'chunked')
..set('connection', 'keep-alive')
..set('cache-control', 'no-cache');
final socket = await response.detachSocket();
for (var i = 0; i < 6; i++) {
await Future.delayed(Duration(seconds: 1));
final data = utf8.encode('event: message\ndata: ${DateTime.now()}\n\n');
socket.add(utf8.encode(data.length.toRadixString(16)));
socket.add([13, 10]);
socket.add(data);
socket.add([13, 10]);
await socket.flush();
}
socket.add([48, 13, 10, 13, 10]);
await socket.close();
}
Closing the socket will close the response and will make the EventSource
object to automatically reconnect to the server after a short pause. I'm not sure why there is a short pause.
This concludes the server implementation.
Let's build a client using the http library. Here's how to request the events as if it would be a normal resource:
Future<void> main() async {
final response = await get(Uri.parse('http://localhost:7007/events'));
print(response.body);
}
As expected, this will return the six messages, but only after the server closes the connection. Obviously, this isn't want we want. We can't use this high-level API and need to go one level deeper:
Future<void> main() async {
final client = Client();
final response = await client.send(Request('GET', Uri.parse('http://localhost:7007/events')));
if (response.statusCode != 200) throw Exception();
final isEventStream = response.headers.entries.any((e) =>
e.key.toLowerCase() == 'content-type' && //
e.value.toLowerCase().split(';')[0].trim() == 'text/event-stream');
if (!isEventStream) throw Exception();
await for (final line in response //
.stream
.transform(utf8.decoder)
.transform(LineSplitter())) {
print(line);
}
client.close();
}
This will send the request and listen to the response as a stream, convert the streamed chunks into strings (assuming UTF-8) and split those strings into lines and then print those lines on the console. I don't want to assume that each stream event is a complete chunk, so I use stream transformers.
BTW, the http
package really needs a better way to access header values!
The next task is to combine those lines into real events, like this:
class Event {
const Event(this.id, this.event, this.data);
final String? id;
final String? event;
final String? data;
}
Here is a reusable function to create a stream of such events:
Stream<Event> eventSource(Uri url) async* {
final client = Client();
final response = await client.send(Request('GET', url));
if (response.statusCode != 200) throw Exception();
final isEventStream = response.headers.entries.any((e) =>
e.key.toLowerCase() == 'content-type' && //
e.value.toLowerCase().split(';')[0].trim() == 'text/event-stream');
if (!isEventStream) throw Exception();
String? id, event, data;
await for (final line in response //
.stream
.transform(utf8.decoder)
.transform(LineSplitter())) {
if (line.isEmpty) {
yield Event(id, event, data);
id = event = data = null;
} else if (line.startsWith('id:')) {
id = line.substring(3).trim();
} else if (line.startsWith('event:')) {
event = line.substring(6).trim();
} else if (line.startsWith('data:')) {
final chunk = line.substring(5).trimLeft();
data = data != null ? '$data\n$chunk' : chunk;
}
}
client.close();
}
The protocol is very easy to understand, again. Each line must be prefixed with either id:
, event:
, or data:
. I ignore unknown prefixes. The optional id
is some kind of identifier. The optional event
is the type of the event wich must be message
if a JavaScript EventSource.onmessage
should understand it. Otherwise you have to use EventSource.addListener
. Two or more data
lines are automatically combined. Once an empty line is received, the event is ready. Note that both by server and my client ignore the Last-Event-ID
header.
And here is a new client to get six events from the server:
Future<void> main() async {
await for (final event in eventSource(Uri.parse('http://localhost:7007/events'))) {
print(event.data);
}
}
There's one thing left for you, dear reader: try to reconnect to the server if the connection is closed. Also, it should be possible to close the Stream<Event>
before draining all events and this should close the HTTP connection.
But otherwise, this could be used by a non-web Flutter app to consume a stream of server-sent events. There's just one problem: They work best with HTTP/2 and Dart doesn't support this out of the box – yet.
In the future, I might build upon this foundation to create a pub/sub chat-like client and server infrastructure.
Hi, i hope you see this comment.
if the data returns like the below
data: hey
data: hey, how
data: hey, how are you
data: hey, how are you doing.
how do i display a complete sentence that still gives the effect like the words are added in chunks?