Created
July 1, 2019 17:12
-
-
Save JUSTINMKAUFMAN/032e3d004350d9a4dae2cd767085753a to your computer and use it in GitHub Desktop.
Swift SSE EventSource Server for Vapor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import NIO | |
import Vapor | |
/** | |
SSE EventSource server implementation for Swift Vapor 3 | |
Usage: | |
``` | |
// Create a route for all incoming SSE connection | |
// requests and save a reference to the new stream | |
router.get("sse", String.parameter) { request -> SSEStream<SSEEvent> in | |
let stream = SSEStream<SSEEvent>(on: request) | |
self.streams.add(stream) | |
return stream | |
} | |
// Schedule an event to be published on the stream | |
let event = SSEEvent(type: .put, id: eventId, data: eventData) | |
stream.schedule(event) | |
``` | |
*/ | |
enum SSEEventType: String, Content { | |
case put, ping, patch, delete | |
} | |
public struct SSEEvent: Content { | |
let type: SSEEventType | |
let id: String? | |
let data: String? | |
init(type: SSEEventType, | |
id: String? = nil, | |
data: String? = nil) { | |
self.type = type | |
self.id = id | |
self.data = data | |
} | |
} | |
class SSEStream<ResponseType>: ResponseEncodable where ResponseType: Encodable { | |
private let chunkedStream: HTTPChunkedStream | |
private let allocator: ByteBufferAllocator | |
private let response: Response | |
private let request: Request | |
init(on request: Request) { | |
self.request = request | |
response = Response(using: request) | |
chunkedStream = HTTPChunkedStream(on: request) | |
response.http.headers.add(name: .contentType, value: "text/event-stream") | |
response.http.headers.add(name: .transferEncoding, value: "chunked") | |
response.http.headers.add(name: .cacheControl, value: "no-cache") | |
response.http.headers.add(name: .connection, value: "keep-alive") | |
response.http.headers.add(name: .accessControlAllowOrigin, value: "*") | |
response.http.status = .ok | |
response.http.body = chunkedStream.convertToHTTPBody() | |
allocator = ByteBufferAllocator() | |
} | |
func encode(for req: Request) throws -> EventLoopFuture<Response> { | |
return req.future(response) | |
} | |
public func schedule(_ event: SSEEvent, | |
initialDelay: TimeAmount = .seconds(0), | |
repeatInterval: TimeAmount? = nil) { | |
if let repeatInterval = repeatInterval { | |
response.eventLoop.scheduleRepeatedTask(initialDelay: initialDelay, delay: repeatInterval) { [unowned self] task -> Future<Void> in | |
guard !self.chunkedStream.isClosed else { | |
task.cancel() | |
return self.request.future() | |
} | |
let response = self.eventFuture(for: event) | |
response.whenFailure { _ in task.cancel() } | |
return response | |
} | |
} else { | |
request.eventLoop.scheduleTask(in: initialDelay) { [unowned self] () -> Future<Void> in | |
guard !self.chunkedStream.isClosed else { return self.request.future() } | |
return self.eventFuture(for: event) | |
} | |
} | |
} | |
@discardableResult | |
func succeed(with content: ResponseType) throws -> Future<Void> { | |
let encoder = JSONEncoder() | |
let data = try encoder.encode(content) | |
let stream = chunkedStream | |
var contentBuffer = allocator.buffer(capacity: data.count) | |
contentBuffer.write(bytes: data) | |
return stream.eventLoop.submit { | |
stream.write(.chunk(contentBuffer)) | |
.then { stream.write(.end) } | |
}.then { $0 } | |
} | |
} | |
private extension SSEStream { | |
func startPulse(for request: Request) { | |
var keepAliveBuffer: ByteBuffer = allocator.buffer(capacity: 1) | |
keepAliveBuffer.write(string: "\n") | |
request.eventLoop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(0)) { [unowned self] task -> Future<Void> in | |
guard !self.chunkedStream.isClosed else { | |
task.cancel() | |
return request.future() | |
} | |
let response = self.chunkedStream.write(.chunk(keepAliveBuffer)) | |
response.whenFailure { _ in task.cancel() } | |
return response | |
} | |
} | |
@discardableResult | |
func eventFuture(for event: SSEEvent) -> Future<Void> { | |
let data: [UInt8] = Array(payload(for: event).utf8) | |
var contentBuffer = allocator.buffer(capacity: data.count) | |
contentBuffer.write(bytes: data) | |
let stream = chunkedStream | |
return stream.eventLoop.submit { | |
stream.write(.chunk(contentBuffer)) | |
}.then { $0 } | |
} | |
func payload(for event: SSEEvent) -> String { | |
var payload: String = "" | |
if let id = event.id { payload += "id:\(id)\n" } | |
if let eventType = event.type { payload += "event:\(eventType)\n" } | |
payload += "data:\(event.data ?? "")\n\n" | |
return payload | |
} | |
} | |
final class SSEStreamManager<T: Encodable> { | |
private var streams: [SSEStream<T>] = [] | |
private let queue = DispatchQueue(label: "SSE-\(T.self)-Queue") | |
func add(_ stream: SSEStream<T>) { | |
queue.async { [weak self] in | |
self?.streams.append(stream) | |
} | |
} | |
func pop() -> SSEStream<T>? { | |
return queue.sync { [weak self] in | |
guard let self = self, !self.streams.isEmpty else { return nil } | |
return self.streams.removeFirst() | |
} | |
} | |
func publish(_ event: T) { | |
streams.forEach { stream in | |
if let event = event as? SSEEvent { | |
stream.schedule(event) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks this was really useful for me. There is a compilation error on line 137 (event.type is not optional).
So initially I added the line
unconditionally. With that in place it didn't work. I never got a callback. After removing the line altogether it works.
What is the intention of that line? Should it work when this line is included?