Proof of concept for implementing Reactive Programming Buffer and Throttle semantics on Dart Streams
Last active
August 29, 2015 14:20
-
-
Save natebosch/88a8f3cb62b6501f93a9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
/// Demonstrates [Throttle] and [Buffer] on a stream of the integers 0 to 99.
///
/// The source is paced by a one-second [Throttle], the paced events are then
/// grouped by a two-second [Buffer], and every emitted batch is printed.
void main() {
  // The element type is spelled out on purpose: with no generator callback
  // and no surrounding type context, `Iterable.generate(100)` is inferred as
  // `Iterable<dynamic>`, and binding an `int` transformer to the resulting
  // `Stream<dynamic>` fails its runtime parameter check on Dart 2+.
  Stream<int>.fromIterable(Iterable<int>.generate(100))
      .transform(Throttle<int>(const Duration(seconds: 1)))
      .transform(Buffer<int>(const Duration(seconds: 2)))
      .listen(print);
}
/// A [StreamTransformer] that collects source events into time-windowed
/// batches.
///
/// The first event that arrives while no batch is open starts a timer of the
/// configured duration. Every event received before that timer fires is
/// gathered into one list, which is then emitted as a single event. While the
/// source is idle no timer runs and nothing is emitted, so a batch is never
/// empty.
///
/// Source errors are forwarded as they arrive. When the source closes, a
/// partially filled batch is emitted immediately and the output stream is
/// closed. Cancelling the output subscription stops the timer, drops the
/// unfinished batch and cancels the source subscription.
///
/// The batch in progress is stored on the instance ([currentResults]), so an
/// instance should be bound to only one stream at a time.
class Buffer<T> extends StreamTransformerBase<T, List<T>> {
  /// Length of the window opened by the first event of each batch.
  final Duration _duration;

  /// Timer for the batch being collected, or `null` when no batch is open.
  Timer? _timer;

  /// Events collected so far for the batch that has not been emitted yet.
  List<T> currentResults = <T>[];

  /// Creates a transformer that emits each batch [_duration] after the first
  /// event of that batch arrived.
  Buffer(this._duration);

  /// Returns a single-subscription stream of batches built from [original].
  ///
  /// [original] is only listened to once the returned stream gets a listener.
  @override
  Stream<List<T>> bind(Stream<T> original) {
    // One controller per bound stream, so the result of one `bind` call never
    // shares an output with another.
    final controller = StreamController<List<T>>();
    StreamSubscription<T>? subscription;

    void stopTimer() {
      _timer?.cancel();
      _timer = null;
    }

    // Emits the batch in progress (if any) and starts a fresh one.
    void flush() {
      stopTimer();
      if (currentResults.isEmpty) return;
      final batch = currentResults;
      currentResults = <T>[];
      controller.add(batch);
    }

    controller
      ..onListen = () {
        subscription = original.listen(
          (T value) {
            // Only the first event of a batch opens the window.
            _timer ??= Timer(_duration, flush);
            currentResults.add(value);
          },
          onError: controller.addError,
          onDone: () {
            // Do not make the consumer wait for the timer once the source is
            // finished: hand over what is left and close.
            flush();
            controller.close();
          },
        );
      }
      ..onPause = () {
        subscription?.pause();
      }
      ..onResume = () {
        subscription?.resume();
      }
      ..onCancel = () {
        stopTimer();
        currentResults = <T>[];
        return subscription?.cancel();
      };
    return controller.stream;
  }
}
/// A [StreamTransformer] that paces events so that consecutive events are
/// released at least the configured duration apart.
///
/// No event is dropped: events that arrive faster than the allowed rate are
/// queued and released in order, each one waiting for the pause that follows
/// its predecessor. The first event is released without a preceding pause.
///
/// Source errors keep their position relative to the data events. The output
/// stream closes once the source has closed and every queued event has been
/// released. Cancelling the output subscription cancels the source
/// subscription and suppresses everything still queued.
class Throttle<T> extends StreamTransformerBase<T, T> {
  /// Minimum pause inserted after each released data event.
  final Duration _duration;

  /// Creates a transformer that waits [_duration] after each released event
  /// before releasing the next one.
  Throttle(this._duration);

  /// Returns a single-subscription stream that replays [original] at the
  /// throttled pace.
  ///
  /// [original] is only listened to once the returned stream gets a listener.
  @override
  Stream<T> bind(Stream<T> original) {
    // One controller per bound stream, so the result of one `bind` call never
    // shares an output with another.
    final controller = StreamController<T>();
    StreamSubscription<T>? subscription;
    var cancelled = false;

    // Completes when the next queued event is allowed to go out.
    var ready = Future<void>.value();
    // Completes when everything queued so far has been handed to the output.
    var delivered = Future<void>.value();

    // Skips the pause after cancellation so the chain drains quickly.
    Future<void> coolDown() =>
        cancelled ? Future<void>.value() : Future<void>.delayed(_duration);

    controller
      ..onListen = () {
        subscription = original.listen(
          (T value) {
            delivered = ready.then((_) {
              if (!cancelled) controller.add(value);
            });
            ready = delivered.then((_) => coolDown());
          },
          onError: (Object error, StackTrace stackTrace) {
            // Errors take their turn in the queue but add no pause.
            delivered = ready.then((_) {
              if (!cancelled) controller.addError(error, stackTrace);
            });
            ready = delivered;
          },
          onDone: () {
            // Close right after the last queued event, not after its pause.
            delivered.then((_) {
              if (!cancelled) controller.close();
            });
          },
        );
      }
      ..onPause = () {
        subscription?.pause();
      }
      ..onResume = () {
        subscription?.resume();
      }
      ..onCancel = () {
        cancelled = true;
        return subscription?.cancel();
      };
    return controller.stream;
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment