Last active
September 17, 2023 12:04
-
-
Save rakeshopensource/e5f78019387710a0aa30d1a583ebadb9 to your computer and use it in GitHub Desktop.
Explore the functioning of Consistent Hashing with a straightforward Java in-memory simulation. This simulation not only incorporates the core concept of Consistent Hashing but also encompasses key features such as data redistribution when nodes are added or removed, and the ability to serve data from replicas in the event of node crashes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
org.rakeshopensource.systemdesign | |
import java.util.*; | |
import java.util.function.Function; | |
import java.util.stream.Collectors; | |
public class ConsistentHashing<K, V> { | |
public static class Details<K, V> { | |
K node, key; | |
V value; | |
public Details(K node, K key, V value) { | |
this.node = node; | |
this.key = key; | |
this.value = value; | |
} | |
@Override | |
public String toString() { | |
return "Details{" + | |
"node='" + node + '\'' + | |
", Data key='" + key + '\'' + | |
", Data value='" + value + '\'' + | |
'}'; | |
} | |
} | |
public static class Node<K, V> { | |
private final K identifier; | |
private final Map<K, V> dataStore = new HashMap<>(); | |
public Node(K identifier) { | |
this.identifier = identifier; | |
} | |
public K getIdentifier() { | |
return identifier; | |
} | |
public Map<K, V> getDataStore() { | |
return dataStore; | |
} | |
} | |
private final NavigableMap<Integer, Node<K, V>> hashRing = new TreeMap<>(); | |
private final int numberOfReplicas; | |
private final Function<K, Integer> hashFunction; | |
public ConsistentHashing(int numberOfReplicas, Function<K, Integer> hashFunction) { | |
this.numberOfReplicas = numberOfReplicas; | |
this.hashFunction = hashFunction; | |
} | |
public int hashFunction(K key) { | |
return this.hashFunction.apply(key); | |
} | |
public void addNode(Node<K, V> node) { | |
int hash = hashFunction(node.getIdentifier()); | |
hashRing.put(hash, node); | |
redistributeKeys(node); | |
} | |
public void crashNode(Node<K, V> node) { | |
int hash = hashFunction(node.getIdentifier()); | |
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash); | |
tailMap.remove(hash); | |
// Optional: You may want to redistribute all keys here | |
// redistributeKeysAll(); | |
} | |
public void removeNode(Node<K, V> node) { | |
int hash = hashFunction(node.getIdentifier()); | |
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash); | |
tailMap.remove(hash); | |
Node<K, V> nextNode = tailMap.isEmpty() ? hashRing.get(hashRing.firstKey()) : tailMap.get(tailMap.firstKey()); | |
moveData(node, nextNode, true); | |
hashRing.remove(hash); | |
} | |
public void put(K key, V value) { | |
int hash = hashFunction(key); | |
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash); | |
Iterator<Node<K, V>> iterator = tailMap.values().iterator(); | |
for (int i = 0; i < numberOfReplicas; i++) { | |
if (!iterator.hasNext()) { | |
iterator = hashRing.values().iterator(); | |
} | |
Node<K, V> node = iterator.next(); | |
node.getDataStore().put(key, value); | |
} | |
} | |
public Details<K, V> get(K key) { | |
int hash = hashFunction(key); | |
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash); | |
Iterator<Node<K, V>> iterator = tailMap.values().iterator(); | |
for (int i = 0; i < numberOfReplicas; i++) { | |
if (!iterator.hasNext()) { | |
iterator = hashRing.values().iterator(); | |
} | |
Node<K, V> node = iterator.next(); | |
V value = node.getDataStore().get(key); | |
if (value != null) { | |
return new Details<>(node.getIdentifier(), key, value); | |
} | |
} | |
return null; | |
} | |
private void redistributeKeysAll() { | |
for (Node<K, V> node : hashRing.values()) { | |
Iterator<K> iterator = node.getDataStore().keySet().iterator(); | |
while (iterator.hasNext()) { | |
K key = iterator.next(); | |
int hash = hashFunction(key); | |
SortedMap<Integer, Node<K, V>> tailMap = hashRing.tailMap(hash); | |
Node<K, V> primaryNode = tailMap.get(tailMap.firstKey()); | |
if (!primaryNode.getIdentifier().equals(node.getIdentifier())) { | |
primaryNode.getDataStore().put(key, node.getDataStore().get(key)); | |
iterator.remove(); | |
} | |
} | |
} | |
} | |
public void redistributeKeys(Node<K, V> newNode) { | |
Integer newNodeHash = hashFunction(newNode.getIdentifier()); | |
SortedMap<Integer, Node<K, V>> headMap = hashRing.headMap(newNodeHash); | |
Node<K, V> previousNode = headMap.isEmpty() ? hashRing.lastEntry().getValue() : headMap.get(headMap.lastKey()); | |
moveData(previousNode, newNode, false); | |
} | |
private void moveData(Node<K, V> sourceNode, Node<K, V> destinationNode, boolean moveAll) { | |
Map<K, V> dataToMove = sourceNode.getDataStore().entrySet().stream() | |
.filter(entry -> shouldMoveToNewNode(entry.getKey(), destinationNode, sourceNode, moveAll)) | |
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); | |
for (Map.Entry<K, V> entry : dataToMove.entrySet()) { | |
sourceNode.getDataStore().remove(entry.getKey()); | |
destinationNode.getDataStore().put(entry.getKey(), entry.getValue()); | |
} | |
} | |
private boolean shouldMoveToNewNode(K key, Node<K, V> newNode, Node<K, V> previousNode, Boolean moveAll) { | |
if (moveAll) return true; | |
int keyHash = hashFunction(key); | |
int newNodeHash = hashFunction(newNode.getIdentifier()); | |
int previousNodeHash = hashFunction(previousNode.getIdentifier()); | |
return (keyHash > previousNodeHash && keyHash <= newNodeHash) || | |
(newNodeHash < previousNodeHash && (keyHash > previousNodeHash || keyHash <= newNodeHash)); | |
} | |
public static void main(String[] args) { | |
ConsistentHashing<String, String> consistentHashing = new ConsistentHashing<>(3, String::hashCode); | |
Node<String, String> node1 = new Node<>("Node1"); | |
Node<String, String> node2 = new Node<>("Node2"); | |
Node<String, String> node3 = new Node<>("Node3"); | |
consistentHashing.addNode(node1); | |
consistentHashing.addNode(node2); | |
consistentHashing.addNode(node3); | |
// Put some data on nodes. Data and nodes are hashed using same hash function | |
consistentHashing.put("Node1", "value1"); | |
consistentHashing.put("Node2", "value2"); | |
consistentHashing.put("Node3", "value3"); | |
consistentHashing.put("Node4", "value4"); | |
// Retrieve and print the values of the keys | |
System.out.println(">>>>>> Initial state of the cluster"); | |
System.out.println(consistentHashing.get("Node1")); | |
System.out.println(consistentHashing.get("Node2")); | |
System.out.println(consistentHashing.get("Node3")); | |
System.out.println(consistentHashing.get("Node4")); | |
System.out.println(consistentHashing.get("Node5")); | |
Node<String, String> node4 = new Node<>("Node4"); | |
consistentHashing.addNode(node4); | |
System.out.println(">>>>>> After Node4 addition in cluster and data redistribution "); | |
System.out.println(consistentHashing.get("Node1")); | |
System.out.println(consistentHashing.get("Node2")); | |
System.out.println(consistentHashing.get("Node3")); | |
System.out.println(consistentHashing.get("Node4")); | |
System.out.println(consistentHashing.get("Node5")); | |
consistentHashing.removeNode(node1); | |
System.out.println(">>>>>> After Node1 removal in cluster and data redistribution "); | |
System.out.println(consistentHashing.get("Node1")); | |
System.out.println(consistentHashing.get("Node2")); | |
System.out.println(consistentHashing.get("Node3")); | |
System.out.println(consistentHashing.get("Node4")); | |
System.out.println(consistentHashing.get("Node5")); | |
consistentHashing.crashNode(node2); | |
System.out.println(">>>>>> After Node2 crash in cluster and data is servered from replica"); | |
System.out.println(consistentHashing.get("Node1")); | |
System.out.println(consistentHashing.get("Node2")); | |
System.out.println(consistentHashing.get("Node3")); | |
System.out.println(consistentHashing.get("Node4")); | |
System.out.println(consistentHashing.get("Node5")); | |
consistentHashing.redistributeKeysAll(); | |
System.out.println(">>>>>> Cluster after full redistribution"); | |
System.out.println(consistentHashing.get("Node1")); | |
System.out.println(consistentHashing.get("Node2")); | |
System.out.println(consistentHashing.get("Node3")); | |
System.out.println(consistentHashing.get("Node4")); | |
System.out.println(consistentHashing.get("Node5")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment