Jacek Caban : server: Introduce read queue for server-side named pipe I/O.

Alexandre Julliard julliard at winehq.org
Fri Mar 24 16:24:25 CDT 2017


Module: wine
Branch: master
Commit: d4139833c11133d458ee4b1e6afe6bd0e91b9e38
URL:    http://source.winehq.org/git/wine.git/?a=commit;h=d4139833c11133d458ee4b1e6afe6bd0e91b9e38

Author: Jacek Caban <jacek at codeweavers.com>
Date:   Thu Mar 23 15:41:43 2017 +0100

server: Introduce read queue for server-side named pipe I/O.

Signed-off-by: Jacek Caban <jacek at codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard at winehq.org>

---

 server/async.c      |  9 ++++++
 server/file.h       |  1 +
 server/named_pipe.c | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 95 insertions(+)

diff --git a/server/async.c b/server/async.c
index 4e30c2e..e113681 100644
--- a/server/async.c
+++ b/server/async.c
@@ -474,6 +474,15 @@ struct iosb *async_get_iosb( struct async *async )
     return async->iosb ? (struct iosb *)grab_object( async->iosb ) : NULL;
 }
 
+/* find the first async in queue whose status is STATUS_PENDING; the result is returned through grab_object() (the caller in this patch release_object()s it), or NULL when queue is NULL or holds no pending entry */
+struct async *find_pending_async( struct async_queue *queue )
+{
+    struct async *async;
+    if (queue) LIST_FOR_EACH_ENTRY( async, &queue->queue, struct async, queue_entry ) /* a NULL queue is treated as empty */
+        if (async->status == STATUS_PENDING) return (struct async *)grab_object( async ); /* first match wins */
+    return NULL;
+}
+
 /* cancels all async I/O */
 DECL_HANDLER(cancel_async)
 {
diff --git a/server/file.h b/server/file.h
index 8e906d3..398733c 100644
--- a/server/file.h
+++ b/server/file.h
@@ -188,6 +188,7 @@ extern struct completion *fd_get_completion( struct fd *fd, apc_param_t *p_key )
 extern void fd_copy_completion( struct fd *src, struct fd *dst );
 extern struct iosb *create_iosb( const void *in_data, data_size_t in_size, data_size_t out_size );
 extern struct iosb *async_get_iosb( struct async *async );
+extern struct async *find_pending_async( struct async_queue *queue );
 extern void cancel_process_asyncs( struct process *process );
 
 /* access rights that require Unix read permission */
diff --git a/server/named_pipe.c b/server/named_pipe.c
index 85521e0..e83f773 100644
--- a/server/named_pipe.c
+++ b/server/named_pipe.c
@@ -81,6 +81,7 @@ struct pipe_end
     struct pipe_end     *connection; /* the other end of the pipe */
     data_size_t          buffer_size;/* size of buffered data that doesn't block caller */
     struct list          message_queue;
+    struct async_queue  *read_q;     /* read queue */
     struct async_queue  *write_q;    /* write queue */
 };
 
@@ -425,6 +426,7 @@ static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status
         struct pipe_message *message, *next;
         struct async *async;
         if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status );
+        async_wake_up( pipe_end->read_q, status );
         LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry )
         {
             async = message->async;
@@ -470,6 +472,7 @@ static void pipe_end_destroy( struct pipe_end *pipe_end )
         free_message( message );
     }
 
+    free_async_queue( pipe_end->read_q );
     free_async_queue( pipe_end->write_q );
 }
 
@@ -681,10 +684,89 @@ static obj_handle_t pipe_client_flush( struct fd *fd, struct async *async, int b
     return 0;
 }
 
+static void message_queue_read( struct pipe_end *pipe_end, struct iosb *iosb ) /* fill iosb with up to iosb->out_size bytes taken from the head of pipe_end's message queue; the head is read without an emptiness check, so the caller is expected to pass a non-empty queue */
+{
+    struct pipe_message *message;
+    data_size_t avail = 0; /* unread bytes counted so far */
+
+    LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry ) /* count unread bytes, stopping once the request can be satisfied */
+    {
+        avail += message->iosb->in_size - message->read_pos;
+        if (avail >= iosb->out_size) break;
+    }
+    iosb->out_size = min( iosb->out_size, avail ); /* never return more than is queued */
+    iosb->status = STATUS_SUCCESS;
+
+    message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
+    if (!message->read_pos && message->iosb->in_size == iosb->out_size) /* fast path: head message is untouched and exactly the requested size */
+    {
+        iosb->out_data = message->iosb->in_data; /* hand the writer's buffer over instead of copying */
+        message->iosb->in_data = NULL; /* presumably keeps the writer's iosb from freeing the transferred buffer - confirm */
+        wake_message( message );
+        free_message( message );
+    }
+    else
+    {
+        data_size_t write_pos = 0, writing; /* write_pos: bytes already copied into buf; writing: size of the current copy */
+        char *buf = NULL;
+
+        if (iosb->out_size && !(buf = iosb->out_data = malloc( iosb->out_size ))) /* no allocation is made for a zero-byte read */
+        {
+            iosb->out_size = 0;
+            iosb->status = STATUS_NO_MEMORY;
+            return; /* nothing has been consumed from the queue; iosb->result is not set on this path */
+        }
+
+        do /* gather data from successive head messages until out_size bytes are copied */
+        {
+            message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); /* re-fetch the head: the previous one may have been freed below */
+            writing = min( iosb->out_size - write_pos, message->iosb->in_size - message->read_pos );
+            if (writing) memcpy( buf + write_pos, (const char *)message->iosb->in_data + message->read_pos, writing );
+            write_pos += writing;
+            message->read_pos += writing;
+            if (message->read_pos == message->iosb->in_size) /* message fully consumed */
+            {
+                wake_message(message);
+                free_message(message);
+            }
+        } while (write_pos < iosb->out_size);
+    }
+    iosb->result = iosb->out_size; /* number of bytes made available to the reader */
+}
+
 /* We call async_terminate in our reselect implementation, which causes recursive reselect.
  * We're not interested in such reselect calls, so we ignore them. */
 static int ignore_reselect;
 
+static void reselect_write_queue( struct pipe_end *pipe_end );
+
+static void reselect_read_queue( struct pipe_end *pipe_end ) /* complete pending asyncs in pipe_end->read_q from queued messages, then notify the connected end */
+{
+    struct async *async;
+    struct iosb *iosb;
+    int read_done = 0; /* set once at least one async has been processed by the loop below */
+
+    ignore_reselect = 1; /* async_terminate below triggers recursive reselect calls that must be ignored */
+    while (!list_empty( &pipe_end->message_queue) && (async = find_pending_async( pipe_end->read_q ))) /* loop while there is both queued data and a pending read */
+    {
+        iosb = async_get_iosb( async );
+        message_queue_read( pipe_end, iosb );
+        async_terminate( async, iosb->result ? STATUS_ALERTED : iosb->status ); /* STATUS_ALERTED when bytes were read, otherwise the status set by message_queue_read */
+        release_object( async ); /* drop the reference taken by find_pending_async */
+        release_object( iosb ); /* drop the reference taken by async_get_iosb */
+        read_done = 1;
+    }
+    ignore_reselect = 0;
+
+    if (pipe_end->connection)
+    {
+        if (list_empty( &pipe_end->message_queue )) /* everything queued has been read */
+            fd_async_wake_up( pipe_end->connection->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); /* presumably completes waiters (e.g. flush) on the other end - confirm */
+        else if (read_done) /* data remains after a read, so let the other end re-examine its write queue */
+            reselect_write_queue( pipe_end->connection );
+    }
+}
+
 static void reselect_write_queue( struct pipe_end *pipe_end )
 {
     struct pipe_message *message, *next;
@@ -777,6 +859,8 @@ static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue )
         default_fd_reselect_async( fd, queue );
     else if (pipe_end->write_q && pipe_end->write_q == queue)
         reselect_write_queue( pipe_end );
+    else if (pipe_end->read_q && pipe_end->read_q == queue)
+        reselect_read_queue( pipe_end );
 }
 
 static inline int is_overlapped( unsigned int options )
@@ -937,6 +1021,7 @@ static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, d
     pipe_end->flags = pipe_flags;
     pipe_end->connection = NULL;
     pipe_end->buffer_size = buffer_size;
+    pipe_end->read_q = NULL;
     pipe_end->write_q = NULL;
     list_init( &pipe_end->message_queue );
 }




More information about the wine-cvs mailing list