Created
August 19, 2020 20:16
-
-
Save lrhn/495e5e4b0ae6e6a40642eac88dd59af7 to your computer and use it in GitHub Desktop.
Stream Load Balancer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import "dart:async" show StreamController, StreamSubscription; | |
import "dart:collection" show Queue; | |
/// Stream splitter with load balancing. | |
/// | |
/// Spreads the events of one stream onto as many streams as | |
/// desired. Events are delivered in round-robin order to | |
/// all currently interested target streams. | |
/// | |
/// Add a new potential target stream using [createTarget]. | |
/// That stream is considered interested when it's being listened | |
/// to and is not paused. | |
/// If there are no interested target streams, the source stream | |
/// is paused. | |
class StreamLoadBalancer<T> { | |
final Stream<T> _source; | |
/// Set when the stream has ended. | |
/// | |
/// Happens either if the source stream actually ends, | |
/// or if the load balancer or been [cancel]led and asked | |
/// to send a done event anyway. | |
bool _isClosed = false; | |
/// Whether the load balancer has been cancelled. | |
bool _isCancelled = false; | |
StreamSubscription<T>? _subscription; | |
Queue<StreamController<T>> _targets = Queue(); | |
/// Creates a new stream load balancer. | |
StreamLoadBalancer(Stream<T> source) : _source = source; | |
/// Creates a new potential target for the load balancer. | |
/// | |
/// The target won't get events until it's listened to, | |
/// or while it's paused. | |
Stream<T> createTarget() { | |
if (_isClosed) return Stream<T>.empty(); | |
var target = StreamController<T>(sync: true); | |
if (!_isCancelled) { | |
target.onListen = () { | |
target.onListen = null; | |
void removeListener() { | |
_removeListener(target); | |
} | |
_addListener(target); | |
target.onPause = removeListener; | |
target.onResume = () { | |
_addListener(target); | |
}; | |
target.onCancel = removeListener; | |
}; | |
} | |
return target.stream; | |
} | |
StreamSubscription<T> _ensureSubscription() => _subscription ??= | |
_source.listen(_onData, onError: _onError, onDone: _onDone); | |
/// Cancels the source stream. | |
/// | |
/// If [sendDone] is set to true, all target streams get a done | |
/// event. Otherwise the target streams gets no further events. | |
/// | |
/// Should only be called once. If called more than once, | |
/// the [sendDone] values must agree, and then nothing further happens. | |
Future<void> cancel({bool sendDone = false}) { | |
if (_isCancelled) { | |
if (sendDone != _isClosed) { | |
throw StateError("Already closed with sendDone: ${!sendDone}"); | |
} | |
} | |
if (_isClosed) return Future.value(null); | |
var result = _ensureSubscription().cancel(); | |
_isCancelled = true; | |
if (sendDone) { | |
_close(); | |
} else { | |
_targets.clear(); | |
} | |
return result; | |
} | |
void _addListener(StreamController<T> listener) { | |
if (_isClosed) { | |
if (!listener.isClosed) listener.close(); | |
return; | |
} | |
if (_isCancelled) return; | |
if (_targets.isEmpty) { | |
_ensureSubscription().resume(); | |
} | |
_targets.add(listener); | |
} | |
void _removeListener(StreamController<T> listener) { | |
if (_isClosed || _isCancelled) return; | |
_targets.remove(listener); | |
if (_targets.isEmpty) _subscription!.pause(); | |
} | |
void _onData(T data) { | |
_nextTarget().add(data); | |
} | |
void _onError(Object error, StackTrace stack) { | |
_nextTarget().addError(error, stack); | |
} | |
void _onDone() { | |
_close(); | |
_subscription = null; | |
} | |
void _close() { | |
_isClosed = true; | |
for (var target in _targets) { | |
if (!target.isClosed) target.close(); | |
} | |
_targets.clear(); | |
} | |
StreamController<T> _nextTarget() { | |
var result = _targets.removeFirst(); | |
_targets.add(result); | |
return result; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment