Created
December 7, 2018 07:48
-
-
Save IronsDu/9833a09ec3fd894d31484a676a33df00 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <iterator>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include <google/protobuf/util/json_util.h>
#include "manifest.pb.h"
namespace dodo { namespace lsm { | |
// Standard vocabulary types used unqualified throughout this header
// (scoped to this namespace only).
using std::map;
using std::optional;
using std::string;
using std::vector;
// One key/value record: the unit written to log files and sstables and
// returned when a file is parsed back.
struct KVPair
{
    std::string key;   // record key
    std::string value; // value stored for `key`
};
// Separator written before each length prefix and before each payload in the
// on-disk record format: "#<len>#<bytes>".
// `inline constexpr` (C++17) gives a single definition shared by every
// translation unit that includes this header, instead of one `static` copy each.
inline constexpr char SplitFlag = '#';
// Guidepost distance: the intended spacing, in keys, between guideposts
// (sparse key-index entries stored per sstable). The identifier keeps its
// historical spelling ("Aistance") for source compatibility.
inline constexpr size_t GuidepostAistance = 10;
class Utils
{
public:
    // Parses the unsigned decimal number stored in the INCLUSIVE character
    // range [start, end]. An empty range (end == start - 1) yields 0.
    // Throws std::runtime_error if a character is not a decimal digit or if the
    // value does not fit in size_t.
    static size_t ParseUIntFromString(const char* const start, const char* const end)
    {
        size_t result = 0;
        // Walk forward so the pointer never moves before `start` (decrementing
        // past the first element of an array is undefined behavior).
        for (const char* pos = start; pos <= end; ++pos)
        {
            const char c = *pos;
            if (c < '0' || c > '9')
            {
                throw std::runtime_error("number must between [0,9]");
            }
            const auto digit = static_cast<size_t>(c - '0');
            // Reject values that would wrap around instead of silently overflowing.
            if (result > (std::numeric_limits<size_t>::max() - digit) / 10)
            {
                throw std::runtime_error("number is too large");
            }
            result = result * 10 + digit;
        }
        return result;
    }

    // Number of decimal digits needed to print `number`
    // (e.g. 78901 -> 5, 10 -> 2, 0 -> 1).
    static size_t CalcIntLength(size_t number)
    {
        size_t len = 1;
        // `>= 10`: any value of 10 or more has at least one more digit.
        while (number >= 10)
        {
            ++len;
            number /= 10;
        }
        return len;
    }
};
// 写日志 | |
class LogWriter | |
{ | |
public: | |
using Ptr = std::shared_ptr<LogWriter>; | |
explicit LogWriter(string fileName) | |
: | |
mFile(fileName, std::ios::app) | |
{ | |
} | |
virtual ~LogWriter() | |
{ | |
flush(); | |
} | |
void appendKV(const string& key, const string& value) | |
{ | |
_appendStr(key); | |
_appendStr(value); | |
} | |
void appendKvList(const vector<KVPair>& kvList) | |
{ | |
for (const auto& kv : kvList) | |
{ | |
appendKV(kv.key, kv.value); | |
} | |
} | |
void flush() | |
{ | |
mFile.flush(); | |
} | |
private: | |
LogWriter(const LogWriter&) = delete; | |
const LogWriter& operator=(const LogWriter&) = delete; | |
void _appendStr(const string& str) | |
{ | |
mFile | |
<< SplitFlag | |
<< str.size() | |
<< SplitFlag | |
<< str; | |
} | |
private: | |
std::ofstream mFile; | |
}; | |
// 读取日志到KVPair列表 | |
class LogReader | |
{ | |
public: | |
explicit LogReader(string fileName) | |
: | |
mFile(fileName, std::ios::in) | |
{ | |
} | |
virtual ~LogReader() = default; | |
vector<KVPair> read() | |
{ | |
mFile.seekg(0); | |
std::string content((std::istreambuf_iterator<char>(mFile)), std::istreambuf_iterator<char>()); | |
return ReadToKVListFromBuffer(content.data(), content.size()); | |
} | |
vector<KVPair> readRange(size_t startPos, size_t endPos) | |
{ | |
mFile.seekg(startPos); | |
string content; | |
content.resize(endPos - startPos); | |
mFile.read(content.data(), content.size()); | |
return ReadToKVListFromBuffer(content.data(), content.size()); | |
} | |
public: | |
static vector<KVPair> ReadToKVListFromBuffer(const char* const buffer, const size_t len) | |
{ | |
vector<KVPair> result; | |
const char* current = buffer; | |
const char* const end = buffer + len; | |
while (true) | |
{ | |
auto parser = [&]() -> optional<string> { | |
if (current >= end) | |
{ | |
return std::nullopt; | |
} | |
const char flag = current[0]; | |
assert(flag == SplitFlag); | |
if (flag != SplitFlag) | |
{ | |
return std::nullopt; | |
} | |
current++; | |
const char* lenFlagEnd = strchr(current, SplitFlag); | |
if (lenFlagEnd == nullptr) | |
{ | |
return std::nullopt; | |
} | |
const auto len = Utils::ParseUIntFromString(current, lenFlagEnd - 1); | |
lenFlagEnd++; | |
current = lenFlagEnd; | |
assert((end - lenFlagEnd) >= len); | |
if ((end - lenFlagEnd) < len) | |
{ | |
return std::nullopt; | |
} | |
current += len; | |
return string(lenFlagEnd, len); | |
}; | |
auto k = parser(); | |
auto v = parser(); | |
if (!k || !v) | |
{ | |
break; | |
} | |
result.push_back(KVPair{ *k, *v }); | |
}; | |
return result; | |
} | |
private: | |
std::ifstream mFile; | |
}; | |
// 缓存(memtable) | |
class KVCache | |
{ | |
public: | |
virtual ~KVCache() = default; | |
void set(string key, string value) noexcept | |
{ | |
mKVMap[key] = value; | |
} | |
optional<string> get(string key) const noexcept | |
{ | |
optional<string> result; | |
if (auto it = mKVMap.find(key); it != mKVMap.end()) | |
{ | |
result = (*it).second; | |
} | |
return result; | |
} | |
vector<KVPair> dunpToListOrderByKey() const | |
{ | |
vector<KVPair> orderList; | |
for (const auto& v : mKVMap) | |
{ | |
orderList.push_back(KVPair{ v.first, v.second }); | |
} | |
return orderList; | |
} | |
private: | |
map<string, string> mKVMap; | |
}; | |
class Helper | |
{ | |
public: | |
// 读取日志文件到缓存 | |
static void ReadLogToKVCache(KVCache& cache, string fileName) | |
{ | |
LogReader reader(fileName); | |
auto kvList = reader.read(); | |
for (const auto& pair : kvList) | |
{ | |
cache.set(pair.key, pair.value); | |
} | |
} | |
// 从某个日志文件中[startPos-endPos)区间查找key对应的value | |
static optional<string> FindValueFromFileRange(string fileName, string key, size_t startPos, size_t endPos) | |
{ | |
LogReader reader(fileName); | |
auto kvList = reader.readRange(startPos, endPos); | |
for (const auto& pair : kvList) | |
{ | |
if (pair.key == key) | |
{ | |
return pair.value; | |
} | |
} | |
return std::nullopt; | |
} | |
}; | |
class Lsm | |
{ | |
public: | |
explicit Lsm(string maniFestFileName) | |
: | |
mManifestFileName(maniFestFileName) | |
{ | |
} | |
// 根据manifest初始化 | |
bool init() | |
{ | |
do | |
{ | |
if (!readManifestFile()) | |
{ | |
break; | |
} | |
if (mManifestFile.log_file_name().empty()) | |
{ | |
mManifestFile.set_log_file_name(makeNewLogFileName()); | |
} | |
mLogWriter = std::make_shared<LogWriter>(mManifestFile.log_file_name()); | |
rebuildCache(); | |
return true; | |
} while (0); | |
return false; | |
} | |
optional<string> get(string key) | |
{ | |
// 先查找memtable | |
if (auto v = mCache.get(key); v) | |
{ | |
return v; | |
} | |
for (auto it = mManifestFile.sstables().rbegin(); it != mManifestFile.sstables().rend(); it++) | |
{ | |
const auto& sstable = (*it); | |
::dodo::log::SSTableFileMetaInfo_KeyGuidepost tmp; | |
tmp.set_key(key); | |
auto lower = std::lower_bound(sstable.guideposts().begin(), sstable.guideposts().end(), tmp, | |
[&](const ::dodo::log::SSTableFileMetaInfo_KeyGuidepost& lhs, const ::dodo::log::SSTableFileMetaInfo_KeyGuidepost& rhs) { | |
return lhs.key() < rhs.key(); | |
}); | |
auto uper = std::upper_bound(sstable.guideposts().begin(), sstable.guideposts().end(), tmp, | |
[&](const ::dodo::log::SSTableFileMetaInfo_KeyGuidepost& lhs, const ::dodo::log::SSTableFileMetaInfo_KeyGuidepost& rhs) { | |
return lhs.key() <= rhs.key(); | |
}); | |
if (lower == sstable.guideposts().end() || uper == sstable.guideposts().end()) | |
{ | |
continue; | |
} | |
auto value = Helper::FindValueFromFileRange(sstable.file_name(), key, (*lower).start_pos(), (*uper).end_pos()); | |
if (value) | |
{ | |
return value; | |
} | |
} | |
return std::nullopt; | |
} | |
void set(string key, string value) | |
{ | |
assert(mLogWriter != nullptr); | |
if (mLogWriter == nullptr) | |
{ | |
return; | |
} | |
auto oldValue = mCache.get(key); | |
mCache.set(key, value); | |
//TODO::如果append失败?还原cache为oldValue? | |
mLogWriter->appendKV(key, value); | |
//TODO::是否立即flush | |
mLogWriter->flush(); | |
} | |
// 持久化当前kv,并且清空当前log文件 | |
// TODO::其中的原子性保证 | |
// TODO::需要一个机制决定何时调用此函数 | |
void save() | |
{ | |
// 得到新sstable文件名称 | |
string fileName("sstable_"); | |
fileName += std::to_string(mManifestFile.sstables().size()); | |
fileName += ".txt"; | |
const auto orderKvList = mCache.dunpToListOrderByKey(); | |
LogWriter wirter(fileName); | |
wirter.appendKvList(orderKvList); | |
wirter.flush(); | |
// 在内存中修改manifest文件 - 添加新的sstable信息 | |
{ | |
auto sstable = mManifestFile.add_sstables(); | |
sstable->set_file_name(fileName); | |
size_t currentFileSize = 0; | |
const auto calcKVSpaceSize = [](const KVPair& kv) { | |
size_t len = 0; | |
len += 4; | |
len += (Utils::CalcIntLength(kv.key.size())); | |
len += kv.key.size(); | |
len += (Utils::CalcIntLength(kv.value.size())); | |
len += kv.value.size(); | |
return len; | |
}; | |
const auto incFileSize = [&](const KVPair& kv) { | |
//TODO::序列化数据的格式变动之后这里计算文件大小也需要变动 | |
currentFileSize += calcKVSpaceSize(kv); | |
}; | |
const auto addGuidepost = [&](const KVPair& kv) { | |
auto guidepostPtr = sstable->add_guideposts(); | |
guidepostPtr->set_key(kv.key); | |
guidepostPtr->set_start_pos(currentFileSize); | |
guidepostPtr->set_end_pos(currentFileSize + calcKVSpaceSize(kv)); | |
}; | |
// 默认为最大值,即默认会将head key记录到路标中 | |
size_t counter = GuidepostAistance; | |
for (size_t i = 0; i < orderKvList.size(); i++) | |
{ | |
const auto& guidepost = orderKvList[i]; | |
// 如果达到计数或者达到最后一个key | |
if (counter == GuidepostAistance || | |
(i + 1) == orderKvList.size()) | |
{ | |
addGuidepost(guidepost); | |
counter = 0; | |
} | |
else | |
{ | |
counter++; | |
} | |
incFileSize(guidepost); | |
} | |
} | |
// 清空cache | |
mCache = KVCache(); | |
// (增加新的sstable之后)更新当前log文件名称 | |
mManifestFile.set_log_file_name(makeNewLogFileName()); | |
// 保存manifest | |
saveManifestFile(); | |
// 重新打开新的log文件 | |
mLogWriter = std::make_shared<LogWriter>(mManifestFile.log_file_name()); | |
} | |
private: | |
string makeNewLogFileName() | |
{ | |
string logFileName("log_"); | |
logFileName += std::to_string(mManifestFile.sstables().size()); | |
logFileName += ".txt"; | |
return logFileName; | |
} | |
// 读取(反序列化)manifest文件 | |
bool readManifestFile() | |
{ | |
std::ifstream file(mManifestFileName); | |
if (!file) | |
{ | |
return false; | |
} | |
std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>()); | |
if (content.empty()) | |
{ | |
content = "{}"; | |
} | |
auto status = google::protobuf::util::JsonStringToMessage(content, &mManifestFile); | |
assert(status.ok()); | |
return status.ok(); | |
} | |
// 序列化manifest文件 | |
void saveManifestFile() | |
{ | |
string content; | |
google::protobuf::util::MessageToJsonString(mManifestFile, &content); | |
std::ofstream file(mManifestFileName, std::ios::trunc); | |
file << content; | |
file.flush(); | |
} | |
// 根据log文件重建(未持久化到sstable的)cache | |
void rebuildCache() | |
{ | |
Helper::ReadLogToKVCache(mCache, mManifestFile.log_file_name()); | |
} | |
private: | |
KVCache mCache; | |
string mManifestFileName; | |
dodo::log::ManifestFile mManifestFile; | |
LogWriter::Ptr mLogWriter; | |
}; | |
} } | |
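/*
Usage example. The manifest file must already exist (an empty file is treated
as an empty manifest). init() returns false if it cannot be opened or parsed,
and set() requires a successful init():
{
    dodo::lsm::Lsm lsm("manifest.json");
    if (!lsm.init())
    {
        return;
    }
    auto value = lsm.get("abc");
    lsm.set("abc", "123");
    lsm.set("xyz", "321");
    lsm.set("acdss", "123");
    lsm.set("agsge", "123");
    lsm.set("oaowgseg", "321");
    lsm.save();
}
*/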
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
syntax = "proto3"; | |
package dodo.log; | |
// Metadata for a single sstable file.
message SSTableFileMetaInfo
{
    // Guidepost: a sparse index entry recording where one of this sstable's
    // keys is stored.
    message KeyGuidepost
    {
        string key = 1;
        // Start and end position of this key's record in the file; the range
        // is read as [start_pos, end_pos).
        // NOTE(review): the C++ writer fills these from size_t offsets; int32
        // limits sstable files to < 2 GiB — consider int64 if larger files are expected.
        int32 start_pos = 2;
        int32 end_pos = 3;
    }
    // sstable file name
    string file_name = 1;
    // All guidepost entries, in key order
    repeated KeyGuidepost guideposts = 2;
}
// Top-level manifest, stored as JSON by the C++ store.
message ManifestFile
{
    // Log file that currently receives writes and is replayed on startup
    string log_file_name = 1;
    // sstables in creation order (oldest first; lookups scan newest first)
    repeated SSTableFileMetaInfo sstables = 2;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment