@Alexis-D
Created February 11, 2012 15:51
Producer / Consumer / Condition variables / pthreads
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define BUF_SIZE 5

// the buffer works like a stack for
// the sake of simplicity, if needed
// we may implement a queue
typedef struct {
    int buf[BUF_SIZE]; // the buffer
    size_t len; // number of items in the buffer
    pthread_mutex_t mutex; // needed to add/remove data from the buffer
    pthread_cond_t can_produce; // signaled when items are removed
    pthread_cond_t can_consume; // signaled when items are added
} buffer_t;

// produce random numbers
void* producer(void *arg) {
    buffer_t *buffer = (buffer_t*)arg;

    while(1) {
#ifdef UNDERFLOW
        // used to show that if the producer is somewhat "slow"
        // the consumer will not fail (i.e. it'll just wait
        // for new items to consume)
        sleep(rand() % 3);
#endif

        pthread_mutex_lock(&buffer->mutex);

        // loop rather than a single `if`: pthread_cond_wait may return
        // spuriously, so the condition is re-checked after every wakeup
        while(buffer->len == BUF_SIZE) { // full
            // wait until some elements are consumed
            pthread_cond_wait(&buffer->can_produce, &buffer->mutex);
        }

        // in real life it may be some data fetched from
        // sensors, the web, or just some I/O
        int t = rand();
        printf("Produced: %d\n", t);

        // append data to the buffer
        buffer->buf[buffer->len] = t;
        ++buffer->len;

        // signal the fact that new items may be consumed
        pthread_cond_signal(&buffer->can_consume);
        pthread_mutex_unlock(&buffer->mutex);
    }

    // never reached
    return NULL;
}

// consume random numbers
void* consumer(void *arg) {
    buffer_t *buffer = (buffer_t*)arg;

    while(1) {
#ifdef OVERFLOW
        // show that the buffer won't overflow if the consumer
        // is slow (i.e. the producer will wait)
        sleep(rand() % 3);
#endif

        pthread_mutex_lock(&buffer->mutex);

        // loop rather than a single `if`: the buffer may be empty again
        // by the time this thread wakes up (spurious wakeup, or another
        // consumer took the item first)
        while(buffer->len == 0) { // empty
            // wait for new items to be appended to the buffer
            pthread_cond_wait(&buffer->can_consume, &buffer->mutex);
        }

        // grab data
        --buffer->len;
        printf("Consumed: %d\n", buffer->buf[buffer->len]);

        // signal the fact that new items may be produced
        pthread_cond_signal(&buffer->can_produce);
        pthread_mutex_unlock(&buffer->mutex);
    }

    // never reached
    return NULL;
}

int main(void) {
    buffer_t buffer = {
        .len = 0,
        .mutex = PTHREAD_MUTEX_INITIALIZER,
        .can_produce = PTHREAD_COND_INITIALIZER,
        .can_consume = PTHREAD_COND_INITIALIZER
    };

    pthread_t prod, cons;
    pthread_create(&prod, NULL, producer, (void*)&buffer);
    pthread_create(&cons, NULL, consumer, (void*)&buffer);

    pthread_join(prod, NULL); // will wait forever
    pthread_join(cons, NULL);

    return 0;
}

CodeLife2012 commented Nov 8, 2017

Why can't we use only one condition variable?
@ktsaou @Alexis-D
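
One possible answer, as a sketch rather than the author's design: a single condition variable can work, provided every waiter re-checks its own predicate in a `while` loop and every change to `len` is announced with `pthread_cond_broadcast`. The cost is that threads are sometimes woken for an event they do not care about; two condition variables avoid those wasted wakeups. The names `one_cond_buffer_t`, `changed`, `N_ITEMS`, and `run_one_cond_demo` below are hypothetical and not part of the gist, and the item count is finite only so that the demo terminates.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define BUF_SIZE 5
#define N_ITEMS 1000 /* hypothetical: finite so the demo terminates */

/* Same stack-like buffer as the gist, but with ONE condition variable. */
typedef struct {
    int buf[BUF_SIZE];
    size_t len;
    long sum;               /* filled in by the consumer */
    pthread_mutex_t mutex;
    pthread_cond_t changed; /* broadcast whenever len changes */
} one_cond_buffer_t;

static void *oc_producer(void *arg) {
    one_cond_buffer_t *b = arg;
    for (int i = 1; i <= N_ITEMS; ++i) {
        pthread_mutex_lock(&b->mutex);
        while (b->len == BUF_SIZE)          /* full: wait for any change */
            pthread_cond_wait(&b->changed, &b->mutex);
        b->buf[b->len++] = i;
        pthread_cond_broadcast(&b->changed); /* wake every waiter */
        pthread_mutex_unlock(&b->mutex);
    }
    return NULL;
}

static void *oc_consumer(void *arg) {
    one_cond_buffer_t *b = arg;
    for (int i = 0; i < N_ITEMS; ++i) {
        pthread_mutex_lock(&b->mutex);
        while (b->len == 0)                 /* empty: wait for any change */
            pthread_cond_wait(&b->changed, &b->mutex);
        b->sum += b->buf[--b->len];
        pthread_cond_broadcast(&b->changed);
        pthread_mutex_unlock(&b->mutex);
    }
    return NULL;
}

/* Runs one producer and one consumer; returns the sum of consumed items. */
long run_one_cond_demo(void) {
    one_cond_buffer_t b = { .len = 0, .sum = 0 };
    pthread_t prod, cons;
    int rc;

    rc = pthread_mutex_init(&b.mutex, NULL);  assert(rc == 0);
    rc = pthread_cond_init(&b.changed, NULL); assert(rc == 0);
    rc = pthread_create(&prod, NULL, oc_producer, &b); assert(rc == 0);
    rc = pthread_create(&cons, NULL, oc_consumer, &b); assert(rc == 0);
    (void)rc;

    pthread_join(prod, NULL);
    pthread_join(cons, NULL);
    pthread_cond_destroy(&b.changed);
    pthread_mutex_destroy(&b.mutex);
    return b.sum;
}
```

Calling `run_one_cond_demo()` from a `main` should return the sum of 1 through `N_ITEMS`, since every produced item is consumed exactly once.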


rksys commented Nov 11, 2018

Hi, to allow multiple consumers, this if statement:

        if(buffer->len == 0) { // empty
            // wait for new items to be appended to the buffer
            pthread_cond_wait(&buffer->can_consume, &buffer->mutex);
        }

needs to be a while statement:

        while(buffer->len == 0) { // empty
            // wait for new items to be appended to the buffer
            pthread_cond_wait(&buffer->can_consume, &buffer->mutex);
        }

+1
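
A minimal sketch of why the `while` matters once there are several consumers: between the producer's signal and the moment a woken consumer reacquires the mutex, another consumer may already have taken the item, and `pthread_cond_wait` is also allowed to return spuriously. Re-checking the predicate in a loop covers both cases. The `done` flag, the `consumed` counter, `N_ITEMS`, `N_CONSUMERS`, and `run_multi_consumer_demo` are hypothetical additions that let the demo terminate; they are not part of the gist.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define BUF_SIZE 5
#define N_ITEMS 1000   /* hypothetical: finite so the demo terminates */
#define N_CONSUMERS 3  /* hypothetical consumer count */

typedef struct {
    int buf[BUF_SIZE];
    size_t len;
    int done;       /* set once the producer has finished */
    long consumed;  /* total items taken by all consumers */
    pthread_mutex_t mutex;
    pthread_cond_t can_produce;
    pthread_cond_t can_consume;
} mc_buffer_t;

static void *mc_producer(void *arg) {
    mc_buffer_t *b = arg;
    for (int i = 0; i < N_ITEMS; ++i) {
        pthread_mutex_lock(&b->mutex);
        while (b->len == BUF_SIZE)
            pthread_cond_wait(&b->can_produce, &b->mutex);
        b->buf[b->len++] = i;
        pthread_cond_signal(&b->can_consume);
        pthread_mutex_unlock(&b->mutex);
    }
    /* tell every waiting consumer that no more items will arrive */
    pthread_mutex_lock(&b->mutex);
    b->done = 1;
    pthread_cond_broadcast(&b->can_consume);
    pthread_mutex_unlock(&b->mutex);
    return NULL;
}

static void *mc_consumer(void *arg) {
    mc_buffer_t *b = arg;
    for (;;) {
        pthread_mutex_lock(&b->mutex);
        /* with `if` here, a consumer could proceed on an empty buffer
           and underflow len */
        while (b->len == 0 && !b->done)
            pthread_cond_wait(&b->can_consume, &b->mutex);
        if (b->len == 0) { /* producer finished and buffer drained */
            pthread_mutex_unlock(&b->mutex);
            return NULL;
        }
        assert(b->len > 0); /* invariant guaranteed by the loop above */
        --b->len;
        ++b->consumed;
        pthread_cond_signal(&b->can_produce);
        pthread_mutex_unlock(&b->mutex);
    }
}

/* Runs one producer and N_CONSUMERS consumers; returns items consumed. */
long run_multi_consumer_demo(void) {
    mc_buffer_t b = { .len = 0, .done = 0, .consumed = 0 };
    pthread_t prod, cons[N_CONSUMERS];

    pthread_mutex_init(&b.mutex, NULL);
    pthread_cond_init(&b.can_produce, NULL);
    pthread_cond_init(&b.can_consume, NULL);

    pthread_create(&prod, NULL, mc_producer, &b);
    for (int i = 0; i < N_CONSUMERS; ++i)
        pthread_create(&cons[i], NULL, mc_consumer, &b);

    pthread_join(prod, NULL);
    for (int i = 0; i < N_CONSUMERS; ++i)
        pthread_join(cons[i], NULL);

    pthread_cond_destroy(&b.can_consume);
    pthread_cond_destroy(&b.can_produce);
    pthread_mutex_destroy(&b.mutex);
    return b.consumed;
}
```

Calling `run_multi_consumer_demo()` should return `N_ITEMS`: each item is consumed exactly once even though three consumers compete for it.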

@evandrix

👍

@UmbraSang

but why?


Matt-KC commented Apr 26, 2019

I get this error when compiling:

/tmp/cc1yUZ5e.o: In function `main':
pthread.c:(.text+0x1aa): undefined reference to `pthread_create'
pthread.c:(.text+0x1cd): undefined reference to `pthread_create'
pthread.c:(.text+0x1e1): undefined reference to `pthread_join'
pthread.c:(.text+0x1f5): undefined reference to `pthread_join'
collect2: error: ld returned 1 exit status


bheemann commented Feb 2, 2020

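Compile with -lpthread, for example: gcc pthread.c -lpthread. With GCC and Clang, the -pthread option is usually the better choice, since it applies to both compiling and linking.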


billgewrgoulas commented Mar 18, 2020

If we want one consumer and one producer to be able to get and append data at the same time, would we need two mutexes?
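
One way this could be approached, offered as a sketch and not as the gist's method: because the gist's buffer is a stack, the producer and consumer both touch the same `len`, so splitting the mutex alone would not help. Switching to a ring-buffer queue lets the producer work on the tail and the consumer on the head, with two POSIX counting semaphores tracking free and used slots. The two mutexes below protect each end separately; with exactly one producer and one consumer they are arguably unnecessary, and they only start to matter once several threads share an end. All names here (`ring_t`, `ring_put`, `ring_get`, `run_ring_demo`, `N_ITEMS`) are hypothetical. Note that unnamed semaphores created with `sem_init` are not supported on every platform (macOS is a known exception), and the sketch ignores `EINTR` from `sem_wait`.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define BUF_SIZE 5
#define N_ITEMS 1000 /* hypothetical: finite so the demo terminates */

typedef struct {
    int buf[BUF_SIZE];
    size_t head;              /* next slot to read; consumer side only */
    size_t tail;              /* next slot to write; producer side only */
    sem_t free_slots;         /* counts empty slots */
    sem_t used_slots;         /* counts filled slots */
    pthread_mutex_t put_lock; /* serializes producers */
    pthread_mutex_t get_lock; /* serializes consumers */
} ring_t;

static void ring_put(ring_t *r, int v) {
    sem_wait(&r->free_slots);           /* blocks while the ring is full */
    pthread_mutex_lock(&r->put_lock);
    r->buf[r->tail] = v;
    r->tail = (r->tail + 1) % BUF_SIZE;
    pthread_mutex_unlock(&r->put_lock);
    sem_post(&r->used_slots);
}

static int ring_get(ring_t *r) {
    sem_wait(&r->used_slots);           /* blocks while the ring is empty */
    pthread_mutex_lock(&r->get_lock);
    int v = r->buf[r->head];
    r->head = (r->head + 1) % BUF_SIZE;
    pthread_mutex_unlock(&r->get_lock);
    sem_post(&r->free_slots);
    return v;
}

typedef struct {
    ring_t ring;
    long sum;     /* sum of consumed values */
    int in_order; /* stays 1 while values arrive in FIFO order */
} ring_demo_t;

static void *ring_producer(void *arg) {
    ring_demo_t *d = arg;
    for (int i = 1; i <= N_ITEMS; ++i)
        ring_put(&d->ring, i);
    return NULL;
}

static void *ring_consumer(void *arg) {
    ring_demo_t *d = arg;
    for (int i = 1; i <= N_ITEMS; ++i) {
        int v = ring_get(&d->ring);
        if (v != i)
            d->in_order = 0;
        d->sum += v;
    }
    return NULL;
}

/* Returns the sum of consumed values, or -1 if FIFO order was broken. */
long run_ring_demo(void) {
    ring_demo_t d = { .sum = 0, .in_order = 1 }; /* head and tail start at 0 */
    pthread_t prod, cons;
    int rc;

    rc = sem_init(&d.ring.free_slots, 0, BUF_SIZE); assert(rc == 0);
    rc = sem_init(&d.ring.used_slots, 0, 0);        assert(rc == 0);
    (void)rc;
    pthread_mutex_init(&d.ring.put_lock, NULL);
    pthread_mutex_init(&d.ring.get_lock, NULL);

    pthread_create(&prod, NULL, ring_producer, &d);
    pthread_create(&cons, NULL, ring_consumer, &d);
    pthread_join(prod, NULL);
    pthread_join(cons, NULL);

    pthread_mutex_destroy(&d.ring.get_lock);
    pthread_mutex_destroy(&d.ring.put_lock);
    sem_destroy(&d.ring.used_slots);
    sem_destroy(&d.ring.free_slots);
    return d.in_order ? d.sum : -1;
}
```

Here the producer can be writing one slot while the consumer reads another, because the two sides never hold the same lock. The trade-off is more moving parts than the single-mutex version in the gist.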
