[1/3] ntdll: Implement the timer queue thread. [take 2]

Dan Hipschman dsh at linux.ucla.edu
Thu Jul 24 18:26:32 CDT 2008


Once I got started on some optimizations I kept going (it's a slippery
slope).  This implementation uses a sorted timer list, and does a much
better job of only setting the event to break out of the wait in the
timer thread when necessary (we add a timer that will expire sooner than
the one we are waiting for, or the queue is in delete mode and the last
timer just finished).  It simplifies the thread proc and I think the
logic is easier to understand, making the control flow easier to predict.

I ran the tests for this a few hundred times with no failures.  Plus when
I added traces just to confirm what I thought should be happening actually
was happening, everything checked out as expected.  Hence, I'm happy with
this.  It also simplifies the following two patches a little.

---
 dlls/kernel32/tests/sync.c |   34 +++++-
 dlls/ntdll/threadpool.c    |  274 ++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 291 insertions(+), 17 deletions(-)

diff --git a/dlls/kernel32/tests/sync.c b/dlls/kernel32/tests/sync.c
index 05f67dc..d41bd79 100644
--- a/dlls/kernel32/tests/sync.c
+++ b/dlls/kernel32/tests/sync.c
@@ -588,11 +588,29 @@ static void CALLBACK timer_queue_cb3(PVOID p, BOOLEAN timedOut)
     }
 }
 
+static void CALLBACK timer_queue_cb4(PVOID p, BOOLEAN timedOut)
+{
+    struct timer_queue_data1 *d = p;
+    ok(timedOut, "Timer callbacks should always time out\n");
+    if (d->t)
+    {
+        /* This tests whether a timer gets flagged for deletion before
+           or after the callback runs.  If we start this timer with a
+           period of zero (run once), then ChangeTimerQueueTimer will
+           fail if the timer is already flagged.  Hence we really run
+           only once.  Otherwise we will run multiple times.  */
+        BOOL ret = pChangeTimerQueueTimer(d->q, d->t, 50, 50);
+        todo_wine
+        ok(ret, "ChangeTimerQueueTimer\n");
+        ++d->num_calls;
+    }
+}
+
 static void test_timer_queue(void)
 {
     HANDLE q, t1, t2, t3, t4, t5;
     int n1, n2, n3, n4, n5;
-    struct timer_queue_data1 d2, d3;
+    struct timer_queue_data1 d2, d3, d4;
     HANDLE e;
     BOOL ret;
 
@@ -661,13 +679,9 @@ static void test_timer_queue(void)
 
     ret = pDeleteTimerQueueEx(q, INVALID_HANDLE_VALUE);
     ok(ret, "DeleteTimerQueueEx\n");
-    todo_wine
-    {
     ok(n1 == 1, "Timer callback 1\n");
     ok(n2 < n3, "Timer callback 2 should be much slower than 3\n");
-    }
     ok(n4 == 0, "Timer callback 4\n");
-    todo_wine
     ok(n5 == 1, "Timer callback 5\n");
 
     /* Test synchronous deletion of the queue with event trigger. */
@@ -713,6 +727,15 @@ static void test_timer_queue(void)
     ok(ret, "CreateTimerQueueTimer\n");
     ok(t3 != NULL, "CreateTimerQueueTimer\n");
 
+    d4.t = t4 = NULL;
+    d4.num_calls = 0;
+    d4.q = q;
+    ret = pCreateTimerQueueTimer(&t4, q, timer_queue_cb4, &d4, 10,
+                                 0, 0);
+    d4.t = t4;
+    ok(ret, "CreateTimerQueueTimer\n");
+    ok(t4 != NULL, "CreateTimerQueueTimer\n");
+
     Sleep(200);
 
     ret = pDeleteTimerQueueEx(q, INVALID_HANDLE_VALUE);
@@ -722,6 +745,7 @@ static void test_timer_queue(void)
     ok(d2.num_calls == d2.max_calls, "DeleteTimerQueueTimer\n");
     ok(d3.num_calls == d3.max_calls, "ChangeTimerQueueTimer\n");
     }
+    ok(d4.num_calls == 1, "Timer flagged for deletion incorrectly\n");
 }
 
 START_TEST(sync)
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index c698cb7..cbb97f4 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -21,6 +21,7 @@
 #include "config.h"
 #include "wine/port.h"
 
+#include <assert.h>
 #include <stdarg.h>
 #include <limits.h>
 
@@ -532,21 +533,223 @@ NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
 
 /************************** Timer Queue Impl **************************/
 
+struct timer_queue;
 struct queue_timer
 {
+    struct timer_queue *q;
     struct list entry;
+    ULONG runcount;             /* number of callbacks pending execution */
+    RTL_WAITORTIMERCALLBACKFUNC callback;
+    PVOID param;
+    DWORD period;
+    ULONG flags;
+    ULONGLONG expire;
+    BOOL destroy;      /* timer should be deleted; once set, never unset */
 };
 
 struct timer_queue
 {
     RTL_CRITICAL_SECTION cs;
-    struct list timers;
+    struct list timers;          /* sorted by expiration time */
+    BOOL quit;         /* queue should be deleted; once set, never unset */
+    HANDLE event;
+    HANDLE thread;
 };
 
+#define EXPIRE_NEVER (~(ULONGLONG) 0)
+
 static void queue_remove_timer(struct queue_timer *t)
 {
+    /* We MUST hold the queue cs while calling this function.  This ensures
+       that we cannot queue another callback for this timer.  The runcount
+       being zero makes sure we don't have any already queued.  */
+    struct timer_queue *q = t->q;
+
+    assert(t->runcount == 0);
+    assert(t->destroy);
+
     list_remove(&t->entry);
     RtlFreeHeap(GetProcessHeap(), 0, t);
+
+    if (q->quit && list_count(&q->timers) == 0)
+        NtSetEvent(q->event, NULL);
+}
+
+static void timer_cleanup_callback(struct queue_timer *t)
+{
+    struct timer_queue *q = t->q;
+    RtlEnterCriticalSection(&q->cs);
+
+    assert(0 < t->runcount);
+    --t->runcount;
+
+    if (t->destroy && t->runcount == 0)
+        queue_remove_timer(t);
+
+    RtlLeaveCriticalSection(&q->cs);
+}
+
+static DWORD WINAPI timer_callback_wrapper(LPVOID p)
+{
+    struct queue_timer *t = p;
+    t->callback(t->param, TRUE);
+    timer_cleanup_callback(t);
+    return 0;
+}
+
+static inline ULONGLONG queue_current_time(void)
+{
+    LARGE_INTEGER now;
+    NtQuerySystemTime(&now);
+    return now.QuadPart / 10000;
+}
+
+static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
+                            BOOL set_event)
+{
+    /* We MUST hold the queue cs while calling this function.  */
+    struct timer_queue *q = t->q;
+    struct list *ptr = &q->timers;
+
+    assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
+
+    if (time != EXPIRE_NEVER)
+        LIST_FOR_EACH(ptr, &q->timers)
+        {
+            struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
+            if (time < cur->expire)
+                break;
+        }
+    list_add_before(ptr, &t->entry);
+
+    t->expire = time;
+
+    /* If we insert at the head of the list, we need to expire sooner
+       than expected.  */
+    if (set_event && &t->entry == list_head(&q->timers))
+        NtSetEvent(q->event, NULL);
+}
+
+static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
+                                    BOOL set_event)
+{
+    /* We MUST hold the queue cs while calling this function.  */
+    list_remove(&t->entry);
+    queue_add_timer(t, time, set_event);
+}
+
+static void queue_timer_expire(struct timer_queue *q)
+{
+    struct queue_timer *t;
+
+    RtlEnterCriticalSection(&q->cs);
+    if (list_head(&q->timers))
+    {
+        t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
+        if (!t->destroy && t->expire <= queue_current_time())
+        {
+            ++t->runcount;
+            queue_move_timer(
+                t, t->period ? queue_current_time() + t->period : EXPIRE_NEVER,
+                FALSE);
+        }
+    }
+    else
+        t = NULL;
+    RtlLeaveCriticalSection(&q->cs);
+
+    if (t)
+    {
+        if (t->flags & WT_EXECUTEINTIMERTHREAD)
+            timer_callback_wrapper(t);
+        else
+        {
+            ULONG flags
+                = (t->flags
+                   & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
+                      | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
+            NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
+            if (status != STATUS_SUCCESS)
+                timer_cleanup_callback(t);
+        }
+    }
+}
+
+static ULONG queue_get_timeout(struct timer_queue *q)
+{
+    struct queue_timer *t;
+    ULONG timeout = INFINITE;
+
+    RtlEnterCriticalSection(&q->cs);
+    if (list_head(&q->timers))
+    {
+        t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
+        assert(!t->destroy || t->expire == EXPIRE_NEVER);
+
+        if (t->expire != EXPIRE_NEVER)
+        {
+            ULONGLONG time = queue_current_time();
+            timeout = t->expire < time ? 0 : t->expire - time;
+        }
+    }
+    RtlLeaveCriticalSection(&q->cs);
+
+    return timeout;
+}
+
+static void WINAPI timer_queue_thread_proc(LPVOID p)
+{
+    struct timer_queue *q = p;
+    ULONG timeout_ms;
+
+    timeout_ms = INFINITE;
+    for (;;)
+    {
+        LARGE_INTEGER timeout;
+        NTSTATUS status;
+        BOOL done = FALSE;
+
+        status = NtWaitForSingleObject(
+            q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
+
+        if (status == STATUS_WAIT_0)
+        {
+            /* There are two possible ways to trigger the event.  Either
+               we are quitting and the last timer got removed, or a new
+               timer got put at the head of the list so we need to adjust
+               our timeout.  */
+            RtlEnterCriticalSection(&q->cs);
+            if (q->quit && list_count(&q->timers) == 0)
+                done = TRUE;
+            RtlLeaveCriticalSection(&q->cs);
+        }
+        else if (status == STATUS_TIMEOUT)
+            queue_timer_expire(q);
+
+        if (done)
+            break;
+
+        timeout_ms = queue_get_timeout(q);
+    }
+
+    NtClose(q->event);
+    RtlDeleteCriticalSection(&q->cs);
+    RtlFreeHeap(GetProcessHeap(), 0, q);
+}
+
+static void queue_destroy_timer(struct queue_timer *t)
+{
+    /* We MUST hold the queue cs while calling this function.  */
+    t->destroy = TRUE;
+    if (t->runcount == 0)
+        /* Ensure a timer is promptly removed.  If callbacks are pending,
+           it will be removed after the last one finishes by the callback
+           cleanup wrapper.  */
+        queue_remove_timer(t);
+    else
+        /* Make sure no destroyed timer masks an active timer at the head
+           of the sorted list.  */
+        queue_move_timer(t, EXPIRE_NEVER, FALSE);
 }
 
 /***********************************************************************
@@ -563,12 +766,28 @@ static void queue_remove_timer(struct queue_timer *t)
  */
 NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
 {
+    NTSTATUS status;
     struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
     if (!q)
         return STATUS_NO_MEMORY;
 
     RtlInitializeCriticalSection(&q->cs);
     list_init(&q->timers);
+    q->quit = FALSE;
+    status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, FALSE, FALSE);
+    if (status != STATUS_SUCCESS)
+    {
+        RtlFreeHeap(GetProcessHeap(), 0, q);
+        return status;
+    }
+    status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, NULL, 0, 0,
+                                 timer_queue_thread_proc, q, &q->thread, NULL);
+    if (status != STATUS_SUCCESS)
+    {
+        NtClose(q->event);
+        RtlFreeHeap(GetProcessHeap(), 0, q);
+        return status;
+    }
 
     *NewTimerQueue = q;
     return STATUS_SUCCESS;
@@ -594,23 +813,39 @@ NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
 {
     struct timer_queue *q = TimerQueue;
     struct queue_timer *t, *temp;
+    HANDLE thread = q->thread;
+    NTSTATUS status;
 
     RtlEnterCriticalSection(&q->cs);
-    LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
-        queue_remove_timer(t);
+    q->quit = TRUE;
+    if (list_head(&q->timers))
+        /* When the last timer is removed, it will signal the timer thread to
+           exit...  */
+        LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
+            queue_destroy_timer(t);
+    else
+        /* However if we have none, we must do it ourselves.  */
+        NtSetEvent(q->event, NULL);
     RtlLeaveCriticalSection(&q->cs);
 
-    RtlDeleteCriticalSection(&q->cs);
-    RtlFreeHeap(GetProcessHeap(), 0, q);
-
     if (CompletionEvent == INVALID_HANDLE_VALUE)
-        return STATUS_SUCCESS;
+    {
+        NtWaitForSingleObject(thread, FALSE, NULL);
+        status = STATUS_SUCCESS;
+    }
     else
     {
         if (CompletionEvent)
+        {
+            FIXME("asynchronous return on completion event unimplemented\n");
+            NtWaitForSingleObject(thread, FALSE, NULL);
             NtSetEvent(CompletionEvent, NULL);
-        return STATUS_PENDING;
+        }
+        status = STATUS_PENDING;
     }
+
+    NtClose(thread);
+    return status;
 }
 
 /***********************************************************************
@@ -643,17 +878,32 @@ NTSTATUS WINAPI RtlCreateTimer(PHANDLE NewTimer, HANDLE TimerQueue,
                                PVOID Parameter, DWORD DueTime, DWORD Period,
                                ULONG Flags)
 {
+    NTSTATUS status;
     struct timer_queue *q = TimerQueue;
     struct queue_timer *t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
     if (!t)
         return STATUS_NO_MEMORY;
 
-    FIXME("timer expiration unimplemented\n");
+    t->q = q;
+    t->runcount = 0;
+    t->callback = Callback;
+    t->param = Parameter;
+    t->period = Period;
+    t->flags = Flags;
+    t->destroy = FALSE;
 
+    status = STATUS_SUCCESS;
     RtlEnterCriticalSection(&q->cs);
-    list_add_tail(&q->timers, &t->entry);
+    if (q->quit)
+        status = STATUS_INVALID_HANDLE;
+    else
+        queue_add_timer(t, queue_current_time() + DueTime, TRUE);
     RtlLeaveCriticalSection(&q->cs);
 
-    *NewTimer = t;
-    return STATUS_SUCCESS;
+    if (status == STATUS_SUCCESS)
+        *NewTimer = t;
+    else
+        RtlFreeHeap(GetProcessHeap(), 0, t);
+
+    return status;
 }



More information about the wine-patches mailing list