Last active
November 12, 2016 01:12
-
-
Save pnarayanan/cf52e673e4e7685578124863bfa03635 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Helix agents will run on datanodes and frontends (routers).
// A datanode will instantiate a datanode agent at startup:
/**
 * Helix agent instantiated by an Ambry datanode at startup.
 *
 * <p>Owns one {@link AmbryClusterParticipant} and one {@link AmbryClusterSpectator}, both created
 * in the constructor. The {@code ...} argument lists are placeholders in this design sketch.
 */
class AmbryDataNodeHelixAgent {
  private AmbryClusterParticipant ambryClusterParticipant;
  private AmbryClusterSpectator ambryClusterSpectator;

  /** Creates the participant first, then the spectator. */
  AmbryDataNodeHelixAgent(...) {
    this.ambryClusterParticipant = new AmbryClusterParticipant(...);
    this.ambryClusterSpectator = new AmbryClusterSpectator(...);
  }
}
// Routers will instantiate a router agent at startup:
/**
 * Helix agent instantiated by an Ambry router (frontend) at startup.
 *
 * <p>Owns a single {@link AmbryClusterSpectator}, created in the constructor. The {@code ...}
 * argument lists are placeholders in this design sketch.
 */
class AmbryRouterHelixAgent {
  private AmbryClusterSpectator ambryClusterSpectator;

  /**
   * Creates the spectator for this router.
   *
   * <p>The constructor name must match the class name; the earlier draft called it
   * {@code AmbryFrontendHelixAgent}, which is not a valid constructor for this class.
   */
  AmbryRouterHelixAgent(...) {
    this.ambryClusterSpectator = new AmbryClusterSpectator(...);
  }
}
// The publicly exposed part will be the ClusterMap interface implementation, which will internally use the
// information from the agents (mostly from the Spectator).
/**
 * {@link ClusterMap} implementation that answers queries from Helix routing information.
 *
 * <p>The {@code ...} markers stand for members omitted from this design sketch; the
 * {@code routingTableProvider} field and the {@code getDataNode(InstanceConfig)} helper used below
 * are presumably among them.
 */
public class HelixClusterManager implements ClusterMap {
  ...

  /**
   * Returns the partitions that are in the {@code "WRITABLE"} state.
   *
   * @return whatever the routing table provider reports for {@code "WRITABLE"}
   */
  @Override
  public List<PartitionId> getWritablePartitionIds() {
    // WRITABLE vs. SEALED at the resource level via configs.
    // ONLINE vs. OFFLINE at the replica level.
    // getResources(state) is to be implemented by extending RoutingTableProvider.
    List<PartitionId> partitions = routingTableProvider.getResources("WRITABLE");
    return partitions;
  }

  ...

  // can implement something like this in the cluster map:
  /**
   * Returns the datanodes that hold an {@code "ONLINE"} replica of the given partition.
   *
   * @param partition the partition to look up
   * @return one {@link DataNodeId} per instance reported as ONLINE, in provider order
   */
  @Override
  public List<DataNodeId> getAvailableDataNodesForPartition(PartitionId partition) {
    // The same name is passed as both the Helix resource name and the Helix partition name.
    // NOTE(review): presumably each Ambry partition maps to a single-partition resource - confirm.
    String partitionName = partition.getName();
    List<InstanceConfig> instanceConfigs =
        routingTableProvider.getInstances(partitionName, partitionName, "ONLINE");
    // List is an interface and cannot be instantiated; use a concrete, presized implementation.
    List<DataNodeId> availableNodes = new java.util.ArrayList<>(instanceConfigs.size());
    for (InstanceConfig instanceConfig : instanceConfigs) {
      availableNodes.add(getDataNode(instanceConfig));
    }
    return availableNodes;
  }
}
// ---------------
// Spectator: This will be part of both the datanode and the router agents.
class AmbryClusterSpectator { | |
Map<String, RoutingTableProvider> dcToRoutingTableProvider = new Map(); | |
CompositeClusterView compositeClusterView; | |
// 1. At startup (constructor) | |
AmbryClusterSpectator(String clusterName, String instanceName, Map<String, String> zkConnectStrs) | |
{ | |
// Register a spectator for every dataCenter | |
for (Entry entry : zkConnectStrs) { | |
String dcName = entry.getKey(); | |
String zkConnectStr = entry.getValue(); | |
HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, | |
InstanceType.SPECTATOR, zkConnectStr); | |
manager.connect(); | |
RoutingTableProvider routingTableProvider = new RoutingTableProvider(); | |
manager.addExternalViewChangeListener(routingTableProvider); | |
dcToRoutingTableProvider.put(dcName, routingTableProvider); | |
// composite view, if required for simplicity. | |
compositeClusterView = new CompositeClusterView(dcToRoutingTableProvider); | |
} | |
} | |
} | |
// Participant: This will be part of the datanode agent.
class AmbryClusterParticipant { | |
HelixAdmin admin; | |
HelixManager manager; | |
String instanceName; | |
String clusterName; | |
// 1. At startup (constructor) | |
AmbryClusterParticipant(String clusterName, String instanceName, String localZkConnectStr) | |
{ | |
admin = new ZKHelixAdmin(localZkConnectStr); | |
manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, | |
InstanceType.PARTICIPANT, localZkConnectStr); | |
this.instanceName = instanceName; | |
this.clusterName = clusterName; | |
StateMachineEngine stateMachine = manager.getStateMachineEngine(); | |
stateModelFactory = new AmbryStateModelFactory(); | |
stateMachine.registerStateModelFactory("OnlineOfflineStateModel", stateModelFactory); | |
// this is how Helix controller knows that this participant is "up" | |
manager.connect(); | |
} | |
// 2. When the node becomes disconnected or the ambry server process is shutdown, the connection goes away | |
// and Helix comes to know that the instance/node/participant is offline. There isn't anything in particular | |
// that we need to do in response to that at this time. | |
// 3. When the instance figures out that a disk has gone bad, it goes on to disable all partitions on that disk. | |
void onDiskFailure(String diskId) | |
{ | |
InstanceConfig instanceConfig = admin.getInstanceConfig(clusterName, instanceName); | |
Map<String, String> diskInfo = instanceConfig.getRecord().getMapField(diskId); | |
String mountPath = diskInfo.get("mountPath"); | |
List<String> partitions = instanceConfig.getRecord().getListField(mountPath); | |
for (Partition partition: partitions) { | |
String resourceName = partition.getName(); | |
String partitionName = partition.getName(); | |
List<String> resourcePartitions = new List(); | |
resourcePartitions.add(partitionName); | |
admin.enablePartition(false, clusterName, instanceName, resourceName, resourcePartitions); | |
} | |
} | |
void onReplicaFull(String resourceName) | |
{ | |
// @todo: mark this at the resource level. disableResource is an option, but it is not clear | |
// if that is going to mark all the replicas as offline - which is not what we want. This is | |
// to be found out. | |
} | |
} | |
// The OnlineOfflineStateModel that Helix comes with would be sufficient for Ambry.
// The code below is included to clarify how the state model works,
// and in case we want to bring the "SEALED" aspect into the state model.
// The reason to define that at the resource level is that a replica does not
// ever individually become SEALED - the resource (Ambry partition) as a whole is
// either SEALED or WRITABLE.
public class AmbryStateModelFactory extends StateModelFactory<AmbryStateModel> { | |
@Override | |
public AmbryStateModel createNewStateModel(String resource, String partitionName) { | |
return new AmbryStateModel(); | |
} | |
} | |
@StateModelInfo(initialState = "ONLINE", states = { | |
"ONLINE", "OFFLINE" | |
}) | |
class AmbryStateModel extends StateModel { | |
@Transition(to = "OFFLINE", from = "ONLINE") | |
public void onOfflineFromOnline(Message message, NotificationContext context) { | |
} | |
@Transition(to = "ONLINE", from = "OFFLINE") | |
public void onOnlineFromOffline(Message message, NotificationContext context) { | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment