Last active
September 23, 2019 15:02
-
-
Save busti/322e7de4637c34294024821a53bc86b5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect._ | |
import monix.eval.TaskLike | |
import monix.execution.Ack.Continue | |
import monix.execution.{Ack, Cancelable} | |
import monix.reactive.OverflowStrategy.Unbounded | |
import monix.reactive._ | |
import mycelium.client.raw.{ReconnectingWebSocket, ReconnectingWebsocketOptions} | |
import org.scalajs.dom._ | |
import scala.concurrent.Future | |
import scala.scalajs.js.UndefOr | |
import scala.scalajs.js.typedarray._ | |
object WebSocketTypes { | |
sealed trait WebSocketEvent | |
case class Open(event: Event) extends WebSocketEvent | |
case class Close(event: CloseEvent) extends WebSocketEvent | |
case class Message(event: MessageEvent) extends WebSocketEvent | |
case class Error(event: Event) extends WebSocketEvent | |
trait WebSocketData { | |
def sendData(webSocket: WebSocket) | |
} | |
implicit class dataString(data: String) extends WebSocketData { | |
def sendData(webSocket: WebSocket) = webSocket.send(data) | |
} | |
implicit class dataBlob(data: Blob) extends WebSocketData { | |
def sendData(webSocket: WebSocket) = webSocket.send(data) | |
} | |
implicit class dataArrayBuffer(data: ArrayBuffer) extends WebSocketData { | |
def sendData(webSocket: WebSocket) = webSocket.send(data) | |
} | |
} | |
class WebSocketConnection(webSocket: WebSocket) { | |
import WebSocketTypes._ | |
def observable: Observable[WebSocketEvent] = | |
Observable.create(Unbounded) { observer => | |
webSocket.onopen = (event: Event) => observer.onNext(Open(event)) | |
webSocket.onclose = (event: CloseEvent) => observer.onNext(Close(event)) | |
webSocket.onmessage = (event: MessageEvent) => observer.onNext(Message(event)) | |
webSocket.onerror = (event: Event) => observer.onNext(Error(event)) | |
Cancelable.empty | |
} | |
def observer[F[_]: Sync]: F[Observer[WebSocketData]] = | |
Sync[F].delay { | |
new Observer[WebSocketData] { | |
override def onNext(data: WebSocketData): Future[Ack] = { | |
data.sendData(webSocket) | |
Continue | |
} | |
override def onError(ex: Throwable) = throw ex | |
override def onComplete() = () | |
} | |
} | |
} | |
object WebSocketConnection { | |
def openReconnectingWebSocket[F[_]: Sync]( | |
url: String | |
): Resource[F, ReconnectingWebSocket] = | |
Resource.make { | |
Sync[F].delay(new ReconnectingWebSocket(url)) | |
} { in => | |
Sync[F].delay(in.close()) | |
} | |
def openWebSocket[F[_]: Sync]( | |
url: String | |
): Resource[F, WebSocket] = | |
Resource.make { | |
Sync[F].delay(new WebSocket(url)) | |
} { in => | |
Sync[F].delay(in.close()) | |
} | |
def apply[F[_]: Sync]( | |
resource: Resource[F, WebSocket] | |
): Resource[F, WebSocketConnection] = resource.evalMap { webSocket => | |
Sync[F].delay(new WebSocketConnection(webSocket)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment