[4/8] ntdll: Implement threadpool cleanup group functions.
Sebastian Lackner
sebastian at fds-team.de
Tue Jun 30 19:57:15 CDT 2015
---
dlls/ntdll/ntdll.spec | 3
dlls/ntdll/threadpool.c | 268 +++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 266 insertions(+), 5 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 94194d6..f889b04 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -970,7 +970,10 @@
@ stdcall RtlxOemStringToUnicodeSize(ptr) RtlOemStringToUnicodeSize
@ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
@ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
+@ stdcall TpAllocCleanupGroup(ptr)
@ stdcall TpAllocPool(ptr ptr)
+@ stdcall TpReleaseCleanupGroup(ptr)
+@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
@ stdcall TpReleasePool(ptr)
@ stdcall TpSetPoolMaxThreads(ptr long)
@ stdcall TpSetPoolMinThreads(ptr long)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index 2bb1cdb..daf1577 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -168,9 +168,14 @@ struct threadpool_object
/* read-only information */
enum threadpool_objtype type;
struct threadpool *pool;
+ struct threadpool_group *group;
PVOID userdata;
+ /* information about the group, locked via .group->cs */
+ struct list group_entry;
+ BOOL is_group_member;
/* information about the pool, locked via .pool->cs */
struct list pool_entry;
+ RTL_CONDITION_VARIABLE finished_event;
LONG num_pending_callbacks;
LONG num_running_callbacks;
/* arguments for callback */
@@ -183,12 +188,30 @@ struct threadpool_object
} u;
};
+/* internal threadpool group representation */
+struct threadpool_group
+{
+ LONG refcount;
+ BOOL shutdown;
+ CRITICAL_SECTION cs;
+ /* list of group members, locked via .cs */
+ struct list members;
+};
+
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
}
+static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
+{
+ return (struct threadpool_group *)group;
+}
+
static void CALLBACK threadpool_worker_proc( void *param );
+static void tp_object_submit( struct threadpool_object *object );
+static void tp_object_shutdown( struct threadpool_object *object );
+static BOOL tp_object_release( struct threadpool_object *object );
static struct threadpool *default_threadpool = NULL;
static inline LONG interlocked_inc( PLONG dest )
@@ -1261,6 +1284,65 @@ static void tp_threadpool_unlock( struct threadpool *pool )
}
/***********************************************************************
+ * tp_group_alloc (internal)
+ *
+ * Allocates a new threadpool group object.
+ */
+static NTSTATUS tp_group_alloc( struct threadpool_group **out )
+{
+ struct threadpool_group *group;
+
+ group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
+ if (!group)
+ return STATUS_NO_MEMORY;
+
+ group->refcount = 1;
+ group->shutdown = FALSE;
+
+ RtlInitializeCriticalSection( &group->cs );
+ group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": threadpool_group.cs");
+
+ list_init( &group->members );
+
+ TRACE( "allocated group %p\n", group );
+
+ *out = group;
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * tp_group_shutdown (internal)
+ *
+ * Marks the group object for shutdown.
+ */
+static void tp_group_shutdown( struct threadpool_group *group )
+{
+ group->shutdown = TRUE;
+}
+
+/***********************************************************************
+ * tp_group_release (internal)
+ *
+ * Releases a reference to a group object.
+ */
+static BOOL tp_group_release( struct threadpool_group *group )
+{
+ if (interlocked_dec( &group->refcount ))
+ return FALSE;
+
+ TRACE( "destroying group %p\n", group );
+
+ assert( group->shutdown );
+ assert( list_empty( &group->members ) );
+
+ group->cs.DebugInfo->Spare[0] = 0;
+ RtlDeleteCriticalSection( &group->cs );
+
+ RtlFreeHeap( GetProcessHeap(), 0, group );
+ return TRUE;
+}
+
+/***********************************************************************
* tp_object_initialize (internal)
*
* Initializes members of a threadpool object.
@@ -1268,20 +1350,58 @@ static void tp_threadpool_unlock( struct threadpool *pool )
static void tp_object_initialize( struct threadpool_object *object, struct threadpool *pool,
PVOID userdata, TP_CALLBACK_ENVIRON *environment )
{
+ BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
+
object->refcount = 1;
object->shutdown = FALSE;
object->pool = pool;
+ object->group = NULL;
object->userdata = userdata;
+ memset( &object->group_entry, 0, sizeof(object->group_entry) );
+ object->is_group_member = FALSE;
+
memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
+ RtlInitializeConditionVariable( &object->finished_event );
object->num_pending_callbacks = 0;
object->num_running_callbacks = 0;
if (environment)
- FIXME( "environment not implemented yet\n" );
+ {
+ if (environment->Version != 1)
+ FIXME( "unsupported environment version %u\n", environment->Version );
+
+ object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
+
+ WARN( "environment not fully implemented yet\n" );
+ }
TRACE( "allocated object %p of type %u\n", object, object->type );
+
+ /* For simple callbacks we have to run tp_object_submit before adding this object
+ * to the cleanup group. As soon as the cleanup group members are released ->shutdown
+ * will be set, and tp_object_submit would fail with an assertion. */
+
+ if (is_simple_callback)
+ tp_object_submit( object );
+
+ if (object->group)
+ {
+ struct threadpool_group *group = object->group;
+ interlocked_inc( &group->refcount );
+
+ RtlEnterCriticalSection( &group->cs );
+ list_add_tail( &group->members, &object->group_entry );
+ object->is_group_member = TRUE;
+ RtlLeaveCriticalSection( &group->cs );
+ }
+
+ if (is_simple_callback)
+ {
+ tp_object_shutdown( object );
+ tp_object_release( object );
+ }
}
/***********************************************************************
@@ -1331,6 +1451,45 @@ static void tp_object_submit( struct threadpool_object *object )
}
/***********************************************************************
+ * tp_object_cancel (internal)
+ *
+ * Cancels all currently pending callbacks for a specific object.
+ */
+static void tp_object_cancel( struct threadpool_object *object )
+{
+ struct threadpool *pool = object->pool;
+ LONG pending_callbacks = 0;
+
+ RtlEnterCriticalSection( &pool->cs );
+ if (object->num_pending_callbacks)
+ {
+ pending_callbacks = object->num_pending_callbacks;
+ object->num_pending_callbacks = 0;
+ list_remove( &object->pool_entry );
+ }
+ RtlLeaveCriticalSection( &pool->cs );
+
+ while (pending_callbacks--)
+ tp_object_release( object );
+}
+
+/***********************************************************************
+ * tp_object_wait (internal)
+ *
+ * Waits until all pending and running callbacks of a specific object
+ * have been processed.
+ */
+static void tp_object_wait( struct threadpool_object *object )
+{
+ struct threadpool *pool = object->pool;
+
+ RtlEnterCriticalSection( &pool->cs );
+ while (object->num_pending_callbacks || object->num_running_callbacks)
+ RtlSleepConditionVariableCS( &object->finished_event, &pool->cs, NULL );
+ RtlLeaveCriticalSection( &pool->cs );
+}
+
+/***********************************************************************
* tp_object_shutdown (internal)
*
* Marks a threadpool object for shutdown (which means that no further
@@ -1357,6 +1516,22 @@ static BOOL tp_object_release( struct threadpool_object *object )
assert( !object->num_pending_callbacks );
assert( !object->num_running_callbacks );
+ /* release reference to the group */
+ if (object->group)
+ {
+ struct threadpool_group *group = object->group;
+
+ RtlEnterCriticalSection( &group->cs );
+ if (object->is_group_member)
+ {
+ list_remove( &object->group_entry );
+ object->is_group_member = FALSE;
+ }
+ RtlLeaveCriticalSection( &group->cs );
+
+ tp_group_release( group );
+ }
+
tp_threadpool_unlock( object->pool );
RtlFreeHeap( GetProcessHeap(), 0, object );
@@ -1412,6 +1587,8 @@ static void CALLBACK threadpool_worker_proc( void *param )
RtlEnterCriticalSection( &pool->cs );
pool->num_busy_workers--;
object->num_running_callbacks--;
+ if (!object->num_pending_callbacks && !object->num_running_callbacks)
+ RtlWakeAllConditionVariable( &object->finished_event );
tp_object_release( object );
}
@@ -1439,6 +1616,17 @@ static void CALLBACK threadpool_worker_proc( void *param )
tp_threadpool_release( pool );
}
+
+/***********************************************************************
+ * TpAllocCleanupGroup (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
+{
+ TRACE( "%p\n", out );
+
+ return tp_group_alloc( (struct threadpool_group **)out );
+}
+
/***********************************************************************
* TpAllocPool (NTDLL.@)
*/
@@ -1453,6 +1641,80 @@ NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
}
/***********************************************************************
+ * TpReleaseCleanupGroup (NTDLL.@)
+ */
+VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
+{
+ struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
+
+ TRACE( "%p\n", group );
+
+ tp_group_shutdown( this );
+ tp_group_release( this );
+}
+
+/***********************************************************************
+ * TpReleaseCleanupGroupMembers (NTDLL.@)
+ */
+VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending, PVOID userdata )
+{
+ struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
+ struct threadpool_object *object, *next;
+ struct list members;
+
+ TRACE( "%p %u %p\n", group, cancel_pending, userdata );
+
+ RtlEnterCriticalSection( &this->cs );
+
+ /* Unset group, increase references, and mark objects for shutdown */
+ LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct threadpool_object, group_entry )
+ {
+ assert( object->group == this );
+ assert( object->is_group_member );
+
+ /* Simple callbacks are very special. The user doesn't hold any reference, so
+ * they would be released too early. Add one additional temporary reference. */
+ if (object->type == TP_OBJECT_TYPE_SIMPLE)
+ {
+ if (interlocked_inc( &object->refcount ) == 1)
+ {
+ /* Object is basically already destroyed, but group reference
+ * was not deleted yet. We can safely ignore this object. */
+ interlocked_dec( &object->refcount );
+ list_remove( &object->group_entry );
+ object->is_group_member = FALSE;
+ continue;
+ }
+ }
+
+ object->is_group_member = FALSE;
+ tp_object_shutdown( object );
+ }
+
+ /* Move members to a new temporary list */
+ list_init( &members );
+ list_move_tail( &members, &this->members );
+
+ RtlLeaveCriticalSection( &this->cs );
+
+ /* Cancel pending callbacks if requested */
+ if (cancel_pending)
+ {
+ LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry )
+ {
+ tp_object_cancel( object );
+ }
+ }
+
+ /* Wait for remaining callbacks to finish */
+ LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object, group_entry )
+ {
+ tp_object_wait( object );
+ tp_object_release( object );
+ }
+}
+
+/***********************************************************************
* TpReleasePool (NTDLL.@)
*/
VOID WINAPI TpReleasePool( TP_POOL *pool )
@@ -1542,9 +1804,5 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
object->u.simple.callback = callback;
tp_object_initialize( object, pool, userdata, environment );
- tp_object_submit( object );
-
- tp_object_shutdown( object );
- tp_object_release( object );
return STATUS_SUCCESS;
}
--
2.4.4
More information about the wine-patches
mailing list