Last active
December 25, 2020 10:49
-
-
Save ajaychandran/c5054ab5ffa1c30f9f11971d39a88fbd to your computer and use it in GitHub Desktop.
Airstream: Http Streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Perform HTTP requests lazily (onStart) as opposed to EventStream.fromFuture(dom.Ajax.*) | |
final class AjaxEventStream( | |
method: String, | |
url: String, | |
data: dom.ext.Ajax.InputData, | |
timeout: Int, | |
headers: Map[String, String], | |
withCredentials: Boolean, | |
responseType: String | |
) extends EventStream[dom.XMLHttpRequest] { | |
protected[airstream] val topoRank: Int = 1 | |
override protected[this] def onStart(): Unit = { | |
val req = new dom.XMLHttpRequest | |
req.onreadystatechange = (_: dom.Event) => | |
if (req.readyState == 4) { | |
val status = req.status | |
if ((status >= 200 && status < 300) || status == 304) | |
new Transaction(fireValue(req, _)) | |
else | |
new Transaction(fireError(AjaxEventStream.Error(req), _)) | |
} | |
req.open(method, url) | |
req.responseType = responseType | |
req.timeout = timeout.toDouble | |
req.withCredentials = withCredentials | |
headers.foreach(Function.tupled(req.setRequestHeader)) | |
if (data == null) req.send() else req.send(data) | |
} | |
} | |
object AjaxEventStream { | |
final case class Error(xhr: dom.XMLHttpRequest) extends Exception | |
def apply( | |
method: String, | |
url: String, | |
data: dom.ext.Ajax.InputData, | |
timeout: Int, | |
headers: Map[String, String], | |
withCredentials: Boolean, | |
responseType: String | |
): EventStream[dom.XMLHttpRequest] = | |
new AjaxEventStream(method, url, data, timeout, headers, withCredentials, responseType) | |
def get( | |
url: String, | |
data: dom.ext.Ajax.InputData = null, | |
timeout: Int = 0, | |
headers: Map[String, String] = Map.empty, | |
withCredentials: Boolean = false, | |
responseType: String = "" | |
): EventStream[dom.XMLHttpRequest] = | |
apply("GET", url, data, timeout, headers, withCredentials, responseType) | |
def post( | |
url: String, | |
data: dom.ext.Ajax.InputData = null, | |
timeout: Int = 0, | |
headers: Map[String, String] = Map.empty, | |
withCredentials: Boolean = false, | |
responseType: String = "" | |
): EventStream[dom.XMLHttpRequest] = | |
apply("POST", url, data, timeout, headers, withCredentials, responseType) | |
} | |
// Web socket EventStream (server -> client) | |
final class WebSocketEventStream(url: String) extends EventStream[dom.MessageEvent] { | |
protected[airstream] val topoRank: Int = 1 | |
private var socket: dom.WebSocket = _ | |
override protected[this] def onStart(): Unit = { | |
socket = new dom.WebSocket(url) | |
socket.onmessage = v => new Transaction(fireValue(v, _)) | |
socket.onerror = e => new Transaction(fireError(WebSocketEventStream.Error(e), _)) | |
} | |
override protected[this] def onStop(): Unit = { | |
if (null != socket) socket.close() | |
socket = null | |
} | |
} | |
object WebSocketEventStream { | |
def apply(url: String): EventStream[dom.MessageEvent] = | |
new WebSocketEventStream(url) | |
def relative(path: String): EventStream[dom.MessageEvent] = | |
apply(url(path)) | |
def url(path: String): String = { | |
val prefix = dom.document.location.protocol match { | |
case "https:" => "wss:" | |
case _ => "ws:" | |
} | |
val suffix = if (path.startsWith("/")) path else s"/$path" | |
s"$prefix//${dom.document.location.hostname}:${dom.document.location.port}$suffix" | |
} | |
final case class Error(event: dom.Event) extends Exception | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment