Skip to content

Instantly share code, notes, and snippets.

@diefferson
Created April 3, 2020 14:58
Show Gist options
  • Save diefferson/0f08c15ba18510778d977b3a51189c33 to your computer and use it in GitHub Desktop.
Save diefferson/0f08c15ba18510778d977b3a51189c33 to your computer and use it in GitHub Desktop.
Dart Single Subject
import 'dart:async';
import 'package:async/async.dart';
import 'package:rxdart/src/rx.dart';
import 'package:rxdart/src/streams/value_stream.dart';
import 'package:rxdart/src/subjects/subject.dart';
import 'package:rxdart/src/transformers/start_with.dart';
import 'package:rxdart/src/transformers/start_with_error.dart';
/// A special StreamController that holds only a single item and clears it
/// once a listener has received it. It is a good alternative for sending an
/// event that notifies about something without persisting the data, such as
/// notifying about exceptions.
///
/// This subject allows sending data, error and done events to the listener.
/// The value is normally null; when data is added, the subject keeps it until
/// a listener receives it.
///
/// SingleSubject is, by default, a broadcast (aka hot) controller, in order
/// to fulfill the Rx Subject contract. This means the Subject's `stream` can
/// be listened to multiple times, but only the newest listener receives the
/// data.
///
/// ### Example
///
/// final subject = SingleSubject<int>();
///
/// subject.add(1);
/// subject.add(2);
/// subject.add(3);
///
/// subject.stream.listen(print); // prints 3
/// subject.stream.listen(print); // no print, no data
///
class SingleSubject<T> extends Subject<T> implements ValueStream<T> {
  /// Holds the most recent value or error that has not yet been consumed.
  final _Wrapper<T> _wrapper;

  SingleSubject._(
    StreamController<T> controller,
    Stream<T> stream,
    this._wrapper,
  ) : super(controller, stream);

  /// Constructs a [SingleSubject], optionally pass handlers for
  /// [onListen], [onCancel] and a flag to handle events [sync].
  ///
  /// See also [StreamController.broadcast]
  factory SingleSubject({
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );

    final wrapper = _Wrapper<T>();

    return SingleSubject<T>._(
      controller,
      Rx.defer<T>(_deferStream(wrapper, controller), reusable: true),
      wrapper,
    );
  }

  /// Constructs a [SingleSubject], optionally pass handlers for
  /// [onListen], [onCancel] and a flag to handle events [sync].
  ///
  /// [seedValue] becomes the current [value] and is held until a listener
  /// subscribes and receives it.
  ///
  /// See also [StreamController.broadcast]
  factory SingleSubject.seeded(
    T seedValue, {
    void Function() onListen,
    void Function() onCancel,
    bool sync = false,
  }) {
    // ignore: close_sinks
    final controller = StreamController<T>.broadcast(
      onListen: onListen,
      onCancel: onCancel,
      sync: sync,
    );

    final wrapper = _Wrapper<T>.seeded(seedValue);

    return SingleSubject<T>._(
      controller,
      Rx.defer<T>(_deferStream(wrapper, controller), reusable: true),
      wrapper,
    );
  }

  /// Builds the stream factory that is evaluated on every new subscription.
  ///
  /// If [wrapper] currently holds an error or a value, the controller's
  /// stream is prefixed with it so the new listener receives it first;
  /// otherwise the plain controller stream is returned.
  static Stream<T> Function() _deferStream<T>(
    _Wrapper<T> wrapper,
    StreamController<T> controller,
  ) =>
      () {
        if (wrapper.latestIsError) {
          return controller.stream.transform(
            StartWithErrorStreamTransformer(
              wrapper.latestError,
              wrapper.latestStackTrace,
            ),
          );
        } else if (wrapper.latestIsValue) {
          return controller.stream
              .transform(StartWithStreamTransformer(wrapper.latestValue));
        }

        return controller.stream;
      };

  @override
  void onAdd(T event) => _wrapper.setValue(event);

  @override
  void onAddError(Object error, [StackTrace stackTrace]) =>
      _wrapper.setError(error, stackTrace);

  @override
  ValueStream<T> get stream => this;

  /// Whether a value is currently held and has not been consumed yet.
  @override
  bool get hasValue => _wrapper.latestIsValue;

  /// Get the latest value emitted by the Subject
  @override
  T get value => _wrapper.latestValue;

  /// Set and emit the new value
  set value(T newValue) => add(newValue);

  /// Subscribes to this subject and returns the resulting subscription.
  ///
  /// Every data event is forwarded to [onData] (when one is given) and the
  /// held state is cleared right afterwards, so a later subscriber does not
  /// receive the same item again. [onError], [onDone] and [cancelOnError] are
  /// passed through unchanged.
  ///
  /// Note: only data events clear the held state; a held error stays in
  /// place until it is replaced by a later `add`/`addError`.
  @override
  StreamSubscription<T> listen(
    void Function(T value) onData, {
    Function onError,
    void Function() onDone,
    bool cancelOnError,
  }) {
    void onSingleData(T value) {
      // A null handler is allowed by Stream.listen; still consume the item.
      if (onData != null) {
        onData(value);
      }
      _wrapper.clean();
    }

    // The subscription must be handed back so callers can cancel/pause it.
    return super.listen(
      onSingleData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
  }
}
/// Mutable holder for the latest event of a subject: either a value, an
/// error (with optional stack trace), or nothing at all.
class _Wrapper<T> {
  /// The held value, or null when no value is held.
  T latestValue;

  /// The held error, or null when no error is held.
  Object latestError;

  /// The stack trace that accompanied [latestError], if any.
  StackTrace latestStackTrace;

  /// Whether [latestValue] currently represents the latest event.
  bool latestIsValue = false;

  /// Whether [latestError] currently represents the latest event.
  bool latestIsError = false;

  /// Creates an empty holder with neither value nor error.
  _Wrapper();

  /// Creates a holder that already contains [latestValue].
  _Wrapper.seeded(this.latestValue) : latestIsValue = true;

  /// Drops whatever is held, returning to the empty state.
  void clean() {
    latestValue = null;
    latestError = null;
    latestStackTrace = null;
    latestIsValue = false;
    latestIsError = false;
  }

  /// Stores [event] as the latest value, discarding any held error.
  void setValue(T event) {
    clean();
    latestValue = event;
    latestIsValue = true;
  }

  /// Stores [error] and [stackTrace] as the latest event, discarding any
  /// held value.
  void setError(Object error, [StackTrace stackTrace]) {
    clean();
    latestError = error;
    latestStackTrace = stackTrace;
    latestIsError = true;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment