Last active
August 1, 2019 07:05
-
-
Save ibaca/5d02ecb86901c155febab83321c12737 to your computer and use it in GitHub Desktop.
Minimal SSE server using SUN HttpServer + Kotlin.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.sun.net.httpserver.HttpExchange | |
import com.sun.net.httpserver.HttpServer | |
import java.io.InputStream | |
import java.io.OutputStream | |
import java.net.InetSocketAddress | |
import java.util.* | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.logging.Logger | |
import kotlin.collections.ArrayList | |
import kotlin.text.Charsets.UTF_8 | |
import java.util.stream.Collectors.joining as join | |
typealias Seq = Pair<Int, String> | |
val L = Logger.getLogger("server")!! | |
val SEQ = AtomicInteger((Math.random() * 9999.0).toInt()) | |
val queue = ArrayDeque<Seq>(1024) | |
val subscribers = ArrayList<(Seq) -> Unit>() | |
fun push(msg: String) = Pair(SEQ.getAndIncrement(), msg).apply { | |
L.info("broadcast $this") | |
queue.add(this) | |
subscribers.forEach { it(this) } | |
} | |
fun main() = HttpServer.create(InetSocketAddress(8000), 0 /*default*/).apply { | |
executor = null | |
createContext("/api/sent") { req -> | |
if (req.cors()) return@createContext | |
req.requestBody.readAll().let(::push) | |
// Out | |
req.sendResponseHeaders(200, 0) | |
req.responseBody.writeAll("\"ok\"") | |
} | |
createContext("/api/listen") { req -> | |
if (req.cors()) return@createContext | |
req.requestBody.readAll() // just read, to flush network buffers | |
// Out | |
req.responseHeaders["content-type"] = listOf("text/event-stream") | |
req.sendResponseHeaders(200, 0) | |
L.info("subscribing ${req.remoteAddress}") | |
val first = Integer.parseInt(req.requestHeaders.getFirst("Last-Event-ID") ?: "0") | |
req.send(Pair(0, "\"subscription success\"")) | |
queue.filter { it.first > first }.forEach { req.send(it) } | |
subscribers += { req.send(it) } | |
} | |
}.start() | |
fun HttpExchange.send(n: Seq) { | |
L.info("sending data to $remoteAddress") | |
val out = responseBody | |
if (n.first > 0) out.write(("id: ${n.first}\n").toByteArray()) | |
n.second.split("\n").filter(String::isNotEmpty).forEach { out.write("data: $it\n".toByteArray()) } | |
out.write("\n".toByteArray()) | |
out.flush() | |
} | |
fun HttpExchange.cors(): Boolean { | |
val headers = responseHeaders | |
headers.add("Access-Control-Allow-Headers", "Content-Type") | |
headers.add("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE") | |
headers.add("Access-Control-Allow-Origin", requestHeaders.getFirst("origin")) | |
headers.add("Access-Control-Max-Age", "3600") | |
if (requestMethod != "OPTIONS") return false | |
else return sendResponseHeaders(200, 0).apply { responseBody.writeAll("") }.let { true } | |
} | |
fun InputStream.readAll() = bufferedReader(UTF_8).use { it.lines().collect(join("\n"))!! } | |
fun OutputStream.writeAll(data: String) = bufferedWriter().use { it.write(data) } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment