Sebastian Lackner : ntdll: Use condition variable for RtlQueueWorkItem implementation.

Alexandre Julliard julliard at winehq.org
Wed Mar 12 15:23:43 CDT 2014


Module: wine
Branch: master
Commit: 7ba4d119c1674562a3f6f615986ba77d55a8ec9b
URL:    http://source.winehq.org/git/wine.git/?a=commit;h=7ba4d119c1674562a3f6f615986ba77d55a8ec9b

Author: Sebastian Lackner <sebastian at fds-team.de>
Date:   Sun Mar  9 04:04:53 2014 +0100

ntdll: Use condition variable for RtlQueueWorkItem implementation.

---

 dlls/ntdll/threadpool.c |  147 ++++++++++++++++++-----------------------------
 1 file changed, 57 insertions(+), 90 deletions(-)

diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index e2fc6a5..f66f6ab 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -39,12 +39,13 @@ WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
 
 #define WORKER_TIMEOUT 30000 /* 30 seconds */
 
+/* threadpool_cs must be held while modifying the following elements */
+static struct list work_item_list = LIST_INIT(work_item_list);
 static LONG num_workers;
-static LONG num_work_items;
 static LONG num_busy_workers;
+static LONG num_items_processed;
 
-static struct list work_item_list = LIST_INIT(work_item_list);
-static HANDLE work_item_event;
+static RTL_CONDITION_VARIABLE threadpool_cond = RTL_CONDITION_VARIABLE_INIT;
 
 static RTL_CRITICAL_SECTION threadpool_cs;
 static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
@@ -84,80 +85,44 @@ static inline LONG interlocked_dec( PLONG dest )
 
 static void WINAPI worker_thread_proc(void * param)
 {
-    interlocked_inc(&num_workers);
-
-    /* free the work item memory sooner to reduce memory usage */
-    while (TRUE)
-    {
-        if (num_work_items > 0)
-        {
-            struct list *item;
-            RtlEnterCriticalSection(&threadpool_cs);
-            item = list_head(&work_item_list);
-            if (item)
-            {
-                struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
-                struct work_item work_item;
-                list_remove(&work_item_ptr->entry);
-                interlocked_dec(&num_work_items);
-
-                RtlLeaveCriticalSection(&threadpool_cs);
-
-                work_item = *work_item_ptr;
-                RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
-
-                TRACE("executing %p(%p)\n", work_item.function, work_item.context);
-
-                interlocked_inc(&num_busy_workers);
+    struct list *item;
+    struct work_item *work_item_ptr, work_item;
+    LARGE_INTEGER timeout;
+    timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
 
-                /* do the work */
-                work_item.function(work_item.context);
+    RtlEnterCriticalSection( &threadpool_cs );
+    num_workers++;
 
-                interlocked_dec(&num_busy_workers);
-            }
-            else
-                RtlLeaveCriticalSection(&threadpool_cs);
-        }
-        else
+    for (;;)
+    {
+        if ((item = list_head( &work_item_list )))
         {
-            NTSTATUS status;
-            LARGE_INTEGER timeout;
-            timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
-            status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
-            if (status != STATUS_WAIT_0)
-                break;
+            work_item_ptr = LIST_ENTRY( item, struct work_item, entry );
+            list_remove( &work_item_ptr->entry );
+            num_busy_workers++;
+            num_items_processed++;
+            RtlLeaveCriticalSection( &threadpool_cs );
+
+            /* copy item to stack and do the work */
+            work_item = *work_item_ptr;
+            RtlFreeHeap( GetProcessHeap(), 0, work_item_ptr );
+            TRACE("executing %p(%p)\n", work_item.function, work_item.context);
+            work_item.function( work_item.context );
+
+            RtlEnterCriticalSection( &threadpool_cs );
+            num_busy_workers--;
         }
+        else if (RtlSleepConditionVariableCS( &threadpool_cond, &threadpool_cs, &timeout ) != STATUS_SUCCESS)
+            break;
     }
 
-    interlocked_dec(&num_workers);
-
-    RtlExitUserThread(0);
+    num_workers--;
+    RtlLeaveCriticalSection( &threadpool_cs );
+    RtlExitUserThread( 0 );
 
     /* never reached */
 }
 
-static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
-{
-    NTSTATUS status;
-
-    RtlEnterCriticalSection(&threadpool_cs);
-    list_add_tail(&work_item_list, &work_item->entry);
-    num_work_items++;
-    RtlLeaveCriticalSection(&threadpool_cs);
-
-    if (!work_item_event)
-    {
-        HANDLE sem;
-        status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, INT_MAX);
-        if (interlocked_cmpxchg_ptr( &work_item_event, sem, 0 ))
-            NtClose(sem);  /* somebody beat us to it */
-    }
-    else
-        status = NtReleaseSemaphore(work_item_event, 1, NULL);
-
-    return status;
-}
-
 /***********************************************************************
  *              RtlQueueWorkItem   (NTDLL.@)
  *
@@ -184,6 +149,7 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context,
 {
     HANDLE thread;
     NTSTATUS status;
+    LONG items_processed;
     struct work_item *work_item = RtlAllocateHeap(GetProcessHeap(), 0, sizeof(struct work_item));
 
     if (!work_item)
@@ -195,39 +161,40 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WORK_ITEM_ROUTINE Function, PVOID Context,
     if (Flags & ~WT_EXECUTELONGFUNCTION)
         FIXME("Flags 0x%x not supported\n", Flags);
 
-    status = add_work_item_to_queue(work_item);
+    RtlEnterCriticalSection( &threadpool_cs );
+    list_add_tail( &work_item_list, &work_item->entry );
+    status = (num_workers > num_busy_workers) ? STATUS_SUCCESS : STATUS_UNSUCCESSFUL;
+    items_processed = num_items_processed;
+    RtlLeaveCriticalSection( &threadpool_cs );
 
     /* FIXME: tune this algorithm to not be as aggressive with creating threads
      * if WT_EXECUTELONGFUNCTION isn't specified */
-    if ((status == STATUS_SUCCESS) &&
-        ((num_workers == 0) || (num_workers == num_busy_workers)))
+    if (status == STATUS_SUCCESS)
+        RtlWakeConditionVariable( &threadpool_cond );
+    else
     {
-        status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
-                                    NULL, 0, 0,
-                                    worker_thread_proc, NULL, &thread, NULL );
-        if (status == STATUS_SUCCESS)
-            NtClose( thread );
+        status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+                                      worker_thread_proc, NULL, &thread, NULL );
 
         /* NOTE: we don't care if we couldn't create the thread if there is at
          * least one other available to process the request */
-        if ((num_workers > 0) && (status != STATUS_SUCCESS))
-            status = STATUS_SUCCESS;
-    }
-
-    if (status != STATUS_SUCCESS)
-    {
-        RtlEnterCriticalSection(&threadpool_cs);
-
-        interlocked_dec(&num_work_items);
-        list_remove(&work_item->entry);
-        RtlFreeHeap(GetProcessHeap(), 0, work_item);
-
-        RtlLeaveCriticalSection(&threadpool_cs);
+        if (status == STATUS_SUCCESS)
+            NtClose( thread );
+        else
+        {
+            RtlEnterCriticalSection( &threadpool_cs );
+            if (num_workers > 0 || num_items_processed != items_processed)
+                status = STATUS_SUCCESS;
+            else
+                list_remove( &work_item->entry );
+            RtlLeaveCriticalSection( &threadpool_cs );
 
-        return status;
+            if (status != STATUS_SUCCESS)
+                RtlFreeHeap( GetProcessHeap(), 0, work_item );
+        }
     }
 
-    return STATUS_SUCCESS;
+    return status;
 }
 
 /***********************************************************************




More information about the wine-cvs mailing list