Nikolay Sivov : ntdll/threadpool: Add support for callback priority.

Alexandre Julliard julliard at winehq.org
Wed Mar 13 18:10:50 CDT 2019


Module: wine
Branch: master
Commit: 733b43858376d9b6e4dba331ca2f234c5b25b034
URL:    https://source.winehq.org/git/wine.git/?a=commit;h=733b43858376d9b6e4dba331ca2f234c5b25b034

Author: Nikolay Sivov <nsivov at codeweavers.com>
Date:   Wed Mar 13 09:28:45 2019 +0300

ntdll/threadpool: Add support for callback priority.

Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard at winehq.org>

---

 dlls/ntdll/tests/threadpool.c | 17 ++++++++---
 dlls/ntdll/threadpool.c       | 67 +++++++++++++++++++++++++++++++++++++------
 2 files changed, 72 insertions(+), 12 deletions(-)

diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index 3bd2d99..bf5493c 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -517,12 +517,21 @@ static void test_tp_simple(void)
     memset(&environment3, 0, sizeof(environment3));
     environment3.Version = 3;
     environment3.Pool = pool;
-    environment3.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
     environment3.Size = sizeof(environment3);
+
+    for (i = 0; i < 3; ++i)
+    {
+        environment3.CallbackPriority = TP_CALLBACK_PRIORITY_HIGH + i;
+        status = pTpSimpleTryPost(simple_cb, semaphore, (TP_CALLBACK_ENVIRON *)&environment3);
+        ok(!status, "TpSimpleTryPost failed with status %x\n", status);
+        result = WaitForSingleObject(semaphore, 1000);
+        ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
+    }
+
+    environment3.CallbackPriority = 10;
     status = pTpSimpleTryPost(simple_cb, semaphore, (TP_CALLBACK_ENVIRON *)&environment3);
-    ok(!status, "TpSimpleTryPost failed with status %x\n", status);
-    result = WaitForSingleObject(semaphore, 1000);
-    ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
+    ok(status == STATUS_INVALID_PARAMETER || broken(!status) /* Vista does not support priorities */,
+            "TpSimpleTryPost failed with status %x\n", status);
 
     /* test with invalid version number */
     memset(&environment, 0, sizeof(environment));
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index bf7449c..a7ad321 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -123,8 +123,8 @@ struct threadpool
     LONG                    objcount;
     BOOL                    shutdown;
     CRITICAL_SECTION        cs;
-    /* pool of work items, locked via .cs */
-    struct list             pool;
+    /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high, normal, low. */
+    struct list             pools[3];
     RTL_CONDITION_VARIABLE  update_event;
     /* information about worker threads, locked via .cs */
     int                     max_workers;
@@ -155,6 +155,7 @@ struct threadpool_object
     PTP_SIMPLE_CALLBACK     finalization_callback;
     BOOL                    may_run_long;
     HMODULE                 race_dll;
+    TP_CALLBACK_PRIORITY    priority;
     /* information about the group, locked via .group->cs */
     struct list             group_entry;
     BOOL                    is_group_member;
@@ -1648,6 +1649,7 @@ static void tp_waitqueue_unlock( struct threadpool_object *wait )
 static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
 {
     struct threadpool *pool;
+    unsigned int i;
 
     pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
     if (!pool)
@@ -1660,7 +1662,8 @@ static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
     RtlInitializeCriticalSection( &pool->cs );
     pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool.cs");
 
-    list_init( &pool->pool );
+    for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
+        list_init( &pool->pools[i] );
     RtlInitializeConditionVariable( &pool->update_event );
 
     pool->max_workers           = 500;
@@ -1696,6 +1699,8 @@ static void tp_threadpool_shutdown( struct threadpool *pool )
  */
 static BOOL tp_threadpool_release( struct threadpool *pool )
 {
+    unsigned int i;
+
     if (interlocked_dec( &pool->refcount ))
         return FALSE;
 
@@ -1703,7 +1708,8 @@ static BOOL tp_threadpool_release( struct threadpool *pool )
 
     assert( pool->shutdown );
     assert( !pool->objcount );
-    assert( list_empty( &pool->pool ) );
+    for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
+        assert( list_empty( &pool->pools[i] ) );
 
     pool->cs.DebugInfo->Spare[0] = 0;
     RtlDeleteCriticalSection( &pool->cs );
@@ -1725,7 +1731,25 @@ static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON
     NTSTATUS status = STATUS_SUCCESS;
 
     if (environment)
+    {
+        /* Validate environment parameters. */
+        if (environment->Version == 3)
+        {
+            TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
+
+            switch (environment3->CallbackPriority)
+            {
+                case TP_CALLBACK_PRIORITY_HIGH:
+                case TP_CALLBACK_PRIORITY_NORMAL:
+                case TP_CALLBACK_PRIORITY_LOW:
+                    break;
+                default:
+                    return STATUS_INVALID_PARAMETER;
+            }
+        }
+
         pool = (struct threadpool *)environment->Pool;
+    }
 
     if (!pool)
     {
@@ -1860,6 +1884,7 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
     object->finalization_callback   = NULL;
     object->may_run_long            = 0;
     object->race_dll                = NULL;
+    object->priority                = TP_CALLBACK_PRIORITY_NORMAL;
 
     memset( &object->group_entry, 0, sizeof(object->group_entry) );
     object->is_group_member         = FALSE;
@@ -1881,6 +1906,13 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
         object->finalization_callback   = environment->FinalizationCallback;
         object->may_run_long            = environment->u.s.LongFunction != 0;
         object->race_dll                = environment->RaceDll;
+        if (environment->Version == 3)
+        {
+            TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3 *)environment;
+
+            object->priority = environment_v3->CallbackPriority;
+            assert( object->priority < ARRAY_SIZE(pool->pools) );
+        }
 
         if (environment->ActivationContext)
             FIXME( "activation context not supported yet\n" );
@@ -1916,6 +1948,11 @@ static void tp_object_initialize( struct threadpool_object *object, struct threa
         tp_object_release( object );
 }
 
+static void tp_object_prio_queue( struct threadpool_object *object )
+{
+    list_add_tail( &object->pool->pools[object->priority], &object->pool_entry );
+}
+
 /***********************************************************************
  *           tp_object_submit    (internal)
  *
@@ -1940,7 +1977,7 @@ static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
     /* Queue work item and increment refcount. */
     interlocked_inc( &object->refcount );
     if (!object->num_pending_callbacks++)
-        list_add_tail( &pool->pool, &object->pool_entry );
+        tp_object_prio_queue( object );
 
     /* Count how often the object was signaled. */
     if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
@@ -2061,6 +2098,20 @@ static BOOL tp_object_release( struct threadpool_object *object )
     return TRUE;
 }
 
+static struct list *threadpool_get_next_item( const struct threadpool *pool )
+{
+    struct list *ptr;
+    unsigned int i;
+
+    for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
+    {
+        if ((ptr = list_head( &pool->pools[i] )))
+            break;
+    }
+
+    return ptr;
+}
+
 /***********************************************************************
  *           threadpool_worker_proc    (internal)
  */
@@ -2080,7 +2131,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
     pool->num_busy_workers--;
     for (;;)
     {
-        while ((ptr = list_head( &pool->pool )))
+        while ((ptr = threadpool_get_next_item( pool )))
         {
             struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object, pool_entry );
             assert( object->num_pending_callbacks > 0 );
@@ -2089,7 +2140,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
              * the end of the pool list. Otherwise remove it from the pool. */
             list_remove( &object->pool_entry );
             if (--object->num_pending_callbacks)
-                list_add_tail( &pool->pool, &object->pool_entry );
+                tp_object_prio_queue( object );
 
             /* For wait objects check if they were signaled or have timed out. */
             if (object->type == TP_OBJECT_TYPE_WAIT)
@@ -2230,7 +2281,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
          * can be terminated. */
         timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
         if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs, &timeout ) == STATUS_TIMEOUT &&
-            !list_head( &pool->pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
+            !threadpool_get_next_item( pool ) && (pool->num_workers > max( pool->min_workers, 1 ) ||
             (!pool->min_workers && !pool->objcount)))
         {
             break;




More information about the wine-cvs mailing list