Skip to content

Instantly share code, notes, and snippets.

@IronsDu
Created December 7, 2018 07:48
Show Gist options
  • Save IronsDu/9833a09ec3fd894d31484a676a33df00 to your computer and use it in GitHub Desktop.
Save IronsDu/9833a09ec3fd894d31484a676a33df00 to your computer and use it in GitHub Desktop.
#pragma once
#include <algorithm>
#include <cassert>
#include <cstring>
#include <fstream>
#include <iterator>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include <google/protobuf/util/json_util.h>
#include "manifest.pb.h"
namespace dodo { namespace lsm {
using std::string;
using std::map;
using std::optional;
using std::vector;
// One key/value record, as written to and read back from log and sstable files.
struct KVPair
{
    string key{};
    string value{};
};
// Separator byte framing every length-prefixed string in log/sstable files:
// SplitFlag <decimal length> SplitFlag <raw bytes>.
inline constexpr char SplitFlag = '#';
// Guidepost distance for the sparse per-sstable key index. Lsm::save() records the first
// key, then one key after every GuidepostAistance non-recorded keys (i.e. every
// (GuidepostAistance + 1)-th key), and always the last key.
// ("Aistance" is a misspelling of "Distance"; the name is kept for existing users.)
inline constexpr size_t GuidepostAistance = 10;
class Utils
{
public:
    // Parses the characters in the INCLUSIVE range [start, end] as an unsigned decimal number.
    // An empty range (end == start - 1) yields 0.
    // Throws std::runtime_error if any character in the range is not '0'..'9'.
    // NOTE(review): values that do not fit in size_t wrap around silently.
    static size_t ParseUIntFromString(const char* const start, const char* const end)
    {
        size_t result = 0;
        // Walk forward (result = result * 10 + digit) so the pointer never has to step
        // before `start`; forming a pointer before an array's first element is undefined.
        for (const char* pos = start; pos <= end; ++pos)
        {
            const char c = *pos;
            if (c < '0' || c > '9')
            {
                throw std::runtime_error("number must between [0,9]");
            }
            result = result * 10 + static_cast<size_t>(c - '0');
        }
        return result;
    }

    // Returns how many decimal digits are needed to print `number` (e.g. 78901 -> 5, 0 -> 1).
    // Lsm::save() uses this to predict the byte size of the length fields LogWriter writes,
    // so it must agree exactly with how a size_t is printed.
    static size_t CalcIntLength(size_t number)
    {
        size_t len = 1;
        // `>= 10`: exact powers of ten (10, 100, ...) need one more digit as well.
        while (number >= 10)
        {
            ++len;
            number /= 10;
        }
        return len;
    }
};
// Log writer: appends length-prefixed strings to a file. Each string is written as
// SplitFlag, its decimal byte length, SplitFlag, then the raw bytes; a key/value pair
// is the key string immediately followed by the value string.
class LogWriter
{
public:
    using Ptr = std::shared_ptr<LogWriter>;

    // Opens `fileName` for appending (the file is created if it does not exist).
    // Binary mode matters: the length prefixes, and the byte offsets Lsm::save() derives
    // from them, count raw bytes, so the stream must not translate newlines (text mode
    // does on some platforms, e.g. Windows).
    explicit LogWriter(string fileName)
        :
        mFile(fileName, std::ios::app | std::ios::binary)
    {
    }

    LogWriter(const LogWriter&) = delete;
    LogWriter& operator=(const LogWriter&) = delete;

    virtual ~LogWriter()
    {
        flush();
    }

    // Appends one key/value record.
    void appendKV(const string& key, const string& value)
    {
        _appendStr(key);
        _appendStr(value);
    }

    // Appends every pair of `kvList`, in order.
    void appendKvList(const vector<KVPair>& kvList)
    {
        for (const auto& kv : kvList)
        {
            appendKV(kv.key, kv.value);
        }
    }

    // Flushes the stream's buffer. Write errors are not reported to the caller.
    void flush()
    {
        mFile.flush();
    }

private:
    // Writes one length-prefixed string: SplitFlag <size> SplitFlag <bytes>.
    void _appendStr(const string& str)
    {
        mFile
            << SplitFlag
            << str.size()
            << SplitFlag
            << str;
    }

private:
    std::ofstream mFile;
};
// Log reader: parses files produced by LogWriter back into a list of KVPair.
class LogReader
{
public:
    // Opens `fileName` for reading in binary mode, so bytes are seen exactly as LogWriter
    // wrote them and seekg() offsets are raw byte positions.
    explicit LogReader(string fileName)
        :
        mFile(fileName, std::ios::in | std::ios::binary)
    {
    }
    virtual ~LogReader() = default;

    // Parses the whole file (see ReadToKVListFromBuffer for how bad data is handled).
    vector<KVPair> read()
    {
        // Reset eof/fail bits possibly left by an earlier read so seekg() can succeed.
        mFile.clear();
        mFile.seekg(0);
        std::string content((std::istreambuf_iterator<char>(mFile)), std::istreambuf_iterator<char>());
        return ReadToKVListFromBuffer(content.data(), content.size());
    }

    // Parses the bytes in [startPos, endPos). An empty or inverted range yields an empty list;
    // if fewer bytes are available than requested, only the bytes actually read are parsed.
    vector<KVPair> readRange(size_t startPos, size_t endPos)
    {
        if (endPos <= startPos)
        {
            return {};
        }
        mFile.clear();
        mFile.seekg(static_cast<std::streamoff>(startPos));
        string content;
        content.resize(endPos - startPos);
        mFile.read(content.data(), static_cast<std::streamsize>(content.size()));
        // Drop the zero padding that a short read would otherwise leave in the buffer.
        content.resize(static_cast<size_t>(mFile.gcount()));
        return ReadToKVListFromBuffer(content.data(), content.size());
    }

public:
    // Parses consecutive "SplitFlag <len> SplitFlag <bytes>" strings from buffer[0, len) and
    // pairs them up as key/value. Parsing stops at the first string that is missing, does not
    // start with SplitFlag, has no closing SplitFlag, or is shorter than its declared length
    // (e.g. an incomplete write at the end of a log); a key whose value is incomplete is dropped.
    // The buffer does not need to be NUL-terminated.
    // Throws std::runtime_error (from Utils::ParseUIntFromString) if a length field contains a
    // non-digit character.
    static vector<KVPair> ReadToKVListFromBuffer(const char* const buffer, const size_t len)
    {
        vector<KVPair> result;
        const char* current = buffer;
        const char* const end = buffer + len;

        // Parses one length-prefixed string at `current`; advances `current` only on success.
        auto parser = [&]() -> optional<string> {
            if (current >= end || *current != SplitFlag)
            {
                return std::nullopt;
            }
            const char* const lenStart = current + 1;
            // Bounded search: unlike strchr, memchr never reads past `end`.
            const char* const lenEnd = static_cast<const char*>(
                std::memchr(lenStart, SplitFlag, static_cast<size_t>(end - lenStart)));
            if (lenEnd == nullptr)
            {
                return std::nullopt;
            }
            // The digits occupy the inclusive range [lenStart, lenEnd - 1].
            const size_t strLen = Utils::ParseUIntFromString(lenStart, lenEnd - 1);
            const char* const strStart = lenEnd + 1;
            if (static_cast<size_t>(end - strStart) < strLen)
            {
                return std::nullopt;
            }
            current = strStart + strLen;
            return string(strStart, strLen);
        };

        while (true)
        {
            auto k = parser();
            auto v = parser();
            if (!k || !v)
            {
                break;
            }
            result.push_back(KVPair{ std::move(*k), std::move(*v) });
        }
        return result;
    }

private:
    std::ifstream mFile;
};
// Cache (memtable): the most recent writes held in memory, ordered by key via std::map.
class KVCache
{
public:
    virtual ~KVCache() = default;

    // Inserts `key`, or overwrites its value if already present. The by-value arguments are
    // moved into the map instead of being copied a second time.
    // NOTE(review): declared noexcept, so an allocation failure here calls std::terminate.
    void set(string key, string value) noexcept
    {
        mKVMap.insert_or_assign(std::move(key), std::move(value));
    }

    // Returns a copy of the value stored for `key`, or std::nullopt if the key is absent.
    optional<string> get(string key) const noexcept
    {
        if (const auto it = mKVMap.find(key); it != mKVMap.end())
        {
            return it->second;
        }
        return std::nullopt;
    }

    // Returns all pairs sorted by key (std::map iteration order).
    // ("dunp" is a misspelling of "dump"; the name is kept for existing callers.)
    vector<KVPair> dunpToListOrderByKey() const
    {
        vector<KVPair> orderList;
        orderList.reserve(mKVMap.size());
        for (const auto& [key, value] : mKVMap)
        {
            orderList.push_back(KVPair{ key, value });
        }
        return orderList;
    }

private:
    map<string, string> mKVMap;
};
class Helper
{
public:
// 读取日志文件到缓存
static void ReadLogToKVCache(KVCache& cache, string fileName)
{
LogReader reader(fileName);
auto kvList = reader.read();
for (const auto& pair : kvList)
{
cache.set(pair.key, pair.value);
}
}
// 从某个日志文件中[startPos-endPos)区间查找key对应的value
static optional<string> FindValueFromFileRange(string fileName, string key, size_t startPos, size_t endPos)
{
LogReader reader(fileName);
auto kvList = reader.readRange(startPos, endPos);
for (const auto& pair : kvList)
{
if (pair.key == key)
{
return pair.value;
}
}
return std::nullopt;
}
};
class Lsm
{
public:
explicit Lsm(string maniFestFileName)
:
mManifestFileName(maniFestFileName)
{
}
// 根据manifest初始化
bool init()
{
do
{
if (!readManifestFile())
{
break;
}
if (mManifestFile.log_file_name().empty())
{
mManifestFile.set_log_file_name(makeNewLogFileName());
}
mLogWriter = std::make_shared<LogWriter>(mManifestFile.log_file_name());
rebuildCache();
return true;
} while (0);
return false;
}
optional<string> get(string key)
{
// 先查找memtable
if (auto v = mCache.get(key); v)
{
return v;
}
for (auto it = mManifestFile.sstables().rbegin(); it != mManifestFile.sstables().rend(); it++)
{
const auto& sstable = (*it);
::dodo::log::SSTableFileMetaInfo_KeyGuidepost tmp;
tmp.set_key(key);
auto lower = std::lower_bound(sstable.guideposts().begin(), sstable.guideposts().end(), tmp,
[&](const ::dodo::log::SSTableFileMetaInfo_KeyGuidepost& lhs, const ::dodo::log::SSTableFileMetaInfo_KeyGuidepost& rhs) {
return lhs.key() < rhs.key();
});
auto uper = std::upper_bound(sstable.guideposts().begin(), sstable.guideposts().end(), tmp,
[&](const ::dodo::log::SSTableFileMetaInfo_KeyGuidepost& lhs, const ::dodo::log::SSTableFileMetaInfo_KeyGuidepost& rhs) {
return lhs.key() <= rhs.key();
});
if (lower == sstable.guideposts().end() || uper == sstable.guideposts().end())
{
continue;
}
auto value = Helper::FindValueFromFileRange(sstable.file_name(), key, (*lower).start_pos(), (*uper).end_pos());
if (value)
{
return value;
}
}
return std::nullopt;
}
void set(string key, string value)
{
assert(mLogWriter != nullptr);
if (mLogWriter == nullptr)
{
return;
}
auto oldValue = mCache.get(key);
mCache.set(key, value);
//TODO::如果append失败?还原cache为oldValue?
mLogWriter->appendKV(key, value);
//TODO::是否立即flush
mLogWriter->flush();
}
// 持久化当前kv,并且清空当前log文件
// TODO::其中的原子性保证
// TODO::需要一个机制决定何时调用此函数
void save()
{
// 得到新sstable文件名称
string fileName("sstable_");
fileName += std::to_string(mManifestFile.sstables().size());
fileName += ".txt";
const auto orderKvList = mCache.dunpToListOrderByKey();
LogWriter wirter(fileName);
wirter.appendKvList(orderKvList);
wirter.flush();
// 在内存中修改manifest文件 - 添加新的sstable信息
{
auto sstable = mManifestFile.add_sstables();
sstable->set_file_name(fileName);
size_t currentFileSize = 0;
const auto calcKVSpaceSize = [](const KVPair& kv) {
size_t len = 0;
len += 4;
len += (Utils::CalcIntLength(kv.key.size()));
len += kv.key.size();
len += (Utils::CalcIntLength(kv.value.size()));
len += kv.value.size();
return len;
};
const auto incFileSize = [&](const KVPair& kv) {
//TODO::序列化数据的格式变动之后这里计算文件大小也需要变动
currentFileSize += calcKVSpaceSize(kv);
};
const auto addGuidepost = [&](const KVPair& kv) {
auto guidepostPtr = sstable->add_guideposts();
guidepostPtr->set_key(kv.key);
guidepostPtr->set_start_pos(currentFileSize);
guidepostPtr->set_end_pos(currentFileSize + calcKVSpaceSize(kv));
};
// 默认为最大值,即默认会将head key记录到路标中
size_t counter = GuidepostAistance;
for (size_t i = 0; i < orderKvList.size(); i++)
{
const auto& guidepost = orderKvList[i];
// 如果达到计数或者达到最后一个key
if (counter == GuidepostAistance ||
(i + 1) == orderKvList.size())
{
addGuidepost(guidepost);
counter = 0;
}
else
{
counter++;
}
incFileSize(guidepost);
}
}
// 清空cache
mCache = KVCache();
// (增加新的sstable之后)更新当前log文件名称
mManifestFile.set_log_file_name(makeNewLogFileName());
// 保存manifest
saveManifestFile();
// 重新打开新的log文件
mLogWriter = std::make_shared<LogWriter>(mManifestFile.log_file_name());
}
private:
string makeNewLogFileName()
{
string logFileName("log_");
logFileName += std::to_string(mManifestFile.sstables().size());
logFileName += ".txt";
return logFileName;
}
// 读取(反序列化)manifest文件
bool readManifestFile()
{
std::ifstream file(mManifestFileName);
if (!file)
{
return false;
}
std::string content((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
if (content.empty())
{
content = "{}";
}
auto status = google::protobuf::util::JsonStringToMessage(content, &mManifestFile);
assert(status.ok());
return status.ok();
}
// 序列化manifest文件
void saveManifestFile()
{
string content;
google::protobuf::util::MessageToJsonString(mManifestFile, &content);
std::ofstream file(mManifestFileName, std::ios::trunc);
file << content;
file.flush();
}
// 根据log文件重建(未持久化到sstable的)cache
void rebuildCache()
{
Helper::ReadLogToKVCache(mCache, mManifestFile.log_file_name());
}
private:
KVCache mCache;
string mManifestFileName;
dodo::log::ManifestFile mManifestFile;
LogWriter::Ptr mLogWriter;
};
} }
/*
{
dodo::lsm::Lsm lsm("manifest.json");
lsm.init();
auto value = lsm.get("abc");
lsm.set("abc", "123");
lsm.set("xyz", "321");
lsm.set("acdss", "123");
lsm.set("agsge", "123");
lsm.set("oaowgseg", "321");
lsm.save();
}
*/
syntax = "proto3";
package dodo.log;
// Metadata for a single sstable file.
message SSTableFileMetaInfo
{
// Guidepost: marks where one particular key of this sstable lies in the file. Only some
// keys get a guidepost (a sparse index); Lsm::save() emits them in ascending key order.
message KeyGuidepost
{
string key = 1;
// Byte offsets of this key's record in the file: start (inclusive) and end (exclusive).
// NOTE(review): int32 limits these offsets to below 2 GiB per sstable file.
int32 start_pos = 2;
int32 end_pos = 3;
}
// Name of the sstable file.
string file_name = 1;
// All guidepost entries of this sstable, sorted by key.
repeated KeyGuidepost guideposts = 2;
}
// Top-level manifest: the current log file plus every sstable written so far.
message ManifestFile
{
// Name of the log file that currently receives writes.
string log_file_name = 1;
// sstables in creation order; Lsm::get() searches them from last to first.
repeated SSTableFileMetaInfo sstables = 2;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment