Skip to content

Instantly share code, notes, and snippets.

@royguo
Created December 18, 2023 13:39
Show Gist options
  • Save royguo/20150057e0c00e495af508f35e567047 to your computer and use it in GitHub Desktop.
Save royguo/20150057e0c00e495af508f35e567047 to your computer and use it in GitHub Desktop.
policy_aware_cache
#include "innovic/cache/policy_aware_cache.h"
#include <iostream>
#include <memory>
namespace innovic {
namespace cache {
// Insert or overwrite a value, keeping used_bytes_ accounting consistent.
// Thread-safe: takes the map write lock for the whole operation.
void DRAMStore::Put(const std::string &key, std::shared_ptr<CachedValue> value) {
  std::unique_lock<std::shared_mutex> write_lock(map_mutex_);
  auto it = map_.find(key);
  if (it != map_.end()) {
    // Overwrite in place: the original erased and re-inserted, paying for
    // two tree operations; assigning through the iterator needs none.
    used_bytes_ -= it->second->Size();
    used_bytes_ += value->Size();
    it->second = std::move(value);
    return;
  }
  used_bytes_ += value->Size();
  map_.emplace(key, std::move(value));
}
// Look up a value by key; returns nullptr when absent.
// Thread-safe: holds the shared (read) lock for the duration.
std::shared_ptr<CachedValue> DRAMStore::Get(const std::string &key) {
  std::shared_lock<std::shared_mutex> read_lock(map_mutex_);
  auto it = map_.find(key);
  if (it != map_.end()) {
    // Return through the iterator: the original used map_[key], which is a
    // non-const (potentially inserting) operation under a read lock and a
    // redundant second O(log n) lookup.
    return it->second;
  }
  return nullptr;
}
// Report total bytes currently stored (atomic read; no lock required).
uint64_t DRAMStore::UsedBytes() {
  return used_bytes_.load();
}
void DRAMStore::Remove(const std::string &key) {
std::unique_lock<std::shared_mutex> write_lock(map_mutex_);
map_.erase(key);
}
// Register `key` with the policy. For a brand-new key: evict (best effort) if
// at capacity, insert at the front, then run put_callback. For an existing
// key: promote it AND run put_callback so the backing store can refresh the
// value. Fix: the original passed a no-op to TouchExist here, so Put of an
// already-cached key silently dropped the new value at the cache layer.
void LRUPolicy::Put(const std::string &key,
                    const std::function<void(const std::string &key)> &put_callback,
                    const std::function<void(const std::string &key)> &evict_callback) {
  bool new_item = false;
  // If the key doesn't exist, then we can safely add new element.
  if (!Exist(key)) {
    if (count_ >= soft_capacity_) {
      Evict(evict_callback);
    }
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    // double check with the write lock in case any other thread inserted the same key.
    if (keys_.find(key) == keys_.end()) {
      priority_list_.push_front(key);
      keys_[key] = priority_list_.begin();
      count_++;
      new_item = true;
    }
  }
  if (new_item) {
    // Freshly inserted keys are already at the head; no touch needed.
    put_callback(key);
  } else {
    // Existing key: promote it and let the caller overwrite the stored value.
    TouchExist(key, put_callback);
  }
}
// Check membership without promoting the key (read lock only).
bool LRUPolicy::Exist(const std::string &key) {
  std::shared_lock<std::shared_mutex> read_lock(mutex_);
  return keys_.count(key) != 0;
}
// Only lock the section of updating policy list.
// Promote `key` to the head of the priority list if present; invoke
// touch_callback (outside the lock) when it was. Returns whether the key
// existed. Improvement: use list::splice to relink the existing node to the
// front — the original erase+push_front copied the key string and allocated
// a fresh node, and then had to repair the iterator stored in keys_.
// splice never invalidates iterators, so keys_ needs no update at all.
bool LRUPolicy::TouchExist(const std::string &key,
                           const std::function<void(const std::string &key)> &touch_callback) {
  bool promoted = false;
  if (Exist(key)) {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    // double check with write lock.
    auto it = keys_.find(key);
    if (it != keys_.end()) {
      priority_list_.splice(priority_list_.begin(), priority_list_, it->second);
      promoted = true;
    }
  }
  if (promoted) { touch_callback(key); }
  return promoted;
}
// Evict the least-recently-used key and notify the caller outside the lock.
// Fix: the original checked count_ == 0 BEFORE taking the lock; two threads
// racing on a one-element policy could both pass the check and the loser
// would call back() on an empty list — undefined behavior. The emptiness
// check must be repeated under the write lock.
void LRUPolicy::Evict(const std::function<void(const std::string &key)> &evict_callback) {
  // Fast path: nothing to evict (cheap atomic read, may be stale).
  if (count_ == 0) {
    return;
  }
  std::string evicted_key;
  {
    std::unique_lock<std::shared_mutex> write_lock(mutex_);
    // Re-check under the lock: another thread may have evicted already.
    if (priority_list_.empty()) {
      return;
    }
    evicted_key = priority_list_.back();
    keys_.erase(evicted_key);
    priority_list_.pop_back();
    count_--;
  }
  // Callback runs outside the lock to avoid re-entrancy deadlocks.
  evict_callback(evicted_key);
}
// Insert a value: the policy decides when to store and when to evict.
// The lambdas are used synchronously inside policy_->Put, so capturing
// local state by reference is safe here.
void LRUCache::Put(const std::string &key, std::shared_ptr<CachedValue> value) {
  const auto on_admit = [&](const std::string &admitted) {
    store_->Put(admitted, value);
  };
  const auto on_evict = [&](const std::string &victim) {
    store_->Remove(victim);
  };
  policy_->Put(key, on_admit, on_evict);
}
std::shared_ptr<CachedValue> LRUCache::Get(const std::string &key) {
std::shared_ptr<CachedValue> value;
// It is possible that the touched item was deleted after we get the value, we don't care about it.
policy_->TouchExist(key, [&](const std::string &key) {
value = store_->Get(key);
});
return value;
}
}
}
#pragma once
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
namespace innovic {
namespace cache {
// The cached item's lifecycle should be managed by the CachedValue. Some other libraries like folly
// has a `folly::IOBuf` that has the similar usage.
// All cached item should be able to be represented as binary data.
class CachedValue {
public:
// Takes ownership of `data`; it MUST have been allocated with malloc,
// because the destructor releases it with free.
CachedValue(char *data, uint64_t size) : data_(data), size_(size) {}
// Copies the string's bytes into an owned malloc'd buffer.
explicit CachedValue(const std::string &data) : size_(data.size()) {
  if (size_ > 0) {
    data_ = static_cast<char *>(std::malloc(size_));
    std::memcpy(data_, data.data(), size_);
  }
}
// Fix: this class owns a raw buffer but the compiler-generated copy
// operations would share it, causing a double free — forbid copying.
// (Instances are held via shared_ptr throughout this file, so no caller
// relied on copies.)
CachedValue(const CachedValue &) = delete;
CachedValue &operator=(const CachedValue &) = delete;
~CachedValue() {
  // free(nullptr) is a well-defined no-op.
  std::free(data_);
}
// Non-owning view of the raw bytes; may be nullptr for an empty value.
const char *Data() { return data_; }
uint64_t Size() const { return size_; }
// Materialize the bytes as a std::string. Guard the empty case: building
// std::string from (nullptr, 0) is undefined behavior before C++23.
std::string String() {
  return size_ > 0 ? std::string(data_, size_) : std::string();
}
private:
char *data_ = nullptr;
uint64_t size_ = 0;
};
// The abstract Cache Policy, we can implement LRU, sLRU or even some adaptive cache policy.
// We disaggregated the policy and storage is because sometimes we may use SSD or even remote
// storage as cache, where local memory pointer is not always available.
class Policy {
public:
// Fix: a polymorphic base needs a virtual destructor, otherwise deleting a
// derived policy through a Policy* is undefined behavior (C.35).
virtual ~Policy() = default;
// Register a key with no side-effect callbacks (bookkeeping only).
virtual void Put(const std::string &key) = 0;
// Register a key; put_callback fires when the key is admitted,
// evict_callback fires for each key displaced to make room.
virtual void Put(const std::string &key,
const std::function<void(const std::string &key)> &put_callback,
const std::function<void(const std::string &key)> &evict_callback) = 0;
// Check existence without change its priority
virtual bool Exist(const std::string &key) = 0;
// return true if the key exist and promote it.
virtual bool TouchExist(const std::string &key) = 0;
// If the key exists, call the touch_callback
virtual bool TouchExist(const std::string &key,
const std::function<void(const std::string &key)> &touch_callback) = 0;
// Drop the lowest-priority key and report it via evict_callback.
virtual void Evict(const std::function<void(const std::string &key)> &evict_callback) = 0;
};
// The LRU Policy implementation allow the cached items exceeds the soft capacity.
// Concurrency: keys_/priority_list_ are guarded by mutex_; count_ is atomic
// so it can be read cheaply outside the lock (so counts may be briefly stale).
class LRUPolicy : public Policy {
public:
explicit LRUPolicy(uint64_t soft_capacity) : soft_capacity_(soft_capacity) {}
public:
// Convenience overload: register a key with no admit/evict side effects.
void Put(const std::string &key) override {
Put(key, [](const std::string &) {}, [](const std::string &) {});
}
void Put(const std::string &key,
const std::function<void(const std::string &key)> &put_callback,
const std::function<void(const std::string &key)> &evict_callback) override;
// Check existence without change its priority
bool Exist(const std::string &key) override;
// Convenience overload: promote the key with no callback.
bool TouchExist(const std::string &key) override {
return TouchExist(key, [](const std::string &key) {});
}
// return true if the key exist and promote it.
// If the key exists, call the touch_callback
bool TouchExist(const std::string &key,
const std::function<void(const std::string &key)> &touch_callback) override;
void Evict(const std::function<void(const std::string &key)> &evict_callback) override;
private:
// Most-recently-used key at the front; eviction pops from the back.
std::list<std::string> priority_list_;
// Check existence of target keys.
// Maps each key to its node in priority_list_ for O(1) promotion.
std::map<std::string, std::list<std::string>::iterator> keys_;
std::shared_mutex mutex_;
// total elements in this policy.
std::atomic<uint64_t> count_ = {0};
// Soft limit: concurrent Puts may briefly push count_ past this value.
uint64_t soft_capacity_ = 0;
};
// The underlying cached value storage.
class Store {
public:
// Fix: polymorphic base without a virtual destructor makes
// delete-through-Store* undefined behavior (C.35).
virtual ~Store() = default;
// If is not necessary to have a return value for the Put method. If anything goes wrong, just
// log the error inside the implementation without break the system.
virtual void Put(const std::string &key, std::shared_ptr<CachedValue> value) = 0;
// If the returned unique_ptr is empty, the we got nothing available in the cache.
virtual std::shared_ptr<CachedValue> Get(const std::string &key) = 0;
virtual void Remove(const std::string &key) = 0;
// The total used storage size is protected by the policy, so the store system itself shouldn't
// care about the limitations.
virtual uint64_t UsedBytes() = 0;
};
// For all most all cases, we will only use DRAM-based storage.
// An in-memory Store backed by a std::map; all operations are guarded by
// map_mutex_ (shared lock for Get, exclusive for Put/Remove).
class DRAMStore : public Store {
public:
// The cached item could possibly be used in different threads.
void Put(const std::string &key, std::shared_ptr<CachedValue> value) override;
// Returns nullptr when the key is absent.
std::shared_ptr<CachedValue> Get(const std::string &key) override;
void Remove(const std::string &key) override;
// Current total of the Size() of all stored values.
uint64_t UsedBytes() override;
private:
std::map<std::string, std::shared_ptr<CachedValue>> map_;
// Atomic so UsedBytes() can read without taking map_mutex_.
std::atomic_uint64_t used_bytes_ = {0};
std::shared_mutex map_mutex_;
};
// Unimplemented
//class SSDStore: public Store {};
// Abstract cache facade combining a Policy with a Store.
class PolicyAwareCache {
public:
explicit PolicyAwareCache(uint64_t soft_capacity) : soft_capacity_(soft_capacity) {}
// Fix: abstract base deleted through a base pointer needs a virtual
// destructor, otherwise that delete is undefined behavior (C.35).
virtual ~PolicyAwareCache() = default;
virtual void Put(const std::string &key, std::shared_ptr<CachedValue> value) = 0;
virtual std::shared_ptr<CachedValue> Get(const std::string &key) = 0;
private:
std::atomic<uint64_t> soft_capacity_ = {0};
// NOTE(review): `used_` is never read or written anywhere in this file —
// presumably intended for derived caches; confirm or remove.
std::atomic<uint64_t> used_ = {0};
};
// Implementations of the LRUCache.
// Wires an LRUPolicy (decides what stays) to a DRAMStore (holds the bytes);
// both are sized by the same soft capacity.
class LRUCache : public PolicyAwareCache {
public:
explicit LRUCache(uint64_t soft_capacity) : PolicyAwareCache(soft_capacity) {
policy_ = std::make_unique<LRUPolicy>(soft_capacity);
store_ = std::make_unique<DRAMStore>();
}
void Put(const std::string &key, std::shared_ptr<CachedValue> value) override;
// Returns nullptr on miss; a hit also promotes the key in the policy.
std::shared_ptr<CachedValue> Get(const std::string &key) override;
private:
std::unique_ptr<LRUPolicy> policy_;
std::unique_ptr<DRAMStore> store_;
};
} // cache
} // innovic
#include <gtest/gtest.h>
#include <memory>
#include "innovic/cache/policy_aware_cache.h"
#include "innovic/utils.h"
namespace innovic {
namespace test {
using namespace innovic::cache;
// Shared fixture: provides a large random payload for cache tests.
class PolicyAwareCacheTest : public testing::Test {
protected:
// Random payload regenerated before every test.
std::string random_string_;
void SetUp() override {
// generate a 2MB length random string for further tests.
random_string_ = utils::StringUtils::RandomString(2UL << 20);
}
};
// Demonstrate some basic assertions.
// Exercises DRAMStore alone: sequential round-trips, then 10 threads doing
// racing Put/Get on a shared key set.
TEST_F(PolicyAwareCacheTest, DRAMStoreTest) {
  DRAMStore dram_store;
  // Simple Put & Get Test
  for (int i = 0; i < 1000; ++i) {
    std::string key = std::to_string(i);
    auto value = std::make_shared<CachedValue>(std::to_string(i));
    dram_store.Put(key, value);
  }
  for (int i = 0; i < 1000; ++i) {
    std::string key = std::to_string(i);
    std::shared_ptr<CachedValue> value = dram_store.Get(key);
    // Fix: ASSERT instead of EXPECT — the original continued past a null
    // value and dereferenced it on the next line, crashing the test binary.
    ASSERT_TRUE(value.get() != nullptr);
    EXPECT_EQ(value->String(), std::to_string(i));
  }
  // Multi-Thread Put & Get Test
  std::vector<std::string> keys;
  keys.reserve(1000);
  for (int i = 0; i < 1000; ++i) {
    keys.emplace_back(utils::StringUtils::RandomString(100));
  }
  std::vector<std::thread> workers;
  workers.reserve(10);
  for (int i = 0; i < 10; ++i) {
    // Fix: `i` was captured but never used inside the lambda.
    workers.emplace_back([&]() {
      // each thread puts & gets 1000 items
      for (int j = 0; j < 1000; j++) {
        std::string key = keys[random() % 1000];
        auto value = std::make_shared<CachedValue>(key);
        dram_store.Put(key, value);
        std::string key2 = keys[random() % 1000];
        value = dram_store.Get(key2);
        // A miss is legal here: the value may not have been written yet.
        if (value != nullptr) {
          EXPECT_EQ(key2, value->String());
        }
      }
    });
  }
  for (auto &worker : workers) {
    worker.join();
  }
  LOG(INFO, "DRAMStore Test Finished");
}
// Exercises LRUPolicy alone: after 1000 sequential Puts into a 500-capacity
// policy, only the newest 500 keys survive; then hammers it from 10 threads.
TEST_F(PolicyAwareCacheTest, LRUCachePolicyTest) {
  LRUPolicy lru(500);
  // Simple Put & Get Test
  for (int i = 0; i < 1000; ++i) {
    std::string key = std::to_string(i);
    lru.Put(key);
  }
  // The newer half must still be resident...
  for (int i = 500; i < 1000; ++i) {
    std::string key = std::to_string(i);
    EXPECT_TRUE(lru.TouchExist(key));
  }
  // ...and the older half must have been evicted.
  for (int i = 0; i < 500; ++i) {
    std::string key = std::to_string(i);
    EXPECT_FALSE(lru.TouchExist(key));
  }
  // Test duplicated values
  for (int i = 0; i < 1000; ++i) {
    lru.Put("key123");
  }
  // Multi-Thread Put & Get Test
  std::vector<std::string> keys;
  keys.reserve(1000);
  for (int i = 0; i < 1000; ++i) {
    keys.emplace_back(utils::StringUtils::RandomString(100));
  }
  std::vector<std::thread> workers;
  workers.reserve(10);
  for (int i = 0; i < 10; ++i) {
    // Fix: `i` was captured by value but never used inside the lambda
    // (-Wunused-lambda-capture).
    workers.emplace_back([&]() {
      // each thread puts & gets 1000 items
      for (int j = 0; j < 1000; j++) {
        std::string key = keys[random() % 1000];
        lru.Put(key);
        std::string key2 = keys[random() % 1000];
        lru.TouchExist(key2);
      }
    });
  }
  for (auto &worker : workers) {
    worker.join();
  }
  // Check existence
  uint32_t cnt = 0;
  for (auto &key : keys) {
    if (lru.TouchExist(key)) {
      cnt++;
    }
  }
  // We don't make the policy a highly restricted container.
  EXPECT_GT(cnt, 500);
  LOG(INFO, "LRUPolicy Test Finished");
}
// End-to-end: LRUCache (policy + store together). Values are keyed i -> i+1
// so a stale or misplaced value is distinguishable from its key.
TEST_F(PolicyAwareCacheTest, LRUCacheTest) {
  LRUCache cache(500);
  // Simple Put & Get Test
  for (int i = 0; i < 1000; ++i) {
    std::string key = std::to_string(i);
    auto value = std::make_shared<CachedValue>(std::to_string(i + 1));
    cache.Put(key, value);
  }
  for (int i = 500; i < 1000; ++i) {
    std::string key = std::to_string(i);
    auto value = cache.Get(key);
    // Fix: guard the dereference — the original called value->String()
    // unconditionally, so a miss would segfault the test binary instead
    // of failing the assertion.
    ASSERT_TRUE(value != nullptr);
    EXPECT_EQ(value->String(), std::to_string(i + 1));
  }
  for (int i = 0; i < 500; ++i) {
    std::string key = std::to_string(i);
    auto value = cache.Get(key);
    EXPECT_TRUE(value == nullptr);
  }
  // Test duplicated values
  for (int i = 0; i < 1000; ++i) {
    auto value = std::make_shared<CachedValue>(std::to_string(i + 1));
    cache.Put("key123", value);
  }
  // Multi-Thread Put & Get Test
  std::vector<std::string> keys;
  keys.reserve(1000);
  for (int i = 0; i < 1000; ++i) {
    keys.emplace_back(utils::StringUtils::RandomString(100));
  }
  std::vector<std::thread> workers;
  workers.reserve(10);
  for (int i = 0; i < 10; ++i) {
    workers.emplace_back([&, i]() {
      // each thread puts & gets 1000 items
      for (int j = 0; j < 1000; j++) {
        std::string key = keys[random() % 1000];
        auto value = std::make_shared<CachedValue>(std::to_string(i + 1));
        cache.Put(key, value);
        std::string key2 = keys[random() % 1000];
        cache.Get(key2);
      }
    });
  }
  for (auto &worker : workers) {
    worker.join();
  }
  // Check existence
  uint32_t cnt = 0;
  for (auto &key : keys) {
    if (cache.Get(key) != nullptr) {
      cnt++;
    }
  }
  // We don't make the policy a highly restricted container.
  EXPECT_GT(cnt, 500);
  // Fix: the original logged "LRUPolicy Test Finished" here — wrong test name.
  LOG(INFO, "LRUCache Test Finished");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment