[PATCH 6/9] rtworkq: Add support for serial work queues.

Nikolay Sivov nsivov at codeweavers.com
Tue Feb 11 03:19:08 CST 2020


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/rtworkq/queue.c      | 327 ++++++++++++++++++++++++++++++++------
 dlls/rtworkq/rtworkq.spec |   2 +-
 include/rtworkq.idl       |   1 +
 3 files changed, 280 insertions(+), 50 deletions(-)

diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index e5026b6298..65007602c7 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -97,6 +97,16 @@ enum rtwq_callback_queue_id
     RTWQ_CALLBACK_QUEUE_ALL           = 0xffffffff,
 };
 
+/* Should be kept in sync with corresponding MFASYNC_ constants. */
+enum rtwq_callback_flags
+{
+    RTWQ_FAST_IO_PROCESSING_CALLBACK = 0x00000001,
+    RTWQ_SIGNAL_CALLBACK             = 0x00000002,
+    RTWQ_BLOCKING_CALLBACK           = 0x00000004,
+    RTWQ_REPLY_CALLBACK              = 0x00000008,
+    RTWQ_LOCALIZE_REMOTE_CALLBACK    = 0x00000010,
+};
+
 enum system_queue_index
 {
     SYS_QUEUE_STANDARD = 0,
@@ -115,10 +125,12 @@ struct work_item
     LONG refcount;
     struct list entry;
     IRtwqAsyncResult *result;
+    IRtwqAsyncResult *reply_result;
     struct queue *queue;
     RTWQWORKITEM_KEY key;
     LONG priority;
     DWORD flags;
+    PTP_SIMPLE_CALLBACK finalization_callback;
     union
     {
         TP_WAIT *wait_object;
@@ -152,15 +164,123 @@ struct queue_desc
 {
     RTWQ_WORKQUEUE_TYPE queue_type;
     const struct queue_ops *ops;
+    DWORD target_queue;
 };
 
 struct queue
 {
+    IRtwqAsyncCallback IRtwqAsyncCallback_iface;
     const struct queue_ops *ops;
     TP_POOL *pool;
     TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
     CRITICAL_SECTION cs;
     struct list pending_items;
+    DWORD id;
+    /* Data used for serial queues only. */
+    PTP_SIMPLE_CALLBACK finalization_callback;
+    DWORD target_queue;
+};
+
+static void shutdown_queue(struct queue *queue);
+
+static HRESULT lock_user_queue(DWORD queue)
+{
+    HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
+    struct queue_handle *entry;
+
+    if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
+        return S_OK;
+
+    EnterCriticalSection(&queues_section);
+    entry = get_queue_obj(queue);
+    if (entry && entry->refcount)
+    {
+        entry->refcount++;
+        hr = S_OK;
+    }
+    LeaveCriticalSection(&queues_section);
+    return hr;
+}
+
+static HRESULT unlock_user_queue(DWORD queue)
+{
+    HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
+    struct queue_handle *entry;
+
+    if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
+        return S_OK;
+
+    EnterCriticalSection(&queues_section);
+    entry = get_queue_obj(queue);
+    if (entry && entry->refcount)
+    {
+        if (--entry->refcount == 0)
+        {
+            shutdown_queue((struct queue *)entry->obj);
+            heap_free(entry->obj);
+            entry->obj = next_free_user_queue;
+            next_free_user_queue = entry;
+        }
+        hr = S_OK;
+    }
+    LeaveCriticalSection(&queues_section);
+    return hr;
+}
+
+static struct queue *queue_impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface)
+{
+    return CONTAINING_RECORD(iface, struct queue, IRtwqAsyncCallback_iface);
+}
+
+static HRESULT WINAPI queue_serial_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj)
+{
+    if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) ||
+            IsEqualIID(riid, &IID_IUnknown))
+    {
+        *obj = iface;
+        IRtwqAsyncCallback_AddRef(iface);
+        return S_OK;
+    }
+
+    *obj = NULL;
+    return E_NOINTERFACE;
+}
+
+static ULONG WINAPI queue_serial_callback_AddRef(IRtwqAsyncCallback *iface)
+{
+    return 2;
+}
+
+static ULONG WINAPI queue_serial_callback_Release(IRtwqAsyncCallback *iface)
+{
+    return 1;
+}
+
+static HRESULT WINAPI queue_serial_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue_id)
+{
+    struct queue *queue = queue_impl_from_IRtwqAsyncCallback(iface);
+
+    *flags = 0;
+    *queue_id = queue->id;
+
+    return S_OK;
+}
+
+static HRESULT WINAPI queue_serial_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result)
+{
+    /* Reply callback won't be called in a regular way, pending items and chained queues will make it
+       unnecessary complicated to reach actual work queue that's able to execute this item. Instead
+       serial queues are cleaned up right away on submit(). */
+    return S_OK;
+}
+
+static const IRtwqAsyncCallbackVtbl queue_serial_callback_vtbl =
+{
+    queue_serial_callback_QueryInterface,
+    queue_serial_callback_AddRef,
+    queue_serial_callback_Release,
+    queue_serial_callback_GetParameters,
+    queue_serial_callback_Invoke,
 };
 
 static struct queue system_queues[SYS_QUEUE_COUNT];
@@ -181,6 +301,8 @@ static struct queue *get_system_queue(DWORD queue_id)
     }
 }
 
+static HRESULT grab_queue(DWORD queue_id, struct queue **ret);
+
 static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
 {
 }
@@ -237,7 +359,11 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void
 
     TRACE("result object %p.\n", result);
 
-    IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
+    /* Submitting from serial queue in reply mode, use different result object acting as receipt token.
+       It's submitted to user callback still, but when invoked, special serial queue callback will be used
+       to ensure correct destination queue. */
+
+    IRtwqAsyncCallback_Invoke(result->pCallback, item->reply_result ? item->reply_result : item->result);
 
     IUnknown_Release(&item->IUnknown_iface);
 }
@@ -245,6 +371,7 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void
 static void pool_queue_submit(struct queue *queue, struct work_item *item)
 {
     TP_CALLBACK_PRIORITY callback_priority;
+    TP_CALLBACK_ENVIRON_V3 env;
     TP_WORK *work_object;
 
     if (item->priority == 0)
@@ -253,8 +380,14 @@ static void pool_queue_submit(struct queue *queue, struct work_item *item)
         callback_priority = TP_CALLBACK_PRIORITY_LOW;
     else
         callback_priority = TP_CALLBACK_PRIORITY_HIGH;
-    work_object = CreateThreadpoolWork(standard_queue_worker, item,
-            (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
+
+    env = queue->envs[callback_priority];
+    env.FinalizationCallback = item->finalization_callback;
+    /* Worker pool callback will release one reference. Grab one more to keep object alive when
+       we need finalization callback. */
+    if (item->finalization_callback)
+        IUnknown_AddRef(&item->IUnknown_iface);
+    work_object = CreateThreadpoolWork(standard_queue_worker, item, (TP_CALLBACK_ENVIRON *)&env);
     SubmitThreadpoolWork(work_object);
 
     TRACE("dispatched %p.\n", item->result);
@@ -267,6 +400,129 @@ static const struct queue_ops pool_queue_ops =
     pool_queue_submit,
 };
 
+static struct work_item * serial_queue_get_next(struct queue *queue, struct work_item *item)
+{
+    struct work_item *next_item = NULL;
+
+    list_remove(&item->entry);
+    if (!list_empty(&item->queue->pending_items))
+        next_item = LIST_ENTRY(list_head(&item->queue->pending_items), struct work_item, entry);
+
+    return next_item;
+}
+
+static void CALLBACK serial_queue_finalization_callback(PTP_CALLBACK_INSTANCE instance, void *user_data)
+{
+    struct work_item *item = (struct work_item *)user_data, *next_item;
+    struct queue *target_queue, *queue = item->queue;
+    HRESULT hr;
+
+    EnterCriticalSection(&queue->cs);
+
+    if ((next_item = serial_queue_get_next(queue, item)))
+    {
+        if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
+            target_queue->ops->submit(target_queue, next_item);
+        else
+            WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr);
+    }
+
+    LeaveCriticalSection(&queue->cs);
+
+    IUnknown_Release(&item->IUnknown_iface);
+}
+
+static HRESULT serial_queue_init(const struct queue_desc *desc, struct queue *queue)
+{
+    queue->IRtwqAsyncCallback_iface.lpVtbl = &queue_serial_callback_vtbl;
+    queue->target_queue = desc->target_queue;
+    lock_user_queue(queue->target_queue);
+    queue->finalization_callback = serial_queue_finalization_callback;
+
+    return S_OK;
+}
+
+static BOOL serial_queue_shutdown(struct queue *queue)
+{
+    unlock_user_queue(queue->target_queue);
+
+    return TRUE;
+}
+
+static struct work_item * serial_queue_is_ack_token(struct queue *queue, struct work_item *item)
+{
+    RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)item->result;
+    struct work_item *head;
+
+    if (list_empty(&queue->pending_items))
+        return NULL;
+
+    head = LIST_ENTRY(list_head(&queue->pending_items), struct work_item, entry);
+    if (head->reply_result == item->result && async_result->pCallback == &queue->IRtwqAsyncCallback_iface)
+        return head;
+
+    return NULL;
+}
+
+static void serial_queue_submit(struct queue *queue, struct work_item *item)
+{
+    struct work_item *head, *next_item = NULL;
+    struct queue *target_queue;
+    HRESULT hr;
+
+    /* In reply mode queue will advance when 'reply_result' is invoked, in regular mode it will advance automatically,
+       via finalization callback. */
+
+    if (item->flags & RTWQ_REPLY_CALLBACK)
+    {
+        if (FAILED(hr = RtwqCreateAsyncResult(NULL, &queue->IRtwqAsyncCallback_iface, NULL, &item->reply_result)))
+            WARN("Failed to create reply object, hr %#x.\n", hr);
+    }
+    else
+        item->finalization_callback = queue->finalization_callback;
+
+    /* Serial queues could be chained together, detach from current queue before transitioning item to this one.
+       Items are not detached when submitted to pool queues, because pool queues won't forward them further. */
+    EnterCriticalSection(&item->queue->cs);
+    list_remove(&item->entry);
+    LeaveCriticalSection(&item->queue->cs);
+
+    EnterCriticalSection(&queue->cs);
+
+    item->queue = queue;
+
+    if ((head = serial_queue_is_ack_token(queue, item)))
+    {
+        /* Ack receipt token - pop waiting item, advance. */
+        next_item = serial_queue_get_next(queue, head);
+        IUnknown_Release(&head->IUnknown_iface);
+    }
+    else
+    {
+        if (list_empty(&queue->pending_items))
+            next_item = item;
+        list_add_tail(&queue->pending_items, &item->entry);
+        IUnknown_AddRef(&item->IUnknown_iface);
+    }
+
+    if (next_item)
+    {
+        if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
+            target_queue->ops->submit(target_queue, next_item);
+        else
+            WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr);
+    }
+
+    LeaveCriticalSection(&queue->cs);
+}
+
+static const struct queue_ops serial_queue_ops =
+{
+    serial_queue_init,
+    serial_queue_shutdown,
+    serial_queue_submit,
+};
+
 static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
 {
     if (IsEqualIID(riid, &IID_IUnknown))
@@ -293,8 +549,10 @@ static ULONG WINAPI work_item_Release(IUnknown *iface)
 
     if (!refcount)
     {
-         IRtwqAsyncResult_Release(item->result);
-         heap_free(item);
+        if (item->reply_result)
+            IRtwqAsyncResult_Release(item->reply_result);
+        IRtwqAsyncResult_Release(item->result);
+        heap_free(item);
     }
 
     return refcount;
@@ -375,6 +633,7 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
 
         desc.queue_type = queue_type;
         desc.ops = &pool_queue_ops;
+        desc.target_queue = 0;
         init_work_queue(&desc, queue);
         LeaveCriticalSection(&queues_section);
         *ret = queue;
@@ -388,25 +647,6 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
     return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
 }
 
-static HRESULT lock_user_queue(DWORD queue)
-{
-    HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
-    struct queue_handle *entry;
-
-    if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
-        return S_OK;
-
-    EnterCriticalSection(&queues_section);
-    entry = get_queue_obj(queue);
-    if (entry && entry->refcount)
-    {
-        entry->refcount++;
-        hr = S_OK;
-    }
-    LeaveCriticalSection(&queues_section);
-    return hr;
-}
-
 static void shutdown_queue(struct queue *queue)
 {
     struct work_item *item, *item2;
@@ -427,31 +667,6 @@ static void shutdown_queue(struct queue *queue)
     memset(queue, 0, sizeof(*queue));
 }
 
-static HRESULT unlock_user_queue(DWORD queue)
-{
-    HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
-    struct queue_handle *entry;
-
-    if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
-        return S_OK;
-
-    EnterCriticalSection(&queues_section);
-    entry = get_queue_obj(queue);
-    if (entry && entry->refcount)
-    {
-        if (--entry->refcount == 0)
-        {
-            shutdown_queue((struct queue *)entry->obj);
-            heap_free(entry->obj);
-            entry->obj = next_free_user_queue;
-            next_free_user_queue = entry;
-        }
-        hr = S_OK;
-    }
-    LeaveCriticalSection(&queues_section);
-    return hr;
-}
-
 static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
 {
     struct work_item *item;
@@ -915,6 +1130,7 @@ static void init_system_queues(void)
 
     desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
     desc.ops = &pool_queue_ops;
+    desc.target_queue = 0;
     init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
 
     LeaveCriticalSection(&queues_section);
@@ -1168,6 +1384,7 @@ HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queu
 
     desc.queue_type = queue_type;
     desc.ops = &pool_queue_ops;
+    desc.target_queue = 0;
     return alloc_user_queue(&desc, queue);
 }
 
@@ -1233,3 +1450,15 @@ HRESULT WINAPI RtwqCancelDeadline(HANDLE request)
 
     return E_NOTIMPL;
 }
+
+HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue)
+{
+    struct queue_desc desc;
+
+    TRACE("%#x, %p.\n", target_queue, queue);
+
+    desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
+    desc.ops = &serial_queue_ops;
+    desc.target_queue = target_queue;
+    return alloc_user_queue(&desc, queue);
+}
diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec
index 1a8909e098..adc05d679d 100644
--- a/dlls/rtworkq/rtworkq.spec
+++ b/dlls/rtworkq/rtworkq.spec
@@ -1,5 +1,5 @@
 @ stdcall RtwqAddPeriodicCallback(ptr ptr ptr)
-@ stub RtwqAllocateSerialWorkQueue
+@ stdcall RtwqAllocateSerialWorkQueue(long ptr)
 @ stdcall RtwqAllocateWorkQueue(long ptr)
 @ stub RtwqBeginRegisterWorkQueueWithMMCSS
 @ stub RtwqBeginUnregisterWorkQueueWithMMCSS
diff --git a/include/rtworkq.idl b/include/rtworkq.idl
index 325bebfde5..1a452c3edc 100644
--- a/include/rtworkq.idl
+++ b/include/rtworkq.idl
@@ -78,6 +78,7 @@ cpp_quote("} RTWQASYNCRESULT;")
 cpp_quote("typedef void (WINAPI *RTWQPERIODICCALLBACK)(IUnknown *context);")
 
 cpp_quote("HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key);")
+cpp_quote("HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue);")
 cpp_quote("HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue);")
 cpp_quote("HRESULT WINAPI RtwqCancelDeadline(HANDLE request);")
 cpp_quote("HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key);")
-- 
2.25.0




More information about the wine-devel mailing list