Created
August 12, 2025 16:19
-
-
Save amaembo/6e6c392e8d7155fee825a93084ab631c to your computer and use it in GitHub Desktop.
ConcurrentHashTable implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example | |
import java.util.concurrent.atomic.* | |
class ConcurrentHashTable<K : Any, V : Any>(initialCapacity: Int) : HashTable<K, V> { | |
private val table = AtomicReference(Table<K, V>(initialCapacity)) | |
/**
 * Associates [value] with [key].
 *
 * The operation runs against the table that is the root at the moment of the call.
 * When that table reports a different table to continue with, the root reference is
 * swapped to it.
 *
 * @return the value previously associated with [key], or `null` if there was none
 */
override fun put(key: K, value: V): V? {
    val snapshot = table.get()
    val outcome = snapshot.put(key, value)
    val successor = outcome.second
    if (successor !== snapshot) {
        // The CAS result is deliberately ignored: the swap is attempted once and not retried.
        table.compareAndSet(snapshot, successor)
    }
    return outcome.first
}
/**
 * Looks up [key] in the current root table.
 *
 * @return the value the root table reports for [key], or `null` if it reports none
 */
override fun get(key: K): V? = table.get().get(key)
/**
 * Removes [key] through the current root table.
 *
 * @return the value the root table reports as removed, or `null` if [key] was not present
 */
override fun remove(key: K): V? = table.get().remove(key)
private class Table<K : Any, V : Any>(val capacity: Int) { | |
val keys = AtomicReferenceArray<Any?>(capacity) | |
val values = AtomicReferenceArray<Any?>(capacity) | |
// 'size' counts the cells whose key has been allocated, not the live entries.
// A removed entry keeps its key and only has its value replaced with the 'removed'
// sentinel, so removals are never reflected in this counter.
// Removed entries are not copied during a transfer, so the next table may start
// with a smaller size than this one had.
val size = AtomicInteger(0) | |
val nextTable = AtomicReference<Table<K, V>?>(null) | |
/**
 * Moves every cell of this table into the next table and returns that table.
 *
 * Per cell:
 *  - an empty or removed value is replaced with the 'forwarded' sentinel directly;
 *  - a present value is first wrapped in [Frozen], then copied with [putForTransfer],
 *    and finally replaced with the 'forwarded' sentinel.
 *
 * Several transfers may run at once, together with concurrent `put`/`remove` calls;
 * whoever meets a frozen cell completes its copy, so the callers help each other.
 */
fun transfer(): Table<K, V> {
    val target = createNextTable()
    for (slot in 0 until capacity) {
        migrate@ while (true) {
            // Take a consistent key/value snapshot: if the key was still null when read
            // but has been set since, the value may belong to the new key, so read again.
            var key: Any? = keys[slot]
            var value: Any? = values[slot]
            while (key == null && keys[slot] != null) {
                key = keys[slot]
                value = values[slot]
            }
            when {
                // Somebody else has already moved this cell.
                value === FORWARDED -> break@migrate
                // Nothing to copy; the 'removed' flag is not kept after forwarding.
                value == null || value === REMOVED -> {
                    if (values.compareAndSet(slot, value, FORWARDED)) break@migrate
                }
                // Another transfer froze the cell: finish the copy on its behalf.
                value is Frozen -> {
                    target.putForTransfer(key as K, value.value as V)
                    values.compareAndSet(slot, value, FORWARDED)
                    break@migrate
                }
                // Live value: freeze it first so it cannot change while being copied.
                else -> {
                    val marker = Frozen(value)
                    if (values.compareAndSet(slot, value, marker)) {
                        target.putForTransfer(key as K, value as V)
                        values.compareAndSet(slot, marker, FORWARDED)
                        break@migrate
                    }
                }
            }
        }
    }
    return target
}
/**
 * Returns the successor table, creating it with twice this table's capacity if it
 * does not exist yet.
 *
 * May be called concurrently: exactly one candidate is installed and every caller
 * gets that same instance back.
 */
fun createNextTable(): Table<K, V> {
    val existing = nextTable.get()
    if (existing != null) return existing
    val candidate = Table<K, V>(capacity * 2)
    if (nextTable.compareAndSet(null, candidate)) return candidate
    // The CAS can only fail because another caller installed its table first,
    // and the reference is never reset to null afterwards.
    return nextTable.get()!!
}
/**
 * The 'put' used while entries are being transferred into this table.
 *
 * Behaves somewhat like 'putIfAbsent', except that a 'removed' value counts as present:
 * the value cell is only written when it is still `null`, so a value stored by a
 * concurrent regular update is never overwritten by the (older) migrated one.
 *
 * If this table is itself being transferred and the probed empty cell is already
 * forwarded, the pair is passed on to the next table.
 *
 * @param key the key being migrated
 * @param value the value that was frozen in the previous table
 * @return `true` if this call stored [value]; `false` if the cell for [key] already
 *         held something. No caller in this file checks the result; it documents the algorithm.
 * @throws IllegalStateException if probing wraps around without finding a usable cell
 */
fun putForTransfer(key: K, value: V): Boolean {
    val origIndex = index(key)
    var index = origIndex
    while (true) {
        while (true) {
            val curKey = keys[index]
            if (curKey == key) {
                return values.compareAndSet(index, null, value)
            }
            // The cell belongs to another key: continue with the next cell.
            if (curKey != null) break
            val oldValue = values[index]
            if (oldValue === FORWARDED) {
                // A cell is only forwarded by transfer(), which creates the next table first,
                // so the reference is guaranteed to be set here.
                return nextTable.get()!!.putForTransfer(key, value)
            }
            // The key read above was null, yet a value is present now: the cell was claimed
            // between the two reads, possibly by a *different* key. Giving up here would
            // silently drop the migrated entry, so look at the cell again; the key is
            // non-null by now and the checks above decide what to do with it.
            if (oldValue != null) continue
            // Insert the key/value pair into this cell.
            if (keys.compareAndSet(index, null, key)) {
                size.incrementAndGet()
                return values.compareAndSet(index, null, value)
            }
        }
        // Process the next cell, use linear probing.
        index = (index + 1) % capacity
        if (index == origIndex) {
            throw IllegalStateException("Table is full")
        }
    }
}
/**
 * Stores [value] under [key], starting a rehash into the next table when the load
 * threshold is reached.
 *
 * @return the previous value for [key] (or `null`) paired with the table the caller
 *         should use from now on. That is this table unless this call performed a
 *         transfer. An outdated table keeps working, but every access is forwarded,
 *         so switching as soon as a transfer completes avoids that overhead.
 * @throws IllegalStateException if probing wraps around without finding a usable cell
 */
fun put(key: K, value: V): Pair<V?, Table<K, V>> {
    val home = index(key)
    var slot = home
    while (true) {
        probe@ while (true) {
            val occupant = keys[slot]
            if (occupant == null) {
                // The cell does not store a key.
                if (values[slot] === FORWARDED) {
                    // Keep reporting this table: forwarding is not yet complete.
                    return Pair(nextTable.get()!!.put(key, value).first, this)
                }
                // Reserve the slot in 'size' before claiming the key so the rehash starts
                // in advance; counting afterwards would let concurrent puts overflow
                // the table without triggering it.
                if (size.incrementAndGet() >= capacity * 3 / 4) {
                    return transfer().put(key, value)
                }
                if (!keys.compareAndSet(slot, null, key)) {
                    // The key cell was concurrently taken: give the reservation back.
                    size.decrementAndGet()
                }
                // Look at the same cell again, whoever owns it now.
                continue@probe
            }
            // The cell belongs to a different key: move on to the next cell.
            if (occupant != key) break@probe
            // The cell contains the specified key: replace the value.
            while (true) {
                val current = values[slot]
                if (current === FORWARDED) {
                    // Keep reporting this table: forwarding is not yet complete.
                    return Pair(nextTable.get()!!.put(key, value).first, this)
                }
                if (current is Frozen) {
                    // Help the running transfer finish this cell, then update the copy.
                    val successor = nextTable.get()!!
                    successor.putForTransfer(key, current.value as V)
                    values.compareAndSet(slot, current, FORWARDED)
                    return Pair(successor.put(key, value).first, this)
                }
                if (values.compareAndSet(slot, current, value)) {
                    val previous = if (current === REMOVED) null else current as V?
                    return Pair(previous, this)
                }
            }
        }
        // Linear probing: advance to the next cell, wrapping around.
        slot = (slot + 1) % capacity
        if (slot == home) {
            throw IllegalStateException("Table is full")
        }
    }
}
/**
 * Looks up [key] by linear probing from its home cell.
 *
 * A forwarded cell redirects the lookup to the next table; a frozen cell still
 * yields the value it wraps.
 *
 * @return the stored value, or `null` if the key is absent or marked removed
 * @throws IllegalStateException if probing wraps around without reaching the key or an empty cell
 */
fun get(key: K): V? {
    val home = index(key)
    var slot = home
    while (true) {
        val occupant = keys[slot]
        if (occupant == null) {
            // No key here: either the cell was forwarded or the key is absent.
            return if (values[slot] === FORWARDED) nextTable.get()!!.get(key) else null
        }
        if (occupant == key) {
            val stored = values[slot]
            if (stored === FORWARDED) return nextTable.get()!!.get(key)
            if (stored === REMOVED) return null
            // A frozen cell is mid-transfer; the wrapped value is what to report.
            val payload = if (stored is Frozen) stored.value else stored
            return payload as V?
        }
        // Linear probing: advance to the next cell, wrapping around.
        slot = (slot + 1) % capacity
        if (slot == home) {
            throw IllegalStateException("Table is full")
        }
    }
}
/**
 * Removes [key] by replacing its value with the 'removed' sentinel; the key cell
 * itself stays allocated.
 *
 * A frozen cell is first helped through its transfer and the removal is then applied
 * to the next table; a forwarded cell delegates to the next table directly.
 *
 * @return the value that was replaced, or `null` if the key is absent or already removed
 * @throws IllegalStateException if probing wraps around without reaching the key or an empty cell
 */
fun remove(key: K): V? {
    val home = index(key)
    var slot = home
    while (true) {
        val occupant = keys[slot]
        if (occupant == null) {
            // Empty cell: the key is either in the next table or not present at all.
            return if (values[slot] === FORWARDED) nextTable.get()!!.remove(key) else null
        }
        if (occupant == key) {
            while (true) {
                val current = values[slot]
                when {
                    current is Frozen -> {
                        // Help the running transfer, then remove from the copy.
                        val successor = nextTable.get()!!
                        successor.putForTransfer(key, current.value as V)
                        values.compareAndSet(slot, current, FORWARDED)
                        return successor.remove(key)
                    }
                    current === FORWARDED -> return nextTable.get()!!.remove(key)
                    current === REMOVED -> return null
                    else -> if (values.compareAndSet(slot, current, REMOVED)) {
                        return current as V?
                    }
                }
            }
        }
        // Linear probing: advance to the next cell, wrapping around.
        slot = (slot + 1) % capacity
        if (slot == home) {
            throw IllegalStateException("Table is full")
        }
    }
}
/**
 * Computes the home cell of [key]: its hash code multiplied by [MAGIC], reduced to
 * the range `0 until capacity` with a floor modulus so the result is never negative.
 */
private fun index(key: Any): Int {
    val scrambled = key.hashCode() * MAGIC
    return Math.floorMod(scrambled, capacity)
}
} | |
} | |
/**
 * Multiplier applied to a key's hash code before it is reduced to a table index.
 * The original author labels this value "golden ratio".
 */
private const val MAGIC: Int = -0x61c88647
/**
 * Sentinel stored in a value cell once its entry has been moved from an old table
 * to the new one. It is compared by identity only.
 */
private val FORWARDED: Any = object {
    override fun toString(): String = "-->"
}
/**
 * Sentinel stored in a value cell whose entry has been removed.
 *
 * It distinguishes an already removed entry from one whose value has not been written
 * yet, which is crucial when a removal runs while a transfer is active.
 */
private val REMOVED: Any = object {
    override fun toString(): String = "xxx"
}
/**
 * Wrapper marking an entry that is in the middle of being transferred to the next table.
 *
 * The transfer must be finished before the entry can be updated again; once it is,
 * the frozen value is replaced with the 'forwarded' sentinel.
 *
 * @property value the value the entry held when it was frozen
 */
private data class Frozen(
    val value: Any
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment