Sebastian Lackner : ntdll: Use condition variable for RtlQueueWorkItem implementation.
Alexandre Julliard
julliard at winehq.org
Wed Mar 12 15:23:43 CDT 2014
Module: wine
Branch: master
Commit: 7ba4d119c1674562a3f6f615986ba77d55a8ec9b
URL: http://source.winehq.org/git/wine.git/?a=commit;h=7ba4d119c1674562a3f6f615986ba77d55a8ec9b
Author: Sebastian Lackner <sebastian at fds-team.de>
Date: Sun Mar 9 04:04:53 2014 +0100
ntdll: Use condition variable for RtlQueueWorkItem implementation.
---
dlls/ntdll/threadpool.c | 147 ++++++++++++++++++-----------------------------
1 file changed, 57 insertions(+), 90 deletions(-)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index e2fc6a5..f66f6ab 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -39,12 +39,13 @@ WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
#define WORKER_TIMEOUT 30000 /* 30 seconds */
+/* threadpool_cs must be held while modifying the following elements */
+static struct list work_item_list = LIST_INIT(work_item_list);
static LONG num_workers;
-static LONG num_work_items;
static LONG num_busy_workers;
+static LONG num_items_processed;
-static struct list work_item_list = LIST_INIT(work_item_list);
-static HANDLE work_item_event;
+static RTL_CONDITION_VARIABLE threadpool_cond = RTL_CONDITION_VARIABLE_INIT;
static RTL_CRITICAL_SECTION threadpool_cs;
static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
@@ -84,80 +85,44 @@ static inline LONG interlocked_dec( PLONG dest )
static void WINAPI worker_thread_proc(void * param)
{
- interlocked_inc(&num_workers);
-
- /* free the work item memory sooner to reduce memory usage */
- while (TRUE)
- {
- if (num_work_items > 0)
- {
- struct list *item;
- RtlEnterCriticalSection(&threadpool_cs);
- item = list_head(&work_item_list);
- if (item)
- {
- struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
- struct work_item work_item;
- list_remove(&work_item_ptr->entry);
- interlocked_dec(&num_work_items);
-
- RtlLeaveCriticalSection(&threadpool_cs);
-
- work_item = *work_item_ptr;
- RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
-
- TRACE("executing %p(%p)\n", work_item.function, work_item.context);
-
- interlocked_inc(&num_busy_workers);
+ struct list *item;
+ struct work_item *work_item_ptr, work_item;
+ LARGE_INTEGER timeout;
+ timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
- /* do the work */
- work_item.function(work_item.context);
+ RtlEnterCriticalSection( &threadpool_cs );
+ num_workers++;
- interlocked_dec(&num_busy_workers);
- }
- else
- RtlLeaveCriticalSection(&threadpool_cs);
- }
- else
+ for (;;)
+ {
+ if ((item = list_head( &work_item_list )))
{
- NTSTATUS status;
- LARGE_INTEGER timeout;
- timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
- status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
- if (status != STATUS_WAIT_0)
- break;
+ work_item_ptr = LIST_ENTRY( item, struct work_item, entry );
+ list_remove( &work_item_ptr->entry );
+ num_busy_workers++;
+ num_items_processed++;
+ RtlLeaveCriticalSection( &threadpool_cs );
+
+ /* copy item to stack and do the work */
+ work_item = *work_item_ptr;
+ RtlFreeHeap( GetProcessHeap(), 0, work_item_ptr );
+ TRACE("executing %p(%p)\n", work_item.function, work_item.context);
+ work_item.function( work_item.context );
+
+ RtlEnterCriticalSection( &threadpool_cs );
+ num_busy_workers--;
}
+ else if (RtlSleepConditionVariableCS( &threadpool_cond, &threadpool_cs, &timeout ) != STATUS_SUCCESS)
+ break;
}
- interlocked_dec(&num_workers);
-
- RtlExitUserThread(0);
+ num_workers--;
+ RtlLeaveCriticalSection( &threadpool_cs );
+ RtlExitUserThread( 0 );
/* never reached */
}
-static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
-{
- NTSTATUS status;
-
- RtlEnterCriticalSection(&threadpool_cs);
- list_add_tail(&work_item_list, &work_item->entry);
- num_work_items++;
- RtlLeaveCriticalSection(&threadpool_cs);
-
- if (!work_item_event)
- {
- HANDLE sem;
- status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, INT_MAX);
- if (interlocked_cmpxchg_ptr( &work_item_event, sem, 0 ))
- NtClose(sem); /* somebody beat us to it */
- }
- else
- status = NtReleaseSemaphore(work_item_event, 1, NULL);
-
- return status;
-}
-
/***********************************************************************
* RtlQueueWorkItem (NTDLL.@)
*
@@ -184,6 +149,7 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context,
{
HANDLE thread;
NTSTATUS status;
+ LONG items_processed;
struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
if (!work_item)
@@ -195,39 +161,40 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context,
if (Flags & ~WT_EXECUTELONGFUNCTION)
FIXME("Flags 0x%x not supported\n", Flags);
- status = add_work_item_to_queue(work_item);
+ RtlEnterCriticalSection( &threadpool_cs );
+ list_add_tail( &work_item_list, &work_item->entry );
+ status = (num_workers > num_busy_workers) ? STATUS_SUCCESS : STATUS_UNSUCCESSFUL;
+ items_processed = num_items_processed;
+ RtlLeaveCriticalSection( &threadpool_cs );
/* FIXME: tune this algorithm to not be as aggressive with creating threads
* if WT_EXECUTELONGFUNCTION isn't specified */
- if ((status == STATUS_SUCCESS) &&
- ((num_workers == 0) || (num_workers == num_busy_workers)))
+ if (status == STATUS_SUCCESS)
+ RtlWakeConditionVariable( &threadpool_cond );
+ else
{
- status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
- NULL, 0, 0,
- worker_thread_proc, NULL, &thread, NULL );
- if (status == STATUS_SUCCESS)
- NtClose( thread );
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ worker_thread_proc, NULL, &thread, NULL );
/* NOTE: we don't care if we couldn't create the thread if there is at
* least one other available to process the request */
- if ((num_workers > 0) && (status != STATUS_SUCCESS))
- status = STATUS_SUCCESS;
- }
-
- if (status != STATUS_SUCCESS)
- {
- RtlEnterCriticalSection(&threadpool_cs);
-
- interlocked_dec(&num_work_items);
- list_remove(&work_item->entry);
- RtlFreeHeap(GetProcessHeap(), 0, work_item);
-
- RtlLeaveCriticalSection(&threadpool_cs);
+ if (status == STATUS_SUCCESS)
+ NtClose( thread );
+ else
+ {
+ RtlEnterCriticalSection( &threadpool_cs );
+ if (num_workers > 0 || num_items_processed != items_processed)
+ status = STATUS_SUCCESS;
+ else
+ list_remove( &work_item->entry );
+ RtlLeaveCriticalSection( &threadpool_cs );
- return status;
+ if (status != STATUS_SUCCESS)
+ RtlFreeHeap( GetProcessHeap(), 0, work_item );
+ }
}
- return STATUS_SUCCESS;
+ return status;
}
/***********************************************************************
More information about the wine-cvs
mailing list