[2/7] ntdll: Implement threadpool wait queues.

Sebastian Lackner sebastian at fds-team.de
Sat Jul 4 19:22:52 CDT 2015


To implement waiting for an arbitrary number of handles, we group them into
buckets of up to (MAXIMUM_WAIT_OBJECTS - 1) objects and assign each bucket a
dedicated wait queue thread. The last handle slot of each thread is reserved
for an event that notifies it about changes.

---
 dlls/ntdll/threadpool.c |  290 ++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 282 insertions(+), 8 deletions(-)

diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index b9aece4..e7f07d3 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -137,6 +137,7 @@ struct timer_queue
  */
 
 #define THREADPOOL_WORKER_TIMEOUT 5000
+#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
 
 /* internal threadpool representation */
 struct threadpool
@@ -213,6 +214,13 @@ struct threadpool_object
         struct
         {
             PTP_WAIT_CALLBACK callback;
+            LONG            signaled;
+            /* information about the wait object, locked via waitqueue.cs */
+            struct waitqueue_bucket *bucket;
+            BOOL            wait_pending;
+            struct list     wait_entry;
+            ULONGLONG       timeout;
+            HANDLE          handle;
         } wait;
     } u;
 };
@@ -272,6 +280,38 @@ static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
       0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
 };
 
+/* global waitqueue object: the set of buckets, each served by its own thread */
+static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;  /* forward declaration, defined below */
+
+static struct
+{
+    CRITICAL_SECTION        cs;             /* guards the bucket list and the buckets' contents */
+    LONG                    num_buckets;    /* number of entries in buckets */
+    struct list             buckets;        /* list of struct waitqueue_bucket (via bucket_entry) */
+}
+waitqueue =
+{
+    { &waitqueue_debug, -1, 0, 0, 0, 0 },       /* cs */
+    0,                                          /* num_buckets */
+    LIST_INIT( waitqueue.buckets )              /* buckets */
+};
+
+static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
+{
+    0, 0, &waitqueue.cs,
+    { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
+      0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
+};
+
+struct waitqueue_bucket                     /* wait objects served by one wait queue thread */
+{
+    struct list             bucket_entry;   /* entry in waitqueue.buckets */
+    LONG                    objcount;       /* number of wait objects assigned to this bucket */
+    struct list             reserved;       /* assigned objects that are not being waited on */
+    struct list             waiting;        /* objects the bucket thread collects handles from */
+    HANDLE                  update_event;   /* set to wake the bucket thread after a change */
+};
+};
+
 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
 {
     return (struct threadpool *)pool;
@@ -309,7 +349,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
 }
 
 static void CALLBACK threadpool_worker_proc( void *param );
-static void tp_object_submit( struct threadpool_object *object );
+static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
 static void tp_object_shutdown( struct threadpool_object *object );
 static BOOL tp_object_release( struct threadpool_object *object );
 static struct threadpool *default_threadpool = NULL;
@@ -1261,7 +1301,7 @@ static void CALLBACK timerqueue_thread_proc( void *param )
             /* Queue a new callback in one of the worker threads. */
             list_remove( &timer->u.timer.timer_entry );
             timer->u.timer.timer_pending = FALSE;
-            tp_object_submit( timer );
+            tp_object_submit( timer, FALSE );
 
             /* Insert the timer back into the queue, except its marked for shutdown. */
             if (timer->u.timer.period && !timer->shutdown)
@@ -1398,6 +1438,214 @@ static void tp_timerqueue_unlock( struct threadpool_object *timer )
 }
 
 /***********************************************************************
+ *           waitqueue_thread_proc    (internal)
+ */
+static void CALLBACK waitqueue_thread_proc( void *param )
+{
+    struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
+    HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
+    struct waitqueue_bucket *bucket = param;
+    struct threadpool_object *wait, *next;
+    LARGE_INTEGER now, timeout;
+    DWORD num_handles;
+    NTSTATUS status;
+
+    TRACE( "starting wait queue thread\n" );
+
+    RtlEnterCriticalSection( &waitqueue.cs );
+
+    for (;;)
+    {
+        NtQuerySystemTime( &now );
+        timeout.QuadPart = TIMEOUT_INFINITE;
+        num_handles = 0;
+
+        LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct threadpool_object,
+                                  u.wait.wait_entry )
+        {
+            assert( wait->type == TP_OBJECT_TYPE_WAIT );
+            if (wait->u.wait.timeout <= now.QuadPart)
+            {
+                /* Wait object timed out. */
+                list_remove( &wait->u.wait.wait_entry );
+                list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+                tp_object_submit( wait, FALSE );
+            }
+            else
+            {
+                if (wait->u.wait.timeout < timeout.QuadPart)
+                    timeout.QuadPart = wait->u.wait.timeout;
+
+                assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
+                interlocked_inc( &wait->refcount );
+                objects[num_handles] = wait;
+                handles[num_handles] = wait->u.wait.handle;
+                num_handles++;
+            }
+        }
+
+        if (!bucket->objcount)
+        {
+            /* All wait objects have been destroyed, if no new wait objects are created
+             * within some amount of time, then we can shutdown this thread. */
+            assert( num_handles == 0 );
+            RtlLeaveCriticalSection( &waitqueue.cs );
+            timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+            status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE, FALSE, &timeout );
+            RtlEnterCriticalSection( &waitqueue.cs );
+
+            if (status == STATUS_TIMEOUT && !bucket->objcount)
+                break;
+        }
+        else
+        {
+            handles[num_handles] = bucket->update_event;
+            RtlLeaveCriticalSection( &waitqueue.cs );
+            status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE, FALSE, &timeout );
+            RtlEnterCriticalSection( &waitqueue.cs );
+
+            if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 + num_handles)
+            {
+                wait = objects[status - STATUS_WAIT_0];
+                assert( wait->type == TP_OBJECT_TYPE_WAIT );
+                if (wait->u.wait.bucket)
+                {
+                    /* Wait object signaled. */
+                    assert( wait->u.wait.bucket == bucket );
+                    list_remove( &wait->u.wait.wait_entry );
+                    list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+                    tp_object_submit( wait, TRUE );
+                }
+                else
+                    ERR("wait object %p triggered while object was destroyed\n", wait);
+            }
+
+            /* Release temporary references to wait objects. */
+            while (num_handles)
+            {
+                wait = objects[--num_handles];
+                assert( wait->type == TP_OBJECT_TYPE_WAIT );
+                tp_object_release( wait );
+            }
+        }
+    }
+
+    /* Remove this bucket from the list. */
+    list_remove( &bucket->bucket_entry );
+    if (!--waitqueue.num_buckets)
+        assert( list_empty( &waitqueue.buckets ) );
+
+    RtlLeaveCriticalSection( &waitqueue.cs );
+
+    TRACE( "terminating wait queue thread\n" );
+
+    assert( bucket->objcount == 0 );
+    assert( list_empty( &bucket->reserved ) );
+    assert( list_empty( &bucket->waiting ) );
+    NtClose( bucket->update_event );
+
+    RtlFreeHeap( GetProcessHeap(), 0, bucket );
+}
+
+/***********************************************************************
+ *           tp_waitqueue_lock    (internal)
+ *
+ * Resets the wait state of a wait object and reserves a slot for it in a
+ * bucket, creating a new bucket and wait queue thread if none has room.
+ * Returns STATUS_SUCCESS, or the failure status (no bucket is assigned then).
+ */
+static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
+{
+    struct waitqueue_bucket *bucket;
+    NTSTATUS status;
+    HANDLE thread;
+    assert( wait->type == TP_OBJECT_TYPE_WAIT );
+
+    wait->u.wait.signaled       = 0;
+    wait->u.wait.bucket         = NULL;
+    wait->u.wait.wait_pending   = FALSE;
+    wait->u.wait.timeout        = 0;
+    wait->u.wait.handle         = INVALID_HANDLE_VALUE;
+
+    RtlEnterCriticalSection( &waitqueue.cs );
+
+    /* Try to assign to existing bucket if possible. */
+    LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket, bucket_entry )
+    {
+        if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS)
+        {
+            list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+            wait->u.wait.bucket = bucket;
+            bucket->objcount++;
+            status = STATUS_SUCCESS;
+            goto out;
+        }
+    }
+
+    /* Create a new bucket and corresponding worker thread. */
+    bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
+    if (!bucket)
+    {
+        status = STATUS_NO_MEMORY;
+        goto out;
+    }
+
+    bucket->objcount = 0;
+    list_init( &bucket->reserved );
+    list_init( &bucket->waiting );
+    status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
+                            NULL, SynchronizationEvent, FALSE );
+    if (status)
+    {
+        RtlFreeHeap( GetProcessHeap(), 0, bucket );
+        goto out;
+    }
+    /* The new thread blocks on waitqueue.cs until this function leaves it. */
+    status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+                                  waitqueue_thread_proc, bucket, &thread, NULL );
+    if (status == STATUS_SUCCESS)
+    {
+        list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
+        waitqueue.num_buckets++;
+        list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+        wait->u.wait.bucket = bucket;
+        bucket->objcount++;
+        NtClose( thread );
+    }
+    else
+    {
+        NtClose( bucket->update_event );
+        RtlFreeHeap( GetProcessHeap(), 0, bucket );
+    }
+
+out:
+    RtlLeaveCriticalSection( &waitqueue.cs );
+    return status;
+}
+
+/***********************************************************************
+ *           tp_waitqueue_unlock    (internal)
+ *
+ * Detaches a wait object from its bucket, if it has one, and wakes the bucket thread.
+ */
+static void tp_waitqueue_unlock( struct threadpool_object *wait )
+{
+    struct waitqueue_bucket *owner;
+    assert( wait->type == TP_OBJECT_TYPE_WAIT );
+
+    RtlEnterCriticalSection( &waitqueue.cs );
+    if ((owner = wait->u.wait.bucket) != NULL)
+    {
+        assert( owner->objcount > 0 );
+        list_remove( &wait->u.wait.wait_entry );
+        wait->u.wait.bucket = NULL;
+        owner->objcount--;
+        NtSetEvent( owner->update_event, NULL );  /* let the bucket thread notice the change */
+    }
+    RtlLeaveCriticalSection( &waitqueue.cs );
+}
+
+/***********************************************************************
  *           tp_threadpool_alloc    (internal)
  *
  * Allocates a new threadpool object.
@@ -1666,7 +1914,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
      * will be set, and tp_object_submit would fail with an assertion. */
 
     if (is_simple_callback)
-        tp_object_submit( object );
+        tp_object_submit( object, FALSE );
 
     if (object->group)
     {
@@ -1692,7 +1940,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
  * Submits a threadpool object to the associcated threadpool. This
  * function has to be VOID because TpPostWork can never fail on Windows.
  */
-static void tp_object_submit( struct threadpool_object *object )
+static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
 {
     struct threadpool *pool = object->pool;
     NTSTATUS status = STATUS_UNSUCCESSFUL;
@@ -1722,6 +1970,10 @@ static void tp_object_submit( struct threadpool_object *object )
     if (!object->num_pending_callbacks++)
         list_add_tail( &pool->pool, &object->pool_entry );
 
+    /* Count how often the object was signaled. */
+    if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
+        object->u.wait.signaled++;
+
     /* No new thread started - wake up one existing thread. */
     if (status != STATUS_SUCCESS)
     {
@@ -1748,6 +2000,9 @@ static void tp_object_cancel( struct threadpool_object *object, BOOL group_cance
         pending_callbacks = object->num_pending_callbacks;
         object->num_pending_callbacks = 0;
         list_remove( &object->pool_entry );
+
+        if (object->type == TP_OBJECT_TYPE_WAIT)
+            object->u.wait.signaled = 0;
     }
     RtlLeaveCriticalSection( &pool->cs );
 
@@ -1797,6 +2052,8 @@ static void tp_object_shutdown( struct threadpool_object *object )
 {
     if (object->type == TP_OBJECT_TYPE_TIMER)
         tp_timerqueue_unlock( object );
+    else if (object->type == TP_OBJECT_TYPE_WAIT)
+        tp_waitqueue_unlock( object );
 
     object->shutdown = TRUE;
 }
@@ -1851,6 +2108,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
     TP_CALLBACK_INSTANCE *callback_instance;
     struct threadpool_instance instance;
     struct threadpool *pool = param;
+    TP_WAIT_RESULT wait_result;
     LARGE_INTEGER timeout;
     struct list *ptr;
     NTSTATUS status;
@@ -1871,6 +2129,13 @@ static void CALLBACK threadpool_worker_proc( void *param )
             if (--object->num_pending_callbacks)
                 list_add_tail( &pool->pool, &object->pool_entry );
 
+            /* For wait objects check if they were signaled or have timed out. */
+            if (object->type == TP_OBJECT_TYPE_WAIT)
+            {
+                wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
+                if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
+            }
+
             /* Leave critical section and do the actual callback. */
             object->num_associated_callbacks++;
             object->num_running_callbacks++;
@@ -1922,8 +2187,8 @@ static void CALLBACK threadpool_worker_proc( void *param )
                 case TP_OBJECT_TYPE_WAIT:
                 {
                     TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
-                           object->u.wait.callback, callback_instance, object->userdata, object, WAIT_OBJECT_0 );
-                    object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, WAIT_OBJECT_0 );
+                           object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
+                    object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
                     TRACE( "callback %p returned\n", object->u.wait.callback );
                     break;
                 }
@@ -2097,6 +2362,15 @@ NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID us
 
     object->type = TP_OBJECT_TYPE_WAIT;
     object->u.wait.callback = callback;
+
+    status = tp_waitqueue_lock( object );
+    if (status)
+    {
+        tp_threadpool_unlock( pool );
+        RtlFreeHeap( GetProcessHeap(), 0, object );
+        return status;
+    }
+
     tp_object_initialize( object, pool, userdata, environment );
 
     *out = (TP_WAIT *)object;
@@ -2304,7 +2578,7 @@ VOID WINAPI TpPostWork( TP_WORK *work )
 
     TRACE( "%p\n", work );
 
-    tp_object_submit( this );
+    tp_object_submit( this, FALSE );
 }
 
 /***********************************************************************
@@ -2558,7 +2832,7 @@ VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LO
     RtlLeaveCriticalSection( &timerqueue.cs );
 
     if (submit_timer)
-       tp_object_submit( this );
+       tp_object_submit( this, FALSE );
 }
 
 /***********************************************************************
-- 
2.4.4



More information about the wine-patches mailing list