Created
November 7, 2011 10:57
-
-
Save tsuna/1344664 to your computer and use it in GitHub Desktop.
Twitter Querulous leaking connections?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| This is for https://github.com/nkallen/querulous/issues/24 | |
| I must be doing something wrong but I can't see what. In my load tests it's pretty apparent | |
| that new MySQL connections are created all the time and are never re-used, so my app | |
| then runs out of file descriptors (FDs). | |
| */ | |
| package whatever | |
| import java.sql.ResultSet | |
| import scala.collection.mutable.Set | |
| import scala.util.Random | |
| import org.slf4j.LoggerFactory | |
| import com.twitter.conversions.time._ | |
| import com.twitter.util.Future | |
| import com.twitter.querulous.async.AsyncQueryEvaluator | |
| import com.twitter.querulous.async.AsyncQueryEvaluatorFactory | |
| import com.twitter.querulous.query.QueryClass | |
| /** | |
|  * Asynchronous access to the MySQL servers of one dataset. | |
|  * The server list and credentials are read once, at construction time, by | |
|  * `parseConfig`, which is always asked for the "slaves" kind of server. | |
|  * | |
|  * @param confdir Path to the ServiceConfiguration directory. | |
|  * @param env     Name of the environment (e.g. "prd" or "dev"). | |
|  * @param dataset Name of the dataset (e.g. "userdb" or "ratings"). | |
|  */ | |
| class MySQL(confdir: String, env: String, dataset: String) { | |
|   import MySQL._ | |
|   // Hosts and credentials, resolved once when the instance is built. | |
|   private val servers = parseConfig(confdir, env, dataset, "slaves") | |
|   // Only used to shuffle the host list in getQueryEvaluator. | |
|   private val rnd = new Random | |
|   // A single factory per instance; every evaluator is created through it. | |
|   private val queryEvaluatorFactory = getQueryEvaluatorFactory | |
|   /** | |
|    * One-shot SELECT query. | |
|    * If you're going to issue multiple requests in a row, you should call | |
|    * `getQueryEvaluator`, hold on to the instance returned and call | |
|    * select() on that reference instead. | |
|    * | |
|    * @param query  SQL text of the query. | |
|    * @param params Values bound to the query's parameters. | |
|    * @param f      Builds an `A` from the `ResultSet` (presumably invoked | |
|    *               once per row -- confirm against Querulous). | |
|    * @return A future holding the sequence of values produced by `f`. | |
|    */ | |
|   def select[A](query: String, params: Any*)(f: ResultSet => A): Future[Seq[A]] = | |
|     getQueryEvaluator.select(query, params: _*)(f) | |
|   /** | |
|    * One-shot single-row SELECT query. | |
|    * If you're going to issue multiple requests in a row, you should call | |
|    * `getQueryEvaluator`, hold on to the instance returned and call | |
|    * selectOne() on that reference instead. | |
|    * | |
|    * @param query  SQL text of the query. | |
|    * @param params Values bound to the query's parameters. | |
|    * @param f      Builds an `A` from the `ResultSet`. | |
|    * @return A future holding the optional value produced by `f`. | |
|    */ | |
|   def selectOne[A](query: String, params: Any*)(f: ResultSet => A): Future[Option[A]] = | |
|     getQueryEvaluator.selectOne(query, params: _*)(f) | |
|   /** | |
|    * Returns a query evaluator to be used to execute one or more queries. | |
|    * Every call asks the factory for an evaluator over a freshly shuffled | |
|    * copy of the host list. | |
|    * | |
|    * NOTE(review): `schema` is not defined anywhere in the visible code; | |
|    * confirm where it comes from (possibly it should be `dataset`). | |
|    * NOTE(review): select() and selectOne() come through here on every call, | |
|    * so each one-shot query builds a new evaluator. If connection pools are | |
|    * cached per host list, a different shuffle order may defeat that cache; | |
|    * verify against the Querulous database factories. | |
|    */ | |
|   def getQueryEvaluator: AsyncQueryEvaluator = | |
|     queryEvaluatorFactory(rnd.shuffle(servers.hosts), schema, | |
|       servers.user, servers.pass) | |
|   /** | |
|    * Meant to close all connections and release all threads. | |
|    * Currently a no-op: see the TODO in the body. | |
|    */ | |
|   def shutdown(): Unit = { | |
|     // TODO: Once https://github.com/twitter/querulous/pull/10 is merged | |
|     // call the shutdown method here. | |
|   } | |
|   /** Human-readable description, computed once at construction time. */ | |
|   override val toString: String = "MySQL(" + confdir + ", " + env + ", " + dataset + ')' | |
| } | |
| /** | |
|  * Connection details for a group of MySQL servers. | |
|  * | |
|  * @param hosts Server addresses, each written as ip:port. | |
|  * @param user  User name to connect with. | |
|  * @param pass  Password to connect with. | |
|  */ | |
| final case class MySQLServers(hosts: List[String], user: String, pass: String) | |
| /** | |
| * Companion of the MySQL class: resolves the servers to talk to and builds | |
| * the factory of asynchronous query evaluators the class relies on. | |
| */ | |
| object MySQL { | |
| private val log = LoggerFactory.getLogger(getClass) | |
| /** | |
| * Finds what MySQL servers to use based on the YAML service config. | |
| * @param confdir Path to the ServiceConfiguration directory. | |
| * @param env Name of the environment (e.g. "prd" or "dev"). | |
| * @param dataset Name of the dataset (e.g. "userdb" or "ratings"). | |
| * @param kind Kind of MySQL server (e.g. "slaves" or "mining"). | |
| * @return The hosts and credentials to connect with. | |
| * @throws yaml.ConfigException if the config couldn't be read, or was | |
| * otherwise invalid. | |
| */ | |
| private def parseConfig(confdir: String, env: String, | |
| dataset: String, kind: String): MySQLServers = { | |
| /* Irrelevant code reading stuff from a config file removed */ | |
| // NOTE(review): `servers`, `user` and `pass` are presumably defined by the | |
| // elided code above; they are not visible here. | |
| MySQLServers(servers, user, pass) | |
| } | |
| /** | |
| * Returns a factory of asynchronous QueryEvaluators. | |
| * The factory is assembled by hand instead of through `config()` so that | |
| * the query factory can be wrapped in a TaggingQueryFactory. | |
| */ | |
| private def getQueryEvaluatorFactory: AsyncQueryEvaluatorFactory = { | |
| // Local imports. Inside this method `AsyncQueryEvaluator` now refers to the | |
| // config class, not the async one imported at the top of the file, which is | |
| // why the async one is spelled `async.AsyncQueryEvaluator` further down. | |
| import com.twitter.querulous.config.ApachePoolingDatabase | |
| import com.twitter.querulous.config.AsyncQueryEvaluator | |
| import com.twitter.querulous.config.AutoDisablingDatabase | |
| import com.twitter.querulous.config.QueryTimeout | |
| // The connection pool is sized from the number of processors the JVM sees. | |
| val ncpu = Runtime.getRuntime.availableProcessors | |
| val config = new AsyncQueryEvaluator { | |
| database.pool = new ApachePoolingDatabase { | |
| // Integer division: this is 0 when only one processor is available. | |
| sizeMin = ncpu / 2 // Min. open connections. | |
| sizeMax = ncpu // Max. open connections. | |
| // If we run out of connections, how long we're willing to get blocked | |
| // to wait for someone else to release a connection: | |
| maxWait = 1.second | |
| // Min. amount of time before an idle connection can be dropped. Our | |
| // MySQL servers have this set to 1000 seconds (see SHOW GLOBAL | |
| // VARIABLES LIKE 'wait_timeout';) | |
| minEvictableIdle = 60.seconds | |
| // Every `testIdle' interval, all the connections will receive a test | |
| // query ("/* ping */ SELECT 1") to make sure they're still alive. | |
| testIdle = 10.second | |
| // Whether or not to check the connection with a "/* ping */ SELECT 1" | |
| // once we grab it from the pool to use it. We don't do this as this | |
| // would cause extra round-trips every single time we grab a connection | |
| // from the pool. | |
| testOnBorrow = false | |
| } | |
| // Disable all connections to a particular host after a certain | |
| // number of SQL Exceptions. | |
| database.autoDisable = new AutoDisablingDatabase { | |
| val errorCount = 200 | |
| val interval = 60.seconds | |
| } | |
| // Without this it seems that the Twitter code closes the connection | |
| // after each query... Uh? | |
| database.memoize = true | |
| // Every debug message from the query layer goes to the log at INFO level. | |
| query.debug = { msg => log.info("MySQL query: " + msg) } | |
| // Per query class timeouts. | |
| query.timeouts = Map( | |
| QueryClass.Select -> QueryTimeout(2.seconds), | |
| QueryClass.Execute -> QueryTimeout(5.seconds) | |
| ) | |
| } | |
| // Normally this function would then just return `config()', but because | |
| // we need to wrap the query factory, we need to add all the cruft below: | |
| import com.twitter.querulous.async | |
| import com.twitter.querulous.NullStatsCollector | |
| // NOTE(review): presumably a stats collector that records nothing -- confirm. | |
| val stats = NullStatsCollector | |
| // Database factory from the config above, wrapped together with the default | |
| // work pool and a checkout pool sized by config.maxWaiters. | |
| val db = new async.BlockingDatabaseWrapperFactory( | |
| async.AsyncQueryEvaluator.defaultWorkPool, | |
| async.AsyncQueryEvaluator.checkoutPool(config.maxWaiters), | |
| config.database(stats) | |
| ) | |
| // Wraps the configured query factory so every query carries the tag "foo". | |
| val queryfactory = new TaggingQueryFactory(config.query(stats), | |
| "foo") | |
| new async.StandardAsyncQueryEvaluatorFactory(db, queryfactory) | |
| } | |
| } | |
| import com.twitter.querulous.query.QueryFactory | |
| /** | |
|  * Query factory decorator that prefixes every SQL query with a tag enclosed | |
|  * in comments before handing it to the wrapped factory. | |
|  * This makes debugging easier during/after outages. | |
|  * | |
|  * @param queryFactory Factory that actually builds the queries. | |
|  * @param tag          Text placed inside the leading SQL comment. | |
|  */ | |
| final class TaggingQueryFactory(queryFactory: QueryFactory, val tag: String) | |
|   extends QueryFactory { | |
|   import java.sql.Connection | |
|   // The comment that goes in front of every query; built once per instance. | |
|   private val commentPrefix = "/*" + tag + "*/" | |
|   /** Builds the query through the wrapped factory, with the tag prepended. */ | |
|   def apply(connection: Connection, queryClass: QueryClass, query: String, params: Any*) = { | |
|     val taggedQuery = commentPrefix + query | |
|     queryFactory(connection, queryClass, taggedQuery, params: _*) | |
|   } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment