Last active
May 4, 2018 12:43
-
-
Save peterjurkovic/9bf083fa786f06db7e97b7ee262f51c0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.nexmo.chatapp.connector.api; | |
import com.google.common.util.concurrent.ThreadFactoryBuilder; | |
import com.nexmo.chatapp.connector.config.MonitoringServiceKeys; | |
import com.nexmo.chatapp.connector.config.ThreadPoolConfig; | |
import com.nexmo.common.server.MonitoringService; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
import java.util.Collection; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import static java.util.Objects.requireNonNull; | |
import static java.util.concurrent.CompletableFuture.runAsync; | |
public class ThreadPoolFactory { | |
private final static Logger Log = LogManager.getLogger(); | |
private final Map<String, ThreadPool> registry = new ConcurrentHashMap<>(); | |
public ThreadPoolExecutor getThreadPool(String name, ThreadPoolConfig config) { | |
return registry.computeIfAbsent(name.toUpperCase(), k -> new ThreadPool(k, config)).get(); | |
} | |
public void shutdown() { | |
Collection<ThreadPool> thradPools = registry.values(); | |
CountDownLatch latch = new CountDownLatch(thradPools.size()); | |
thradPools.forEach( pool -> runAsync( () -> pool.shutdown(latch) )); | |
try { | |
latch.await(); | |
} catch (InterruptedException e) { | |
Log.warn("Failed to shutdown threadpools", e); | |
} | |
} | |
/** | |
* This can be called in the end of the Server class | |
*/ | |
public void collectMetrics(){ | |
registry.values().forEach( threadPool -> { | |
MonitoringService.createThreadPoolSizeGauge(threadPool.monitoringKey(), threadPool.get()); | |
}); | |
} | |
private static class ThreadPool { | |
private final ThreadPoolExecutor pool; | |
private final ThreadPoolConfig config; | |
private final String name; | |
public ThreadPool(String name, ThreadPoolConfig config) { | |
this.name = requireNonNull(name); | |
this.config = requireNonNull(config); | |
this.pool = new ThreadPoolExecutor(config.minThread(), | |
config.maxThread(), | |
config.keepAliveThreadMs(), | |
TimeUnit.MILLISECONDS, | |
new LinkedBlockingQueue<>(config.queueSize()), | |
new ThreadFactoryBuilder().setNameFormat(name + "-%d").build()); | |
} | |
void shutdown(CountDownLatch latch) { | |
try { | |
Log.error("Shutting down threadpool [ {} ] timeout [ {} ] ms ", this.name, config.shutdownAwaitingTimeMs()); | |
pool.shutdown(); | |
pool.awaitTermination(config.shutdownAwaitingTimeMs(), TimeUnit.MILLISECONDS); | |
} catch (InterruptedException e) { | |
Log.error("Error while shutting down threadpool [ {} ] ", this.name, e); | |
pool.shutdownNow(); | |
}finally { | |
latch.countDown(); | |
} | |
} | |
ThreadPoolExecutor get() { | |
return this.pool; | |
} | |
public String monitoringKey(){ | |
return "chatapp_" + this.name.toLowerCase().replace("-","_") + "_threadpool"; | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
ThreadPool that = (ThreadPool) o; | |
return Objects.equals(name, that.name); | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hash(name); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment