Created
August 18, 2019 14:20
-
-
Save humblerookie/9b395f653a81cca17280921175064232 to your computer and use it in GitHub Desktop.
Dart IO Pooling Stream/Observable transformer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'package:isolate/isolate_runner.dart'; | |
import 'package:isolate/load_balancer.dart'; | |
/// A [StreamTransformer] that offloads the per-event transformation to a
/// shared pool of isolates.
///
/// Every event of the bound source stream is handed to a static
/// [LoadBalancer] of [POOL_SIZE] isolate runners. The result of each run is
/// added to the output stream as soon as it completes, so results may be
/// emitted in a different order than their source events arrived.
///
/// The output stream closes once the source stream is done and no
/// transformation is still in flight.
///
/// An instance owns a single [StreamController]; every call to [bind] returns
/// that same controller's stream and replaces the remembered source stream.
class IoTransformer<S, T> implements StreamTransformer<S, T> {
  /// Number of isolate runners created for the shared pool.
  static const int POOL_SIZE = 6;

  /// The shared pool, set once [initLoadbalancer] has completed successfully.
  static LoadBalancer _loadBalancer;

  /// In-flight (or completed) creation of the shared pool, kept so that
  /// concurrent callers of [initLoadbalancer] do not each spawn a new pool.
  static Future<LoadBalancer> _pendingBalancer;

  /// Controller backing the stream returned by [bind].
  StreamController<T> _controller;

  /// Subscription to the source stream while the output has a listener.
  StreamSubscription<S> _subscription;

  /// Function executed on the pool for every source event.
  ///
  /// NOTE(review): it is passed to `LoadBalancer.run`, so it presumably has to
  /// be a top-level or static function that can be sent to another isolate.
  T Function(S) _transformer;

  /// Number of transformations that have been started but not yet finished.
  int _streamCounter = 0;

  /// Whether the source stream has reported that it is done.
  bool _sourceStreamClosed = false;

  /// Forwarded as `cancelOnError` when subscribing to the source stream.
  bool cancelOnError;

  /// The source stream given to [bind].
  Stream<S> _stream;

  /// Whether at least one transformation is still in flight.
  bool get isPending => _streamCounter != 0;

  /// Creates a transformer with a single-subscription output stream.
  ///
  /// [transform] is run on the isolate pool for every source event. [sync] is
  /// forwarded to the underlying [StreamController].
  IoTransformer(T transform(S value), {bool sync = false, this.cancelOnError}) {
    _transformer = transform;
    _controller = StreamController<T>(
        onListen: _onListen,
        onCancel: _onCancel,
        onPause: () => _subscription?.pause(),
        onResume: () => _subscription?.resume(),
        sync: sync);
  }

  /// Creates a transformer with a broadcast output stream.
  ///
  /// [transform] is run on the isolate pool for every source event. It is
  /// optional only to keep existing call sites compiling; when it is omitted
  /// every source event produces a [StateError] on the output stream.
  IoTransformer.broadcast(
      {T transform(S value), bool sync = false, this.cancelOnError}) {
    _transformer = transform;
    _controller = StreamController<T>.broadcast(
        onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  /// Subscribes to the source stream when the output gets a listener.
  void _onListen() {
    _subscription = _stream.listen(onData,
        onError: _controller.addError,
        onDone: onDone,
        cancelOnError: cancelOnError);
  }

  /// Cancels the source subscription when the output loses its listener.
  ///
  /// The cancellation future is returned so the controller can wait for it.
  Future<void> _onCancel() {
    final subscription = _subscription;
    _subscription = null;
    return subscription?.cancel();
  }

  /// Handles completion of the source stream.
  ///
  /// Closes the output immediately when nothing is in flight; otherwise the
  /// last finishing transformation closes it.
  void onDone() {
    _sourceStreamClosed = true;
    if (!isPending) {
      _controller.close();
    }
  }

  /// Handles one source event by scheduling its transformation on the pool.
  void onData(S data) {
    _streamCounter++;
    _transformOnPool(data);
  }

  /// Runs the transform for [data] on the pool and publishes the outcome.
  ///
  /// Failures (pool creation, the transform itself, a missing transform) are
  /// forwarded as error events instead of escaping as unhandled async errors,
  /// and the pending counter is always released so the output can close.
  Future<void> _transformOnPool(S data) async {
    try {
      if (_transformer == null) {
        throw StateError('No transform function was supplied');
      }
      await initLoadbalancer();
      final transformed = await _loadBalancer.run(_transformer, data);
      if (!_controller.isClosed) {
        _controller.add(transformed);
      }
    } catch (error, stackTrace) {
      if (!_controller.isClosed) {
        _controller.addError(error, stackTrace);
      }
    } finally {
      _pop_and_close();
    }
  }

  /// Remembers [stream] as the source and returns the output stream.
  ///
  /// The source is only listened to once the returned stream gets a listener.
  @override
  Stream<T> bind(Stream<S> stream) {
    _stream = stream;
    return _controller.stream;
  }

  /// Marks one transformation as finished and closes the output when it was
  /// the last one and the source stream is already done.
  void _pop_and_close() {
    _streamCounter--;
    if (!isPending && _sourceStreamClosed) {
      _controller.close();
    }
  }

  /// Returns a view of this transformer as a `StreamTransformer<RS, RT>`.
  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    return StreamTransformer.castFrom<S, T, RS, RT>(this);
  }

  /// Ensures the shared isolate pool exists.
  ///
  /// The pool is created at most once and reused by every transformer. If
  /// creation fails, the cached attempt is dropped so a later call can retry.
  static Future<void> initLoadbalancer() async {
    if (_loadBalancer != null) {
      return;
    }
    final pending =
        _pendingBalancer ??= LoadBalancer.create(POOL_SIZE, IsolateRunner.spawn);
    try {
      _loadBalancer = await pending;
    } catch (_) {
      // Only clear the cache if nobody has started a newer attempt meanwhile.
      if (identical(_pendingBalancer, pending)) {
        _pendingBalancer = null;
      }
      rethrow;
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment