Robert Shearman : ntdll: Start of pooling support for
RtlQueueWorkItem.
Alexandre Julliard
julliard at wine.codeweavers.com
Tue Mar 21 13:21:19 CST 2006
Module: wine
Branch: refs/heads/master
Commit: f40f81b6d5e120529b6ef61aecf0b1068ff6ee8c
URL: http://source.winehq.org/git/?p=wine.git;a=commit;h=f40f81b6d5e120529b6ef61aecf0b1068ff6ee8c
Author: Robert Shearman <rob at codeweavers.com>
Date: Tue Mar 21 13:44:14 2006 +0000
ntdll: Start of pooling support for RtlQueueWorkItem.
Start of pooling support for RtlQueueWorkItem. The algorithm
implemented is very simple - if there are no free threads, create a new
one.
---
dlls/ntdll/threadpool.c | 137 +++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 124 insertions(+), 13 deletions(-)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index 10bda48..15726bb 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -18,7 +18,11 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
+#include "config.h"
+#include "wine/port.h"
+
#include <stdarg.h>
+#include <limits.h>
#define NONAMELESSUNION
#include "ntstatus.h"
@@ -26,35 +30,123 @@
#include "winternl.h"
#include "wine/debug.h"
+#include "wine/list.h"
+
#include "ntdll_misc.h"
WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
+#define WORKER_TIMEOUT 30000 /* 30 seconds */
+
+static LONG num_workers;
+static LONG num_work_items;
+static LONG num_busy_workers;
+
+static struct list work_item_list = LIST_INIT(work_item_list);
+static HANDLE work_item_event;
+
+static CRITICAL_SECTION threadpool_cs;
+static CRITICAL_SECTION_DEBUG critsect_debug =
+{
+ 0, 0, &threadpool_cs,
+ { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
+};
+static CRITICAL_SECTION threadpool_cs = { &critsect_debug, -1, 0, 0, 0, 0 };
+
struct work_item
{
+ struct list entry;
PRTL_WORK_ITEM_ROUTINE function;
PVOID context;
};
+inline static LONG interlocked_inc( PLONG dest )
+{
+ return interlocked_xchg_add( (int *)dest, 1 ) + 1;
+}
+
+inline static LONG interlocked_dec( PLONG dest )
+{
+ return interlocked_xchg_add( (int *)dest, -1 ) - 1;
+}
+
static void WINAPI worker_thread_proc(void * param)
{
- struct work_item *work_item_ptr = (struct work_item *)param;
- struct work_item work_item;
+ interlocked_inc(&num_workers);
/* free the work item memory sooner to reduce memory usage */
- work_item = *work_item_ptr;
- RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
-
- TRACE("executing %p(%p)\n", work_item.function, work_item.context);
+ while (TRUE)
+ {
+ if (num_work_items > 0)
+ {
+ struct list *item;
+ RtlEnterCriticalSection(&threadpool_cs);
+ item = list_head(&work_item_list);
+ if (item)
+ {
+ struct work_item *work_item_ptr = LIST_ENTRY(item, struct work_item, entry);
+ struct work_item work_item;
+ list_remove(&work_item_ptr->entry);
+ interlocked_dec(&num_work_items);
+
+ RtlLeaveCriticalSection(&threadpool_cs);
+
+ work_item = *work_item_ptr;
+ RtlFreeHeap(GetProcessHeap(), 0, work_item_ptr);
+
+ TRACE("executing %p(%p)\n", work_item.function, work_item.context);
+
+ interlocked_inc(&num_busy_workers);
+
+ /* do the work */
+ work_item.function(work_item.context);
+
+ interlocked_dec(&num_busy_workers);
+ }
+ else
+ RtlLeaveCriticalSection(&threadpool_cs);
+ }
+ else
+ {
+ NTSTATUS status;
+ LARGE_INTEGER timeout;
+ timeout.QuadPart = -(WORKER_TIMEOUT * (ULONGLONG)10000);
+ status = NtWaitForSingleObject(work_item_event, FALSE, &timeout);
+ if (status != STATUS_WAIT_0)
+ break;
+ }
+ }
- /* do the work */
- work_item.function(work_item.context);
+ interlocked_dec(&num_workers);
RtlExitUserThread(0);
/* never reached */
}
+static NTSTATUS add_work_item_to_queue(struct work_item *work_item)
+{
+ NTSTATUS status;
+
+ RtlEnterCriticalSection(&threadpool_cs);
+ list_add_tail(&work_item_list, &work_item->entry);
+ num_work_items++;
+ RtlLeaveCriticalSection(&threadpool_cs);
+
+ if (!work_item_event)
+ {
+ HANDLE sem;
+ status = NtCreateSemaphore(&sem, SEMAPHORE_ALL_ACCESS, NULL, 1, LONG_MAX);
+ if (interlocked_cmpxchg_ptr( (PVOID *)&work_item_event, (PVOID)sem, 0 ))
+ NtClose(sem); /* somebody beat us to it */
+ }
+ else
+ status = NtReleaseSemaphore(work_item_event, 1, NULL);
+
+ return status;
+}
+
/***********************************************************************
* RtlQueueWorkItem (NTDLL.@)
*
@@ -92,16 +184,35 @@ NTSTATUS WINAPI RtlQueueWorkItem(PRTL_WO
if (Flags != WT_EXECUTEDEFAULT)
FIXME("Flags 0x%lx not supported\n", Flags);
- /* FIXME: very crude implementation that doesn't support pooling at all */
- status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
- NULL, 0, 0,
- worker_thread_proc, work_item, &thread, NULL );
+ status = add_work_item_to_queue(work_item);
+
+ if ((status == STATUS_SUCCESS) &&
+ ((num_workers == 0) || (num_workers == num_busy_workers)))
+ {
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
+ NULL, 0, 0,
+ worker_thread_proc, NULL, &thread, NULL );
+ if (status == STATUS_SUCCESS)
+ NtClose( thread );
+
+ /* NOTE: we don't care if we couldn't create the thread if there is at
+ * least one other available to process the request */
+ if ((num_workers > 0) && (status != STATUS_SUCCESS))
+ status = STATUS_SUCCESS;
+ }
+
if (status != STATUS_SUCCESS)
{
+ RtlEnterCriticalSection(&threadpool_cs);
+
+ interlocked_dec(&num_work_items);
+ list_remove(&work_item->entry);
RtlFreeHeap(GetProcessHeap(), 0, work_item);
+
+ RtlLeaveCriticalSection(&threadpool_cs);
+
return status;
}
- NtClose( thread );
return STATUS_SUCCESS;
}
More information about the wine-cvs
mailing list