Created
August 22, 2011 16:05
-
-
Save ewhauser/1162766 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Twitter Commons artifacts; the Maven repository for these is at maven.twttr.com
libraries.twitter = [
    'com.twitter.common:application:0.0.5',
    'com.twitter.common:application-module-stats:0.0.1',
    'com.twitter.common:application-module-http:0.0.1',
    'com.twitter.common:application-module-log:0.0.1',
    'com.twitter.common:inject:0.0.10',
    'com.twitter.common:net-pool:0.0.12',
]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.exacttarget.bloomin.jedis; | |
import com.google.common.base.Preconditions; | |
import com.twitter.common.net.pool.Connection; | |
import redis.clients.jedis.Jedis; | |
public class JedisConnection implements Connection<Jedis, Jedis> { | |
private final Jedis jedis; | |
public JedisConnection(Jedis jedis) { | |
this.jedis = Preconditions.checkNotNull(jedis); | |
} | |
@Override | |
public Jedis get() { | |
return jedis; | |
} | |
@Override | |
public boolean isValid() { | |
return true; | |
} | |
@Override | |
public void close() { | |
try { | |
try { | |
jedis.quit(); | |
} catch (Exception ignored) { | |
//ignored | |
} | |
jedis.disconnect(); | |
} catch (Exception ignored) { | |
//ignored | |
} | |
} | |
@Override | |
public Jedis getEndpoint() { | |
return jedis; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.exacttarget.bloomin.jedis; | |
import com.google.common.base.Preconditions; | |
import com.google.common.collect.Maps; | |
import com.google.common.collect.Sets; | |
import com.twitter.common.net.pool.ConnectionFactory; | |
import com.twitter.common.quantity.Amount; | |
import com.twitter.common.quantity.Time; | |
import redis.clients.jedis.Jedis; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.util.Set; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Lock; | |
import java.util.concurrent.locks.ReentrantLock; | |
public class JedisConnectionFactory implements ConnectionFactory<JedisConnection> { | |
private final int maxConnections; | |
private final Set<JedisConnection> activeConnections = | |
Sets.newSetFromMap( | |
Maps.<JedisConnection, Boolean>newIdentityHashMap()); | |
private volatile int lastActiveConnectionsSize = 0; | |
private final Lock activeConnectionsWriteLock = new ReentrantLock(true); | |
private String poolName; | |
private ShardInfo shard; | |
public JedisConnectionFactory(ShardInfo shard, int maxConnections) { | |
this.shard = Preconditions.checkNotNull(shard); | |
Preconditions.checkState(maxConnections > 0); | |
this.maxConnections = maxConnections; | |
this.poolName = shard.getMaster().toString(); | |
} | |
@Override | |
public boolean mightCreate() { | |
return lastActiveConnectionsSize < maxConnections; | |
} | |
@Override | |
public JedisConnection create(Amount<Long, Time> timeout) throws IOException { | |
Preconditions.checkNotNull(timeout); | |
if (timeout.getValue() == 0) { | |
return create(); | |
} | |
try { | |
long timeRemainingNs = timeout.as(Time.NANOSECONDS); | |
long start = System.nanoTime(); | |
if (activeConnectionsWriteLock.tryLock(timeRemainingNs, TimeUnit.NANOSECONDS)) { | |
try { | |
if (!willCreateSafe()) { | |
return null; | |
} | |
timeRemainingNs -= System.nanoTime() - start; | |
return createConnection(); | |
} finally { | |
activeConnectionsWriteLock.unlock(); | |
} | |
} else { | |
return null; | |
} | |
} catch (InterruptedException e) { | |
return null; | |
} | |
} | |
private JedisConnection create() throws IOException { | |
activeConnectionsWriteLock.lock(); | |
try { | |
if (!willCreateSafe()) { | |
return null; | |
} | |
return createConnection(); | |
} finally { | |
activeConnectionsWriteLock.unlock(); | |
} | |
} | |
private JedisConnection createConnection() throws IOException { | |
InetSocketAddress master = shard.getMaster(); | |
Jedis jedis = new Jedis(master.getHostName(), master.getPort()); | |
JedisConnection connection = new JedisConnection(jedis); | |
activeConnections.add(connection); | |
lastActiveConnectionsSize = activeConnections.size(); | |
return connection; | |
} | |
private boolean willCreateSafe() { | |
return activeConnections.size() < maxConnections; | |
} | |
@Override | |
public void destroy(JedisConnection connection) { | |
activeConnectionsWriteLock.lock(); | |
try { | |
boolean wasActiveConnection = activeConnections.remove(connection); | |
Preconditions.checkArgument(wasActiveConnection, | |
"connection %s not created by this factory", connection); | |
lastActiveConnectionsSize = activeConnections.size(); | |
} finally { | |
activeConnectionsWriteLock.unlock(); | |
} | |
// We close the connection outside the critical section which means we may have more connections | |
// "active" (open) than maxConnections for a very short time | |
connection.close(); | |
} | |
@Override | |
public String toString() { | |
return String.format("%s[%s]", getClass().getSimpleName(), poolName); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.exacttarget.bloomin.jedis; | |
import com.google.common.base.Preconditions; | |
import com.google.inject.Inject; | |
import com.twitter.common.base.ExceptionalClosure; | |
import com.twitter.common.base.ExceptionalFunction; | |
import com.twitter.common.base.ExceptionalSupplier; | |
import com.twitter.common.net.pool.ConnectionPool; | |
import com.twitter.common.quantity.Amount; | |
import com.twitter.common.quantity.Time; | |
import com.twitter.common.util.BackoffHelper; | |
import redis.clients.jedis.Jedis; | |
import javax.inject.Named; | |
import java.io.IOException; | |
public class JedisHelper { | |
private final ConnectionPool<JedisConnection> connectionPool; | |
private final BackoffHelper backoff; | |
/** | |
* @param pool Connection pool for Jedis | |
* @param jedisBackOffms The amount of time in millisconds to wait for Jedis connections to be killed before | |
* they are retried | |
* @param jedisBackOffMax The amount of time in milliseconds that Jedis connections will be retried | |
*/ | |
@Inject | |
public JedisHelper(ConnectionPool<JedisConnection> pool, | |
@Named("jedis.backoff.ms") long jedisBackOffms, | |
@Named("jedis.backoff.max") long jedisBackOffMax) { | |
this.connectionPool = Preconditions.checkNotNull(pool); | |
this.backoff = new BackoffHelper(Amount.of(jedisBackOffms, Time.MILLISECONDS), | |
Amount.of(jedisBackOffMax, Time.MILLISECONDS)); | |
} | |
public void doInPool(final ExceptionalClosure<Jedis, IOException> closure) { | |
doInPool(new ExceptionalFunction<Jedis, Object, IOException>() { | |
@Override | |
public Object apply(Jedis table) throws IOException { | |
closure.execute(table); | |
return Void.TYPE; | |
} | |
}); | |
} | |
public <T> T doInPool(final ExceptionalFunction<Jedis, T, IOException> function) { | |
try { | |
return backoff.doUntilResult(new ExceptionalSupplier<T, Exception>() { | |
@Override | |
public T get() throws Exception { | |
JedisConnection jedisConnection = connectionPool.get(); | |
try { | |
Jedis jedis = jedisConnection.get(); | |
return function.apply(jedis); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} finally { | |
connectionPool.release(jedisConnection); | |
} | |
} | |
}); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void close() { | |
connectionPool.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment