Last active
April 18, 2025 19:38
-
-
Save PlugFox/f9702d6274792212d2ec40036b9b1c8e to your computer and use it in GitHub Desktop.
Dart SSE stream with HTTP Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'dart:convert'; | |
import 'package:http/http.dart' as http; | |
import 'package:l/l.dart'; | |
/// Creates a stream of server-sent events (SSE) from the given URL.
///
/// The stream emits each event as a record of the event name and its
/// JSON-decoded `data` payload. Only single-line `data` fields holding a
/// JSON object (`{...}`) that follow an `event` field are emitted.
///
/// * [headers] are merged over the default SSE request headers.
/// * [queryParameters] are merged over the query parameters already
///   present in [url].
/// * [ignore] lists event names whose data is silently dropped.
/// * [onError] receives connection and parsing problems; errors are never
///   added to the returned stream, which simply closes when the
///   connection ends.
///
/// The HTTP request is started right away, not on first listen. Cancelling
/// the subscription aborts the request immediately.
Stream<({String event, Map<String, Object?> data})> sse(
  String url, {
  Map<String, String>? headers,
  Map<String, String>? queryParameters,
  Set<String> ignore = const <String>{'heartbeat', 'ping', 'connected'},
  void Function(Object error, StackTrace stackTrace)? onError,
}) {
  var closed = false;
  final client = http.Client();
  // Hoisted so that `onCancel` can interrupt a pending `moveNext()`.
  StreamIterator<String>? iterator;
  final events = StreamController<({String event, Map<String, Object?> data})>();
  events.onCancel = () {
    if (closed) return;
    closed = true;
    // Release the connection now instead of waiting for the next line
    // from the server, which may never arrive on an idle stream.
    iterator?.cancel().ignore();
    client.close();
    events.close();
  };
  Future<void>(() async {
    try {
      // The listener may have cancelled before this task got to run.
      if (closed) return;
      var uri = Uri.parse(url);
      if (queryParameters != null) {
        uri = uri.replace(queryParameters: <String, String>{...uri.queryParameters, ...queryParameters});
      }
      final request = http.Request('GET', uri)
        ..headers.addAll(<String, String>{
          'Accept': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Accept-Charset': 'utf-8',
          'Connection': 'keep-alive',
          ...?headers,
        });
      final response = await client.send(request);
      if (closed) return;
      if (response.statusCode < 200 || response.statusCode > 299) {
        // `reasonPhrase` is nullable; avoid rendering the text "null".
        final err = Exception('SSE connection error: ${response.statusCode} ${response.reasonPhrase ?? ''}'.trimRight());
        onError?.call(err, StackTrace.current);
        return;
      }
      l.v5('SSE connection established to $url');
      // Decode the byte stream into text lines and pull them one by one.
      final lines =
          iterator = StreamIterator<String>(
            response.stream
                .where((chunk) => chunk.isNotEmpty)
                .transform<String>(const Utf8Decoder(allowMalformed: true))
                .transform<String>(const LineSplitter()),
          );
      // TODO(plugfox): Add health check for the connection
      // throw error if no events received in 60 seconds
      // Mike Matiunin <[email protected]>, 18 April 2025
      String? currentEvent;
      while (await lines.moveNext()) {
        if (closed) break;
        try {
          final line = lines.current;
          if (line.isEmpty) {
            // A blank line terminates the current event; drop any pending
            // event name so it cannot leak into the next event.
            currentEvent = null;
            continue;
          }
          // Lines starting with a colon are comments (often keep-alives).
          if (line.startsWith(':')) continue;
          // The field name ends at the first colon; the space after the
          // colon is optional, so do not search for ': '.
          final pos = line.indexOf(':');
          if (pos == -1) {
            onError?.call(Exception('Invalid SSE event: $line'), StackTrace.current);
            l.w('Invalid SSE event: "$line"');
            continue;
          }
          final eventType = line.substring(0, pos).trim().toLowerCase();
          final eventData = line.substring(pos + 1).trim();
          switch (eventType) {
            case 'event' when eventData.isNotEmpty:
              // Remember the event name until its data line arrives.
              currentEvent = eventData;
            case 'data' when ignore.contains(currentEvent):
              // Ignore heartbeat and other service events.
              l.v6('Ignoring SSE event: $currentEvent');
              currentEvent = null;
            case 'data'
                when currentEvent != null &&
                    eventData.isNotEmpty &&
                    eventData.startsWith('{') &&
                    eventData.endsWith('}'):
              // Parse the event data as JSON and add it to the stream.
              events.add((event: currentEvent, data: jsonDecode(eventData) as Map<String, Object?>));
              l.v6('SSE event: $currentEvent');
              currentEvent = null;
            default:
              onError?.call(Exception('Unknown SSE event: $eventType'), StackTrace.current);
              l.w('Unknown SSE event: "$line"');
          }
        } on Object catch (e, s) {
          // A parsing failure (e.g. malformed JSON) ends the connection.
          currentEvent = null;
          onError?.call(e, s);
          l.w('Error while parsing SSE events: $e', s);
          break;
        }
      }
    } on Object catch (e, s) {
      // Failures caused by our own cancellation (aborted request) are
      // expected and not reported.
      if (!closed) {
        onError?.call(e, s);
        l.w('SSE connection error: $e', s);
      }
    } finally {
      closed = true;
      iterator?.cancel().ignore();
      client.close();
      events.close();
      l.v5('SSE connection closed to $url');
    }
  }).ignore();
  return events.stream;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Returns a stream of [ChatEntity] updates for the chat [chatId].
///
/// On first listen an SSE connection to the chat updates endpoint is
/// opened. Whenever that connection ends while the returned stream is
/// still active, a new connection is attempted after one second.
/// Cancelling the subscription stops reconnecting and closes the
/// current connection.
///
/// A failure while obtaining the auth token or opening the connection is
/// reported as an error event on the returned stream.
Stream<ChatEntity> getChatUpdates(ChatId chatId) {
  const decoder = ChatEntityJsonDecoder();
  StreamSubscription<({String event, Map<String, Object?> data})>? subscription;
  Timer? reconnectTimer;
  late final StreamController<ChatEntity> controller;

  // Opens (or re-opens) the SSE connection and forwards matching events.
  Future<void> connect() async {
    l.d('Starting chat updates stream for chatId: $chatId');
    subscription?.cancel().ignore();
    subscription = null;
    if (controller.isClosed) return;
    try {
      final token = await FirebaseAuth.instance.currentUser?.getIdToken();
      // The listener may have cancelled while the token was being fetched.
      if (controller.isClosed) return;
      final stream = sse(
        '${_apiClient.options.baseUrl}/v1/chats/$chatId/updates',
        // Do not send the literal "Bearer null" when no user is signed in.
        headers: <String, String>{if (token != null) 'Authorization': 'Bearer $token'},
      );
      subscription = stream.listen(
        (e) {
          if (controller.isClosed) {
            subscription?.cancel().ignore();
            return;
          }
          switch (e.event) {
            case 'message' when e.data['chat_id'] == chatId:
              controller.add(decoder.convert(e.data));
            default:
              l.w('Unknown event: ${e.event}');
          }
        },
        cancelOnError: true,
        onDone: () {
          if (controller.isClosed) return;
          // Connection ended while still in use: retry shortly.
          reconnectTimer = Timer(const Duration(seconds: 1), connect);
        },
      );
    } on Object catch (e, s) {
      l.w('Failed to start chat updates stream for chatId: $chatId: $e', s);
      if (!controller.isClosed) controller.addError(e, s);
    }
  }

  // Counted once per listener; reconnects go through `connect` directly
  // so they do not inflate the connection counter.
  void onListen() {
    _connections++;
    connect().ignore();
  }

  void onCancel() {
    _connections--;
    l.d('Close chat updates stream for chatId: $chatId');
    reconnectTimer?.cancel();
    subscription?.cancel().ignore();
    controller.close();
  }

  controller = StreamController<ChatEntity>(onListen: onListen, onCancel: onCancel, sync: false);
  return controller.stream;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment