Last active
November 14, 2016 18:16
-
-
Save Kronuz/fcdaff8eef80dd05025632fdea532b88 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
// c++ -std=c++14 -fsanitize=address -Wall -Wextra -g -o tst test_stash.cpp && ./tst | |
#include <array> // for array | |
#include <atomic> // for atomic | |
class StashException { }; | |
class StashContinue : public StashException { }; | |
class StashEmptyBin : public StashContinue { }; | |
class StashEmptyChunk : public StashContinue { }; | |
class StashOutOfRange : public StashException { }; | |
class StashEmptyStash : public StashOutOfRange { }; | |
template <typename _Tp, size_t _Size> | |
class Stash { | |
protected: | |
struct Nil { }; | |
struct Bin { | |
_Tp val; | |
Bin() = default; | |
Bin(Nil&&) { } | |
Bin(uint64_t key) | |
: val(_Tp(key)) { } | |
Bin(const _Tp& val_) | |
: val(val_) { } | |
Bin(_Tp&& val_) noexcept | |
: val(std::move(val_)) { } | |
}; | |
class Data { | |
private: | |
using Chunks = std::array<std::atomic<Bin*>, _Size>; | |
std::atomic<Chunks*> chunk; | |
std::atomic<Data*> next; | |
public: | |
Data() | |
: chunk(nullptr), | |
next(nullptr) { } | |
Data(Data&& o) noexcept | |
: chunk(o.chunk.load()), | |
next(o.next.load()) { } | |
~Data() { | |
clear(); | |
} | |
void clear() { | |
auto s = chunk.load(); | |
while (!chunk.compare_exchange_weak(s, nullptr)); | |
if (s) { | |
for (auto & a : *s) { | |
auto p = a.load(); | |
while (!a.compare_exchange_weak(p, nullptr)); | |
if (p) { | |
delete p; | |
} | |
} | |
delete s; | |
} | |
Data* n; | |
do { | |
n = next.load(); | |
if (n) { | |
while (!next.compare_exchange_weak(n, n->next)); | |
if (n) { | |
n->next = nullptr; | |
delete n; | |
} | |
} | |
} while (n); | |
} | |
auto& bin(size_t slot, bool spawn) { | |
if (!spawn && !next && ! chunk) { | |
throw StashEmptyStash(); | |
} | |
size_t chunk = 0; | |
if (slot >= _Size) { | |
chunk = slot / _Size; | |
slot = slot % _Size; | |
} | |
auto data = this; | |
for (size_t c = 0; c < chunk; ++c) { | |
auto ptr = data->next.load(); | |
if (!spawn && !ptr) { | |
throw StashOutOfRange(); | |
} | |
if (!ptr) { | |
auto tmp = new Data; | |
if (data->next.compare_exchange_strong(ptr, tmp)) { | |
ptr = tmp; | |
} else { | |
delete tmp; | |
} | |
} | |
data = ptr; | |
} | |
auto ptr = data->chunk.load(); | |
if (!spawn && !ptr) { | |
throw StashEmptyChunk(); | |
} | |
if (!ptr) { | |
auto tmp = new Chunks{ {} }; | |
if (data->chunk.compare_exchange_strong(ptr, tmp)) { | |
ptr = tmp; | |
} else { | |
delete tmp; | |
} | |
} | |
auto& bin = (*ptr)[slot]; | |
if (!spawn && !bin.load()) { | |
throw StashEmptyBin(); | |
} | |
return bin; | |
} | |
}; | |
Data data; | |
std::atomic_size_t pos; | |
public: | |
Stash(Stash&& o) noexcept | |
: data(std::move(o.data)), | |
pos(std::move(o.pos)) { } | |
Stash(size_t pos_) | |
: pos(pos_) { } | |
~Stash() { | |
clear(); | |
} | |
void clear() { | |
data.clear(); | |
} | |
auto& get_bin(size_t slot) { | |
/* This could fail with: | |
* StashEmptyStash | |
* StashEmptyBin | |
* StashEmptyChunk | |
* StashOutOfRange | |
*/ | |
return Stash::data.bin(slot, false); | |
} | |
auto& spawn_bin(size_t slot) { | |
/* This shouldn't fail. */ | |
return Stash::data.bin(slot, true); | |
} | |
template <typename T> | |
auto& _put(std::atomic<Bin*>& bin, T&& value) { | |
auto ptr = bin.load(); | |
if (!ptr) { | |
auto tmp = new Bin(std::forward<T>(value)); | |
if (bin.compare_exchange_strong(ptr, tmp)) { | |
ptr = tmp; | |
} else { | |
delete tmp; | |
} | |
} | |
return ptr->val; | |
} | |
}; | |
template <typename _Tp, size_t _Size, uint64_t(*_CurrentKey)(), uint64_t _Sum, uint64_t _Div, uint64_t _Mod, bool _Ring> | |
class StashSlots : public Stash<_Tp, _Size> { | |
using Stash = Stash<_Tp, _Size>; | |
using Bin = typename Stash::Bin; | |
using Nil = typename Stash::Nil; | |
size_t get_slot(uint64_t key) { | |
auto slot = ((key + _Sum) / _Div) % _Mod; | |
// std::cout << "size:" << _Size << ", div:" << _Div << ", mod:" << _Mod << ", key:" << key << ", slot:" << slot << std::endl; | |
return slot; | |
} | |
auto& put(uint64_t key) { | |
auto slot = get_slot(key); | |
auto& bin = Stash::spawn_bin(slot); | |
if (!_Ring) { | |
// This is not a ring, any new (older) item added could move the position | |
auto pos = Stash::pos.load(); | |
while (slot < pos && !Stash::pos.compare_exchange_weak(pos, slot)); | |
} | |
return Stash::_put(bin, Nil()); | |
} | |
bool check_pos(size_t pos, size_t initial_pos, size_t final_pos, size_t last_pos) { | |
/* There are basically two cases where pos is valid (v) based in the | |
* values of final_pos (fp) and initial_pos (ip). | |
* | |
* 1. When final_pos >= initial_pos: | |
* | (ip)vvvvvvvvvvvvvvvv(fp) | | |
* ^------- valid --------^ | |
* | |
* 2. When final_pos < initial_pos: | |
* |vvvvvvv(fp) (ip)vvvvvvv| | |
* ^--valid--^ ^--valid--^ | |
* | |
*/ | |
if (!_Ring && pos == 0) { | |
return false; | |
} | |
if (final_pos >= initial_pos) { | |
if (pos < initial_pos || pos > final_pos) { | |
// We're beyond final_pos, stop | |
return false; | |
} | |
} else { | |
if (pos < initial_pos) { | |
if (pos > final_pos) { | |
// We're beyond final_pos, stop | |
return false; | |
} else if (pos == final_pos && final_pos == last_pos) { | |
return false; | |
} | |
} | |
} | |
return true; | |
} | |
size_t increment_pos(size_t pos, bool keep_going, size_t initial_pos, size_t final_pos, size_t last_pos) { | |
auto new_pos = (pos + 1) % _Mod; | |
if (!check_pos(new_pos, initial_pos, final_pos, last_pos)) { | |
if (keep_going) { | |
final_pos = get_slot(_CurrentKey()); | |
if (!check_pos(new_pos, initial_pos, final_pos, last_pos)) { | |
throw StashContinue(); | |
} | |
} else { | |
throw StashContinue(); | |
} | |
} | |
Stash::pos.compare_exchange_strong(pos, new_pos); | |
return final_pos; | |
} | |
public: | |
StashSlots(StashSlots&& o) noexcept | |
: Stash::Stash(std::move(o)) { } | |
StashSlots() | |
: Stash::Stash(0) { } | |
StashSlots(uint64_t key) | |
: Stash::Stash(get_slot(key)) { } | |
auto& next(bool final, uint64_t final_key, bool keep_going) { | |
// std::cout << "\tTimeStash<_Mod=" << _Mod << ">::next(" << (final ? "true" : "false") <<", " << final_key << ", " << (keep_going ? "true" : "false") << ")" << std::endl; | |
auto pos = Stash::pos.load(); | |
keep_going &= final; | |
if (keep_going) { | |
final_key = _CurrentKey(); | |
} | |
auto initial_pos = pos; | |
auto last_pos = static_cast<size_t>((initial_pos + _Mod - 1) % _Mod); | |
auto final_pos = final ? get_slot(final_key) : last_pos; | |
do { | |
Bin* ptr = nullptr; | |
try { | |
// std::cout << "\t\tTimeStash<_Mod=" << _Mod << "> pos:" << pos << ", initial_pos:" << initial_pos << ", final:" << (final ? "true" : "false") << ", final_pos:" << final_pos << ", last_pos:" << last_pos << std::endl; | |
ptr = Stash::get_bin(pos).load(); | |
return ptr->val.next(final && pos == final_pos, final_key, keep_going); | |
} catch (const StashOutOfRange&) { | |
throw StashContinue(); | |
} catch (const StashContinue&) { } | |
final_pos = increment_pos(pos, keep_going, initial_pos, final_pos, last_pos); | |
if (!final && ptr) { | |
// Dispose if it's not in the final slice | |
ptr->val.clear(); | |
} | |
pos = Stash::pos.load(); | |
} while (true); | |
} | |
template <typename T> | |
void add(T&& value, uint64_t key) { | |
if (!key) key = _CurrentKey(); | |
put(key).add(std::forward<T>(value), key); | |
} | |
}; | |
template <typename _Tp, size_t _Size> | |
class StashValues : public Stash<_Tp, _Size> { | |
using Stash = Stash<_Tp, _Size>; | |
using Bin = typename Stash::Bin; | |
using Nil = typename Stash::Nil; | |
std::atomic_size_t idx; | |
public: | |
StashValues(StashValues&& o) noexcept | |
: Stash::Stash(std::move(o)), | |
idx(o.idx.load()) { } | |
StashValues() | |
: Stash::Stash(0), | |
idx(0) { } | |
StashValues(uint64_t) | |
: Stash::Stash(0), | |
idx(0) { } | |
void increment_pos(size_t pos) { | |
auto new_pos = pos + 1; | |
Stash::pos.compare_exchange_strong(pos, new_pos); | |
} | |
auto& next(bool, uint64_t, bool) { | |
// std::cout << "\t\tLogStash::next()" << std::endl; | |
auto pos = Stash::pos.load(); | |
// std::cout << "\t\t\tLogStash pos:" << pos << std::endl; | |
try { | |
auto ptr = Stash::get_bin(pos).load(); | |
increment_pos(pos); | |
return ptr->val; | |
} catch (const StashException&) { } | |
throw StashContinue(); | |
} | |
template <typename T> | |
void add(T&& value, uint64_t) { | |
auto& bin = Stash::spawn_bin(idx++); | |
Stash::_put(bin, std::forward<T>(value)); | |
} | |
}; | |
//////////////////////////////////////////////////////////////////////////////// | |
#include <chrono> // for chrono::system_clock | |
#include <memory> // for shared_ptr | |
using LogType = std::shared_ptr<std::string>; | |
template <typename T> | |
inline uint64_t time_point_to_key(std::chrono::time_point<T> n) { | |
return std::chrono::duration_cast<std::chrono::nanoseconds>(n.time_since_epoch()).count(); | |
} | |
static inline uint64_t now() { | |
return time_point_to_key(std::chrono::system_clock::now()); | |
} | |
#define MUL 1000000ULL | |
class LogQueue { | |
using _logs = StashValues<LogType, 10ULL>; | |
using _50_1ms = StashSlots<_logs, 10ULL, &now, 1ULL * MUL / 2ULL, 1ULL * MUL, 50ULL, false>; | |
using _10_50ms = StashSlots<_50_1ms, 10ULL, &now, 0ULL, 50ULL * MUL, 10ULL, false>; | |
using _3600_500ms = StashSlots<_10_50ms, 600ULL, &now, 0ULL, 500ULL * MUL, 3600ULL, false>; | |
using _48_1800s = StashSlots<_3600_500ms, 48ULL, &now, 0ULL, 1800000ULL * MUL, 48ULL, true>; | |
_48_1800s queue; | |
public: | |
LogQueue(); | |
LogType& next(bool final=true, uint64_t final_key=0, bool keep_going=true); | |
void add(const LogType& l_ptr, uint64_t key=0); | |
}; | |
LogQueue::LogQueue() | |
: queue(now()) | |
{ } | |
LogType& | |
LogQueue::next(bool final, uint64_t final_key, bool keep_going) | |
{ | |
keep_going = keep_going && !final_key; | |
return queue.next(final, final_key, keep_going); | |
} | |
void | |
LogQueue::add(const LogType& l_ptr, uint64_t key) | |
{ | |
queue.add(l_ptr, key); | |
} | |
//////////////////////////////////////////////////////////////////////////////// | |
//////////////////////////////////////////////////////////////////////////////// | |
//////////////////////////////////////////////////////////////////////////////// | |
#include <iostream> | |
#include <unistd.h> | |
#include <algorithm> | |
#include <cassert> | |
#include <random> | |
#include <thread> | |
#include <string> | |
#include <mutex> | |
std::mutex mtx; | |
#define RD rd() | |
// #define RD 0 // FIXME: HERE | |
std::atomic_bool running(true); | |
std::atomic_int produced(0); | |
static const int num_threads = 1000; | |
LogQueue queue; | |
void test1() { | |
std::random_device rd; | |
std::mt19937_64 rng(RD); | |
std::uniform_int_distribution<uint64_t> rnd3600s(0, 3600ULL * MUL); | |
{ | |
std::lock_guard<std::mutex> lk(mtx); | |
std::cout << "Now: " << now() << std::endl; | |
} | |
auto n = now(); | |
{/* Add items */ | |
for (int i = 1; i <= 1000; ++i) { | |
auto r = n + rnd3600s(rng); | |
std::cout << "Added: " << i << ". " << r << std::endl; | |
queue.add(std::make_shared<std::string>("Valor " + std::to_string(r)), r); | |
} | |
} | |
{/* Walk items */ | |
int i = 1; | |
for (auto sec : { 500ULL, 1000ULL, 2000ULL, 4000ULL }) { | |
auto nx = n + sec * MUL; | |
std::cout << "<" << nx << ":" << std::endl; | |
do { | |
std::string str; | |
try { | |
auto& val = queue.next(true, nx, false); | |
if (val) { | |
str = *val; | |
val.reset(); | |
} | |
} catch(const StashContinue&) { | |
break; | |
} | |
std::cout << " " << i++ << ". " << str << std::endl; | |
} while (true); | |
} | |
} | |
} | |
void test2_producer() { | |
std::random_device rd; | |
std::mt19937_64 rng(RD); | |
std::uniform_int_distribution<uint64_t> rnd8s(0, 8ULL * MUL); | |
std::uniform_int_distribution<uint64_t> rnd1ms(0, MUL / 1000ULL); | |
std::uniform_int_distribution<uint64_t> dis100000(0, 100000ULL); | |
std::uniform_int_distribution<uint64_t> dis10(0, 10ULL); | |
do { | |
auto r = now() - rnd1ms(rng) + MUL / 2000ULL; | |
auto d10 = dis10(rng); | |
for (uint64_t u = 0; u < d10; ++u) { | |
auto str = std::to_string(++produced) + " - " + std::to_string(r) + " (" + std::to_string(u) + ")"; | |
queue.add(std::make_shared<std::string>(str), r); | |
} | |
r = now() + rnd8s(rng); | |
auto str = std::to_string(++produced) + " - " + std::to_string(r); | |
queue.add(std::make_shared<std::string>(str), r); | |
usleep(10000ULL + dis100000(rng)); // 10ms - 110ms | |
} while (running); | |
} | |
void test2_consumer() { | |
int consumed = 0; | |
do { | |
std::string str; | |
try { | |
auto& val = queue.next(); | |
if (val) { | |
str = *val; | |
val.reset(); | |
} | |
} catch(const StashContinue&) { | |
usleep(10000ULL); // 10ms | |
continue; | |
} | |
{ | |
std::lock_guard<std::mutex> lk(mtx); | |
std::cout << " " << ++consumed << "/" << produced.load() << " - " << str << std::endl; | |
} | |
} while (running); | |
{ | |
std::lock_guard<std::mutex> lk(mtx); | |
std::cout << "---" << std::endl; | |
} | |
do { | |
std::string str; | |
try { | |
auto& val = queue.next(false); | |
if (val) { | |
str = *val; | |
val.reset(); | |
} | |
} catch(const StashContinue&) { | |
break; | |
} | |
{ | |
std::lock_guard<std::mutex> lk(mtx); | |
std::cout << " " << ++consumed << "/" << produced.load() << " - " << str << std::endl; | |
} | |
} while (true); | |
} | |
void test2() { | |
{ | |
std::lock_guard<std::mutex> lk(mtx); | |
std::cout << "Now: " << now() << std::endl; | |
} | |
std::thread p[num_threads]; | |
std::thread c; | |
for (int i = 0; i < num_threads; ++i) { | |
p[i] = std::thread(test2_producer); | |
} | |
c = std::thread(test2_consumer); | |
usleep(4000000ULL); // 4s | |
{ | |
std::lock_guard<std::mutex> lk(mtx); | |
std::cout << "Now: " << now() << std::endl; | |
} | |
running = false; | |
for (int i = 0; i < num_threads; ++i) { | |
p[i].join(); | |
} | |
c.join(); | |
} | |
int main() { | |
// test1(); | |
test2(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment