Created
May 28, 2019 08:25
-
-
Save lrhn/93409c1c31ec814897e427016ae7606d to your computer and use it in GitHub Desktop.
Stream Wrapper Allowing Cancel On The Side
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import "dart:async"; | |
| /// Adapts a stream so that its subscription can be cancelled from the outside. | |
| /// | |
| /// The output [stream] provides the same events as a specified source stream, | |
| /// but the [cancel] operation can cancel the subscription on the source | |
| /// stream and prevent any more events from reaching the output [stream]. | |
| abstract class StreamCanceller<T> { | |
| /// Creates a stream canceller for [source]. | |
| /// | |
| /// Listening on the [stream] of the returned object will provide | |
| /// the same events as the [source]. | |
| /// | |
| /// The provided stream can only be listened to once. | |
| factory StreamCanceller(Stream<T> source) = _StreamCanceller<T>; | |
| /// A stream which can be cancelled as usual or by calling [cancel]. | |
| Stream<T> get stream; | |
| /// Cancel the subscription on the source stream. | |
| /// | |
| /// Returns the future returned by [StreamSubscription.cancel] from the | |
| /// source stream. If the stream has already been cancelled, the same future | |
| /// is returned. | |
| Future<void> cancel(); | |
| } | |
| class _StreamCanceller<T> implements StreamCanceller<T> { | |
| final _CancellerStream<T> _stream; | |
| _StreamCanceller(Stream<T> source) | |
| : _stream = source.isBroadcast | |
| ? _BroadcastCancellerStream<T>(source) | |
| : _SingleCancellerStream<T>(source); | |
| Stream<T> get stream => _stream; | |
| Future<void> cancel() => _stream._cancel(); | |
| } | |
| abstract class _CancellerStream<T> extends Stream<T> { | |
| Future<void> _cancel(); | |
| } | |
| class _SingleCancellerStream<T> extends _CancellerStream<T> { | |
| final Stream<T> _source; | |
| StreamSubscription<T> _subscription; | |
| Future<void> _cancelFuture; | |
| _SingleCancellerStream(this._source); | |
| bool get isBroadcast => false; | |
| StreamSubscription<T> listen(void onData(T data), { | |
| Function onError, void onDone(), bool cancelOnError = false}) { | |
| if (_cancelFuture != null) return Stream<T>.empty().listen(null)..cancel(); | |
| if (_subscription != null) { | |
| throw StateError("Stream has already been listened to"); | |
| } | |
| return _subscription = _source.listen(onData, | |
| onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
| } | |
| Future<void> _cancel() => | |
| _cancelFuture ??= _subscription?.cancel() ?? _voidFuture; | |
| static Future<void> _voidFuture = Future<void>.value(); | |
| } | |
| class _BroadcastCancellerStream<T> extends _CancellerStream<T> { | |
| List<StreamSubscription<T>> _subscriptions = []; | |
| final Stream<T> _source; | |
| Future<void> _cancelFuture; | |
| _BroadcastCancellerStream(this._source); | |
| bool get isBroadcast => true; | |
| StreamSubscription<T> listen(void onData(T data), { | |
| Function onError, void onDone(), bool cancelOnError = false}) { | |
| if (_cancelFuture != null) return Stream<T>.empty().listen(null)..cancel(); | |
| var subscription = _source.listen(onData, | |
| onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
| _subscriptions.add(subscription); | |
| return subscription; | |
| } | |
| Future<void> _cancel() => | |
| _cancelFuture ??= Future.wait(_subscriptions.map(_cancelSubscription)).then<void>(_kVoid); | |
| static Future<void> _cancelSubscription(StreamSubscription<Object> subscription) => subscription.cancel(); | |
| static void _kVoid(_) {} | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment