Skip to content

Instantly share code, notes, and snippets.

@natebosch
Last active August 29, 2015 14:20
Show Gist options
  • Save natebosch/88a8f3cb62b6501f93a9 to your computer and use it in GitHub Desktop.
Save natebosch/88a8f3cb62b6501f93a9 to your computer and use it in GitHub Desktop.

Proof of concept for implementing Reactive Programming Buffer and Throttle semantics on Dart Streams

import 'dart:async';
/// Demo entry point: spaces the integers 0..99 one second apart with
/// [Throttle], groups them into two-second windows with [Buffer], and prints
/// each emitted batch.
void main() {
  // Explicit `<int>` type arguments matter: without them the source is
  // inferred as `Stream<dynamic>`, which fails the runtime parameter check
  // when it is handed to `Throttle<int>.bind`.
  final stream = Stream<int>.fromIterable(Iterable<int>.generate(100));
  final throttled = stream.transform(Throttle<int>(const Duration(seconds: 1)));
  final buffered =
      throttled.transform(Buffer<int>(const Duration(seconds: 2)));
  buffered.listen(print);
}
/// Groups the values of a stream into lists, one list per time window.
///
/// A window opens when a value arrives while no window is open and lasts for
/// the [Duration] given to the constructor. When the window elapses, every
/// value collected during it is emitted as a single `List<T>`.
///
/// When the source stream finishes, any partially filled batch is emitted
/// immediately and the resulting stream is closed. Errors from the source are
/// forwarded unchanged. The source is only subscribed to once the resulting
/// stream has a listener, and that subscription is paused, resumed and
/// cancelled together with the listener's subscription.
class Buffer<T> extends StreamTransformerBase<T, List<T>> {
  /// Length of one collection window.
  final Duration _duration;

  /// The batch currently being filled by the most recently listened binding.
  ///
  /// Kept as a public field for backward compatibility; each binding tracks
  /// its own batch internally and mirrors it here.
  List<T> currentResults = <T>[];

  /// Creates a transformer that batches values over windows of [_duration].
  Buffer(this._duration);

  /// Returns a single-subscription stream of the batches collected from
  /// [original].
  @override
  Stream<List<T>> bind(Stream<T> original) {
    // All state is local so that every bound stream is independent.
    final controller = StreamController<List<T>>();
    StreamSubscription<T>? subscription;
    Timer? windowTimer;
    var batch = <T>[];

    // Ends the open window (if any) and emits its batch when non-empty.
    void flush() {
      windowTimer?.cancel();
      windowTimer = null;
      if (batch.isEmpty) return;
      controller.add(batch);
      batch = currentResults = <T>[];
    }

    controller.onListen = () {
      currentResults = batch;
      subscription = original.listen(
        (value) {
          batch.add(value);
          // Only the first value of a window starts the timer.
          windowTimer ??= Timer(_duration, flush);
        },
        onError: controller.addError,
        onDone: () {
          // Do not wait for the window to elapse: deliver what is left and
          // signal completion to the listener.
          flush();
          controller.close();
        },
      );
    };
    controller.onPause = () => subscription?.pause();
    controller.onResume = () => subscription?.resume();
    controller.onCancel = () {
      windowTimer?.cancel();
      windowTimer = null;
      return subscription?.cancel();
    };

    return controller.stream;
  }
}
/// Spaces out the events of a stream so that consecutive events are delivered
/// at least a fixed [Duration] apart.
///
/// No event is dropped: events that arrive faster than the allowed rate are
/// queued (without bound) and delivered later, in their original order.
/// Errors from the source travel through the same queue, so they keep their
/// position relative to the data events.
///
/// The resulting stream closes right after the last queued event has been
/// delivered once the source has finished. The source is only subscribed to
/// once the resulting stream has a listener, and cancelling that listener
/// cancels the source subscription and discards any still-queued events.
class Throttle<T> extends StreamTransformerBase<T, T> {
  /// Minimum gap between two delivered events.
  final Duration _duration;

  /// Creates a transformer that delivers events at least [_duration] apart.
  Throttle(this._duration);

  /// Returns a single-subscription stream that replays the events of
  /// [original] with the configured spacing.
  @override
  Stream<T> bind(Stream<T> original) {
    // All state is local so that every bound stream is independent.
    final controller = StreamController<T>();
    StreamSubscription<T>? subscription;
    // Completes when the next queued event is allowed to be delivered.
    var chain = Future<void>.value();
    var queued = 0;
    var sourceDone = false;
    var cancelled = false;

    // Appends one delivery step to the chain; each step is followed by the
    // configured gap unless it was the final event of a finished source.
    void enqueue(void Function() emit) {
      queued++;
      chain = chain.then((_) async {
        if (cancelled) return;
        emit();
        queued--;
        if (sourceDone && queued == 0) {
          controller.close();
          return;
        }
        await Future<void>.delayed(_duration);
      });
    }

    controller.onListen = () {
      subscription = original.listen(
        (value) => enqueue(() => controller.add(value)),
        onError: (Object error, StackTrace stackTrace) =>
            enqueue(() => controller.addError(error, stackTrace)),
        onDone: () {
          sourceDone = true;
          // Nothing left in the queue: close now. Otherwise the final
          // delivery step closes the stream.
          if (queued == 0) controller.close();
        },
      );
    };
    controller.onPause = () => subscription?.pause();
    controller.onResume = () => subscription?.resume();
    controller.onCancel = () {
      cancelled = true;
      return subscription?.cancel();
    };

    return controller.stream;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment