Last active
October 6, 2016 14:17
-
-
Save danikin/663ea6612f8e9ce9686c to your computer and use it in GitHub Desktop.
Load test for Tarantool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <time.h> | |
#include <sys/time.h> | |
#include <tarantool/tarantool.h> | |
#include <tarantool/tnt_net.h> | |
#include <tarantool/tnt_opt.h> | |
#include <pthread.h> | |
#include <errno.h> | |
#include <string.h> | |
#include <unistd.h> | |
typedef struct test_params | |
{ | |
char address[256]; | |
int is_read_test; | |
int is_response_in_thread; | |
int num_ops; | |
int batch_size; | |
// Maximum number of requests per second that a client can stand | |
// Example: | |
int client_max_rps; | |
} test_params; | |
typedef struct thread_data | |
{ | |
test_params *tp; | |
long long n_ops; | |
long long latency; | |
struct tnt_stream *tnt; | |
} thread_data; | |
int NUM_THREADS = 30; | |
void *read_thread(void *ctx) | |
{ | |
struct tnt_stream *tnt = (struct tnt_stream*)ctx; | |
int num_responses; | |
/* | |
struct tnt_reply reply; tnt_reply_init(&reply); | |
while (1) { | |
tnt->read_reply(tnt, &reply); // Read reply from server | |
if (reply.code != 0) | |
printf("Read reply failed %s\n", reply.error); | |
tnt_reply_free(&reply); // Free reply | |
}*/ | |
struct tnt_reply reply; | |
tnt_reply_init(&reply); | |
while (1)//tnt_next(&it)) | |
{ | |
// struct tnt_reply *r = TNT_IREPLY_PTR(&it); | |
// struct tnt_reply * reply = tnt_reply_init(NULL); // Initialize reply | |
tnt->read_reply(tnt, &reply); // Read reply from server | |
if (reply.code != 0) | |
printf("Read reply failed %s\n", reply.error); | |
tnt_reply_free(&reply); // Free reply | |
// if (r->code != 0) | |
// printf("Insert failed %s\n", r->error); | |
++num_responses; | |
if (!(num_responses%1000000)) | |
{ | |
printf("num_responses: %d\n", num_responses); | |
fflush(stdout); | |
} | |
} | |
/* if (num_responses != td->tp->batch_size) | |
{ | |
fprintf(stderr, "Invalid number of responses: %d, batch size is %d\n", num_responses, td->tp->batch_size); | |
fflush(stdout); | |
}*/ | |
} | |
void *do_test(void *ctx) | |
{ | |
thread_data *td = (thread_data*)ctx; | |
struct tnt_stream *tnt = tnt_net(NULL); /* See note = SETUP */ | |
td->tnt = tnt; | |
tnt_set(tnt, TNT_OPT_URI, td->tp->address); | |
if (tnt_connect(tnt) < 0) | |
{ | |
printf("Connection refused\n"); | |
exit(-1); | |
} | |
struct tnt_stream *tuple = tnt_object(NULL); /* See note = MAKE REQUEST */ | |
int k = (int)time(NULL); | |
char str[1]; | |
for (int i = 0; i < sizeof(str)-1; ++i) | |
str[i] = 'B'; | |
str[sizeof(str)-1] = 0; | |
struct timeval tv, prev_tv; | |
if (td->tp->is_response_in_thread) | |
{ | |
pthread_t read_pid; | |
int r = pthread_create(&read_pid, NULL, &read_thread, tnt); | |
if (r < 0) | |
{ | |
fprintf(stderr, "multithread_test: could not create thread, r=%d, errno='%s'\n", r, strerror(errno)); | |
fflush(stderr); | |
} | |
} | |
for (int i = 0; 1/*i < td->tp->num_ops / td->tp->batch_size*/; ++i) | |
{ | |
//tnt_object_format(tuple, "[%d%s]", k + i, "B"); | |
for (int j = 0; j < td->tp->batch_size; ++j) | |
{ | |
// Doing some pretend work on a client | |
// usleep(1000000/td->tp->client_max_rps); | |
/* gettimeofday(&tv, NULL); | |
// Calculate and save latency | |
// Latency of each query in the batch is less than latency for the whole batch | |
if (i) | |
{ | |
long long diff = (tv.tv_sec-prev_tv.tv_sec)*1000000 + (tv.tv_usec-prev_tv.tv_usec); | |
td->latency += td->tp->batch_size * diff; | |
} | |
prev_tv = tv; | |
*/ | |
// Form a request to a server | |
if (td->tp->is_read_test) | |
{ | |
tnt_object_add_array(tuple, 1); | |
tnt_object_add_int(tuple, k + i*td->tp->batch_size + j); | |
tnt_select(tnt, 512, 0, UINT32_MAX, 0, 0, tuple); | |
} | |
else | |
{ | |
tnt_object_add_array(tuple, 2); | |
tnt_object_add_int(tuple, k + i*td->tp->batch_size + j); | |
tnt_object_add_int(tuple, k + i*td->tp->batch_size + j); | |
// tnt_object_add_strz(tuple, str); | |
tnt_replace(tnt, 512, tuple); /* See note = SEND REQUEST */ | |
} | |
++td->n_ops; | |
tnt_flush(tnt); | |
tnt_object_reset(tuple); | |
} | |
//tnt_flush(tnt); | |
if (!td->tp->is_response_in_thread) | |
{ | |
int num_responses = 0; | |
struct tnt_iter it; tnt_iter_reply(&it, tnt); | |
while (tnt_next(&it)) | |
{ | |
struct tnt_reply *r = TNT_IREPLY_PTR(&it); | |
if (r->code != 0) | |
printf("Insert failed %s\n", r->error); | |
++num_responses; | |
} | |
if (num_responses != td->tp->batch_size) | |
{ | |
fprintf(stderr, "Invalid number of responses: %d, batch size is %d\n", num_responses, td->tp->batch_size); | |
fflush(stdout); | |
} | |
} | |
} | |
tnt_close(tnt); /* See below = TEARDOWN */ | |
tnt_stream_free(tuple); | |
tnt_stream_free(tnt); | |
} | |
void *timer_thread(void *ctx) | |
{ | |
thread_data *tds = (thread_data*)ctx; | |
long long prev_ops = 0, prev_latency = 0; | |
while (1) | |
{ | |
sleep(1); | |
long long ops = 0, latency = 0; | |
for (int i = 0;i < NUM_THREADS;++i) | |
{ | |
ops += tds[i].n_ops; | |
latency += tds[i].latency; | |
} | |
long long rps = ops - prev_ops; | |
long long latency_diff = latency - prev_latency; | |
printf("RPS: %d, average latency: %f\n", (int)rps, latency_diff*1e-6/rps); | |
fflush(stdout); | |
prev_ops = ops; | |
prev_latency = latency; | |
} | |
} | |
int main(int argc, char *argv[]) | |
{ | |
if (argc < 7) | |
{ | |
printf("Usage: tar_test address:port read|write num_threads num_ops batch_size client_max_rps\n"); | |
return 0; | |
} | |
test_params tp; | |
if (strlen(argv[1]) < 255) | |
strcpy(tp.address, argv[1]); | |
else | |
{ | |
fprintf(stderr, "Address is too long\n"); | |
return 1; | |
} | |
tp.is_read_test = !strcmp(argv[2], "read"); | |
NUM_THREADS = atoi(argv[3]); | |
tp.num_ops = atoi(argv[4]); | |
tp.batch_size = atoi(argv[5]); | |
tp.client_max_rps = atoi(argv[6]); | |
tp.is_response_in_thread = 1; | |
pthread_t pids[NUM_THREADS]; | |
thread_data tds[NUM_THREADS]; | |
for (int i = 0;i < NUM_THREADS;++i) | |
{ | |
tds[i].tp = &tp; | |
tds[i].n_ops = 0; | |
tds[i].latency = 0; | |
int r = pthread_create(pids + i, NULL, &do_test, tds + i); | |
if (r < 0) | |
{ | |
fprintf(stderr, "multithread_test: could not create thread, i=%d, r=%d, errno='%s'\n", i, r, strerror(errno)); | |
fflush(stderr); | |
} | |
} | |
pthread_t timer_pid; | |
int r = pthread_create(&timer_pid, NULL, &timer_thread, tds); | |
if (r < 0) | |
{ | |
fprintf(stderr, "multithread_test: could not create thread, r=%d, errno='%s'\n", r, strerror(errno)); | |
fflush(stderr); | |
} | |
for (int i = 0;i < NUM_THREADS;++i) | |
{ | |
void *v; | |
int r = pthread_join(pids[i], &v); | |
if (r < 0) | |
{ | |
fprintf(stderr, "multithread_test: could not join thread, i=%d, r=%d, errno='%s'\n", i, r, strerror(errno)); | |
fflush(stderr); | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment