[4/12] ntdll: Implement threadpool timer queues.

Sebastian Lackner sebastian at fds-team.de
Thu Jul 2 18:58:29 CDT 2015


---
 dlls/ntdll/ntdll.spec   |    3 
 dlls/ntdll/threadpool.c |  321 +++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 321 insertions(+), 3 deletions(-)

diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index 10d3652..ce9d1bb 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -981,6 +981,7 @@
 @ stdcall TpCallbackSetEventOnCompletion(ptr long)
 @ stdcall TpCallbackUnloadDllOnCompletion(ptr ptr)
 @ stdcall TpDisassociateCallback(ptr)
+@ stdcall TpIsTimerSet(ptr)
 @ stdcall TpPostWork(ptr)
 @ stdcall TpReleaseCleanupGroup(ptr)
 @ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
@@ -989,7 +990,9 @@
 @ stdcall TpReleaseWork(ptr)
 @ stdcall TpSetPoolMaxThreads(ptr long)
 @ stdcall TpSetPoolMinThreads(ptr long)
+@ stdcall TpSetTimer(ptr ptr long long)
 @ stdcall TpSimpleTryPost(ptr ptr ptr)
+@ stdcall TpWaitForTimer(ptr long)
 @ stdcall TpWaitForWork(ptr long)
 @ stdcall -ret64 VerSetConditionMask(int64 long long)
 @ stdcall WinSqmIsOptedIn()
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index cf714a6..ef502ba 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -77,14 +77,14 @@ static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
 {
     0, 0, &old_threadpool.threadpool_cs,
     { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
-    0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
+      0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_cs") }
 };
 
 static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
 {
     0, 0, &old_threadpool.threadpool_compl_cs,
     { &critsect_compl_debug.ProcessLocksList, &critsect_compl_debug.ProcessLocksList },
-    0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
+      0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
 };
 
 struct work_item
@@ -200,6 +200,14 @@ struct threadpool_object
         struct
         {
             PTP_TIMER_CALLBACK callback;
+            /* information about the timer, locked via timerqueue.cs */
+            BOOL            timer_initialized;
+            BOOL            timer_pending;
+            struct list     timer_entry;
+            BOOL            timer_set;
+            ULONGLONG       timeout;
+            LONG            period;
+            LONG            window_length;
         } timer;
     } u;
 };
@@ -232,6 +240,33 @@ struct threadpool_group
     struct list             members;
 };
 
+/* global timerqueue object */
+static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
+
+static struct
+{
+    CRITICAL_SECTION        cs;
+    LONG                    objcount;
+    BOOL                    thread_running;
+    struct list             pending_timers;
+    RTL_CONDITION_VARIABLE  update_event;
+}
+timerqueue =
+{
+    { &timerqueue_debug, -1, 0, 0, 0, 0 },      /* cs */
+    0,                                          /* objcount */
+    FALSE,                                      /* thread_running */
+    LIST_INIT( timerqueue.pending_timers ),     /* pending_timers */
+    RTL_CONDITION_VARIABLE_INIT                 /* update_event */
+};
+
+static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
+{
+    0, 0, &timerqueue.cs,
+    { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
+      0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
+};
+
 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
 {
     return (struct threadpool *)pool;
@@ -1186,6 +1221,171 @@ NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
 }
 
 /***********************************************************************
+ *           timerqueue_thread_proc    (internal)
+ */
+static void CALLBACK timerqueue_thread_proc( void *param )
+{
+    ULONGLONG timeout_lower, timeout_upper, new_timeout;
+    struct threadpool_object *other_timer;
+    LARGE_INTEGER now, timeout;
+    struct list *ptr;
+
+    TRACE( "starting timer queue thread\n" );
+
+    RtlEnterCriticalSection( &timerqueue.cs );
+    for (;;)
+    {
+        NtQuerySystemTime( &now );
+
+        /* Check for expired timers. */
+        while ((ptr = list_head( &timerqueue.pending_timers )))
+        {
+            struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object, u.timer.timer_entry );
+            assert( timer->type == TP_OBJECT_TYPE_TIMER );
+            assert( timer->u.timer.timer_pending );
+            if (timer->u.timer.timeout > now.QuadPart)
+                break;
+
+            /* Queue a new callback in one of the worker threads. */
+            list_remove( &timer->u.timer.timer_entry );
+            timer->u.timer.timer_pending = FALSE;
+            tp_object_submit( timer );
+
+            /* Insert the timer back into the queue, unless it is marked for shutdown. */
+            if (timer->u.timer.period && !timer->shutdown)
+            {
+                timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period * 10000;
+                if (timer->u.timer.timeout <= now.QuadPart)
+                    timer->u.timer.timeout = now.QuadPart + 1;
+
+                LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+                                     struct threadpool_object, u.timer.timer_entry )
+                {
+                    assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+                    if (timer->u.timer.timeout < other_timer->u.timer.timeout)
+                        break;
+                }
+                list_add_before( &other_timer->u.timer.timer_entry, &timer->u.timer.timer_entry );
+                timer->u.timer.timer_pending = TRUE;
+            }
+        }
+
+        timeout_lower = TIMEOUT_INFINITE;
+        timeout_upper = TIMEOUT_INFINITE;
+
+        /* Determine next timeout and use the window length to optimize wakeup times. */
+        LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+                             struct threadpool_object, u.timer.timer_entry )
+        {
+            assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+            if (other_timer->u.timer.timeout >= timeout_upper)
+                break;
+
+            timeout_lower = other_timer->u.timer.timeout;
+            new_timeout   = timeout_lower + (ULONGLONG)other_timer->u.timer.window_length * 10000;
+            if (new_timeout < timeout_upper)
+                timeout_upper = new_timeout;
+        }
+
+        /* Wait for timer update events or until the next timer expires. */
+        if (timerqueue.objcount)
+        {
+            timeout.QuadPart = timeout_lower;
+            RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs, &timeout );
+            continue;
+        }
+
+        /* All timers have been destroyed. If no new timers are created
+         * within some amount of time, then we can shut down this thread. */
+        timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+        if (RtlSleepConditionVariableCS( &timerqueue.update_event, &timerqueue.cs,
+            &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
+        {
+            break;
+        }
+    }
+
+    timerqueue.thread_running = FALSE;
+    RtlLeaveCriticalSection( &timerqueue.cs );
+
+    TRACE( "terminating timer queue thread\n" );
+}
+
+/***********************************************************************
+ *           tp_timerqueue_lock    (internal)
+ *
+ * Acquires a lock on the global timerqueue. When the lock is acquired
+ * successfully, it is guaranteed that the timer thread is running.
+ */
+static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
+{
+    NTSTATUS status = STATUS_SUCCESS;
+    assert( timer->type == TP_OBJECT_TYPE_TIMER );
+
+    timer->u.timer.timer_initialized    = FALSE;
+    timer->u.timer.timer_pending        = FALSE;
+    timer->u.timer.timer_set            = FALSE;
+    timer->u.timer.timeout              = 0;
+    timer->u.timer.period               = 0;
+    timer->u.timer.window_length        = 0;
+
+    RtlEnterCriticalSection( &timerqueue.cs );
+
+    /* Make sure that the timerqueue thread is running. */
+    if (!timerqueue.thread_running)
+    {
+        HANDLE thread;
+        status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+                                      timerqueue_thread_proc, NULL, &thread, NULL );
+        if (status == STATUS_SUCCESS)
+        {
+            timerqueue.thread_running = TRUE;
+            NtClose( thread );
+        }
+    }
+
+    if (status == STATUS_SUCCESS)
+    {
+        timer->u.timer.timer_initialized = TRUE;
+        timerqueue.objcount++;
+    }
+
+    RtlLeaveCriticalSection( &timerqueue.cs );
+    return status;
+}
+
+/***********************************************************************
+ *           tp_timerqueue_unlock    (internal)
+ *
+ * Releases a lock on the global timerqueue.
+ */
+static void tp_timerqueue_unlock( struct threadpool_object *timer )
+{
+    assert( timer->type == TP_OBJECT_TYPE_TIMER );
+
+    RtlEnterCriticalSection( &timerqueue.cs );
+    if (timer->u.timer.timer_initialized)
+    {
+        /* If timer was pending, remove it. */
+        if (timer->u.timer.timer_pending)
+        {
+            list_remove( &timer->u.timer.timer_entry );
+            timer->u.timer.timer_pending = FALSE;
+        }
+
+        /* If the last timer object was destroyed, then wake up the thread. */
+        if (!--timerqueue.objcount)
+        {
+            assert( list_empty( &timerqueue.pending_timers ) );
+            RtlWakeAllConditionVariable( &timerqueue.update_event );
+        }
+
+        timer->u.timer.timer_initialized = FALSE;
+    }
+    RtlLeaveCriticalSection( &timerqueue.cs );
+}
+
+/***********************************************************************
  *           tp_threadpool_alloc    (internal)
  *
  * Allocates a new threadpool object.
@@ -1583,6 +1783,9 @@ static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
  */
 static void tp_object_shutdown( struct threadpool_object *object )
 {
+    if (object->type == TP_OBJECT_TYPE_TIMER)
+        tp_timerqueue_unlock( object );
+
     object->shutdown = TRUE;
 }
 
@@ -1785,7 +1988,6 @@ static void CALLBACK threadpool_worker_proc( void *param )
     tp_threadpool_release( pool );
 }
 
-
 /***********************************************************************
  *           TpAllocCleanupGroup    (NTDLL.@)
  */
@@ -1834,6 +2036,15 @@ NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID
 
     object->type = TP_OBJECT_TYPE_TIMER;
     object->u.timer.callback = callback;
+
+    status = tp_timerqueue_lock( object );
+    if (status)
+    {
+        tp_threadpool_unlock( pool );
+        RtlFreeHeap( GetProcessHeap(), 0, object );
+        return status;
+    }
+
     tp_object_initialize( object, pool, userdata, environment );
 
     *out = (TP_TIMER *)object;
@@ -2021,6 +2232,18 @@ VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
 }
 
 /***********************************************************************
+ *           TpIsTimerSet    (NTDLL.@)
+ */
+BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
+{
+    struct threadpool_object *this = impl_from_TP_TIMER( timer );
+
+    TRACE( "%p\n", timer );
+
+    return this->u.timer.timer_set;
+}
+
+/***********************************************************************
  *           TpPostWork    (NTDLL.@)
  */
 VOID WINAPI TpPostWork( TP_WORK *work )
@@ -2196,6 +2419,84 @@ BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
 }
 
 /***********************************************************************
+ *           TpSetTimer    (NTDLL.@)
+ */
+VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG window_length )
+{
+    struct threadpool_object *this = impl_from_TP_TIMER( timer );
+    struct threadpool_object *other_timer;
+    BOOL submit_timer = FALSE;
+    ULONGLONG timestamp;
+
+    TRACE( "%p %p %u %u\n", timer, timeout, period, window_length );
+
+    RtlEnterCriticalSection( &timerqueue.cs );
+
+    assert( this->u.timer.timer_initialized );
+    this->u.timer.timer_set = timeout != NULL;
+
+    /* Convert relative timeout to absolute timestamp and handle a timeout
+     * of zero, which means that the timer is submitted immediately. */
+    if (timeout)
+    {
+        timestamp = timeout->QuadPart;
+        if ((LONGLONG)timestamp < 0)
+        {
+            LARGE_INTEGER now;
+            NtQuerySystemTime( &now );
+            timestamp = now.QuadPart - timestamp;
+        }
+        else if (!timestamp)
+        {
+            if (!period)
+                timeout = NULL;
+            else
+            {
+                LARGE_INTEGER now;
+                NtQuerySystemTime( &now );
+                timestamp = now.QuadPart + (ULONGLONG)period * 10000;
+            }
+            submit_timer = TRUE;
+        }
+    }
+
+    /* First remove existing timeout. */
+    if (this->u.timer.timer_pending)
+    {
+        list_remove( &this->u.timer.timer_entry );
+        this->u.timer.timer_pending = FALSE;
+    }
+
+    /* If the timer was enabled, then add it back to the queue. */
+    if (timeout)
+    {
+        this->u.timer.timeout       = timestamp;
+        this->u.timer.period        = period;
+        this->u.timer.window_length = window_length;
+
+        LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+                             struct threadpool_object, u.timer.timer_entry )
+        {
+            assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+            if (this->u.timer.timeout < other_timer->u.timer.timeout)
+                break;
+        }
+        list_add_before( &other_timer->u.timer.timer_entry, &this->u.timer.timer_entry );
+
+        /* Wake up the timer thread when the timeout has to be updated. */
+        if (list_head( &timerqueue.pending_timers ) == &this->u.timer.timer_entry )
+            RtlWakeAllConditionVariable( &timerqueue.update_event );
+
+        this->u.timer.timer_pending = TRUE;
+    }
+
+    RtlLeaveCriticalSection( &timerqueue.cs );
+
+    if (submit_timer)
+       tp_object_submit( this );
+}
+
+/***********************************************************************
  *           TpSimpleTryPost    (NTDLL.@)
  */
 NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
@@ -2226,6 +2527,20 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
 }
 
 /***********************************************************************
+ *           TpWaitForTimer    (NTDLL.@)
+ */
+VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
+{
+    struct threadpool_object *this = impl_from_TP_TIMER( timer );
+
+    TRACE( "%p %d\n", timer, cancel_pending );
+
+    if (cancel_pending)
+        tp_object_cancel( this, FALSE, NULL );
+    tp_object_wait( this, FALSE );
+}
+
+/***********************************************************************
  *           TpWaitForWork    (NTDLL.@)
  */
 VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
-- 
2.4.4



More information about the wine-patches mailing list