Last active
December 3, 2024 12:38
-
-
Save usbuild/fd11362c504c53f9599d5a4fb29adc67 to your computer and use it in GitHub Desktop.
a circular queue using shared memory, eventfd and unix domain socket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
//#pragma once | |
#ifndef _GNU_SOURCE | |
#define _GNU_SOURCE | |
#endif | |
#include <stddef.h> | |
#include <string> | |
#include <sys/time.h> | |
#include <time.h> | |
class SHMQueue; | |
struct SHMHeader { | |
size_t length; | |
int futexp; | |
size_t start; | |
size_t end; | |
}; | |
class SHMQueueServer { | |
public: | |
SHMQueueServer(const std::string &sock_path, size_t size); | |
SHMQueue* accept(struct timespec *ts=NULL); | |
int fd() {return sockfd_;} | |
private: | |
int sockfd_; | |
size_t size_; | |
}; | |
class SHMQueue { | |
public: | |
SHMQueue(int fd, int can_consume_fd, int can_produce_fd, size_t size, bool init); | |
static SHMQueue* connect(const std::string &sock_path); | |
char *dequeue(size_t *length, struct timespec *ts=NULL); | |
int enqueue(const char *memory, size_t length, struct timespec *ts=NULL); | |
size_t used(); | |
private: | |
int pop(char *memory, size_t length); | |
int push(const char *memory, size_t length); | |
size_t reserved(); | |
private: | |
SHMHeader *header_; | |
int fd_; | |
int can_consume_fd_; | |
int can_produce_fd_; | |
}; | |
///////////////// header | |
#include <fcntl.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <sys/mman.h> | |
#include <sys/stat.h> | |
#include <sys/types.h> | |
#include <unistd.h> | |
#include <assert.h> | |
#include <fcntl.h> | |
#include <errno.h> | |
#include <linux/futex.h> | |
#include <sys/eventfd.h> | |
#include <sys/socket.h> | |
#include <sys/syscall.h> | |
#include <sys/types.h> | |
#include <sys/un.h> | |
#include <sys/wait.h> | |
//#include <gperftools/profiler.h> | |
#include <sys/select.h> | |
#define errExit(msg) \ | |
do { \ | |
perror(msg); \ | |
exit(EXIT_FAILURE); \ | |
} while (0) | |
static int futex(int *uaddr, int futex_op, int val, | |
const struct timespec *timeout, int *uaddr2, int val3) { | |
return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr, val3); | |
} | |
static void fwait(int *futexp) { | |
int s; | |
while (1) { | |
if (__sync_bool_compare_and_swap(futexp, 1, 0)) break; | |
s = futex(futexp, FUTEX_WAIT, 0, NULL, NULL, 0); | |
if (s == -1 && errno != EAGAIN) errExit("futex-FUTEX_WAIT"); | |
} | |
} | |
static void fpost(int *futexp) { | |
int s; | |
if (__sync_bool_compare_and_swap(futexp, 0, 1)) { | |
s = futex(futexp, FUTEX_WAKE, 1, NULL, NULL, 0); | |
if (s == -1) errExit("futex-FUTEX_WAKE"); | |
} | |
} | |
bool set_blocking(int fd, bool blocking) | |
{ | |
if (fd < 0) return false; | |
int flags = fcntl(fd, F_GETFL, 0); | |
if (flags == -1) return false; | |
flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK); | |
return (fcntl(fd, F_SETFL, flags) == 0) ? true : false; | |
} | |
void timespec_diff(struct timespec *start, struct timespec *stop, | |
struct timespec *result) | |
{ | |
if ((stop->tv_nsec - start->tv_nsec) < 0) { | |
result->tv_sec = stop->tv_sec - start->tv_sec - 1; | |
result->tv_nsec = stop->tv_nsec - start->tv_nsec + 1000000000; | |
} else { | |
result->tv_sec = stop->tv_sec - start->tv_sec; | |
result->tv_nsec = stop->tv_nsec - start->tv_nsec; | |
} | |
return; | |
} | |
bool fd_wait_read(int fd, struct timespec * ts) | |
{ | |
struct timespec start, end, intval, realint; | |
struct timeval tv, *ptv = &tv; | |
clock_gettime(CLOCK_MONOTONIC, &start); | |
while(true) { | |
if (ts) { | |
clock_gettime(CLOCK_MONOTONIC, &end); | |
timespec_diff(&start, &end, &intval); | |
timespec_diff(&intval, ts, &realint); | |
if (realint.tv_sec < 0 || realint.tv_nsec < 0) {return false;} | |
TIMESPEC_TO_TIMEVAL(ptv, &realint); | |
} else { | |
ptv = NULL; | |
} | |
fd_set read_fds; | |
FD_ZERO(&read_fds); | |
FD_SET(fd, &read_fds); | |
int ret = select(fd+1, &read_fds, NULL, NULL, ptv); | |
if((ret > 0) && FD_ISSET(fd, &read_fds)) | |
{ | |
return true; | |
} | |
} | |
} | |
struct CreateQueueResponse { | |
size_t size; | |
}; | |
SHMQueueServer::SHMQueueServer(const std::string &sock_path, size_t size): size_(size) { | |
struct sockaddr_un addr; | |
if ((sockfd_ = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { | |
errExit("create socket"); | |
return; | |
} | |
set_blocking(sockfd_, false); | |
memset(&addr, 0, sizeof(addr)); | |
addr.sun_family = AF_UNIX; | |
strncpy(addr.sun_path, sock_path.c_str(), sizeof(addr.sun_path)); | |
unlink(sock_path.c_str()); | |
if (bind(sockfd_, (struct sockaddr *)&addr, sizeof(addr)) < 0) { | |
errExit("socket bind"); | |
return; | |
} | |
if (listen(sockfd_, 5) < 0) { | |
errExit("socket listen"); | |
return; | |
} | |
} | |
SHMQueue* SHMQueueServer::accept(struct timespec *ts) { | |
char tmp_path[256] = {0}; | |
fd_wait_read(sockfd_, ts); | |
int client_fd = ::accept(sockfd_, NULL, NULL); | |
if (client_fd < 0) { | |
errExit("socket accept"); | |
} | |
snprintf(tmp_path, sizeof(tmp_path), "cb.%d.%d", time(NULL), rand()); | |
int can_consume_fd = eventfd(0, EFD_CLOEXEC); | |
int can_produce_fd = eventfd(0, EFD_CLOEXEC); | |
int fd = shm_open(tmp_path, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); | |
ftruncate(fd, sizeof(SHMHeader) + size_); | |
shm_unlink(tmp_path); | |
SHMQueue *queue = new SHMQueue(fd, can_consume_fd, can_produce_fd, size_, true); | |
CreateQueueResponse resp; | |
resp.size = size_; | |
iovec iov[1]; | |
iov[0].iov_base = &resp; | |
iov[0].iov_len = sizeof(resp); | |
int cmsgsize = CMSG_LEN(sizeof(int) * 3); | |
cmsghdr *cmptr = (cmsghdr *)(malloc(cmsgsize)); | |
cmptr->cmsg_level = SOL_SOCKET; | |
cmptr->cmsg_type = SCM_RIGHTS; | |
cmptr->cmsg_len = cmsgsize; | |
msghdr msg; | |
msg.msg_iov = iov; | |
msg.msg_iovlen = 1; | |
msg.msg_name = NULL; | |
msg.msg_namelen = 0; | |
msg.msg_control = cmptr; | |
msg.msg_controllen = cmsgsize; | |
((int *)CMSG_DATA(cmptr))[0] = fd; | |
((int *)CMSG_DATA(cmptr))[1] = can_consume_fd; | |
((int *)CMSG_DATA(cmptr))[2] = can_produce_fd; | |
if (sendmsg(client_fd, &msg, 0) < 0) { | |
perror("sendmsg"); | |
delete queue; | |
queue = NULL; | |
} | |
free(cmptr); | |
close(client_fd); | |
return queue; | |
} | |
#define ADDR(offset) ((char *)this->header_ + sizeof(SHMHeader) + (offset)) | |
#define HEAD ADDR(0) | |
#define START ADDR(this->header_->start) | |
#define END ADDR(this->header_->end) | |
struct lk { | |
lk(int *futexp) : f_(futexp) { fwait(f_); } | |
~lk() { fpost(f_); } | |
private: | |
int *f_; | |
}; | |
SHMQueue::SHMQueue(int fd, int can_consume_fd, int can_produce_fd, size_t size, bool init) : fd_(fd), can_consume_fd_(can_consume_fd), can_produce_fd_(can_produce_fd) { | |
size_t map_size = size + sizeof(SHMHeader); | |
void *ptr = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0); | |
if (ptr == MAP_FAILED) { | |
errExit("map failed"); | |
} | |
header_ = static_cast<SHMHeader *>(ptr); | |
if (init) { | |
header_->length = size; | |
header_->futexp = 1; | |
header_->start = 0; | |
header_->end = 0; | |
} | |
set_blocking(can_consume_fd, false); | |
set_blocking(can_produce_fd, false); | |
} | |
SHMQueue* SHMQueue::connect(const std::string &sock_path) { | |
//////////////////////////// | |
struct sockaddr_un addr; | |
int sockfd = socket(AF_UNIX, SOCK_STREAM, 0); | |
if (sockfd < 0) { | |
return NULL; | |
} | |
memset(&addr, 0, sizeof(addr)); | |
addr.sun_family = AF_UNIX; | |
strncpy(addr.sun_path, sock_path.c_str(), sizeof(addr.sun_path)); | |
if(::connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { | |
errExit("socket connect"); | |
} | |
int cmsgsize = CMSG_LEN(sizeof(int) * 3); | |
cmsghdr *cmptr = (cmsghdr *)malloc(cmsgsize); | |
CreateQueueResponse resp; | |
iovec iov[1]; | |
iov[0].iov_base = &resp; | |
iov[0].iov_len = sizeof(resp); | |
msghdr msg; | |
msg.msg_iov = iov; | |
msg.msg_iovlen = 1; | |
msg.msg_name = NULL; | |
msg.msg_namelen = 0; | |
msg.msg_control = cmptr; | |
msg.msg_controllen = cmsgsize; | |
if(recvmsg(sockfd, &msg, 0) < 0) { | |
errExit("recvmsg"); | |
} | |
int fd = ((int *)CMSG_DATA(cmptr))[0]; | |
int can_consume_fd = ((int *)CMSG_DATA(cmptr))[1]; | |
int can_produce_fd = ((int *)CMSG_DATA(cmptr))[2]; | |
free(cmptr); | |
return new SHMQueue(fd, can_consume_fd, can_produce_fd, resp.size, false); | |
} | |
size_t SHMQueue::reserved() { | |
if (header_->end >= header_->start) { | |
return header_->length - header_->end + header_->start; | |
} else { | |
return header_->start - header_->end; | |
} | |
} | |
int SHMQueue::push(const char *memory, size_t length) { | |
if (header_->end >= header_->start) { | |
size_t tail_len = header_->length - header_->end; | |
if (tail_len > length) { | |
memcpy(END, memory, length); | |
header_->end += length; | |
} else { | |
size_t start_len = length - tail_len; | |
memcpy(END, memory, tail_len); | |
memcpy(HEAD, memory + tail_len, start_len); | |
header_->end = start_len; | |
} | |
} else { | |
memcpy(END, memory, length); | |
header_->end += length; | |
} | |
return 0; | |
} | |
int SHMQueue::enqueue(const char *memory, size_t length, struct timespec *ts) { | |
lk _(&header_->futexp); | |
while (length + sizeof(size_t) >= reserved()) { | |
int64_t cnt = 0; | |
fpost(&header_->futexp); | |
bool ret = fd_wait_read(can_produce_fd_, ts); | |
fwait(&header_->futexp); | |
if (!ret) return -1; | |
read(can_produce_fd_, &cnt, sizeof(cnt)); | |
} | |
push((char *)&length, sizeof(size_t)); | |
push(memory, length); | |
int64_t cnt = 1; | |
write(can_consume_fd_, &cnt, sizeof(cnt)); | |
return 0; | |
} | |
int SHMQueue::pop(char *memory, size_t length) { | |
if (header_->end < header_->start) { | |
size_t head_len = header_->length - header_->start; | |
if (head_len >= length) { | |
memcpy(memory, START, length); | |
header_->start += length; | |
} else { | |
size_t start_len = length - head_len; | |
memcpy(memory, START, head_len); | |
memcpy(memory + head_len, HEAD, start_len); | |
header_->start = start_len; | |
} | |
} else { | |
memcpy(memory, START, length); | |
header_->start += length; | |
} | |
return 0; | |
} | |
char *SHMQueue::dequeue(size_t *length, struct timespec *ts) { | |
lk _(&header_->futexp); | |
while ((header_->start - header_->end + header_->length) % header_->length == 0) { | |
int64_t cnt = 0; | |
fpost(&header_->futexp); | |
bool ret = fd_wait_read(can_consume_fd_, ts); | |
fwait(&header_->futexp); | |
if (!ret) return NULL; | |
int rc = read(can_consume_fd_, &cnt, sizeof(cnt)); | |
} | |
pop((char *)(length), sizeof(size_t)); | |
char *ptr = (char *)malloc(*length); | |
pop(ptr, *length); | |
int64_t cnt = 1; | |
write(can_produce_fd_, &cnt, sizeof(cnt)); | |
return ptr; | |
} | |
size_t SHMQueue::used() { | |
lk _(&header_->futexp); | |
return header_->length - reserved(); | |
} | |
int main() { | |
int count = 200000; | |
int idx = 0; | |
uint64_t digest = 5381L; | |
if (fork()) { // parent | |
//ProfilerStart("perf_parent.out"); | |
SHMQueueServer server("/tmp/qichao_test.sock", 2000); | |
SHMQueue *queue = server.accept(); | |
const int max_size = 1024; | |
char buf[max_size]; | |
FILE *f = fopen("/dev/urandom", "r"); | |
while (true) { | |
idx++; | |
size_t size = random() % max_size; | |
fread(buf, size, 1, f); | |
if (queue->enqueue(buf, size) == 0) { | |
for (int i = 0; i < size; ++i) { digest = ((digest << 5) + digest) + buf[i]; } | |
if (!--count) break; | |
} | |
} | |
fclose(f); | |
printf("parent: %lx idx: %d\n", digest, idx); | |
//ProfilerStop(); | |
int status; | |
wait(&status); | |
} else { | |
usleep(100000); | |
//ProfilerStart("perf_child.out"); | |
SHMQueue *queue = SHMQueue::connect("/tmp/qichao_test.sock"); | |
size_t length; | |
while (true) { | |
idx++; | |
char *ptr = queue->dequeue(&length); | |
if (ptr) { | |
for (int i = 0; i < length; ++i) { digest = ((digest << 5) + digest) + ptr[i]; } | |
free(ptr); | |
if (!--count) break; | |
} | |
} | |
assert(!queue->used()); | |
printf("child : %lx idx: %d\n", digest, idx); | |
//ProfilerStop(); | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment