|
// scala -P:continuations:enable |
|
// Continuation 对于构建诸如异步 I/O、UI 事件处理以及数据流并发之类的高级控制结构十分有帮助
|
import util.continuations._ |
|
|
|
/**
 * Minimal `shift`/`reset` demonstration.
 *
 * Must be compiled with the continuations plugin enabled
 * (`-P:continuations:enable`).
 */
object Continue {

  /**
   * Captures the continuation of the `shift` expression and invokes it twice.
   *
   * Within the enclosing `reset`, what remains to be done after `shift` is
   * "add 1 to the value", so `cf(10)` evaluates to 11 and `cf(100)` to 101.
   * Both results are printed.
   *
   * @return the value produced by the second invocation (101), which becomes
   *         the result of the whole `reset` block
   */
  def say: Int =
    reset {
      shift { cf: (Int => Int) =>
        val first = cf(10)
        println(first)
        val second = cf(100)
        println(second)
        second
      } + 1
    }

  /**
   * Entry point: runs [[say]] for its printed output.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    say // output: 11,101
  }
}
|
|
|
|
|
|
|
/** |
|
另外一个例子 |
|
**/ |
|
|
|
import scala.util.continuations._ |
|
import java.util.concurrent.Executors |
|
|
|
/**
 * Demo showing how a callback-style connect can be written in direct style
 * with delimited continuations.
 */
object Test {

  /** Thread pool on which `ChannelFuture.addListener` runs listener callbacks. */
  val execService: java.util.concurrent.ExecutorService = Executors.newFixedThreadPool(2)

  /**
   * Starts an asynchronous "connect" inside a `reset` block.
   *
   * `connect` suspends the rest of the `reset` block; control returns to
   * `main` immediately (printing "Outside reset"), and the suspended part is
   * resumed later from a pool thread once the listener fires.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient()
      conn.connect("127.0.0.1")
      println("This will happen after the connection is finished")
      // The pool's worker threads are non-daemon, so without an orderly
      // shutdown the JVM would never exit after the demo completes. The task
      // currently running this continuation is allowed to finish.
      execService.shutdown()
    }
    println("Outside reset")
  }
}
|
|
|
/**
 * Stand-in for an asynchronous operation handle whose listeners are notified
 * on the shared `Test.execService` pool.
 */
class ChannelFuture {

  /**
   * Schedules `listener` to be notified on `Test.execService`.
   *
   * The notification is submitted immediately; this method does not wait for
   * the listener to run.
   *
   * @param listener callback that receives this future as its argument
   */
  def addListener(listener: ChannelFutureListener): Unit = {
    val notifyListener = new Runnable {
      override def run(): Unit = listener.operationComplete(ChannelFuture.this)
    }
    Test.execService.submit(notifyListener)
  }
}
|
|
|
/** Callback interface for listeners registered through `ChannelFuture.addListener`. */
trait ChannelFutureListener {

  /**
   * Invoked with the future the listener was registered on.
   *
   * @param f the future passed by the notifier
   */
  def operationComplete(f: ChannelFuture): Unit
}
|
|
|
/**
 * Demo client whose `connect` suspends the caller's `reset` block until a
 * `ChannelFuture` listener fires.
 */
class MyLibraryClient {

  /**
   * Suspends the enclosing `reset` block and resumes it from the listener
   * callback of a freshly created `ChannelFuture`.
   *
   * Must be called inside a `reset` block (CPS-annotated result type).
   *
   * @param remoteAddr address to connect to; not used by this demo stub
   */
  def connect(remoteAddr: String): Unit @cps[Unit] =
    shift { retrn: (Unit => Unit) =>
      val future: ChannelFuture = new ChannelFuture()
      future.addListener(new ChannelFutureListener {
        def operationComplete(f: ChannelFuture): Unit = {
          println("operationComplete starts")
          // Resume the suspended caller. The unit value is passed explicitly
          // instead of relying on deprecated empty-argument adaptation.
          retrn(())
        }
      })
    }
}
|
|
|
|
|
|
|
//another example.. |
|
|
|
import java.net.InetSocketAddress |
|
import java.nio.channels.SelectionKey |
|
import java.nio.channels.Selector |
|
import java.nio.channels.ServerSocketChannel |
|
import java.nio.channels.SocketChannel |
|
import java.nio.ByteBuffer |
|
|
|
import scala.collection.JavaConversions.collectionAsScalaIterable |
|
import scala.util.continuations.reset |
|
import scala.util.continuations.shift |
|
import scala.util.continuations.shiftUnit |
|
|
|
/**
 * Single-threaded, non-blocking echo server on port 12345 built from NIO
 * channels and delimited continuations.
 *
 * Whenever an accept, read or write would block, the rest of the computation
 * is captured with `shift` and stored as the attachment of the channel's
 * selection key. The selector loop at the bottom resumes those continuations
 * when the corresponding channel is selected.
 */
object Main extends App {
  val selector: Selector = Selector.open()
  val server: ServerSocketChannel = ServerSocketChannel.open()
  server.socket().bind(new InetSocketAddress(12345))
  server.configureBlocking(false)

  // Accept loop. The `shiftUnit` calls give every branch and loop body the
  // same CPS-annotated type, which the continuations plugin requires.
  reset {
    while (true) {
      server.accept() match {
        case c: SocketChannel =>
          // Each connection gets its own `reset`, so suspending one
          // connection does not suspend the accept loop.
          reset {
            println("Accept: " + c)
            c.configureBlocking(false)
            // One buffer per connection, reused for every read instead of
            // allocating a new direct buffer on each loop iteration.
            val bb = ByteBuffer.allocateDirect(1024)
            while (c.isOpen && c.isConnected) {
              bb.clear()
              c.read(bb) match {
                case count if count > 0 =>
                  println("Read: " + c + " count: " + count)
                  bb.flip()
                  // Echo everything that was just read back to the peer.
                  while (bb.hasRemaining) {
                    c.write(bb) match {
                      case written if written > 0 =>
                        println("Write: " + c + " count: " + written)
                        shiftUnit[Unit, Unit, Unit]()
                      case written if written == 0 =>
                        println("WriteBlock: " + c)
                        // Park until the selector reports the channel writable.
                        shift[Unit, Unit, Unit] { cont =>
                          c.register(selector, SelectionKey.OP_WRITE, cont)
                        }
                      case _ =>
                        println("WriteError: " + c)
                        // Drop the unwritten bytes so the write loop ends.
                        // `clear()` would reset position/limit and make
                        // `hasRemaining` true again, causing another write
                        // on the channel closed below.
                        bb.position(bb.limit())
                        c.close()
                        shiftUnit[Unit, Unit, Unit]()
                    }
                  }
                case count if count == 0 =>
                  println("ReadBlock: " + c)
                  // Park until the selector reports the channel readable.
                  shift[Unit, Unit, Unit] { cont =>
                    c.register(selector, SelectionKey.OP_READ, cont)
                  }
                case _ =>
                  // Negative count (end of stream): close; the loop condition
                  // then fails and this connection's handler finishes.
                  println("ReadError: " + c)
                  c.close()
                  shiftUnit[Unit, Unit, Unit]()
              }
            }
          }
          shiftUnit[Unit, Unit, Unit]()
        case null =>
          println("AcceptBlock")
          // No pending connection: park until the server socket is acceptable.
          shift[Unit, Unit, Unit] { cont =>
            server.register(selector, SelectionKey.OP_ACCEPT, cont)
          }
      }
      shiftUnit[Unit, Unit, Unit]()
    }
  }

  // Event loop: wait for ready channels and resume their parked continuations.
  val keys = selector.selectedKeys()
  while (true) {
    selector.select()
    keys foreach { k =>
      // Clear the interest set; the resumed code registers again if it needs
      // to wait once more.
      k.interestOps(0)
      // The attachment is the continuation stored by `register` above. Pass
      // the unit value `()`, not the `Unit` companion object.
      k.attachment.asInstanceOf[Unit => Unit].apply(())
    }
    keys.clear()
  }
}
scala 向外抛出异常