Skip to content

Instantly share code, notes, and snippets.

@ibaca
Last active August 1, 2019 07:05
Show Gist options
  • Save ibaca/5d02ecb86901c155febab83321c12737 to your computer and use it in GitHub Desktop.
Save ibaca/5d02ecb86901c155febab83321c12737 to your computer and use it in GitHub Desktop.
Minimal SSE server using SUN HttpServer + Kotlin.
import com.sun.net.httpserver.HttpExchange
import com.sun.net.httpserver.HttpServer
import java.io.InputStream
import java.io.OutputStream
import java.net.InetSocketAddress
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import java.util.logging.Logger
import kotlin.collections.ArrayList
import kotlin.text.Charsets.UTF_8
import java.util.stream.Collectors.joining as join
typealias Seq = Pair<Int, String>
val L = Logger.getLogger("server")!!
val SEQ = AtomicInteger((Math.random() * 9999.0).toInt())
val queue = ArrayDeque<Seq>(1024)
val subscribers = ArrayList<(Seq) -> Unit>()
fun push(msg: String) = Pair(SEQ.getAndIncrement(), msg).apply {
L.info("broadcast $this")
queue.add(this)
subscribers.forEach { it(this) }
}
fun main() = HttpServer.create(InetSocketAddress(8000), 0 /*default*/).apply {
executor = null
createContext("/api/sent") { req ->
if (req.cors()) return@createContext
req.requestBody.readAll().let(::push)
// Out
req.sendResponseHeaders(200, 0)
req.responseBody.writeAll("\"ok\"")
}
createContext("/api/listen") { req ->
if (req.cors()) return@createContext
req.requestBody.readAll() // just read, to flush network buffers
// Out
req.responseHeaders["content-type"] = listOf("text/event-stream")
req.sendResponseHeaders(200, 0)
L.info("subscribing ${req.remoteAddress}")
val first = Integer.parseInt(req.requestHeaders.getFirst("Last-Event-ID") ?: "0")
req.send(Pair(0, "\"subscription success\""))
queue.filter { it.first > first }.forEach { req.send(it) }
subscribers += { req.send(it) }
}
}.start()
fun HttpExchange.send(n: Seq) {
L.info("sending data to $remoteAddress")
val out = responseBody
if (n.first > 0) out.write(("id: ${n.first}\n").toByteArray())
n.second.split("\n").filter(String::isNotEmpty).forEach { out.write("data: $it\n".toByteArray()) }
out.write("\n".toByteArray())
out.flush()
}
fun HttpExchange.cors(): Boolean {
val headers = responseHeaders
headers.add("Access-Control-Allow-Headers", "Content-Type")
headers.add("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, DELETE")
headers.add("Access-Control-Allow-Origin", requestHeaders.getFirst("origin"))
headers.add("Access-Control-Max-Age", "3600")
if (requestMethod != "OPTIONS") return false
else return sendResponseHeaders(200, 0).apply { responseBody.writeAll("") }.let { true }
}
fun InputStream.readAll() = bufferedReader(UTF_8).use { it.lines().collect(join("\n"))!! }
fun OutputStream.writeAll(data: String) = bufferedWriter().use { it.write(data) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment