Skip to content

Instantly share code, notes, and snippets.

@mraleph
Created July 12, 2012 12:20
Show Gist options
  • Save mraleph/3097802 to your computer and use it in GitHub Desktop.
Save mraleph/3097802 to your computer and use it in GitHub Desktop.
// Sending futures across isolate boundaries.
#import('dart:isolate', prefix: 'isolate');
// FutureSendPort/FutureReceivePorts are ports for sending and receiving futures
// When connecting futures across isolates boundaries they do it by
// building the following structure:
//
// ISOLATE A
// [Future] --events--> [Completer SendPort]
// |
// |
// =================================================== events ==========
// ISOLATE B |
// V
// [Future] <--events-- [Completer] <--events-- [Completer ReceivePort]
//
//
class FutureSendPort {
// Wrap SendPort assuming that it is connected to FutureReceivePort.
FutureSendPort(SendPort port) : _port = port { }
void send(future) {
var connectorPort = new isolate.ReceivePort();
connectorPort.receive((message, completerPort) {
assert(message === "connect");
onSuccess(value) {
completerPort.send(["success", value]);
}
onException(exception) {
completerPort.send(["failure", exception.toString(), future.stackTrace()]);
}
future.then(onSuccess);
future.handleException(onException);
connectorPort.close();
});
_port.send("future", connectorPort.toSendPort());
}
final SendPort _port;
}
class FutureReceivePort {
FutureReceivePort() : _port = new isolate.ReceivePort() { }
void receive(callback) {
_port.receive((message, replyTo) {
assert(message === "future");
callback(_connectFuture(replyTo));
});
}
Future _connectFuture(connectorPort) {
var completer = new Completer();
var completerPort = new isolate.ReceivePort();
completerPort.receive((message, replyTo) {
assert(message is List);
assert(message[0] === "success" || message[0] === "failure");
switch (message[0]) {
case "success": completer.complete(message[1]); break;
case "failure": completer.completeException(message[1], message[2]); break;
}
completerPort.close();
});
connectorPort.send("connect", completerPort.toSendPort());
return completer.future;
}
SendPort toSendPort() {
return _port.toSendPort();
}
void close() { _port.close(); }
final ReceivePort _port;
}
Future makeTimerFuture() {
var completer = new Completer();
new isolate.Timer(1000, (timer) {
completer.complete("timer fired!");
});
return completer.future;
}
void isolate_body() {
var p = isolate.port;
p.receive((message, replyTo) {
assert(message === "give me futures");
var future = makeTimerFuture();
future.then((value) => print("future completed in isolate: ${value}"));
new FutureSendPort(replyTo).send(future);
p.close();
});
}
main () {
var p = isolate.spawnFunction(isolate_body);
var futures = new FutureReceivePort();
futures.receive((future) {
future.then((x) => print("future completed in main: ${x}"));
futures.close();
});
p.send("give me futures", futures.toSendPort());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment