Created
August 20, 2020 17:30
-
-
Save lrhn/d76df7c68b6e3eb4cb1a072b3169414b to your computer and use it in GitHub Desktop.
Dart load-balancing stream extension.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import "dart:async"; | |
import "dart:collection" show Queue; | |
extension StreamExtension<T> on Stream<T> { | |
/// Create a load balanced stream. | |
/// | |
/// A load balanced stream can be listened to multiple times, | |
/// and events are spread across actively listeninng | |
/// (non-cancelled, non-paused) subscriptions in a round-robin | |
/// order. | |
/// | |
/// If [cancelOnError] is true, the load balanced stream | |
/// ends on the first error, and that error is distributed | |
/// to all listeners. Otherwise errors are load-balanced | |
/// the same ways as data events, being passed only to the | |
/// next ready listener. | |
Stream<T> loadBalanced({bool cancelOnError = false}) { | |
StreamSubscription<T>? subscription; | |
AsyncError? error; | |
bool isDone = false; | |
var queue = Queue<MultiStreamController<T>>(); | |
MultiStreamController<T> nextTarget() { | |
assert(queue.isNotEmpty); | |
var result = queue.removeFirst(); | |
queue.add(result); | |
return result; | |
} | |
StreamSubscription<T> ensureSubscription() { | |
return subscription ??= (this.listen((value) { | |
nextTarget().addSync(value); | |
}, onError: (Object e, StackTrace s) { | |
if (cancelOnError) { | |
isDone = true; | |
error = AsyncError(e, s); | |
while (queue.isNotEmpty) { | |
var c = queue.removeFirst(); | |
c.addErrorSync(e, s); | |
c.closeSync(); | |
} | |
} else { | |
nextTarget().addErrorSync(e, s); | |
} | |
}, onDone: () { | |
isDone = true; | |
while (queue.isNotEmpty) { | |
var c = queue.removeFirst(); | |
if (!c.isClosed) c.closeSync(); | |
} | |
}, cancelOnError: cancelOnError)); | |
} | |
void add(MultiStreamController<T> c) { | |
if (isDone) { | |
if (queue.contains(c)) return; | |
if (!c.isClosed) { | |
var err = error; | |
if (err != null) { | |
assert(cancelOnError); | |
c.addError(err.error, err.stackTrace); | |
} | |
c.close(); | |
} | |
return; | |
} | |
if (queue.isEmpty) { | |
ensureSubscription().resume(); | |
} | |
queue.add(c); | |
} | |
void remove(MultiStreamController<T> c) { | |
if (isDone) return; | |
queue.remove(c); | |
if (queue.isEmpty) { | |
subscription!.pause(); | |
} | |
} | |
return Stream<T>.multi((c) { | |
if (isDone) { | |
var err = error; | |
if (err != null) { | |
assert(cancelOnError); | |
c.addError(err.error, err.stackTrace); | |
} | |
c.close(); | |
return; | |
} | |
void addThis() { | |
add(c); | |
} | |
void removeThis() { | |
remove(c); | |
} | |
c | |
..onPause = removeThis | |
..onResume = addThis | |
..onCancel = removeThis; | |
add(c); | |
}); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import "dart:async"; | |
import "stream_load_balancer.dart"; | |
main() async { | |
Stream<int> stream() async* { | |
for (var i = 0; i < 270; i++) { | |
yield i; | |
await Future.delayed(const Duration(milliseconds: 20)); | |
} | |
throw "Error"; | |
} | |
var lb = stream().loadBalanced(cancelOnError: true); | |
for (var i = 0; i < 10; i++) { | |
loop(lb, i); | |
await Future.delayed(const Duration(milliseconds: 100)); | |
} | |
} | |
loop(stream, id) async { | |
try { | |
await for (var e in stream) { | |
print("$id: $e"); | |
if (e > id * 50 + 20) break; | |
} | |
} catch (e) { | |
print("$id: Error: $e"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment