Skip to content

Instantly share code, notes, and snippets.

@gakuzzzz
Created September 30, 2012 07:07
Show Gist options
  • Save gakuzzzz/3806119 to your computer and use it in GitHub Desktop.
Save gakuzzzz/3806119 to your computer and use it in GitHub Desktop.
ThreadSafe jetty server
package com.yuroyoro.websocket
import javax.servlet.http._
import org.eclipse.jetty.websocket._
import org.eclipse.jetty.websocket.WebSocket.Outbound
import java.util.concurrent.atomic._
class ChatServlet extends WebSocketServlet {
val clients = new AtomicReference[Set[ChatWebSocket]](Set())
override def doGet(req:HttpServletRequest, res:HttpServletResponse ) =
getServletContext.getNamedDispatcher("default").forward(req, res)
override def doWebSocketConnect(req:HttpServletRequest, protocol:String ) =
new ChatWebSocket
class ChatWebSocket extends WebSocket {
@volatile var outbound:Outbound = _
override def onConnect(outbound:Outbound ) = {
this.outbound = outbound
clients.lazySet(clients.get + this)
onMessage( 0, "WebSocket is success!!!");
}
override def onMessage(frame:Byte, data:Array[Byte], offset:Int, length:Int ) = {}
override def onMessage(frame:Byte, data:String ) =
clients.get.foreach{ c => c.outbound.sendMessage( frame, data ) }
override def onDisconnect = clients.lazySet(clients.get - this)
}
}
@qtamaki
Copy link

qtamaki commented Oct 2, 2012

残念ながら、スレッドセーフになっていないような気がする。
調べたところ、AtomicReferenceは、複合アクションの原子性を保証しないので、

clients.lazySet(clients.get + this)
この部分、分解すると

val tmp = clients.get
val tmp2 = tmp + this
clients.lazySet(tmp2)

こんな感じたと思いますが、ここでclientsがemptyだったとして、
1行目のgetに同時に2スレッドが突入した場合、2行目のtmpが両方共、emptyにたいして+するので、
3ぎょうめは、スレッド1かスレッド2の遅い方の値が最終的な値になりそう。

ということで、Synchronized版のSetを使ってこんな感じに書いてみたのですが、いかがでしょう?
https://github.com/qtamaki/websocket-jetty-scala/blob/master/src/main/scala/com/qtamaki/websocket/DraggableServlet.scala

@gakuzzzz
Copy link
Author

gakuzzzz commented Oct 3, 2012

すいません、はい、その通りです!
削除したはずのboundsが復活してたりする不具合がありますね。

これ、Synchronized だと foreach の際にボトルネックになるか、問題が発生しそうです。
素直に CopyOnWriteArraySet を使うか、

package com.yuroyoro.websocket

import javax.servlet.http._
import org.eclipse.jetty.websocket._
import org.eclipse.jetty.websocket.WebSocket.Outbound
import java.util.concurrent.atomic._

class ChatServlet extends WebSocketServlet {
  val clients = new AtomicReference[Set[ChatWebSocket]](Set())

  @annotation.tailrec
  def updateClient(f: Set[ChatWebSocket] => Set[ChatWebSocket]): Unit = {
    val current = clients.get
    if (clients.compareAndSet(current, f(current)) ()
    else updateClient(f)
  }

  override def doGet(req:HttpServletRequest, res:HttpServletResponse ) =
    getServletContext.getNamedDispatcher("default").forward(req, res)

  override def doWebSocketConnect(req:HttpServletRequest, protocol:String ) =
    new ChatWebSocket

  class ChatWebSocket extends WebSocket {

    @volatile var outbound:Outbound = _

    override def onConnect(outbound:Outbound ) = {
      this.outbound = outbound
      updateClient(_ + this)
      onMessage( 0, "WebSocket is success!!!");
    }

    override def onMessage(frame:Byte, data:Array[Byte], offset:Int, length:Int ) = {}

    override def onMessage(frame:Byte, data:String ) =
      clients.get.foreach{ c => c.outbound.sendMessage( frame, data ) }

    override def onDisconnect = updateClient(_ - this)
  }
}

こんな感じで CASループ回す必要がありそうです。

@qtamaki
Copy link

qtamaki commented Oct 3, 2012

おお。これはカッコいい。
これなら、パフォーマンスへの影響も少なそうですね。
自分もこんな感じの処理にしたかったんですが、力不足で、単純なSynchronizedにしちゃいました。てへ
immutable版のSetを使っている?ところがミソ?

@gakuzzzz
Copy link
Author

gakuzzzz commented Oct 3, 2012

ミソっていうか、immutable版のSet使いたかったらこうなるって感じですかね。
mutable版のSetで良ければ素直に CopyOnWriteArraySet 使っちゃうのがいいと思います。
connect/disconnect よりも message の方が多いっぽいですし。

@qtamaki
Copy link

qtamaki commented Oct 4, 2012

調べました。
なるほど、CopyOnWriteArraySetってのは、イテレータ(以下Ite)を回している最中にSetに変更があったら、別のSetを作って変更を受け入れるのですね。
メリット
Ite走査中に更新が走っても、Ite側は影響を受けない
デメリット
頻繁に更新が発生すると(Ite走査と更新がバッティングすると)パフォーマンスが落ちる
Immutableじゃない

そのため、Ite走査が多く、更新が少ないSet(List)の場合効果が高い。

対して、immutable版は、Immutableなコレクションとは、変更不可なため、参照さえ保持して入れば、過去のすべてのバージョンを見ることが出来る。
そのため、Iteが走査するSetは、必然的に過去のいつかのバージョンのSetとなり、そもそも他から影響を受けることはない。
その代わり、Setに要素を追加する度に新しいSetへの参照を保持する必要がある。Scalaの場合は、そのへんはmutableにすることが出きるが、Haskellなんかだと、参照を保持する側もImmutableなんで、新しい参照が必要で・・・と、どこまでも連鎖してしまう。この辺のところはモナドで何とかするのかな?

@gakuzzzz
Copy link
Author

gakuzzzz commented Oct 4, 2012

Haskell の場合は、このCASループの様なことを内部的にやってくれる STM っていうモナドがあったりします。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment