Created
October 9, 2013 21:48
-
-
Save agleyzer/6909056 to your computer and use it in GitHub Desktop.
A Finagle Cluster that uses DNS, based on ZookeeperServerSetCluster.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.twitter.concurrent.Spool | |
import com.twitter.finagle.builder.Cluster | |
import com.twitter.finagle.util.DefaultTimer | |
import com.twitter.logging.Logger | |
import com.twitter.util.FuturePool | |
import com.twitter.util.{Duration, Future, JavaTimer, Promise, Return, Time, Timer} | |
import java.net.InetAddress | |
import java.net.UnknownHostException | |
import java.net.{InetSocketAddress, SocketAddress} | |
import java.security.Security | |
import scala.collection._ | |
import com.twitter.conversions.time._ | |
/**
 * A Cluster of SocketAddresses that are continuously resolved via
 * DNS. Comes in handy when accessing hosts with frequently-changing
 * addresses (e.g. Akamai/AmazonS3/Google), especially from behind a
 * firewall.
 *
 * Constructing an instance schedules a repeating task on `timer`, first
 * firing at `Time.now` and then every `ttl`. Each run resolves `host` and
 * publishes the difference from the previously known address set as
 * `Cluster.Add` / `Cluster.Rem` changes. Call `stop()` to cancel the task.
 *
 * @param host  host name to resolve
 * @param port  port paired with every resolved address
 * @param ttl   interval between successive DNS lookups
 * @param timer timer on which the periodic lookup is scheduled
 */
class DnsCluster(host: String, port: Int, ttl: Duration, timer: Timer)
  extends Cluster[SocketAddress] {

  private[this] val log = Logger(this.getClass)

  // Addresses currently announced by this cluster. Read and written only
  // inside the `synchronized` sections of updateAddress and snap.
  private[this] val underlyingSet = new mutable.HashSet[SocketAddress]

  // Unfulfilled tail of the change stream. Every published change fulfils the
  // current promise and replaces it with a fresh one.
  private[this] var changes = new Promise[Spool[Cluster.Change[SocketAddress]]]

  /**
   * Performs a blocking lookup of `host` and pairs each returned address with
   * `port`. Exceptions from `InetAddress.getAllByName` are not caught here.
   */
  // exposed for testing
  protected[util] def blockingDnsCall: Set[SocketAddress] = {
    InetAddress.getAllByName(host).map { addr =>
      new InetSocketAddress(addr, port): SocketAddress
    }.toSet
  }

  /**
   * Runs `blockingDnsCall` on `FuturePool.unboundedPool`.
   *
   * An `UnknownHostException` is logged and converted into an empty set, so
   * the periodic task will then remove every known address. Any other failure
   * is left in the returned Future.
   */
  // exposed for testing
  protected[util] def resolveHost: Future[Set[SocketAddress]] =
    FuturePool.unboundedPool { blockingDnsCall } handle {
      case ex: UnknownHostException =>
        // Pass the exception along (as the onFailure handler below does) so
        // the resolver's reason for the failure is not lost.
        log.error(ex, "DNS failed for host %s", host)
        Set.empty[SocketAddress]
    }

  /**
   * Brings `underlyingSet` in line with `newSet`, emitting one `Cluster.Add`
   * per new address followed by one `Cluster.Rem` per vanished address.
   */
  private[this] def updateAddress(newSet: Set[SocketAddress]) = synchronized {
    // Fulfil the current tail with `update` and start a new unresolved tail.
    def appendUpdate(update: Cluster.Change[SocketAddress]) = {
      val newTail = new Promise[Spool[Cluster.Change[SocketAddress]]]
      changes() = Return(update *:: newTail)
      changes = newTail
    }

    if (newSet != underlyingSet) {
      log.info("%s resolved as %s", host, newSet.mkString(", "))
    }

    // Both differences are computed up front, before underlyingSet is mutated.
    val added = newSet &~ underlyingSet
    val removed = underlyingSet &~ newSet

    added foreach { address =>
      underlyingSet += address
      appendUpdate(Cluster.Add(address))
    }

    removed foreach { address =>
      underlyingSet -= address
      appendUpdate(Cluster.Rem(address))
    }
  }

  log.info("starting DNS cluster for %s:%d, ttl %s", host, port, ttl)

  // Periodic refresh: resolve the host and apply the result. Failures that
  // resolveHost did not already handle are only logged, leaving the current
  // address set untouched.
  private[this] val task = timer.schedule(Time.now, ttl) {
    resolveHost onSuccess { newSet =>
      updateAddress(newSet)
    } onFailure { ex =>
      log.error(ex, "failed to resolve %s", host)
    }
  }

  /** Cancels the periodic DNS refresh task. */
  def stop(): Unit = task.cancel()

  /**
   * Returns the addresses known right now together with the future stream of
   * subsequent changes, both read under the same lock used by updateAddress.
   */
  def snap: (Seq[SocketAddress], Future[Spool[Cluster.Change[SocketAddress]]]) =
    synchronized {
      (underlyingSet.toSeq, changes)
    }
}
/** Factory methods for [[DnsCluster]]. */
object DnsCluster {

  /**
   * Creates a cluster that re-resolves `host` every `ttl`, scheduled on
   * `DefaultTimer.twitter`.
   */
  def apply(host: String, port: Int, ttl: Duration): DnsCluster =
    new DnsCluster(host, port, ttl, DefaultTimer.twitter)

  /**
   * Creates a cluster whose refresh interval is taken from the
   * `networkaddress.cache.ttl` security property (interpreted as seconds),
   * clamped to the range 5 seconds .. 1 hour. When the property is unset or
   * is not a valid integer, 10 seconds is used.
   */
  // Uses default DNS ttl value
  // http://stackoverflow.com/questions/1256556/any-way-to-make-java-honor-the-dns-caching-timeout-ttl
  def apply(host: String, port: Int): DnsCluster = {
    val minTtl = 5.seconds
    val defaultTtl = 10.seconds
    val maxTtl = 1.hour

    // A malformed property value must not make the factory throw; treat it
    // the same as an absent property.
    val configured: Option[Duration] =
      Option(Security.getProperty("networkaddress.cache.ttl")) flatMap { raw =>
        try Some(raw.trim.toInt.seconds)
        catch { case _: NumberFormatException => None }
      }

    // Values below minTtl (including zero and negative settings) are raised
    // to minTtl; values above maxTtl are lowered to maxTtl.
    val ttl = configured map { d => (d max minTtl) min maxTtl } getOrElse defaultTtl

    apply(host, port, ttl)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment