-
-
Save gakuzzzz/3806119 to your computer and use it in GitHub Desktop.
package com.yuroyoro.websocket | |
import javax.servlet.http._ | |
import org.eclipse.jetty.websocket._ | |
import org.eclipse.jetty.websocket.WebSocket.Outbound | |
import java.util.concurrent.atomic._ | |
class ChatServlet extends WebSocketServlet { | |
val clients = new AtomicReference[Set[ChatWebSocket]](Set()) | |
override def doGet(req:HttpServletRequest, res:HttpServletResponse ) = | |
getServletContext.getNamedDispatcher("default").forward(req, res) | |
override def doWebSocketConnect(req:HttpServletRequest, protocol:String ) = | |
new ChatWebSocket | |
class ChatWebSocket extends WebSocket { | |
@volatile var outbound:Outbound = _ | |
override def onConnect(outbound:Outbound ) = { | |
this.outbound = outbound | |
clients.lazySet(clients.get + this) | |
onMessage( 0, "WebSocket is success!!!"); | |
} | |
override def onMessage(frame:Byte, data:Array[Byte], offset:Int, length:Int ) = {} | |
override def onMessage(frame:Byte, data:String ) = | |
clients.get.foreach{ c => c.outbound.sendMessage( frame, data ) } | |
override def onDisconnect = clients.lazySet(clients.get - this) | |
} | |
} |
すいません、はい、その通りです!
削除したはずのboundsが復活してたりする不具合がありますね。
これ、Synchronized だと foreach の際にボトルネックになるか、問題が発生しそうです。
素直に CopyOnWriteArraySet を使うか、
package com.yuroyoro.websocket
import javax.servlet.http._
import org.eclipse.jetty.websocket._
import org.eclipse.jetty.websocket.WebSocket.Outbound
import java.util.concurrent.atomic._
class ChatServlet extends WebSocketServlet {
val clients = new AtomicReference[Set[ChatWebSocket]](Set())
@annotation.tailrec
def updateClient(f: Set[ChatWebSocket] => Set[ChatWebSocket]): Unit = {
val current = clients.get
if (clients.compareAndSet(current, f(current)) ()
else updateClient(f)
}
override def doGet(req:HttpServletRequest, res:HttpServletResponse ) =
getServletContext.getNamedDispatcher("default").forward(req, res)
override def doWebSocketConnect(req:HttpServletRequest, protocol:String ) =
new ChatWebSocket
class ChatWebSocket extends WebSocket {
@volatile var outbound:Outbound = _
override def onConnect(outbound:Outbound ) = {
this.outbound = outbound
updateClient(_ + this)
onMessage( 0, "WebSocket is success!!!");
}
override def onMessage(frame:Byte, data:Array[Byte], offset:Int, length:Int ) = {}
override def onMessage(frame:Byte, data:String ) =
clients.get.foreach{ c => c.outbound.sendMessage( frame, data ) }
override def onDisconnect = updateClient(_ - this)
}
}
こんな感じで CASループ回す必要がありそうです。
おお。これはカッコいい。
これなら、パフォーマンスへの影響も少なそうですね。
自分もこんな感じの処理にしたかったんですが、力不足で、単純なSynchronizedにしちゃいました。てへ
immutable版のSetを使っている?ところがミソ?
ミソっていうか、immutable版のSet使いたかったらこうなるって感じですかね。
mutable版のSetで良ければ素直に CopyOnWriteArraySet 使っちゃうのがいいと思います。
connect/disconnect よりも message の方が多いっぽいですし。
調べました。
なるほど、CopyOnWriteArraySetってのは、イテレータ(以下Ite)を回している最中にSetに変更があったら、別のSetを作って変更を受け入れるのですね。
メリット
Ite走査中に更新が走っても、Ite側は影響を受けない
デメリット
頻繁に更新が発生すると(Ite走査と更新がバッティングすると)パフォーマンスが落ちる
Immutableじゃない
そのため、Ite走査が多く、更新が少ないSet(List)の場合効果が高い。
対して、immutable版は、Immutableなコレクションとは、変更不可なため、参照さえ保持して入れば、過去のすべてのバージョンを見ることが出来る。
そのため、Iteが走査するSetは、必然的に過去のいつかのバージョンのSetとなり、そもそも他から影響を受けることはない。
その代わり、Setに要素を追加する度に新しいSetへの参照を保持する必要がある。Scalaの場合は、そのへんはmutableにすることが出きるが、Haskellなんかだと、参照を保持する側もImmutableなんで、新しい参照が必要で・・・と、どこまでも連鎖してしまう。この辺のところはモナドで何とかするのかな?
Haskell の場合は、このCASループの様なことを内部的にやってくれる STM っていうモナドがあったりします。
残念ながら、スレッドセーフになっていないような気がする。
調べたところ、AtomicReferenceは、複合アクションの原子性を保証しないので、
clients.lazySet(clients.get + this)
この部分、分解すると
val tmp = clients.get
val tmp2 = tmp + this
clients.lazySet(tmp2)
こんな感じたと思いますが、ここでclientsがemptyだったとして、
1行目のgetに同時に2スレッドが突入した場合、2行目のtmpが両方共、emptyにたいして+するので、
3ぎょうめは、スレッド1かスレッド2の遅い方の値が最終的な値になりそう。
ということで、Synchronized版のSetを使ってこんな感じに書いてみたのですが、いかがでしょう?
https://github.com/qtamaki/websocket-jetty-scala/blob/master/src/main/scala/com/qtamaki/websocket/DraggableServlet.scala