Skip to content

Instantly share code, notes, and snippets.

@lrhn
Created May 28, 2019 08:25
Show Gist options
  • Select an option

  • Save lrhn/93409c1c31ec814897e427016ae7606d to your computer and use it in GitHub Desktop.

Select an option

Save lrhn/93409c1c31ec814897e427016ae7606d to your computer and use it in GitHub Desktop.
Stream Wrapper Allowing Cancel On The Side
import "dart:async";
/// A wrapper around a source stream whose subscription can also be cancelled
/// "on the side", without access to the subscription itself.
///
/// Listeners on the output [stream] receive the events of the source stream.
/// Calling [cancel] cancels the subscription on the source stream, after
/// which no further events reach the output [stream].
abstract class StreamCanceller<T> {
  /// Wraps [source] in a new stream canceller.
  ///
  /// A listener on the [stream] of the created object receives the same
  /// events as a listener on [source] would.
  ///
  /// The output stream is a broadcast stream exactly when [source] is one.
  /// Unless [source] is a broadcast stream, the output stream can only be
  /// listened to once.
  factory StreamCanceller(Stream<T> source) = _StreamCanceller<T>;

  /// The output stream.
  ///
  /// A subscription on it can be cancelled in the usual way, or from the
  /// outside by calling [cancel].
  Stream<T> get stream;

  /// Cancels the subscription on the source stream.
  ///
  /// The returned future stems from calling [StreamSubscription.cancel] on
  /// the source subscription. Calling this method again after the stream has
  /// been cancelled returns the same future as the first call.
  Future<void> cancel();
}
/// Default implementation of [StreamCanceller].
///
/// All work is delegated to a [_CancellerStream] chosen to match the kind
/// (broadcast or single-subscription) of the source stream.
class _StreamCanceller<T> implements StreamCanceller<T> {
  /// The output stream, which also carries the cancel logic.
  final _CancellerStream<T> _stream;

  /// Creates a canceller whose output stream forwards the events of [source].
  _StreamCanceller(Stream<T> source) : _stream = _adapt<T>(source);

  /// Picks the wrapper that matches [Stream.isBroadcast] of [source].
  static _CancellerStream<S> _adapt<S>(Stream<S> source) {
    if (source.isBroadcast) {
      return _BroadcastCancellerStream<S>(source);
    }
    return _SingleCancellerStream<S>(source);
  }

  @override
  Stream<T> get stream => _stream;

  @override
  Future<void> cancel() => _stream._cancel();
}
/// Base class of the output streams created by [_StreamCanceller].
///
/// Extends [Stream] with the library-private [_cancel] operation, which is
/// what [StreamCanceller.cancel] forwards to.
abstract class _CancellerStream<T> extends Stream<T> {
/// Cancels the subscription(s) this stream has made on its source.
///
/// Both implementations in this file cache the returned future, so repeated
/// calls return the same future.
Future<void> _cancel();
}
/// Cancellable wrapper around a single-subscription source stream.
///
/// The first [listen] call is forwarded to the source and the resulting
/// subscription is remembered, so that [_cancel] can cancel it later without
/// any cooperation from the listener.
class _SingleCancellerStream<T> extends _CancellerStream<T> {
  /// Completed future returned by [_cancel] when there is no source
  /// subscription to cancel.
  static final Future<void> _voidFuture = Future<void>.value();

  /// The stream whose events are forwarded.
  final Stream<T> _source;

  /// The subscription on [_source], or `null` while [listen] has not been
  /// forwarded to the source.
  StreamSubscription<T>? _subscription;

  /// The result of the first [_cancel] call, or `null` if [_cancel] has not
  /// been called.
  ///
  /// Doubles as the "has been cancelled" flag checked by [listen].
  Future<void>? _cancelFuture;

  _SingleCancellerStream(this._source);

  @override
  bool get isBroadcast => false;

  /// Listens on the source stream and remembers the subscription.
  ///
  /// If [_cancel] has already been called, the source is not listened to at
  /// all; the returned subscription is an already cancelled subscription on
  /// an empty stream, so none of the handlers is ever invoked.
  ///
  /// Throws a [StateError] if the stream has not been cancelled and has
  /// already been listened to.
  @override
  StreamSubscription<T> listen(void Function(T data)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    if (_cancelFuture != null) {
      return Stream<T>.empty().listen(null)..cancel();
    }
    if (_subscription != null) {
      throw StateError("Stream has already been listened to");
    }
    // An omitted `cancelOnError` is forwarded as `false`, as it always was.
    return _subscription = _source.listen(onData,
        onError: onError,
        onDone: onDone,
        cancelOnError: cancelOnError ?? false);
  }

  /// Cancels the source subscription, if there is one.
  ///
  /// The first call decides the result: the future of
  /// [StreamSubscription.cancel] if [listen] has created a subscription,
  /// otherwise an already completed future. Later calls return that same
  /// future.
  @override
  Future<void> _cancel() =>
      _cancelFuture ??= _subscription?.cancel() ?? _voidFuture;
}
/// Cancellable wrapper around a broadcast source stream.
///
/// Every [listen] call creates its own subscription on the source. The
/// subscriptions that have not been seen to end are tracked, so that
/// [_cancel] can cancel all of them at once.
class _BroadcastCancellerStream<T> extends _CancellerStream<T> {
  /// The stream whose events are forwarded.
  final Stream<T> _source;

  /// Source subscriptions that have not been seen to end yet.
  ///
  /// An identity set, so that removing an entry does not depend on how a
  /// subscription implements `==`. Entries are removed when the listener
  /// cancels, when the source closes, or when [_cancel] runs. This keeps the
  /// collection from growing with every listener a long-lived stream has had.
  final Set<StreamSubscription<T>> _subscriptions = Set.identity();

  /// The result of the first [_cancel] call, or `null` if [_cancel] has not
  /// been called.
  ///
  /// Doubles as the "has been cancelled" flag checked by [listen].
  Future<void>? _cancelFuture;

  _BroadcastCancellerStream(this._source);

  @override
  bool get isBroadcast => true;

  /// Listens on the source stream and tracks the new subscription.
  ///
  /// If [_cancel] has already been called, the source is not listened to;
  /// the returned subscription is an already cancelled subscription on an
  /// empty stream, so none of the handlers is ever invoked.
  ///
  /// Otherwise the returned subscription forwards every operation to the
  /// source subscription and additionally stops tracking it once it ends.
  @override
  StreamSubscription<T> listen(void Function(T data)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    if (_cancelFuture != null) {
      return Stream<T>.empty().listen(null)..cancel();
    }
    // An omitted `cancelOnError` is forwarded as `false`, as it always was.
    final subscription = _source.listen(onData,
        onError: onError, cancelOnError: cancelOnError ?? false);
    _subscriptions.add(subscription);
    // The done handler is installed through the wrapper, which untracks the
    // subscription before invoking [onDone].
    return _TrackedSubscription<T>(subscription, _subscriptions)
      ..onDone(onDone);
  }

  /// Cancels every tracked source subscription.
  ///
  /// The returned future completes when all the cancel futures have
  /// completed. Later calls return the same future as the first call.
  @override
  Future<void> _cancel() => _cancelFuture ??= _cancelAll();

  /// Cancels and forgets all tracked subscriptions.
  Future<void> _cancelAll() {
    // Work on a snapshot: the set is emptied up front so that no reference
    // to a cancelled subscription is retained, and so that nothing removes
    // entries from the collection being iterated.
    final pending = _subscriptions.toList();
    _subscriptions.clear();
    return Future.wait(pending.map((subscription) => subscription.cancel()))
        .then<void>(_kVoid);
  }

  /// Discards the list produced by [Future.wait].
  static void _kVoid(Object? _) {}
}

/// A [StreamSubscription] that forwards every operation to [_inner] and
/// removes [_inner] from [_tracked] once the subscription is seen to end.
///
/// Ending is detected on [cancel], on the done event, and on completion of
/// the future returned by [asFuture]. A subscription that ends because it was
/// created with `cancelOnError: true` is not detected and stays in [_tracked]
/// until the owner clears the set.
class _TrackedSubscription<T> implements StreamSubscription<T> {
  /// The subscription on the source stream.
  final StreamSubscription<T> _inner;

  /// The owner's collection of live subscriptions, which contains [_inner].
  final Set<StreamSubscription<T>> _tracked;

  _TrackedSubscription(this._inner, this._tracked);

  @override
  Future<void> cancel() {
    _tracked.remove(_inner);
    return _inner.cancel();
  }

  @override
  void onData(void Function(T data)? handleData) => _inner.onData(handleData);

  @override
  void onError(Function? handleError) => _inner.onError(handleError);

  @override
  void onDone(void Function()? handleDone) {
    // Always install a handler, even for a null [handleDone], so that the
    // done event untracks the subscription.
    _inner.onDone(() {
      _tracked.remove(_inner);
      handleDone?.call();
    });
  }

  @override
  void pause([Future<void>? resumeSignal]) => _inner.pause(resumeSignal);

  @override
  void resume() => _inner.resume();

  @override
  bool get isPaused => _inner.isPaused;

  @override
  Future<E> asFuture<E>([E? futureValue]) {
    // `asFuture` replaces the done and error handlers of [_inner], so the
    // untracking is attached to the resulting future instead.
    return _inner.asFuture<E>(futureValue).whenComplete(() {
      _tracked.remove(_inner);
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment