Skip to content

Instantly share code, notes, and snippets.

@keiichishima
Created May 19, 2015 10:56
Show Gist options
  • Save keiichishima/9091f1341d00ae42365d to your computer and use it in GitHub Desktop.
Save keiichishima/9091f1341d00ae42365d to your computer and use it in GitHub Desktop.
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <stropts.h>
#include <unistd.h>
#include <string.h>
#include <sys/signalfd.h>
#define __USE_GNU
#include <poll.h>
#include <hiredis/hiredis.h>
#include "abuse.h"
#define SECTOR_SIZE 512
/* Descriptor of the ABUSE control device; opened in main() and used by every
 * ioctl() issued in this file. */
int fd;
/* Redis connection; created by redisConnect() in main() and used by
 * get_data() and put_data() for every command they send. */
redisContext *red;
/* Reply of the most recent redisCommand() call; get_data() and put_data()
 * store each reply here and release it with freeReplyObject(). */
redisReply *res;
/*
 * Reset the ABUSE device behind the global descriptor `fd`.
 *
 * Issues ABUSE_RESET and reports a failure on stderr. The failure is not
 * propagated: this is a best-effort cleanup step and the caller has nothing
 * further it could do about it.
 */
void teardown(void)
{
    if (ioctl(fd, ABUSE_RESET) == -1) {
        perror("ABUSE_RESET failed");
    }
}
/* Queue depth (1 << 16 = 65536): written to abuse_info.ab_max_queue in main()
 * and used there as the element count of the abuse_vec array. */
const int max_queue = 1 << 16;
/*
 * Read disk contents from Redis.
 *
 * Each sector is stored under the key "<disk_name>:<sector number>".  The
 * function fetches every sector touched by the byte range
 * [0, data_len) starting at `sector` and returns them back to back.
 *
 * disk_name  key prefix identifying the disk
 * sector     number of the first sector to read
 * data_len   number of bytes requested; rounded up to whole sectors
 *
 * Returns a heap buffer owned by the caller (release with free()).  The
 * buffer always holds a whole number of sectors (at least one), so it is
 * never smaller than data_len.  Sectors that are missing in Redis, and the
 * tail of values shorter than SECTOR_SIZE, read as zero bytes.
 *
 * Any Redis or allocation failure is fatal: a message is printed to stderr
 * and the process exits with status -1.
 */
char *
get_data(const char *disk_name, __u64 sector, __u32 data_len)
{
    char key_name[512];
    char *data;
    size_t nsectors;
    size_t i;

    /* Round up so that a trailing partial sector still gets a full slot. */
    nsectors = data_len / SECTOR_SIZE;
    if ((data_len % SECTOR_SIZE) != 0) {
        nsectors++;
    }

    /*
     * Allocate whole sectors, not data_len bytes: the loop below fills
     * SECTOR_SIZE-sized slots.  calloc() zero-fills, which provides the
     * contents for absent sectors and the padding for short values.
     */
    data = calloc(nsectors ? nsectors : 1, SECTOR_SIZE);
    if (data == NULL) {
        fprintf(stderr, "get_data: out of memory\n");
        exit(-1);
    }

    for (i = 0; i < nsectors; i++) {
        int key_len = snprintf(key_name, sizeof(key_name), "%s:%llu",
                               disk_name, (unsigned long long)(sector + i));
        if (key_len < 0 || (size_t)key_len >= sizeof(key_name)) {
            fprintf(stderr, "get_data: key name too long\n");
            exit(-1);
        }

        res = redisCommand(red, "GET %s", key_name);
        if (res == NULL || red->err) {
            fprintf(stderr, "GET error\n");
            exit(-1);
        }
        if (res->type == REDIS_REPLY_ERROR) {
            fprintf(stderr, "get_data: redis error: %s\n", res->str);
            exit(-1);
        }

        /* A nil reply (or one without a payload) leaves the slot zeroed. */
        if (res->type != REDIS_REPLY_NIL && res->str != NULL) {
            size_t value_len = (size_t)res->len;

            /* Never copy more than one slot, whatever the stored size is. */
            if (value_len > SECTOR_SIZE) {
                value_len = SECTOR_SIZE;
            }
            memcpy(data + (i * SECTOR_SIZE), res->str, value_len);
        }

        freeReplyObject(res);
        res = NULL; /* do not leave the global pointing at freed memory */
    }

    return data;
}
/*
 * Write disk contents to Redis.
 *
 * Each sector is stored as a SECTOR_SIZE-byte value under the key
 * "<disk_name>:<sector number>".
 *
 * disk_name  key prefix identifying the disk
 * sector     number of the first sector to write
 * data       bytes to store; exactly data_len bytes are read from it
 * data_len   number of bytes to store
 *
 * When data_len is not a multiple of SECTOR_SIZE, the last sector is only
 * partially covered by `data`.  That sector is read back from Redis first
 * and only its leading bytes are replaced, so the function never reads past
 * data + data_len and never clobbers the untouched tail of the sector.
 *
 * Returns 0.  Any Redis failure is fatal: a message is printed to stderr and
 * the process exits with status -1.
 */
int
put_data(const char *disk_name, __u64 sector, const char *data,
         __u64 data_len)
{
    char key_name[512];
    char tail[SECTOR_SIZE];
    __u64 full_sectors = data_len / SECTOR_SIZE;
    size_t tail_len = (size_t)(data_len % SECTOR_SIZE);
    __u64 nsectors = full_sectors + (tail_len != 0 ? 1 : 0);
    __u64 i;

    for (i = 0; i < nsectors; i++) {
        const char *payload = data + (i * SECTOR_SIZE);
        int key_len = snprintf(key_name, sizeof(key_name), "%s:%llu",
                               disk_name, (unsigned long long)(sector + i));
        if (key_len < 0 || (size_t)key_len >= sizeof(key_name)) {
            fprintf(stderr, "put_data: key name too long\n");
            exit(-1);
        }

        if (i == full_sectors) {
            /* Trailing partial sector: merge with what is already stored. */
            memset(tail, 0, sizeof(tail));
            res = redisCommand(red, "GET %s", key_name);
            if (res == NULL || red->err) {
                fprintf(stderr, "GET error\n");
                exit(-1);
            }
            if (res->type == REDIS_REPLY_ERROR) {
                fprintf(stderr, "put_data: redis error: %s\n", res->str);
                exit(-1);
            }
            if (res->type != REDIS_REPLY_NIL && res->str != NULL) {
                size_t stored_len = (size_t)res->len;

                if (stored_len > sizeof(tail)) {
                    stored_len = sizeof(tail);
                }
                memcpy(tail, res->str, stored_len);
            }
            freeReplyObject(res);
            res = NULL;

            memcpy(tail, payload, tail_len);
            payload = tail;
        }

        /* hiredis reads the %b length as a size_t, so pass exactly that. */
        res = redisCommand(red, "SET %s %b", key_name, payload,
                           (size_t)SECTOR_SIZE);
        if (res == NULL || red->err) {
            fprintf(stderr, "SET error\n");
            exit(-1);
        }
        if (res->type == REDIS_REPLY_ERROR) {
            fprintf(stderr, "put_data: redis error: %s\n", res->str);
            exit(-1);
        }
        freeReplyObject(res);
        res = NULL; /* do not leave the global pointing at freed memory */
    }

    return 0;
}
int
main(int argc, char *argv[])
{
char *abctlpath = argv[1];
char *redis_server = argv[2];
char *disk_name = argv[3];
fd = open(abctlpath, O_RDWR);
if (fd < 0) {
fprintf(stderr, "Failed to open %s: ", abctlpath);
perror(NULL);
return -1;
}
// check if disk is already there
struct abuse_info info;
if (ioctl(fd, ABUSE_GET_STATUS, &info) == -1) {
perror("GET_STATUS failed");
return -1;
}
if (info.ab_size > 0)
ioctl(fd, ABUSE_RESET);
// Generate disk
info.ab_size = 1310720 * SECTOR_SIZE; /* XXX 640MB */
//info.ab_size = 125000 * SECTOR_SIZE; /* XXX 64MB */
info.ab_blocksize = 4096;
info.ab_max_queue = max_queue;
if (ioctl(fd, ABUSE_SET_STATUS, &info) == -1) {
perror("SET_STATUS failed");
return -1;
}
const int FDCNT = 2;
struct pollfd fds[FDCNT];
sigset_t sigmask;
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGINT);
sigaddset(&sigmask, SIGTERM);
sigaddset(&sigmask, SIGQUIT);
fds[0].fd = fd;
fds[0].events = POLLMSG;
fds[1].fd = signalfd(-1, &sigmask, 0);
fds[1].events = POLLIN;
if (sigprocmask(SIG_BLOCK, &sigmask, NULL) == -1) {
perror("sigprocmask failed");
teardown();
return -1;
}
// main loop waiting for IO
red = redisConnect(redis_server, 6379);
if (red == NULL || red->err) {
if (red) {
fprintf(stderr, "Connection error: %s\n", red->errstr);
redisFree(red);
} else {
fprintf(stderr, "Connection error: can't allocate redis context\n");
}
return -1;
}
while (ppoll(fds, FDCNT, NULL, NULL) > 0) {
if (fds[0].revents) {
struct abuse_xfr_hdr xfr;
struct abuse_vec vecs[max_queue];
for (;;) {
int i;
memset(&xfr, 0, sizeof(xfr));
xfr.ab_transfer_address = (__u64) vecs;
// read header
if (ioctl(fd, ABUSE_GET_BIO, &xfr) == -1) {
if (errno != ENOMSG) {
perror("GET_BIO failed");
goto out;
}
}
if (xfr.ab_vec_count == 0)
break;
#ifdef DEBUG
printf("id =\t\t%p\n"
"command =\t%s\n"
"sector = \t0x%llx\n"
"result =\t0x%x\n"
"vec_count =\t%d\n",
(void*)xfr.ab_id, xfr.ab_command ? "WRITE": "READ",
xfr.ab_sector,
xfr.ab_result, xfr.ab_vec_count);
#endif
int write = xfr.ab_command;
// allocate receive/send buffer
if (write) {
for (i = 0; i < xfr.ab_vec_count; ++i) {
vecs[i].ab_address = (__u64)malloc(vecs[i].ab_len);
}
ioctl(fd, ABUSE_PUT_BIO, &xfr);
for (i = 0; i < xfr.ab_vec_count; ++i) {
char *data;
#ifdef DEBUG
printf("\toffset: %d, len: %d\n\n", vecs[i].ab_offset, vecs[i].ab_len);
#endif
data = get_data(disk_name, xfr.ab_sector,
vecs[i].ab_offset + vecs[i].ab_len);
memcpy(data + vecs[i].ab_offset,
(char *)vecs[i].ab_address + vecs[i].ab_offset,
vecs[i].ab_len);
put_data(disk_name, xfr.ab_sector, data,
vecs[i].ab_offset + vecs[i].ab_len);
free(data);
}
for (i=0; i<xfr.ab_vec_count; ++i)
free((struct abuse_vec*)vecs[i].ab_address);
} else {
char **data_array = malloc(xfr.ab_vec_count * sizeof(char*));
for (i = 0; i < xfr.ab_vec_count; ++i) {
#ifdef DEBUG
printf("\toffset: %d, len: %d\n\n", vecs[i].ab_offset, vecs[i].ab_len);
#endif
data_array[i] = get_data(disk_name, xfr.ab_sector,
vecs[i].ab_offset + vecs[i].ab_len);
vecs[i].ab_address = (__u64)(data_array[i]);
}
ioctl(fd, ABUSE_PUT_BIO, &xfr);
for (i=0; i<xfr.ab_vec_count; ++i) {
free(data_array[i]);
}
free(data_array);
}
fds[0].revents = 0;
fflush(stdout);
}
}
// got signal! closing
if (fds[1].revents)
break;
}
out:
fprintf(stderr, "Exiting\n");
teardown();
redisFree(red);
close(fd);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment