Robert Shearman : ntdll: Start of pooling support for RtlQueueWorkItem.

Alexandre Julliard julliard at wine.codeweavers.com
Tue Mar 21 13:21:19 CST 2006


Module: wine
Branch: refs/heads/master
Commit: f40f81b6d5e120529b6ef61aecf0b1068ff6ee8c
URL:    http://source.winehq.org/git/?p=wine.git;a=commit;h=f40f81b6d5e120529b6ef61aecf0b1068ff6ee8c

Author: Robert Shearman <rob at codeweavers.com>
Date:   Tue Mar 21 13:44:14 2006 +0000

ntdll: Start of pooling support for RtlQueueWorkItem.

Start of pooling support for RtlQueueWorkItem. The algorithm
implemented is very simple - if there are no free threads, create a new
one.

---

 dlls/ntdll/threadpool.c |  137 +++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 124 insertions(+), 13 deletions(-)

diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index 10bda48..15726bb 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -18,7 +18,11 @@
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  */
 
+#include "config.h"
+#include "wine/port.h"
+
 #include <stdarg.h>
+#include <limits.h>
 
 #define NONAMELESSUNION
 #include "ntstatus.h"
@@ -26,35 +30,123 @@
 #include "winternl.h"
 
 #include "wine/debug.h"
+#include "wine/list.h"
+
 #include "ntdll_misc.h"
 
 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
 
+#define WORKER_TIMEOUT 30000 /* 30 seconds */
+
+static LONG num_workers;
+static LONG num_work_items;
+static LONG num_busy_workers;
+
+static struct list work_item_list = LIST_INIT(work_item_list);
+static HANDLE work_item_event;
+
+static CRITICAL_SECTION threadpool_cs;
+static CRITICAL_SECTION_DEBUG critsect_debug =
+{
+    0, 0, &threadpool_cs,
+    { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
+    0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
+};
+static CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 };
+
 struct work_item
 {
+    struct list entry;
     PRTL_WORK_ITEM_ROUTINE function;
     PVOID context;
 };
 
+inline static LONG interlocked_inc( PLONG dest )
+{
+    return interlocked_xchg_add( (int *)dest, 1 ) + 1;
+}
+
+inline static LONG interlocked_dec( PLONG dest )
+{
+    return interlocked_xchg_add( (int *)dest, -1 ) - 1;
+}
+
 static void WINAPI worker_thread_proc(void * param)
 {
-    struct work_item *work_item_ptr = (struct work_item *)param;
-    struct work_item work_item;
+    interlocked_inc(&num_workers);
 
     /* free the work item memory sooner to reduce memory usage */
-    work_item = *work_item_ptr;
-    RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
-
-    TRACE("executing %p(%p)\n", work_item.function, work_item.context);
+    while (TRUE)
+    {
+        if (num_work_items > 0)
+        {
+            struct list *item;
+            RtlEnterCriticalSection(&threadpool_cs);
+            item = list_head(&work_item_list);
+            if (item)
+            {
+                struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
+                struct work_item work_item;
+                list_remove(&work_item_ptr->entry);
+                interlocked_dec(&num_work_items);
+
+                RtlLeaveCriticalSection(&threadpool_cs);
+
+                work_item = *work_item_ptr;
+                RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
+
+                TRACE("executing %p(%p)\n", work_item.function, work_item.context);
+
+                interlocked_inc(&num_busy_workers);
+
+                /* do the work */
+                work_item.function(work_item.context);
+
+                interlocked_dec(&num_busy_workers);
+            }
+            else
+                RtlLeaveCriticalSection(&threadpool_cs);
+        }
+        else
+        {
+            NTSTATUS status;
+            LARGE_INTEGER timeout;
+            timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
+            status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
+            if (status != STATUS_WAIT_0)
+                break;
+        }
+    }
 
-    /* do the work */
-    work_item.function(work_item.context);
+    interlocked_dec(&num_workers);
 
     RtlExitUserThread(0);
 
     /* never reached */
 }
 
+static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
+{
+    NTSTATUS status;
+
+    RtlEnterCriticalSection(&threadpool_cs);
+    list_add_tail(&work_item_list, &work_item->entry);
+    num_work_items++;
+    RtlLeaveCriticalSection(&threadpool_cs);
+
+    if (!work_item_event)
+    {
+        HANDLE sem;
+        status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX);
+        if (interlocked_cmpxchg_ptr( (PVOID *)&work_item_event, (PVOID)sem, 0 ))
+            NtClose(sem);  /* somebody beat us to it */
+    }
+    else
+        status = NtReleaseSemaphore(work_item_event, 1, NULL);
+
+    return status;
+}
+
 /***********************************************************************
  *              RtlQueueWorkItem   (NTDLL.@)
  *
@@ -92,16 +184,35 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WO
     if (Flags != WT_EXECUTEDEFAULT)
         FIXME("Flags 0x%lx not supported\n", Flags);
 
-    /* FIXME: very crude implementation that doesn't support pooling at all */
-    status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
-                                  NULL, 0, 0,
-                                  worker_thread_proc, work_item, &thread, NULL );
+    status = add_work_item_to_queue(work_item);
+
+    if ((status == STATUS_SUCCESS) &&
+        ((num_workers == 0) || (num_workers == num_busy_workers)))
+    {
+        status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
+                                    NULL, 0, 0,
+                                    worker_thread_proc, NULL, &thread, NULL );
+        if (status == STATUS_SUCCESS)
+            NtClose( thread );
+
+        /* NOTE: we don't care if we couldn't create the thread if there is at
+         * least one other available to process the request */
+        if ((num_workers > 0) && (status != STATUS_SUCCESS))
+            status = STATUS_SUCCESS;
+    }
+
     if (status != STATUS_SUCCESS)
     {
+        RtlEnterCriticalSection(&threadpool_cs);
+
+        interlocked_dec(&num_work_items);
+        list_remove(&work_item->entry);
         RtlFreeHeap(GetProcessHeap(), 0, work_item);
+
+        RtlLeaveCriticalSection(&threadpool_cs);
+
         return status;
     }
-    NtClose( thread );
 
     return STATUS_SUCCESS;
 }




More information about the wine-cvs mailing list