[4/8] ntdll: Implement threadpool cleanup group functions.

Sebastian Lackner sebastian at fds-team.de
Tue Jun 30 19:57:15 CDT 2015


---
 dlls/ntdll/ntdll.spec   |    3 
 dlls/ntdll/threadpool.c |  268 +++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 266 insertions(+), 5 deletions(-)

diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 94194d6..f889b04 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -970,7 +970,10 @@
 @ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize
 @ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
 @ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
+@ stdcall TpAllocCleanupGroup(ptr)
 @ stdcall TpAllocPool(ptr ptr)
+@ stdcall TpReleaseCleanupGroup(ptr)
+@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
 @ stdcall TpReleasePool(ptr)
 @ stdcall TpSetPoolMaxThreads(ptr long)
 @ stdcall TpSetPoolMinThreads(ptr long)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index 2bb1cdb..daf1577 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -168,9 +168,14 @@ struct threadpool_object
     /* read-only information */
     enum threadpool_objtype type;
     struct threadpool       *pool;
+    struct threadpool_group *group;
     PVOID                   userdata;
+    /* information about the group, locked via .group->cs */
+    struct list             group_entry;
+    BOOL                    is_group_member;
     /* information about the pool, locked via .pool->cs */
     struct list             pool_entry;
+    RTL_CONDITION_VARIABLE  finished_event;
     LONG                    num_pending_callbacks;
     LONG                    num_running_callbacks;
     /* arguments for callback */
@@ -183,12 +188,30 @@ struct threadpool_object
     } u;
 };
 
+/* internal threadpool group representation */
+struct threadpool_group
+{
+    LONG                    refcount;
+    BOOL                    shutdown;
+    CRITICAL_SECTION        cs;
+    /* list of group members, locked via .cs */
+    struct list             members;
+};
+
 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
 {
     return (struct threadpool *)pool;
 }
 
+static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
+{
+    return (struct threadpool_group *)group;
+}
+
 static void CALLBACK threadpool_worker_proc( void *param );
+static void tp_object_submit( struct threadpool_object *object );
+static void tp_object_shutdown( struct threadpool_object *object );
+static BOOL tp_object_release( struct threadpool_object *object );
 static struct threadpool *default_threadpool = NULL;
 
 static inline LONG interlocked_inc( PLONG dest )
@@ -1261,6 +1284,65 @@ static void tp_threadpool_unlock( struct threadpool *pool )
 }
 
 /***********************************************************************
+ *           tp_group_alloc    (internal)
+ *
+ * Allocates a new threadpool group object.
+ */
+static NTSTATUS tp_group_alloc( struct threadpool_group **out )
+{
+    struct threadpool_group *group;
+
+    group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
+    if (!group)
+        return STATUS_NO_MEMORY;
+
+    group->refcount     = 1;
+    group->shutdown     = FALSE;
+
+    RtlInitializeCriticalSection( &group->cs );
+    group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
+
+    list_init( &group->members );
+
+    TRACE( "allocated group %p\n", group );
+
+    *out = group;
+    return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ *           tp_group_shutdown    (internal)
+ *
+ * Marks the group object for shutdown.
+ */
+static void tp_group_shutdown( struct threadpool_group *group )
+{
+    group->shutdown = TRUE;
+}
+
+/***********************************************************************
+ *           tp_group_release    (internal)
+ *
+ * Releases a reference to a group object.
+ */
+static BOOL tp_group_release( struct threadpool_group *group )
+{
+    if (interlocked_dec( &group->refcount ))
+        return FALSE;
+
+    TRACE( "destroying group %p\n", group );
+
+    assert( group->shutdown );
+    assert( list_empty( &group->members ) );
+
+    group->cs.DebugInfo->Spare[0] = 0;
+    RtlDeleteCriticalSection( &group->cs );
+
+    RtlFreeHeap( GetProcessHeap(), 0, group );
+    return TRUE;
+}
+
+/***********************************************************************
  *           tp_object_initialize    (internal)
  *
  * Initializes members of a threadpool object.
@@ -1268,20 +1350,58 @@ static void tp_threadpool_unlock( struct threadpool *pool )
 static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
                                   PVOID userdata, TP_CALLBACK_ENVIRON *environment )
 {
+    BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
+
     object->refcount                = 1;
     object->shutdown                = FALSE;
 
     object->pool                    = pool;
+    object->group                   = NULL;
     object->userdata                = userdata;
 
+    memset( &object->group_entry, 0, sizeof(object->group_entry) );
+    object->is_group_member         = FALSE;
+
     memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
+    RtlInitializeConditionVariable( &object->finished_event );
     object->num_pending_callbacks   = 0;
     object->num_running_callbacks   = 0;
 
     if (environment)
-        FIXME( "environment not implemented yet\n" );
+    {
+        if (environment->Version != 1)
+            FIXME( "unsupported environment version %u\n", environment->Version );
+
+        object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
+
+        WARN( "environment not fully implemented yet\n" );
+    }
 
     TRACE( "allocated object %p of type %u\n", object, object->type );
+
+    /* For simple callbacks we have to run tp_object_submit before adding this object
+     * to the cleanup group. As soon as the cleanup group members are released ->shutdown
+     * will be set, and tp_object_submit would fail with an assertion. */
+
+    if (is_simple_callback)
+        tp_object_submit( object );
+
+    if (object->group)
+    {
+        struct threadpool_group *group = object->group;
+        interlocked_inc( &group->refcount );
+
+        RtlEnterCriticalSection( &group->cs );
+        list_add_tail( &group->members, &object->group_entry );
+        object->is_group_member = TRUE;
+        RtlLeaveCriticalSection( &group->cs );
+    }
+
+    if (is_simple_callback)
+    {
+        tp_object_shutdown( object );
+        tp_object_release( object );
+    }
 }
 
 /***********************************************************************
@@ -1331,6 +1451,45 @@ static void tp_object_submit( struct threadpool_object *object )
 }
 
 /***********************************************************************
+ *           tp_object_cancel    (internal)
+ *
+ * Cancels all currently pending callbacks for a specific object.
+ */
+static void tp_object_cancel( struct threadpool_object *object )
+{
+    struct threadpool *pool = object->pool;
+    LONG pending_callbacks = 0;
+
+    RtlEnterCriticalSection( &pool->cs );
+    if (object->num_pending_callbacks)
+    {
+        pending_callbacks = object->num_pending_callbacks;
+        object->num_pending_callbacks = 0;
+        list_remove( &object->pool_entry );
+    }
+    RtlLeaveCriticalSection( &pool->cs );
+
+    while (pending_callbacks--)
+        tp_object_release( object );
+}
+
+/***********************************************************************
+ *           tp_object_wait    (internal)
+ *
+ * Waits until all pending and running callbacks of a specific object
+ * have been processed.
+ */
+static void tp_object_wait( struct threadpool_object *object )
+{
+    struct threadpool *pool = object->pool;
+
+    RtlEnterCriticalSection( &pool->cs );
+    while (object->num_pending_callbacks || object->num_running_callbacks)
+        RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
+    RtlLeaveCriticalSection( &pool->cs );
+}
+
+/***********************************************************************
  *           tp_object_shutdown    (internal)
  *
  * Marks a threadpool object for shutdown (which means that no further
@@ -1357,6 +1516,22 @@ static BOOL tp_object_release( struct threadpool_object *object )
     assert( !object->num_pending_callbacks );
     assert( !object->num_running_callbacks );
 
+    /* release reference to the group */
+    if (object->group)
+    {
+        struct threadpool_group *group = object->group;
+
+        RtlEnterCriticalSection( &group->cs );
+        if (object->is_group_member)
+        {
+            list_remove( &object->group_entry );
+            object->is_group_member = FALSE;
+        }
+        RtlLeaveCriticalSection( &group->cs );
+
+        tp_group_release( group );
+    }
+
     tp_threadpool_unlock( object->pool );
 
     RtlFreeHeap( GetProcessHeap(), 0, object );
@@ -1412,6 +1587,8 @@ static void CALLBACK threadpool_worker_proc( void *param )
             RtlEnterCriticalSection( &pool->cs );
             pool->num_busy_workers--;
             object->num_running_callbacks--;
+            if (!object->num_pending_callbacks && !object->num_running_callbacks)
+                RtlWakeAllConditionVariable( &object->finished_event );
             tp_object_release( object );
         }
 
@@ -1439,6 +1616,17 @@ static void CALLBACK threadpool_worker_proc( void *param )
     tp_threadpool_release( pool );
 }
 
+
+/***********************************************************************
+ *           TpAllocCleanupGroup    (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
+{
+    TRACE( "%p\n", out );
+
+    return tp_group_alloc( (struct threadpool_group **)out );
+}
+
 /***********************************************************************
  *           TpAllocPool    (NTDLL.@)
  */
@@ -1453,6 +1641,80 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
 }
 
 /***********************************************************************
+ *           TpReleaseCleanupGroup    (NTDLL.@)
+ */
+VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
+{
+    struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
+
+    TRACE( "%p\n", group );
+
+    tp_group_shutdown( this );
+    tp_group_release( this );
+}
+
+/***********************************************************************
+ *           TpReleaseCleanupGroupMembers    (NTDLL.@)
+ */
+VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
+{
+    struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
+    struct threadpool_object *object, *next;
+    struct list members;
+
+    TRACE( "%p %u %p\n", group, cancel_pending, userdata );
+
+    RtlEnterCriticalSection( &this->cs );
+
+    /* Unset group, increase references, and mark objects for shutdown */
+    LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
+    {
+        assert( object->group == this );
+        assert( object->is_group_member );
+
+        /* Simple callbacks are very special. The user doesn't hold any reference, so
+         * they would be released too early. Add one additional temporary reference. */
+        if (object->type == TP_OBJECT_TYPE_SIMPLE)
+        {
+            if (interlocked_inc( &object->refcount ) == 1)
+            {
+                /* Object is basically already destroyed, but group reference
+                 * was not deleted yet. We can safely ignore this object. */
+                interlocked_dec( &object->refcount );
+                list_remove( &object->group_entry );
+                object->is_group_member = FALSE;
+                continue;
+            }
+        }
+
+        object->is_group_member = FALSE;
+        tp_object_shutdown( object );
+    }
+
+    /* Move members to a new temporary list */
+    list_init( &members );
+    list_move_tail( &members, &this->members );
+
+    RtlLeaveCriticalSection( &this->cs );
+
+    /* Cancel pending callbacks if requested */
+    if (cancel_pending)
+    {
+        LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
+        {
+            tp_object_cancel( object );
+        }
+    }
+
+    /* Wait for remaining callbacks to finish */
+    LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
+    {
+        tp_object_wait( object );
+        tp_object_release( object );
+    }
+}
+
+/***********************************************************************
  *           TpReleasePool    (NTDLL.@)
  */
 VOID WINAPI TpReleasePool( TP_POOL *pool )
@@ -1542,9 +1804,5 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
     object->u.simple.callback = callback;
     tp_object_initialize( object, pool, userdata, environment );
 
-    tp_object_submit( object );
-
-    tp_object_shutdown( object );
-    tp_object_release( object );
     return STATUS_SUCCESS;
 }
-- 
2.4.4



More information about the wine-patches mailing list