// Source: GitHub Gist cf52e673e4e7685578124863bfa03635 by pnarayanan (last active November 12, 2016).
// Design sketch of Apache Helix based cluster management for Ambry; the code below is illustrative
// pseudocode ("..." marks elided members and arguments).
// Helix agents will run on datanodes and frontends (routers)
// DataNode will instantiate a datanode agent at startup:
class AmbryDataNodeHelixAgent {
  // Participant role: connecting it is how the Helix controller learns this node is up.
  private final AmbryClusterParticipant ambryClusterParticipant;
  // Spectator role: gives this node the per-datacenter routing tables.
  private final AmbryClusterSpectator ambryClusterSpectator;

  /**
   * Sets up both Helix roles a datanode plays.
   *
   * <p>The participant is created first, then the spectator.
   */
  AmbryDataNodeHelixAgent(...)
  {
    this.ambryClusterParticipant = new AmbryClusterParticipant(...);
    this.ambryClusterSpectator = new AmbryClusterSpectator(...);
  }
}
// Routers will instantiate a Router agent at startup:
class AmbryRouterHelixAgent {
  // Routers only observe the cluster (spectator role); unlike datanodes they do not
  // register a participant.
  private final AmbryClusterSpectator ambryClusterSpectator;

  /**
   * Creates the router-side Helix agent, which consists solely of a cluster spectator.
   */
  AmbryRouterHelixAgent(...)
  {
    ambryClusterSpectator = new AmbryClusterSpectator(...);
  }
}
// The publicly exposed part will be the ClusterMap interface implementation, which will internally use the
// information from the agents (mostly from the Spectator)
public class HelixClusterManager implements ClusterMap {
  ...

  /**
   * Returns the partitions that are currently WRITABLE (as opposed to SEALED).
   *
   * <p>WRITABLE vs. SEALED is tracked at the resource level via configs, while
   * ONLINE vs. OFFLINE is tracked at the replica level.
   *
   * @return the writable partitions.
   */
  @Override
  public List<PartitionId> getWritablePartitionIds() {
    // getResources(state) does not exist yet; it is to be implemented by extending
    // RoutingTableProvider.
    // NOTE(review): Helix identifies resources by name, so a name -> PartitionId mapping
    // step is presumably still needed here. Confirm when implementing.
    List<PartitionId> partitions = routingTableProvider.getResources("WRITABLE");
    return partitions;
  }

  ...

  /**
   * Returns the datanodes that currently host an ONLINE replica of {@code partition}.
   *
   * @param partition the partition to look up.
   * @return a new mutable list of datanodes, possibly empty.
   */
  // can implement something like this in the cluster map:
  @Override
  public List<DataNodeId> getAvailableDataNodesForPartition(PartitionId partition) {
    String partitionName = partition.getName();
    // Each Ambry partition is modeled as its own Helix resource, so the resource name and
    // the partition name are the same string.
    List<InstanceConfig> instanceConfigs =
        routingTableProvider.getInstances(partitionName, partitionName, "ONLINE");
    // List is an interface and cannot be instantiated; use a presized ArrayList.
    List<DataNodeId> availableNodes = new java.util.ArrayList<>(instanceConfigs.size());
    for (InstanceConfig instanceConfig : instanceConfigs) {
      availableNodes.add(getDataNode(instanceConfig));
    }
    return availableNodes;
  }
}
// ---------------
// Spectator: This will be part of both the datanode and the router agents.
class AmbryClusterSpectator {
  // One RoutingTableProvider (an external-view listener) per datacenter, keyed by DC name.
  // Map is an interface and cannot be instantiated directly; use a HashMap.
  final Map<String, RoutingTableProvider> dcToRoutingTableProvider = new java.util.HashMap<>();
  // Optional convenience view over all per-DC routing tables.
  final CompositeClusterView compositeClusterView;

  /**
   * Registers a Helix spectator against every datacenter's ZooKeeper and builds the composite view.
   *
   * @param clusterName  the Helix cluster name.
   * @param instanceName the name this process registers under in each datacenter.
   * @param zkConnectStrs map of datacenter name to that datacenter's ZooKeeper connect string.
   */
  // 1. At startup (constructor)
  AmbryClusterSpectator(String clusterName, String instanceName, Map<String, String> zkConnectStrs)
  {
    // A Map is not Iterable; iterate over its entry set with properly typed entries.
    for (Map.Entry<String, String> entry : zkConnectStrs.entrySet()) {
      String dcName = entry.getKey();
      String zkConnectStr = entry.getValue();
      HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
          InstanceType.SPECTATOR, zkConnectStr);
      // NOTE(review): in the Helix API, connect() and addExternalViewChangeListener() declare
      // checked exceptions. This constructor must declare or handle them. The managers are
      // also not retained, so they cannot be disconnected later. Confirm the intended lifecycle.
      manager.connect();
      RoutingTableProvider routingTableProvider = new RoutingTableProvider();
      manager.addExternalViewChangeListener(routingTableProvider);
      dcToRoutingTableProvider.put(dcName, routingTableProvider);
    }
    // Build the composite view once, after every per-DC provider has been registered.
    // Rebuilding it on each loop iteration was wasteful, and it would otherwise stay null
    // when no datacenters are given.
    compositeClusterView = new CompositeClusterView(dcToRoutingTableProvider);
  }
}
// Participant will be part of the datanode agent:
class AmbryClusterParticipant {
  HelixAdmin admin;
  HelixManager manager;
  String instanceName;
  String clusterName;

  /**
   * Connects this datanode to the local Helix cluster as a participant.
   *
   * @param clusterName       the Helix cluster name.
   * @param instanceName      this datanode's Helix instance name.
   * @param localZkConnectStr connect string of the local datacenter's ZooKeeper.
   */
  // 1. At startup (constructor)
  AmbryClusterParticipant(String clusterName, String instanceName, String localZkConnectStr)
  {
    admin = new ZKHelixAdmin(localZkConnectStr);
    manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
        InstanceType.PARTICIPANT, localZkConnectStr);
    this.instanceName = instanceName;
    this.clusterName = clusterName;
    StateMachineEngine stateMachine = manager.getStateMachineEngine();
    AmbryStateModelFactory stateModelFactory = new AmbryStateModelFactory();
    // NOTE(review): this name must match the state model definition the resources were
    // created with. Helix's built-in OnlineOffline definition is named "OnlineOffline".
    // Confirm which one is intended.
    stateMachine.registerStateModelFactory("OnlineOfflineStateModel", stateModelFactory);
    // This is how the Helix controller knows that this participant is "up". The factory is
    // registered before connecting; presumably this ensures early transitions find a handler.
    // NOTE(review): in the Helix API, connect() declares a checked exception. Declare or handle it.
    manager.connect();
  }

  // 2. When the node becomes disconnected or the ambry server process is shut down, the
  // connection goes away and Helix learns that the instance/node/participant is offline.
  // Nothing in particular needs to be done in response to that at this time.

  /**
   * Disables, on this instance, every partition stored on the failed disk.
   *
   * <p>Expects the instance config record to hold a map field keyed by {@code diskId} with a
   * "mountPath" entry, and a list field keyed by that mount path listing the partitions on it.
   *
   * @param diskId the id of the disk that went bad.
   * @throws IllegalArgumentException if the instance config has no usable entry for {@code diskId}.
   */
  // 3. When the instance figures out that a disk has gone bad, it disables all partitions on that disk.
  void onDiskFailure(String diskId)
  {
    InstanceConfig instanceConfig = admin.getInstanceConfig(clusterName, instanceName);
    Map<String, String> diskInfo = instanceConfig.getRecord().getMapField(diskId);
    if (diskInfo == null) {
      throw new IllegalArgumentException("No disk info for disk " + diskId + " on instance " + instanceName);
    }
    String mountPath = diskInfo.get("mountPath");
    if (mountPath == null) {
      throw new IllegalArgumentException("No mountPath recorded for disk " + diskId + " on instance " + instanceName);
    }
    List<String> partitions = instanceConfig.getRecord().getListField(mountPath);
    if (partitions == null) {
      // No partitions recorded on this disk: nothing to disable.
      return;
    }
    // The list holds partition names (Strings), not Partition objects.
    for (String partitionName : partitions) {
      // Each Ambry partition is its own Helix resource, so the resource and partition names coincide.
      String resourceName = partitionName;
      admin.enablePartition(false, clusterName, instanceName, resourceName,
          java.util.Collections.singletonList(partitionName));
    }
  }

  void onReplicaFull(String resourceName)
  {
    // @todo: mark this at the resource level. disableResource is an option, but it is not clear
    // whether that would mark all the replicas as offline, which is not what we want. This is
    // still to be found out.
  }
}
// The OnlineOfflineStateModel Helix comes with would be sufficient in Ambry.
// Including the below to give clarity on how the state model works
// and in case we want to bring in the "SEALED" aspect into the state model.
// The reason to define that at the resource level is that a replica does not
// ever individually become SEALED - the resource (ambry partition) as a whole is
// either SEALED or WRITABLE.
public class AmbryStateModelFactory extends StateModelFactory<AmbryStateModel> {
  /**
   * Creates the state model for one replica.
   *
   * <p>The resource and partition names are not used; every replica gets a freshly
   * constructed {@link AmbryStateModel}.
   *
   * @param resource      the Helix resource (Ambry partition) the replica belongs to.
   * @param partitionName the Helix partition name of the replica.
   * @return a new state model instance.
   */
  @Override
  public AmbryStateModel createNewStateModel(String resource, String partitionName) {
    AmbryStateModel model = new AmbryStateModel();
    return model;
  }
}
/**
 * Per-replica state model mirroring Helix's built-in OnlineOffline definition.
 *
 * <p>The initial state is OFFLINE, matching that definition. The controller brings a replica
 * ONLINE with an OFFLINE->ONLINE transition. Starting in ONLINE would make that first
 * transition's from-state disagree with the replica's current state.
 */
@StateModelInfo(initialState = "OFFLINE", states = {
    "ONLINE", "OFFLINE", "DROPPED"
})
class AmbryStateModel extends StateModel {
  @Transition(to = "OFFLINE", from = "ONLINE")
  public void onOfflineFromOnline(Message message, NotificationContext context) {
    // No Ambry-side action yet.
  }

  @Transition(to = "ONLINE", from = "OFFLINE")
  public void onOnlineFromOffline(Message message, NotificationContext context) {
    // No Ambry-side action yet.
  }

  // The built-in OnlineOffline definition also drops replicas via OFFLINE->DROPPED.
  // Without a handler, dropping a resource would fail the transition on this participant.
  @Transition(to = "DROPPED", from = "OFFLINE")
  public void onDroppedFromOffline(Message message, NotificationContext context) {
    // No Ambry-side action yet.
  }
}
// (end of gist)