Last active
March 4, 2016 09:21
-
-
Save unixunion/fbf075e22b2e1d32a47d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.deblox.clustering; | |
import com.deblox.Boot; | |
import io.vertx.core.AbstractVerticle; | |
import io.vertx.core.Future; | |
import io.vertx.core.Handler; | |
import io.vertx.core.eventbus.DeliveryOptions; | |
import io.vertx.core.eventbus.EventBus; | |
import io.vertx.core.eventbus.Message; | |
import io.vertx.core.eventbus.MessageConsumer; | |
import io.vertx.core.json.JsonObject; | |
import io.vertx.core.logging.Logger; | |
import io.vertx.core.logging.LoggerFactory; | |
import io.vertx.core.shareddata.SharedData; | |
import java.net.InetAddress; | |
import java.net.UnknownHostException; | |
import java.util.UUID; | |
import static com.deblox.Util.sendError; | |
import static com.deblox.Util.sendOk; | |
/** | |
* Created by keghol on 09/11/15. | |
* | |
* Listens on topic: deblox.TransponderService | |
* Listens on private topic: deblox.TransponderService.{HOSTNAME} and deblox.TransponderService.{UUID} | |
* | |
* maintains a clusterwide shared data object called nodes, which contains a key: nodes with a JsonObject representing | |
* all nodes in the cluster as a JsonArray. | |
* | |
* Periodically announce self to the cluster | |
* Periodically iterate over nodes in the cluster and "ping", remove unackknowledged nodes | |
* Keep track of clusterSize | |
* | |
*/ | |
public class TransponderService extends AbstractVerticle implements Handler<Message<JsonObject>> { | |
private static final Logger logger = LoggerFactory.getLogger(TransponderService.class); | |
private static String nodeSharedData = "nodes"; // clusterwide map and key to store list of nodes in. | |
private static String nodeListKey = "nodes"; // the key we will store ALL the nodes in | |
private String transponderTopicPrefix = "greatsnipe.TransponderService"; // default prefix for the eventbus topics | |
private static int clusterSize = 1; // set the clusterSize to 1, this variable is updated by "register" handler | |
// holders | |
DeliveryOptions eventbusOptions; | |
private static String hostname; | |
private String transponderAddress; | |
private static String localHashAddress; | |
private long announceTimer; // will hold the instance of the Timer | |
private long houseKeepingTimer; // will hold the instance of the Timer | |
// timers / timeouts | |
private int announceInterval = 1000; // the interval between announcing | |
private int housekeepingInterval = 10000; // interval at which housekeeping function fires | |
private int eventbusTimeout = 3000; // timeout for events on the eventbus | |
/** | |
* Put any graceful shutdown stuff here. | |
* | |
* @param stopFuture | |
* @throws Exception | |
*/ | |
@Override | |
public void stop(Future<Void> stopFuture) throws Exception{ | |
logger.info("Shutting Down"); | |
vertx.eventBus().publish(transponderAddress, new JsonObject() | |
.put("action", "unregister") | |
.put("hostname", hostname), eventbusOptions); | |
try { | |
vertx.setTimer(1000, event -> { | |
stopFuture.complete(); | |
}); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
logger.error("Unable to shutdown"); | |
} | |
} | |
/** | |
* Startup the service, load config, register handlers, start timers... | |
* | |
* @param startFuture | |
* @throws Exception | |
*/ | |
public void start(Future<Void> startFuture) throws Exception { | |
logger.info("deploying with config: " + config().toString()); | |
// Read hostname from config, or get from OS | |
try { | |
hostname = config().getString("hostname", InetAddress.getLocalHost().getHostName()); | |
logger.info("hostname: " + hostname); | |
} catch (UnknownHostException e) { | |
e.printStackTrace(); | |
logger.error("unable to determine hostname, please add a hostname config for this verticle"); | |
startFuture.fail("unable to determine hostname, please add a hostname config for this verticle"); | |
} | |
// set the timers / intervals / timeouts | |
announceInterval = config().getInteger("announceInterval", announceInterval); | |
housekeepingInterval = config().getInteger("housekeepingInterval", housekeepingInterval); | |
eventbusTimeout = config().getInteger("eventbusTimeout", eventbusTimeout); | |
// queues and topics addresses | |
transponderAddress = config().getString("transponderAddress", transponderTopicPrefix); | |
localHashAddress = transponderAddress + "." + UUID.randomUUID().toString(); | |
// messaging options for when sending message from this verticle. | |
eventbusOptions = new DeliveryOptions() | |
.addHeader("hostname", hostname) | |
.addHeader("localHashAddress", localHashAddress) | |
.setSendTimeout(eventbusTimeout); | |
logger.info("Starting consumers"); | |
MessageConsumer<JsonObject> transponderAddressConsumer = vertx.eventBus().consumer(transponderAddress, this); | |
transponderAddressConsumer.completionHandler(res -> { | |
logger.info("transponderAddress: " + transponderAddress); | |
}); | |
MessageConsumer<JsonObject> localHashAddressConsumer = vertx.eventBus().consumer(localHashAddress, this); | |
localHashAddressConsumer.completionHandler(res -> { | |
logger.info("localHashAddress: " + localHashAddress); | |
}); | |
// Publish a notification to the cluster to register myself periodically | |
announceTimer = vertx.setPeriodic(announceInterval, timerID -> { | |
JsonObject request = new JsonObject() | |
.put("action", "register") | |
.put("config", Boot.config); | |
logger.debug("Sending request: " + request + " to " + transponderAddress); | |
vertx.eventBus().send(transponderAddress, request, eventbusOptions, event -> { | |
if (event.succeeded()) { | |
JsonObject response = new JsonObject(event.result().body().toString()); | |
if (response.getString("status").equals("ok")) { | |
logger.debug("Successfully announced to cluster"); | |
} else { | |
logger.warn("Response does not contain a valid status"); | |
logger.warn(response.toString()); | |
} | |
} | |
}); | |
}); | |
logger.info("announceTimer id: " + announceTimer); | |
/** | |
* go through nodes in the cluster and check if they are alive, clean up cluster map if they are dead. | |
*/ | |
houseKeepingTimer = vertx.setPeriodic(housekeepingInterval, timerID ->{ | |
logger.debug("Checking nodes in the cluster"); | |
try { | |
SharedData sd = vertx.sharedData(); | |
// get the map called "nodes" | |
sd.<String, JsonObject>getClusterWideMap(nodeSharedData, result -> { | |
if (result.succeeded()) { | |
try { | |
// get the "nodes" key from the map | |
result.result().get(nodeListKey, r -> { | |
try { | |
logger.debug(nodeListKey + " key value: " + r.result().toString()); | |
JsonObject document = r.result(); | |
logger.debug("Cluster size: " + document.size()); | |
// update this node's clusterSize variable to match the document | |
if (clusterSize != document.size()) { | |
clusterSize = document.size(); | |
logger.info("Cluster resized to " + clusterSize + " nodes"); | |
} | |
document.forEach(node -> { | |
pingNode(node.getKey()); | |
}); | |
} catch (NullPointerException e) { | |
logger.warn("No nodes in cluster yet"); | |
} | |
}); | |
} catch (NullPointerException e) { | |
logger.warn("Cluster does not contain any data yet"); | |
} | |
} else { | |
// unable to get the clusterwidemap | |
logger.warn("Cluster lookup request failed"); | |
} | |
}); | |
} catch (NullPointerException e) { | |
logger.warn("Cluster does not seem to be available"); | |
} | |
}); | |
logger.info("houseKeepingTimer id: " + houseKeepingTimer); | |
} | |
/** | |
* register a node with the cluster, accepts JSON with a "hostname" field | |
* | |
* @param event | |
*/ | |
private void doRegisterNode(Message<JsonObject> event) { | |
logger.debug("Node: " + event.replyAddress() + " requesting registration of: " + event.body()); | |
// the host's unique address is used as a identifier | |
String hostAddress = event.headers().get("localHashAddress"); | |
// we will store the host's config in the map | |
JsonObject hostConfig = event.body().getJsonObject("config", new JsonObject()); | |
logger.debug("Getting nodeShared Data map: " + nodeSharedData); | |
SharedData sd = vertx.sharedData(); | |
sd.<String, JsonObject>getClusterWideMap(nodeSharedData, req -> { | |
req.result().putIfAbsent(nodeListKey, new JsonObject(), r2 -> { | |
if (r2.succeeded()) { | |
req.result().get(nodeListKey, nodesList -> { | |
JsonObject nodes = nodesList.result(); | |
nodes.put(hostAddress, hostConfig); | |
req.result().put(nodeListKey, nodes, updateNodes -> { | |
if (updateNodes.succeeded()) { | |
logger.debug("Updated nodes document"); | |
sendOk(event); | |
} else { | |
logger.error("Unable to update nodes document"); | |
sendError(event); | |
} | |
}); | |
}); | |
} else { | |
logger.warn("Unable to create / get the node shared data object"); | |
sendError(event); | |
} | |
}); | |
}); | |
} | |
/** | |
* unregister a node from the cluster, accepts JSON with a "localHashAddress" header matching the | |
* "key" in nodes document on the clusterwide shared storage. | |
* @param event | |
*/ | |
private void doUnregisterNode(Message<JsonObject> event, String hashAddress) { | |
logger.debug("Unregister node: " + event.body()); | |
String host = hashAddress; | |
logger.info("Removing node: " + host); | |
SharedData sd = vertx.sharedData(); | |
sd.<String, JsonObject>getClusterWideMap(nodeSharedData, req -> { | |
if (req.succeeded()) { | |
req.result().get(nodeListKey, nodesList -> { | |
JsonObject nodes = nodesList.result(); | |
nodes.remove(host); | |
req.result().put(nodeListKey, nodes, updateNodes -> { | |
if (updateNodes.succeeded()) { | |
logger.debug("Updated nodes document"); | |
sendOk(event); | |
} else { | |
logger.error("Unable to update nodes document"); | |
sendError(event); | |
} | |
}); | |
}); | |
} else { | |
logger.warn("Cluster unavailable"); | |
sendError(event); | |
} | |
}); | |
} | |
/** | |
* Main event handlers for this verticle | |
* @param event | |
*/ | |
@Override | |
public void handle(Message<JsonObject> event) { | |
final String action = event.body().getString("action"); | |
switch (action) { | |
case "register": | |
doRegisterNode(event); | |
break; | |
case "unregister": | |
doUnregisterNode(event, event.body().getString("hostname")); | |
break; | |
case "ping": | |
doPing(event); | |
break; | |
default: | |
logger.warn("Undefined action: " + action); | |
sendError(event, "Undefined action: " + action); | |
break; | |
} | |
} | |
/** | |
* ping a node on its private address, if its not answering, remove it from the nodes map | |
*/ | |
public void pingNode(String nodeAddress) { | |
logger.debug("Checking host at address: " + nodeAddress); | |
vertx.eventBus().send(nodeAddress, new JsonObject().put("action", "ping"), eventbusOptions, ping -> { | |
if (ping.succeeded()) { | |
JsonObject response = new JsonObject(ping.result().body().toString()); | |
if (response.getString("status").equals("ok")) { | |
logger.debug("host " + nodeAddress + " is alive"); | |
} else { | |
logger.warn("host " + nodeAddress + " has responded invalidly: " + ping.result().toString()); | |
} | |
} else { | |
logger.warn("Node " + nodeAddress + " unreachable, removing, no such address registered?"); | |
JsonObject request = new JsonObject() | |
.put("action", "unregister") | |
.put("hostname", nodeAddress); | |
logger.info("Sending unregister request for node: " + nodeAddress); | |
vertx.eventBus().send(transponderAddress, request, eventbusOptions, removeNode -> { | |
if (removeNode.succeeded()) { | |
logger.debug("Node removal requested"); | |
} else { | |
logger.warn("Failed to remove node, will retry later."); | |
} | |
}); | |
} | |
}); | |
} | |
// basically sends back a OK message | |
public void doPing(Message<JsonObject> event) { | |
sendOk(event, eventbusOptions); | |
} | |
// accessor for other verticles to determine the cluster size | |
public static int getClusterSize() { | |
return clusterSize; | |
} | |
// accessor for other verticles to deterine the configured hostname | |
public static String getHostname() { | |
return hostname; | |
} | |
// accessor for other verticles to deterine the configured localAddress | |
public static String getLocalHashAddress() { | |
return localHashAddress; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment