////////////////////////////////
// NBHM - Non-blocking hashmap
////////////////////////////////
// You wanna intern lots of things on lots of cores? This is for you. It's
// inspired by Cliff Click's non-blocking hashmap.
//
// To use it, you'll need to define NBHM_FN and then include the header:
//
//   #define NBHM_FN(n) XXX_hm_ ## n
//   #include <nbhm.h>
//
// This will compile an implementation of the hashmap built on top of the
// user-provided functions:
//
//   bool     NBHM_FN(cmp)(const void* a, const void* b);
//   uint32_t NBHM_FN(hash)(const void* a);
//
// The exported functions are:
//
//   void* NBHM_FN(get)(NBHM* hm, void* key);
//   void* NBHM_FN(put)(NBHM* hm, void* key, void* val);
//   void* NBHM_FN(remove)(NBHM* hm, void* key);
//   void* NBHM_FN(put_if_null)(NBHM* hm, void* val);
//   void  NBHM_FN(resize_barrier)(NBHM* hm);
//
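// A minimal instantiation sketch (illustrative only; the u32_hm_ prefix and the
// FNV hash below are assumptions mirroring the test program, not part of this header):
//
//   static bool u32_hm_cmp(const void* a, const void* b) {
//       return *(const uint32_t*) a == *(const uint32_t*) b;
//   }
//   static uint32_t u32_hm_hash(const void* a) {
//       // FNV-1a over the 4 key bytes
//       const uint8_t* p = a;
//       uint32_t h = 0x811C9DC5u;
//       for (size_t i = 0; i < 4; i++) { h = (h ^ p[i]) * 0x01000193u; }
//       return h;
//   }
//
//   #define NBHM_IMPL               // compile the shared runtime in exactly one TU
//   #define NBHM_FN(n) u32_hm_ ## n
//   #include "nbhm.h"
//
//   // ... later, interning a value:
//   NBHM hm = nbhm_alloc(256);
//   uint32_t* canonical = u32_hm_put_if_null(&hm, my_u32_ptr);
//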
#ifndef NBHM_H
#define NBHM_H
#include <threads.h>
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>
#include <stdbool.h>
#include <assert.h>
#include <stdatomic.h>
#if defined(_MSC_VER)
#include <intrin.h>   // _udiv128, _umul128
#endif
// Virtual memory allocation (since the tables are generally nicely page-size friendly)
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#define NBHM_VIRTUAL_ALLOC(size)     VirtualAlloc(NULL, size, MEM_RESERVE | MEM_COMMIT, PAGE_READWRITE)
#define NBHM_VIRTUAL_FREE(ptr, size) VirtualFree(ptr, 0, MEM_RELEASE) // MEM_RELEASE requires a zero size
#else
#include <sys/mman.h>
#if defined(__linux__)
#include <unistd.h>       // syscall
#include <sys/syscall.h>  // SYS_futex
#include <linux/futex.h>  // FUTEX_WAIT, FUTEX_WAKE
#endif
#define NBHM_VIRTUAL_ALLOC(size)     mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)
#define NBHM_VIRTUAL_FREE(ptr, size) munmap(ptr, size)
#endif
// traditional heap ops
#ifndef NBHM_REALLOC
#define NBHM_REALLOC(ptr, size) realloc(ptr, size)
#endif // NBHM_REALLOC
// personal debooging stuff
#define NBHM__DEBOOGING 0
#if NBHM__DEBOOGING
#define NBHM__BEGIN(name) spall_auto_buffer_begin(name, sizeof(name) - 1, NULL, 0)
#define NBHM__END()       spall_auto_buffer_end()
#else
#define NBHM__BEGIN(name)
#define NBHM__END()
#endif
// for the time in the ebr entry
#define NBHM_PINNED_BIT (1ull << 63ull)
#define NBHM_PRIME_BIT  (1ull << 63ull)
enum {
    NBHM_LOAD_FACTOR = 75,
    NBHM_MOVE_AMOUNT = 256,
};
typedef struct NBHM_EBREntry {
    _Atomic(struct NBHM_EBREntry*) next;
    _Atomic(uint64_t) time;

    // keep on a separate cacheline to avoid false sharing
    _Alignas(64) int id;
} NBHM_EBREntry;

typedef struct {
    _Atomic(void*) key;
    _Atomic(void*) val;
} NBHM_Entry;

typedef struct NBHM_Table NBHM_Table;
struct NBHM_Table {
    _Atomic(NBHM_Table*) prev;

    uint32_t cap;

    // reciprocals to compute modulo
    uint64_t a, sh;

    // tracks how many entries have
    // been moved once we're resizing
    _Atomic uint32_t moved;
    _Atomic uint32_t move_done;
    _Atomic uint32_t count;

    NBHM_Entry data[];
};

typedef struct {
    _Atomic(NBHM_Table*) latest;
} NBHM;

typedef struct NBHM_FreeNode NBHM_FreeNode;
struct NBHM_FreeNode {
    _Atomic(NBHM_FreeNode*) next;
    NBHM_Table* table;
};
static size_t nbhm_compute_cap(size_t y) {
    // minimum capacity
    if (y < 256) {
        y = 256;
    } else {
        y = ((y + 1) / 3) * 4;
    }

    // round up to the next power of two, then shave off the table header's worth of
    // entries so the total allocation stays page-size friendly.
    size_t cap = 1ull << (64 - __builtin_clzll(y - 1));
    return cap - (sizeof(NBHM_Table) / sizeof(NBHM_Entry));
}

static void nbhm_compute_size(NBHM_Table* table, size_t cap) {
    // reciprocals to compute modulo
    #if defined(__GNUC__) || defined(__clang__)
    table->sh = 64 - __builtin_clzll(cap);
    #else
    uint64_t sh = 0;
    while (cap > (1ull << sh)) { sh++; }
    table->sh = sh;
    #endif

    table->sh += 63 - 64;

    #if (defined(__GNUC__) || defined(__clang__)) && defined(__x86_64__)
    uint64_t d, e;
    __asm__("divq %[v]" : "=a"(d), "=d"(e) : [v] "r"(cap), "a"(cap - 1), "d"(1ull << table->sh));
    table->a = d;
    #elif defined(_MSC_VER)
    uint64_t rem;
    table->a = _udiv128(1ull << table->sh, cap - 1, cap, &rem);
    #else
    #error "Unsupported target"
    #endif

    table->cap = cap;
}
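// How (a, sh) get consumed (sketch; this mirrors the templated hash2index() below):
// `a` is a fixed-point reciprocal of cap, so a 64x64->128 multiply-high plus a shift
// recovers h / cap without a hardware divide, and the remainder falls out of it:
//
//   uint64_t hi = (uint64_t) (((unsigned __int128) h * table->a) >> 64);
//   uint64_t q  = hi >> table->sh;        // == h / cap
//   size_t   i  = h - (q * table->cap);   // == h % cap
//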
static NBHM nbhm_alloc(size_t initial_cap) {
    size_t cap = nbhm_compute_cap(initial_cap);
    NBHM_Table* table = NBHM_VIRTUAL_ALLOC(sizeof(NBHM_Table) + cap*sizeof(NBHM_Entry));
    nbhm_compute_size(table, cap);
    return (NBHM){ .latest = table };
}

static void nbhm_free(NBHM* hs) {
    NBHM_Table* curr = hs->latest;
    while (curr) {
        NBHM_Table* next = curr->prev;
        NBHM_VIRTUAL_FREE(curr, sizeof(NBHM_Table) + curr->cap*sizeof(NBHM_Entry));
        curr = next;
    }
}

// for spooky stuff
static NBHM_Entry* nbhm_array(NBHM* hs)    { return hs->latest->data; }
static size_t      nbhm_count(NBHM* hs)    { return hs->latest->count; }
static size_t      nbhm_capacity(NBHM* hs) { return hs->latest->cap; }
#define nbhm_for(it, hs) for (NBHM_Entry *it = nbhm_array(hs), *_end_ = &it[nbhm_capacity(hs)]; it != _end_; it++) if (it->key != NULL)
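// Iteration sketch (illustrative): this walks `latest` only and is not synchronized
// against concurrent writers or resizes, so pair it with NBHM_FN(resize_barrier)
// when that matters.
//
//   nbhm_for(e, &hm) {
//       /* use e->key and e->val */
//   }
//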
#endif // NBHM_H

#ifdef NBHM_IMPL
#if defined(_WIN32)
#pragma comment(lib, "synchronization.lib")
#endif

int NBHM_TOMBSTONE;
int NBHM_NO_MATCH_OLD;

_Thread_local bool nbhm_ebr_init;
_Thread_local NBHM_EBREntry nbhm_ebr;
atomic_int nbhm_ebr_count;
_Atomic(NBHM_EBREntry*) nbhm_ebr_list;
atomic_flag nbhm_free_thread_init = ATOMIC_FLAG_INIT;
NBHM_FreeNode NBHM_DUMMY;
_Atomic(NBHM_FreeNode*) nbhm_free_head = &NBHM_DUMMY;
_Atomic(NBHM_FreeNode*) nbhm_free_tail = &NBHM_DUMMY;
_Atomic uint32_t nbhm_free_done;
_Atomic uint32_t nbhm_free_count;
int nbhm_thread_fn(void* arg) {
    #if NBHM__DEBOOGING
    spall_auto_thread_init(111111, SPALL_DEFAULT_BUFFER_SIZE);
    #endif

    for (;;) retry: {
        // Use a futex to avoid polling too hard
        uint32_t old;
        while (old = nbhm_free_count, old == nbhm_free_done) {
            #if defined(_WIN32)
            int WaitOnAddress(volatile void* addr, void* compare_addr, size_t addr_size, unsigned long millis);
            WaitOnAddress(&nbhm_free_count, &old, 4, -1); // -1 == INFINITE
            #elif defined(__linux__)
            int futex_rc = syscall(SYS_futex, &nbhm_free_count, FUTEX_WAIT, old, NULL, NULL, 0);
            assert(futex_rc >= 0);
            #elif defined(__APPLE__)
            int __ulock_wait(uint32_t operation, void *addr, uint64_t value, uint32_t timeout);
            int res = __ulock_wait(0x01000001, &nbhm_free_count, old, 0);
            assert(res >= 0);
            #else
            // No futex support, just sleep once we run out of tasks
            thrd_sleep(&(struct timespec){ .tv_nsec = 100000000 }, NULL);
            #endif
        }
        NBHM_FreeNode* p = atomic_load_explicit(&nbhm_free_head, memory_order_relaxed);
        do {
            if (p->next == NULL) {
                // it's empty
                goto retry;
            }
        } while (!atomic_compare_exchange_strong(&nbhm_free_head, &p, p->next));
        NBHM_Table* table = p->next->table;

        // Handling deferred freeing without blocking up the normie threads
        int state_count = nbhm_ebr_count;
        uint64_t* states = NBHM_REALLOC(NULL, state_count * sizeof(uint64_t));

        NBHM__BEGIN("scan");
        NBHM_EBREntry* us = &nbhm_ebr;
        // "snapshot" the current statuses, once the other threads either advance or aren't in the
        // hashmap functions we know we can free.
        for (NBHM_EBREntry* list = atomic_load(&nbhm_ebr_list); list; list = list->next) {
            // make sure no ptrs refer to prev
            if (list != us && list->id < state_count) {
                states[list->id] = list->time;
            }
        }

        // important bit is that pointers can't be held across the critical sections, they'd need
        // to reload from `NBHM.latest`.
        //
        // Here's the states of our "epoch" critical section thingy:
        //
        //   UNPINNED(id) -> PINNED(id) -> UNPINNED(id + 1) -> PINNED(id + 1) -> ...
        //
        // survey on if we can free the pointer if the status changed from X -> Y:
        //
        //   # YES: if we started unlocked then we weren't holding pointers in the first place.
        //   UNPINNED(A) -> PINNED(A)
        //   UNPINNED(A) -> UNPINNED(A)
        //   UNPINNED(A) -> UNPINNED(B)
        //
        //   # YES: if we're locked we need to wait until we've stopped holding pointers.
        //   PINNED(A)   -> PINNED(B)     we're a different call so we've let it go by now.
        //   PINNED(A)   -> UNPINNED(B)   we've stopped caring about the state of the pointer at this point.
        //
        //   # NO: we're still doing shit, wait a sec.
        //   PINNED(A)   -> PINNED(A)
        //
        // these aren't quite blocking the other threads, we're simply checking what their progress is concurrently.
        for (NBHM_EBREntry* list = atomic_load(&nbhm_ebr_list); list; list = list->next) {
            if (list != us && list->id < state_count && (states[list->id] & NBHM_PINNED_BIT)) {
                uint64_t before_t = states[list->id], now_t;
                do {
                    // idk, maybe this should be a better spinlock
                    now_t = atomic_load(&list->time);
                } while (before_t == now_t);
            }
        }
        NBHM__END();

        // no more refs, we can immediately free
        NBHM_VIRTUAL_FREE(table, sizeof(NBHM_Table) + table->cap*sizeof(NBHM_Entry));
        NBHM_REALLOC(states, 0);
        nbhm_free_done++;

        #if NBHM__DEBOOGING
        spall_auto_buffer_flush();
        #endif
    }
}
#endif // NBHM_IMPL
// Templated implementation
#ifdef NBHM_FN

extern int NBHM_TOMBSTONE;
extern int NBHM_NO_MATCH_OLD;

extern _Thread_local bool nbhm_ebr_init;
extern _Thread_local NBHM_EBREntry nbhm_ebr;
extern _Atomic(int) nbhm_ebr_count;
extern _Atomic(NBHM_EBREntry*) nbhm_ebr_list;

extern atomic_flag nbhm_free_thread_init;
extern _Atomic(NBHM_FreeNode*) nbhm_free_head;
extern _Atomic(NBHM_FreeNode*) nbhm_free_tail;
extern _Atomic uint32_t nbhm_free_done;
extern _Atomic uint32_t nbhm_free_count;

extern int nbhm_thread_fn(void*);
static void* NBHM_FN(put_if_match)(NBHM* hs, NBHM_Table* latest, NBHM_Table* prev, void* key, void* val, void* exp);

static size_t NBHM_FN(hash2index)(NBHM_Table* table, uint64_t h) {
    // MulHi(h, table->a)
    #if defined(__GNUC__) || defined(__clang__)
    uint64_t hi = (uint64_t) (((unsigned __int128) h * table->a) >> 64);
    #elif defined(_MSC_VER)
    uint64_t hi;
    _umul128(h, table->a, &hi);
    #else
    #error "Unsupported target"
    #endif

    uint64_t q  = hi >> table->sh;
    uint64_t q2 = h - (q * table->cap);
    assert(q2 == h % table->cap);
    return q2;
}
// flips the top bit on
static void NBHM_FN(enter_pinned)(void) {
    uint64_t t = atomic_load_explicit(&nbhm_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhm_ebr.time, t + NBHM_PINNED_BIT, memory_order_release);
}

// flips the top bit off AND increments time by one
static void NBHM_FN(exit_pinned)(void) {
    uint64_t t = atomic_load_explicit(&nbhm_ebr.time, memory_order_relaxed);
    atomic_store_explicit(&nbhm_ebr.time, t + NBHM_PINNED_BIT + 1, memory_order_release);
}
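// Every public operation below wraps its table accesses in this pin/unpin pair.
// Sketch of the pattern used by get/put/remove (illustrative, not extra API):
//
//   NBHM_FN(ebr_try_init)();
//   NBHM_FN(enter_pinned)();     // table pointers may be held from here...
//   NBHM_Table* latest = atomic_load(&hm->latest);
//   /* ... probe / insert ... */
//   NBHM_FN(exit_pinned)();      // ...but not past here; the reclaimer thread only
//                                // frees an old table once every pinned thread has
//                                // bumped its epoch.
//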
NBHM_Table* NBHM_FN(move_items)(NBHM* hm, NBHM_Table* latest, NBHM_Table* prev, int items_to_move) {
    assert(prev);
    size_t cap = prev->cap;

    // snatch up some number of items
    uint32_t old, new;
    do {
        old = atomic_load(&prev->moved);
        if (old == cap) { return prev; }
        // cap the number of items to copy... by the cap
        new = old + items_to_move;
        if (new > cap) { new = cap; }
    } while (!atomic_compare_exchange_strong(&prev->moved, &(uint32_t){ old }, new));

    if (old == new) {
        return prev;
    }

    NBHM__BEGIN("copying old");
    for (size_t i = old; i < new; i++) {
        void* old_v = atomic_load(&prev->data[i].val);
        void* k     = atomic_load(&prev->data[i].key);

        // freeze the values by adding a prime bit.
        while (((uintptr_t) old_v & NBHM_PRIME_BIT) == 0) {
            uintptr_t primed_v = (old_v == &NBHM_TOMBSTONE ? 0 : (uintptr_t) old_v) | NBHM_PRIME_BIT;
            if (atomic_compare_exchange_strong(&prev->data[i].val, &old_v, (void*) primed_v)) {
                if (old_v != NULL && old_v != &NBHM_TOMBSTONE) {
                    // once we've frozen, we can move it to the new table.
                    // we pass NULL for prev since we already know the entries exist in prev.
                    NBHM_FN(put_if_match)(hm, latest, NULL, k, old_v, &NBHM_NO_MATCH_OLD);
                }
                break;
            }
            // btw, CAS updated old_v
        }
    }
    NBHM__END();

    uint32_t done = atomic_fetch_add(&prev->move_done, new - old);
    done += new - old;

    assert(done <= cap);
    if (done == cap) {
        // detach now
        NBHM__BEGIN("detach");
        latest->prev = NULL;

        if (!atomic_flag_test_and_set(&nbhm_free_thread_init)) {
            thrd_t freeing_thread; // don't care to track it
            thrd_create(&freeing_thread, nbhm_thread_fn, NULL);
        }

        NBHM_FreeNode* new_node = NBHM_REALLOC(NULL, sizeof(NBHM_FreeNode));
        new_node->table = prev;
        new_node->next  = NULL;

        // enqueue, it's a low-size low-contention list i just don't wanna block my lovely normie threads
        NBHM_FreeNode* p = atomic_load_explicit(&nbhm_free_tail, memory_order_relaxed);
        NBHM_FreeNode* old_p = p;
        do {
            while (p->next != NULL) {
                p = p->next;
            }
        } while (!atomic_compare_exchange_strong(&p->next, &(NBHM_FreeNode*){ NULL }, new_node));
        atomic_compare_exchange_strong(&nbhm_free_tail, &old_p, new_node);
        nbhm_free_count++;

        // signal futex
        #if defined(_WIN32)
        extern void WakeByAddressSingle(void* addr);
        WakeByAddressSingle(&nbhm_free_count);
        #elif defined(__linux__)
        int futex_rc = syscall(SYS_futex, &nbhm_free_count, FUTEX_WAKE, 1, NULL, NULL, 0);
        assert(futex_rc >= 0);
        #elif defined(__APPLE__)
        extern int __ulock_wake(uint32_t operation, void* addr, uint64_t wake_value);
        int res = __ulock_wake(0x01000001, &nbhm_free_count, 0);
        assert(res >= 0);
        #endif

        prev = NULL;
        NBHM__END();
    }
    return prev;
}
static void NBHM_FN(ebr_try_init)(void) {
    if (!nbhm_ebr_init) {
        NBHM__BEGIN("init");
        nbhm_ebr_init = true;
        nbhm_ebr.id = nbhm_ebr_count++;

        // add to ebr list, we never free this because i don't care
        // TODO(NeGate): i do care, this is a nightmare when threads die figure it out
        NBHM_EBREntry* old;
        do {
            old = atomic_load_explicit(&nbhm_ebr_list, memory_order_relaxed);
            nbhm_ebr.next = old;
        } while (!atomic_compare_exchange_strong(&nbhm_ebr_list, &old, &nbhm_ebr));
        NBHM__END();
    }
}

static void* NBHM_FN(raw_lookup)(NBHM* hs, NBHM_Table* table, uint32_t h, void* key) {
    size_t cap = table->cap;
    size_t first = NBHM_FN(hash2index)(table, h), i = first;
    do {
        void* v = atomic_load(&table->data[i].val);
        void* k = atomic_load(&table->data[i].key);

        if (k == NULL) {
            return NULL;
        } else if (NBHM_FN(cmp)(k, key)) {
            return v != &NBHM_TOMBSTONE ? v : NULL;
        }

        // inc & wrap around
        i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);
    return NULL;
}
static void* NBHM_FN(put_if_match)(NBHM* hs, NBHM_Table* latest, NBHM_Table* prev, void* key, void* val, void* exp) {
    assert(key);

    void *k, *v;
    for (;;) {
        uint32_t cap = latest->cap;
        size_t limit = (cap * NBHM_LOAD_FACTOR) / 100;
        if (prev == NULL && latest->count >= limit) {
            // make resized table, we'll amortize the moves upward
            size_t new_cap = nbhm_compute_cap(limit*2);

            NBHM_Table* new_top = NBHM_VIRTUAL_ALLOC(sizeof(NBHM_Table) + new_cap*sizeof(NBHM_Entry));
            nbhm_compute_size(new_top, new_cap);

            // CAS latest -> new_table, if another thread wins the race we'll use its table
            new_top->prev = latest;
            if (!atomic_compare_exchange_strong(&hs->latest, &latest, new_top)) {
                NBHM_VIRTUAL_FREE(new_top, sizeof(NBHM_Table) + new_cap*sizeof(NBHM_Entry));
                prev = atomic_load(&latest->prev);
            } else {
                prev   = latest;
                latest = new_top;
                // float s = sizeof(NBHM_Table) + new_cap*sizeof(NBHM_Entry);
                // printf("Resize: %.2f KiB (cap=%zu)\n", s / 1024.0f, new_cap);
            }
            continue;
        }

        // key claiming phase:
        //   once completed we'll have a key inserted into the latest
        //   table (the value might be NULL which means that the entry
        //   is still empty but we've at least agreed where the value
        //   goes).
        bool found = false;
        uint32_t h = NBHM_FN(hash)(key);
        size_t first = NBHM_FN(hash2index)(latest, h), i = first;
        do {
            v = atomic_load_explicit(&latest->data[i].val, memory_order_acquire);
            k = atomic_load_explicit(&latest->data[i].key, memory_order_acquire);

            if (k == NULL) {
                // key was never in the table
                if (val == &NBHM_TOMBSTONE) { return NULL; }

                // fight for empty slot
                if (atomic_compare_exchange_strong(&latest->data[i].key, &k, key)) {
                    atomic_fetch_add_explicit(&latest->count, 1, memory_order_relaxed);
                    found = true;
                    break;
                }
            }

            if (NBHM_FN(cmp)(k, key)) {
                found = true;
                break;
            }

            // inc & wrap around
            i = (i == cap-1) ? 0 : i + 1;
        } while (i != first);

        // we didn't claim a key, that means the table is entirely full, retry
        // to use or make a bigger table.
        if (!found) {
            latest = atomic_load(&hs->latest);
            prev   = atomic_load(&latest->prev);
            continue;
        }
        // migration barrier, we only insert our item once we've
        // "logically" moved it
        if (v == NULL && prev != NULL) {
            assert(prev->prev == NULL);
            void* old = NBHM_FN(raw_lookup)(hs, prev, h, key);
            if (old != NULL) {
                // the old value might've been primed, we don't want to propagate the prime bit tho
                old = (void*) (((uintptr_t) old) & ~NBHM_PRIME_BIT);

                // if we lost, then we just get replaced by a separate fella (which is fine ig)
                if (atomic_compare_exchange_strong(&latest->data[i].val, &v, old)) {
                    v = old;
                }
            }
        }
        // if the old value is a prime, we've been had (we're resizing)
        if (((uintptr_t) v) & NBHM_PRIME_BIT) {
            continue;
        }

        // if the existing value is:
        // * incompatible with the expected value, we don't write.
        // * equal, we don't write (speed opt, one CAS is slower than no CAS).
        if (v != val &&
            // exp is tombstone, we'll only insert if it's empty (and not a migration)
            (exp != &NBHM_TOMBSTONE || (v == NULL || v == &NBHM_TOMBSTONE))
        ) {
            // value writing attempt, if we lose the CAS it means someone could've written a
            // prime (thus the entry was migrated to a later table). It could also mean we lost
            // the insertion fight to another writer and in that case we'll take their value.
            if (atomic_compare_exchange_strong(&latest->data[i].val, &v, val)) {
                v = val;
            } else {
                // if we see a prime, the entry has been migrated
                // and we should write to that later table. if not,
                // we simply lost the race to update the value.
                uintptr_t v_raw = (uintptr_t) v;
                if (v_raw & NBHM_PRIME_BIT) {
                    continue;
                }
            }
        }

        return v;
    }
}
void* NBHM_FN(put)(NBHM* hm, void* key, void* val) {
    NBHM__BEGIN("put");
    assert(val);
    NBHM_FN(ebr_try_init)();
    NBHM_FN(enter_pinned)();

    NBHM_Table* latest = atomic_load(&hm->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHM_Table* prev = atomic_load(&latest->prev);
    if (prev != NULL) {
        prev = NBHM_FN(move_items)(hm, latest, prev, NBHM_MOVE_AMOUNT);
        if (prev == NULL) {
            latest = atomic_load(&hm->latest);
        }
    }

    void* v = NBHM_FN(put_if_match)(hm, latest, prev, key, val, &NBHM_NO_MATCH_OLD);
    NBHM_FN(exit_pinned)();
    NBHM__END();
    return v;
}
void* NBHM_FN(remove)(NBHM* hm, void* key) {
    NBHM__BEGIN("remove");
    assert(key);
    NBHM_FN(ebr_try_init)();
    NBHM_FN(enter_pinned)();

    NBHM_Table* latest = atomic_load(&hm->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHM_Table* prev = atomic_load(&latest->prev);
    if (prev != NULL) {
        prev = NBHM_FN(move_items)(hm, latest, prev, NBHM_MOVE_AMOUNT);
        if (prev == NULL) {
            latest = atomic_load(&hm->latest);
        }
    }

    void* v = NBHM_FN(put_if_match)(hm, latest, prev, key, &NBHM_TOMBSTONE, &NBHM_NO_MATCH_OLD);
    NBHM_FN(exit_pinned)();
    NBHM__END();
    return v;
}
void* NBHM_FN(get)(NBHM* hm, void* key) {
    NBHM__BEGIN("get");
    assert(key);
    NBHM_FN(ebr_try_init)();
    NBHM_FN(enter_pinned)();

    NBHM_Table* latest = atomic_load(&hm->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHM_Table* prev = atomic_load(&latest->prev);
    if (prev != NULL) {
        prev = NBHM_FN(move_items)(hm, latest, prev, NBHM_MOVE_AMOUNT);
        if (prev == NULL) {
            latest = atomic_load(&hm->latest);
        }
    }

    uint32_t cap = latest->cap;
    uint32_t h = NBHM_FN(hash)(key);
    size_t first = NBHM_FN(hash2index)(latest, h), i = first;

    void *k, *v = NULL;
    do {
        v = atomic_load(&latest->data[i].val);
        k = atomic_load(&latest->data[i].key);

        if (k == NULL) {
            // first time seen, maybe the entry hasn't been moved yet
            v = (prev != NULL) ? NBHM_FN(raw_lookup)(hm, prev, h, key) : NULL;
            break;
        } else if (NBHM_FN(cmp)(k, key)) {
            // break (rather than returning here) so we still unpin below
            if (v == &NBHM_TOMBSTONE) { v = NULL; }
            break;
        }

        // inc & wrap around
        i = (i == cap-1) ? 0 : i + 1;
    } while (i != first);

    NBHM_FN(exit_pinned)();
    NBHM__END();
    return v;
}
void* NBHM_FN(put_if_null)(NBHM* hm, void* val) {
    NBHM__BEGIN("put");
    assert(val);
    NBHM_FN(ebr_try_init)();
    NBHM_FN(enter_pinned)();

    NBHM_Table* latest = atomic_load(&hm->latest);

    // if there's earlier versions of the table we can move up entries as we go along.
    NBHM_Table* prev = atomic_load(&latest->prev);
    if (prev != NULL) {
        prev = NBHM_FN(move_items)(hm, latest, prev, NBHM_MOVE_AMOUNT);
        if (prev == NULL) {
            latest = atomic_load(&hm->latest);
        }
    }

    void* v = NBHM_FN(put_if_match)(hm, latest, prev, val, val, &NBHM_TOMBSTONE);
    NBHM_FN(exit_pinned)();
    NBHM__END();
    return v;
}
// waits for all items to be moved up before continuing
void NBHM_FN(resize_barrier)(NBHM* hm) {
    NBHM__BEGIN("resize_barrier");
    NBHM_FN(ebr_try_init)();
    NBHM_FN(enter_pinned)();

    NBHM_Table *prev, *latest = atomic_load(&hm->latest);
    while (prev = atomic_load(&latest->prev), prev != NULL) {
        NBHM_FN(move_items)(hm, latest, prev, prev->cap);
    }

    NBHM_FN(exit_pinned)();
    NBHM__END();
}
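// Usage sketch: drain any older tables before a single-threaded scan
// (assumes the XXX_hm_ prefix from the example at the top of this header):
//
//   XXX_hm_resize_barrier(&hm);   // after this, every live entry is in `latest`
//   nbhm_for(e, &hm) {
//       /* read e->key / e->val */
//   }
//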
#undef NBHM_FN
#endif // NBHM_FN
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <assert.h>
#include <stdbool.h>
#include <stdatomic.h>
#ifndef _WIN32
#include <pthread.h>
#endif

#define USE_SPALL 0
#if USE_SPALL
#include "spall_native_auto.h"
#else
#define spall_auto_buffer_begin(...)
#define spall_auto_buffer_end(...)
#endif
static int num_threads;

static uint32_t my_hash(const void* a) {
    const uint8_t* data = a;
    uint32_t h = 0x811C9DC5;
    for (size_t i = 0; i < 4; i++) {
        h = (data[i] ^ h) * 0x01000193;
    }
    return h;
}

static bool my_cmp(const void* a, const void* b) {
    return *(const uint32_t*)a == *(const uint32_t*)b;
}

#define NBHM_IMPL
#define NBHM_FN(n) my_ ## n
#include "nbhm.h"
typedef struct {
    #ifdef _WIN32
    CRITICAL_SECTION lock;
    #else
    pthread_mutex_t lock;
    #endif

    size_t exp;
    void* data[];
} LockedHS;
void* lhs_intern(LockedHS* hs, void* val) {
    NBHM__BEGIN("intern");
    if (num_threads > 1) {
        #ifdef _WIN32
        EnterCriticalSection(&hs->lock);
        #else
        pthread_mutex_lock(&hs->lock);
        #endif
    }

    // actually lookup & insert
    uint32_t exp = hs->exp;
    size_t mask = (1 << exp) - 1;

    void* result = NULL;
    uint32_t h = my_hash(val);
    size_t first = h & mask, i = first;
    do {
        if (hs->data[i] == NULL) {
            hs->data[i] = result = val;
            break;
        } else if (my_cmp(hs->data[i], val)) {
            result = hs->data[i];
            break;
        }
        i = (i + 1) & mask;
    } while (i != first);
    assert(result != NULL);

    if (num_threads > 1) {
        #ifdef _WIN32
        LeaveCriticalSection(&hs->lock);
        #else
        pthread_mutex_unlock(&hs->lock);
        #endif
    }
    NBHM__END();
    return result;
}
// https://github.com/demetri/scribbles/blob/master/randomness/prngs.c
uint32_t pcg32_pie(uint64_t *state) {
    uint64_t old = *state ^ 0xc90fdaa2adf85459ULL;
    *state = *state * 6364136223846793005ULL + 0xc90fdaa2adf85459ULL;
    uint32_t xorshifted = ((old >> 18u) ^ old) >> 27u;
    uint32_t rot = old >> 59u;
    return (xorshifted >> rot) | (xorshifted << ((-rot) & 31));
}

static LockedHS* test_lhs;
static NBHM test_set;
static int attempts; // per thread
static bool testing_lhs;
static int* thread_stats;
static _Atomic uint64_t total_time;

static uint64_t get_nanos(void) {
    struct timespec ts;
    timespec_get(&ts, TIME_UTC);
    return (uint64_t)ts.tv_sec * 1000000000ULL + ts.tv_nsec;
}
static int test_thread_fn(void* arg) {
    uintptr_t starting_id = (uintptr_t) arg;
    uint64_t seed = starting_id * 11400714819323198485ULL;
    int* stats = &thread_stats[starting_id*16];

    uint32_t* arr = malloc(attempts * sizeof(uint32_t));

    #if USE_SPALL
    spall_auto_thread_init(starting_id, SPALL_DEFAULT_BUFFER_SIZE);
    spall_auto_buffer_begin("work", 4, NULL, 0);
    #endif

    uint64_t start = get_nanos();
    if (testing_lhs) {
        for (int i = 0; i < attempts; i++) {
            arr[i] = pcg32_pie(&seed) & 0xFFFF;
            if (lhs_intern(test_lhs, &arr[i]) == &arr[i]) {
                stats[0] += 1; // insertions
            } else {
                stats[1] += 1; // duplicate
            }
        }
    } else {
        for (int i = 0; i < attempts; i++) {
            arr[i] = pcg32_pie(&seed) & 0xFFFF;
            if (my_put_if_null(&test_set, &arr[i]) == &arr[i]) {
                stats[0] += 1; // insertions
            } else {
                stats[1] += 1; // duplicate
            }
        }
    }
    total_time += get_nanos() - start;

    #if USE_SPALL
    spall_auto_buffer_end();
    spall_auto_thread_quit();
    #endif
    return 0;
}
int main(int argc, char** argv) {
    #if USE_SPALL
    spall_auto_init((char *)"profile.spall");
    spall_auto_thread_init(0, SPALL_DEFAULT_BUFFER_SIZE);
    #endif

    if (argc < 2) {
        fprintf(stderr, "usage: %s <num_threads> [lhs]\n", argv[0]);
        return 1;
    }

    num_threads = atoi(argv[1]);
    // printf("Testing with %d threads\n", num_threads);
    if (argc >= 3 && strcmp(argv[2], "lhs") == 0) {
        testing_lhs = true;
        printf(" With Locked hashset...\n");
    }

    // attempts = 1000000000 / threads;
    attempts = 10000000 / num_threads;
    thread_stats = calloc(num_threads, 64 * sizeof(int));

    if (testing_lhs) {
        test_lhs = calloc(sizeof(LockedHS) + 262144*sizeof(void*), 1);
        test_lhs->exp = 18;

        #ifdef _WIN32
        InitializeCriticalSection(&test_lhs->lock);
        #else
        pthread_mutex_init(&test_lhs->lock, NULL);
        #endif
    } else {
        test_set = nbhm_alloc(32);
    }

    thrd_t* arr = malloc(num_threads * sizeof(thrd_t));
    for (int i = 0; i < num_threads; i++) {
        thrd_create(&arr[i], test_thread_fn, (void*) (uintptr_t) i);
    }
    for (int i = 0; i < num_threads; i++) {
        thrd_join(arr[i], NULL);
    }

    double total_ops = (double) attempts * num_threads;
    printf("%.4f ns/op (total=%.4f ms)\n", total_time / total_ops, total_time / 1000000.0);

    /* int inserted = 0, duplicates = 0;
    for (int i = 0; i < num_threads; i++) {
        inserted   += thread_stats[i*16 + 0];
        duplicates += thread_stats[i*16 + 1];
    }

    printf("%d + %d = %d (needed %d)\n", inserted, duplicates, inserted + duplicates, attempts*num_threads);
    if (inserted + duplicates != attempts*num_threads) {
        printf("FAIL!\n");
        abort();
    }*/

    #if USE_SPALL
    spall_auto_thread_quit();
    spall_auto_quit();
    #endif
    return 0;
}
#if USE_SPALL
#define SPALL_AUTO_IMPLEMENTATION
#include "spall_native_auto.h"
#endif