Created
June 16, 2012 22:18
-
-
Save feliperazeek/2942666 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.klout.api.services | |
import akka.agent.Agent | |
import akka.dispatch._ | |
import org.joda.time._ | |
import com.klout.playful2.sugar._ | |
import com.klout.playful2.zookeeper._ | |
import com.klout.playful2.actors._ | |
import java.util.{ Map => javaMap } | |
import java.{ lang => java } | |
import play.Logger | |
import com.klout.api.models._ | |
import org.apache.zookeeper.KeeperException.NoNodeException | |
/** | |
* This is a service used to manage the state of a MySQL cluster. It watches nodes on ZooKeeper to define the health of each node in the cluster. | |
* | |
* @author Felipe Oliveira [@_felipera] | |
*/ | |
trait MySQLStateServiceComponent { | |
val mysqlStateService: MySQLStateService | |
trait MySQLStateService { | |
/** | |
* Return a list of currently healthy MySQL nodes | |
*/ | |
def getHealthyNodes(): List[String] | |
/** | |
* Returns the current health state of MySQL nodes | |
*/ | |
def currentState(): Map[String, Any] | |
} | |
} | |
/** | |
* This is the implemention of the service interface defined above. | |
* | |
* @author Felipe Oliveira [@_felipera] | |
*/ | |
trait AkkaAgentMySQLStateServiceComponent extends MySQLStateServiceComponent { | |
override lazy val mysqlStateService: MySQLStateService = { | |
/** | |
* Zookeeper base path where we will be watching updates to determine the health of MySQL nodes | |
*/ | |
lazy val nodeBasePath = config string "mysql.zk.base.path" ! | |
/** | |
* Zookeeper node path where we'll be watching for override updates so we can force the nodes we want the API to be talking to | |
*/ | |
lazy val nodeOverridePath = config string "mysql.zk.override.path" ! | |
/** | |
* Get the initial state of the MySQL cluster | |
*/ | |
def initialState: MySQLClusterState = { | |
val nodes: Map[String, MySQLNodeStatus] = MySQLClusterState.allNodes.map { | |
node => (node, getCurrentNodeStatus(node).getOrElse(MySQLDown)) | |
} toMap | |
val overrideNodes: Option[List[String]] = try { | |
Option(Shelves.get(nodeOverridePath).toString.split(",").toList) | |
} catch { | |
case error: NoNodeException => None | |
} | |
MySQLClusterState(nodes, overrideNodes) | |
} | |
/** | |
* Get the Zookeeper node path for a MySQL node | |
*/ | |
def getNodePath(node: String) = nodeBasePath + "/" + node | |
/** | |
* Get the current health of a node based on what's defined on Zookeeper | |
*/ | |
def getCurrentNodeStatus(node: String): Option[MySQLNodeStatus] = { | |
try { | |
val path = getNodePath(node) | |
val status = getNodeStatus(Shelves.get(path).toString) | |
please log "Node Health - Node: " + node + ", Status: " + status | |
status | |
} catch { | |
case error: NoNodeException => Option(MySQLDown) | |
} | |
} | |
/** | |
* Returns the status of a node based on the Zookeeper node value | |
*/ | |
def getNodeStatus(status: String): Option[MySQLNodeStatus] = status match { | |
case "up" => Option(MySQLUp) | |
case "down" => Option(MySQLDown) | |
case "degraded" => Option(MySQLDegraded) | |
case _ => None | |
} | |
/** | |
* Akka agent wrapper for the MySQL cluster state | |
*/ | |
val agent = Agent(initialState) | |
/** | |
* Watch for updates on Zookeeper nodes and trigger updates on the state class | |
*/ | |
MySQLClusterState.allNodes.foreach { | |
node => | |
Shelves.on(getNodePath(node)) { | |
case NodeUpdated(Some(value)) => getNodeStatus(value) match { | |
case Some(status) => { | |
please warn "MySQL Node Update - Node: " + node + ", Value: " + value + ", Status: " + status | |
agent send (_ withNodeUpdate (node, status)) | |
} | |
case _ => please warn "Unknown MySQL Status: " + value + " (Node: " + node + ")" | |
} | |
case NodeDeleted => { | |
please warn "MySQL Node Deleted - Node: " + node | |
agent send (_ withNodeUpdate (node, MySQLDown)) | |
} | |
case _ => Unit | |
} | |
} | |
/** | |
* Watch for override updates so we can force the API to talk to specific nodes | |
*/ | |
Shelves.on(nodeOverridePath) { | |
case NodeUpdated(Some(value)) => { | |
please warn "MySQL Nodes Override: " + value | |
agent send (_ withOverrideNodes (Option(value))) | |
} | |
case NodeDeleted => { | |
please warn "MySQL Nodes Override Removed!" | |
agent send (_ withOverrideNodes (None)) | |
} | |
case _ => Unit | |
} | |
new AkkaAgentMySQLStateService(agent) | |
} | |
class AkkaAgentMySQLStateService(private val state: Agent[MySQLClusterState]) extends MySQLStateService { | |
/** | |
* If there are no nodes available do we have a fallback? | |
*/ | |
lazy val fallbackEnabled = config boolean "mysql.zk.fallback.enabled" ! | |
/** | |
* If the fallback is enabled what's the node we should fallback to? | |
*/ | |
lazy val fallbackNode = config string "mysql.zk.fallback.node" ! | |
/** | |
* Return a list of currently healthy MySQL nodes | |
*/ | |
override def getHealthyNodes(): List[String] = { | |
// Get list of current healthy nodes | |
val nodes = state().getHealthyNodes | |
val overrideNodes = state().overrideNodes.getOrElse(List()) | |
please debug "MySQL Cluster State - Nodes: " + nodes + ", Override: " + overrideNodes | |
// If there are override nodes force to use that | |
val list = if (overrideNodes.size > 0) { | |
overrideNodes | |
} else { | |
// Ok there are no override nodes so check how many are available and healthy | |
if (nodes.size > 0) { | |
// Ok there are available healthy nodes | |
nodes | |
} else { | |
// Ok there are no available healthy nodes so check if there's a local fallback enabled | |
if (fallbackEnabled) { | |
// Fallback is enabled, use this default node | |
please debug "There are no healthy MySQL nodes available, using fallback node: " + fallbackNode | |
List(fallbackNode) | |
} else { | |
// No fallback is enabled, just return an empty list which should probably cause an exception to be thrown | |
List() | |
} | |
} | |
} | |
// Log Debug | |
please debug "MySQL Nodes: " + list | |
// Return nodes that should be used | |
list | |
} | |
/** | |
* Returns the current health state of MySQL nodes | |
*/ | |
override def currentState() = state().currentState | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment