[1/8] ntdll: Implement TpSimpleTryPost and basic threadpool infrastructure. (v2)

Sebastian Lackner sebastian at fds-team.de
Tue Jun 30 19:55:27 CDT 2015


Changes in v2:
 * Moved code to dlls/ntdll/threadpool.c
 * Improve handling of thread creation and shutdown for default threadpool

If anyone has any questions about specific design decisions, or there is anything unclear
about how it is supposed to work, feel free to ask.

Note: Do not rely on the exit code of the tests; instead, manually check for "Test failed"
messages in the output. Wine has various race conditions during multithreaded process
shutdown, so the exit code can be zero even though test failures are present.

---
 dlls/ntdll/ntdll.spec         |    3 
 dlls/ntdll/tests/threadpool.c |    3 
 dlls/ntdll/threadpool.c       |  454 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 459 insertions(+), 1 deletion(-)

diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 270d7ff..cd4e853 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -970,6 +970,9 @@
 @ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize
 @ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
 @ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
+@ stdcall TpAllocPool(ptr ptr)
+@ stdcall TpReleasePool(ptr)
+@ stdcall TpSimpleTryPost(ptr ptr ptr)
 @ stdcall -ret64 VerSetConditionMask(int64 long long)
 @ stdcall WinSqmIsOptedIn()
 @ stdcall ZwAcceptConnectPort(ptr long ptr long long ptr) NtAcceptConnectPort
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index 2e31b34..6f164e9 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -48,7 +48,7 @@ static BOOL init_threadpool(void)
 
     if (!pTpAllocPool)
     {
-        skip("Threadpool functions not supported, skipping tests\n");
+        win_skip("Threadpool functions not supported, skipping tests\n");
         return FALSE;
     }
 
@@ -105,6 +105,7 @@ static void test_tp_simple(void)
     environment.Version = 9999;
     environment.Pool = pool;
     status = pTpSimpleTryPost(simple_cb, semaphore, &environment);
+    todo_wine
     ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista/2008 */,
        "TpSimpleTryPost unexpectedly returned status %x\n", status);
     if (!status)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index 513c13d..d8dc929 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -2,6 +2,7 @@
  * Thread pooling
  *
  * Copyright (c) 2006 Robert Shearman
+ * Copyright (c) 2014-2015 Sebastian Lackner
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -37,6 +38,10 @@
 
 WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
 
+/*
+ * Old thread pooling API
+ */
+
 #define OLD_WORKER_TIMEOUT 30000        /* 30 seconds */
 #define EXPIRE_NEVER       (~(ULONGLONG)0)
 #define TIMER_QUEUE_MAGIC  0x516d6954   /* TimQ */
@@ -127,6 +132,65 @@ struct timer_queue
     HANDLE thread;
 };
 
+/*
+ * Object-oriented thread pooling API
+ */
+
+#define THREADPOOL_WORKER_TIMEOUT 5000
+
+/* internal threadpool representation (the structure behind a TP_POOL pointer) */
+struct threadpool
+{
+    LONG                    refcount;           /* changed with interlocked_inc / interlocked_dec */
+    LONG                    objcount;           /* holders of tp_threadpool_lock, changed under .cs */
+    BOOL                    shutdown;           /* set by tp_threadpool_shutdown; makes workers exit */
+    CRITICAL_SECTION        cs;
+    /* pool of work items, locked via .cs */
+    struct list             pool;               /* links threadpool_object.pool_entry of objects with pending callbacks */
+    RTL_CONDITION_VARIABLE  update_event;       /* woken on submit and on shutdown; workers sleep on it */
+    /* information about worker threads, locked via .cs */
+    int                     max_workers;        /* tp_object_submit starts no new thread at or above this count */
+    int                     min_workers;        /* idle workers only exit on timeout while above this (see worker proc) */
+    int                     num_workers;        /* incremented on thread creation, decremented when a worker leaves its loop */
+    int                     num_busy_workers;   /* workers currently executing a callback */
+};
+
+enum threadpool_objtype     /* selects which member of threadpool_object.u the worker uses */
+{
+    TP_OBJECT_TYPE_SIMPLE   /* object created by TpSimpleTryPost, uses u.simple.callback */
+};
+
+/* internal threadpool object representation (one schedulable work item) */
+struct threadpool_object
+{
+    LONG                    refcount;                /* changed with interlocked_inc / interlocked_dec; freed at zero */
+    BOOL                    shutdown;                /* set by tp_object_shutdown; asserted before destruction */
+    /* read-only information */
+    enum threadpool_objtype type;                    /* set by the creator before tp_object_initialize */
+    struct threadpool       *pool;                   /* pool locked for this object; unlocked on destruction */
+    PVOID                   userdata;                /* passed as second argument to the callback */
+    /* information about the pool, locked via .pool->cs */
+    struct list             pool_entry;              /* entry in threadpool.pool while callbacks are pending */
+    LONG                    num_pending_callbacks;   /* submitted but not yet picked up by a worker */
+    LONG                    num_running_callbacks;   /* currently being executed by workers */
+    /* arguments for callback */
+    union
+    {
+        struct
+        {
+            PTP_SIMPLE_CALLBACK callback;            /* invoked as callback( NULL, userdata ) */
+        } simple;
+    } u;
+};
+
+static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )   /* public TP_POOL pointer -> internal struct */
+{
+    return (struct threadpool *)pool;   /* plain cast, no validation of the pointer */
+}
+
+static void CALLBACK threadpool_worker_proc( void *param );   /* defined below; needed by the thread creation code */
+static struct threadpool *default_threadpool = NULL;   /* created lazily by tp_threadpool_lock when no pool is supplied */
+
 static inline LONG interlocked_inc( PLONG dest )
 {
     return interlocked_xchg_add( dest, 1 ) + 1;
@@ -1044,3 +1108,393 @@ NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
 
     return status;
 }
+
+/***********************************************************************
+ *           tp_threadpool_alloc    (internal)
+ *
+ * Allocates a new threadpool object (refcount 1, no workers); STATUS_NO_MEMORY if the heap allocation fails.
+ */
+static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
+{
+    struct threadpool *pool;
+
+    pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
+    if (!pool)
+        return STATUS_NO_MEMORY;   /* *out is left untouched on failure */
+
+    pool->refcount              = 1;   /* reference owned by the caller */
+    pool->objcount              = 0;
+    pool->shutdown              = FALSE;
+
+    RtlInitializeCriticalSection( &pool->cs );
+    pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");   /* descriptive string; reset in tp_threadpool_release */
+
+    list_init( &pool->pool );
+    RtlInitializeConditionVariable( &pool->update_event );
+
+    pool->max_workers           = 500;
+    pool->min_workers           = 0;
+    pool->num_workers           = 0;   /* threads are only created later, by tp_threadpool_lock / tp_object_submit */
+    pool->num_busy_workers      = 0;
+
+    TRACE( "allocated threadpool %p\n", pool );
+
+    *out = pool;
+    return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ *           tp_threadpool_shutdown    (internal)
+ *
+ * Prepares the shutdown of a threadpool object and notifies all worker
+ * threads to terminate (after all remaining work items have been
+ * processed). Does not release any reference; callers do that separately.
+ */
+static void tp_threadpool_shutdown( struct threadpool *pool )
+{
+    assert( pool != default_threadpool );   /* the default threadpool must never be shut down */
+
+    pool->shutdown = TRUE;   /* NOTE(review): written without holding pool->cs, workers read it under the lock - confirm intended */
+    RtlWakeAllConditionVariable( &pool->update_event );   /* wake sleeping workers so they re-check the flag */
+}
+
+/***********************************************************************
+ *           tp_threadpool_release    (internal)
+ *
+ * Releases a reference to a threadpool object; returns TRUE if it was the last one and the pool was freed.
+ */
+static BOOL tp_threadpool_release( struct threadpool *pool )
+{
+    if (interlocked_dec( &pool->refcount ))
+        return FALSE;   /* other references remain */
+
+    TRACE( "destroying threadpool %p\n", pool );
+
+    assert( pool->shutdown );              /* only pools that went through tp_threadpool_shutdown get here */
+    assert( !pool->objcount );             /* no outstanding tp_threadpool_lock holders */
+    assert( list_empty( &pool->pool ) );   /* no queued work items */
+
+    pool->cs.DebugInfo->Spare[0] = 0;      /* undo the assignment made in tp_threadpool_alloc */
+    RtlDeleteCriticalSection( &pool->cs );
+
+    RtlFreeHeap( GetProcessHeap(), 0, pool );
+    return TRUE;
+}
+
+/***********************************************************************
+ *           tp_threadpool_lock    (internal)
+ *
+ * Acquires a lock on a threadpool, specified with a TP_CALLBACK_ENVIRON
+ * block (a NULL block or a NULL Pool member selects the default pool).
+ * On success there is at least one worker thread to process tasks.
+ */
+static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON *environment )
+{
+    struct threadpool *pool = NULL;
+    NTSTATUS status = STATUS_SUCCESS;
+
+    if (environment)
+        pool = (struct threadpool *)environment->Pool;
+
+    if (!pool)
+    {
+        if (!default_threadpool)
+        {
+            status = tp_threadpool_alloc( &pool );
+            if (status != STATUS_SUCCESS)
+                return status;
+
+            if (interlocked_cmpxchg_ptr( (void *)&default_threadpool, pool, NULL ) != NULL)
+            {
+                /* the global was already set by someone else: discard the pool allocated above */
+                tp_threadpool_shutdown( pool );
+                tp_threadpool_release( pool );
+            }
+        }
+
+        pool = default_threadpool;
+    }
+
+    RtlEnterCriticalSection( &pool->cs );
+
+    /* Make sure that the threadpool has at least one thread. */
+    if (!pool->num_workers)
+    {
+        HANDLE thread;
+        status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+                                      threadpool_worker_proc, pool, &thread, NULL );
+        if (status == STATUS_SUCCESS)
+        {
+            interlocked_inc( &pool->refcount );   /* reference owned by the new worker, dropped when it exits */
+            pool->num_workers++;
+            NtClose( thread );                    /* the handle itself is not needed */
+        }
+    }
+
+    /* Keep a reference, and increment objcount to ensure that the
+     * last thread doesn't terminate. */
+    if (status == STATUS_SUCCESS)
+    {
+        interlocked_inc( &pool->refcount );       /* both are undone by tp_threadpool_unlock */
+        pool->objcount++;
+    }
+
+    RtlLeaveCriticalSection( &pool->cs );
+
+    if (status != STATUS_SUCCESS)
+        return status;                            /* thread creation failed; *out is not written */
+
+    *out = pool;
+    return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ *           tp_threadpool_unlock    (internal)
+ *
+ * Releases a lock on a threadpool (undoes a successful tp_threadpool_lock).
+ */
+static void tp_threadpool_unlock( struct threadpool *pool )
+{
+    RtlEnterCriticalSection( &pool->cs );
+    pool->objcount--;                    /* objcount is protected by pool->cs */
+    RtlLeaveCriticalSection( &pool->cs );
+    tp_threadpool_release( pool );       /* drop the reference taken in tp_threadpool_lock; may free the pool */
+}
+
+/***********************************************************************
+ *           tp_object_initialize    (internal)
+ *
+ * Initializes members of a threadpool object; type and the u member must already be set by the caller.
+ */
+static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
+                                  PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+{
+    object->refcount                = 1;       /* reference owned by the creator */
+    object->shutdown                = FALSE;
+
+    object->pool                    = pool;    /* stored as-is; no additional pool reference is taken here */
+    object->userdata                = userdata;
+
+    memset( &object->pool_entry, 0, sizeof(object->pool_entry) );   /* not linked into any list yet */
+    object->num_pending_callbacks   = 0;
+    object->num_running_callbacks   = 0;
+
+    if (environment)
+        FIXME( "environment not implemented yet\n" );   /* the environment contents are not read by this function */
+
+    TRACE( "allocated object %p of type %u\n", object, object->type );   /* reads object->type, which is not assigned above */
+}
+
+/***********************************************************************
+ *           tp_object_submit    (internal)
+ *
+ * Submits a threadpool object to the associated threadpool. This
+ * function has to be VOID because TpPostWork can never fail on Windows.
+ */
+static void tp_object_submit( struct threadpool_object *object )
+{
+    struct threadpool *pool = object->pool;
+    NTSTATUS status = STATUS_UNSUCCESSFUL;   /* stays unsuccessful unless a new worker is started below */
+
+    assert( !object->shutdown );
+    assert( !pool->shutdown );
+
+    RtlEnterCriticalSection( &pool->cs );
+
+    /* Start new worker threads if required. */
+    if (pool->num_busy_workers >= pool->num_workers &&
+        pool->num_workers < pool->max_workers)
+    {
+        HANDLE thread;
+        status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+                                      threadpool_worker_proc, pool, &thread, NULL );
+        if (status == STATUS_SUCCESS)
+        {
+            interlocked_inc( &pool->refcount );   /* reference owned by the new worker */
+            pool->num_workers++;
+            NtClose( thread );
+        }
+    }
+
+    /* Queue work item and increment refcount. */
+    interlocked_inc( &object->refcount );         /* released by the worker after the callback returned */
+    if (!object->num_pending_callbacks++)         /* link into the pool list only on the 0 -> 1 transition */
+        list_add_tail( &pool->pool, &object->pool_entry );
+
+    /* No new thread started - wake up one existing thread. */
+    if (status != STATUS_SUCCESS)
+    {
+        assert( pool->num_workers > 0 );          /* a thread creation failure is tolerated only if a worker exists */
+        RtlWakeConditionVariable( &pool->update_event );
+    }
+
+    RtlLeaveCriticalSection( &pool->cs );
+}
+
+/***********************************************************************
+ *           tp_object_shutdown    (internal)
+ *
+ * Marks a threadpool object for shutdown (which means that no further
+ * tasks can be submitted). Already queued callbacks are not touched here.
+ */
+static void tp_object_shutdown( struct threadpool_object *object )
+{
+    object->shutdown = TRUE;   /* tp_object_submit asserts this is unset; tp_object_release asserts it is set */
+}
+
+/***********************************************************************
+ *           tp_object_release    (internal)
+ *
+ * Releases a reference to a threadpool object; returns TRUE if it was the last one and the object was freed.
+ */
+static BOOL tp_object_release( struct threadpool_object *object )
+{
+    if (interlocked_dec( &object->refcount ))
+        return FALSE;   /* other references remain */
+
+    TRACE( "destroying object %p of type %u\n", object, object->type );
+
+    assert( object->shutdown );                 /* tp_object_shutdown must have been called */
+    assert( !object->num_pending_callbacks );
+    assert( !object->num_running_callbacks );
+
+    tp_threadpool_unlock( object->pool );       /* give back the pool lock stored in object->pool */
+
+    RtlFreeHeap( GetProcessHeap(), 0, object );
+    return TRUE;
+}
+
+/***********************************************************************
+ *           threadpool_worker_proc    (internal)
+ */
+static void CALLBACK threadpool_worker_proc( void *param )   /* thread entry point; param is the struct threadpool to serve */
+{
+    struct threadpool *pool = param;
+    LARGE_INTEGER timeout;
+    struct list *ptr;
+
+    TRACE( "starting worker thread for pool %p\n", pool );
+
+    RtlEnterCriticalSection( &pool->cs );   /* held for the whole loop except while a callback runs */
+    for (;;)
+    {
+        while ((ptr = list_head( &pool->pool )))
+        {
+            struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
+            assert( object->num_pending_callbacks > 0 );
+
+            /* If further pending callbacks are queued, move the work item to
+             * the end of the pool list. Otherwise remove it from the pool. */
+            list_remove( &object->pool_entry );
+            if (--object->num_pending_callbacks)
+                list_add_tail( &pool->pool, &object->pool_entry );
+
+            /* Leave critical section and do the actual callback. */
+            object->num_running_callbacks++;
+            pool->num_busy_workers++;
+            RtlLeaveCriticalSection( &pool->cs );
+
+            switch (object->type)
+            {
+                case TP_OBJECT_TYPE_SIMPLE:
+                {
+                    TRACE( "executing simple callback %p(NULL, %p)\n",
+                           object->u.simple.callback, object->userdata );
+                    object->u.simple.callback( NULL, object->userdata );   /* first argument is always NULL here */
+                    TRACE( "callback %p returned\n", object->u.simple.callback );
+                    break;
+                }
+
+                default:
+                    assert(0);   /* no other object types exist in this enum */
+                    break;
+            }
+
+            RtlEnterCriticalSection( &pool->cs );
+            pool->num_busy_workers--;
+            object->num_running_callbacks--;
+            tp_object_release( object );   /* drops the reference taken in tp_object_submit; called with pool->cs held */
+        }
+
+        /* Shutdown worker thread if requested. */
+        if (pool->shutdown)   /* only checked once the work list is empty */
+            break;
+
+        /* Wait for new tasks or until the timeout expires. A thread only terminates
+         * when no new tasks are available, and the number of threads can be
+         * decreased without violating the min_workers limit. An exception is when
+         * min_workers == 0, then objcount is used to detect if the last thread
+         * can be terminated. */
+        timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;   /* presumably ms -> negative (relative) 100ns units; confirm */
+        if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
+            !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
+            (!pool->min_workers && !pool->objcount)))
+        {
+            break;
+        }
+    }
+    pool->num_workers--;   /* still under pool->cs */
+    RtlLeaveCriticalSection( &pool->cs );
+
+    TRACE( "terminating worker thread for pool %p\n", pool );
+    tp_threadpool_release( pool );   /* drops the reference taken when this thread was created; may free the pool */
+}
+
+/***********************************************************************
+ *           TpAllocPool    (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )   /* on success *out receives the newly allocated pool */
+{
+    TRACE( "%p %p\n", out, reserved );
+
+    if (reserved)   /* a non-NULL value is only reported, not otherwise used */
+        FIXME( "reserved argument is nonzero (%p)\n", reserved );   /* trailing newline added: the message was unterminated */
+
+    return tp_threadpool_alloc( (struct threadpool **)out );   /* status of the internal allocation is passed through */
+}
+
+/***********************************************************************
+ *           TpReleasePool    (NTDLL.@)
+ */
+VOID WINAPI TpReleasePool( TP_POOL *pool )   /* pool: pointer previously returned through TpAllocPool's out argument */
+{
+    struct threadpool *this = impl_from_TP_POOL( pool );
+
+    TRACE( "%p\n", pool );
+
+    tp_threadpool_shutdown( this );   /* workers exit once the remaining work items are processed */
+    tp_threadpool_release( this );    /* drops one reference; the pool is freed only when the count reaches zero */
+}
+
+/***********************************************************************
+ *           TpSimpleTryPost    (NTDLL.@)
+ */
+NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
+                                 TP_CALLBACK_ENVIRON *environment )
+{
+    struct threadpool_object *object;
+    struct threadpool *pool;
+    NTSTATUS status;
+
+    TRACE( "%p %p %p\n", callback, userdata, environment );
+
+    object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+    if (!object)
+        return STATUS_NO_MEMORY;
+
+    status = tp_threadpool_lock( &pool, environment );   /* NULL environment or NULL Pool member selects the default pool */
+    if (status)
+    {
+        RtlFreeHeap( GetProcessHeap(), 0, object );       /* nothing was queued; undo the allocation */
+        return status;
+    }
+
+    object->type = TP_OBJECT_TYPE_SIMPLE;                 /* set before tp_object_initialize, which traces object->type */
+    object->u.simple.callback = callback;
+    tp_object_initialize( object, pool, userdata, environment );
+
+    tp_object_submit( object );                           /* queues one callback and takes its own object reference */
+
+    tp_object_shutdown( object );                         /* no further submissions for this one-shot object */
+    tp_object_release( object );                          /* drop the creator's reference; the queued callback still holds one */
+    return STATUS_SUCCESS;
+}
-- 
2.4.4



More information about the wine-patches mailing list