// Source: GitHub gist Kronuz/fcdaff8eef80dd05025632fdea532b88 by @Kronuz
// (last active November 14, 2016 18:16).
#include <iostream>
// c++ -std=c++14 -fsanitize=address -Wall -Wextra -g -o tst test_stash.cpp && ./tst
#include <array> // for array
#include <atomic> // for atomic
/*
 * Exception hierarchy used by the stash containers.
 *
 *   StashException
 *   |-- StashContinue        nothing usable at the requested position
 *   |   |-- StashEmptyBin      the chunk exists but the bin holds no value
 *   |   `-- StashEmptyChunk    the node exists but has no chunk allocated
 *   `-- StashOutOfRange      the slot lies past the last allocated node
 *       `-- StashEmptyStash    neither a chunk nor a next node exists yet
 */
class StashException { };
class StashContinue : public StashException { };
class StashEmptyBin : public StashContinue { };
class StashEmptyChunk : public StashContinue { };
class StashOutOfRange : public StashException { };
class StashEmptyStash : public StashOutOfRange { };


/*
 * Sparse, growable array of `_Tp` values addressed by slot number.
 *
 * Storage is a singly linked list of `Data` nodes; every node owns one
 * lazily allocated array ("chunk") of `_Size` atomic `Bin*` cells.  Slot `s`
 * lives in node `s / _Size`, cell `s % _Size`.  Nodes, chunks and bins are
 * published with atomic compare-and-swap; whoever loses the race deletes
 * its own allocation and uses the winner's.
 *
 * _Tp    value type stored in each bin.
 * _Size  number of bins per chunk (must be non-zero).
 */
template <typename _Tp, size_t _Size>
class Stash {
    static_assert(_Size > 0, "Stash needs at least one bin per chunk");

protected:
    /* Tag type: constructs a Bin whose value is default-initialized. */
    struct Nil { };

    /* Heap-allocated holder of a single value. */
    struct Bin {
        _Tp val;

        Bin() = default;

        Bin(Nil&&) { }

        Bin(uint64_t key)
            : val(_Tp(key)) { }

        Bin(const _Tp& val_)
            : val(val_) { }

        Bin(_Tp&& val_) noexcept
            : val(std::move(val_)) { }
    };

    /* One node of the chunk list. */
    class Data {
    private:
        using Chunks = std::array<std::atomic<Bin*>, _Size>;

        std::atomic<Chunks*> chunk;
        std::atomic<Data*> next;

    public:
        Data()
            : chunk(nullptr),
              next(nullptr) { }

        /*
         * Steals the chunk and the rest of the chain from `o`, leaving `o`
         * empty.  Merely copying the pointers (the previous behaviour) made
         * both destructors delete the same chunk and chain.
         */
        Data(Data&& o) noexcept
            : chunk(o.chunk.exchange(nullptr)),
              next(o.next.exchange(nullptr)) { }

        Data(const Data&) = delete;
        Data& operator=(const Data&) = delete;

        ~Data() {
            clear();
        }

        /* Deletes this node's bins and chunk, then every following node. */
        void clear() {
            if (auto s = chunk.exchange(nullptr)) {
                for (auto& cell : *s) {
                    delete cell.exchange(nullptr);
                }
                delete s;
            }

            // Walk the chain iteratively.  Each node is unlinked from its
            // successor before being deleted so that its destructor (which
            // calls clear()) only releases its own chunk and never recurses
            // down the list.
            auto n = next.exchange(nullptr);
            while (n) {
                auto following = n->next.exchange(nullptr);
                delete n;
                n = following;
            }
        }

        /*
         * Returns the atomic cell for `slot`, counted from this node.
         *
         * spawn == true : missing nodes and chunks are allocated on the
         *                 way; the returned cell may still hold nullptr.
         * spawn == false: nothing is allocated; throws
         *                   StashEmptyStash  if this node has neither a
         *                                    chunk nor a successor,
         *                   StashOutOfRange  if the chain ends before the
         *                                    target node,
         *                   StashEmptyChunk  if the target node has no chunk,
         *                   StashEmptyBin    if the cell holds nullptr.
         */
        auto& bin(size_t slot, bool spawn) {
            if (!spawn && !next && !chunk) {
                throw StashEmptyStash();
            }

            const size_t hops = slot / _Size;  // nodes to skip
            slot %= _Size;                     // index inside the target chunk

            auto data = this;
            for (size_t c = 0; c < hops; ++c) {
                auto ptr = data->next.load();
                if (!ptr) {
                    if (!spawn) {
                        throw StashOutOfRange();
                    }
                    auto tmp = new Data;
                    if (data->next.compare_exchange_strong(ptr, tmp)) {
                        ptr = tmp;
                    } else {
                        // Somebody else linked a node first; `ptr` now
                        // holds theirs.
                        delete tmp;
                    }
                }
                data = ptr;
            }

            auto ptr = data->chunk.load();
            if (!ptr) {
                if (!spawn) {
                    throw StashEmptyChunk();
                }
                auto tmp = new Chunks{ {} };
                if (data->chunk.compare_exchange_strong(ptr, tmp)) {
                    ptr = tmp;
                } else {
                    delete tmp;
                }
            }

            auto& cell = (*ptr)[slot];
            if (!spawn && !cell.load()) {
                throw StashEmptyBin();
            }
            return cell;
        }
    };

    Data data;                // head node of the chunk list
    std::atomic_size_t pos;   // current position; maintained by subclasses

public:
    /*
     * Takes over the storage of `o`.  `pos` is copied through load():
     * std::atomic is neither copyable nor movable, so initialising it from
     * std::move(o.pos) does not compile once this constructor is used.
     */
    Stash(Stash&& o) noexcept
        : data(std::move(o.data)),
          pos(o.pos.load()) { }

    Stash(size_t pos_)
        : pos(pos_) { }

    ~Stash() {
        clear();
    }

    /* Releases every bin, chunk and node. */
    void clear() {
        data.clear();
    }

    /*
     * Looks up the cell for `slot` without allocating anything.
     * This could fail with:
     *   StashEmptyStash
     *   StashEmptyBin
     *   StashEmptyChunk
     *   StashOutOfRange
     */
    auto& get_bin(size_t slot) {
        return Stash::data.bin(slot, false);
    }

    /* Returns the cell for `slot`, allocating nodes/chunks as needed. */
    auto& spawn_bin(size_t slot) {
        /* This shouldn't fail. */
        return Stash::data.bin(slot, true);
    }

    /*
     * Makes sure `bin` holds a value and returns a reference to it.
     * If the cell is empty a Bin is built from `value` and installed; if
     * the cell already holds a Bin (or another caller installs one first)
     * that existing value is returned and `value` is not stored.
     */
    template <typename T>
    auto& _put(std::atomic<Bin*>& bin, T&& value) {
        auto ptr = bin.load();
        if (!ptr) {
            auto tmp = new Bin(std::forward<T>(value));
            if (bin.compare_exchange_strong(ptr, tmp)) {
                ptr = tmp;
            } else {
                delete tmp;
            }
        }
        return ptr->val;
    }
};
/*
 * A Stash whose bins are selected by key:
 *
 *     slot = ((key + _Sum) / _Div) % _Mod
 *
 * Every bin holds a nested container `_Tp`, which must be default
 * constructible and provide `add(value, key)`, `next(bool, uint64_t, bool)`
 * and `clear()`.
 *
 * _Tp          nested container stored in each bin.
 * _Size        bins per chunk of the underlying Stash.
 * _CurrentKey  function returning the key that represents "now".
 * _Sum         offset added to the key before dividing.
 * _Div         width of one slot, in key units (must be non-zero).
 * _Mod         number of slots (must be non-zero).
 * _Ring        when false, put() lowers `pos` if an item lands on a slot
 *              below it, and check_pos() never accepts position 0.
 */
template <typename _Tp, size_t _Size, uint64_t(*_CurrentKey)(), uint64_t _Sum, uint64_t _Div, uint64_t _Mod, bool _Ring>
class StashSlots : public Stash<_Tp, _Size> {
    static_assert(_Div > 0, "StashSlots: _Div must be non-zero");
    static_assert(_Mod > 0, "StashSlots: _Mod must be non-zero");

    // Qualified with `::` on purpose: an unqualified `Stash<...>` here would
    // change the meaning of the name `Stash` inside the class, which is
    // ill-formed and rejected by GCC.
    using Stash = ::Stash<_Tp, _Size>;
    using Bin = typename Stash::Bin;
    using Nil = typename Stash::Nil;

    /*
     * Maps a key to its slot.  Static because it is evaluated in a
     * constructor's initializer list before the base class is constructed.
     */
    static size_t get_slot(uint64_t key) {
        auto slot = ((key + _Sum) / _Div) % _Mod;
        return slot;
    }

    /* Returns the nested container for `key`, creating it when missing. */
    auto& put(uint64_t key) {
        auto slot = get_slot(key);
        auto& bin = Stash::spawn_bin(slot);

        if (!_Ring) {
            // This is not a ring, any new (older) item added could move the position
            auto pos = Stash::pos.load();
            while (slot < pos && !Stash::pos.compare_exchange_weak(pos, slot));
        }

        return Stash::_put(bin, Nil());
    }

    bool check_pos(size_t pos, size_t initial_pos, size_t final_pos, size_t last_pos) {
        /* There are basically two cases where pos is valid (v) based in the
         * values of final_pos (fp) and initial_pos (ip).
         *
         * 1. When final_pos >= initial_pos:
         *    |         (ip)vvvvvvvvvvvvvvvv(fp)         |
         *              ^------- valid --------^
         *
         * 2. When final_pos < initial_pos:
         *    |vvvvvvv(fp)                   (ip)vvvvvvv|
         *    ^--valid--^                    ^--valid--^
         *
         */
        if (!_Ring && pos == 0) {
            return false;
        }

        if (final_pos >= initial_pos) {
            if (pos < initial_pos || pos > final_pos) {
                // We're beyond final_pos, stop
                return false;
            }
        } else {
            if (pos < initial_pos) {
                if (pos > final_pos) {
                    // We're beyond final_pos, stop
                    return false;
                } else if (pos == final_pos && final_pos == last_pos) {
                    return false;
                }
            }
        }

        return true;
    }

    /*
     * Tries to advance the shared position from `pos` to the following
     * slot.  When the following slot is not valid and `keep_going` is set,
     * `final_pos` is recomputed from `_CurrentKey()` and the check is
     * repeated once.  Throws StashContinue if the following slot is still
     * not valid; otherwise returns the (possibly refreshed) `final_pos`.
     */
    size_t increment_pos(size_t pos, bool keep_going, size_t initial_pos, size_t final_pos, size_t last_pos) {
        auto new_pos = (pos + 1) % _Mod;

        if (!check_pos(new_pos, initial_pos, final_pos, last_pos)) {
            if (!keep_going) {
                throw StashContinue();
            }
            final_pos = get_slot(_CurrentKey());
            if (!check_pos(new_pos, initial_pos, final_pos, last_pos)) {
                throw StashContinue();
            }
        }

        // Only moves the shared position if nobody changed it meanwhile.
        Stash::pos.compare_exchange_strong(pos, new_pos);
        return final_pos;
    }

public:
    StashSlots(StashSlots&& o) noexcept
        : Stash(std::move(o)) { }

    StashSlots()
        : Stash(0) { }

    /* Starts with the position on the slot that `key` maps to. */
    StashSlots(uint64_t key)
        : Stash(get_slot(key)) { }

    /*
     * Returns the next item, starting at the current position and moving
     * forward slot by slot.
     *
     * final       when true the walk ends at the slot of `final_key`; when
     *             false it ends at the slot just before the starting one,
     *             and a bin that was read is cleared once the walk has
     *             advanced past it.
     * final_key   key bounding the walk (replaced by `_CurrentKey()` when
     *             `keep_going` is in effect).
     * keep_going  only honoured together with `final`.
     *
     * Throws StashContinue when the lookup runs out of allocated storage
     * or the walk cannot advance any further.
     */
    auto& next(bool final, uint64_t final_key, bool keep_going) {
        auto pos = Stash::pos.load();

        keep_going &= final;
        if (keep_going) {
            final_key = _CurrentKey();
        }

        auto initial_pos = pos;
        auto last_pos = static_cast<size_t>((initial_pos + _Mod - 1) % _Mod);
        auto final_pos = final ? get_slot(final_key) : last_pos;

        do {
            Bin* ptr = nullptr;
            try {
                ptr = Stash::get_bin(pos).load();
                return ptr->val.next(final && pos == final_pos, final_key, keep_going);
            } catch (const StashOutOfRange&) {
                throw StashContinue();
            } catch (const StashContinue&) {
                // Nothing (left) in this slot: fall through and advance.
            }

            final_pos = increment_pos(pos, keep_going, initial_pos, final_pos, last_pos);

            if (!final && ptr) {
                // Dispose if it's not in the final slice
                ptr->val.clear();
            }

            pos = Stash::pos.load();
        } while (true);
    }

    /*
     * Stores `value` under `key`; a zero key means "use `_CurrentKey()`".
     */
    template <typename T>
    void add(T&& value, uint64_t key) {
        if (!key) key = _CurrentKey();
        put(key).add(std::forward<T>(value), key);
    }
};
/*
 * A Stash used as an append-only list: add() stores each value in the next
 * free slot (`idx`), next() hands out the value at the read position
 * (`pos`) and then advances it.
 *
 * _Tp    value type.
 * _Size  bins per chunk of the underlying Stash.
 */
template <typename _Tp, size_t _Size>
class StashValues : public Stash<_Tp, _Size> {
    // Qualified with `::` on purpose: an unqualified `Stash<...>` here would
    // change the meaning of the name `Stash` inside the class, which is
    // ill-formed and rejected by GCC.
    using Stash = ::Stash<_Tp, _Size>;

    std::atomic_size_t idx;  // slot the next add() will write to

public:
    StashValues(StashValues&& o) noexcept
        : Stash(std::move(o)),
          idx(o.idx.load()) { }

    StashValues()
        : Stash(0),
          idx(0) { }

    /* The key is ignored; present so the type can be built from a key. */
    StashValues(uint64_t)
        : Stash(0),
          idx(0) { }

    /* Moves the read position from `pos` to `pos + 1` unless it changed. */
    void increment_pos(size_t pos) {
        auto new_pos = pos + 1;
        Stash::pos.compare_exchange_strong(pos, new_pos);
    }

    /*
     * Returns the value at the read position and advances the position.
     * The arguments are ignored.  Throws StashContinue when there is no
     * value at the read position.
     */
    auto& next(bool, uint64_t, bool) {
        auto pos = Stash::pos.load();
        try {
            auto ptr = Stash::get_bin(pos).load();
            increment_pos(pos);
            return ptr->val;
        } catch (const StashException&) {
            // Any lookup failure is reported uniformly below.
        }
        throw StashContinue();
    }

    /* Appends `value`; the key is ignored. */
    template <typename T>
    void add(T&& value, uint64_t) {
        auto& bin = Stash::spawn_bin(idx++);
        Stash::_put(bin, std::forward<T>(value));
    }
};
////////////////////////////////////////////////////////////////////////////////
#include <chrono> // for chrono::system_clock
#include <memory> // for shared_ptr
// Value type stored by LogQueue: a shared pointer to a string.
using LogType = std::shared_ptr<std::string>;
/*
 * Converts a time point into a key: the number of nanoseconds between the
 * clock's epoch and `n`.
 */
template <typename T>
inline uint64_t time_point_to_key(std::chrono::time_point<T> n) {
    const auto since_epoch = n.time_since_epoch();
    const auto as_nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch);
    return as_nanos.count();
}
/* Key for the current system_clock time. */
static inline uint64_t now() {
    const auto current = std::chrono::system_clock::now();
    return time_point_to_key(current);
}
// Key units per millisecond: keys are nanosecond counts (time_point_to_key
// casts to std::chrono::nanoseconds), so `N * MUL` is N milliseconds
// expressed as a key distance.
#define MUL 1000000ULL
// Queue of LogType values organised as nested stashes.  Each level maps a
// key to a slot with ((key + sum) / div) % mod and stores the next level
// down inside that slot; the innermost level stores the values themselves.
class LogQueue {
// Innermost level: values in insertion order, 10 bins per chunk.
using _logs = StashValues<LogType, 10ULL>;
// 50 slots, each 1 * MUL wide, with the key offset by MUL / 2; not a ring.
using _50_1ms = StashSlots<_logs, 10ULL, &now, 1ULL * MUL / 2ULL, 1ULL * MUL, 50ULL, false>;
// 10 slots, each 50 * MUL wide; not a ring.
using _10_50ms = StashSlots<_50_1ms, 10ULL, &now, 0ULL, 50ULL * MUL, 10ULL, false>;
// 3600 slots, each 500 * MUL wide, 600 bins per chunk; not a ring.
using _3600_500ms = StashSlots<_10_50ms, 600ULL, &now, 0ULL, 500ULL * MUL, 3600ULL, false>;
// Outermost level: 48 slots, each 1800000 * MUL wide, treated as a ring.
using _48_1800s = StashSlots<_3600_500ms, 48ULL, &now, 0ULL, 1800000ULL * MUL, 48ULL, true>;
_48_1800s queue;
public:
// Positions the outermost level on the slot of the current time.
LogQueue();
// Returns the next stored value; throws StashContinue when there is none
// to return.  See StashSlots::next for the meaning of the arguments.
LogType& next(bool final=true, uint64_t final_key=0, bool keep_going=true);
// Stores l_ptr under key; a zero key means "now".
void add(const LogType& l_ptr, uint64_t key=0);
};
// Builds the queue with the outermost stash positioned on the slot that
// the current time (now()) maps to.
LogQueue::LogQueue()
: queue(now())
{ }
LogType&
LogQueue::next(bool final, uint64_t final_key, bool keep_going)
{
    // A caller-supplied (non-zero) final_key switches keep_going off before
    // the request is handed to the outermost stash.
    const bool follow_current_key = keep_going && final_key == 0;
    return queue.next(final, final_key, follow_current_key);
}
// Stores l_ptr in the queue under key by forwarding to the outermost stash
// (which substitutes the current key when key is zero).
void
LogQueue::add(const LogType& l_ptr, uint64_t key)
{
queue.add(l_ptr, key);
}
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
#include <iostream>
#include <unistd.h>
#include <algorithm>
#include <cassert>
#include <random>
#include <thread>
#include <string>
#include <mutex>
// Guards writes to std::cout made by the test functions.
std::mutex mtx;
// Seed expression for the generators in the tests; expands to a call on a
// local std::random_device that must be named `rd`.
#define RD rd()
// #define RD 0 // FIXME: HERE
// Loop flag for the producer and consumer threads; test2() sets it to false.
std::atomic_bool running(true);
// Number of items created so far by test2_producer().
std::atomic_int produced(0);
// Number of producer threads started by test2().
static const int num_threads = 1000;
// The queue exercised by all tests.
LogQueue queue;
/*
 * Single-threaded check: adds 1000 items with keys spread at random over
 * [now, now + 3600 * MUL], then drains the queue in four passes with
 * increasing final keys, printing every item that comes out.
 */
void test1() {
    std::random_device rd;  // the name is required by the RD macro
    std::mt19937_64 rng(RD);
    std::uniform_int_distribution<uint64_t> rnd3600s(0, 3600ULL * MUL);

    {
        std::lock_guard<std::mutex> lk(mtx);
        std::cout << "Now: " << now() << std::endl;
    }

    const auto start = now();

    /* Add items */
    int added = 0;
    while (added < 1000) {
        ++added;
        const auto key = start + rnd3600s(rng);
        std::cout << "Added: " << added << ". " << key << std::endl;
        queue.add(std::make_shared<std::string>("Valor " + std::to_string(key)), key);
    }

    /* Walk items */
    int seen = 1;
    for (auto sec : { 500ULL, 1000ULL, 2000ULL, 4000ULL }) {
        const auto limit = start + sec * MUL;
        std::cout << "<" << limit << ":" << std::endl;
        for (;;) {
            std::string text;
            try {
                auto& val = queue.next(true, limit, false);
                if (val) {
                    text = *val;
                    val.reset();
                }
            } catch(const StashContinue&) {
                // Nothing more up to `limit`: move on to the next pass.
                break;
            }
            std::cout << " " << seen++ << ". " << text << std::endl;
        }
    }
}
/*
 * Producer thread body.  Each round adds between 0 and 10 items sharing one
 * key close to the current time, then one item with a key up to 8 * MUL
 * ahead of the current time, and sleeps 10-110 ms.  Runs at least one round
 * and stops once `running` is false.
 */
void test2_producer() {
    std::random_device rd;  // the name is required by the RD macro
    std::mt19937_64 rng(RD);
    std::uniform_int_distribution<uint64_t> rnd8s(0, 8ULL * MUL);
    std::uniform_int_distribution<uint64_t> rnd1ms(0, MUL / 1000ULL);
    std::uniform_int_distribution<uint64_t> dis100000(0, 100000ULL);
    std::uniform_int_distribution<uint64_t> dis10(0, 10ULL);

    for (;;) {
        auto key = now() - rnd1ms(rng) + MUL / 2000ULL;
        const auto burst = dis10(rng);
        for (uint64_t u = 0; u < burst; ++u) {
            std::string text = std::to_string(++produced);
            text += " - ";
            text += std::to_string(key);
            text += " (";
            text += std::to_string(u);
            text += ")";
            queue.add(std::make_shared<std::string>(text), key);
        }

        key = now() + rnd8s(rng);
        std::string text = std::to_string(++produced);
        text += " - ";
        text += std::to_string(key);
        queue.add(std::make_shared<std::string>(text), key);

        usleep(10000ULL + dis100000(rng)); // 10ms - 110ms

        if (!running) {
            break;
        }
    }
}
/*
 * Consumer thread body.  While `running` is set it pops items with the
 * default next() arguments, sleeping 10 ms whenever the queue has nothing
 * to hand out.  Afterwards it prints a "---" separator and drains whatever
 * is left with next(false) until the queue reports StashContinue.
 */
void test2_consumer() {
    int consumed = 0;

    // Pops one item into `out`; returns false when the queue throws
    // StashContinue.
    auto take = [](bool final, std::string& out) {
        try {
            auto& val = queue.next(final);
            if (val) {
                out = *val;
                val.reset();
            }
        } catch(const StashContinue&) {
            return false;
        }
        return true;
    };

    // Prints one consumed item under the output lock.
    auto report = [&consumed](const std::string& text) {
        std::lock_guard<std::mutex> lk(mtx);
        std::cout << " " << ++consumed << "/" << produced.load() << " - " << text << std::endl;
    };

    do {
        std::string str;
        if (!take(true, str)) {
            usleep(10000ULL); // 10ms
            continue;
        }
        report(str);
    } while (running);

    {
        std::lock_guard<std::mutex> lk(mtx);
        std::cout << "---" << std::endl;
    }

    for (;;) {
        std::string str;
        if (!take(false, str)) {
            break;
        }
        report(str);
    }
}
/*
 * Multi-threaded check: starts `num_threads` producers and one consumer,
 * lets them run for four seconds, then clears `running` and joins them all.
 */
void test2() {
    // Prints the current key under the output lock.
    auto print_now = [] {
        std::lock_guard<std::mutex> lk(mtx);
        std::cout << "Now: " << now() << std::endl;
    };

    print_now();

    std::thread p[num_threads];
    for (auto& producer : p) {
        producer = std::thread(test2_producer);
    }
    std::thread c(test2_consumer);

    usleep(4000000ULL); // 4s

    print_now();

    running = false;

    for (auto& producer : p) {
        producer.join();
    }
    c.join();
}
// Entry point: runs the multi-threaded test; the single-threaded test1() is
// left disabled.
int main() {
// test1();
test2();
return 0;
}
// End of gist.