[PATCH 6/9] rtworkq: Add support for serial work queues.
Nikolay Sivov
nsivov at codeweavers.com
Tue Feb 11 03:19:08 CST 2020
Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
dlls/rtworkq/queue.c | 327 ++++++++++++++++++++++++++++++++------
dlls/rtworkq/rtworkq.spec | 2 +-
include/rtworkq.idl | 1 +
3 files changed, 280 insertions(+), 50 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index e5026b6298..65007602c7 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -97,6 +97,16 @@ enum rtwq_callback_queue_id
RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff,
};
+/* Should be kept in sync with corresponding MFASYNC_ constants. */
+enum rtwq_callback_flags
+{
+ RTWQ_FAST_IO_PROCESSING_CALLBACK = 0x00000001,
+ RTWQ_SIGNAL_CALLBACK = 0x00000002,
+ RTWQ_BLOCKING_CALLBACK = 0x00000004,
+ RTWQ_REPLY_CALLBACK = 0x00000008,
+ RTWQ_LOCALIZE_REMOTE_CALLBACK = 0x00000010,
+};
+
enum system_queue_index
{
SYS_QUEUE_STANDARD = 0,
@@ -115,10 +125,12 @@ struct work_item
LONG refcount;
struct list entry;
IRtwqAsyncResult *result;
+ IRtwqAsyncResult *reply_result;
struct queue *queue;
RTWQWORKITEM_KEY key;
LONG priority;
DWORD flags;
+ PTP_SIMPLE_CALLBACK finalization_callback;
union
{
TP_WAIT *wait_object;
@@ -152,15 +164,123 @@ struct queue_desc
{
RTWQ_WORKQUEUE_TYPE queue_type;
const struct queue_ops *ops;
+ DWORD target_queue;
};
struct queue
{
+ IRtwqAsyncCallback IRtwqAsyncCallback_iface;
const struct queue_ops *ops;
TP_POOL *pool;
TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
CRITICAL_SECTION cs;
struct list pending_items;
+ DWORD id;
+ /* Data used for serial queues only. */
+ PTP_SIMPLE_CALLBACK finalization_callback;
+ DWORD target_queue;
+};
+
+static void shutdown_queue(struct queue *queue);
+
+static HRESULT lock_user_queue(DWORD queue)
+{
+ HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
+ struct queue_handle *entry;
+
+ if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
+ return S_OK;
+
+ EnterCriticalSection(&queues_section);
+ entry = get_queue_obj(queue);
+ if (entry && entry->refcount)
+ {
+ entry->refcount++;
+ hr = S_OK;
+ }
+ LeaveCriticalSection(&queues_section);
+ return hr;
+}
+
+static HRESULT unlock_user_queue(DWORD queue)
+{
+ HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
+ struct queue_handle *entry;
+
+ if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
+ return S_OK;
+
+ EnterCriticalSection(&queues_section);
+ entry = get_queue_obj(queue);
+ if (entry && entry->refcount)
+ {
+ if (--entry->refcount == 0)
+ {
+ shutdown_queue((struct queue *)entry->obj);
+ heap_free(entry->obj);
+ entry->obj = next_free_user_queue;
+ next_free_user_queue = entry;
+ }
+ hr = S_OK;
+ }
+ LeaveCriticalSection(&queues_section);
+ return hr;
+}
+
+static struct queue *queue_impl_from_IRtwqAsyncCallback(IRtwqAsyncCallback *iface)
+{
+ return CONTAINING_RECORD(iface, struct queue, IRtwqAsyncCallback_iface);
+}
+
+static HRESULT WINAPI queue_serial_callback_QueryInterface(IRtwqAsyncCallback *iface, REFIID riid, void **obj)
+{
+ if (IsEqualIID(riid, &IID_IRtwqAsyncCallback) ||
+ IsEqualIID(riid, &IID_IUnknown))
+ {
+ *obj = iface;
+ IRtwqAsyncCallback_AddRef(iface);
+ return S_OK;
+ }
+
+ *obj = NULL;
+ return E_NOINTERFACE;
+}
+
+static ULONG WINAPI queue_serial_callback_AddRef(IRtwqAsyncCallback *iface)
+{
+ return 2;
+}
+
+static ULONG WINAPI queue_serial_callback_Release(IRtwqAsyncCallback *iface)
+{
+ return 1;
+}
+
+static HRESULT WINAPI queue_serial_callback_GetParameters(IRtwqAsyncCallback *iface, DWORD *flags, DWORD *queue_id)
+{
+ struct queue *queue = queue_impl_from_IRtwqAsyncCallback(iface);
+
+ *flags = 0;
+ *queue_id = queue->id;
+
+ return S_OK;
+}
+
+static HRESULT WINAPI queue_serial_callback_Invoke(IRtwqAsyncCallback *iface, IRtwqAsyncResult *result)
+{
+    /* The reply callback won't be invoked in the regular way; pending items and chained queues would make it
+       unnecessarily complicated to reach the actual work queue that's able to execute this item. Instead,
+       serial queues are cleaned up right away on submit(). */
+ return S_OK;
+}
+
+static const IRtwqAsyncCallbackVtbl queue_serial_callback_vtbl =
+{
+ queue_serial_callback_QueryInterface,
+ queue_serial_callback_AddRef,
+ queue_serial_callback_Release,
+ queue_serial_callback_GetParameters,
+ queue_serial_callback_Invoke,
};
static struct queue system_queues[SYS_QUEUE_COUNT];
@@ -181,6 +301,8 @@ static struct queue *get_system_queue(DWORD queue_id)
}
}
+static HRESULT grab_queue(DWORD queue_id, struct queue **ret);
+
static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
{
}
@@ -237,7 +359,11 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void
TRACE("result object %p.\n", result);
- IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
+    /* When submitting from a serial queue in reply mode, use a different result object that acts as a receipt token.
+       It is still submitted to the user callback, but when invoked, a special serial queue callback will be used
+       to ensure the correct destination queue. */
+
+ IRtwqAsyncCallback_Invoke(result->pCallback, item->reply_result ? item->reply_result : item->result);
IUnknown_Release(&item->IUnknown_iface);
}
@@ -245,6 +371,7 @@ static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void
static void pool_queue_submit(struct queue *queue, struct work_item *item)
{
TP_CALLBACK_PRIORITY callback_priority;
+ TP_CALLBACK_ENVIRON_V3 env;
TP_WORK *work_object;
if (item->priority == 0)
@@ -253,8 +380,14 @@ static void pool_queue_submit(struct queue *queue, struct work_item *item)
callback_priority = TP_CALLBACK_PRIORITY_LOW;
else
callback_priority = TP_CALLBACK_PRIORITY_HIGH;
- work_object = CreateThreadpoolWork(standard_queue_worker, item,
- (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
+
+ env = queue->envs[callback_priority];
+ env.FinalizationCallback = item->finalization_callback;
+    /* The worker pool callback will release one reference. Grab one more to keep the object alive
+       for when the finalization callback runs. */
+ if (item->finalization_callback)
+ IUnknown_AddRef(&item->IUnknown_iface);
+ work_object = CreateThreadpoolWork(standard_queue_worker, item, (TP_CALLBACK_ENVIRON *)&env);
SubmitThreadpoolWork(work_object);
TRACE("dispatched %p.\n", item->result);
@@ -267,6 +400,129 @@ static const struct queue_ops pool_queue_ops =
pool_queue_submit,
};
+static struct work_item * serial_queue_get_next(struct queue *queue, struct work_item *item)
+{
+ struct work_item *next_item = NULL;
+
+ list_remove(&item->entry);
+ if (!list_empty(&item->queue->pending_items))
+ next_item = LIST_ENTRY(list_head(&item->queue->pending_items), struct work_item, entry);
+
+ return next_item;
+}
+
+static void CALLBACK serial_queue_finalization_callback(PTP_CALLBACK_INSTANCE instance, void *user_data)
+{
+ struct work_item *item = (struct work_item *)user_data, *next_item;
+ struct queue *target_queue, *queue = item->queue;
+ HRESULT hr;
+
+ EnterCriticalSection(&queue->cs);
+
+ if ((next_item = serial_queue_get_next(queue, item)))
+ {
+ if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
+ target_queue->ops->submit(target_queue, next_item);
+ else
+ WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr);
+ }
+
+ LeaveCriticalSection(&queue->cs);
+
+ IUnknown_Release(&item->IUnknown_iface);
+}
+
+static HRESULT serial_queue_init(const struct queue_desc *desc, struct queue *queue)
+{
+ queue->IRtwqAsyncCallback_iface.lpVtbl = &queue_serial_callback_vtbl;
+ queue->target_queue = desc->target_queue;
+ lock_user_queue(queue->target_queue);
+ queue->finalization_callback = serial_queue_finalization_callback;
+
+ return S_OK;
+}
+
+static BOOL serial_queue_shutdown(struct queue *queue)
+{
+ unlock_user_queue(queue->target_queue);
+
+ return TRUE;
+}
+
+static struct work_item * serial_queue_is_ack_token(struct queue *queue, struct work_item *item)
+{
+ RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)item->result;
+ struct work_item *head;
+
+ if (list_empty(&queue->pending_items))
+ return NULL;
+
+ head = LIST_ENTRY(list_head(&queue->pending_items), struct work_item, entry);
+ if (head->reply_result == item->result && async_result->pCallback == &queue->IRtwqAsyncCallback_iface)
+ return head;
+
+ return NULL;
+}
+
+static void serial_queue_submit(struct queue *queue, struct work_item *item)
+{
+ struct work_item *head, *next_item = NULL;
+ struct queue *target_queue;
+ HRESULT hr;
+
+    /* In reply mode the queue advances when 'reply_result' is invoked; in regular mode it advances automatically
+       via the finalization callback. */
+
+ if (item->flags & RTWQ_REPLY_CALLBACK)
+ {
+ if (FAILED(hr = RtwqCreateAsyncResult(NULL, &queue->IRtwqAsyncCallback_iface, NULL, &item->reply_result)))
+ WARN("Failed to create reply object, hr %#x.\n", hr);
+ }
+ else
+ item->finalization_callback = queue->finalization_callback;
+
+    /* Serial queues can be chained together; detach the item from its current queue before transitioning it to this one.
+       Items are not detached when submitted to pool queues, because pool queues won't forward them further. */
+ EnterCriticalSection(&item->queue->cs);
+ list_remove(&item->entry);
+ LeaveCriticalSection(&item->queue->cs);
+
+ EnterCriticalSection(&queue->cs);
+
+ item->queue = queue;
+
+ if ((head = serial_queue_is_ack_token(queue, item)))
+ {
+ /* Ack receipt token - pop waiting item, advance. */
+ next_item = serial_queue_get_next(queue, head);
+ IUnknown_Release(&head->IUnknown_iface);
+ }
+ else
+ {
+ if (list_empty(&queue->pending_items))
+ next_item = item;
+ list_add_tail(&queue->pending_items, &item->entry);
+ IUnknown_AddRef(&item->IUnknown_iface);
+ }
+
+ if (next_item)
+ {
+ if (SUCCEEDED(hr = grab_queue(queue->target_queue, &target_queue)))
+ target_queue->ops->submit(target_queue, next_item);
+ else
+ WARN("Failed to grab queue for id %#x, hr %#x.\n", queue->target_queue, hr);
+ }
+
+ LeaveCriticalSection(&queue->cs);
+}
+
+static const struct queue_ops serial_queue_ops =
+{
+ serial_queue_init,
+ serial_queue_shutdown,
+ serial_queue_submit,
+};
+
static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
{
if (IsEqualIID(riid, &IID_IUnknown))
@@ -293,8 +549,10 @@ static ULONG WINAPI work_item_Release(IUnknown *iface)
if (!refcount)
{
- IRtwqAsyncResult_Release(item->result);
- heap_free(item);
+ if (item->reply_result)
+ IRtwqAsyncResult_Release(item->reply_result);
+ IRtwqAsyncResult_Release(item->result);
+ heap_free(item);
}
return refcount;
@@ -375,6 +633,7 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
desc.queue_type = queue_type;
desc.ops = &pool_queue_ops;
+ desc.target_queue = 0;
init_work_queue(&desc, queue);
LeaveCriticalSection(&queues_section);
*ret = queue;
@@ -388,25 +647,6 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
}
-static HRESULT lock_user_queue(DWORD queue)
-{
- HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
- struct queue_handle *entry;
-
- if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
- return S_OK;
-
- EnterCriticalSection(&queues_section);
- entry = get_queue_obj(queue);
- if (entry && entry->refcount)
- {
- entry->refcount++;
- hr = S_OK;
- }
- LeaveCriticalSection(&queues_section);
- return hr;
-}
-
static void shutdown_queue(struct queue *queue)
{
struct work_item *item, *item2;
@@ -427,31 +667,6 @@ static void shutdown_queue(struct queue *queue)
memset(queue, 0, sizeof(*queue));
}
-static HRESULT unlock_user_queue(DWORD queue)
-{
- HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
- struct queue_handle *entry;
-
- if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
- return S_OK;
-
- EnterCriticalSection(&queues_section);
- entry = get_queue_obj(queue);
- if (entry && entry->refcount)
- {
- if (--entry->refcount == 0)
- {
- shutdown_queue((struct queue *)entry->obj);
- heap_free(entry->obj);
- entry->obj = next_free_user_queue;
- next_free_user_queue = entry;
- }
- hr = S_OK;
- }
- LeaveCriticalSection(&queues_section);
- return hr;
-}
-
static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
{
struct work_item *item;
@@ -915,6 +1130,7 @@ static void init_system_queues(void)
desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
desc.ops = &pool_queue_ops;
+ desc.target_queue = 0;
init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
LeaveCriticalSection(&queues_section);
@@ -1168,6 +1384,7 @@ HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queu
desc.queue_type = queue_type;
desc.ops = &pool_queue_ops;
+ desc.target_queue = 0;
return alloc_user_queue(&desc, queue);
}
@@ -1233,3 +1450,15 @@ HRESULT WINAPI RtwqCancelDeadline(HANDLE request)
return E_NOTIMPL;
}
+
+HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue)
+{
+ struct queue_desc desc;
+
+ TRACE("%#x, %p.\n", target_queue, queue);
+
+ desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
+ desc.ops = &serial_queue_ops;
+ desc.target_queue = target_queue;
+ return alloc_user_queue(&desc, queue);
+}
diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec
index 1a8909e098..adc05d679d 100644
--- a/dlls/rtworkq/rtworkq.spec
+++ b/dlls/rtworkq/rtworkq.spec
@@ -1,5 +1,5 @@
@ stdcall RtwqAddPeriodicCallback(ptr ptr ptr)
-@ stub RtwqAllocateSerialWorkQueue
+@ stdcall RtwqAllocateSerialWorkQueue(long ptr)
@ stdcall RtwqAllocateWorkQueue(long ptr)
@ stub RtwqBeginRegisterWorkQueueWithMMCSS
@ stub RtwqBeginUnregisterWorkQueueWithMMCSS
diff --git a/include/rtworkq.idl b/include/rtworkq.idl
index 325bebfde5..1a452c3edc 100644
--- a/include/rtworkq.idl
+++ b/include/rtworkq.idl
@@ -78,6 +78,7 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("typedef void (WINAPI *RTWQPERIODICCALLBACK)(IUnknown *context);")
cpp_quote("HRESULT WINAPI RtwqAddPeriodicCallback(RTWQPERIODICCALLBACK callback, IUnknown *context, DWORD *key);")
+cpp_quote("HRESULT WINAPI RtwqAllocateSerialWorkQueue(DWORD target_queue, DWORD *queue);")
cpp_quote("HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queue);")
cpp_quote("HRESULT WINAPI RtwqCancelDeadline(HANDLE request);")
cpp_quote("HRESULT WINAPI RtwqCancelWorkItem(RTWQWORKITEM_KEY key);")
--
2.25.0
More information about the wine-devel
mailing list