Last active
May 30, 2018 13:42
-
-
Save long1eu/9c9e257f111e0c8d3ba44cbf840a8767 to your computer and use it in GitHub Desktop.
Dart retryWhen transformer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// File created by
// long1eu <[email protected]>
// on 30/05/2018
import 'dart:async'; | |
/// Signature of the callback invoked when the source stream reports an error.
///
/// The callback receives a stream that carries the error object as a data
/// event and returns a stream of `T`; the transformer in this file listens to
/// the returned stream to decide when to resume emitting.
typedef RetryWhenStreamFactory<T> = Stream<T> Function(Stream<Object> error);
/// A [StreamTransformer] that reacts to source errors by consulting a
/// user-supplied factory and replaying the buffered data events.
///
/// Behaviour, as implemented:
///
/// * Every data event forwarded downstream is also remembered in a buffer.
/// * When the source emits an error, forwarding is suspended (source events
///   arriving while suspended are dropped) and the factory is called with a
///   single-element stream carrying the error object.
/// * Each event emitted by the factory's stream re-emits the buffered events,
///   clears the buffer and resumes forwarding. If that stream completes
///   without emitting, forwarding stays suspended.
/// * An error emitted by the factory's stream is forwarded downstream and
///   terminates the output stream.
///
/// The source stream itself is not re-subscribed.
class RetryWhenStreamTransformer<T> extends StreamTransformerBase<T, T> {
  /// The underlying transformer that [bind] delegates to.
  final StreamTransformer<T, T> transformer;

  /// Creates a transformer that calls [streamFactory] on every source error.
  ///
  /// Throws an [ArgumentError] if [streamFactory] is null.
  RetryWhenStreamTransformer(RetryWhenStreamFactory<T> streamFactory)
      : transformer = _buildTransformer(streamFactory);

  @override
  Stream<T> bind(Stream<T> stream) => transformer.bind(stream);

  /// Builds the transformer backing [transformer].
  ///
  /// The `cancelOnError` flag handed to the transformer callback is not
  /// consulted.
  static StreamTransformer<T, T> _buildTransformer<T>(
      RetryWhenStreamFactory<T> streamFactory) {
    if (streamFactory == null) {
      throw new ArgumentError('streamFactory cannot be null');
    }

    return new StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
      // Events already forwarded downstream, kept for replay after an error.
      final List<T> events = <T>[];
      // Live subscriptions to the streams returned by streamFactory, keyed by
      // an id so each one can unregister itself when it completes.
      final Map<int, StreamSubscription<T>> retries =
          <int, StreamSubscription<T>>{};
      int nextRetryId = 0;
      StreamController<T> controller;
      StreamSubscription<T> subscription;
      bool canEmit = true;

      // Cancels every retry subscription that is still registered, so none of
      // them outlives the output stream.
      void cancelRetries() {
        final List<StreamSubscription<T>> pending =
            retries.values.toList(growable: false);
        retries.clear();
        for (final StreamSubscription<T> retry in pending) {
          retry.cancel();
        }
      }

      // Tears down all subscriptions and closes the output stream.
      void shutDown() {
        cancelRetries();
        subscription.cancel();
        controller.close();
      }

      controller = new StreamController<T>(
        sync: true,
        onListen: () {
          subscription = input.listen(
            (T event) {
              if (canEmit) {
                events.add(event);
                controller.add(event);
              }
            },
            onError: (dynamic e) {
              canEmit = false;
              final int retryId = nextRetryId++;
              retries[retryId] =
                  streamFactory(new Stream<Object>.fromIterable(<Object>[e]))
                      .listen(
                (T _) {
                  // Adding to a closed controller throws, so never replay
                  // once the output has been closed.
                  if (controller.isClosed) return;
                  final List<T> items = events.toList(growable: false);
                  events.clear();
                  items.forEach(controller.add);
                  canEmit = true;
                },
                onError: (dynamic e, StackTrace s) {
                  if (!controller.isClosed) controller.addError(e, s);
                  shutDown();
                },
                onDone: () {
                  retries.remove(retryId);
                },
                cancelOnError: true,
              );
            },
            onDone: shutDown,
          );
        },
        onPause: () => subscription.pause(),
        onResume: () => subscription.resume(),
        onCancel: () {
          cancelRetries();
          return subscription.cancel();
        },
      );

      return controller.stream.listen(null);
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment