[4/7] webservices: Add asynchronous support for WsReceiveMessage.
Hans Leidekker
hans at codeweavers.com
Thu Sep 21 04:26:14 CDT 2017
Signed-off-by: Hans Leidekker <hans at codeweavers.com>
---
dlls/webservices/channel.c | 259 ++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 230 insertions(+), 29 deletions(-)
diff --git a/dlls/webservices/channel.c b/dlls/webservices/channel.c
index a8ad01235f..08e04935d3 100644
--- a/dlls/webservices/channel.c
+++ b/dlls/webservices/channel.c
@@ -86,6 +86,105 @@ static const struct prop_desc channel_props[] =
{ sizeof(ULONG), FALSE } /* WS_CHANNEL_PROPERTY_MAX_HTTP_REQUEST_HEADERS_BUFFER_SIZE */
};
+struct task
+{
+ struct list entry;
+ void (*proc)( struct task * );
+};
+
+struct queue
+{
+ CRITICAL_SECTION cs;
+ HANDLE wait;
+ HANDLE cancel;
+ HANDLE ready;
+ struct list tasks;
+};
+
+static struct task *dequeue_task( struct queue *queue )
+{
+ struct task *task;
+
+ EnterCriticalSection( &queue->cs );
+ TRACE( "%u tasks queued\n", list_count( &queue->tasks ) );
+ task = LIST_ENTRY( list_head( &queue->tasks ), struct task, entry );
+ if (task) list_remove( &task->entry );
+ LeaveCriticalSection( &queue->cs );
+
+ TRACE( "returning task %p\n", task );
+ return task;
+}
+
+static void CALLBACK queue_runner( TP_CALLBACK_INSTANCE *instance, void *ctx )
+{
+ struct queue *queue = ctx;
+ HANDLE handles[] = { queue->wait, queue->cancel };
+
+ SetEvent( queue->ready );
+ for (;;)
+ {
+ DWORD err = WaitForMultipleObjects( 2, handles, FALSE, INFINITE );
+ switch (err)
+ {
+ case WAIT_OBJECT_0:
+ {
+ struct task *task;
+ while ((task = dequeue_task( queue )))
+ {
+ task->proc( task );
+ heap_free( task );
+ }
+ break;
+ }
+ case WAIT_OBJECT_0 + 1:
+ TRACE( "cancelled\n" );
+ SetEvent( queue->ready );
+ return;
+
+ default:
+ ERR( "wait failed %u\n", err );
+ return;
+ }
+ }
+}
+
+static HRESULT start_queue( struct queue *queue )
+{
+ HRESULT hr = E_OUTOFMEMORY;
+
+ if (queue->wait) return S_OK;
+ list_init( &queue->tasks );
+ if (!(queue->wait = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
+ if (!(queue->cancel = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
+ if (!(queue->ready = CreateEventW( NULL, FALSE, FALSE, NULL ))) goto error;
+ if (!TrySubmitThreadpoolCallback( queue_runner, queue, NULL )) hr = HRESULT_FROM_WIN32( GetLastError() );
+ else
+ {
+ WaitForSingleObject( queue->ready, INFINITE );
+ return S_OK;
+ }
+
+error:
+ CloseHandle( queue->wait );
+ CloseHandle( queue->cancel );
+ CloseHandle( queue->ready );
+ return hr;
+}
+
+static HRESULT queue_task( struct queue *queue, struct task *task )
+{
+ HRESULT hr;
+ if ((hr = start_queue( queue )) != S_OK) return hr;
+
+ EnterCriticalSection( &queue->cs );
+ TRACE( "queueing task %p\n", task );
+ list_add_tail( &queue->tasks, &task->entry );
+ LeaveCriticalSection( &queue->cs );
+
+ SetEvent( queue->wait );
+ return WS_S_ASYNC;
+};
+
enum session_state
{
SESSION_STATE_UNINITIALIZED,
@@ -107,6 +206,8 @@ struct channel
enum session_state session_state;
struct dictionary dict_send;
struct dictionary dict_recv;
+ struct queue send_q;
+ struct queue recv_q;
union
{
struct
@@ -147,22 +248,57 @@ static struct channel *alloc_channel(void)
InitializeCriticalSection( &ret->cs );
ret->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.cs");
+ InitializeCriticalSection( &ret->send_q.cs );
+ ret->send_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.send_q.cs");
+
+ InitializeCriticalSection( &ret->recv_q.cs );
+ ret->recv_q.cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": channel.recv_q.cs");
+
prop_init( channel_props, count, ret->prop, &ret[1] );
ret->prop_count = count;
return ret;
}
+static void clear_addr( WS_ENDPOINT_ADDRESS *addr )
+{
+ heap_free( addr->url.chars );
+ addr->url.chars = NULL;
+ addr->url.length = 0;
+}
+
+static void clear_queue( struct queue *queue )
+{
+ struct list *ptr;
+
+ SetEvent( queue->cancel );
+ WaitForSingleObject( queue->ready, INFINITE );
+
+ while ((ptr = list_head( &queue->tasks )))
+ {
+ struct task *task = LIST_ENTRY( ptr, struct task, entry );
+ list_remove( &task->entry );
+ heap_free( task );
+ }
+
+ CloseHandle( queue->wait );
+ queue->wait = NULL;
+ CloseHandle( queue->cancel );
+ queue->cancel = NULL;
+ CloseHandle( queue->ready );
+ queue->ready = NULL;
+}
+
static void reset_channel( struct channel *channel )
{
- channel->state = WS_CHANNEL_STATE_CREATED;
- heap_free( channel->addr.url.chars );
- channel->addr.url.chars = NULL;
- channel->addr.url.length = 0;
- channel->msg = NULL;
- channel->read_size = 0;
- channel->session_state = SESSION_STATE_UNINITIALIZED;
+ clear_queue( &channel->send_q );
+ clear_queue( &channel->recv_q );
+ channel->state = WS_CHANNEL_STATE_CREATED;
+ channel->session_state = SESSION_STATE_UNINITIALIZED;
+ clear_addr( &channel->addr );
clear_dict( &channel->dict_send );
clear_dict( &channel->dict_recv );
+ channel->msg = NULL;
+ channel->read_size = 0;
switch (channel->binding)
{
@@ -201,6 +337,12 @@ static void free_channel( struct channel *channel )
heap_free( channel->read_buf );
+ channel->send_q.cs.DebugInfo->Spare[0] = 0;
+ DeleteCriticalSection( &channel->send_q.cs );
+
+ channel->recv_q.cs.DebugInfo->Spare[0] = 0;
+ DeleteCriticalSection( &channel->recv_q.cs );
+
channel->cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection( &channel->cs );
heap_free( channel );
@@ -1571,7 +1713,7 @@ static HRESULT receive_message_sock( struct channel *channel, SOCKET socket )
return init_reader( channel );
}
-static HRESULT receive_message( struct channel *channel )
+static HRESULT receive_message_bytes( struct channel *channel )
{
HRESULT hr;
if ((hr = connect_channel( channel )) != S_OK) return hr;
@@ -1623,7 +1765,7 @@ HRESULT channel_receive_message( WS_CHANNEL *handle )
return E_INVALIDARG;
}
- hr = receive_message( channel );
+ hr = receive_message_bytes( channel );
LeaveCriticalSection( &channel->cs );
return hr;
@@ -1656,6 +1798,80 @@ static HRESULT read_message( WS_MESSAGE *handle, WS_XML_READER *reader, const WS
return WsReadEnvelopeEnd( handle, NULL );
}
+static HRESULT receive_message( struct channel *channel, WS_MESSAGE *msg, const WS_MESSAGE_DESCRIPTION **desc,
+ ULONG count, WS_RECEIVE_OPTION option, WS_READ_OPTION read_option, WS_HEAP *heap,
+ void *value, ULONG size, ULONG *index )
+{
+ HRESULT hr;
+ ULONG i;
+
+ if ((hr = receive_message_bytes( channel )) != S_OK) return hr;
+ for (i = 0; i < count; i++)
+ {
+ const WS_ELEMENT_DESCRIPTION *body = desc[i]->bodyElementDescription;
+ if ((hr = read_message( msg, channel->reader, body, read_option, heap, value, size )) == S_OK)
+ {
+ if (index) *index = i;
+ break;
+ }
+ if ((hr = WsResetMessage( msg, NULL )) != S_OK) return hr;
+ if ((hr = init_reader( channel )) != S_OK) return hr;
+ }
+ return (i == count) ? WS_E_INVALID_FORMAT : S_OK;
+}
+
+struct receive_message
+{
+ struct task task;
+ struct channel *channel;
+ WS_MESSAGE *msg;
+ const WS_MESSAGE_DESCRIPTION **desc;
+ ULONG count;
+ WS_RECEIVE_OPTION option;
+ WS_READ_OPTION read_option;
+ WS_HEAP *heap;
+ void *value;
+ ULONG size;
+ ULONG *index;
+ const WS_ASYNC_CONTEXT *ctx;
+};
+
+static void receive_message_proc( struct task *task )
+{
+ struct receive_message *r = (struct receive_message *)task;
+ HRESULT hr;
+
+ hr = receive_message( r->channel, r->msg, r->desc, r->count, r->option, r->read_option, r->heap, r->value,
+ r->size, r->index );
+
+ TRACE( "calling %p(%08x)\n", r->ctx->callback, hr );
+ r->ctx->callback( hr, WS_LONG_CALLBACK, r->ctx->callbackState );
+ TRACE( "%p returned\n", r->ctx->callback );
+}
+
+static HRESULT queue_receive_message( struct channel *channel, WS_MESSAGE *msg, const WS_MESSAGE_DESCRIPTION **desc,
+ ULONG count, WS_RECEIVE_OPTION option, WS_READ_OPTION read_option,
+ WS_HEAP *heap, void *value, ULONG size, ULONG *index,
+ const WS_ASYNC_CONTEXT *ctx )
+{
+ struct receive_message *r;
+
+ if (!(r = heap_alloc( sizeof(*r) ))) return E_OUTOFMEMORY;
+ r->task.proc = receive_message_proc;
+ r->channel = channel;
+ r->msg = msg;
+ r->desc = desc;
+ r->count = count;
+ r->option = option;
+ r->read_option = read_option;
+ r->heap = heap;
+ r->value = value;
+ r->size = size;
+ r->index = index;
+ r->ctx = ctx;
+ return queue_task( &channel->recv_q, &r->task );
+}
+
/**************************************************************************
* WsReceiveMessage [webservices.@]
*/
@@ -1665,12 +1881,10 @@ HRESULT WINAPI WsReceiveMessage( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS_M
{
struct channel *channel = (struct channel *)handle;
HRESULT hr;
- ULONG i;
TRACE( "%p %p %p %u %08x %08x %p %p %u %p %p %p\n", handle, msg, desc, count, option, read_option, heap,
value, size, index, ctx, error );
if (error) FIXME( "ignoring error parameter\n" );
- if (ctx) FIXME( "ignoring ctx parameter\n" );
if (!channel || !msg || !desc || !count) return E_INVALIDARG;
@@ -1682,24 +1896,11 @@ HRESULT WINAPI WsReceiveMessage( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS_M
return E_INVALIDARG;
}
- if (!channel->read_size) hr = receive_message( channel );
- else if (option == WS_RECEIVE_OPTIONAL_MESSAGE) hr = WS_S_END;
- else hr = WS_E_INVALID_FORMAT;
-
- if (hr != S_OK) goto done;
-
- for (i = 0; i < count; i++)
- {
- if ((hr = read_message( msg, channel->reader, desc[i]->bodyElementDescription, read_option, heap,
- value, size )) == S_OK)
- {
- if (index) *index = i;
- break;
- }
- if ((hr = init_reader( channel )) != S_OK) break;
- }
+ if (ctx)
+ hr = queue_receive_message( channel, msg, desc, count, option, read_option, heap, value, size, index, ctx );
+ else
+ hr = receive_message( channel, msg, desc, count, option, read_option, heap, value, size, index );
-done:
LeaveCriticalSection( &channel->cs );
return hr;
}
@@ -1727,7 +1928,7 @@ HRESULT WINAPI WsReadMessageStart( WS_CHANNEL *handle, WS_MESSAGE *msg, const WS
return E_INVALIDARG;
}
- if ((hr = receive_message( channel )) == S_OK)
+ if ((hr = receive_message_bytes( channel )) == S_OK)
{
hr = WsReadEnvelopeStart( msg, channel->reader, NULL, NULL, NULL );
}
--
2.11.0
More information about the wine-patches
mailing list