[1/3] server: add per-process apc queue for kernel apcs

Thomas Kho tkho at ucla.edu
Tue Aug 15 01:55:32 CDT 2006


[1/3] server: add per-process apc queue for kernel apcs

Hello,

The following three patches constitute an APC implementation for remote process
functions. This is a large revision of an earlier patch
(http://winehq.org/pipermail/wine-devel/2006-August/050010.html) that
introduces APCs that can execute in uninterruptible waits (like wait_suspend)
and prevents suspension inside ntdll critical sections.

I tested this patch with two pending conformance tests
(http://winehq.org/pipermail/wine-patches/2006-July/029259.html and
http://winehq.org/pipermail/wine-patches/2006-July/029260.html), with a program
that stresses VirtualAllocEx/VirtualFreeEx/CreateRemoteThread (I will post
this somewhere soon) and also attached to the stress test with OllyDbg and
could see a live memory map. Testing was done on both uni-processor and
multi-processor machines.

If there are any problems with this patch, please let me know!

Thomas Kho

---

 dlls/ntdll/sync.c              |   21 +++++++--
 include/wine/server_protocol.h |    5 +-
 server/process.c               |    4 ++
 server/process.h               |    1 
 server/protocol.def            |    3 +
 server/thread.c                |   89 +++++++++++++++++++++++++++-------------
 server/thread.h                |   24 ++++++++++-
 server/timer.c                 |    2 -
 server/trace.c                 |    3 +
 9 files changed, 112 insertions(+), 40 deletions(-)

Copyright 2006 Google (Thomas Kho)
License: LGPL
diff --git a/dlls/ntdll/sync.c b/dlls/ntdll/sync.c
index 106b187..260cc2d 100644
--- a/dlls/ntdll/sync.c
+++ b/dlls/ntdll/sync.c
@@ -655,20 +655,23 @@ static int wait_reply( void *cookie )
 /***********************************************************************
  *              call_apcs
  *
- * Call outstanding APCs.
+ * Call outstanding APCs. Returns true if non-kernel apc called.
  */
-static void call_apcs( BOOL alertable )
+static BOOL call_apcs( BOOL alertable, BOOL interruptible )
 {
+    BOOL ret;
     FARPROC proc;
     LARGE_INTEGER time;
     void *arg1, *arg2, *arg3;
 
+    ret = FALSE;
     for (;;)
     {
         int type = APC_NONE;
         SERVER_START_REQ( get_apc )
         {
             req->alertable = alertable;
+            req->interruptible = interruptible;
             if (!wine_server_call( req )) type = reply->type;
             proc = reply->func;
             arg1 = reply->arg1;
@@ -680,19 +683,25 @@ static void call_apcs( BOOL alertable )
         switch (type)
         {
         case APC_NONE:
-            return;  /* no more APCs */
+            return ret;  /* no more APCs */
         case APC_USER:
             proc( arg1, arg2, arg3 );
+            ret = TRUE;
+            break;
+        case APC_KERNEL:
+            proc( arg1, arg2, arg3 );
             break;
         case APC_TIMER:
             /* convert sec/usec to NT time */
             RtlSecondsSince1970ToTime( (time_t)arg1, &time );
             time.QuadPart += (DWORD)arg2 * 10;
             proc( arg3, time.u.LowPart, time.u.HighPart );
+            ret = TRUE;
             break;
         case APC_ASYNC_IO:
             NtCurrentTeb()->num_async_io--;
             proc( arg1, (IO_STATUS_BLOCK*)arg2, (ULONG)arg3 );
+            ret = TRUE;
             break;
         default:
             server_protocol_error( "get_apc_request: bad type %d\n", type );
@@ -716,6 +725,7 @@ NTSTATUS NTDLL_wait_for_multiple_objects
     if (timeout) flags |= SELECT_TIMEOUT;
     for (;;)
     {
+        BOOL alertable;
         SERVER_START_REQ( select )
         {
             req->flags   = flags;
@@ -728,8 +738,9 @@ NTSTATUS NTDLL_wait_for_multiple_objects
         SERVER_END_REQ;
         if (ret == STATUS_PENDING) ret = wait_reply( &cookie );
         if (ret != STATUS_USER_APC) break;
-        call_apcs( (flags & SELECT_ALERTABLE) != 0 );
-        if (flags & SELECT_ALERTABLE) break;
+        alertable = call_apcs( (flags & SELECT_ALERTABLE) != 0,
+                               (flags & SELECT_INTERRUPTIBLE) != 0 );
+        if (alertable && (flags & SELECT_ALERTABLE)) break;
         signal_object = 0;  /* don't signal it multiple times */
     }
 
diff --git a/server/process.c b/server/process.c
index 733b1c5..a1093cf 100644
--- a/server/process.c
+++ b/server/process.c
@@ -288,6 +288,7 @@ struct thread *create_process( int fd, s
     list_init( &process->locks );
     list_init( &process->classes );
     list_init( &process->dlls );
+    list_init( &process->system_apc );
 
     process->start_time = current_time;
     process->end_time.tv_sec = process->end_time.tv_usec = 0;
@@ -598,6 +599,9 @@ void remove_process_thread( struct proce
     if (!--process->running_threads)
     {
         /* we have removed the last running thread, exit the process */
+        struct thread_apc *apc;
+        while ((apc = thread_dequeue_apc( thread, APC_PROCESS_QUEUE_SYSTEM )))
+            free( apc );
         process->exit_code = thread->exit_code;
         generate_debug_event( thread, EXIT_PROCESS_DEBUG_EVENT, process );
         process_killed( process );
diff --git a/server/process.h b/server/process.h
index c447e2f..02fb95e 100644
--- a/server/process.h
+++ b/server/process.h
@@ -80,6 +80,7 @@ struct process
     struct list          dlls;            /* list of loaded dlls */
     void                *peb;             /* PEB address in client address space */
     void                *ldt_copy;        /* pointer to LDT copy in client addr space */
+    struct list          system_apc;      /* queue of system aync procedure calls */
 };
 
 struct process_snapshot
diff --git a/server/protocol.def b/server/protocol.def
index 0d014dd..3166696 100644
--- a/server/protocol.def
+++ b/server/protocol.def
@@ -422,6 +422,7 @@ #define SET_THREAD_INFO_TOKEN    0x04
 /* Get next APC to call */
 @REQ(get_apc)
     int          alertable;    /* is thread alertable? */
+    int          interruptible; /* is thread interruptible? */
 @REPLY
     void*        func;         /* function to call */
     int          type;         /* function type */
@@ -429,7 +430,7 @@ #define SET_THREAD_INFO_TOKEN    0x04
     void*        arg2;
     void*        arg3;
 @END
-enum apc_type { APC_NONE, APC_USER, APC_TIMER, APC_ASYNC_IO };
+enum apc_type { APC_NONE, APC_USER, APC_TIMER, APC_ASYNC_IO, APC_KERNEL };
 
 
 /* Close a handle for the current process */
diff --git a/server/thread.c b/server/thread.c
index 36d1d79..6d41c65 100644
--- a/server/thread.c
+++ b/server/thread.c
@@ -64,19 +64,6 @@ struct thread_wait
     struct wait_queue_entry queues[1];
 };
 
-/* asynchronous procedure calls */
-
-struct thread_apc
-{
-    struct list         entry;    /* queue linked list */
-    struct object      *owner;    /* object that queued this apc */
-    void               *func;     /* function to call in client */
-    enum apc_type       type;     /* type of apc function */
-    void               *arg1;     /* function arguments */
-    void               *arg2;
-    void               *arg3;
-};
-
 
 /* thread operations */
 
@@ -85,7 +72,6 @@ static int thread_signaled( struct objec
 static unsigned int thread_map_access( struct object *obj, unsigned int access );
 static void thread_poll_event( struct fd *fd, int event );
 static void destroy_thread( struct object *obj );
-static struct thread_apc *thread_dequeue_apc( struct thread *thread, int system_only );
 
 static const struct object_ops thread_ops =
 {
@@ -212,7 +198,9 @@ static void cleanup_thread( struct threa
     int i;
     struct thread_apc *apc;
 
-    while ((apc = thread_dequeue_apc( thread, 0 ))) free( apc );
+    while ((apc = thread_dequeue_apc( thread, APC_THREAD_QUEUE_USER
+                                      | APC_THREAD_QUEUE_SYSTEM )))
+        free( apc );
     if (thread->req_data) free( thread->req_data );
     if (thread->reply_data) free( thread->reply_data );
     if (thread->request_fd) release_object( thread->request_fd );
@@ -432,7 +420,12 @@ static int check_wait( struct thread *th
     struct wait_queue_entry *entry = wait->queues;
 
     /* Suspended threads may not acquire locks */
-    if (thread->process->suspend + thread->suspend > 0) return -1;
+    if (thread->process->suspend + thread->suspend > 0)
+    {
+        if (thread->reply_fd && !list_empty(&thread->process->system_apc))
+            return STATUS_USER_APC;
+        return -1;
+    }
 
     assert( wait );
     if (wait->flags & SELECT_ALL)
@@ -464,8 +457,17 @@ static int check_wait( struct thread *th
     }
 
  other_checks:
-    if ((wait->flags & SELECT_INTERRUPTIBLE) && !list_empty(&thread->system_apc)) return STATUS_USER_APC;
-    if ((wait->flags & SELECT_ALERTABLE) && !list_empty(&thread->user_apc)) return STATUS_USER_APC;
+    if (thread->unix_pid) /* initialized */
+    {
+        if (!list_empty(&thread->process->system_apc))
+            return STATUS_USER_APC;
+        if ((wait->flags & SELECT_INTERRUPTIBLE)
+            && !list_empty(&thread->system_apc))
+            return STATUS_USER_APC;
+        if ((wait->flags & SELECT_ALERTABLE)
+            && !list_empty(&thread->user_apc))
+            return STATUS_USER_APC;
+    }
     if (wait->flags & SELECT_TIMEOUT)
     {
         if (!time_before( &current_time, &wait->timeout )) return STATUS_TIMEOUT;
@@ -620,15 +622,32 @@ void wake_up( struct object *obj, int ma
     }
 }
 
+/* return a pointer to the correct queue */
+static struct list *get_apc_queue( struct thread *thread,
+                                   enum apc_queue apc_queue )
+{
+    switch (apc_queue)
+    {
+        case APC_PROCESS_QUEUE_SYSTEM:
+            return &thread->process->system_apc;
+        case APC_THREAD_QUEUE_SYSTEM:
+            return &thread->system_apc;
+        case APC_THREAD_QUEUE_USER:
+        default:
+            return &thread->user_apc;
+    }
+}
+
 /* queue an async procedure call */
 int thread_queue_apc( struct thread *thread, struct object *owner, void *func,
-                      enum apc_type type, int system, void *arg1, void *arg2, void *arg3 )
+                      enum apc_type type, enum apc_queue apc_queue, void *arg1,
+                      void *arg2, void *arg3 )
 {
     struct thread_apc *apc;
-    struct list *queue = system ? &thread->system_apc : &thread->user_apc;
+    struct list *queue = get_apc_queue( thread, apc_queue );
 
     /* cancel a possible previous APC with the same owner */
-    if (owner) thread_cancel_apc( thread, owner, system );
+    if (owner) thread_cancel_apc( thread, owner, apc_queue );
     if (thread->state == TERMINATED) return 0;
 
     if (!(apc = mem_alloc( sizeof(*apc) ))) return 0;
@@ -646,10 +665,11 @@ int thread_queue_apc( struct thread *thr
 }
 
 /* cancel the async procedure call owned by a specific object */
-void thread_cancel_apc( struct thread *thread, struct object *owner, int system )
+void thread_cancel_apc( struct thread *thread, struct object *owner,
+                        enum apc_queue apc_queue )
 {
     struct thread_apc *apc;
-    struct list *queue = system ? &thread->system_apc : &thread->user_apc;
+    struct list *queue = get_apc_queue( thread, apc_queue );
     LIST_FOR_EACH_ENTRY( apc, queue, struct thread_apc, entry )
     {
         if (apc->owner != owner) continue;
@@ -660,12 +680,18 @@ void thread_cancel_apc( struct thread *t
 }
 
 /* remove the head apc from the queue; the returned pointer must be freed by the caller */
-static struct thread_apc *thread_dequeue_apc( struct thread *thread, int system_only )
+struct thread_apc *thread_dequeue_apc( struct thread *thread, enum apc_queue apc_queue )
 {
     struct thread_apc *apc = NULL;
-    struct list *ptr = list_head( &thread->system_apc );
+    struct list *ptr = NULL;
+
+    if (apc_queue & APC_PROCESS_QUEUE_SYSTEM)
+        ptr = list_head( &thread->process->system_apc );
+    if (!ptr && (apc_queue & APC_THREAD_QUEUE_SYSTEM))
+        ptr = list_head( &thread->system_apc );
+    if (!ptr && (apc_queue & APC_THREAD_QUEUE_USER))
+        ptr = list_head( &thread->user_apc );
 
-    if (!ptr && !system_only) ptr = list_head( &thread->user_apc );
     if (ptr)
     {
         apc = LIST_ENTRY( ptr, struct thread_apc, entry );
@@ -1008,7 +1034,9 @@ DECL_HANDLER(queue_apc)
     struct thread *thread;
     if ((thread = get_thread_from_handle( req->handle, THREAD_SET_CONTEXT )))
     {
-        thread_queue_apc( thread, NULL, req->func, APC_USER, !req->user,
+        enum apc_queue queue;
+        queue = req->user ? APC_THREAD_QUEUE_USER : APC_THREAD_QUEUE_SYSTEM;
+        thread_queue_apc( thread, NULL, req->func, APC_USER, queue,
                           req->arg1, req->arg2, req->arg3 );
         release_object( thread );
     }
@@ -1021,7 +1049,12 @@ DECL_HANDLER(get_apc)
 
     for (;;)
     {
-        if (!(apc = thread_dequeue_apc( current, !req->alertable )))
+        enum apc_queue apc_queue = APC_PROCESS_QUEUE_SYSTEM;
+        if (req->interruptible)
+            apc_queue |= APC_THREAD_QUEUE_SYSTEM;
+        if (req->alertable)
+            apc_queue |= APC_THREAD_QUEUE_USER;
+        if (!(apc = thread_dequeue_apc( current, apc_queue )))
         {
             /* no more APCs */
             reply->func = NULL;
diff --git a/server/thread.h b/server/thread.h
index 61911cd..82121dd 100644
--- a/server/thread.h
+++ b/server/thread.h
@@ -96,6 +96,25 @@ struct thread_snapshot
     int             priority;  /* priority class */
 };
 
+/* asynchronous procedure calls */
+struct thread_apc
+{
+    struct list         entry;    /* queue linked list */
+    struct object      *owner;    /* object that queued this apc */
+    void               *func;     /* function to call in client */
+    enum apc_type       type;     /* type of apc function */
+    void               *arg1;     /* function arguments */
+    void               *arg2;
+    void               *arg3;
+};
+
+enum apc_queue
+{
+    APC_THREAD_QUEUE_USER    = 0x1,
+    APC_THREAD_QUEUE_SYSTEM  = 0x2,
+    APC_PROCESS_QUEUE_SYSTEM = 0x4
+};
+
 extern struct thread *current;
 
 /* thread functions */
@@ -112,8 +131,9 @@ extern void kill_thread( struct thread *
 extern void break_thread( struct thread *thread );
 extern void wake_up( struct object *obj, int max );
 extern int thread_queue_apc( struct thread *thread, struct object *owner, void *func,
-                             enum apc_type type, int system, void *arg1, void *arg2, void *arg3 );
-extern void thread_cancel_apc( struct thread *thread, struct object *owner, int system );
+                             enum apc_type type, enum apc_queue apc_queue, void *arg1, void *arg2, void *arg3 );
+extern struct thread_apc *thread_dequeue_apc( struct thread *thread, enum apc_queue apc_queue );
+extern void thread_cancel_apc( struct thread *thread, struct object *owner, enum apc_queue apc_queue );
 extern int thread_add_inflight_fd( struct thread *thread, int client, int server );
 extern int thread_get_inflight_fd( struct thread *thread, int client );
 extern struct thread_snapshot *thread_snap( int *count );
diff --git a/server/timer.c b/server/timer.c
index 2640dd8..c0e081d 100644
--- a/server/timer.c
+++ b/server/timer.c
@@ -136,7 +136,7 @@ static int cancel_timer( struct timer *t
     }
     if (timer->thread)
     {
-        thread_cancel_apc( timer->thread, &timer->obj, 0 );
+        thread_cancel_apc( timer->thread, &timer->obj, APC_THREAD_QUEUE_USER );
         release_object( timer->thread );
         timer->thread = NULL;
     }



More information about the wine-patches mailing list