Created
June 18, 2015 02:35
-
-
Save arganzheng/dd53d1fff3c50d94cdd0 to your computer and use it in GitHub Desktop.
An HttpClient with PoolingHttpClientConnectionManager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.opentsdb.client; | |
import static com.google.common.base.Preconditions.checkArgument; | |
import java.io.IOException; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.commons.lang.StringUtils; | |
import org.apache.http.HeaderElement; | |
import org.apache.http.HeaderElementIterator; | |
import org.apache.http.HttpEntity; | |
import org.apache.http.HttpResponse; | |
import org.apache.http.client.config.RequestConfig; | |
import org.apache.http.client.methods.HttpPost; | |
import org.apache.http.client.methods.HttpUriRequest; | |
import org.apache.http.conn.ConnectionKeepAliveStrategy; | |
import org.apache.http.conn.HttpClientConnectionManager; | |
import org.apache.http.entity.StringEntity; | |
import org.apache.http.impl.client.CloseableHttpClient; | |
import org.apache.http.impl.client.HttpClients; | |
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; | |
import org.apache.http.message.BasicHeaderElementIterator; | |
import org.apache.http.protocol.HTTP; | |
import org.apache.http.protocol.HttpContext; | |
import org.apache.http.util.EntityUtils; | |
import org.opentsdb.client.response.SimpleHttpResponse; | |
/** | |
* | |
* @author arganzheng | |
* | |
*/ | |
public class PoolingHttpClient { | |
private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 200; | |
private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = DEFAULT_MAX_TOTAL_CONNECTIONS; | |
private static final int DEFAULT_CONNECTION_TIMEOUT_MILLISECONDS = (10 * 1000); | |
private static final int DEFAULT_READ_TIMEOUT_MILLISECONDS = (10 * 1000); | |
private static final int DEFAULT_WAIT_TIMEOUT_MILLISECONDS = (10 * 1000); | |
private static final int DEFAULT_KEEP_ALIVE_MILLISECONDS = (5 * 60 * 1000); | |
private static final String DEFAULT_CHARSET = "UTF-8"; | |
private static final int DEFAULT_RETRY_COUNT = 2; | |
private int keepAlive = DEFAULT_KEEP_ALIVE_MILLISECONDS; | |
private int maxTotalConnections = DEFAULT_MAX_TOTAL_CONNECTIONS; | |
private int maxConnectionsPerRoute = DEFAULT_MAX_CONNECTIONS_PER_ROUTE; | |
private int connectTimeout = DEFAULT_CONNECTION_TIMEOUT_MILLISECONDS; | |
private int readTimeout = DEFAULT_READ_TIMEOUT_MILLISECONDS; | |
private int waitTimeout = DEFAULT_WAIT_TIMEOUT_MILLISECONDS; | |
private int retries = DEFAULT_RETRY_COUNT; | |
private PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(); | |
private CloseableHttpClient httpClient = null; | |
private ConnectionKeepAliveStrategy keepAliveStrategy = new ConnectionKeepAliveStrategy() { | |
@Override | |
public long getKeepAliveDuration(HttpResponse response, | |
HttpContext context) { | |
HeaderElementIterator it = new BasicHeaderElementIterator( | |
response.headerIterator(HTTP.CONN_KEEP_ALIVE)); | |
while (it.hasNext()) { | |
HeaderElement he = it.nextElement(); | |
String param = he.getName(); | |
String value = he.getValue(); | |
if (value != null && param.equalsIgnoreCase("timeout")) { | |
return Long.parseLong(value) * 1000; | |
} | |
} | |
return keepAlive; | |
} | |
}; | |
public PoolingHttpClient() { | |
// Increase max total connection | |
connManager.setMaxTotal(maxTotalConnections); | |
// Increase default max connection per route | |
connManager.setDefaultMaxPerRoute(maxConnectionsPerRoute); | |
// config timeout | |
RequestConfig config = RequestConfig.custom() | |
.setConnectTimeout(connectTimeout) | |
.setConnectionRequestTimeout(waitTimeout) | |
.setSocketTimeout(readTimeout).build(); | |
httpClient = HttpClients.custom() | |
.setKeepAliveStrategy(keepAliveStrategy) | |
.setConnectionManager(connManager) | |
.setDefaultRequestConfig(config).build(); | |
// detect idle and expired connections and close them | |
IdleConnectionMonitorThread staleMonitor = new IdleConnectionMonitorThread( | |
connManager); | |
staleMonitor.start(); | |
} | |
public SimpleHttpResponse doPost(String url, String data) | |
throws IOException { | |
StringEntity requestEntity = new StringEntity(data); | |
HttpPost postMethod = new HttpPost(url); | |
postMethod.setEntity(requestEntity); | |
HttpResponse response = execute(postMethod); | |
int statusCode = response.getStatusLine().getStatusCode(); | |
SimpleHttpResponse simpleResponse = new SimpleHttpResponse(); | |
simpleResponse.setStatusCode(statusCode); | |
HttpEntity entity = response.getEntity(); | |
if (entity != null) { | |
// should return: application/json; charset=UTF-8 | |
String ctype = entity.getContentType().getValue(); | |
String charset = getResponseCharset(ctype); | |
String content = EntityUtils.toString(entity, charset); | |
simpleResponse.setContent(content); | |
} | |
return simpleResponse; | |
} | |
private static String getResponseCharset(String ctype) { | |
String charset = DEFAULT_CHARSET; | |
if (!StringUtils.isEmpty(ctype)) { | |
String[] params = ctype.split(";"); | |
for (String param : params) { | |
param = param.trim(); | |
if (param.startsWith("charset")) { | |
String[] pair = param.split("=", 2); | |
if (pair.length == 2) { | |
if (!StringUtils.isEmpty(pair[1])) { | |
charset = pair[1].trim(); | |
} | |
} | |
break; | |
} | |
} | |
} | |
return charset; | |
} | |
public HttpResponse execute(HttpUriRequest request) throws IOException { | |
HttpResponse response; | |
int tries = ++retries; | |
while (true) { | |
tries--; | |
try { | |
response = httpClient.execute(request); | |
break; | |
} catch (IOException e) { | |
if (tries < 1) | |
throw e; | |
} | |
} | |
return response; | |
} | |
public void shutdown() throws IOException { | |
httpClient.close(); | |
} | |
public int getKeepAlive() { | |
return keepAlive; | |
} | |
public void setKeepAlive(int keepAlive) { | |
this.keepAlive = keepAlive; | |
} | |
public int getMaxTotalConnections() { | |
return maxTotalConnections; | |
} | |
public void setMaxTotalConnections(int maxTotalConnections) { | |
this.maxTotalConnections = maxTotalConnections; | |
} | |
public int getMaxConnectionsPerRoute() { | |
return maxConnectionsPerRoute; | |
} | |
public void setMaxConnectionsPerRoute(int maxConnectionsPerRoute) { | |
this.maxConnectionsPerRoute = maxConnectionsPerRoute; | |
} | |
public int getConnectTimeout() { | |
return connectTimeout; | |
} | |
public void setConnectTimeout(int connectTimeout) { | |
this.connectTimeout = connectTimeout; | |
} | |
public int getReadTimeout() { | |
return readTimeout; | |
} | |
public void setReadTimeout(int readTimeout) { | |
this.readTimeout = readTimeout; | |
} | |
public int getWaitTimeout() { | |
return waitTimeout; | |
} | |
public void setWaitTimeout(int waitTimeout) { | |
this.waitTimeout = waitTimeout; | |
} | |
public PoolingHttpClientConnectionManager getConnManager() { | |
return connManager; | |
} | |
public void setConnManager(PoolingHttpClientConnectionManager connManager) { | |
this.connManager = connManager; | |
} | |
public int getRetryCount() { | |
return retries; | |
} | |
public void setRetryCount(int retries) { | |
checkArgument(retries >= 0); | |
this.retries = retries; | |
} | |
public static class IdleConnectionMonitorThread extends Thread { | |
private final HttpClientConnectionManager connMgr; | |
private volatile boolean shutdown; | |
public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) { | |
super(); | |
this.connMgr = connMgr; | |
} | |
@Override | |
public void run() { | |
try { | |
while (!shutdown) { | |
synchronized (this) { | |
wait(5000); | |
// Close expired connections | |
connMgr.closeExpiredConnections(); | |
// Optionally, close connections | |
// that have been idle longer than 60 sec | |
connMgr.closeIdleConnections(60, TimeUnit.SECONDS); | |
} | |
} | |
} catch (InterruptedException ex) { | |
// terminate | |
shutdown(); | |
} | |
} | |
public void shutdown() { | |
shutdown = true; | |
synchronized (this) { | |
notifyAll(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for you, I resolved my project.