Last active
November 22, 2022 10:59
-
-
Save victoroliveirab/2ecdba1525dec887329ba4bfd73dd253 to your computer and use it in GitHub Desktop.
Threadpool with pthreads, semaphore and mutex lock
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Example client program that uses the thread pool.
 */
#include <stdio.h> | |
#include <unistd.h> | |
#include "threadpool.h" | |
/* Pair of integer operands describing one addition job. */
struct data {
    int a; /* first operand */
    int b; /* second operand */
};
void add(void *param) | |
{ | |
struct data *temp; | |
temp = (struct data*)param; | |
printf("I add two values %d and %d result = %d\n",temp->a, temp->b, temp->a + temp->b); | |
} | |
int main(void) | |
{ | |
// create some work to do | |
struct data work0; | |
work0.a = 5; | |
work0.b = 10; | |
struct data work1; | |
work1.a = 15; | |
work1.b = 110; | |
struct data work2; | |
work2.a = 55; | |
work2.b = 150; | |
struct data work3; | |
work3.a = 155; | |
work3.b = 1105; | |
struct data work4; | |
work4.a = 25; | |
work4.b = 210; | |
struct data work5; | |
work5.a = 158; | |
work5.b = 1108; | |
struct data work6; | |
work6.a = 9; | |
work6.b = 99; | |
struct data work7; | |
work7.a = 1; | |
work7.b = 11; | |
// initialize the thread pool | |
pool_init(); | |
// submit the work to the queue | |
pool_submit(&add,&work0); | |
pool_submit(&add,&work1); | |
pool_submit(&add,&work2); | |
pool_submit(&add,&work3); | |
pool_submit(&add,&work4); | |
pool_submit(&add,&work5); | |
pool_submit(&add,&work6); | |
pool_submit(&add,&work7); | |
// may be helpful | |
// sleep(1); | |
pool_shutdown(); | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Makefile for the thread pool example.
#
# Targets:
#   all   - build the `test` executable (default)
#   clean - remove the object files and the executable
CC=gcc
CFLAGS=-Wall
PTHREADS=-lpthread

# Neither target names a file, so never treat them as up to date.
.PHONY: all clean

all: test

# Link step: the only command that needs the pthread library.
test: client.o threadpool.o
	$(CC) $(CFLAGS) -o test client.o threadpool.o $(PTHREADS)

# client.c includes threadpool.h, so it must be rebuilt when the header changes.
client.o: client.c threadpool.h
	$(CC) $(CFLAGS) -c client.c

threadpool.o: threadpool.c threadpool.h
	$(CC) $(CFLAGS) -c threadpool.c

clean:
	rm -f *.o test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Implementation of the thread pool.
 */
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#include "threadpool.h"
/* Compile-time configuration. */
#define QUEUE_SIZE 10       // not used in this code
#define NUMBER_OF_THREADS 3 // number of worker threads pool_init() tries to start
#define TRUE 1
#define DEBUG 1             // 0 - no debug message, 1 - main points, 2 - all
#define LIFE 2              // amount of seconds threads will live

/* State shared by the functions of this file. */
struct task worktodo;       // NOTE(review): appears unused in this file - confirm before removing
struct threadpool *pool;    // the single pool instance, allocated in pool_init()
time_t begin;               // set in pool_init(); workers measure their lifetime against it
pthread_t bee;              // the worker bee
int i = 0;                  // task_id debugging: next id handed out by pool_submit()
int j = 0;                  // thread_id debugging: id passed to each worker by pool_init()
// insert a task into the queue | |
int enqueue(struct task *t) | |
{ | |
if (pool->head == NULL) { // pool is currently empty | |
if (DEBUG > 1) printf("Pool is currently empty...\n"); | |
pool->head = t; | |
pool->tail = t; | |
} else { | |
if (DEBUG > 1) printf("Pool isn't currently empty...\n"); | |
pool->tail->next = t; | |
pool->tail = t; | |
} | |
if (DEBUG > 1) printf("Task with id = %d added...\n", t->task_id); | |
sem_post(&pool->semaphore); | |
return 0; | |
} | |
// remove a task from the queue | |
struct task* dequeue() | |
{ | |
struct task *worktodo; | |
worktodo = pool->head; | |
if (worktodo == NULL) { // dequeue was required in an empty thread | |
return NULL; | |
} | |
if (worktodo->next == NULL) { // dequeue made pool empty | |
pool->head = NULL; | |
pool->tail = NULL; | |
} else { | |
pool->head = worktodo->next; | |
} | |
return worktodo; | |
} | |
// the worker thread in the thread pool | |
void *worker(void *param) | |
{ | |
int thread_id = (int) param; | |
if (DEBUG) printf("Start work of thread %u...\n", thread_id); | |
struct task *task; | |
int num_of_works = 0; | |
time_t now; | |
while (TRUE) { | |
now = time(NULL); | |
if (now - begin > pool->timespan) break; // stop condition | |
sem_wait(&pool->semaphore); | |
pthread_mutex_lock(&pool->mutex); | |
if (DEBUG && pool->head) printf(".locking in thread %u...\n", thread_id); | |
if (pool->head == NULL) { | |
pthread_mutex_unlock(&pool->mutex); | |
} else { | |
task = dequeue(); | |
++num_of_works; | |
if (DEBUG > 1) printf("Thread %u got job of adding %u + %u...\n", thread_id, *((int*) task->data), *((int*) task->data + 1)); | |
pthread_mutex_unlock(&pool->mutex); | |
if (DEBUG) printf(".unlocking in thread %u...\n", thread_id); | |
execute(task->function, task->data); | |
} | |
sem_post(&pool->semaphore); | |
} | |
if (DEBUG) printf("Closing thread %u, that performed %u works...\n",thread_id, num_of_works); | |
pthread_exit(0); | |
} | |
/**
 * Runs a single task: invokes somefunction with p as its only argument.
 */
void execute(void (*somefunction)(void *p), void *p)
{
    somefunction(p);
}
/** | |
* Submits work to the pool. | |
*/ | |
int pool_submit(void (*somefunction)(void *p), void *p) | |
{ | |
pthread_mutex_lock(&pool->mutex); | |
if (DEBUG > 1) printf("New task in the pool... Locking\n"); | |
struct task *worktodo = (struct task*) malloc(sizeof(struct task)); | |
worktodo->function = somefunction; | |
worktodo->data = p; | |
worktodo->task_id = i; | |
worktodo->next = NULL; | |
++i; | |
if (DEBUG > 1) { | |
printf("work.a = %u ... ", *((int*) worktodo->data)); | |
printf("work.b = %u\n", *((int*) worktodo->data + 1)); | |
//printf(">>> *((int*) worktodo.data) == worktodo.b = %u\n", *((int*) (worktodo.data + sizeof(int)))); | |
} | |
enqueue(worktodo); | |
if (DEBUG > 1) printf("Enqueued. Unlocking execution...\n"); | |
pthread_mutex_unlock(&pool->mutex); | |
return 0; | |
} | |
// initialize the thread pool | |
void pool_init(void) | |
{ | |
pool = (struct threadpool*) malloc(sizeof(struct threadpool)); | |
pool->head = NULL; | |
pool->tail = NULL; | |
pool->timespan = LIFE; //seconds | |
pthread_mutex_init(&pool->mutex, NULL); | |
sem_init(&pool->semaphore, 0, NUMBER_OF_THREADS); | |
begin = time(NULL); | |
for (int i = 0; i < NUMBER_OF_THREADS; ++i, ++j) { | |
pthread_create(&bee, NULL, worker, (void *) j); | |
} | |
} | |
// shutdown the thread pool | |
void pool_shutdown(void) | |
{ | |
pthread_join(bee,NULL); | |
if (DEBUG) printf("End of execution :)\n"); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Public interface of the thread pool.
 */
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <pthread.h>
#include <semaphore.h>

// function prototypes

/* Calls somefunction(p). */
void execute(void (*somefunction)(void *p), void *p);

/* Queues somefunction(p) for execution by a worker; returns 0 on success. */
int pool_submit(void (*somefunction)(void *p), void *p);

/* Thread entry point used for the pool's worker threads. */
void *worker(void *param);

/* Allocates the pool and starts its worker threads. */
void pool_init(void);

/* Joins worker thread(s) before returning. */
void pool_shutdown(void);

/* One unit of work; tasks are chained into a singly linked queue. */
struct task
{
    void (*function)(void *p); // callback to run
    void *data;                // argument handed to the callback
    struct task *next;         // next task in the queue, NULL for the last one
    int task_id;               // debug purpose
};

/* The pool itself: a task queue plus the primitives protecting it. */
struct threadpool
{
    struct task *head;     // oldest queued task, NULL when the queue is empty
    struct task *tail;     // newest queued task, NULL when the queue is empty
    pthread_mutex_t mutex; // held while head/tail are read or changed
    sem_t semaphore;       // posted on every enqueue; workers wait on it
    int timespan;          // seconds, counted from pool_init(), after which workers stop
};

#endif /* THREADPOOL_H */
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment