[PATCH 5/7] ntdll: Introduce new tp_object_execute helper.
Rémi Bernon
rbernon at codeweavers.com
Wed Dec 2 04:01:14 CST 2020
To execute a threadpool_object's callbacks.
Signed-off-by: Rémi Bernon <rbernon at codeweavers.com>
---
dlls/ntdll/threadpool.c | 303 +++++++++++++++++++++-------------------
1 file changed, 160 insertions(+), 143 deletions(-)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index bef9a11905e..e9ca32a3be7 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -372,6 +372,7 @@ static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE( TP_CAL
static void CALLBACK threadpool_worker_proc( void *param );
static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
+static void tp_object_execute( struct threadpool_object *object );
static void tp_object_prepare_shutdown( struct threadpool_object *object );
static BOOL tp_object_release( struct threadpool_object *object );
static struct threadpool *default_threadpool = NULL;
@@ -2095,18 +2096,171 @@ static struct list *threadpool_get_next_item( const struct threadpool *pool )
}
/***********************************************************************
- * threadpool_worker_proc (internal)
+ * tp_object_execute (internal)
+ *
+ * Runs one pending callback of the object. object->pool->cs has to be held
+ * on entry; it is released around the callbacks and held again on return.
*/
-static void CALLBACK threadpool_worker_proc( void *param )
+static void tp_object_execute( struct threadpool_object *object )
{
TP_CALLBACK_INSTANCE *callback_instance;
struct threadpool_instance instance;
struct io_completion completion;
- struct threadpool *pool = param;
+ struct threadpool *pool = object->pool;
TP_WAIT_RESULT wait_result = 0;
+ NTSTATUS status;
+
+ object->num_pending_callbacks--;
+
+ /* Wait objects: consume one signal if any is recorded, else report a timeout. */
+ if (object->type == TP_OBJECT_TYPE_WAIT)
+ {
+ wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
+ if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
+ }
+ else if (object->type == TP_OBJECT_TYPE_IO) /* take the last stored completion */
+ {
+ assert( object->u.io.completion_count );
+ completion = object->u.io.completions[--object->u.io.completion_count];
+ object->u.io.pending_count--;
+ }
+
+ /* Mark the callback as associated and running, then leave pool->cs for the actual callback. */
+ object->num_associated_callbacks++;
+ object->num_running_callbacks++;
+ RtlLeaveCriticalSection( &pool->cs );
+
+ /* Initialize the local instance struct that is handed to the callbacks as callback_instance. */
+ callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
+ instance.object = object;
+ instance.threadid = GetCurrentThreadId();
+ instance.associated = TRUE;
+ instance.may_run_long = object->may_run_long;
+ instance.cleanup.critical_section = NULL;
+ instance.cleanup.mutex = NULL;
+ instance.cleanup.semaphore = NULL;
+ instance.cleanup.semaphore_count = 0;
+ instance.cleanup.event = NULL;
+ instance.cleanup.library = NULL;
+
+ switch (object->type)
+ {
+ case TP_OBJECT_TYPE_SIMPLE:
+ {
+ TRACE( "executing simple callback %p(%p, %p)\n",
+ object->u.simple.callback, callback_instance, object->userdata );
+ object->u.simple.callback( callback_instance, object->userdata );
+ TRACE( "callback %p returned\n", object->u.simple.callback );
+ break;
+ }
+
+ case TP_OBJECT_TYPE_WORK:
+ {
+ TRACE( "executing work callback %p(%p, %p, %p)\n",
+ object->u.work.callback, callback_instance, object->userdata, object );
+ object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
+ TRACE( "callback %p returned\n", object->u.work.callback );
+ break;
+ }
+
+ case TP_OBJECT_TYPE_TIMER:
+ {
+ TRACE( "executing timer callback %p(%p, %p, %p)\n",
+ object->u.timer.callback, callback_instance, object->userdata, object );
+ object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
+ TRACE( "callback %p returned\n", object->u.timer.callback );
+ break;
+ }
+
+ case TP_OBJECT_TYPE_WAIT:
+ {
+ TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
+ object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
+ object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
+ TRACE( "callback %p returned\n", object->u.wait.callback );
+ break;
+ }
+
+ case TP_OBJECT_TYPE_IO:
+ {
+ TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
+ object->u.io.callback, callback_instance, object->userdata,
+ completion.cvalue, &completion.iosb, (TP_IO *)object );
+ object->u.io.callback( callback_instance, object->userdata,
+ (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
+ TRACE( "callback %p returned\n", object->u.io.callback );
+ break;
+ }
+
+ default:
+ assert(0);
+ break;
+ }
+
+ /* Execute the finalization callback, if the object has one. */
+ if (object->finalization_callback)
+ {
+ TRACE( "executing finalization callback %p(%p, %p)\n",
+ object->finalization_callback, callback_instance, object->userdata );
+ object->finalization_callback( callback_instance, object->userdata );
+ TRACE( "callback %p returned\n", object->finalization_callback );
+ }
+
+ /* Execute cleanup tasks; a failing mutex, semaphore or event release skips the remaining ones. */
+ if (instance.cleanup.critical_section)
+ {
+ RtlLeaveCriticalSection( instance.cleanup.critical_section );
+ }
+ if (instance.cleanup.mutex)
+ {
+ status = NtReleaseMutant( instance.cleanup.mutex, NULL );
+ if (status != STATUS_SUCCESS) goto skip_cleanup;
+ }
+ if (instance.cleanup.semaphore)
+ {
+ status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
+ if (status != STATUS_SUCCESS) goto skip_cleanup;
+ }
+ if (instance.cleanup.event)
+ {
+ status = NtSetEvent( instance.cleanup.event, NULL );
+ if (status != STATUS_SUCCESS) goto skip_cleanup;
+ }
+ if (instance.cleanup.library)
+ {
+ LdrUnloadDll( instance.cleanup.library );
+ }
+
+skip_cleanup:
+ RtlEnterCriticalSection( &pool->cs ); /* held again for the bookkeeping below and on return */
+
+ /* Simple callbacks are automatically shut down after execution. */
+ if (object->type == TP_OBJECT_TYPE_SIMPLE)
+ {
+ tp_object_prepare_shutdown( object );
+ object->shutdown = TRUE;
+ }
+
+ object->num_running_callbacks--;
+ if (object_is_finished( object, TRUE ))
+ RtlWakeAllConditionVariable( &object->group_finished_event );
+
+ if (instance.associated) /* set to TRUE above; NOTE(review): presumably cleared via callback_instance on disassociation - confirm */
+ {
+ object->num_associated_callbacks--;
+ if (object_is_finished( object, FALSE ))
+ RtlWakeAllConditionVariable( &object->finished_event );
+ }
+}
+
+/***********************************************************************
+ * threadpool_worker_proc (internal)
+ */
+static void CALLBACK threadpool_worker_proc( void *param )
+{
+ struct threadpool *pool = param;
LARGE_INTEGER timeout;
struct list *ptr;
- NTSTATUS status;
TRACE( "starting worker thread for pool %p\n", pool );
@@ -2121,151 +2275,14 @@ static void CALLBACK threadpool_worker_proc( void *param )
/* If further pending callbacks are queued, move the work item to
* the end of the pool list. Otherwise remove it from the pool. */
list_remove( &object->pool_entry );
- if (--object->num_pending_callbacks)
+ if (object->num_pending_callbacks > 1)
tp_object_prio_queue( object );
- /* For wait objects check if they were signaled or have timed out. */
- if (object->type == TP_OBJECT_TYPE_WAIT)
- {
- wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
- if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
- }
- else if (object->type == TP_OBJECT_TYPE_IO)
- {
- assert( object->u.io.completion_count );
- completion = object->u.io.completions[--object->u.io.completion_count];
- object->u.io.pending_count--;
- }
+ tp_object_execute( object );
- /* Leave critical section and do the actual callback. */
- object->num_associated_callbacks++;
- object->num_running_callbacks++;
- RtlLeaveCriticalSection( &pool->cs );
-
- /* Initialize threadpool instance struct. */
- callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
- instance.object = object;
- instance.threadid = GetCurrentThreadId();
- instance.associated = TRUE;
- instance.may_run_long = object->may_run_long;
- instance.cleanup.critical_section = NULL;
- instance.cleanup.mutex = NULL;
- instance.cleanup.semaphore = NULL;
- instance.cleanup.semaphore_count = 0;
- instance.cleanup.event = NULL;
- instance.cleanup.library = NULL;
-
- switch (object->type)
- {
- case TP_OBJECT_TYPE_SIMPLE:
- {
- TRACE( "executing simple callback %p(%p, %p)\n",
- object->u.simple.callback, callback_instance, object->userdata );
- object->u.simple.callback( callback_instance, object->userdata );
- TRACE( "callback %p returned\n", object->u.simple.callback );
- break;
- }
-
- case TP_OBJECT_TYPE_WORK:
- {
- TRACE( "executing work callback %p(%p, %p, %p)\n",
- object->u.work.callback, callback_instance, object->userdata, object );
- object->u.work.callback( callback_instance, object->userdata, (TP_WORK *)object );
- TRACE( "callback %p returned\n", object->u.work.callback );
- break;
- }
-
- case TP_OBJECT_TYPE_TIMER:
- {
- TRACE( "executing timer callback %p(%p, %p, %p)\n",
- object->u.timer.callback, callback_instance, object->userdata, object );
- object->u.timer.callback( callback_instance, object->userdata, (TP_TIMER *)object );
- TRACE( "callback %p returned\n", object->u.timer.callback );
- break;
- }
-
- case TP_OBJECT_TYPE_WAIT:
- {
- TRACE( "executing wait callback %p(%p, %p, %p, %u)\n",
- object->u.wait.callback, callback_instance, object->userdata, object, wait_result );
- object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT *)object, wait_result );
- TRACE( "callback %p returned\n", object->u.wait.callback );
- break;
- }
-
- case TP_OBJECT_TYPE_IO:
- {
- TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
- object->u.io.callback, callback_instance, object->userdata,
- completion.cvalue, &completion.iosb, (TP_IO *)object );
- object->u.io.callback( callback_instance, object->userdata,
- (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
- TRACE( "callback %p returned\n", object->u.io.callback );
- break;
- }
-
- default:
- assert(0);
- break;
- }
-
- /* Execute finalization callback. */
- if (object->finalization_callback)
- {
- TRACE( "executing finalization callback %p(%p, %p)\n",
- object->finalization_callback, callback_instance, object->userdata );
- object->finalization_callback( callback_instance, object->userdata );
- TRACE( "callback %p returned\n", object->finalization_callback );
- }
-
- /* Execute cleanup tasks. */
- if (instance.cleanup.critical_section)
- {
- RtlLeaveCriticalSection( instance.cleanup.critical_section );
- }
- if (instance.cleanup.mutex)
- {
- status = NtReleaseMutant( instance.cleanup.mutex, NULL );
- if (status != STATUS_SUCCESS) goto skip_cleanup;
- }
- if (instance.cleanup.semaphore)
- {
- status = NtReleaseSemaphore( instance.cleanup.semaphore, instance.cleanup.semaphore_count, NULL );
- if (status != STATUS_SUCCESS) goto skip_cleanup;
- }
- if (instance.cleanup.event)
- {
- status = NtSetEvent( instance.cleanup.event, NULL );
- if (status != STATUS_SUCCESS) goto skip_cleanup;
- }
- if (instance.cleanup.library)
- {
- LdrUnloadDll( instance.cleanup.library );
- }
-
- skip_cleanup:
- RtlEnterCriticalSection( &pool->cs );
assert(pool->num_busy_workers);
pool->num_busy_workers--;
- /* Simple callbacks are automatically shutdown after execution. */
- if (object->type == TP_OBJECT_TYPE_SIMPLE)
- {
- tp_object_prepare_shutdown( object );
- object->shutdown = TRUE;
- }
-
- object->num_running_callbacks--;
- if (object_is_finished( object, TRUE ))
- RtlWakeAllConditionVariable( &object->group_finished_event );
-
- if (instance.associated)
- {
- object->num_associated_callbacks--;
- if (object_is_finished( object, FALSE ))
- RtlWakeAllConditionVariable( &object->finished_event );
- }
-
tp_object_release( object );
}
--
2.29.2
More information about the wine-devel
mailing list