[4/7] webservices: Add asynchronous support for WsReceiveMessage.

Hans Leidekker hans at codeweavers.com
Thu Sep 21 04:26:14 CDT 2017


Signed-off-by: Hans Leidekker <hans at codeweavers.com>
---
 dlls/webservices/channel.c | 259 ++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 230 insertions(+), 29 deletions(-)

diff --git a/dlls/webservices/channel.c b/dlls/webservices/channel.c
index a8ad01235f..08e04935d3 100644
--- a/dlls/webservices/channel.c
+++ b/dlls/webservices/channel.c
@@ -86,6 +86,105 @@ static const struct prop_desc channel_props[] =
     { sizeof(ULONG), FALSE }                                /* WS_CHANNEL_PROPERTY_MAX_HTTP_REQUEST_HEADERS_BUFFER_SIZE */
 };
 
+struct task
+{
+    struct list   entry;
+    void        (*proc)( struct task * );
+};
+
+struct queue
+{
+    CRITICAL_SECTION cs;
+    HANDLE           wait;
+    HANDLE           cancel;
+    HANDLE           ready;
+    struct list      tasks;
+};
+
+static struct task *dequeue_task( struct queue *queue )
+{
+    struct task *task;
+
+    EnterCriticalSection( &queue->cs );
+    TRACE( "%u tasks queued\n", list_count( &queue->tasks ) );
+    task = LIST_ENTRY( list_head( &queue->tasks ), struct task, entry );
+    if (task) list_remove( &task->entry );
+    LeaveCriticalSection( &queue->cs );
+
+    TRACE( "returning task %p\n", task );
+    return task;
+}
+
+static void CALLBACK queue_runner( TP_CALLBACK_INSTANCE *instance, void *ctx )
+{
+    struct queue *queue = ctx;
+    HANDLE handles[] = { queue->wait, queue->cancel };
+
+    SetEvent( queue->ready );
+    for (;;)
+    {
+        DWORD err = WaitForMultipleObjects( 2, handles, FALSE, INFINITE );
+        switch (err)
+        {
+        case WAIT_OBJECT_0:
+        {
+            struct task *task;
+            while ((task = dequeue_task( queue )))
+            {
+                task->proc( task );
+                heap_free( task );
+            }
+            break;
+        }
+        case WAIT_OBJECT_0 + 1:
+            TRACE( "cancelled\n" );
+            SetEvent( queue->ready );
+            return;
+
+        default:
+            ERR( "wait failed %u\n", err );
+            return;
+        }
+    }
+}
+
+static HRESULT start_queue( struct queue *queue )
+{
+    HRESULT hr = E_OUTOFMEMORY;
+
+    if (queue->wait) return S_OK;
+    list_init( &queue->tasks );
+    if (!(queue->wait = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
+    if (!(queue->cancel = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
+    if (!(queue->ready = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
+    if (!TrySubmitThreadpoolCallback( queue_runner, queue, NULL )) hr = HRESULT_FROM_WIN32( GetLastError() );
+    else
+    {
+        WaitForSingleObject( queue->ready, INFINITE );
+        return S_OK;
+    }
+
+error:
+    CloseHandle( queue->wait );
+    CloseHandle( queue->cancel );
+    CloseHandle( queue->ready );
+    return hr;
+}
+
+static HRESULT queue_task( struct queue *queue, struct task *task )
+{
+    HRESULT hr;
+    if ((hr = start_queue( queue )) != S_OK) return hr;
+
+    EnterCriticalSection( &queue->cs );
+    TRACE( "queueing task %p\n", task );
+    list_add_tail( &queue->tasks, &task->entry );
+    LeaveCriticalSection( &queue->cs );
+
+    SetEvent( queue->wait );
+    return WS_S_ASYNC;
+};
+
 enum session_state
 {
     SESSION_STATE_UNINITIALIZED,
@@ -107,6 +206,8 @@ struct channel
     enum session_state      session_state;
     struct dictionary       dict_send;
     struct dictionary       dict_recv;
+    struct queue            send_q;
+    struct queue            recv_q;
     union
     {
         struct
@@ -147,22 +248,57 @@ static struct channel *alloc_channel(void)
     InitializeCriticalSection( &ret->cs );
     ret->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.cs");
 
+    InitializeCriticalSection( &ret->send_q.cs );
+    ret->send_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.send_q.cs");
+
+    InitializeCriticalSection( &ret->recv_q.cs );
+    ret->recv_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.recv_q.cs");
+
     prop_init( channel_props, count, ret->prop, &ret[1] );
     ret->prop_count = count;
     return ret;
 }
 
+static void clear_addr( WS_ENDPOINT_ADDRESS *addr )
+{
+    heap_free( addr->url.chars );
+    addr->url.chars  = NULL;
+    addr->url.length = 0;
+}
+
+static void clear_queue( struct queue *queue )
+{
+    struct list *ptr;
+
+    SetEvent( queue->cancel );
+    WaitForSingleObject( queue->ready, INFINITE );
+
+    while ((ptr = list_head( &queue->tasks )))
+    {
+        struct task *task = LIST_ENTRY( ptr, struct task, entry );
+        list_remove( &task->entry );
+        heap_free( task );
+    }
+
+    CloseHandle( queue->wait );
+    queue->wait   = NULL;
+    CloseHandle( queue->cancel );
+    queue->cancel = NULL;
+    CloseHandle( queue->ready );
+    queue->ready   = NULL;
+}
+
 static void reset_channel( struct channel *channel )
 {
-    channel->state                = WS_CHANNEL_STATE_CREATED;
-    heap_free( channel->addr.url.chars );
-    channel->addr.url.chars       = NULL;
-    channel->addr.url.length      = 0;
-    channel->msg                  = NULL;
-    channel->read_size            = 0;
-    channel->session_state        = SESSION_STATE_UNINITIALIZED;
+    clear_queue( &channel->send_q );
+    clear_queue( &channel->recv_q );
+    channel->state         = WS_CHANNEL_STATE_CREATED;
+    channel->session_state = SESSION_STATE_UNINITIALIZED;
+    clear_addr( &channel->addr );
     clear_dict( &channel->dict_send );
     clear_dict( &channel->dict_recv );
+    channel->msg           = NULL;
+    channel->read_size     = 0;
 
     switch (channel->binding)
     {
@@ -201,6 +337,12 @@ static void free_channel( struct channel *channel )
 
     heap_free( channel->read_buf );
 
+    channel->send_q.cs.DebugInfo->Spare[0] = 0;
+    DeleteCriticalSection( &channel->send_q.cs );
+
+    channel->recv_q.cs.DebugInfo->Spare[0] = 0;
+    DeleteCriticalSection( &channel->recv_q.cs );
+
     channel->cs.DebugInfo->Spare[0] = 0;
     DeleteCriticalSection( &channel->cs );
     heap_free( channel );
@@ -1571,7 +1713,7 @@ static HRESULT receive_message_sock( struct channel *channel, SOCKET socket )
     return init_reader( channel );
 }
 
-static HRESULT receive_message( struct channel *channel )
+static HRESULT receive_message_bytes( struct channel *channel )
 {
     HRESULT hr;
     if ((hr = connect_channel( channel )) != S_OK) return hr;
@@ -1623,7 +1765,7 @@ HRESULT channel_receive_message( WS_CHANNEL *handle )
         return E_INVALIDARG;
     }
 
-    hr = receive_message( channel );
+    hr = receive_message_bytes( channel );
 
     LeaveCriticalSection( &channel->cs );
     return hr;
@@ -1656,6 +1798,80 @@ static HRESULT read_message( WS_MESSAGE *handle, WS_XML_READER *reader, const WS
     return WsReadEnvelopeEnd( handle, NULL );
 }
 
+static HRESULT receive_message( struct channel *channel, WS_MESSAGE *msg, const WS_MESSAGE_DESCRIPTION **desc,
+                                ULONG count, WS_RECEIVE_OPTION option, WS_READ_OPTION read_option, WS_HEAP *heap,
+                                void *value, ULONG size, ULONG *index )
+{
+    HRESULT hr;
+    ULONG i;
+
+    if ((hr = receive_message_bytes( channel )) != S_OK) return hr;
+    for (i = 0; i < count; i++)
+    {
+        const WS_ELEMENT_DESCRIPTION *body = desc[i]->bodyElementDescription;
+        if ((hr = read_message( msg, channel->reader, body, read_option, heap, value, size )) == S_OK)
+        {
+            if (index) *index = i;
+            break;
+        }
+        if ((hr = WsResetMessage( msg, NULL )) != S_OK) return hr;
+        if ((hr = init_reader( channel )) != S_OK) return hr;
+    }
+    return (i == count) ? WS_E_INVALID_FORMAT : S_OK;
+}
+
+struct receive_message
+{
+    struct task                    task;
+    struct channel                *channel;
+    WS_MESSAGE                    *msg;
+    const WS_MESSAGE_DESCRIPTION **desc;
+    ULONG                          count;
+    WS_RECEIVE_OPTION              option;
+    WS_READ_OPTION                 read_option;
+    WS_HEAP                       *heap;
+    void                          *value;
+    ULONG                          size;
+    ULONG                         *index;
+    const WS_ASYNC_CONTEXT        *ctx;
+};
+
+static void receive_message_proc( struct task *task )
+{
+    struct receive_message *r = (struct receive_message *)task;
+    HRESULT hr;
+
+    hr = receive_message( r->channel, r->msg, r->desc, r->count, r->option, r->read_option, r->heap, r->value,
+                          r->size, r->index );
+
+    TRACE( "calling %p(%08x)\n", r->ctx->callback, hr );
+    r->ctx->callback( hr, WS_LONG_CALLBACK, r->ctx->callbackState );
+    TRACE( "%p returned\n", r->ctx->callback );
+}
+
+static HRESULT queue_receive_message( struct channel *channel, WS_MESSAGE *msg, const WS_MESSAGE_DESCRIPTION **desc,
+                                      ULONG count, WS_RECEIVE_OPTION option, WS_READ_OPTION read_option,
+                                      WS_HEAP *heap, void *value, ULONG size, ULONG *index,
+                                      const WS_ASYNC_CONTEXT *ctx )
+{
+    struct receive_message *r;
+
+    if (!(r = heap_alloc( sizeof(*r) ))) return E_OUTOFMEMORY;
+    r->task.proc   = receive_message_proc;
+    r->channel     = channel;
+    r->msg         = msg;
+    r->desc        = desc;
+    r->count       = count;
+    r->option      = option;
+    r->read_option = read_option;
+    r->heap        = heap;
+    r->value       = value;
+    r->size        = size;
+    r->index       = index;
+    r->ctx         = ctx;
+    return queue_task( &channel->recv_q, &r->task );
+}
+
 /**************************************************************************
  *          WsReceiveMessage		[webservices.@]
  */
@@ -1665,12 +1881,10 @@ HRESULT WINAPI WsReceiveMessage( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS_M
 {
     struct channel *channel = (struct channel *)handle;
     HRESULT hr;
-    ULONG i;
 
     TRACE( "%p %p %p %u %08x %08x %p %p %u %p %p %p\n", handle, msg, desc, count, option, read_option, heap,
            value, size, index, ctx, error );
     if (error) FIXME( "ignoring error parameter\n" );
-    if (ctx) FIXME( "ignoring ctx parameter\n" );
 
     if (!channel || !msg || !desc || !count) return E_INVALIDARG;
 
@@ -1682,24 +1896,11 @@ HRESULT WINAPI WsReceiveMessage( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS_M
         return E_INVALIDARG;
     }
 
-    if (!channel->read_size) hr = receive_message( channel );
-    else if (option == WS_RECEIVE_OPTIONAL_MESSAGE) hr = WS_S_END;
-    else hr = WS_E_INVALID_FORMAT;
-
-    if (hr != S_OK) goto done;
-
-    for (i = 0; i < count; i++)
-    {
-        if ((hr = read_message( msg, channel->reader, desc[i]->bodyElementDescription, read_option, heap,
-                                value, size )) == S_OK)
-        {
-            if (index) *index = i;
-            break;
-        }
-        if ((hr = init_reader( channel )) != S_OK) break;
-    }
+    if (ctx)
+        hr = queue_receive_message( channel, msg, desc, count, option, read_option, heap, value, size, index, ctx );
+    else
+        hr = receive_message( channel, msg, desc, count, option, read_option, heap, value, size, index );
 
-done:
     LeaveCriticalSection( &channel->cs );
     return hr;
 }
@@ -1727,7 +1928,7 @@ HRESULT WINAPI WsReadMessageStart( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS
         return E_INVALIDARG;
     }
 
-    if ((hr = receive_message( channel )) == S_OK)
+    if ((hr = receive_message_bytes( channel )) == S_OK)
     {
         hr = WsReadEnvelopeStart( msg, channel->reader, NULL, NULL, NULL );
     }
-- 
2.11.0




More information about the wine-patches mailing list