Skip to content

Instantly share code, notes, and snippets.

@PlugFox
Last active April 18, 2025 19:38
Show Gist options
  • Save PlugFox/f9702d6274792212d2ec40036b9b1c8e to your computer and use it in GitHub Desktop.
Save PlugFox/f9702d6274792212d2ec40036b9b1c8e to your computer and use it in GitHub Desktop.
Dart SSE stream with HTTP Client
import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:l/l.dart';
/// Creates a stream of server-sent events (SSE) from the given [url].
///
/// The `GET` request is scheduled as soon as this function is called, whether
/// or not the returned stream has a listener yet. Every emitted item is a
/// record holding the event name and its `data` payload decoded from a JSON
/// object.
///
/// * [headers] are merged over the default SSE request headers.
/// * [queryParameters] are merged over the query parameters already present
///   in [url].
/// * [ignore] lists event names whose `data` lines are dropped instead of
///   being emitted.
/// * [onError] is called for connection failures, non-2xx responses and lines
///   that cannot be handled. Errors are never added to the returned stream.
///
/// The returned stream closes when the response ends, when handling a line
/// throws (for example on malformed JSON), or when the subscriber cancels.
Stream<({String event, Map<String, Object?> data})> sse(
  String url, {
  Map<String, String>? headers,
  Map<String, String>? queryParameters,
  Set<String> ignore = const <String>{'heartbeat', 'ping', 'connected'},
  void Function(Object error, StackTrace stackTrace)? onError,
}) {
  var closed = false;
  // Declared outside the worker so that `onCancel` can interrupt a read that
  // is currently waiting for the next line from the server.
  StreamIterator<String>? lines;
  final events = StreamController<({String event, Map<String, Object?> data})>();
  events.onCancel = () {
    closed = true;
    // Cancelling the iterator completes a pending `moveNext()` with `false`,
    // so the worker leaves its loop and releases the HTTP connection right
    // away instead of waiting for the server to send another line.
    lines?.cancel().ignore();
    events.close();
  };
  Future<void>(() async {
    final client = http.Client();
    try {
      var uri = Uri.parse(url);
      if (queryParameters != null) {
        uri = uri.replace(queryParameters: <String, String>{...uri.queryParameters, ...queryParameters});
      }
      final response = await client.send(
        http.Request('GET', uri)
          ..headers.addAll(<String, String>{
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Accept-Charset': 'utf-8',
            'Connection': 'keep-alive',
            ...?headers,
          }),
      );
      // The subscriber may have cancelled while the request was in flight;
      // the `finally` block below closes the client and with it the response.
      if (closed) return;
      if (response.statusCode > 299 || response.statusCode < 200) {
        final err = Exception('SSE connection error: ${response.statusCode} ${response.reasonPhrase}'.trimRight());
        onError?.call(err, StackTrace.current);
        return;
      }
      l.v5('SSE connection established to $url');
      // Decode the byte stream into text lines and pull them one at a time.
      final iterator = lines = StreamIterator<String>(
        response.stream
            .where((chunk) => chunk.isNotEmpty)
            .cast<List<int>>()
            .transform<String>(const Utf8Decoder(allowMalformed: true))
            .transform<String>(const LineSplitter()),
      );
      // TODO(plugfox): Add a health check for the connection:
      // raise an error if no events are received within 60 seconds.
      String? currentEvent;
      while (await iterator.moveNext()) {
        if (closed) break;
        try {
          final line = iterator.current;
          if (line.isEmpty) {
            // A blank line terminates an event block, so a pending event name
            // must not leak into the next block.
            currentEvent = null;
            continue;
          }
          // Lines starting with a colon are comments (commonly keep-alives).
          if (line.startsWith(':')) continue;
          // The field name ends at the FIRST colon; the space after it is
          // optional, so searching for ': ' would split inside the payload of
          // a line such as `data:{"a": 1}`.
          final pos = line.indexOf(':');
          if (pos == -1) {
            onError?.call(Exception('Invalid SSE event: $line'), StackTrace.current);
            l.w('Invalid SSE event: "$line"');
            continue;
          }
          final eventType = line.substring(0, pos).trim().toLowerCase();
          final eventData = line.substring(pos + 1).trim();
          switch (eventType) {
            case 'event' when eventData.isNotEmpty:
              // Remember the event name for the `data` line that follows.
              currentEvent = eventData;
            case 'data' when ignore.contains(currentEvent):
              // Drop heartbeat-like events.
              l.v6('Ignoring SSE event: $currentEvent');
              currentEvent = null;
            case 'data'
                when currentEvent != null &&
                    eventData.isNotEmpty &&
                    eventData.startsWith('{') &&
                    eventData.endsWith('}'):
              // Decode the JSON object payload and emit it to the subscriber.
              events.add((event: currentEvent, data: jsonDecode(eventData)));
              l.v6('SSE event: $currentEvent');
              currentEvent = null;
            case 'id' || 'retry':
              // Standard SSE fields that this client does not use.
              l.v6('Ignoring SSE field: $eventType');
            default:
              onError?.call(Exception('Unknown SSE event: $eventType'), StackTrace.current);
              l.w('Unknown SSE event: "$line"');
          }
        } on Object catch (e, s) {
          // A line that cannot be processed terminates the connection.
          currentEvent = null;
          onError?.call(e, s);
          l.w('Error while parsing SSE events: $e', s);
          break;
        }
      }
    } on Object catch (e, s) {
      onError?.call(e, s);
      l.w('SSE connection error: $e', s);
    } finally {
      closed = true;
      // Release the response subscription if the loop was left early.
      lines?.cancel().ignore();
      client.close();
      events.close();
      l.v5('SSE connection closed to $url');
    }
  }).ignore();
  return events.stream;
}
/// Returns a stream of [ChatEntity] updates for the chat with [chatId].
///
/// The SSE connection is opened when the returned stream gets a listener and
/// is re-opened one second after it ends, until the listener cancels.
/// Only `message` events whose `chat_id` equals [chatId] are decoded and
/// emitted; every other event is logged and dropped.
Stream<ChatEntity> getChatUpdates(ChatId chatId) {
  const decoder = ChatEntityJsonDecoder();
  const reconnectDelay = Duration(seconds: 1);
  late final StreamController<ChatEntity> controller;
  StreamSubscription<({String event, Map<String, Object?> data})>? subscription;
  Timer? reconnectTimer;

  // Opens (or re-opens) the SSE connection and forwards its events.
  Future<void> connect() async {
    // Schedules a single reconnect attempt unless the stream is already closed.
    void reconnectLater() {
      if (controller.isClosed) return;
      reconnectTimer?.cancel();
      reconnectTimer = Timer(reconnectDelay, connect);
    }

    l.d('Starting chat updates stream for chatId: $chatId');
    reconnectTimer?.cancel();
    reconnectTimer = null;
    subscription?.cancel();
    subscription = null;
    if (controller.isClosed) return;
    try {
      final token = await FirebaseAuth.instance.currentUser?.getIdToken();
      // The listener may have cancelled while the token was being fetched.
      if (controller.isClosed) return;
      subscription = sse(
        '${_apiClient.options.baseUrl}/v1/chats/$chatId/updates',
        // Without a token the header is omitted rather than sent as the
        // literal string "Bearer null".
        headers: <String, String>{if (token != null) 'Authorization': 'Bearer $token'},
      ).listen(
        (e) {
          if (controller.isClosed) return;
          switch (e.event) {
            case 'message' when e.data['chat_id'] == chatId:
              controller.add(decoder.convert(e.data));
            default:
              l.w('Unknown event: ${e.event}');
          }
        },
        cancelOnError: true,
        onDone: reconnectLater,
      );
    } on Object catch (error, stackTrace) {
      // A failed token fetch or connection setup must not surface as an
      // uncaught async error; treat it like a dropped connection and retry.
      l.w('Failed to start chat updates stream for chatId: $chatId: $error', stackTrace);
      reconnectLater();
    }
  }

  void onListen() {
    // Counted once per listener, not once per reconnect, so that it stays
    // balanced with the single decrement in `onCancel`.
    _connections++;
    connect().ignore();
  }

  void onCancel() {
    _connections--;
    l.d('Close chat updates stream for chatId: $chatId');
    // Stop both a pending reconnect and the live connection; the controller
    // is closed only here, so no polling timer is needed to detect closure.
    reconnectTimer?.cancel();
    subscription?.cancel();
    controller.close();
  }

  controller = StreamController<ChatEntity>(onListen: onListen, onCancel: onCancel, sync: false);
  return controller.stream;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment