Last active
May 18, 2020 22:42
-
-
Save mikaelbartlett/033602f74bbe67545a982c9a9ec70162 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation | |
import Vapor | |
/// A token bucket used to cap how many units of work may hold a token at the same time.
///
/// Tokens are taken with `consume(_:on:)` / `tryConsume(_:until:on:)` and handed back with
/// `replenish(_:)`. The blocking wait for tokens runs on a private dispatch queue rather than
/// on the caller's event loop, and the outcome is delivered through an `EventLoopFuture`.
final class TokenBucket {
    /// Maximum number of tokens the bucket can hold.
    public let capacity: Int

    /// Number of tokens currently available.
    ///
    /// The methods of this class only touch this value while holding `condition`; direct
    /// access from outside the class is not synchronized with that lock.
    public var tokenCount: Int

    /// Queue on which the blocking waits are performed (serial, so requests are served in
    /// submission order).
    let workQueue = DispatchQueue(label: "com.midjar.resourcetokenbucket")

    /// Guards `tokenCount` and lets a waiter sleep until tokens are returned.
    private let condition = NSCondition()

    /// Creates a bucket.
    ///
    /// - Parameters:
    ///   - capacity: Maximum number of tokens the bucket can hold.
    ///   - initialTokenCount: Tokens available from the start. Values above `capacity` are
    ///     clamped to `capacity`; negative values are clamped to zero.
    public init(capacity: Int, initialTokenCount: Int = 0) {
        self.capacity = capacity
        self.tokenCount = max(0, min(capacity, initialTokenCount))
    }

    /// Takes `count` tokens, waiting without a time limit until they are available.
    ///
    /// - Parameters:
    ///   - count: Number of tokens to take. Must not exceed `capacity`; a larger value is a
    ///     programmer error and traps.
    ///   - eventLoop: Event loop used to create the returned future.
    /// - Returns: A future that completes once the tokens have been taken.
    public func consume(_ count: Int, on eventLoop: EventLoop) -> EventLoopFuture<Void> {
        guard count <= capacity else {
            fatalError("Cannot consume \(count) amount of tokens on a bucket with capacity \(capacity)")
        }
        return tryConsume(count, until: Date.distantFuture, on: eventLoop).map { _ in Void() }
    }

    /// Tries to take `count` tokens, giving up once `limitDate` has passed.
    ///
    /// - Parameters:
    ///   - count: Number of tokens to take. Must not exceed `capacity`; a larger value is a
    ///     programmer error and traps.
    ///   - limitDate: Point in time after which the attempt is abandoned.
    ///   - eventLoop: Event loop used to create the returned future.
    /// - Returns: A future holding `true` if the tokens were taken, `false` on timeout.
    public func tryConsume(_ count: Int, until limitDate: Date, on eventLoop: EventLoop) -> EventLoopFuture<Bool> {
        guard count <= capacity else {
            fatalError("Cannot consume \(count) amount of tokens on a bucket with capacity \(capacity)")
        }
        let promise = eventLoop.makePromise(of: Bool.self)
        // The wait blocks its thread, so it is pushed onto the work queue instead of running
        // on the calling thread.
        workQueue.async {
            let result = self.wait(until: limitDate, for: count)
            promise.succeed(result)
        }
        return promise.futureResult
    }

    /// Returns `count` tokens to the bucket (never exceeding `capacity`) and wakes any waiter.
    ///
    /// - Parameter count: Number of tokens to give back.
    func replenish(_ count: Int) {
        condition.lock()
        tokenCount = min(tokenCount + count, capacity)
        // Wake a blocked `wait` immediately instead of leaving it to its polling timeout.
        condition.broadcast()
        condition.unlock()
    }

    /// Blocks the current thread until `tokens` tokens are available or `limitDate` has passed.
    ///
    /// - Returns: `true` if the tokens were taken, `false` if the deadline passed first.
    private func wait(until limitDate: Date, for tokens: Int) -> Bool {
        condition.lock()
        defer {
            condition.unlock()
        }
        while tokenCount < tokens {
            let now = Date()
            if limitDate < now {
                return false
            }
            // Periodic re-check kept as a safety net (e.g. `tokenCount` assigned directly
            // without a signal), but never sleep past the caller's deadline.
            let wakeUp = min(limitDate, now.addingTimeInterval(0.2))
            _ = condition.wait(until: wakeUp)
        }
        tokenCount -= tokens
        return true
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Example usage with a bucket that is constrained to 3 tokens.
/// `replenish` must be called on the token bucket to "give back" a token so that the next
/// caller in line can continue.
let tokenBucket = TokenBucket(capacity: 3, initialTokenCount: 3)
/// Downloads `path` + `filename` to a temporary file while holding one token from `tokenBucket`.
///
/// - Parameters:
///   - context: Supplies the event loop, the application's shared HTTP client and the logger.
///   - authentication: Not used by this function body.
///   - path: URL prefix; `filename` is appended to it verbatim to form the request URL.
///   - filename: Name of the remote file, also passed to `tempFile(filename:)` for the local path.
/// - Returns: `nil` if a file already exists at the local path; otherwise a future that
///   completes with the delegate's result once the download has finished.
/// - Throws: Any error thrown while building the `HTTPClient.Request`.
func downloadFile(using context: QueueContext,
                  authentication: Authentication,
                  path: String,
                  filename: String) throws -> EventLoopFuture<Result>? {
    let filePath = "\(path)\(filename)"
    let request = try HTTPClient.Request(url: filePath)
    let downloadFilePath = tempFile(filename: filename)
    // Nothing to do when the file is already present locally.
    guard !FileManager.default.fileExists(atPath: downloadFilePath) else { return nil }
    let delegate = FileWriterResponseDelegate(request: request, filePath: downloadFilePath)
    return tokenBucket.consume(1, on: context.eventLoop).flatMap { _ -> EventLoopFuture<Result> in
        let task = context.application.http.client.shared.execute(request: request, delegate: delegate, eventLoop: .indifferent)
        return task.futureResult
            .always { _ in
                // Give the token back on success AND on failure; returning it only on
                // success would leak a token for every failed request.
                self.tokenBucket.replenish(1)
            }
            .map { response -> Result in
                if case FileWriterResponseDelegate.State.error(let error) = delegate.state {
                    context.logger.error("\(error)")
                }
                return response
            }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment