Skip to content

Instantly share code, notes, and snippets.

@vzsg
Created November 4, 2018 13:20
Show Gist options
  • Save vzsg/9bef9be86483023a2a1496a41b7a86a8 to your computer and use it in GitHub Desktop.
Save vzsg/9bef9be86483023a2a1496a41b7a86a8 to your computer and use it in GitHub Desktop.
An attempt to tame long polling with chunked streams in Vapor 3
import Foundation
import NIO
import Vapor
final class LongPollManager<T: Encodable> {
private var longPolls: [LongPoll<T>] = []
private let queue = DispatchQueue(label: "LongPollManager-\(T.self)-Queue")
func add(_ longPoll: LongPoll<T>) {
queue.async {
self.longPolls.append(longPoll)
}
}
func pop() -> LongPoll<T>? {
return queue.sync {
guard !longPolls.isEmpty else {
return nil
}
return longPolls.removeFirst()
}
}
}
class LongPoll<ResponseType>: ResponseEncodable where ResponseType: Encodable {
func encode(for req: Request) throws -> EventLoopFuture<Response> {
return req.future(response)
}
private let chunkedStream: HTTPChunkedStream
private let response: Response
private let allocator: ByteBufferAllocator
init(on request: Request) {
self.response = Response(using: request)
self.chunkedStream = HTTPChunkedStream(on: request)
response.http.headers.add(name: .contentType, value: "application/json")
response.http.body = chunkedStream.convertToHTTPBody()
self.allocator = ByteBufferAllocator()
var keepAliveBuffer = allocator.buffer(capacity: 1)
keepAliveBuffer.write(string: "\n")
request.eventLoop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(5)) { task -> Future<Void> in
guard !self.chunkedStream.isClosed else {
task.cancel()
return request.future()
}
let res = self.chunkedStream.write(.chunk(keepAliveBuffer))
res.whenFailure { _ in task.cancel() }
return res
}
}
@discardableResult
func succeed(with content: ResponseType) throws -> Future<Void> {
let encoder = JSONEncoder()
let data = try encoder.encode(content)
let stream = chunkedStream
var contentBuffer = allocator.buffer(capacity: data.count)
contentBuffer.write(bytes: data)
return stream.eventLoop.submit {
stream.write(.chunk(contentBuffer))
.then { stream.write(.end) }
}.then { $0 }
}
}
import Foundation
import Vapor
struct Test: Encodable {
let name: String
let age: Int
}
let longPolls = LongPollManager<Test>()
public func routes(_ router: Router) throws {
router.get("sub") { request -> LongPoll<Test> in
let longPoll = LongPoll<Test>(on: request)
longPolls.add(longPoll)
return longPoll
}
router.get("pub") { request -> String in
let test = Test(name: "foo", age: 22)
var count = 0
while let longPoll = longPolls.pop() {
try longPoll.succeed(with: test)
count = count + 1
}
return "done \(count)"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment