Created
February 5, 2024 04:49
-
-
Save cloudwu/e8cc734a31dd01b439d8d131acc361c3 to your computer and use it in GitHub Desktop.
concurrence log
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "clog.h" | |
#include <stdatomic.h> | |
#include <stdlib.h> | |
#include <string.h> | |
// Number of entry slots in the metadata ring (sizes the offset[] and size[] arrays).
#define LOGMETA_BUFFER 4096
// Capacity, in bytes, of the data ring that stores the log text.
#define LOGDATA_BUFFER (64 * 1024)
// [..........................] | |
// ^ | |
// | | |
// ptr % LOGDATA_BUFFER | |
// N producer: | |
// add ptr, size | |
// copy : input -> buffer + (ptr - size) % LOGDATA_BUFFER | |
// consumer: | |
// copy : buffer + offset % LOGDATA_BUFFER -> output | |
// check valid ( offset + LOGDATA_BUFFER >= ptr ) | |
struct log_data_ringbuffer { | |
atomic_size_t ptr; | |
char buffer[LOGDATA_BUFFER]; | |
}; | |
// N producer: | |
// push data into log_data_ringbuffer | |
// index <- atomic_fetch_add tail, 1 | |
// offset[index] = data offset | |
// size[index] = size (last) | |
// 1 consumer: | |
// if head == tail then return empty | |
// if size[head] < 0 then return empty | |
// index = head ++ | |
// offset[index] -> output | |
// size[index] = -1 (invalid size) | |
struct log_meta_ringbuffer { | |
atomic_size_t tail; | |
size_t head; | |
size_t offset[LOGMETA_BUFFER]; | |
atomic_int size[LOGMETA_BUFFER]; | |
}; | |
struct log_buffer { | |
struct log_meta_ringbuffer meta; | |
struct log_data_ringbuffer data; | |
}; | |
struct log_buffer * | |
log_new() { | |
struct log_buffer * obj = (struct log_buffer *)malloc(sizeof(*obj)); | |
obj->meta.head = 0; | |
atomic_init(&obj->meta.tail, 0); | |
atomic_init(&obj->data.ptr, 0); | |
return obj; | |
} | |
// Release a log buffer; the whole object is a single allocation, so one free() suffices.
void
log_delete(struct log_buffer *obj) {
	free(obj);
}
static size_t | |
push_data(struct log_data_ringbuffer *buffer, int n, const char *data) { | |
size_t ptr = atomic_fetch_add(&buffer->ptr, n); | |
int offset = ptr % LOGDATA_BUFFER; | |
if (offset + n <= LOGDATA_BUFFER) { | |
memcpy(buffer->buffer + offset, data, n); | |
} else { | |
int second_part = offset + n - LOGDATA_BUFFER; | |
int first_part = n - second_part; | |
memcpy(buffer->buffer + offset, data, first_part); | |
memcpy(buffer->buffer, data+first_part, second_part); | |
} | |
return ptr; | |
} | |
void | |
log_push(struct log_buffer *self, int n, const char *data) { | |
size_t offset = push_data(&self->data, n, data); | |
struct log_meta_ringbuffer *meta_buffer = &self->meta; | |
size_t index = atomic_fetch_add(&meta_buffer->tail, 1); | |
index %= LOGMETA_BUFFER; | |
meta_buffer->offset[index] = offset; | |
atomic_thread_fence(memory_order_release); | |
atomic_store_explicit(&meta_buffer->size[index] , n, memory_order_relaxed); | |
} | |
// >0 : size , 0 empty ; -1 : drop | |
static int | |
get_index(struct log_buffer *self, size_t *offset) { | |
struct log_meta_ringbuffer *meta = &self->meta; | |
size_t t = atomic_load_explicit(&meta->tail, memory_order_relaxed); | |
if (t == meta->head) | |
return 0; | |
if (meta->head + LOGMETA_BUFFER < t) { | |
++meta->head; | |
return -1; | |
} | |
int index = meta->head % LOGMETA_BUFFER; | |
int size = atomic_load_explicit(&meta->size[index], memory_order_relaxed); | |
if (size < 0) | |
return 0; | |
atomic_thread_fence(memory_order_acquire); | |
meta->head ++; | |
*offset = meta->offset[index]; | |
atomic_thread_fence(memory_order_release); | |
atomic_store_explicit(&meta->size[index], -1, memory_order_relaxed); | |
return size; | |
} | |
// >0 : length ; 0 empty ; -1 : drop | |
int | |
log_pop(struct log_buffer *self, int n, char *output) { | |
size_t queue_offset; | |
int size = get_index(self, &queue_offset); | |
if (size <= 0) | |
return size; | |
struct log_data_ringbuffer *buffer = &self->data; | |
if (size > n) | |
size = n; | |
int offset = queue_offset % LOGDATA_BUFFER; | |
if (offset + size <= LOGDATA_BUFFER) { | |
memcpy(output, buffer->buffer + offset, size); | |
} else { | |
int second_part = offset + size - LOGDATA_BUFFER; | |
int first_part = size - second_part; | |
memcpy(output, buffer->buffer + offset, first_part); | |
memcpy(output + first_part, buffer->buffer, second_part); | |
} | |
size_t ptr = atomic_load(&buffer->ptr); | |
if (queue_offset + LOGDATA_BUFFER >= ptr) | |
return size; | |
return -1; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef concurrence_log_h
#define concurrence_log_h

#include <stddef.h>

struct log_buffer;

// Create an empty log buffer. Release it with log_delete().
struct log_buffer * log_new(void);

// Free a log buffer created by log_new().
void log_delete(struct log_buffer *);

// Append one entry consisting of n bytes read from data.
void log_push(struct log_buffer *, int n, const char *data);

// Remove the next entry and copy at most n bytes of it into output.
// >0 : length ; 0 empty ; -1 : drop
int log_pop(struct log_buffer *self, int n, char *output);

#endif
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "clog.h" | |
#include <stdio.h> | |
// Smoke test: push 5000 short messages, then pop 5000 times and print what comes back.
// 5000 is more than LOGMETA_BUFFER slots, so part of the output is "Drop" lines.
int
main(void) {
	struct log_buffer *log = log_new();
	if (log == NULL) {
		// Defensive: do not push into a buffer that was never created.
		fprintf(stderr, "log_new failed\n");
		return 1;
	}
	char tmp[100];
	int i;
	for (i = 0; i < 5000; i++) {
		// snprintf bounds the write to tmp; the message is far shorter than the buffer.
		int n = snprintf(tmp, sizeof(tmp), "Hello World %d", i);
		log_push(log, n, tmp);
	}
	printf("push\n");
	for (i = 0; i < 5000; i++) {
		int n = log_pop(log, 100, tmp);
		if (n <= 0) {
			// Both "empty" (0) and "dropped" (-1) are reported the same way here.
			printf("Drop %d\n", i);
		} else {
			printf("[%d] %.*s\n", i, n, tmp);
		}
	}
	log_delete(log);
	return 0;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment