Created
December 18, 2023 13:39
-
-
Save royguo/20150057e0c00e495af508f35e567047 to your computer and use it in GitHub Desktop.
policy_aware_cache
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "innovic/cache/policy_aware_cache.h" | |
#include <iostream> | |
#include <memory> | |
namespace innovic { | |
namespace cache { | |
void DRAMStore::Put(const std::string &key, std::shared_ptr<CachedValue> value) { | |
std::unique_lock<std::shared_mutex> write_lock(map_mutex_); | |
// Item exist | |
auto it = map_.find(key); | |
if (it != map_.end()) { | |
used_bytes_ -= it->second->Size(); | |
map_.erase(key); | |
} | |
map_[key] = value; | |
used_bytes_ += value->Size(); | |
} | |
std::shared_ptr<CachedValue> DRAMStore::Get(const std::string &key) { | |
std::shared_lock<std::shared_mutex> read_lock(map_mutex_); | |
auto it = map_.find(key); | |
if (it != map_.end()) { | |
return map_[key]; | |
} | |
return nullptr; | |
} | |
uint64_t DRAMStore::UsedBytes() { | |
return used_bytes_; | |
} | |
void DRAMStore::Remove(const std::string &key) { | |
std::unique_lock<std::shared_mutex> write_lock(map_mutex_); | |
map_.erase(key); | |
} | |
void LRUPolicy::Put(const std::string &key, | |
const std::function<void(const std::string &key)> &put_callback, | |
const std::function<void(const std::string &key)> &evict_callback) { | |
bool new_item = false; | |
// If the key doesn't exist, then we can safely add new element. | |
if (!Exist(key)) { | |
if (count_ >= soft_capacity_) { | |
Evict(evict_callback); | |
} | |
std::unique_lock<std::shared_mutex> write_lock(mutex_); | |
// double check with the write lock in case any other thread inserted the same key. | |
if (keys_.find(key) == keys_.end()) { | |
priority_list_.push_front(key); | |
keys_[key] = priority_list_.begin(); | |
count_++; | |
new_item = true; | |
} | |
} | |
if (new_item) { put_callback(key); } | |
// If the key exist, we should promote it to the head | |
TouchExist(key, [](const std::string &) {}); | |
} | |
bool LRUPolicy::Exist(const std::string &key) { | |
std::shared_lock<std::shared_mutex> read_lock(mutex_); | |
if (keys_.find(key) != keys_.end()) { | |
return true; | |
} | |
return false; | |
} | |
// Only lock the section of updating policy list. | |
bool LRUPolicy::TouchExist(const std::string &key, | |
const std::function<void(const std::string &key)> &touch_callback) { | |
bool ret = false; | |
if (Exist(key)) { | |
std::unique_lock<std::shared_mutex> write_lock(mutex_); | |
// double check with write lock. | |
auto it = keys_.find(key); | |
if (it != keys_.end()) { | |
priority_list_.erase(it->second); | |
priority_list_.push_front(it->first); | |
keys_[key] = priority_list_.begin(); | |
ret = true; | |
} | |
} | |
if (ret) { touch_callback(key); } | |
return ret; | |
} | |
void LRUPolicy::Evict(const std::function<void(const std::string &key)> &evict_callback) { | |
// No need to evict any element | |
if (count_ == 0) { | |
return; | |
} | |
std::string evicted_key; | |
{ | |
std::unique_lock<std::shared_mutex> write_lock(mutex_); | |
evicted_key = priority_list_.back(); | |
keys_.erase(evicted_key); | |
priority_list_.pop_back(); | |
count_--; | |
} | |
evict_callback(evicted_key); | |
} | |
// Inserts or updates `key` in the cache.
//
// The policy decides admission and eviction; the store is updated from the policy's
// callbacks so both stay in step. The policy only fires its put callback for keys it
// did not track before, so an update of an existing key is written to the store
// through a separate touch; otherwise the previous value would be served forever.
void LRUCache::Put(const std::string &key, std::shared_ptr<CachedValue> value) {
  bool inserted = false;
  policy_->Put(
      key,
      [&](const std::string &new_key) {
        store_->Put(new_key, value);
        inserted = true;
      },
      [&](const std::string &evicted_key) { store_->Remove(evicted_key); });
  if (!inserted) {
    // The touch callback only fires while the policy still tracks the key, so a key
    // evicted in the meantime is not re-added to the store behind the policy's back.
    policy_->TouchExist(key, [&](const std::string &existing_key) {
      store_->Put(existing_key, value);
    });
  }
}
// Looks up `key`. When the policy tracks the key it is promoted and the value is
// fetched from the store; otherwise an empty pointer is returned.
std::shared_ptr<CachedValue> LRUCache::Get(const std::string &key) {
  std::shared_ptr<CachedValue> found;
  const auto fetch = [&](const std::string &touched_key) {
    found = store_->Get(touched_key);
  };
  // The entry may be evicted right after the value has been fetched; that is
  // acceptable because the caller holds its own shared_ptr.
  policy_->TouchExist(key, fetch);
  return found;
}
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <new>
#include <shared_mutex>
#include <string>
namespace innovic { | |
namespace cache { | |
// A cached item, represented as an owned buffer of raw bytes. The buffer's lifetime is
// managed by this class (similar in spirit to `folly::IOBuf`): it is released with
// free() in the destructor.
//
// The class is non-copyable because a copy would share the raw pointer and free it
// twice; share instances through std::shared_ptr<CachedValue> instead.
class CachedValue {
 public:
  // Takes ownership of `data` (`size` bytes). The buffer is released with free(), so
  // it must have been obtained from malloc/calloc/realloc, or be nullptr.
  CachedValue(char *data, uint64_t size) : data_(data), size_(size) {}

  // Copies the bytes of `data` (embedded '\0' included) into a freshly allocated
  // buffer. An empty string allocates nothing. Throws std::bad_alloc when the
  // allocation fails.
  explicit CachedValue(const std::string &data) : size_(data.size()) {
    if (size_ == 0) {
      return;
    }
    data_ = static_cast<char *>(std::malloc(size_));
    if (data_ == nullptr) {
      throw std::bad_alloc();
    }
    std::memcpy(data_, data.data(), size_);
  }

  CachedValue(const CachedValue &) = delete;
  CachedValue &operator=(const CachedValue &) = delete;

  // free(nullptr) is a no-op, so no null check is required.
  ~CachedValue() { std::free(data_); }

  // Pointer to the owned bytes; nullptr when no buffer is held.
  const char *Data() const { return data_; }

  // Number of bytes held.
  uint64_t Size() const { return size_; }

  // Returns a copy of the bytes as a std::string (empty when no buffer is held).
  std::string String() const {
    if (data_ == nullptr) {
      return std::string();
    }
    return std::string(data_, size_);
  }

 private:
  char *data_ = nullptr;
  uint64_t size_ = 0;
};
// Abstract cache policy. Concrete policies (LRU, sLRU, adaptive, ...) decide which keys
// stay and which are evicted.
//
// Policy and storage are kept separate because the cache may live on SSD or on remote
// storage, where a local memory pointer is not always available; the policy therefore
// tracks keys only and reports its decisions through callbacks.
class Policy {
 public:
  // Virtual so that a concrete policy can be destroyed through a Policy pointer.
  virtual ~Policy() = default;

  // Registers `key` without any callbacks.
  virtual void Put(const std::string &key) = 0;

  // Registers `key`. `put_callback` and `evict_callback` are the hooks through which
  // the policy reports an inserted key and an evicted key respectively.
  virtual void Put(const std::string &key,
                   const std::function<void(const std::string &key)> &put_callback,
                   const std::function<void(const std::string &key)> &evict_callback) = 0;

  // Checks existence without changing the key's priority.
  virtual bool Exist(const std::string &key) = 0;

  // Returns true if the key exists, and promotes it.
  virtual bool TouchExist(const std::string &key) = 0;

  // Returns true if the key exists, and promotes it. If the key exists,
  // `touch_callback` is called with it.
  virtual bool TouchExist(const std::string &key,
                          const std::function<void(const std::string &key)> &touch_callback) = 0;

  // Evicts an entry chosen by the policy and reports its key via `evict_callback`.
  virtual void Evict(const std::function<void(const std::string &key)> &evict_callback) = 0;
};
// The LRU Policy implementation allow the cached items exceeds the soft capacity. | |
class LRUPolicy : public Policy { | |
public: | |
explicit LRUPolicy(uint64_t soft_capacity) : soft_capacity_(soft_capacity) {} | |
public: | |
void Put(const std::string &key) override { | |
Put(key, [](const std::string &) {}, [](const std::string &) {}); | |
} | |
void Put(const std::string &key, | |
const std::function<void(const std::string &key)> &put_callback, | |
const std::function<void(const std::string &key)> &evict_callback) override; | |
bool Exist(const std::string &key) override; | |
bool TouchExist(const std::string &key) override { | |
return TouchExist(key, [](const std::string &key) {}); | |
} | |
bool TouchExist(const std::string &key, | |
const std::function<void(const std::string &key)> &touch_callback) override; | |
void Evict(const std::function<void(const std::string &key)> &evict_callback) override; | |
private: | |
std::list<std::string> priority_list_; | |
// Check existence of target keys. | |
std::map<std::string, std::list<std::string>::iterator> keys_; | |
std::shared_mutex mutex_; | |
// total elements in this policy. | |
std::atomic<uint64_t> count_ = {0}; | |
uint64_t soft_capacity_ = 0; | |
}; | |
// The underlying cached value storage. | |
class Store { | |
public: | |
// If is not necessary to have a return value for the Put method. If anything goes wrong, just | |
// log the error inside the implementation without break the system. | |
virtual void Put(const std::string &key, std::shared_ptr<CachedValue> value) = 0; | |
// If the returned unique_ptr is empty, the we got nothing available in the cache. | |
virtual std::shared_ptr<CachedValue> Get(const std::string &key) = 0; | |
virtual void Remove(const std::string &key) = 0; | |
// The total used storage size is protected by the policy, so the store system itself shouldn't | |
// care about the limitations. | |
virtual uint64_t UsedBytes() = 0; | |
}; | |
// For all most all cases, we will only use DRAM-based storage. | |
class DRAMStore : public Store { | |
public: | |
// The cached item could possibly be used in different threads. | |
void Put(const std::string &key, std::shared_ptr<CachedValue> value) override; | |
std::shared_ptr<CachedValue> Get(const std::string &key) override; | |
void Remove(const std::string &key) override; | |
uint64_t UsedBytes() override; | |
private: | |
std::map<std::string, std::shared_ptr<CachedValue>> map_; | |
std::atomic_uint64_t used_bytes_ = {0}; | |
std::shared_mutex map_mutex_; | |
}; | |
// Unimplemented | |
//class SSDStore: public Store {}; | |
class PolicyAwareCache { | |
public: | |
explicit PolicyAwareCache(uint64_t soft_capacity) : soft_capacity_(soft_capacity) {} | |
virtual void Put(const std::string &key, std::shared_ptr<CachedValue> value) = 0; | |
virtual std::shared_ptr<CachedValue> Get(const std::string &key) = 0; | |
private: | |
std::atomic<uint64_t> soft_capacity_ = {0}; | |
std::atomic<uint64_t> used_ = {0}; | |
}; | |
// Implementations of the LRUCache. | |
class LRUCache : public PolicyAwareCache { | |
public: | |
explicit LRUCache(uint64_t soft_capacity) : PolicyAwareCache(soft_capacity) { | |
policy_ = std::make_unique<LRUPolicy>(soft_capacity); | |
store_ = std::make_unique<DRAMStore>(); | |
} | |
void Put(const std::string &key, std::shared_ptr<CachedValue> value) override; | |
std::shared_ptr<CachedValue> Get(const std::string &key) override; | |
private: | |
std::unique_ptr<LRUPolicy> policy_; | |
std::unique_ptr<DRAMStore> store_; | |
}; | |
} // cache | |
} // innovic |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <gtest/gtest.h> | |
#include <memory> | |
#include "innovic/cache/policy_aware_cache.h" | |
#include "innovic/utils.h" | |
namespace innovic { | |
namespace test { | |
using namespace innovic::cache; | |
// Shared fixture for the cache tests.
class PolicyAwareCacheTest : public testing::Test {
 protected:
  // Runs before every test: prepares a random string for further tests.
  // 2UL << 20 == 2 * 1024 * 1024; presumably the argument is the string length
  // (i.e. 2 MB) — confirm against utils::StringUtils::RandomString.
  void SetUp() override {
    random_string_ = utils::StringUtils::RandomString(2UL << 20);
  }

  std::string random_string_;
};
// Exercises DRAMStore directly: a sequential put/get round trip, followed by
// concurrent puts and gets from ten threads over a shared pool of random keys.
TEST_F(PolicyAwareCacheTest, DRAMStoreTest) {
  DRAMStore dram_store;

  // Simple Put & Get test.
  for (int i = 0; i < 1000; ++i) {
    std::string key = std::to_string(i);
    auto value = std::make_shared<CachedValue>(std::to_string(i));
    dram_store.Put(key, value);
  }
  for (int i = 0; i < 1000; ++i) {
    std::string key = std::to_string(i);
    std::shared_ptr<CachedValue> value = dram_store.Get(key);
    // Fatal assertion: a missing entry must fail the test here instead of crashing
    // the whole test binary on the dereference below.
    ASSERT_TRUE(value != nullptr);
    EXPECT_EQ(value->String(), std::to_string(i));
  }

  // Multi-thread Put & Get test.
  std::vector<std::string> keys;
  keys.reserve(1000);
  for (int i = 0; i < 1000; ++i) {
    keys.emplace_back(utils::StringUtils::RandomString(100));
  }
  std::vector<std::thread> workers;
  workers.reserve(10);
  for (int i = 0; i < 10; ++i) {
    workers.emplace_back([&]() {
      // Each thread puts & gets 1000 items.
      for (int j = 0; j < 1000; j++) {
        std::string key = keys[random() % 1000];
        auto value = std::make_shared<CachedValue>(key);
        dram_store.Put(key, value);
        std::string key2 = keys[random() % 1000];
        value = dram_store.Get(key2);
        // key2 may not have been stored by any thread yet; when it is present, its
        // value must equal the key it was stored under.
        if (value != nullptr) {
          EXPECT_EQ(key2, value->String());
        }
      }
    });
  }
  for (auto &worker : workers) {
    worker.join();
  }
  LOG(INFO, "DRAMStore Test Finished");
}
// Exercises LRUPolicy on its own (no store attached): sequential eviction order,
// repeated puts of a single key, then concurrent puts/touches from ten threads.
TEST_F(PolicyAwareCacheTest, LRUCachePolicyTest) {
  LRUPolicy lru(500);

  // Sequential phase: 1000 distinct keys go into a policy with soft capacity 500.
  for (int n = 0; n < 1000; ++n) {
    lru.Put(std::to_string(n));
  }
  // The newest 500 keys are expected to be tracked ...
  for (int n = 500; n < 1000; ++n) {
    std::string key = std::to_string(n);
    EXPECT_TRUE(lru.TouchExist(key));
  }
  // ... and the oldest 500 are expected to be gone.
  for (int n = 0; n < 500; ++n) {
    std::string key = std::to_string(n);
    EXPECT_FALSE(lru.TouchExist(key));
  }

  // Putting the same key over and over.
  for (int n = 0; n < 1000; ++n) {
    lru.Put("key123");
  }

  // Concurrent phase: a pool of 1000 random keys shared by all threads.
  std::vector<std::string> keys;
  keys.reserve(1000);
  for (int n = 0; n < 1000; ++n) {
    keys.emplace_back(utils::StringUtils::RandomString(100));
  }
  std::vector<std::thread> threads;
  threads.reserve(10);
  for (int t = 0; t < 10; ++t) {
    threads.emplace_back([&]() {
      // Every thread performs 1000 rounds of "put one random key, touch another".
      for (int round = 0; round < 1000; ++round) {
        const std::string put_key = keys[random() % 1000];
        lru.Put(put_key);
        const std::string touch_key = keys[random() % 1000];
        lru.TouchExist(touch_key);
      }
    });
  }
  for (auto &thread : threads) {
    thread.join();
  }

  // Count how many keys of the pool are still tracked.
  uint32_t cnt = 0;
  for (const auto &candidate : keys) {
    if (lru.TouchExist(candidate)) {
      ++cnt;
    }
  }
  // The policy is not a strictly bounded container.
  EXPECT_GT(cnt, 500);
  LOG(INFO, "LRUPolicy Test Finished");
}
// Exercises LRUCache end to end (policy + store): sequential eviction, repeated puts
// of a single key, then concurrent puts and gets from ten threads.
TEST_F(PolicyAwareCacheTest, LRUCacheTest) {
  LRUCache cache(500);

  // Simple Put & Get test: 1000 distinct keys into a cache with soft capacity 500.
  for (int i = 0; i < 1000; ++i) {
    std::string key = std::to_string(i);
    auto value = std::make_shared<CachedValue>(std::to_string(i + 1));
    cache.Put(key, value);
  }
  for (int i = 500; i < 1000; ++i) {
    std::string key = std::to_string(i);
    auto value = cache.Get(key);
    // Fatal assertion: a missing entry must fail the test here instead of crashing
    // the whole test binary on the dereference below.
    ASSERT_TRUE(value != nullptr);
    EXPECT_EQ(value->String(), std::to_string(i + 1));
  }
  for (int i = 0; i < 500; ++i) {
    std::string key = std::to_string(i);
    auto value = cache.Get(key);
    EXPECT_TRUE(value == nullptr);
  }

  // Test duplicated keys.
  for (int i = 0; i < 1000; ++i) {
    auto value = std::make_shared<CachedValue>(std::to_string(i + 1));
    cache.Put("key123", value);
  }

  // Multi-thread Put & Get test.
  std::vector<std::string> keys;
  keys.reserve(1000);
  for (int i = 0; i < 1000; ++i) {
    keys.emplace_back(utils::StringUtils::RandomString(100));
  }
  std::vector<std::thread> workers;
  workers.reserve(10);
  for (int i = 0; i < 10; ++i) {
    workers.emplace_back([&, i]() {
      // Each thread puts & gets 1000 items.
      for (int j = 0; j < 1000; j++) {
        std::string key = keys[random() % 1000];
        auto value = std::make_shared<CachedValue>(std::to_string(i + 1));
        cache.Put(key, value);
        std::string key2 = keys[random() % 1000];
        cache.Get(key2);
      }
    });
  }
  for (auto &worker : workers) {
    worker.join();
  }

  // Check existence.
  uint32_t cnt = 0;
  for (auto &key : keys) {
    if (cache.Get(key) != nullptr) {
      cnt++;
    }
  }
  // The policy is not a strictly bounded container.
  EXPECT_GT(cnt, 500);
  LOG(INFO, "LRUCache Test Finished");
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment