Skip to content

Instantly share code, notes, and snippets.

@wataru420
Created January 7, 2012 12:18
Show Gist options
  • Save wataru420/1574608 to your computer and use it in GitHub Desktop.
Save wataru420/1574608 to your computer and use it in GitHub Desktop.
Scalaでノンブロッキング
import java.nio.channels.SocketChannel
import java.nio.charset.Charset
import java.nio.CharBuffer
import java.nio.ByteBuffer
import java.net.InetSocketAddress
import java.io.BufferedReader
import java.io.InputStreamReader
object EchoClient {
def main(args:Array[String]) {
val channel = SocketChannel.open(new InetSocketAddress("localhost",8080))
val charset = Charset.forName("UTF-8")
val buf = ByteBuffer.allocate(1024)
val keyin = new BufferedReader(new InputStreamReader(System.in))
print("send:")
val line = keyin.readLine
channel.write(charset.encode(CharBuffer.wrap(line + "\n")))
while (channel.isConnected()) {
buf.clear
if (channel.read(buf) < 0) {
return
}
buf.flip
print("recieve:" + charset.decode(buf).toString)
}
channel.close
}
}
import scala.actors.Actor
import scala.actors.Actor._
import java.net.InetSocketAddress
import scala.collection.JavaConversions._
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey,Selector,ServerSocketChannel,SocketChannel}
import java.nio.charset.Charset
object EchoServer {
//ソケットチャンネルを作成
val serverChannel:ServerSocketChannel = ServerSocketChannel.open()
// セレクタの生成
val selector:Selector = Selector.open()
def main(args:Array[String]) {
//起動
echoServer.start
init()
}
def init() {
serverChannel.socket().bind(new InetSocketAddress(8080))
// ノンブロッキングモードに設定
serverChannel.configureBlocking(false)
// ソケットチャネルをセレクタに登録
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
println("EchoServer start. port:" + serverChannel.socket().getLocalPort())
echoServer ! Select
}
case class Init()
case class Select()
case class Accept(key:SelectionKey)
case class Read(key:SelectionKey)
val echoServer = actor {
loop {
react {
//Seceltorオブジェクトによる操作の監視をループさせる
case Select => {
select()
//startSelect()
}
//Acceptの場合の処理
case Accept(key) => accept(key)
//Readの場合の処理
case Read(key) => read(key)
}
}
}
def startSelect() {
echoServer ! Select
}
//Selectorによる操作の監視
def select() {
selector.select()
selector.selectedKeys().foreach { key =>
if (key.isAcceptable()) {
echoServer ! Accept(key)
} else
if (key.isReadable()) {
echoServer ! Read(key)
}
}
echoServer ! Select
}
//接続処理
def accept(key:SelectionKey) {
val socket:ServerSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
socket.accept() match {
case null =>
case channel:SocketChannel =>
val remoteAddress:String = channel.socket().getRemoteSocketAddress().toString()
println(remoteAddress + ":[connect]")
channel.configureBlocking(false)
channel.register(selector, SelectionKey.OP_READ)
}
}
//メッセージの受信
def read(key:SelectionKey) {
val channel:SocketChannel = key.channel().asInstanceOf[SocketChannel]
val remoteAddress = channel.socket().getRemoteSocketAddress().toString()
val buf:ByteBuffer = ByteBuffer.allocate(1024)
def close(remoteAddress:String, channel:SocketChannel) {
channel.close()
println(remoteAddress + ":[disconnect]")
}
channel.read(buf) match {
case -1 => close(remoteAddress, channel)
case 0 =>
case x =>
buf.flip()
print(remoteAddress + ":" + Charset.forName("UTF-8").decode(buf).toString())
buf.flip()
channel.write(buf)
close(remoteAddress, channel)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment