Jacek Caban : server: Introduce read queue for server-side named pipe I/O.
Alexandre Julliard
julliard at winehq.org
Fri Mar 24 16:24:25 CDT 2017
Module: wine
Branch: master
Commit: d4139833c11133d458ee4b1e6afe6bd0e91b9e38
URL: http://source.winehq.org/git/wine.git/?a=commit;h=d4139833c11133d458ee4b1e6afe6bd0e91b9e38
Author: Jacek Caban <jacek at codeweavers.com>
Date: Thu Mar 23 15:41:43 2017 +0100
server: Introduce read queue for server-side named pipe I/O.
Signed-off-by: Jacek Caban <jacek at codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard at winehq.org>
---
server/async.c | 9 ++++++
server/file.h | 1 +
server/named_pipe.c | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 95 insertions(+)
diff --git a/server/async.c b/server/async.c
index 4e30c2e..e113681 100644
--- a/server/async.c
+++ b/server/async.c
@@ -474,6 +474,15 @@ struct iosb *async_get_iosb( struct async *async )
return async->iosb ? (struct iosb *)grab_object( async->iosb ) : NULL;
}
+/* find the first pending async in queue */
+struct async *find_pending_async( struct async_queue *queue )
+{
+ struct async *async;
+ if (queue) LIST_FOR_EACH_ENTRY( async, &queue->queue, struct async, queue_entry )
+ if (async->status == STATUS_PENDING) return (struct async *)grab_object( async );
+ return NULL;
+}
+
/* cancels all async I/O */
DECL_HANDLER(cancel_async)
{
diff --git a/server/file.h b/server/file.h
index 8e906d3..398733c 100644
--- a/server/file.h
+++ b/server/file.h
@@ -188,6 +188,7 @@ extern struct completion *fd_get_completion( struct fd *fd, apc_param_t *p_key )
extern void fd_copy_completion( struct fd *src, struct fd *dst );
extern struct iosb *create_iosb( const void *in_data, data_size_t in_size, data_size_t out_size );
extern struct iosb *async_get_iosb( struct async *async );
+extern struct async *find_pending_async( struct async_queue *queue );
extern void cancel_process_asyncs( struct process *process );
/* access rights that require Unix read permission */
diff --git a/server/named_pipe.c b/server/named_pipe.c
index 85521e0..e83f773 100644
--- a/server/named_pipe.c
+++ b/server/named_pipe.c
@@ -81,6 +81,7 @@ struct pipe_end
struct pipe_end *connection; /* the other end of the pipe */
data_size_t buffer_size;/* size of buffered data that doesn't block caller */
struct list message_queue;
+ struct async_queue *read_q; /* read queue */
struct async_queue *write_q; /* write queue */
};
@@ -425,6 +426,7 @@ static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status
struct pipe_message *message, *next;
struct async *async;
if (pipe_end->fd) fd_async_wake_up( pipe_end->fd, ASYNC_TYPE_WAIT, status );
+ async_wake_up( pipe_end->read_q, status );
LIST_FOR_EACH_ENTRY_SAFE( message, next, &pipe_end->message_queue, struct pipe_message, entry )
{
async = message->async;
@@ -470,6 +472,7 @@ static void pipe_end_destroy( struct pipe_end *pipe_end )
free_message( message );
}
+ free_async_queue( pipe_end->read_q );
free_async_queue( pipe_end->write_q );
}
@@ -681,10 +684,89 @@ static obj_handle_t pipe_client_flush( struct fd *fd, struct async *async, int b
return 0;
}
+static void message_queue_read( struct pipe_end *pipe_end, struct iosb *iosb )
+{
+ struct pipe_message *message;
+ data_size_t avail = 0;
+
+ LIST_FOR_EACH_ENTRY( message, &pipe_end->message_queue, struct pipe_message, entry )
+ {
+ avail += message->iosb->in_size - message->read_pos;
+ if (avail >= iosb->out_size) break;
+ }
+ iosb->out_size = min( iosb->out_size, avail );
+ iosb->status = STATUS_SUCCESS;
+
+ message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
+ if (!message->read_pos && message->iosb->in_size == iosb->out_size) /* fast path */
+ {
+ iosb->out_data = message->iosb->in_data;
+ message->iosb->in_data = NULL;
+ wake_message( message );
+ free_message( message );
+ }
+ else
+ {
+ data_size_t write_pos = 0, writing;
+ char *buf = NULL;
+
+ if (iosb->out_size && !(buf = iosb->out_data = malloc( iosb->out_size )))
+ {
+ iosb->out_size = 0;
+ iosb->status = STATUS_NO_MEMORY;
+ return;
+ }
+
+ do
+ {
+ message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry );
+ writing = min( iosb->out_size - write_pos, message->iosb->in_size - message->read_pos );
+ if (writing) memcpy( buf + write_pos, (const char *)message->iosb->in_data + message->read_pos, writing );
+ write_pos += writing;
+ message->read_pos += writing;
+ if (message->read_pos == message->iosb->in_size)
+ {
+ wake_message(message);
+ free_message(message);
+ }
+ } while (write_pos < iosb->out_size);
+ }
+ iosb->result = iosb->out_size;
+}
+
/* We call async_terminate in our reselect implementation, which causes recursive reselect.
* We're not interested in such reselect calls, so we ignore them. */
static int ignore_reselect;
+static void reselect_write_queue( struct pipe_end *pipe_end );
+
+static void reselect_read_queue( struct pipe_end *pipe_end )
+{
+ struct async *async;
+ struct iosb *iosb;
+ int read_done = 0;
+
+ ignore_reselect = 1;
+ while (!list_empty( &pipe_end->message_queue) && (async = find_pending_async( pipe_end->read_q )))
+ {
+ iosb = async_get_iosb( async );
+ message_queue_read( pipe_end, iosb );
+ async_terminate( async, iosb->result ? STATUS_ALERTED : iosb->status );
+ release_object( async );
+ release_object( iosb );
+ read_done = 1;
+ }
+ ignore_reselect = 0;
+
+ if (pipe_end->connection)
+ {
+ if (list_empty( &pipe_end->message_queue ))
+ fd_async_wake_up( pipe_end->connection->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
+ else if (read_done)
+ reselect_write_queue( pipe_end->connection );
+ }
+}
+
static void reselect_write_queue( struct pipe_end *pipe_end )
{
struct pipe_message *message, *next;
@@ -777,6 +859,8 @@ static void pipe_end_reselect_async( struct fd *fd, struct async_queue *queue )
default_fd_reselect_async( fd, queue );
else if (pipe_end->write_q && pipe_end->write_q == queue)
reselect_write_queue( pipe_end );
+ else if (pipe_end->read_q && pipe_end->read_q == queue)
+ reselect_read_queue( pipe_end );
}
static inline int is_overlapped( unsigned int options )
@@ -937,6 +1021,7 @@ static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, d
pipe_end->flags = pipe_flags;
pipe_end->connection = NULL;
pipe_end->buffer_size = buffer_size;
+ pipe_end->read_q = NULL;
pipe_end->write_q = NULL;
list_init( &pipe_end->message_queue );
}
More information about the wine-cvs
mailing list