Created
August 20, 2019 14:35
-
-
Save niujiabenben/3136338fb40ba2ecc1f7ef20c5bbbad5 to your computer and use it in GitHub Desktop.
libcurl multi inferface example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "httpclient.h" | |
static inline size_t WriteCallback(void* contents, size_t size, size_t nmemb, | |
void* userp) { | |
((std::string*) userp)->append((char*) contents, size * nmemb); | |
return size * nmemb; | |
} | |
//////////////////////////// class HttpClient ////////////////////////////////// | |
HttpClient::HttpClient() { | |
curl_global_init(CURL_GLOBAL_ALL); | |
handle_ = curl_multi_init(); | |
CHECK(handle_ != NULL) << "Failed to create curl_multi handle."; | |
thread_ = std::thread([this] { Run(); }); | |
} | |
HttpClient::~HttpClient() { | |
stop_ = true; | |
condition_.notify_all(); | |
if (thread_.joinable()) { thread_.join(); } | |
//// 清理各种handle的顺序, 请参考: | |
//// https://curl.haxx.se/libcurl/c/curl_multi_cleanup.html | |
for (auto& request : running_requests_) { | |
if (request->curl != NULL) { | |
curl_multi_remove_handle(handle_, request->curl); | |
curl_easy_cleanup(request->curl); | |
request->curl = NULL; | |
} | |
} | |
for (auto& kv : curl_pool_) { | |
while (!kv.second.empty()) { | |
curl_easy_cleanup(kv.second.top()); | |
kv.second.pop(); | |
} | |
} | |
curl_multi_cleanup(handle_); | |
} | |
void HttpClient::AddRequest(HttpRequestPtr request) { | |
if (true) { | |
std::lock_guard<std::mutex> lock(mutex_new_); | |
new_requests_.push(std::move(request)); | |
} | |
condition_.notify_all(); | |
} | |
HttpRequestPtr HttpClient::GetRequest(const std::string& name) { | |
std::lock_guard<std::mutex> lock(mutex_ready_); | |
auto it = ready_requests_.begin(); | |
for (; it != ready_requests_.end(); ++it) { | |
if ((*it)->name == name) { | |
auto copy = *it; | |
ready_requests_.erase(it); | |
return copy; | |
} | |
} | |
return HttpRequestPtr(); | |
} | |
void HttpClient::Run() { | |
const int waittime = 1000; // in millisecond | |
int still_running = 0; | |
int msgs_left = 0; | |
while (!stop_) { | |
//// wait util we have new requests | |
if (running_requests_.empty()) { | |
std::unique_lock<std::mutex> lock(mutex_new_); | |
condition_.wait(lock, [this] { | |
return stop_ || (!new_requests_.empty()); | |
}); | |
if (stop_) { break; } | |
} | |
//// add all of new requests into running_requests | |
while (true) { | |
HttpRequestPtr request; | |
if (true) { | |
std::lock_guard<std::mutex> lock(mutex_new_); | |
if (new_requests_.empty()) { break; } | |
request = std::move(new_requests_.front()); | |
new_requests_.pop(); | |
} | |
AddRequestToHandle(std::move(request)); | |
} | |
//// perform & wait & read | |
curl_multi_perform(handle_, &still_running); | |
curl_multi_wait(handle_, NULL, 0, waittime, NULL); | |
CURLMsg* msg = NULL; | |
do { | |
msg = curl_multi_info_read(handle_, &msgs_left); | |
if ((msg != NULL) && (msg->msg == CURLMSG_DONE)) { | |
RemoveRequestFromHandle(msg->easy_handle); | |
} | |
} while (msg != NULL); | |
} | |
} | |
void HttpClient::AddRequestToHandle(HttpRequestPtr request) { | |
CHECK_LT(request->retries, request->max_retries); | |
request->http_status = -1; | |
request->response.clear(); | |
if (request->curl != NULL) { | |
curl_easy_cleanup(request->curl); | |
request->curl = NULL; | |
} | |
//// 优先从pool中获取 | |
auto iter = curl_pool_.find(request->url); | |
if (iter != curl_pool_.end()) { | |
if (!iter->second.empty()) { | |
request->curl = iter->second.top(); | |
iter->second.pop(); | |
} | |
} | |
//// 若pool中没有, 则自己创建一个 | |
if (request->curl == NULL) { | |
request->curl = curl_easy_init(); | |
CHECK(request->curl != NULL) << "Failed to initialize curl"; | |
} | |
curl_slist* slist = NULL; | |
if (request->type == HttpRequest::Type::PROTOBUF) { | |
slist = curl_slist_append(slist, "application/x-protobuf"); | |
slist = curl_slist_append(slist, "Content-Type:application/x-protobuf"); | |
slist = curl_slist_append(slist, "Accept:application/x-protobuf"); | |
} else if (request->type == HttpRequest::Type::JSON) { | |
slist = curl_slist_append(slist, "application/json"); | |
slist = curl_slist_append(slist, "Content-Type:application/json"); | |
slist = curl_slist_append(slist, "Accept:application/json"); | |
} | |
curl_easy_setopt(request->curl, CURLOPT_HTTPHEADER, slist); | |
curl_easy_setopt(request->curl, CURLOPT_POST, 1L); | |
curl_easy_setopt(request->curl, CURLOPT_HEADER, 0L); | |
curl_easy_setopt(request->curl, CURLOPT_URL, request->url.c_str()); | |
curl_easy_setopt(request->curl, CURLOPT_POSTFIELDS, request->data.c_str()); | |
curl_easy_setopt(request->curl, CURLOPT_POSTFIELDSIZE, request->data.size()); | |
curl_easy_setopt(request->curl, CURLOPT_TIMEOUT, request->timeout); | |
curl_easy_setopt(request->curl, CURLOPT_WRITEFUNCTION, WriteCallback); | |
curl_easy_setopt(request->curl, CURLOPT_WRITEDATA, &request->response); | |
curl_multi_add_handle(handle_, request->curl); | |
running_requests_.push_back(std::move(request)); | |
request->retries += 1; | |
} | |
void HttpClient::RemoveRequestFromHandle(CURL* curl) { | |
int http_status = 0; | |
curl_multi_remove_handle(handle_, curl); | |
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, http_status); | |
HttpRequestPtr request; | |
auto it = running_requests_.begin(); | |
for (; it != running_requests_.end(); ++it) { | |
if ((*it)->curl == curl) { | |
request = std::move(*it); | |
running_requests_.erase(it); | |
break; | |
} | |
} | |
request->http_status = http_status; | |
if (http_status != 200) { | |
request->response.clear(); | |
curl_easy_cleanup(curl); | |
request->curl = NULL; | |
if (request->retries < request->max_retries) { | |
AddRequestToHandle(std::move(request)); | |
} else { | |
std::lock_guard<std::mutex> lock(mutex_ready_); | |
ready_requests_.push_back(std::move(request)); | |
} | |
} else { | |
//// 这里参考: https://stackoverflow.com/questions/14911156/how-to-properly-reuse-a-curl-handle | |
curl_easy_reset(curl); | |
request->curl = NULL; | |
if (curl_pool_.find(request->url) == curl_pool_.end()) { | |
curl_pool_[request->url] = std::stack<CURL*>(); | |
} | |
curl_pool_[request->url].push(curl); | |
std::lock_guard<std::mutex> lock(mutex_ready_); | |
ready_requests_.push_back(std::move(request)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef UNITTEST_HTTPCLIENT_H_ | |
#define UNITTEST_HTTPCLIENT_H_ | |
#include <curl/curl.h> | |
#include "common.h" | |
struct HttpRequest; | |
class HttpClient; | |
using HttpRequestPtr = std::shared_ptr<HttpRequest>; | |
using HttpClientPtr = std::shared_ptr<HttpClient>; | |
struct HttpRequest { | |
enum class Type { PROTOBUF, JSON }; | |
std::string name; | |
std::string url; | |
std::string data; | |
Type type; | |
int timeout; | |
int max_retries; | |
int retries{0}; | |
int http_status{-1}; | |
std::string response; | |
CURL* curl{NULL}; | |
bool good() const { return http_status == 200; } | |
}; | |
class HttpClient { | |
public: | |
HttpClient(); | |
DISABLE_COPY_ASIGN(HttpClient); | |
DISABLE_MOVE_ASIGN(HttpClient); | |
~HttpClient(); | |
//// 每一个request需要设置如下变量: | |
//// name, url, data, type, timeout, max_retries | |
//// 且调用者需要保证name唯一. | |
void AddRequest(HttpRequestPtr request); | |
//// 由名字查找已经ready的request. 如果没有ready, 则返回空指针. | |
//// 如果request->good()为true, 则request->response为请求结果, | |
//// 否则request->response为空. | |
//// 返回的request中, 用户设置的参数不变, request->curl恒为空. | |
HttpRequestPtr GetRequest(const std::string& name); | |
private: | |
void Run(); | |
void AddRequestToHandle(HttpRequestPtr request); | |
void RemoveRequestFromHandle(CURL* curl); | |
std::queue<HttpRequestPtr> new_requests_; | |
std::list<HttpRequestPtr> running_requests_; | |
std::list<HttpRequestPtr> ready_requests_; | |
CURLM* handle_{NULL}; | |
std::map<std::string, std::stack<CURL*>> curl_pool_; | |
std::thread thread_; | |
mutable std::mutex mutex_new_; | |
mutable std::mutex mutex_ready_; | |
mutable std::condition_variable condition_; | |
std::atomic<bool> stop_{false}; | |
}; | |
#endif // UNITTEST_HTTPCLIENT_H_ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment