Last active
February 24, 2023 22:52
-
-
Save tqinli/db1b892a97cfa0bc41fb8b0b0b156b7e to your computer and use it in GitHub Desktop.
A repro for a lost-signal issue in pthread_cond_wait/pthread_cond_signal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// To compile | |
// gcc -g -o pthread_cond_repro ./pthread_cond_repro.c -lpthread | |
#include <pthread.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <stdint.h> | |
#include <unistd.h> | |
#include <assert.h> | |
// !!!The following code mimics a .net core critical section implementation!!! | |
// See also: https://github.com/dotnet/coreclr/blob/release/3.0/src/pal/src/sync/cs.cpp | |
// Helper to mimic https://docs.microsoft.com/en-us/windows/win32/api/winnt/nf-winnt-interlockedcompareexchange | |
// NOTE: InterlockedCompareExchange takes Exchange before Comperand
// 32-bit signed integer standing in for the Win32 LONG type.
typedef int32_t LONG;

// Atomic compare-and-swap with the Win32 parameter order.
//
// If *Destination equals Comperand, Exchange is stored into *Destination.
// Returns the value *Destination held before the call; the store happened
// if and only if that return value equals Comperand.
//
// Note the argument order differs from the GCC builtin used underneath:
// Win32 is (dest, new, expected), the builtin is (dest, expected, new).
LONG InterlockedCompareExchange(
    LONG volatile *Destination,
    LONG Exchange,
    LONG Comperand)
{
    return __sync_val_compare_and_swap(
        Destination, /* location that is compared and possibly updated */
        Comperand,   /* value *Destination is expected to hold */
        Exchange     /* value stored when the comparison succeeds */);
}
// Atomically adds 1 to *Destination and returns the incremented value,
// matching the Win32 InterlockedIncrement contract (which returns the
// resulting value, not the previous one).
LONG InterlockedIncrement(LONG volatile *Destination)
{
    return __sync_add_and_fetch(Destination, 1);
}
// Boolean constants used for the infinite loops below.
#define TRUE 1
#define FALSE 0
// Bit layout of CriticalSection.lock_data:
//   bit 0   - the critical section is currently held
//   bit 1   - a waiter has been signalled and has not yet taken the lock
//   bits 2+ - number of waiters (one waiter == LOCK_WAITER_INC)
#define LOCK_BIT 0x1L
#define LOCK_AWAKENED_WAITER 0x2L
#define LOCK_WAITER_INC 0x4L
typedef struct CriticalSection | |
{ | |
// lock_data: | |
// 1) last bit is lock bit (0x1 or LOCK_BIT), | |
// set by a succes EnterCS, and unset by LeaveCS. | |
// 2) 2nd bit is awaken waiter bit (0x2 or LOCK_AWAKENED_WAITER) | |
// set by LeaveCS when it detects there are waiter, and sent | |
// the signal to wake up one waiter. unset by the waiter | |
// actually woke by signaler. | |
// 3) waiter count is maintained by the rest, e.g. | |
// lock_data >> 2 == waiter count. | |
volatile LONG lock_data; | |
pthread_mutex_t mutex; | |
pthread_cond_t condition; | |
int iPredicate; | |
} CriticalSection; | |
// Outcome of WaitOnCS.
typedef enum WaiterReturnState {
    // The thread registered as a waiter, blocked, and was woken up.
    ReturnWaiterAwakened,
    // The lock bit was already clear, so the thread never blocked.
    WaiterDidntWait
} WaiterReturnState;
// Blocks the calling thread until cs->iPredicate becomes non-zero, then
// consumes the wake-up by resetting the predicate to 0.
//
// The predicate is only read and written while cs->mutex is held. The
// pthread_cond_wait call sits in a loop so that a wake-up without the
// predicate being set simply goes back to waiting.
void DoActualWait(CriticalSection *cs) {
    int iRet;

    iRet = pthread_mutex_lock(&cs->mutex);
    assert(iRet == 0);

    while (0 == cs->iPredicate) {
        iRet = pthread_cond_wait(&cs->condition, &cs->mutex);
        assert(iRet == 0);
    }
    // Consume the wake-up so the next waiter blocks again.
    cs->iPredicate = 0;

    iRet = pthread_mutex_unlock(&cs->mutex);
    assert(iRet == 0);
}
// Registers the calling thread as a waiter on `cs` and blocks until woken.
//
// lWaitInc is the amount added to cs->lock_data to register the waiter
// (the caller, EnterCS, passes LOCK_WAITER_INC or LOCK_AWAKENED_WAITER).
//
// Returns WaiterDidntWait, without touching lock_data, if the lock bit is
// found clear; otherwise returns ReturnWaiterAwakened after DoActualWait
// has returned.
WaiterReturnState WaitOnCS(CriticalSection *cs, LONG lWaitInc) {
    LONG lVal, lNewVal, lOldVal;

    // Retry the compare-and-swap until lWaitInc has been added to
    // cs->lock_data.
    do {
        lVal = cs->lock_data;
        // lock_data has to be loaded anyway, so re-check the lock bit:
        // if the section is not locked, bail out without waiting.
        if (0 == (lVal & LOCK_BIT)) return WaiterDidntWait;
        // '+' rather than '|': this increments the waiter count.
        lNewVal = lVal + lWaitInc;
        lOldVal = InterlockedCompareExchange(&cs->lock_data, lNewVal, lVal);
    } while (lOldVal != lVal);

    DoActualWait(cs);
    return ReturnWaiterAwakened;
}
// Wakes one thread blocked in DoActualWait: sets cs->iPredicate to 1 and
// signals cs->condition, both while holding cs->mutex.
//
// Every pthread call's return value is captured before being asserted, so
// a failing signal or unlock is actually detected.
void WakeupOneWaiter(CriticalSection *cs) {
    int iRet;

    iRet = pthread_mutex_lock(&cs->mutex);
    assert(iRet == 0);

    cs->iPredicate = 1;

    iRet = pthread_cond_signal(&cs->condition);
    assert(iRet == 0);

    iRet = pthread_mutex_unlock(&cs->mutex);
    assert(iRet == 0);
}
// Puts `cs` into its initial state: unlocked with no waiters
// (lock_data == 0), wake-up predicate cleared, and the mutex and condition
// variable initialised with default attributes.
void InitCS(CriticalSection *cs) {
    int iRet;

    cs->lock_data = 0;
    cs->iPredicate = 0;

    iRet = pthread_mutex_init(&cs->mutex, NULL);
    assert(iRet == 0);
    iRet = pthread_cond_init(&cs->condition, NULL);
    assert(iRet == 0);
}
// Acquires the critical section, blocking in WaitOnCS while another thread
// holds it. Returns only once this thread has set the lock bit.
void EnterCS(CriticalSection *cs) {
    LONG lVal, lNewVal, lOldVal;
    LONG lBitsToChange = LOCK_BIT;   // 0x1
    LONG lWaitInc = LOCK_WAITER_INC; // 0x4
    WaiterReturnState rs;

    while (TRUE)
    {
        lVal = cs->lock_data;
        while (0 == (lVal & LOCK_BIT)) {
            // CS is not locked: try to lock it. XOR flips every bit in
            // lBitsToChange; the lock bit is known to be clear here, so
            // it becomes set.
            lNewVal = lVal ^ lBitsToChange;
            lOldVal = InterlockedCompareExchange(&cs->lock_data, lNewVal, lVal);
            if (lOldVal == lVal) return; // lock acquired
            // Some thread raced with us; retry with the value it left.
            lVal = lOldVal;
        }

        rs = WaitOnCS(cs, lWaitInc);
        if (rs == ReturnWaiterAwakened) {
            // From now on, taking the lock must also flip the awakened
            // waiter bit (presumably still set by the LeaveCS that woke
            // this thread, so the flip clears it).
            lBitsToChange = LOCK_BIT | LOCK_AWAKENED_WAITER; // 0x3
            // LeaveCS already subtracted this thread's LOCK_WAITER_INC
            // and added LOCK_AWAKENED_WAITER when it woke it. If this
            // thread has to wait again, adding 0x2 to a value whose
            // awakened bit is set carries into the waiter count: the bit
            // is cleared and the count goes up by one.
            lWaitInc = LOCK_AWAKENED_WAITER; // 0x2
        }
    }
}
// Releases the critical section. Clears the lock bit and, when there are
// waiters and none has been signalled yet, hands off to one waiter by
// decrementing the waiter count, setting the awakened-waiter bit and
// calling WakeupOneWaiter.
void LeaveCS(CriticalSection *cs) {
    LONG lVal, lNewVal, lOldVal;

    lVal = cs->lock_data;
    while (TRUE) {
        if ((lVal == LOCK_BIT) || (lVal & LOCK_AWAKENED_WAITER)) {
            // Waiter count is 0, or a waiter has already been awakened:
            // simply reset the lock bit and return.
            lNewVal = lVal & ~LOCK_BIT;
            // try unlock
            lOldVal = InterlockedCompareExchange(&cs->lock_data, lNewVal, lVal);
            if (lOldVal == lVal) return;
        }
        else {
            // Decrement waiter count, set awakened waiter bit, unset
            // lock bit - all in one atomic update.
            lNewVal = lVal - LOCK_WAITER_INC + LOCK_AWAKENED_WAITER - LOCK_BIT;
            // try unlock
            lOldVal = InterlockedCompareExchange(&cs->lock_data, lNewVal, lVal);
            if (lOldVal == lVal) {
                WakeupOneWaiter(cs);
                return;
            }
        }
        // The compare-and-swap lost a race; retry with the fresh value.
        lVal = lOldVal;
    }
}
// !!!The following code tries to repro the pthread_cond_wait/signal bug!!!
int total_threads = 4; | |
int g_counter = 0; | |
LONG loop_round = 0; | |
LONG threads_finished = 0; | |
CriticalSection g_cs; | |
int __thread ThreadId; | |
void *LoopCriticalSectionThread(void *val) | |
{ | |
ThreadId = (int)(long)val; | |
printf("LoopCriticalSectionThread - %d started\n", ThreadId); | |
LONG private_round = 0; | |
while (TRUE) | |
{ | |
// Enter CS | |
EnterCS(&g_cs); | |
for (int i = 0; i < 1000; ++i) g_counter += ThreadId; | |
LeaveCS(&g_cs); | |
// Mark this thread finished | |
InterlockedIncrement(&threads_finished); | |
// Before start the next iteration, wait until all other | |
// threads to finish this round | |
while (private_round == loop_round) | |
{ | |
sleep(0); | |
} | |
assert(private_round + 1 == loop_round); | |
++private_round; | |
} | |
} | |
void *RefereeThread(void *val) | |
{ | |
printf("RefereeThread - %s started\n", (char *)val); | |
while (TRUE) | |
{ | |
if (total_threads == InterlockedCompareExchange(&threads_finished, 0, total_threads)) | |
{ | |
sleep(0); | |
// if all threads finished, reset the counter | |
// and increment the loop_round, to kick of another round | |
InterlockedIncrement(&loop_round); | |
} | |
sleep(0); | |
} | |
} | |
void MonitorCounter() | |
{ | |
while (TRUE) | |
{ | |
sleep(2); | |
printf("Monitor - g_counter %d, loop_round %d, threads_finished %d\n", g_counter, loop_round, threads_finished); | |
} | |
} | |
int main(int argc, char **argv) | |
{ | |
if (argc >= 2) | |
total_threads = atoi(argv[1]); | |
else | |
total_threads = 1.5 * sysconf(_SC_NPROCESSORS_ONLN); | |
printf("Total Threads Count; %d\n", total_threads); | |
// Initialize CS and try once | |
InitCS(&g_cs); | |
EnterCS(&g_cs); | |
LeaveCS(&g_cs); | |
int tid = 0; | |
int iRet; | |
pthread_t threads[total_threads + 1]; | |
// Create threads | |
iRet = pthread_create(&threads[tid++], NULL, RefereeThread, NULL); | |
for (int i = 0; i < total_threads; ++i) | |
{ | |
iRet = pthread_create(&threads[tid++], NULL, LoopCriticalSectionThread, (void *)(long)(i + 1)); | |
assert(iRet == 0); | |
} | |
// Monitor the counter | |
MonitorCounter(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It can be reproduced on my machine, but is it really a bug in pthread? Consider the following scenario:
For simplicity, assume there are only two threads. Thread A reaches L92 and the scheduler switches to thread B, which was blocked at L158. Thread B then reads `cs->lock_data` and goes to L172 (because `cs->lock_data` was increased by 4 a moment ago by thread A). After that, thread B continues to run for a while and calls `pthread_cond_signal` before thread A has called `pthread_cond_wait`. As a result, thread A waits forever for a signal.