Skip to content

Instantly share code, notes, and snippets.

@PlugFox
Created April 18, 2025 13:41
Show Gist options
  • Save PlugFox/46b7a333c1c073b72ce32987ed341365 to your computer and use it in GitHub Desktop.
Save PlugFox/46b7a333c1c073b72ce32987ed341365 to your computer and use it in GitHub Desktop.
Dart SSE with Dio
import 'dart:async';
import 'dart:convert';
import 'dart:math' as math;
import 'package:dio/dio.dart';
import 'package:l/l.dart';
/// A single parsed server sent event (SSE).
///
/// This is a record with two named fields: `event` holds the event name and
/// `data` holds the payload associated with that event as a string-keyed map.
typedef SSEPair = ({String event, Map<String, Object?> data});
/// {@template sse}
/// A class to handle server sent events (SSE) using Dio.
///
/// Connects to a server with a streaming GET request, parses the
/// `event:` / `data:` lines sent by the server and publishes every decoded
/// event on the broadcast [events] stream.
///
/// A periodic health check restarts the connection (with exponential backoff)
/// when it is lost or when no line was received within the heartbeat timeout.
/// {@endtemplate}
class SSE {
  /// {@macro sse}
  ///
  /// When [client] is omitted a private [Dio] instance is created and later
  /// closed by [close]; a client passed in by the caller is never closed here.
  /// Events whose name is contained in [ignore] are dropped silently.
  /// [onError] receives every connection and parsing error.
  /// [lastEventId] and [authenticationToken] are evaluated before every
  /// connection attempt to fill the `Last-Event-ID` and `Authorization` headers.
  /// A [heartbeatTimeout] of [Duration.zero] (or less) disables the heartbeat check.
  SSE({
    Dio? client,
    Set<String> ignore = const <String>{'connected', 'heartbeat'},
    void Function(Object error, StackTrace stackTrace)? onError,
    String? Function()? lastEventId,
    String? Function()? authenticationToken,
    Duration connectionTimeout = const Duration(seconds: 5),
    Duration heartbeatTimeout = const Duration(seconds: 60),
  }) : _client = client ?? Dio(),
       _internalClient = client == null,
       _ignore = ignore,
       _onError = onError,
       _lastEventId = lastEventId,
       _authenticationToken = authenticationToken,
       _connectionTimeout = connectionTimeout,
       _heartbeatTimeout = heartbeatTimeout;

  /// The http client used to make requests to the server and receive events.
  final Dio _client;

  /// Whether [_client] was created by this instance (and must be closed by it).
  final bool _internalClient;

  /// Get the last event id.
  final String? Function()? _lastEventId;

  /// Get the authentication (Bearer) token.
  final String? Function()? _authenticationToken;

  /// Events to ignore.
  /// Such as heartbeat events.
  final Set<String> _ignore;

  /// Whether the heartbeat check is enabled (the heartbeat timeout is positive).
  bool get hasHeartbeat => _heartbeatTimeout > Duration.zero;

  /// Stream controller for server sent events.
  final StreamController<SSEPair> _events = StreamController<SSEPair>.broadcast();

  /// The stream for all server sent events.
  Stream<SSEPair> get events => _events.stream;

  /// Callback function to handle errors.
  final void Function(Object error, StackTrace stackTrace)? _onError;

  /// The timeout passed to Dio as the `sendTimeout` of the streaming request.
  final Duration _connectionTimeout;

  /// The maximum time without any received line before the connection is restarted.
  final Duration _heartbeatTimeout;

  /// A flag to indicate if the SSE is closed/disposed permanently.
  bool _isClosed = false;

  /// Whether the SSE is closed/disposed permanently.
  bool get isClosed => _isClosed;

  /// The function to stop polling the server for events.
  void Function()? _stopPolling;

  /// A timer to check the health of the connection.
  Timer? _healthCheckTimer;

  /// Whether the SSE was started and is neither stopped nor closed.
  bool get active => _stopPolling != null && !_isClosed;

  /// A flag to indicate if the SSE has connection.
  bool _hasConnection = false;

  /// Whether the event stream is currently open.
  bool get hasConnection => _hasConnection;

  /// A counter of the currently open event streams across all instances.
  static int _numberOfConnections = 0;

  /// The number of currently open event streams across all instances.
  static int get numberOfConnections => _numberOfConnections;

  /// Start the SSE connection and listen for events.
  ///
  /// Any previous session is stopped first. Received events are added to
  /// [events]. Does nothing when the instance is already closed.
  ///
  /// [url] is the url to connect to.
  /// [query] is the query parameters to send with the request.
  /// [headers] are extra request headers; they override the default ones.
  void start(String url, {Map<String, String>? query, Map<String, String>? headers}) {
    stop();
    // A closed instance can never be restarted; return before any timer is
    // created, otherwise the periodic health check would never be cancelled.
    if (_isClosed) return;
    var stopped = false;
    var connectionAttempts = 0;
    // The pending reconnect, if any; at most one is scheduled at a time.
    Timer? reconnectTimer;

    // Chain [fn] onto the current stop callback.
    void onStop(void Function() fn) {
      final current = _stopPolling;
      _stopPolling = () {
        current?.call();
        fn.call();
      };
    }

    // Stopwatch to check the heartbeat or last event received.
    // This is used to check if the connection is still alive or not.
    final heartbeatStopwatch = Stopwatch();

    // Open the connection and pump the event stream until it ends.
    Future<void> startPolling() async {
      if (_isClosed) return;
      // Stop the previous polling if it exists
      _stopPolling?.call();
      _stopPolling = null;
      // Mark the connection as not established but not closed
      stopped = false;
      _hasConnection = false;
      connectionAttempts++;
      onStop(() {
        stopped = true;
        heartbeatStopwatch.stop();
        reconnectTimer?.cancel();
        reconnectTimer = null;
      });
      final cancelToken = CancelToken();
      onStop(() => cancelToken.cancel('SSE stopped'));
      heartbeatStopwatch
        ..reset()
        ..start();
      ResponseBody? body;
      try {
        final lastEventId = _lastEventId?.call();
        final token = _authenticationToken?.call();
        final response = await _client.get<ResponseBody>(
          url,
          queryParameters: query,
          options: Options(
            responseType: ResponseType.stream,
            headers: <String, String>{
              'Accept': 'text/event-stream',
              'Cache-Control': 'no-cache',
              'Accept-Charset': 'utf-8',
              'Connection': 'keep-alive',
              if (lastEventId != null) 'Last-Event-ID': lastEventId,
              if (token != null) 'Authorization': 'Bearer $token',
              //'Accept-Encoding': 'gzip, deflate, br',
              ...?headers,
            },
            sendTimeout: _connectionTimeout,
            persistentConnection: true,
            preserveHeaderCase: false,
            responseDecoder: null,
            receiveDataWhenStatusError: false,
            followRedirects: true,
            maxRedirects: 5,
            validateStatus:
                (status) => switch (status ?? 400) {
                  >= 200 && < 300 => true,
                  _ => false,
                },
          ),
          cancelToken: cancelToken,
        );
        body = response.data;
      } on Object catch (e, s) {
        // If the polling was stopped or the stream is closed, ignore the error
        if (stopped || _isClosed) return;
        // Ignore the cancel error, this is expected when the polling is stopped
        if (e case DioException(type: DioExceptionType.cancel)) return;
        final message = switch (e) {
          DioException(type: DioExceptionType.connectionTimeout) => 'SSE connection timeout',
          DioException(type: DioExceptionType.badResponse) => 'SSE bad response',
          _ => 'SSE unexpected connection error',
        };
        _onError?.call(e, s);
        l.w(message, s);
      }
      // Check if the polling was stopped, the stream is closed or there is no body
      if (_isClosed || stopped || body == null) return;
      final iterator = StreamIterator<String>(
        body.stream
            .where((event) => event.isNotEmpty)
            .cast<List<int>>()
            .transform<String>(const Utf8Decoder(allowMalformed: true))
            .transform<String>(const LineSplitter()),
      );
      onStop(() => iterator.cancel().ignore());
      _numberOfConnections += 1;
      _hasConnection = true;
      connectionAttempts = 0;
      String? currentEvent;
      try {
        // Listen to the stream and parse the events
        while (await iterator.moveNext()) {
          try {
            final line = iterator.current;
            if (line.isEmpty) continue;
            if (stopped || _isClosed) break;
            // The field name ends at the FIRST colon; the space after it is
            // optional, and the value itself may contain ": " (e.g. JSON).
            final pos = line.indexOf(':');
            if (pos == 0) {
              // A comment line (e.g. ":keep-alive") carries no event, but it
              // proves that the connection is still alive.
              heartbeatStopwatch.reset();
              continue;
            }
            if (pos == -1) {
              _onError?.call(Exception('Invalid SSE event: $line'), StackTrace.current);
              l.w('Invalid SSE event: "$line"');
              continue;
            }
            final eventType = line.substring(0, pos).trim().toLowerCase();
            final eventData = line.substring(pos + 1).trim();
            heartbeatStopwatch.reset();
            switch (eventType) {
              case 'event' when eventData.isNotEmpty:
                // Set the current event to the event data
                currentEvent = eventData;
              case 'data' when _ignore.contains(currentEvent):
                // Ignore heartbeat and other events
                currentEvent = null;
              case 'data'
                  when currentEvent != null &&
                      eventData.isNotEmpty &&
                      eventData.startsWith('{') &&
                      eventData.endsWith('}'):
                // Parse the event data as JSON and add it to the stream
                _events.add((event: currentEvent, data: jsonDecode(eventData)));
                currentEvent = null;
              case 'id' || 'retry':
                // Valid SSE fields that this client does not use; not an error.
                break;
              default:
                _onError?.call(Exception('Unknown SSE event: $eventType'), StackTrace.current);
                l.w('Unknown SSE event: "$line"');
            }
          } on Object catch (e, s) {
            currentEvent = null;
            _onError?.call(e, s);
            l.w('Error while parsing SSE events: $e', s);
          }
        }
      } on Object catch (e, s) {
        // The stream itself failed (e.g. the connection was reset). Report it
        // unless the session was stopped on purpose; the health check will
        // notice the lost connection and reconnect.
        if (!stopped && !_isClosed) {
          _onError?.call(e, s);
          l.w('SSE stream error: $e', s);
        }
      } finally {
        // Always release the connection bookkeeping, even on stream errors,
        // so the health check can detect the loss and the counter stays exact.
        _hasConnection = false;
        _numberOfConnections -= 1;
      }
    }

    startPolling().ignore();

    const maxBackoff = 60; // 60 seconds
    // 2^6 = 64 already exceeds [maxBackoff]; capping the exponent keeps the
    // delay from overflowing after a long series of failed attempts.
    const maxExponent = 6;

    // Schedule a single reconnect after an exponentially growing delay.
    void reconnectWithBackoff(int attempt) {
      heartbeatStopwatch
        ..reset()
        ..stop();
      // Do not stack reconnects: a pending one will fire on its own, and every
      // additional timer would cancel the attempt started by the previous one.
      if (reconnectTimer?.isActive ?? false) return;
      final exponent = math.min(math.max(attempt, 0), maxExponent);
      final delay = math.min(1 << exponent, maxBackoff);
      reconnectTimer = Timer(Duration(seconds: delay), () {
        if (stopped || _isClosed) return;
        startPolling().ignore();
      });
    }

    // Start a timer to healthcheck the connection every 5 seconds.
    _healthCheckTimer = Timer.periodic(const Duration(seconds: 5), (timer) {
      if (stopped) {
        timer.cancel();
        return;
      } else if (_isClosed) {
        timer.cancel();
        _stopPolling?.call();
        return;
      } else if (!_hasConnection) {
        // If the connection is not established, start polling again
        _onError?.call(Exception('SSE connection not established'), StackTrace.current);
        l.w('SSE connection not established, restarting polling...');
        reconnectWithBackoff(connectionAttempts);
      } else if (_heartbeatTimeout > Duration.zero && heartbeatStopwatch.elapsed > _heartbeatTimeout) {
        // If nothing was received within the heartbeat timeout, restart the polling
        _onError?.call(Exception('SSE heartbeat not received'), StackTrace.current);
        l.w('SSE heartbeat not received in ${_heartbeatTimeout.inSeconds} seconds, restarting polling...');
        reconnectWithBackoff(connectionAttempts);
      }
    });
  }

  /// Stop the SSE polling.
  ///
  /// Cancels the health check and the current connection. The instance can be
  /// started again with [start] unless it was closed.
  void stop() {
    try {
      _healthCheckTimer?.cancel();
      _healthCheckTimer = null;
      _stopPolling?.call();
    } on Object catch (e, s) {
      _onError?.call(e, s);
      l.w('Error while stopping SSE polling: $e', s);
    } finally {
      // Reset the state even if one of the stop callbacks has thrown.
      _stopPolling = null;
      _hasConnection = false;
    }
  }

  /// Dispose the SSE connection and close the stream.
  ///
  /// This will stop the polling and close the stream controller.
  /// Calling it more than once has no further effect.
  void close() {
    if (_isClosed) return;
    _isClosed = true;
    stop();
    if (_internalClient) _client.close(force: true);
    _events.close().ignore();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment