Skip to content

Instantly share code, notes, and snippets.

@Kavantix
Last active February 3, 2022 13:38
Show Gist options
  • Save Kavantix/21a73de8895cd310275773a2e1b89d36 to your computer and use it in GitHub Desktop.
Save Kavantix/21a73de8895cd310275773a2e1b89d36 to your computer and use it in GitHub Desktop.
Simple Dart 2.15 compute implementation
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
/// Runs [f] with [input] in a freshly spawned isolate and returns its result.
///
/// [debugName] is forwarded to [Isolate.spawn] as the isolate's debug name.
///
/// The returned future completes with an error when the isolate cannot be
/// spawned, and with a [RemoteError] when [f] throws or the isolate exits
/// without delivering a result. All ports opened here are closed on every
/// path, so a failed computation neither hangs the caller nor keeps the
/// calling isolate alive.
Future<R> compute<I, R>(R Function(I) f, I input,
    {String debugName = ''}) async {
  final completer = Completer<R>();
  final resultPort = RawReceivePort();
  // Receives the worker's uncaught-error and exit notifications. It is kept
  // separate from [resultPort] so that a legitimate result which happens to
  // be `null` or a two-element list cannot be mistaken for a notification.
  final failurePort = RawReceivePort();

  void closePorts() {
    resultPort.close();
    failurePort.close();
  }

  // Each handler closes both ports first, so only the first message to
  // arrive on either port is ever processed.
  resultPort.handler = (Object? message) {
    closePorts();
    completer.complete(message as R);
  };
  failurePort.handler = (Object? message) {
    closePorts();
    // Error listeners receive `[error, stackTrace]` as strings (the stack
    // trace may be null); exit listeners receive `null`.
    final error = message is List && message.length == 2
        ? RemoteError('${message[0]}', message[1]?.toString() ?? '')
        : RemoteError('Isolate exited without producing a result', '');
    completer.completeError(error, error.stackTrace);
  };

  try {
    await Isolate.spawn<_ComputeArgs<I, R>>(
      _compute,
      _ComputeArgs<I, R>(resultPort.sendPort, f, input),
      debugName: debugName,
      onError: failurePort.sendPort,
      onExit: failurePort.sendPort,
    );
  } on Object {
    // Spawning failed, so no message will ever arrive: release the ports.
    closePorts();
    rethrow;
  }
  return completer.future;
}
/// Runs [f] on every event of [input] inside a single long-lived isolate and
/// emits the results as they come back.
///
/// [debugName] is forwarded to [Isolate.spawn] as the isolate's debug name.
///
/// The returned stream closes once [input] is done and every value sent to
/// the worker has produced a result. It ends with an error when:
///
/// * [input] emits an error (forwarded as-is),
/// * a value cannot be sent to the worker, or
/// * [f] throws or the worker exits unexpectedly (reported as [RemoteError]).
///
/// The worker isolate, both receive ports and the [input] subscription are
/// released when the stream completes, fails, or is cancelled early.
Stream<R> computeMany<I, R>(R Function(I) f, Stream<I> input,
    {String debugName = ''}) async* {
  final resultPort = ReceivePort();
  // Receives the worker's uncaught-error and exit notifications.
  final failurePort = ReceivePort();
  final controller = StreamController<R>();
  // Completed by the first message on [resultPort], which is the worker's
  // own input port.
  final handshake = Completer<SendPort>();
  var pending = 0;
  var inputDone = false;
  Isolate? isolate;
  StreamSubscription<I>? inputSubscription;

  void closeWhenFinished() {
    if (inputDone && pending <= 0 && !controller.isClosed) {
      controller.close();
    }
  }

  void fail(Object error, StackTrace stackTrace) {
    if (!handshake.isCompleted) {
      handshake.completeError(error, stackTrace);
    } else if (!controller.isClosed) {
      controller
        ..addError(error, stackTrace)
        ..close();
    }
  }

  try {
    resultPort.listen((Object? message) {
      if (!handshake.isCompleted) {
        handshake.complete(message as SendPort);
        return;
      }
      if (controller.isClosed) return;
      controller.add(message as R);
      pending -= 1;
      closeWhenFinished();
    });
    failurePort.listen((Object? message) {
      // Error listeners receive `[error, stackTrace]` as strings (the stack
      // trace may be null); exit listeners receive `null`.
      final error = message is List && message.length == 2
          ? RemoteError('${message[0]}', message[1]?.toString() ?? '')
          : RemoteError('Isolate exited before producing all results', '');
      fail(error, error.stackTrace);
    });

    isolate = await Isolate.spawn<_ComputeManyArgs<I, R>>(
      _computeMany,
      _ComputeManyArgs<I, R>(resultPort.sendPort, f),
      debugName: debugName,
      onError: failurePort.sendPort,
      onExit: failurePort.sendPort,
    );
    final inputPort = await handshake.future;

    inputSubscription = input.listen(
      (data) {
        pending += 1;
        try {
          inputPort.send(data);
        } on Object catch (error, stackTrace) {
          // The value could not be sent, so its result will never arrive.
          fail(error, stackTrace);
        }
      },
      onError: fail,
      onDone: () {
        inputDone = true;
        closeWhenFinished();
      },
    );

    await for (final result in controller.stream) {
      yield result;
    }
  } finally {
    // Runs on normal completion, on error, and when the consumer cancels.
    // Closing the ports first lets the calling isolate exit and makes the
    // exit notification triggered by kill() a no-op.
    resultPort.close();
    failurePort.close();
    isolate?.kill();
    await inputSubscription?.cancel();
  }
}
/// Isolate-running shortcuts for any single-argument function.
extension FunctionInIsolateExtension<I, R> on R Function(I) {
  /// Calls this function with [input] in a new isolate via [compute].
  Future<R> runInIsolate(I input) {
    return compute<I, R>(this, input);
  }

  /// Feeds every event of [input] through this function via [computeMany].
  Stream<R> runManyInIsolate(Stream<I> input) {
    return computeMany<I, R>(this, input);
  }

  /// Feeds every element of [input] through this function via [computeMany],
  /// after wrapping the iterable in a stream.
  Stream<R> runIterableInIsolate(Iterable<I> input) {
    final source = Stream<I>.fromIterable(input);
    return computeMany<I, R>(this, source);
  }
}
/// Isolate-backed mapping helpers for any [Iterable].
///
/// The extension is unnamed, so it is only usable inside this library.
extension<T extends Object?> on Iterable<T> {
  /// Maps every element through [f], using one [compute] call (and therefore
  /// one spawned isolate) per element.
  Future<List<R>> mapInParallel<R>(R Function(T) f) =>
      Future.wait<R>(map(f.runInIsolate));

  /// Runs [f] on chunks of this iterable, distributed over the available
  /// system threads.
  ///
  /// The elements are split into [groups] consecutive chunks of equal size
  /// (the last non-empty chunk may be shorter). Each chunk is passed to [f]
  /// in its own isolate. [groups] defaults to [Platform.numberOfProcessors].
  /// When there are fewer elements than a full split needs, the trailing
  /// chunks are empty lists.
  ///
  /// Throws an [ArgumentError] if [groups] is not positive.
  Future<List<R>> mapInParallelGrouped<R>(
    R Function(List<T>) f, {
    int? groups,
  }) {
    final groupCount = groups ?? Platform.numberOfProcessors;
    if (groupCount <= 0) {
      // Zero would divide by zero below; a negative count would silently
      // return an empty list and drop every element.
      throw ArgumentError.value(groups, 'groups', 'Must be positive');
    }
    final list = toList();
    final groupSize = (list.length / groupCount).ceil();
    return Future.wait([
      for (var i = 0; i < groupCount; i++)
        f.runInIsolate(list.skip(i * groupSize).take(groupSize).toList()),
    ]);
  }

  /// Maps every element through [f] inside a single isolate and collects the
  /// results into a list.
  Future<List<R>> mapInIsolate<R>(R Function(T) f) =>
      f.runIterableInIsolate(this).toList();
}
/// Everything a one-shot worker needs: the function, its argument, and the
/// port on which [_compute] hands the result back.
class _ComputeArgs<I, R> {
  _ComputeArgs(this.resultPort, this.f, this.input);

  /// Function to evaluate.
  final R Function(I) f;

  /// Argument passed to [f].
  final I input;

  /// Port that receives the value produced by [run].
  final SendPort resultPort;

  /// Applies [f] to [input] and returns the outcome.
  R run() {
    return f(input);
  }
}
/// Isolate entry point used by [compute]: evaluates the packaged call and
/// terminates the isolate, handing the value to `args.resultPort`.
void _compute<I, R>(_ComputeArgs<I, R> args) {
  final outcome = args.run();
  Isolate.exit(args.resultPort, outcome);
}
/// Configuration for a streaming worker: the function to apply and the port
/// on which each result is sent back.
class _ComputeManyArgs<I, R> {
  _ComputeManyArgs(this.resultPort, this.f);

  /// Function applied to every incoming message.
  final R Function(I) f;

  /// Port that receives one result per processed message.
  final SendPort resultPort;

  /// Processes messages from [inputPort] one at a time, sending `f(message)`
  /// to [resultPort] for each. Completes only when [inputPort] is closed.
  Future<void> run(ReceivePort inputPort) async {
    await for (final Object? message in inputPort) {
      final output = f(message as I);
      resultPort.send(output);
    }
  }
}
/// Isolate entry point used by [computeMany]: opens an inbox, sends its send
/// port over `args.resultPort` as the first message, then processes whatever
/// arrives on the inbox.
void _computeMany<I, R>(_ComputeManyArgs<I, R> args) async {
  final inbox = ReceivePort();
  final replyPort = args.resultPort;
  replyPort.send(inbox.sendPort);
  await args.run(inbox);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment