Last active
January 11, 2024 09:36
-
-
Save sotex/97e171e153c79d09553b7717e3637fd5 to your computer and use it in GitHub Desktop.
singleflight的C++实现版本
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// solym | |
// [email protected] | |
// 2020年3月16日 20点28分 | |
#include <map> | |
#include <memory> | |
#include <mutex> | |
#include <condition_variable> | |
template<typename _Kty, typename _Ty> | |
class SingleFlight | |
{ | |
// 保存实际执行结果 | |
struct _Result | |
{ | |
bool _done; // 条件量 | |
// 这里也可以使用读写锁来实现,会简单一点 | |
// 第一个操作的线程加写锁,后续的线程加读锁,写完成之后,读锁不再阻塞,即可获取结果 | |
std::mutex _mtx; // 条件变量互斥锁 | |
std::condition_variable _cv; // 条件变量,用于通知 | |
// _result 用于保存唯一那个真的执行处理的结果 | |
// 这里需要考虑 Do 参数 func 函数的实际输入输出参数 | |
// 不一定是返回值 | |
_Ty _result; | |
}; | |
// 实际执行的结果保存 | |
std::mutex _domtx; | |
std::map<_Kty, std::shared_ptr<_Result>> _do; | |
public: | |
// 执行操作 | |
// key 用于区分请求 | |
// func 实际执行操作的函数 | |
// args 实际执行操作函数的参数 | |
template<class F, typename... Args> | |
_Ty Do(_Kty key, F&& func, Args&&... args) | |
{ | |
// 先加写锁,后面可能要修改 | |
std::unique_lock<std::mutex> lock(_domtx); | |
// 判断是否已经存在执行结果 | |
auto iter = _do.find(key); | |
// 存在就等待完成 | |
if (iter != _do.end()) | |
{ | |
// 获取实际执行结果结构 | |
std::shared_ptr<_Result> pRes = iter->second; | |
lock.unlock(); | |
// 等待条件成立(也就是实际执行的那个线程执行完成) | |
std::unique_lock<std::mutex> lck(pRes->_mtx); | |
pRes->_cv.wait(lck, [pRes]() -> bool { return pRes->_done; }); | |
// 获取执行结果进行返回 | |
return pRes->_result; | |
} | |
// 不存在就创建一个操作结果 | |
std::shared_ptr<_Result> pRes = std::make_shared<_Result>(); | |
pRes->_done = false; // 设置初始条件为 false | |
_do[key] = pRes; | |
lock.unlock(); // 解锁,别的线程能够继续 | |
// 执行真正的操作,获取返回结果 | |
pRes->_result = func(args...); | |
{ | |
std::lock_guard<std::mutex> lck(pRes->_mtx); | |
pRes->_done = true; | |
} | |
// 通知所有线程 | |
pRes->_cv.notify_all(); | |
// 移除(也可以放在一个延迟移除队列,进行异步移除,以便后续的 | |
// 相同key操作也可以直接使用。但在这外面缓存结果会更好) | |
lock.lock(); | |
_do.erase(key); | |
return pRes->_result; | |
} | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// solym | |
// [email protected] | |
// 2020年3月16日 20点28分 | |
#include "singleflight.hpp" | |
#include <string> | |
#include <thread> | |
#include <vector> | |
#include <chrono> | |
#include <ctime> | |
#include <iomanip> | |
#include <iostream> | |
#include <sstream> | |
// 获取时间戳 | |
std::string timestamp() | |
{ | |
std::time_t now = std::time(nullptr); | |
std::ostringstream oss; | |
oss << std::put_time(std::localtime(&now), "%Y-%m-%d %H.%M.%S "); | |
return oss.str(); | |
} | |
// 获取当前线程ID | |
std::string threadid() | |
{ | |
std::thread::id this_id = std::this_thread::get_id(); | |
std::ostringstream oss; | |
oss << this_id; | |
return oss.str(); | |
} | |
// 输出简单日志 | |
void outlog(const char* msg) | |
{ | |
printf("%s [%8s] %s\n",timestamp().c_str(),threadid().c_str(), msg); | |
} | |
// 一个简单的测试函数(假设是比较耗时的绘图操作) | |
void draw(int w, int h, std::vector<uint8_t>& out) | |
{ | |
outlog(" draw begin "); | |
// 休眠 | |
std::this_thread::sleep_for(std::chrono::seconds(3)); | |
for (size_t j = 0; j < 10; ++j) | |
{ | |
out[j] = 'a' + j; | |
} | |
outlog(" draw ended"); | |
} | |
int main() | |
{ | |
// 创建一个 singlefilght 对象 | |
// 这里用于接收返回值的是一个 shared_ptr 避免多次拷贝返回结果 | |
SingleFlight<std::string, std::shared_ptr<std::vector<uint8_t> > > sl; | |
// 创建十个线程 | |
for (size_t i = 0; i < 10; ++i) | |
{ | |
std::thread thr([&sl]() { | |
outlog(" Begin"); | |
// 对 draw 操作进行包装 | |
// 便于输出需要的结果形式 | |
std::shared_ptr<std::vector<uint8_t> > out = sl.Do( | |
"100", [](int h, int w) -> std::shared_ptr<std::vector<uint8_t> > { | |
std::vector<uint8_t> out(100,0); | |
draw(h, w, out); | |
return std::make_shared<std::vector<uint8_t> >(out); | |
}, | |
100, | |
100); | |
char buffer[256]; | |
sprintf(buffer," %p Out: %s",out->data(),(char*)out->data()); | |
outlog(buffer); | |
std::cout << std::endl; | |
outlog(" Ended: "); | |
}); | |
thr.detach(); // 线程分离执行 | |
} | |
outlog(" Main Begin "); | |
// 休眠等待所有线程正常返回 | |
std::this_thread::sleep_for(std::chrono::seconds(5)); | |
outlog(" Main Ended "); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment