Created
November 4, 2018 13:20
-
-
Save vzsg/9bef9be86483023a2a1496a41b7a86a8 to your computer and use it in GitHub Desktop.
An attempt to tame long polling with chunked streams in Vapor 3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import NIO | |
import Vapor | |
final class LongPollManager<T: Encodable> { | |
private var longPolls: [LongPoll<T>] = [] | |
private let queue = DispatchQueue(label: "LongPollManager-\(T.self)-Queue") | |
func add(_ longPoll: LongPoll<T>) { | |
queue.async { | |
self.longPolls.append(longPoll) | |
} | |
} | |
func pop() -> LongPoll<T>? { | |
return queue.sync { | |
guard !longPolls.isEmpty else { | |
return nil | |
} | |
return longPolls.removeFirst() | |
} | |
} | |
} | |
class LongPoll<ResponseType>: ResponseEncodable where ResponseType: Encodable { | |
func encode(for req: Request) throws -> EventLoopFuture<Response> { | |
return req.future(response) | |
} | |
private let chunkedStream: HTTPChunkedStream | |
private let response: Response | |
private let allocator: ByteBufferAllocator | |
init(on request: Request) { | |
self.response = Response(using: request) | |
self.chunkedStream = HTTPChunkedStream(on: request) | |
response.http.headers.add(name: .contentType, value: "application/json") | |
response.http.body = chunkedStream.convertToHTTPBody() | |
self.allocator = ByteBufferAllocator() | |
var keepAliveBuffer = allocator.buffer(capacity: 1) | |
keepAliveBuffer.write(string: "\n") | |
request.eventLoop.scheduleRepeatedTask(initialDelay: .seconds(0), delay: .seconds(5)) { task -> Future<Void> in | |
guard !self.chunkedStream.isClosed else { | |
task.cancel() | |
return request.future() | |
} | |
let res = self.chunkedStream.write(.chunk(keepAliveBuffer)) | |
res.whenFailure { _ in task.cancel() } | |
return res | |
} | |
} | |
@discardableResult | |
func succeed(with content: ResponseType) throws -> Future<Void> { | |
let encoder = JSONEncoder() | |
let data = try encoder.encode(content) | |
let stream = chunkedStream | |
var contentBuffer = allocator.buffer(capacity: data.count) | |
contentBuffer.write(bytes: data) | |
return stream.eventLoop.submit { | |
stream.write(.chunk(contentBuffer)) | |
.then { stream.write(.end) } | |
}.then { $0 } | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Vapor | |
struct Test: Encodable { | |
let name: String | |
let age: Int | |
} | |
let longPolls = LongPollManager<Test>() | |
public func routes(_ router: Router) throws { | |
router.get("sub") { request -> LongPoll<Test> in | |
let longPoll = LongPoll<Test>(on: request) | |
longPolls.add(longPoll) | |
return longPoll | |
} | |
router.get("pub") { request -> String in | |
let test = Test(name: "foo", age: 22) | |
var count = 0 | |
while let longPoll = longPolls.pop() { | |
try longPoll.succeed(with: test) | |
count = count + 1 | |
} | |
return "done \(count)" | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment