[4/12] ntdll: Implement threadpool timer queues.
Sebastian Lackner
sebastian at fds-team.de
Thu Jul 2 18:58:29 CDT 2015
---
dlls/ntdll/ntdll.spec | 3
dlls/ntdll/threadpool.c | 321 +++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 321 insertions(+), 3 deletions(-)
diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 10d3652..ce9d1bb 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -981,6 +981,7 @@
@ stdcall TpCallbackSetEventOnCompletion(ptr long)
@ stdcall TpCallbackUnloadDllOnCompletion(ptr ptr)
@ stdcall TpDisassociateCallback(ptr)
+@ stdcall TpIsTimerSet(ptr)
@ stdcall TpPostWork(ptr)
@ stdcall TpReleaseCleanupGroup(ptr)
@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
@@ -989,7 +990,9 @@
@ stdcall TpReleaseWork(ptr)
@ stdcall TpSetPoolMaxThreads(ptr long)
@ stdcall TpSetPoolMinThreads(ptr long)
+@ stdcall TpSetTimer(ptr ptr long long)
@ stdcall TpSimpleTryPost(ptr ptr ptr)
+@ stdcall TpWaitForTimer(ptr long)
@ stdcall TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(int64 long long)
@ stdcall WinSqmIsOptedIn()
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index cf714a6..ef502ba 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -77,14 +77,14 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
{
0, 0, &old_threadpool.threadpool_cs,
{ &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
- 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
+ 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
};
static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
{
0, 0, &old_threadpool.threadpool_compl_cs,
{ &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
- 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
+ 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
};
struct work_item
@@ -200,6 +200,14 @@ struct threadpool_object
struct
{
PTP_TIMER_CALLBACK callback;
+ /* information about the timer, locked via timerqueue.cs */
+ BOOL timer_initialized;
+ BOOL timer_pending;
+ struct list timer_entry;
+ BOOL timer_set;
+ ULONGLONG timeout;
+ LONG period;
+ LONG window_length;
} timer;
} u;
};
@@ -232,6 +240,33 @@ struct threadpool_group
struct list members;
};
+/* global timerqueue object */
+static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
+
+static struct
+{
+ CRITICAL_SECTION cs;
+ LONG objcount;
+ BOOL thread_running;
+ struct list pending_timers;
+ RTL_CONDITION_VARIABLE update_event;
+}
+timerqueue =
+{
+ { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
+ 0, /* objcount */
+ FALSE, /* thread_running */
+ LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
+ RTL_CONDITION_VARIABLE_INIT /* update_event */
+};
+
+static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
+{
+ 0, 0, &timerqueue.cs,
+ { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
+};
+
static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
{
return (struct threadpool *)pool;
@@ -1186,6 +1221,171 @@ NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
}
/***********************************************************************
+ * timerqueue_thread_proc (internal)
+ */
+static void CALLBACK timerqueue_thread_proc( void *param )
+{
+ ULONGLONG timeout_lower, timeout_upper, new_timeout;
+ struct threadpool_object *other_timer;
+ LARGE_INTEGER now, timeout;
+ struct list *ptr;
+
+ TRACE( "starting timer queue thread\n" );
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+ for (;;)
+ {
+ NtQuerySystemTime( &now );
+
+ /* Check for expired timers. */
+ while ((ptr = list_head( &timerqueue.pending_timers )))
+ {
+ struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+ assert( timer->u.timer.timer_pending );
+ if (timer->u.timer.timeout > now.QuadPart)
+ break;
+
+ /* Queue a new callback in one of the worker threads. */
+ list_remove( &timer->u.timer.timer_entry );
+ timer->u.timer.timer_pending = FALSE;
+ tp_object_submit( timer );
+
+ /* Insert the timer back into the queue, except its marked for shutdown. */
+ if (timer->u.timer.period && !timer->shutdown)
+ {
+ timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
+ if (timer->u.timer.timeout <= now.QuadPart)
+ timer->u.timer.timeout = now.QuadPart + 1;
+
+ LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+ struct threadpool_object, u.timer.timer_entry )
+ {
+ assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+ if (timer->u.timer.timeout < other_timer->u.timer.timeout)
+ break;
+ }
+ list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
+ timer->u.timer.timer_pending = TRUE;
+ }
+ }
+
+ timeout_lower = TIMEOUT_INFINITE;
+ timeout_upper = TIMEOUT_INFINITE;
+
+ /* Determine next timeout and use the window length to optimize wakeup times. */
+ LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+ struct threadpool_object, u.timer.timer_entry )
+ {
+ assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+ if (other_timer->u.timer.timeout >= timeout_upper)
+ break;
+
+ timeout_lower = other_timer->u.timer.timeout;
+ new_timeout = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
+ if (new_timeout < timeout_upper)
+ timeout_upper = new_timeout;
+ }
+
+ /* Wait for timer update events or until the next timer expires. */
+ if (timerqueue.objcount)
+ {
+ timeout.QuadPart = timeout_lower;
+ RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
+ continue;
+ }
+
+ /* All timers have been destroyed, if no new timers are created
+ * within some amount of time, then we can shutdown this thread. */
+ timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+ if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
+ &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
+ {
+ break;
+ }
+ }
+
+ timerqueue.thread_running = FALSE;
+ RtlLeaveCriticalSection( &timerqueue.cs );
+
+ TRACE( "terminating timer queue thread\n" );
+}
+
+/***********************************************************************
+ * tp_timerqueue_lock (internal)
+ *
+ * Acquires a lock on the global timerqueue. When the lock is acquired
+ * successfully, it is guaranteed that the timer thread is running.
+ */
+static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
+{
+ NTSTATUS status = STATUS_SUCCESS;
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+
+ timer->u.timer.timer_initialized = FALSE;
+ timer->u.timer.timer_pending = FALSE;
+ timer->u.timer.timer_set = FALSE;
+ timer->u.timer.timeout = 0;
+ timer->u.timer.period = 0;
+ timer->u.timer.window_length = 0;
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+
+ /* Make sure that the timerqueue thread is running. */
+ if (!timerqueue.thread_running)
+ {
+ HANDLE thread;
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+ timerqueue_thread_proc, NULL, &thread, NULL );
+ if (status == STATUS_SUCCESS)
+ {
+ timerqueue.thread_running = TRUE;
+ NtClose( thread );
+ }
+ }
+
+ if (status == STATUS_SUCCESS)
+ {
+ timer->u.timer.timer_initialized = TRUE;
+ timerqueue.objcount++;
+ }
+
+ RtlLeaveCriticalSection( &timerqueue.cs );
+ return status;
+}
+
+/***********************************************************************
+ * tp_timerqueue_unlock (internal)
+ *
+ * Releases a lock on the global timerqueue.
+ */
+static void tp_timerqueue_unlock( struct threadpool_object *timer )
+{
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+ if (timer->u.timer.timer_initialized)
+ {
+ /* If timer was pending, remove it. */
+ if (timer->u.timer.timer_pending)
+ {
+ list_remove( &timer->u.timer.timer_entry );
+ timer->u.timer.timer_pending = FALSE;
+ }
+
+ /* If the last timer object was destroyed, then wake up the thread. */
+ if (!--timerqueue.objcount)
+ {
+ assert( list_empty( &timerqueue.pending_timers ) );
+ RtlWakeAllConditionVariable( &timerqueue.update_event );
+ }
+
+ timer->u.timer.timer_initialized = FALSE;
+ }
+ RtlLeaveCriticalSection( &timerqueue.cs );
+}
+
+/***********************************************************************
* tp_threadpool_alloc (internal)
*
* Allocates a new threadpool object.
@@ -1583,6 +1783,9 @@ static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
*/
static void tp_object_shutdown( struct threadpool_object *object )
{
+ if (object->type == TP_OBJECT_TYPE_TIMER)
+ tp_timerqueue_unlock( object );
+
object->shutdown = TRUE;
}
@@ -1785,7 +1988,6 @@ static void CALLBACK threadpool_worker_proc( void *param )
tp_threadpool_release( pool );
}
-
/***********************************************************************
* TpAllocCleanupGroup (NTDLL.@)
*/
@@ -1834,6 +2036,15 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID
object->type = TP_OBJECT_TYPE_TIMER;
object->u.timer.callback = callback;
+
+ status = tp_timerqueue_lock( object );
+ if (status)
+ {
+ tp_threadpool_unlock( pool );
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
tp_object_initialize( object, pool, userdata, environment );
*out = (TP_TIMER *)object;
@@ -2021,6 +2232,18 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
}
/***********************************************************************
+ * TpIsTimerSet (NTDLL.@)
+ */
+BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+
+ TRACE( "%p\n", timer );
+
+ return this->u.timer.timer_set;
+}
+
+/***********************************************************************
* TpPostWork (NTDLL.@)
*/
VOID WINAPI TpPostWork( TP_WORK *work )
@@ -2196,6 +2419,84 @@ BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
}
/***********************************************************************
+ * TpSetTimer (NTDLL.@)
+ */
+VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+ struct threadpool_object *other_timer;
+ BOOL submit_timer = FALSE;
+ ULONGLONG timestamp;
+
+ TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+
+ assert( this->u.timer.timer_initialized );
+ this->u.timer.timer_set = timeout != NULL;
+
+ /* Convert relative timeout to absolute timestamp and handle a timeout
+ * of zero, which means that the timer is submitted immediately. */
+ if (timeout)
+ {
+ timestamp = timeout->QuadPart;
+ if ((LONGLONG)timestamp < 0)
+ {
+ LARGE_INTEGER now;
+ NtQuerySystemTime( &now );
+ timestamp = now.QuadPart - timestamp;
+ }
+ else if (!timestamp)
+ {
+ if (!period)
+ timeout = NULL;
+ else
+ {
+ LARGE_INTEGER now;
+ NtQuerySystemTime( &now );
+ timestamp = now.QuadPart + (ULONGLONG)period * 10000;
+ }
+ submit_timer = TRUE;
+ }
+ }
+
+ /* First remove existing timeout. */
+ if (this->u.timer.timer_pending)
+ {
+ list_remove( &this->u.timer.timer_entry );
+ this->u.timer.timer_pending = FALSE;
+ }
+
+ /* If the timer was enabled, then add it back to the queue. */
+ if (timeout)
+ {
+ this->u.timer.timeout = timestamp;
+ this->u.timer.period = period;
+ this->u.timer.window_length = window_length;
+
+ LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+ struct threadpool_object, u.timer.timer_entry )
+ {
+ assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+ if (this->u.timer.timeout < other_timer->u.timer.timeout)
+ break;
+ }
+ list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
+
+ /* Wake up the timer thread when the timeout has to be updated. */
+ if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
+ RtlWakeAllConditionVariable( &timerqueue.update_event );
+
+ this->u.timer.timer_pending = TRUE;
+ }
+
+ RtlLeaveCriticalSection( &timerqueue.cs );
+
+ if (submit_timer)
+ tp_object_submit( this );
+}
+
+/***********************************************************************
* TpSimpleTryPost (NTDLL.@)
*/
NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
@@ -2226,6 +2527,20 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
}
/***********************************************************************
+ * TpWaitForTimer (NTDLL.@)
+ */
+VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+
+ TRACE( "%p %d\n", timer, cancel_pending );
+
+ if (cancel_pending)
+ tp_object_cancel( this, FALSE, NULL );
+ tp_object_wait( this, FALSE );
+}
+
+/***********************************************************************
* TpWaitForWork (NTDLL.@)
*/
VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
--
2.4.4
More information about the wine-patches
mailing list