Skip to content

Instantly share code, notes, and snippets.

@tsuna
Created November 7, 2011 10:57
Show Gist options
  • Select an option

  • Save tsuna/1344664 to your computer and use it in GitHub Desktop.

Select an option

Save tsuna/1344664 to your computer and use it in GitHub Desktop.
Twitter Querulous leaking connections?
/*
This is for https://github.com/nkallen/querulous/issues/24
I must be doing something wrong but I can't see what. In my loadtests it's pretty apparent
that new MySQL connections are always created all the time and are never re-used, so my app
then runs out of FD.
*/
package whatever
import java.sql.ResultSet
import scala.collection.mutable.Set
import scala.util.Random
import org.slf4j.LoggerFactory
import com.twitter.conversions.time._
import com.twitter.util.Future
import com.twitter.querulous.async.AsyncQueryEvaluator
import com.twitter.querulous.async.AsyncQueryEvaluatorFactory
import com.twitter.querulous.query.QueryClass
class MySQL(confdir: String, env: String, dataset: String) {
import MySQL._
private val servers = parseConfig(confdir, env, dataset, "slaves")
private val rnd = new Random
private val queryEvaluatorFactory = getQueryEvaluatorFactory
/**
* One-shot SELECT query.
* If you're going to issue multiple requests in a row, you should call
* {@code getQueryEvaluator} and hold on to the instance returned and call
* select() on that reference instead.
*/
def select[A](query: String, params: Any*)(f: ResultSet => A): Future[Seq[A]] =
getQueryEvaluator.select(query, params: _*)(f)
/**
* One-shot single-row SELECT query.
* If you're going to issue multiple requests in a row, you should call
* {@code getQueryEvaluator} and hold on to the instance returned and call
* select() on that reference instead.
*/
def selectOne[A](query: String, params: Any*)(f: ResultSet => A): Future[Option[A]] =
getQueryEvaluator.selectOne(query, params: _*)(f)
/**
* Returns a query evaluator to be used to execute one or more queries.
*/
def getQueryEvaluator: AsyncQueryEvaluator =
queryEvaluatorFactory(rnd.shuffle(servers.hosts), schema,
servers.user, servers.pass)
/** Closes all connections and releases all threads. */
def shutdown() {
// TODO: Once https://github.com/twitter/querulous/pull/10 is merged
// call the shutdown method here.
}
override val toString = "MySQL(" + confdir + ", " + env + ", " + dataset + ')'
}
/** Holds a list of servers (ip:port) and credentials to connect to them. */
final case class MySQLServers(val hosts: List[String],
val user: String, val pass: String)
object MySQL {
private val log = LoggerFactory.getLogger(getClass)
/**
* Finds what MySQL servers to use based on the YAML service config.
* @param confdir Path to the ServiceConfiguration directory.
* @param env Name of the environment (e.g. "prd" or "dev").
* @param kind Kind of MySQL server (e.g. "slaves" or "mining").
* @param dataset Name of the dataset (e.g. "userdb" or "ratings").
* @throws yaml.ConfigException if the config couldn't be read, or was
* otherwise invalid.
*/
private def parseConfig(confdir: String, env: String,
dataset: String, kind: String): MySQLServers = {
/* Irrelevant code reading stuff from a config file removed */
MySQLServers(servers, user, pass)
}
/** Returns a factory of asynchronous QueryEvaluators. */
private def getQueryEvaluatorFactory: AsyncQueryEvaluatorFactory = {
import com.twitter.querulous.config.ApachePoolingDatabase
import com.twitter.querulous.config.AsyncQueryEvaluator
import com.twitter.querulous.config.AutoDisablingDatabase
import com.twitter.querulous.config.QueryTimeout
val ncpu = Runtime.getRuntime.availableProcessors
val config = new AsyncQueryEvaluator {
database.pool = new ApachePoolingDatabase {
sizeMin = ncpu / 2 // Min. open connections.
sizeMax = ncpu // Max. open connections.
// If we run out of connections, how long we're willing to get blocked
// to wait for someone else to release a connection:
maxWait = 1.second
// Min. amount of time before an idle connection can be dropped. Our
// MySQL servers have this set to 1000 seconds (see SHOW GLOBAL
// VARIABLES LIKE 'wait_timeout';)
minEvictableIdle = 60.seconds
// Every `testIdle' interval, all the connections will receive a test
// query ("/* ping */ SELECT 1") to make sure they're still alive.
testIdle = 10.second
// Whether or not to check the connection with a "/* ping */ SELECT 1"
// once we grab it from the pool to use it. We don't do this as this
// would cause extra round-trips every single time we grab a connection
// from the pool.
testOnBorrow = false
}
// Disable all connections to a particular host after a certain
// number of SQL Exceptions.
database.autoDisable = new AutoDisablingDatabase {
val errorCount = 200
val interval = 60.seconds
}
// Without this it seems that the Twitter code closes the connection
// after each query... Uh?
database.memoize = true
query.debug = { msg => log.info("MySQL query: " + msg) }
query.timeouts = Map(
QueryClass.Select -> QueryTimeout(2.seconds),
QueryClass.Execute -> QueryTimeout(5.seconds)
)
}
// Normally this function would then just return `config()', but because
// we need to wrap the query factory, we need to add all the cruft below:
import com.twitter.querulous.async
import com.twitter.querulous.NullStatsCollector
val stats = NullStatsCollector
val db = new async.BlockingDatabaseWrapperFactory(
async.AsyncQueryEvaluator.defaultWorkPool,
async.AsyncQueryEvaluator.checkoutPool(config.maxWaiters),
config.database(stats)
)
val queryfactory = new TaggingQueryFactory(config.query(stats),
"foo")
new async.StandardAsyncQueryEvaluatorFactory(db, queryfactory)
}
}
import com.twitter.querulous.query.QueryFactory
/**
* Prefixes every SQL query with a tag enclosed in coments.
* This makes debugging easier during/after outages.
*/
final class TaggingQueryFactory(queryFactory: QueryFactory, val tag: String)
extends QueryFactory {
import java.sql.Connection
def apply(connection: Connection, queryClass: QueryClass, query: String, params: Any*) =
queryFactory(connection, queryClass, "/*" + tag + "*/" + query, params: _*)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment