Last active
August 28, 2018 06:58
-
-
Save MrMage/8972472ee73ab7b7d506ffb1c2b6e42e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import NIOConcurrencyHelpers | |
import Vapor | |
public protocol CloseableResource: class { | |
var eventLoop: EventLoop { get } | |
var isClosed: Bool { get } | |
func close() | |
} | |
public struct GlobalResourcePoolConfig { | |
/// The number of resources that the pool should not fall below. | |
public let minResources: Int | |
/// The maximum number of resources allocated by the pool. | |
public let maxResources: Int | |
public let resourceRequestTimeout: TimeInterval | |
public let pruneInterval: TimeInterval | |
public let idleTimeForPruning: TimeInterval | |
public init(minResources: Int = 2, maxResources: Int = 10, | |
resourceRequestTimeout: TimeInterval = 5, | |
pruneInterval: TimeInterval = 60, idleTimeForPruning: TimeInterval = 120) { | |
self.minResources = minResources | |
self.maxResources = maxResources | |
self.resourceRequestTimeout = resourceRequestTimeout | |
self.pruneInterval = pruneInterval | |
self.idleTimeForPruning = idleTimeForPruning | |
} | |
} | |
public final class GlobalResourcePool<Resource>: Service where Resource: CloseableResource { | |
public enum Error: Swift.Error { | |
case timedOut | |
} | |
public typealias ResourceFactory = (EventLoop) -> Future<Resource> | |
private let resourceFactory: ResourceFactory | |
public let config: GlobalResourcePoolConfig | |
private let eventLoopGroup: EventLoopGroup | |
private let logger: Logger? | |
private var active: [PooledResource<Resource>] | |
private var available: [PooledResource<Resource>] | |
private let semaphore: DispatchSemaphore | |
private let lock: Lock | |
private let waitingQueue: DispatchQueue | |
public init(config: GlobalResourcePoolConfig, eventLoopGroup: EventLoopGroup, logger: Logger?, | |
resourceFactory: @escaping ResourceFactory) { | |
self.config = config | |
self.eventLoopGroup = eventLoopGroup | |
self.logger = logger | |
self.resourceFactory = resourceFactory | |
self.active = [] | |
self.available = [] | |
self.semaphore = DispatchSemaphore(value: config.maxResources) | |
self.lock = Lock() | |
self.waitingQueue = DispatchQueue(label: String(describing: type(of: self)) + ".waitingQueue") | |
self.pruneResources() | |
} | |
public func withResource<T>(_ closure: @escaping (Resource) throws -> Future<T>) -> Future<T> { | |
return requestResource().flatMap(to: T.self) { resource in | |
try closure(resource) | |
.always { self.releaseResource(resource) } | |
} | |
} | |
public func requestResource() -> Future<Resource> { | |
let promise = self.eventLoopGroup.next().newPromise(Resource.self) | |
waitingQueue.async { | |
guard self.semaphore.wait( | |
timeout: .now() + .milliseconds(Int(1000 * self.config.resourceRequestTimeout))) == .success else { | |
promise.fail(error: Error.timedOut) | |
return | |
} | |
if let availableResource = (self.lock.withLock { self.available.popLast() }) { | |
let currentResource = self.lock.withLock { availableResource.resource! } | |
if !currentResource.isClosed { | |
promise.succeed(result: currentResource) | |
} else { | |
self.logger?.info("[\(self)] Resource closed; re-creating") | |
self.resourceFactory(currentResource.eventLoop).map { createdResource in | |
self.lock.withLockVoid { availableResource.resource = createdResource } | |
return createdResource | |
}.cascade(promise: promise) | |
} | |
} else { | |
let newPooledResource = PooledResource<Resource>() | |
var numberOfActiveResources = 0 | |
self.lock.withLockVoid { | |
self.active.append(newPooledResource) | |
numberOfActiveResources = self.active.count | |
} | |
self.logger?.info("[\(self)] Allocated new resource (\(numberOfActiveResources)/\(self.config.maxResources))") | |
let eventLoop = self.eventLoopGroup.next() | |
self.resourceFactory(eventLoop).map { createdResource in | |
self.lock.withLockVoid { newPooledResource.resource = createdResource } | |
return createdResource | |
}.cascade(promise: promise) | |
} | |
} | |
return promise.futureResult | |
} | |
public func releaseResource(_ resource: Resource) { | |
guard let pooledResource = (lock.withLock { active.first { $0.resource === resource } }) else { | |
assertionFailure("Attempted to release a connection to a pool that did not create it.") | |
return | |
} | |
let now = Date() | |
lock.withLockVoid { | |
pooledResource.lastUsed = now | |
available.append(pooledResource) | |
semaphore.signal() | |
} | |
} | |
private func pruneResources() { | |
lock.withLockVoid { | |
let now = Date() | |
let oldActiveCount = active.count | |
var toPrune = Set<ObjectIdentifier>() | |
for pooledResource in available { | |
if pooledResource.resource.isClosed | |
|| (now.timeIntervalSince(pooledResource.lastUsed) >= config.idleTimeForPruning | |
&& (active.count - toPrune.count) > config.minResources) { | |
pooledResource.resource.close() | |
toPrune.insert(ObjectIdentifier(pooledResource)) | |
} | |
} | |
available = available.filter { !toPrune.contains(ObjectIdentifier($0)) } | |
active = active.filter { !toPrune.contains(ObjectIdentifier($0)) } | |
logger?.info("[\(self)] Prune: \(oldActiveCount) active resources before, \(active.count) now.") | |
} | |
_ = eventLoopGroup.next().scheduleTask(in: .milliseconds(Int(1000 * config.pruneInterval))) { [weak self] in | |
self?.pruneResources() | |
} | |
} | |
} | |
private final class PooledResource<Resource> where Resource: CloseableResource { | |
var resource: Resource! | |
var lastUsed = Date() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment