Created
April 3, 2020 14:58
-
-
Save diefferson/0f08c15ba18510778d977b3a51189c33 to your computer and use it in GitHub Desktop.
Dart Single Subject
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'package:async/async.dart'; | |
import 'package:rxdart/src/rx.dart'; | |
import 'package:rxdart/src/streams/value_stream.dart'; | |
import 'package:rxdart/src/subjects/subject.dart'; | |
import 'package:rxdart/src/transformers/start_with.dart'; | |
import 'package:rxdart/src/transformers/start_with_error.dart'; | |
/// A special StreamController that emits only a item and clean after listen. | |
/// Is a good alternative to send a event fo notify anything and not persist data, | |
/// like notify execeptions. | |
/// | |
/// This subject allows sending data, error and done events to the listener. | |
/// The value always is null, when data is added the subject stay with data until a | |
/// a listner receive it. | |
/// | |
/// SingleSubject is, by default, a broadcast (aka hot) controller, in order | |
/// to fulfill the Rx Subject contract. This means the Subject's `stream` can | |
/// be listened to multiple times but only the newest listener receive the data. | |
/// | |
/// ### Example | |
/// | |
/// final subject = SingleSubject<int>(); | |
/// | |
/// subject.add(1); | |
/// subject.add(2); | |
/// subject.add(3); | |
/// | |
/// subject.stream.listen(print); // prints 3 | |
/// subject.stream.listen(print); // no print, no data | |
/// | |
class SingleSubject<T> extends Subject<T> implements ValueStream<T> { | |
_Wrapper<T> _wrapper; | |
SingleSubject._( | |
StreamController<T> controller, | |
Stream<T> stream, | |
this._wrapper, | |
) : super(controller, stream); | |
/// Constructs a [SingleSubject], optionally pass handlers for | |
/// [onListen], [onCancel] and a flag to handle events [sync]. | |
/// | |
/// See also [StreamController.broadcast] | |
factory SingleSubject({ | |
void Function() onListen, | |
void Function() onCancel, | |
bool sync = false, | |
}) { | |
// ignore: close_sinks | |
final controller = StreamController<T>.broadcast( | |
onListen: onListen, | |
onCancel: onCancel, | |
sync: sync, | |
); | |
final wrapper = _Wrapper<T>(); | |
return SingleSubject<T>._( | |
controller, | |
Rx.defer<T>(_deferStream(wrapper, controller), reusable: true), | |
wrapper); | |
} | |
/// Constructs a [SingleSubject], optionally pass handlers for | |
/// [onListen], [onCancel] and a flag to handle events [sync]. | |
/// | |
/// [seedValue] becomes the current [value] and is emitted immediately. | |
/// | |
/// See also [StreamController.broadcast] | |
factory SingleSubject.seeded( | |
T seedValue, { | |
void Function() onListen, | |
void Function() onCancel, | |
bool sync = false, | |
}) { | |
// ignore: close_sinks | |
final controller = StreamController<T>.broadcast( | |
onListen: onListen, | |
onCancel: onCancel, | |
sync: sync, | |
); | |
final wrapper = _Wrapper<T>.seeded(seedValue); | |
return SingleSubject<T>._( | |
controller, | |
Rx.defer<T>(_deferStream(wrapper, controller), reusable: true), | |
wrapper, | |
); | |
} | |
static Stream<T> Function() _deferStream<T>( | |
_Wrapper<T> wrapper, StreamController<T> controller) => | |
() { | |
if (wrapper.latestIsError) { | |
return controller.stream.transform(StartWithErrorStreamTransformer( | |
wrapper.latestError, wrapper.latestStackTrace)); | |
} else if (wrapper.latestIsValue) { | |
return controller.stream | |
.transform(StartWithStreamTransformer(wrapper.latestValue)); | |
} | |
return controller.stream; | |
}; | |
@override | |
void onAdd(T event) => _wrapper.setValue(event); | |
@override | |
void onAddError(Object error, [StackTrace stackTrace]) => | |
_wrapper.setError(error, stackTrace); | |
@override | |
ValueStream<T> get stream => this; | |
@override | |
bool get hasValue => _wrapper.latestIsValue; | |
/// Get the latest value emitted by the Subject | |
@override | |
T get value => _wrapper.latestValue; | |
/// Set and emit the new value | |
set value(T newValue) => add(newValue); | |
@override | |
StreamSubscription<T> listen(void Function(T value) onData, {Function onError, void Function() onDone, bool cancelOnError}) { | |
final onSingleData = (T value) { | |
onData(value); | |
_wrapper.clean(); | |
}; | |
super.listen(onSingleData, onError:onError, onDone:onDone, cancelOnError:cancelOnError); | |
} | |
} | |
class _Wrapper<T> { | |
T latestValue; | |
Object latestError; | |
StackTrace latestStackTrace; | |
bool latestIsValue = false, latestIsError = false; | |
/// Non-seeded constructor | |
_Wrapper(); | |
_Wrapper.seeded(this.latestValue) : latestIsValue = true; | |
void setValue(T event) { | |
latestIsValue = true; | |
latestIsError = false; | |
latestValue = event; | |
latestError = null; | |
latestStackTrace = null; | |
} | |
void clean(){ | |
latestIsValue = false; | |
latestIsError = false; | |
latestValue = null; | |
latestError = null; | |
latestStackTrace = null; | |
} | |
void setError(Object error, [StackTrace stackTrace]) { | |
latestIsValue = false; | |
latestIsError = true; | |
latestValue = null; | |
latestError = error; | |
latestStackTrace = stackTrace; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment