Skip to content

Instantly share code, notes, and snippets.

@lrhn
Created August 20, 2020 17:30
Show Gist options
  • Save lrhn/d76df7c68b6e3eb4cb1a072b3169414b to your computer and use it in GitHub Desktop.
Save lrhn/d76df7c68b6e3eb4cb1a072b3169414b to your computer and use it in GitHub Desktop.
Dart load-balancing stream extension.
import "dart:async";
import "dart:collection" show Queue;
extension StreamExtension<T> on Stream<T> {
  /// Creates a load balanced stream.
  ///
  /// A load balanced stream can be listened to multiple times,
  /// and events are spread across actively listening
  /// (non-cancelled, non-paused) subscriptions in a round-robin
  /// order.
  ///
  /// The source stream is listened to when the first listener is added,
  /// and it is paused while there is no active listener to receive events.
  ///
  /// If [cancelOnError] is true, the load balanced stream
  /// ends on the first error, and that error is distributed
  /// to all listeners. Otherwise errors are load-balanced
  /// the same way as data events, being passed only to the
  /// next ready listener.
  Stream<T> loadBalanced({bool cancelOnError = false}) {
    // Subscription on the source stream, created by the first active listener.
    StreamSubscription<T>? subscription;
    // The error which ended the stream. Only set when [cancelOnError] is true.
    AsyncError? error;
    // Whether the source has ended, by a done event or a terminating error.
    var isDone = false;
    // Controllers of the active listeners. The first one receives the next
    // event.
    final queue = Queue<MultiStreamController<T>>();

    // Returns the controller at the front of the queue and moves it to the
    // back, which is what makes the distribution round-robin.
    MultiStreamController<T> nextTarget() {
      assert(queue.isNotEmpty);
      var result = queue.removeFirst();
      queue.add(result);
      return result;
    }

    // Replays the terminal state (the stored error, if any, then done)
    // to a controller which was not active when the source ended.
    void emitTerminalEvents(MultiStreamController<T> c) {
      var err = error;
      if (err != null) {
        assert(cancelOnError);
        c.addError(err.error, err.stackTrace);
      }
      c.close();
    }

    // Returns the subscription on the source stream, listening to the source
    // the first time this is called.
    StreamSubscription<T> ensureSubscription() {
      return subscription ??= listen((value) {
        nextTarget().addSync(value);
      }, onError: (Object e, StackTrace s) {
        if (cancelOnError) {
          // The first error ends the stream. Remember the error so it can be
          // replayed later, and send it to every active listener now.
          isDone = true;
          error = AsyncError(e, s);
          while (queue.isNotEmpty) {
            var c = queue.removeFirst();
            c.addErrorSync(e, s);
            c.closeSync();
          }
        } else {
          nextTarget().addErrorSync(e, s);
        }
      }, onDone: () {
        isDone = true;
        while (queue.isNotEmpty) {
          var c = queue.removeFirst();
          if (!c.isClosed) c.closeSync();
        }
      }, cancelOnError: cancelOnError);
    }

    // Makes `c` an active receiver of events.
    //
    // If the source has already ended, `c` is sent the terminal events
    // instead.
    void add(MultiStreamController<T> c) {
      if (isDone) {
        // A controller still in the queue is closed by the loop that is
        // currently emptying the queue.
        if (queue.contains(c)) return;
        if (!c.isClosed) emitTerminalEvents(c);
        return;
      }
      if (queue.isEmpty) {
        ensureSubscription().resume();
      }
      queue.add(c);
    }

    // Makes `c` stop receiving events, and pauses the source if `c` was the
    // last active receiver.
    void remove(MultiStreamController<T> c) {
      if (isDone) return;
      // Pauses on a subscription are counted, and `add` only resumes once,
      // so the source must be paused only when this call is what emptied the
      // queue. A listener which pauses and then cancels calls this function
      // twice; the second call must not pause the source a second time.
      if (queue.remove(c) && queue.isEmpty) {
        subscription?.pause();
      }
    }

    return Stream<T>.multi((c) {
      if (isDone) {
        emitTerminalEvents(c);
        return;
      }
      void addThis() {
        add(c);
      }

      void removeThis() {
        remove(c);
      }

      // A paused listener is skipped in the same way as a cancelled one.
      c
        ..onPause = removeThis
        ..onResume = addThis
        ..onCancel = removeThis;
      add(c);
    });
  }
}
import "dart:async";
import "stream_load_balancer.dart";
/// Demo: starts ten consumers, 100 milliseconds apart, on one load balanced
/// stream of numbers which ends with an error.
main() async {
  const emitInterval = Duration(milliseconds: 20);
  const startInterval = Duration(milliseconds: 100);

  // Emits 0 through 269, waiting after each number, then throws.
  Stream<int> numbers() async* {
    var next = 0;
    while (next < 270) {
      yield next;
      await Future.delayed(emitInterval);
      next++;
    }
    throw "Error";
  }

  var balanced = numbers().loadBalanced(cancelOnError: true);
  for (var id = 0; id < 10; id++) {
    // Not awaited, so the consumers run concurrently.
    loop(balanced, id);
    await Future.delayed(startInterval);
  }
}
/// Prints each event of [stream] prefixed with [id], stopping after the first
/// event greater than `id * 50 + 20`.
///
/// Anything thrown while iterating is caught and printed with the same prefix.
loop(stream, id) async {
  try {
    await for (var event in stream) {
      print("$id: $event");
      var limit = id * 50 + 20;
      if (event > limit) break;
    }
  } catch (failure) {
    print("$id: Error: $failure");
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment