Created
August 1, 2025 09:26
-
-
Save hawaijar/40765c77fb7976705a3cd110b1ed8804 to your computer and use it in GitHub Desktop.
consistent-hashing-go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
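// Package consistenthash implements a thread-safe consistent hash ring with virtual nodes.
// Virtual nodes are kept in a slice sorted by hash, so each lookup is an O(log n) binary
// search. The number of virtual nodes per physical node is configurable, and GetNodes
// returns several distinct owners for replicated placement.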
package consistenthash | |
import ( | |
"fmt" | |
"hash/crc32" | |
"sort" | |
"sync" | |
) | |
type (
	// VirtualNode is one point on the hash ring. Each physical node owns
	// several of them.
	VirtualNode struct {
		Hash   uint32 // position of this point on the ring
		NodeID string // ID of the physical node that owns this point
	}

	// HashFunc maps a byte string to a 32-bit ring position.
	HashFunc func(data []byte) uint32
)
// ConsistentHash represents a consistent hash ring | |
type ConsistentHash struct { | |
ring []VirtualNode // sorted array of virtual nodes | |
nodes map[string]int // nodeID -> replica count | |
replicas int // number of virtual nodes per physical node | |
hashFunc HashFunc // hash function to use | |
mu sync.RWMutex // read-write mutex for thread safety | |
} | |
// New creates a new ConsistentHash with the given number of replicas | |
func New(replicas int) *ConsistentHash { | |
if replicas <= 0 { | |
replicas = 150 // Common production value | |
} | |
return &ConsistentHash{ | |
ring: make([]VirtualNode, 0), | |
nodes: make(map[string]int), | |
replicas: replicas, | |
hashFunc: crc32.ChecksumIEEE, | |
} | |
} | |
// AddNode adds a physical node to the hash ring | |
func (ch *ConsistentHash) AddNode(nodeID string) { | |
if nodeID == "" { | |
return | |
} | |
ch.mu.Lock() | |
defer ch.mu.Unlock() | |
// Check if node already exists | |
if _, exists := ch.nodes[nodeID]; exists { | |
return | |
} | |
// Add virtual nodes for this physical node | |
for i := 0; i < ch.replicas; i++ { | |
virtualNodeKey := fmt.Sprintf("%s:%d", nodeID, i) | |
hash := ch.hashFunc([]byte(virtualNodeKey)) | |
ch.ring = append(ch.ring, VirtualNode{ | |
Hash: hash, | |
NodeID: nodeID, | |
}) | |
} | |
// Sort the ring | |
sort.Slice(ch.ring, func(i, j int) bool { | |
return ch.ring[i].Hash < ch.ring[j].Hash | |
}) | |
ch.nodes[nodeID] = ch.replicas | |
} | |
// RemoveNode removes a physical node from the hash ring | |
func (ch *ConsistentHash) RemoveNode(nodeID string) { | |
if nodeID == "" { | |
return | |
} | |
ch.mu.Lock() | |
defer ch.mu.Unlock() | |
// Check if node exists | |
if _, exists := ch.nodes[nodeID]; !exists { | |
return | |
} | |
// Remove all virtual nodes for this physical node | |
newRing := make([]VirtualNode, 0, len(ch.ring)) | |
for _, vnode := range ch.ring { | |
if vnode.NodeID != nodeID { | |
newRing = append(newRing, vnode) | |
} | |
} | |
ch.ring = newRing | |
delete(ch.nodes, nodeID) | |
} | |
// GetNode returns the node responsible for the given key | |
func (ch *ConsistentHash) GetNode(key string) string { | |
if key == "" { | |
return "" | |
} | |
ch.mu.RLock() | |
defer ch.mu.RUnlock() | |
if len(ch.ring) == 0 { | |
return "" | |
} | |
hash := ch.hashFunc([]byte(key)) | |
// Binary search for the first node with hash >= key hash | |
idx := sort.Search(len(ch.ring), func(i int) bool { | |
return ch.ring[i].Hash >= hash | |
}) | |
// If no node found, wrap around to the first node | |
if idx == len(ch.ring) { | |
idx = 0 | |
} | |
return ch.ring[idx].NodeID | |
} | |
// GetNodes returns the N nodes responsible for the given key (for replication) | |
func (ch *ConsistentHash) GetNodes(key string, count int) []string { | |
if key == "" || count <= 0 { | |
return nil | |
} | |
ch.mu.RLock() | |
defer ch.mu.RUnlock() | |
if len(ch.ring) == 0 { | |
return nil | |
} | |
hash := ch.hashFunc([]byte(key)) | |
nodes := make([]string, 0, count) | |
seen := make(map[string]bool) | |
// Find starting position | |
idx := sort.Search(len(ch.ring), func(i int) bool { | |
return ch.ring[i].Hash >= hash | |
}) | |
// Collect unique nodes | |
for i := 0; i < len(ch.ring) && len(nodes) < count; i++ { | |
actualIdx := (idx + i) % len(ch.ring) | |
nodeID := ch.ring[actualIdx].NodeID | |
if !seen[nodeID] { | |
nodes = append(nodes, nodeID) | |
seen[nodeID] = true | |
} | |
} | |
return nodes | |
} | |
// GetAllNodes returns a list of all physical nodes in the ring | |
func (ch *ConsistentHash) GetAllNodes() []string { | |
ch.mu.RLock() | |
defer ch.mu.RUnlock() | |
nodes := make([]string, 0, len(ch.nodes)) | |
for nodeID := range ch.nodes { | |
nodes = append(nodes, nodeID) | |
} | |
sort.Strings(nodes) | |
return nodes | |
} | |
// NodeCount returns the number of physical nodes in the ring | |
func (ch *ConsistentHash) NodeCount() int { | |
ch.mu.RLock() | |
defer ch.mu.RUnlock() | |
return len(ch.nodes) | |
} | |
// IsEmpty returns true if the ring has no nodes | |
func (ch *ConsistentHash) IsEmpty() bool { | |
ch.mu.RLock() | |
defer ch.mu.RUnlock() | |
return len(ch.nodes) == 0 | |
} | |
// Stats summarizes the size of a hash ring. Field order is preserved because
// it determines the key order of the JSON encoding.
type Stats struct {
	// PhysicalNodes is the number of distinct nodes added to the ring.
	PhysicalNodes int `json:"physical_nodes"`
	// VirtualNodes is the total number of points on the ring.
	VirtualNodes int `json:"virtual_nodes"`
	// Replicas is the configured number of virtual nodes per physical node.
	Replicas int `json:"replicas_per_node"`
	// Distribution optionally maps node IDs to counts; omitted from JSON when empty.
	Distribution map[string]int `json:"distribution,omitempty"`
}
// GetStats returns statistics about the hash ring | |
func (ch *ConsistentHash) GetStats() Stats { | |
ch.mu.RLock() | |
defer ch.mu.RUnlock() | |
return Stats{ | |
PhysicalNodes: len(ch.nodes), | |
VirtualNodes: len(ch.ring), | |
Replicas: ch.replicas, | |
} | |
} | |
// GetDistribution returns the distribution of keys across nodes | |
func (ch *ConsistentHash) GetDistribution(keys []string) map[string]int { | |
distribution := make(map[string]int) | |
for _, key := range keys { | |
node := ch.GetNode(key) | |
if node != "" { | |
distribution[node]++ | |
} | |
} | |
return distribution | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment