Skip to content

Instantly share code, notes, and snippets.

@rakeshopensource
Last active September 17, 2023 12:04
Show Gist options
  • Save rakeshopensource/e5f78019387710a0aa30d1a583ebadb9 to your computer and use it in GitHub Desktop.
Save rakeshopensource/e5f78019387710a0aa30d1a583ebadb9 to your computer and use it in GitHub Desktop.
An in-memory Java simulation of consistent hashing. Beyond the core ring lookup, it demonstrates how data is redistributed when nodes are added or removed, and how reads are served from replicas when a node crashes.
org.rakeshopensource.systemdesign
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ConsistentHashing<K, V> {
public static class Details<K, V> {
K node, key;
V value;
public Details(K node, K key, V value) {
this.node = node;
this.key = key;
this.value = value;
}
@Override
public String toString() {
return "Details{" +
"node='" + node + '\'' +
", Data key='" + key + '\'' +
", Data value='" + value + '\'' +
'}';
}
}
public static class Node<K, V> {
private final K identifier;
private final Map<K, V> dataStore = new HashMap<>();
public Node(K identifier) {
this.identifier = identifier;
}
public K getIdentifier() {
return identifier;
}
public Map<K, V> getDataStore() {
return dataStore;
}
}
private final NavigableMap<Integer, Node<K, V>> hashRing = new TreeMap<>();
private final int numberOfReplicas;
private final Function<K, Integer> hashFunction;
public ConsistentHashing(int numberOfReplicas, Function<K, Integer> hashFunction) {
this.numberOfReplicas = numberOfReplicas;
this.hashFunction = hashFunction;
}
public int hashFunction(K key) {
return this.hashFunction.apply(key);
}
public void addNode(Node<K, V> node) {
int hash = hashFunction(node.getIdentifier());
hashRing.put(hash, node);
redistributeKeys(node);
}
public void crashNode(Node<K, V> node) {
int hash = hashFunction(node.getIdentifier());
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash);
tailMap.remove(hash);
// Optional: You may want to redistribute all keys here
// redistributeKeysAll();
}
public void removeNode(Node<K, V> node) {
int hash = hashFunction(node.getIdentifier());
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash);
tailMap.remove(hash);
Node<K, V> nextNode = tailMap.isEmpty() ? hashRing.get(hashRing.firstKey()) : tailMap.get(tailMap.firstKey());
moveData(node, nextNode, true);
hashRing.remove(hash);
}
public void put(K key, V value) {
int hash = hashFunction(key);
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash);
Iterator<Node<K, V>> iterator = tailMap.values().iterator();
for (int i = 0; i < numberOfReplicas; i++) {
if (!iterator.hasNext()) {
iterator = hashRing.values().iterator();
}
Node<K, V> node = iterator.next();
node.getDataStore().put(key, value);
}
}
public Details<K, V> get(K key) {
int hash = hashFunction(key);
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash);
Iterator<Node<K, V>> iterator = tailMap.values().iterator();
for (int i = 0; i < numberOfReplicas; i++) {
if (!iterator.hasNext()) {
iterator = hashRing.values().iterator();
}
Node<K, V> node = iterator.next();
V value = node.getDataStore().get(key);
if (value != null) {
return new Details<>(node.getIdentifier(), key, value);
}
}
return null;
}
private void redistributeKeysAll() {
for (Node<K, V> node : hashRing.values()) {
Iterator<K> iterator = node.getDataStore().keySet().iterator();
while (iterator.hasNext()) {
K key = iterator.next();
int hash = hashFunction(key);
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash);
Node<K, V> primaryNode = tailMap.get(tailMap.firstKey());
if (!primaryNode.getIdentifier().equals(node.getIdentifier())) {
primaryNode.getDataStore().put(key, node.getDataStore().get(key));
iterator.remove();
}
}
}
}
public void redistributeKeys(Node<K, V> newNode) {
Integer newNodeHash = hashFunction(newNode.getIdentifier());
SortedMap<Integer, Node<K, V>> headMap = hashRing.headMap(newNodeHash);
Node<K, V> previousNode = headMap.isEmpty() ? hashRing.lastEntry().getValue() : headMap.get(headMap.lastKey());
moveData(previousNode, newNode, false);
}
private void moveData(Node<K, V> sourceNode, Node<K, V> destinationNode, boolean moveAll) {
Map<K, V> dataToMove = sourceNode.getDataStore().entrySet().stream()
.filter(entry -> shouldMoveToNewNode(entry.getKey(), destinationNode, sourceNode, moveAll))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (Map.Entry<K, V> entry : dataToMove.entrySet()) {
sourceNode.getDataStore().remove(entry.getKey());
destinationNode.getDataStore().put(entry.getKey(), entry.getValue());
}
}
private boolean shouldMoveToNewNode(K key, Node<K, V> newNode, Node<K, V> previousNode, Boolean moveAll) {
if (moveAll) return true;
int keyHash = hashFunction(key);
int newNodeHash = hashFunction(newNode.getIdentifier());
int previousNodeHash = hashFunction(previousNode.getIdentifier());
return (keyHash > previousNodeHash && keyHash <= newNodeHash) ||
(newNodeHash < previousNodeHash && (keyHash > previousNodeHash || keyHash <= newNodeHash));
}
public static void main(String[] args) {
ConsistentHashing<String, String> consistentHashing = new ConsistentHashing<>(3, String::hashCode);
Node<String, String> node1 = new Node<>("Node1");
Node<String, String> node2 = new Node<>("Node2");
Node<String, String> node3 = new Node<>("Node3");
consistentHashing.addNode(node1);
consistentHashing.addNode(node2);
consistentHashing.addNode(node3);
// Put some data on nodes. Data and nodes are hashed using same hash function
consistentHashing.put("Node1", "value1");
consistentHashing.put("Node2", "value2");
consistentHashing.put("Node3", "value3");
consistentHashing.put("Node4", "value4");
// Retrieve and print the values of the keys
System.out.println(">>>>>> Initial state of the cluster");
System.out.println(consistentHashing.get("Node1"));
System.out.println(consistentHashing.get("Node2"));
System.out.println(consistentHashing.get("Node3"));
System.out.println(consistentHashing.get("Node4"));
System.out.println(consistentHashing.get("Node5"));
Node<String, String> node4 = new Node<>("Node4");
consistentHashing.addNode(node4);
System.out.println(">>>>>> After Node4 addition in cluster and data redistribution ");
System.out.println(consistentHashing.get("Node1"));
System.out.println(consistentHashing.get("Node2"));
System.out.println(consistentHashing.get("Node3"));
System.out.println(consistentHashing.get("Node4"));
System.out.println(consistentHashing.get("Node5"));
consistentHashing.removeNode(node1);
System.out.println(">>>>>> After Node1 removal in cluster and data redistribution ");
System.out.println(consistentHashing.get("Node1"));
System.out.println(consistentHashing.get("Node2"));
System.out.println(consistentHashing.get("Node3"));
System.out.println(consistentHashing.get("Node4"));
System.out.println(consistentHashing.get("Node5"));
consistentHashing.crashNode(node2);
System.out.println(">>>>>> After Node2 crash in cluster and data is servered from replica");
System.out.println(consistentHashing.get("Node1"));
System.out.println(consistentHashing.get("Node2"));
System.out.println(consistentHashing.get("Node3"));
System.out.println(consistentHashing.get("Node4"));
System.out.println(consistentHashing.get("Node5"));
consistentHashing.redistributeKeysAll();
System.out.println(">>>>>> Cluster after full redistribution");
System.out.println(consistentHashing.get("Node1"));
System.out.println(consistentHashing.get("Node2"));
System.out.println(consistentHashing.get("Node3"));
System.out.println(consistentHashing.get("Node4"));
System.out.println(consistentHashing.get("Node5"));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment