-
-
Save jobs-git/6e1d5f3de07e8e045e6c1923f913cea5 to your computer and use it in GitHub Desktop.
Asynchronous client for Tarantool with synchronous interface
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <time.h> | |
#include <sys/time.h> | |
#include <tarantool/tarantool.h> | |
#include <tarantool/tnt_net.h> | |
#include <tarantool/tnt_opt.h> | |
#include <pthread.h> | |
#include <errno.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <deque> | |
#include <thread> | |
#include <mutex> | |
#include <condition_variable> | |
#include <cassert> | |
// An object of this class can be used to connect to tarantool | |
// and then to issue requests on one connection from different threads. | |
// | |
// The key thing about this class is that one object is good to serve all the workload | |
// from a single machiune because Tarantool allows using one socket in parallel for different | |
// requests | |
// Plus this workload will be served extremely efficient because under the hoop inside the | |
// standard Tarantool c-library all the parallel requests will be packed into a single packet and | |
// all the parallel responses will come in in a single packet which allows to recude the number | |
// of system calls at order or magitude or even more | |
class TarantoolConnection | |
{ | |
public: | |
// conn_string - "IP:port" | |
TarantoolConnection(const char *conn_string); | |
bool IsConnected() { return connected_; } | |
// Requests the server for the spercified query then receives a response | |
int DoQuery(struct tnt_stream *tuple, struct tnt_reply **result); | |
private: | |
void SendingThread() | |
{ | |
int j = (int)time_t(NULL); | |
struct tnt_stream *tuple = tnt_object(NULL); | |
// Constantly sending everything from the out queue | |
while (true) | |
{ | |
std::deque<tnt_stream*> temp; | |
{ | |
std::unique_lock<std::mutex> l(out_mutex_); | |
// Waiting while out_queue is empty | |
// out_cond should be fired once a queue is not empty | |
while (out_queue_.empty()) | |
out_cond_.wait(l); | |
// Copy all the queue to the temp one | |
// We're doing that under the mutex | |
// In fact this is extremely fast as it is the swap operation | |
temp.swap(out_queue_); | |
} | |
assert(!temp.empty()); | |
// printf("sending %d requests\n", temp.size()); | |
// Now we don't need a mutex. Just send everything over the network | |
for (auto i = temp.begin(); i != temp.end(); ++i) | |
{ | |
tnt_object_add_array(tuple, 2); | |
tnt_object_add_int(tuple, j); | |
tnt_object_add_int(tuple, j); | |
tnt_replace(tnt_, 512, tuple); | |
++j; | |
// tnt_flush(tnt_); | |
tnt_object_reset(tuple); | |
} | |
tnt_flush(tnt_); | |
// tnt_object_reset(tuple); | |
// Notify the receiveing thread that it can start receive data | |
// It we did not than it would in an active waiting as read_reply would return 1 | |
send_notify_cond_.notify_all(); | |
} // while | |
} | |
void ReceivingThread() | |
{ | |
struct tnt_reply reply; | |
tnt_reply_init(&reply); | |
// Constantly receiving everything and putting it to the in queue | |
while (true) | |
{ | |
// Receive everything that we can receive | |
std::deque<struct tnt_reply*> temp; | |
/*while (true) | |
{ | |
struct tnt_reply *reply = tnt_reply_init(NULL); | |
int r = tnt_->read_reply(tnt_, reply); | |
if (!r) | |
break; | |
temp.push_back(reply); | |
}*/ | |
// Receive as much as we can but no more than 10000 | |
int r = 0; | |
int counter; | |
for (counter = 0; counter < 10000; ++counter) | |
{ | |
r = tnt_->read_reply(tnt_, &reply); | |
if (r == 0) | |
temp.push_back(NULL); | |
else | |
break; | |
} | |
// printf("counter=%d\n", counter); | |
// We got the result | |
if (!temp.empty()) | |
{ | |
// Put it to the in queue under the mutex | |
std::lock_guard<std::mutex> l(in_mutex_); | |
in_queue_.insert(in_queue_.end(), temp.begin(), temp.end()); | |
// We're notifying all the threads, but it would be better to notify only those of them | |
// that waits for the data in just received "temp" | |
in_cond_.notify_all(); | |
} | |
// We ain't got any result | |
else | |
{ | |
if (r == -1) | |
fprintf(stderr, "Error receiveing response: r=%d, '%s'\n", r, reply.error); | |
else | |
// No result - just wait util it comes in | |
if (r == 1) | |
{ | |
// Waiting for a send thread to send data to network | |
// We have to wait because right now we don't have any data to read from network | |
std::unique_lock<std::mutex> l(send_notify_mutex_); | |
send_notify_cond_.wait(l); | |
} | |
} | |
} | |
} | |
void TimerThread() | |
{ | |
int64_t prev_answers = 0, rps; | |
while (true) | |
{ | |
sleep(1); | |
{ | |
std::lock_guard<std::mutex> l(in_mutex_); | |
rps = num_answers_ - prev_answers; | |
prev_answers = num_answers_; | |
} | |
printf("RPS=%d\n", rps); | |
fflush(stdout); | |
} | |
} | |
std::deque<struct tnt_reply*> in_queue_; | |
std::deque<tnt_stream*> out_queue_; | |
std::mutex in_mutex_, out_mutex_, send_notify_mutex_; | |
std::condition_variable in_cond_, out_cond_, send_notify_cond_; | |
std::thread send_thread_, receive_thread_, timer_thread_; | |
struct tnt_stream *tnt_; | |
bool connected_; | |
int64_t num_answers_; | |
}; | |
// Creates a network stream, points it at conn_string ("IP:port") and connects.
// On success connected_ becomes true and the sending, receiving and timer
// threads are started. On any failure a message is printed to stderr, the
// stream is released and the object stays in the "not connected" state
// (see IsConnected()); no background thread is started in that case.
TarantoolConnection::TarantoolConnection(const char *conn_string)
    : tnt_(NULL), connected_(false), num_answers_(0)
{
    tnt_ = tnt_net(NULL);
    if (tnt_ == NULL)
    {
        fprintf(stderr, "Failed to create a network stream for '%s'\n", conn_string);
        fflush(stderr);
        return;
    }

    // A rejected URI is reported the same way as a failed connect,
    // since there is nothing to connect to in either case.
    if (tnt_set(tnt_, TNT_OPT_URI, conn_string) < 0 || tnt_connect(tnt_) < 0)
    {
        fprintf(stderr, "Connection refused on '%s'\n", conn_string);
        fflush(stderr);
        // Nothing will ever use the stream, so do not leak it
        tnt_stream_free(tnt_);
        tnt_ = NULL;
        return;
    }
    connected_ = true;

    // Starting threads. `this` is captured explicitly: the threads run
    // member functions of this very object.
    send_thread_ = std::thread([this] { SendingThread(); });
    receive_thread_ = std::thread([this] { ReceivingThread(); });
    timer_thread_ = std::thread([this] { TimerThread(); });
}
int TarantoolConnection::DoQuery(struct tnt_stream *tuple, struct tnt_reply **result) | |
{ | |
// Put the query into outgoing queue | |
// Protect this queue from multithreading access | |
{ | |
std::lock_guard<std::mutex> l(out_mutex_); | |
out_queue_.push_back(tuple); | |
// Notify the sending thread that it can send data | |
out_cond_.notify_all(); | |
} | |
// Now this query should be processed in a background sending thread | |
// There is something in queue - get this answer to the client | |
std::unique_lock<std::mutex> l(in_mutex_); | |
{ | |
// Wait until we have data in the in_queue | |
while (in_queue_.empty()) | |
in_cond_.wait(l); | |
assert(!in_queue_.empty()); | |
*result = in_queue_.front(); | |
in_queue_.pop_front(); | |
++num_answers_; | |
} | |
// if (!(num_answers_%100000)) | |
// printf("num_answers_=%d\n", num_answers_); | |
return 1; | |
} | |
// Load generator run by every worker thread: issues 100000000 queries
// through `conn` one after another, ignoring both the returned status
// and the reply pointer.
void DoTest(TarantoolConnection *conn)
{
    struct tnt_reply *reply;
    int remaining = 100000000;
    while (remaining-- > 0)
        conn->DoQuery(NULL, &reply);
}
int main() | |
{ | |
TarantoolConnection conn("172.31.26.200:3301"); | |
if (!conn.IsConnected()) | |
return 0; | |
std::thread t[1000]; | |
for (int i = 0;i < 1000;++i) | |
t[i] = std::thread(&DoTest, &conn); | |
sleep(1000000); | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment