Created
June 8, 2020 15:44
-
-
Save busti/42a2573fea31b4e44751ecd127bbcb96 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.implicits._ | |
import cats.effect._ | |
import io.chrisdavenport.log4cats.Logger | |
import monix.eval.TaskLike | |
import monix.execution.Ack.Continue | |
import monix.execution.{Ack, Cancelable} | |
import monix.reactive.OverflowStrategy.Unbounded | |
import monix.reactive._ | |
import mycelium.client.raw.{ReconnectingWebSocket, ReconnectingWebsocketOptions} | |
import org.scalajs.dom._ | |
import scala.concurrent.Future | |
import scala.scalajs.js.UndefOr | |
import scala.scalajs.js.typedarray._ | |
import scala.scalajs.js | |
import js.JSConverters._ | |
object WebSocketTypes { | |
sealed trait WebSocketEvent | |
case class Open(event: Event) extends WebSocketEvent | |
case class Close(event: CloseEvent) extends WebSocketEvent | |
case class Message(event: MessageEvent) extends WebSocketEvent | |
case class Error(event: Event) extends WebSocketEvent | |
trait WebSocketData { | |
def sendData(webSocket: WebSocket) | |
} | |
implicit class dataString(data: String) extends WebSocketData { | |
def sendData(webSocket: WebSocket) = webSocket.send(data) | |
} | |
implicit class dataBlob(data: Blob) extends WebSocketData { | |
def sendData(webSocket: WebSocket) = webSocket.send(data) | |
} | |
implicit class dataArrayBuffer(data: ArrayBuffer) extends WebSocketData { | |
def sendData(webSocket: WebSocket) = webSocket.send(data) | |
} | |
} | |
class WebSocketConnection(webSocket: WebSocket) { | |
import WebSocketTypes._ | |
def observable: Observable[WebSocketEvent] = | |
Observable.create(Unbounded) { observer => | |
webSocket.onopen = (event: Event) => observer.onNext(Open(event)) | |
webSocket.onclose = (event: CloseEvent) => observer.onNext(Close(event)) | |
webSocket.onmessage = (event: MessageEvent) => observer.onNext(Message(event)) | |
webSocket.onerror = (event: Event) => observer.onNext(Error(event)) | |
Cancelable.empty | |
} | |
def observer[F[_]: Sync]: F[Observer[WebSocketData]] = | |
Sync[F].delay { | |
new Observer[WebSocketData] { | |
override def onNext(data: WebSocketData): Future[Ack] = { | |
data.sendData(webSocket) | |
Continue | |
} | |
override def onError(ex: Throwable): Unit = throw ex | |
override def onComplete(): Unit = ??? | |
} | |
} | |
} | |
object WebSocketConnection { | |
def openReconnectingWebSocket[F[_]: Sync: Logger]( | |
url: String | |
): Resource[F, ReconnectingWebSocket] = | |
Resource.make { | |
Logger[F].info(s"opening reconnecting websocket to $url") *> | |
Sync[F].delay(new ReconnectingWebSocket(url)) | |
} { in => | |
Logger[F].info(s"closing reconnecting websocket to $url") *> | |
Sync[F].delay(in.close()) | |
} | |
def openWebSocket[F[_]: Sync: Logger]( | |
url: String | |
): Resource[F, WebSocket] = | |
Resource.make { | |
Logger[F].info(s"opening websocket to $url") *> | |
Sync[F].delay(new WebSocket(url)) | |
} { in => | |
Logger[F].info(s"opening websocket to $url") *> | |
Sync[F].delay(in.close()) | |
} | |
def apply[F[_]: Sync]( | |
resource: Resource[F, WebSocket] | |
): Resource[F, WebSocketConnection] = | |
resource.evalMap { webSocket => | |
Sync[F].delay(new WebSocketConnection(webSocket)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment