Last active
September 4, 2022 10:03
-
-
Save s8sg/6c774cd901527df2bd036c55167ce936 to your computer and use it in GitHub Desktop.
A blazing fast single producer multiple consumer lockless queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* NOTE: The design decision of the req pool is tuned to get the | |
* best possible performance. Below point describes | |
* the design decisions: | |
* >> reqpool is a buffer queue where the producer adds at the start | |
* and consumers consume from the end | |
* >> it is strictly one consumer and multiple producers queue | |
* >> It is unbounded queue and avoids any resource allocation on heap | |
* >> It is lockless and use atomic operation to avoid race condition | |
* for consumers | |
*/ | |
#include <stdbool.h> | |
#define EMPTY(pool) pool->end == NULL; | |
/* As there are multiplle producer we need atomic operation | |
* * while dequeueing */ | |
#define SET_END(pool, old_node, node) __sync_bool_compare_and_swap(&(pool->end), old_node, node) | |
typedef struct pool_node { | |
struct pool_node *next; | |
struct pool_node *prev; | |
} pool_node_t; | |
struct reqpool { | |
pool_node_t *start; | |
pool_node_t *end; | |
}; | |
typedef struct reqpool reqpool_t; | |
/* cast a data from the node | |
* Example: | |
* | |
* typedef struct mydata { | |
* .... | |
* pool_node_t node; | |
* } mydata_t; | |
* | |
* enqueue: | |
* | |
* enqueue(pool, &(mydata->node)); | |
* | |
* | |
* dequeue: | |
* | |
* mydata_t *mydata; | |
* pool_node_t *node = dequeue(pool); | |
* mydata = CAST_DATA(node, mydata_t); | |
*/ | |
#define CAST_DATA(node, datatype) (datatype *)((char*)node - (sizeof(datatype) - sizeof(pool_node_t))) | |
static inline void init_pool(reqpool_t *pool) { | |
pool->start = NULL; | |
pool->end = NULL; | |
} | |
// Add node to the pool, strictly single producer | |
static inline void enqueue(reqpool_t *pool, pool_node_t *node) { | |
node->next = NULL; | |
node->prev = NULL; | |
/* we never reset pool->start to NULL even if the queue | |
* is empty so if the same address is used twice it will | |
* create the vicious loop of a->prev = a. Which would | |
* cause dequeue to return same adress multiple time | |
* */ | |
if(pool->start != NULL && pool->start != node) | |
pool->start->prev = node; | |
node->next=pool->start; | |
pool->start=node; | |
/* only successful if end is NULL */ | |
SET_END(pool, NULL, node); | |
} | |
// Dequeue node from pool | |
static inline pool_node_t * dequeue(reqpool_t *pool) { | |
pool_node_t *node = NULL; | |
node = pool->end; | |
if(node != NULL) { | |
if(!SET_END(pool, node, node->prev)) { | |
node = NULL; | |
} | |
} | |
return node; | |
} | |
/******* MORE detailed Example ******************************************************************************************* | |
#include<stdbool.h> | |
#include<stdio.h> | |
#include<stdlib.h> | |
#include<pthread.h> | |
#include<unistd.h> | |
#include <stdbool.h> | |
#include "gslb_reqpool.h" | |
typedef struct mydata { | |
int data; | |
pool_node_t node; // node links request_data in list) | |
} mydata_t; | |
static void *consumer(void *param) { | |
mydata_t *mydata = NULL; | |
reqpool_t *pool = (reqpool_t*)param; | |
pool_node_t *node = NULL; | |
while (true) { | |
node = dequeue(pool); | |
if (node != NULL) { | |
mydata = CAST_DATA(node, mydata_t); | |
printf("dequeue: %d\n", mydata->data); | |
free(mydata); | |
} | |
} | |
} | |
static void *producer(void *param) { | |
int counter = 1; | |
reqpool_t *pool = (reqpool_t*)param; | |
while(true) { | |
mydata_t *mydata = (mydata_t *)malloc(sizeof(mydata_t)); | |
mydata->data = counter; | |
enqueue(pool, &(mydata->node)); | |
counter++; | |
} | |
} | |
int main(int argc, char **argv) { | |
reqpool_t pool; | |
pthread_t p_thread; | |
pthread_t c_thread[12]; | |
char *b; | |
int i = 0; | |
init_pool(&pool); | |
pthread_create(&p_thread, NULL, producer, &pool); | |
for(i=0; i<12; i++) { | |
pthread_create(&c_thread[i], NULL, consumer, &pool); | |
} | |
pthread_join(p_thread, (void**)&b); | |
for(i=0; i<12; i++) { | |
pthread_join(c_thread[i], (void**)&b); | |
} | |
} | |
************************************************************************************************************************/ |
Line#65~#66: Potential memory leak
This implementation does not reset pool->start
to NULL
when the queue is emptied. As such, calling enqueue()
after emptying the queue causes pool->start->prev = node;
to execute, while the node pool->start
is no longer valid and potentially already free()
ed.
Had a very difficult afternoon spotting this exact bug...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Inconsistent description in Line#6 of given code as compared to topic.