Skip to content

Instantly share code, notes, and snippets.

@lrhn
Created August 19, 2020 20:16
Show Gist options
  • Save lrhn/495e5e4b0ae6e6a40642eac88dd59af7 to your computer and use it in GitHub Desktop.
Save lrhn/495e5e4b0ae6e6a40642eac88dd59af7 to your computer and use it in GitHub Desktop.
Stream Load Balancer
import "dart:async" show StreamController, StreamSubscription;
import "dart:collection" show Queue;
/// Stream splitter with load balancing.
///
/// Spreads the events of one source stream onto as many target streams
/// as desired. Each data or error event is delivered to exactly one
/// target, chosen in round-robin order among all currently interested
/// target streams.
///
/// Add a new potential target stream using [createTarget].
/// That stream is considered interested while it is being listened
/// to and is not paused.
/// If there are no interested target streams, the source stream
/// is paused.
class StreamLoadBalancer<T> {
  /// The stream whose events are distributed.
  final Stream<T> _source;

  /// Set when the stream has ended.
  ///
  /// Happens either if the source stream actually ends,
  /// or if the load balancer has been [cancel]led and asked
  /// to send a done event anyway.
  bool _isClosed = false;

  /// Whether the load balancer has been cancelled.
  bool _isCancelled = false;

  /// Subscription on [_source], created lazily by [_ensureSubscription].
  ///
  /// Reset to `null` when the source stream ends.
  StreamSubscription<T>? _subscription;

  /// The currently interested targets, in round-robin delivery order.
  final Queue<StreamController<T>> _targets = Queue();

  /// Creates a new stream load balancer for [source].
  ///
  /// The source is not listened to until a target is listened to
  /// (or [cancel] is called).
  StreamLoadBalancer(Stream<T> source) : _source = source;

  /// Creates a new potential target for the load balancer.
  ///
  /// The target won't get events until it's listened to,
  /// or while it's paused.
  ///
  /// If the load balancer is already closed, the returned stream is empty.
  /// If it has been cancelled without sending done events, the returned
  /// stream never emits anything.
  Stream<T> createTarget() {
    if (_isClosed) return Stream<T>.empty();
    var target = StreamController<T>(sync: true);
    if (!_isCancelled) {
      target.onListen = () {
        // The listen callback is only needed once.
        target.onListen = null;
        void removeListener() {
          _removeListener(target);
        }

        _addListener(target);
        // From now on, pausing or cancelling withdraws the target from the
        // rotation, and resuming puts it back in.
        target.onPause = removeListener;
        target.onResume = () {
          _addListener(target);
        };
        target.onCancel = removeListener;
      };
    }
    return target.stream;
  }

  /// Returns the subscription on the source, listening to it if necessary.
  StreamSubscription<T> _ensureSubscription() => _subscription ??=
      _source.listen(_onData, onError: _onError, onDone: _onDone);

  /// Cancels the source stream.
  ///
  /// If [sendDone] is set to true, all target streams get a done
  /// event. Otherwise the target streams get no further events.
  ///
  /// Returns the future of cancelling the source subscription, or an
  /// already completed future if the load balancer was already closed.
  ///
  /// Should only be called once. If called more than once,
  /// the [sendDone] values must agree, and then nothing further happens.
  /// Throws a [StateError] if the values disagree.
  Future<void> cancel({bool sendDone = false}) {
    if (_isCancelled) {
      // After a cancel, `_isClosed` records whether done events were sent.
      if (sendDone != _isClosed) {
        throw StateError("Already closed with sendDone: ${!sendDone}");
      }
    }
    if (_isClosed) return Future.value(null);
    // Note: if the source was never listened to, this listens to it
    // and immediately cancels the new subscription.
    var result = _ensureSubscription().cancel();
    _isCancelled = true;
    if (sendDone) {
      _close();
    } else {
      _targets.clear();
    }
    return result;
  }

  /// Adds [listener] to the rotation of interested targets.
  ///
  /// If the load balancer is closed, the listener is closed instead.
  /// If it was cancelled without closing, nothing happens.
  void _addListener(StreamController<T> listener) {
    if (_isClosed) {
      if (!listener.isClosed) listener.close();
      return;
    }
    if (_isCancelled) return;
    if (_targets.isEmpty) {
      // First interested target: start, or resume, the source.
      _ensureSubscription().resume();
    }
    _targets.add(listener);
  }

  /// Removes [listener] from the rotation of interested targets.
  ///
  /// Pauses the source if this removed the last interested target.
  void _removeListener(StreamController<T> listener) {
    if (_isClosed || _isCancelled) return;
    // Only pause when this call actually removed the last target.
    // A target which is cancelled while paused is removed twice, and
    // subscription pauses are counted: an extra pause here would never be
    // matched by the single resume in [_addListener], leaving the source
    // paused forever.
    if (_targets.remove(listener) && _targets.isEmpty) {
      _subscription?.pause();
    }
  }

  /// Forwards a data event from the source to the next target.
  void _onData(T data) {
    _nextTarget().add(data);
  }

  /// Forwards an error event from the source to the next target.
  void _onError(Object error, StackTrace stack) {
    _nextTarget().addError(error, stack);
  }

  /// Handles the source ending by closing all interested targets.
  void _onDone() {
    _close();
    _subscription = null;
  }

  /// Marks the load balancer as closed and closes all interested targets.
  ///
  /// Targets that are not currently in the rotation are closed by
  /// [_addListener] when they next become interested.
  void _close() {
    // Set the flag first, so that [_removeListener] calls triggered by
    // closing a target do not modify the queue being iterated.
    _isClosed = true;
    for (var target in _targets) {
      if (!target.isClosed) target.close();
    }
    _targets.clear();
  }

  /// Returns the next target in round-robin order.
  ///
  /// Moves the target to the back of the queue before returning it.
  StreamController<T> _nextTarget() {
    var result = _targets.removeFirst();
    _targets.add(result);
    return result;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment