Created
April 18, 2025 13:41
-
-
Save PlugFox/46b7a333c1c073b72ce32987ed341365 to your computer and use it in GitHub Desktop.
Dart SSE with Dio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'dart:convert'; | |
import 'dart:math' as math; | |
import 'package:dio/dio.dart'; | |
import 'package:l/l.dart'; | |
/// A type alias for a server sent event (SSE) pair. | |
/// This is a tuple that contains the event name and the data associated with it. | |
typedef SSEPair = ({String event, Map<String, Object?> data}); | |
/// {@template sse} | |
/// A class to handle server sent events (SSE) using Dio. | |
/// This class is used to connect to a server and listen for events sent by the server. | |
/// {@endtemplate} | |
class SSE { | |
/// {@macro sse} | |
SSE({ | |
Dio? client, | |
Set<String> ignore = const <String>{'connected', 'heartbeat'}, | |
void Function(Object error, StackTrace stackTrace)? onError, | |
String? Function()? lastEventId, | |
String? Function()? authenticationToken, | |
Duration connectionTimeout = const Duration(seconds: 5), | |
Duration heartbeatTimeout = const Duration(seconds: 60), | |
}) : _client = client ?? Dio(), | |
_internalClient = client == null, | |
_ignore = ignore, | |
_onError = onError, | |
_lastEventId = lastEventId, | |
_authenticationToken = authenticationToken, | |
_connectionTimeout = connectionTimeout, | |
_heartbeatTimeout = heartbeatTimeout; | |
/// The http client used to make requests. | |
/// This client is used to make requests to the server and receive events. | |
final Dio _client; | |
final bool _internalClient; | |
/// Get the last event id. | |
final String? Function()? _lastEventId; | |
/// Get the authentication (Bearer) token. | |
final String? Function()? _authenticationToken; | |
/// Events to ignore. | |
/// Such as heartbeat events. | |
final Set<String> _ignore; | |
/// A flag to indicate if the SSE has a heartbeat event. | |
bool get hasHeartbeat => _heartbeatTimeout > Duration.zero; | |
/// Stream controller for server sent events. | |
final StreamController<SSEPair> _events = StreamController<SSEPair>.broadcast(); | |
/// The stream for all server sent events. | |
Stream<SSEPair> get events => _events.stream; | |
/// Callback function to handle errors. | |
final void Function(Object error, StackTrace stackTrace)? _onError; | |
/// The timeout for the connection to the server and receiving chanks of data. | |
final Duration _connectionTimeout; | |
/// The timeout for the heartbeat event. | |
final Duration _heartbeatTimeout; | |
/// A flag to indicate if the SSE is closed/disposed permanently. | |
bool _isClosed = false; | |
/// A flag to indicate if the SSE is closed/disposed permanently. | |
bool get isClosed => _isClosed; | |
/// The function to stop polling the server for events. | |
void Function()? _stopPolling; | |
/// A timer to check the health of the connection. | |
Timer? _healthCheckTimer; | |
/// A flag to indicate if the SSE is active. | |
bool get active => _stopPolling != null && !_isClosed; | |
/// A flag to indicate if the SSE has connection. | |
bool _hasConnection = false; | |
/// A flag to indicate if the SSE has connection. | |
bool get hasConnection => _hasConnection; | |
/// A counter to keep track of the total number of connections. | |
static int _numberOfConnections = 0; | |
/// A counter to keep track of the total number of active connections. | |
static int get numberOfConnections => _numberOfConnections; | |
/// Start the SSE connection and listen for events. | |
/// This method will start polling the server for events and send them to the stream controller. | |
/// [url] is the url to connect to. | |
/// [query] is the query parameters to send with the request. | |
void start(String url, {Map<String, String>? query, Map<String, String>? headers}) { | |
stop(); | |
var stopped = false; | |
var connectionAttempts = 0; | |
void onStop(void Function() fn) { | |
final current = _stopPolling; | |
_stopPolling = () { | |
current?.call(); | |
fn.call(); | |
}; | |
} | |
// Stopwatch to check the heartbeat or last event received. | |
// This is used to check if the connection is still alive or not. | |
final heartbeatStopwatch = Stopwatch(); | |
// Start the polling | |
Future<void> startPolling() async { | |
if (_isClosed) return; | |
// Stop the previous polling if it exists | |
_stopPolling?.call(); | |
_stopPolling = null; | |
// Mark the connection as not established but not closed | |
stopped = false; | |
_hasConnection = false; | |
connectionAttempts++; | |
onStop(() { | |
stopped = true; | |
heartbeatStopwatch.stop(); | |
}); | |
final cancelToken = CancelToken(); | |
onStop(() => cancelToken.cancel('SSE stopped')); | |
heartbeatStopwatch | |
..reset() | |
..start(); | |
ResponseBody? body; | |
try { | |
final lastEventId = _lastEventId?.call(); | |
final token = _authenticationToken?.call(); | |
final Response(data: ResponseBody? $body) = await _client.get<ResponseBody>( | |
url, | |
queryParameters: query, | |
options: Options( | |
responseType: ResponseType.stream, | |
headers: <String, String>{ | |
'Accept': 'text/event-stream', | |
'Cache-Control': 'no-cache', | |
'Accept-Charset': 'utf-8', | |
'Connection': 'keep-alive', | |
if (lastEventId != null) 'Last-Event-ID': lastEventId, | |
if (token != null) 'Authorization': 'Bearer $token', | |
//'Accept-Encoding': 'gzip, deflate, br', | |
...?headers, | |
}, | |
sendTimeout: _connectionTimeout, | |
persistentConnection: true, | |
preserveHeaderCase: false, | |
responseDecoder: null, | |
receiveDataWhenStatusError: false, | |
followRedirects: true, | |
maxRedirects: 5, | |
validateStatus: | |
(status) => switch (status ?? 400) { | |
>= 200 && < 300 => true, | |
_ => false, | |
}, | |
), | |
cancelToken: cancelToken, | |
); | |
// Check body is not null and is a stream | |
body = $body; | |
} on Object catch (e, s) { | |
if (stopped || _isClosed) { | |
// If the polling was stopped or the stream is closed, ignore the error | |
return; | |
} else if (e case DioException(type: DioExceptionType.cancel)) { | |
// Ignore the cancel error, this is expected when the polling is stopped | |
return; | |
} else if (e case DioException(type: DioExceptionType.connectionTimeout)) { | |
_onError?.call(e, s); | |
l.w('SSE connection timeout', s); | |
} else if (e case DioException(type: DioExceptionType.badResponse)) { | |
_onError?.call(e, s); | |
l.w('SSE bad response', s); | |
} else { | |
_onError?.call(e, s); | |
l.w('SSE unexpected connection error', s); | |
} | |
} | |
// Check if the polling was stopped or the stream is closed | |
if (_isClosed || stopped || body == null) return; | |
final iterator = StreamIterator<String>( | |
body.stream | |
.where((event) => event.isNotEmpty) | |
.cast<List<int>>() | |
/* .transform(utf8.decoder) */ | |
.transform<String>(const Utf8Decoder(allowMalformed: true)) | |
.transform<String>(const LineSplitter()), | |
); | |
onStop(() => iterator.cancel().ignore()); | |
_numberOfConnections += 1; | |
_hasConnection = true; | |
connectionAttempts = 0; | |
String? currentEvent; | |
// Listen to the stream and parse the events | |
while (await iterator.moveNext()) { | |
try { | |
final line = iterator.current; | |
if (line.isEmpty) continue; | |
if (stopped || _isClosed) break; | |
final pos = line.indexOf(': '); | |
if (pos == -1) { | |
_onError?.call(Exception('Invalid SSE event: $line'), StackTrace.current); | |
l.w('Invalid SSE event: "$line"'); | |
continue; | |
} | |
final eventType = line.substring(0, pos).trim().toLowerCase(); | |
final eventData = line.substring(pos + 2).trim(); | |
heartbeatStopwatch.reset(); | |
switch (eventType) { | |
case 'event' when eventData.isNotEmpty: | |
// Set the current event to the event data | |
currentEvent = eventData; | |
case 'data' when _ignore.contains(currentEvent): | |
// Ignore heartbeat and other events | |
currentEvent = null; | |
case 'data' | |
when currentEvent != null && | |
eventData.isNotEmpty && | |
eventData.startsWith('{') && | |
eventData.endsWith('}'): | |
// Parse the event data as JSON and add it to the stream | |
_events.add((event: currentEvent, data: jsonDecode(eventData))); | |
currentEvent = null; | |
default: | |
_onError?.call(Exception('Unknown SSE event: $eventType'), StackTrace.current); | |
l.w('Unknown SSE event: "$line"'); | |
} | |
} on Object catch (e, s) { | |
currentEvent = null; | |
_onError?.call(e, s); | |
l.w('Error while parsing SSE events: $e', s); | |
} | |
} | |
_hasConnection = false; | |
_numberOfConnections -= 1; | |
} | |
startPolling().ignore(); | |
const maxBackoff = 60; // 60 seconds | |
void reconnectWithBackoff(int attempt) { | |
final delay = math.min(math.pow(2, attempt).toInt(), maxBackoff); | |
heartbeatStopwatch | |
..reset() | |
..stop(); | |
Timer(Duration(seconds: delay), () { | |
if (stopped || _isClosed) return; | |
startPolling().ignore(); | |
}); | |
} | |
/// Start a timer to healthcheck the connection every 10 seconds. | |
_healthCheckTimer = Timer.periodic(const Duration(seconds: 5), (timer) { | |
if (stopped) { | |
timer.cancel(); | |
return; | |
} else if (_isClosed) { | |
_stopPolling?.call(); | |
return; | |
} else if (!_hasConnection) { | |
// If the connection is not established, start polling again | |
_onError?.call(Exception('SSE connection not established'), StackTrace.current); | |
l.w('SSE connection not established, restarting polling...'); | |
reconnectWithBackoff(connectionAttempts); | |
} else if (_heartbeatTimeout > Duration.zero && heartbeatStopwatch.elapsed > _heartbeatTimeout) { | |
// If the heartbeat event is not received within 60 seconds, restart the polling | |
_onError?.call(Exception('SSE heartbeat not received'), StackTrace.current); | |
l.w('SSE heartbeat not received in 60 seconds, restarting polling...'); | |
reconnectWithBackoff(connectionAttempts); | |
} | |
}); | |
} | |
/// Stop the SSE polling. | |
void stop() { | |
try { | |
_healthCheckTimer?.cancel(); | |
_stopPolling?.call(); | |
_stopPolling = null; | |
_hasConnection = false; | |
} on Object catch (e, s) { | |
_onError?.call(e, s); | |
l.w('Error while stopping SSE polling: $e', s); | |
} | |
} | |
/// Dispose the SSE connection and close the stream. | |
/// This will stop the polling and close the stream controller. | |
void close() { | |
_isClosed = true; | |
stop(); | |
if (_internalClient) _client.close(force: true); | |
_events.close().ignore(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment