diff --git a/include/my_pthread.h b/include/my_pthread.h index 30c48b42c7f..d64b5348666 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -80,11 +80,17 @@ typedef struct st_pthread_link { typedef struct { uint32 waiting; -#ifdef OS2 - HEV semaphore; -#else - HANDLE semaphore; -#endif + CRITICAL_SECTION lock_waiting; + + enum { + SIGNAL= 0, + BROADCAST= 1, + MAX_EVENTS= 2 + } EVENTS; + + HANDLE events[MAX_EVENTS]; + HANDLE broadcast_block_event; + } pthread_cond_t; typedef int pthread_mutexattr_t; diff --git a/mysys/my_wincond.c b/mysys/my_wincond.c index ed8b715cb85..353b2fced4e 100644 --- a/mysys/my_wincond.c +++ b/mysys/my_wincond.c @@ -27,27 +27,48 @@ int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr) { - cond->waiting=0; - cond->semaphore=CreateSemaphore(NULL,0,0x7FFFFFFF,NullS); - if (!cond->semaphore) + cond->waiting= 0; + InitializeCriticalSection(&cond->lock_waiting); + + cond->events[SIGNAL]= CreateEvent(NULL, /* no security */ + FALSE, /* auto-reset event */ + FALSE, /* non-signaled initially */ + NULL); /* unnamed */ + + /* Create a manual-reset event. */ + cond->events[BROADCAST]= CreateEvent(NULL, /* no security */ + TRUE, /* manual-reset */ + FALSE, /* non-signaled initially */ + NULL); /* unnamed */ + + + cond->broadcast_block_event= CreateEvent(NULL, /* no security */ + TRUE, /* manual-reset */ + TRUE, /* signaled initially */ + NULL); /* unnamed */ + + if( cond->events[SIGNAL] == NULL || + cond->events[BROADCAST] == NULL || + cond->broadcast_block_event == NULL ) return ENOMEM; return 0; } int pthread_cond_destroy(pthread_cond_t *cond) { - return CloseHandle(cond->semaphore) ? 0 : EINVAL; + DeleteCriticalSection(&cond->lock_waiting); + + if (CloseHandle(cond->events[SIGNAL]) == 0 || + CloseHandle(cond->events[BROADCAST]) == 0 || + CloseHandle(cond->broadcast_block_event) == 0) + return EINVAL; + return 0; } int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) { - InterlockedIncrement(&cond->waiting); - LeaveCriticalSection(mutex); - WaitForSingleObject(cond->semaphore,INFINITE); - InterlockedDecrement(&cond->waiting); - EnterCriticalSection(mutex); - return 0 ; + return pthread_cond_timedwait(cond,mutex,NULL); } @@ -57,52 +78,104 @@ int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, int result; long timeout; union ft64 now; - - GetSystemTimeAsFileTime(&now.ft); - /* - Calculate time left to abstime - - subtract start time from current time(values are in 100ns units) - - convert to millisec by dividing with 10000 + if( abstime != NULL ) + { + GetSystemTimeAsFileTime(&now.ft); + + /* + Calculate time left to abstime + - subtract start time from current time(values are in 100ns units) + - convert to millisec by dividing with 10000 + */ + timeout= (long)((abstime->tv.i64 - now.i64) / 10000); + + /* Don't allow the timeout to be negative */ + if (timeout < 0) + timeout= 0L; + + /* + Make sure the calucated timeout does not exceed original timeout + value which could cause "wait for ever" if system time changes + */ + if (timeout > abstime->max_timeout_msec) + timeout= abstime->max_timeout_msec; + + } + else + { + /* No time specified; don't expire */ + timeout= INFINITE; + } + + /* + Block access if previous broadcast hasn't finished. + This is just for safety and should normally not + affect the total time spent in this function. */ - timeout= (long)((abstime->tv.i64 - now.i64) / 10000); - - /* Don't allow the timeout to be negative */ - if (timeout < 0) - timeout= 0L; + WaitForSingleObject(cond->broadcast_block_event, INFINITE); - /* - Make sure the calucated timeout does not exceed original timeout - value which could cause "wait for ever" if system time changes - */ - if (timeout > abstime->max_timeout_msec) - timeout= abstime->max_timeout_msec; + EnterCriticalSection(&cond->lock_waiting); + cond->waiting++; + LeaveCriticalSection(&cond->lock_waiting); - InterlockedIncrement(&cond->waiting); LeaveCriticalSection(mutex); - result= WaitForSingleObject(cond->semaphore,timeout); - InterlockedDecrement(&cond->waiting); + + result= WaitForMultipleObjects(2, cond->events, FALSE, timeout); + + EnterCriticalSection(&cond->lock_waiting); + cond->waiting--; + + if (cond->waiting == 0 && result == (WAIT_OBJECT_0+BROADCAST)) + { + /* + We're the last waiter to be notified or to stop waiting, so + reset the manual event. + */ + /* Close broadcast gate */ + ResetEvent(cond->events[BROADCAST]); + /* Open block gate */ + SetEvent(cond->broadcast_block_event); + } + LeaveCriticalSection(&cond->lock_waiting); + EnterCriticalSection(mutex); return result == WAIT_TIMEOUT ? ETIMEDOUT : 0; } - int pthread_cond_signal(pthread_cond_t *cond) { - long prev_count; - if (cond->waiting) - ReleaseSemaphore(cond->semaphore,1,&prev_count); + EnterCriticalSection(&cond->lock_waiting); + + if(cond->waiting > 0) + SetEvent(cond->events[SIGNAL]); + + LeaveCriticalSection(&cond->lock_waiting); + return 0; } int pthread_cond_broadcast(pthread_cond_t *cond) { - long prev_count; - if (cond->waiting) - ReleaseSemaphore(cond->semaphore,cond->waiting,&prev_count); - return 0 ; + EnterCriticalSection(&cond->lock_waiting); + /* + The mutex protect us from broadcasting if + there isn't any thread waiting to open the + block gate after this call has closed it. + */ + if(cond->waiting > 0) + { + /* Close block gate */ + ResetEvent(cond->broadcast_block_event); + /* Open broadcast gate */ + SetEvent(cond->events[BROADCAST]); + } + + LeaveCriticalSection(&cond->lock_waiting); + + return 0; }