[PATCH 4/9] rtworkq: Add RtwqPutWaitingWorkItem().
Nikolay Sivov
nsivov at codeweavers.com
Tue Feb 4 23:44:08 CST 2020
Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
dlls/rtworkq/queue.c | 391 +++++++++++++++++++++++++++++++++++---
dlls/rtworkq/rtworkq.spec | 6 +-
include/rtworkq.idl | 12 ++
3 files changed, 384 insertions(+), 25 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index 31a17c68ae..7d949e8e62 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -27,6 +27,34 @@
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+/* User queue handles carry a table index in their high word, offset by
+ * FIRST_USER_QUEUE_HANDLE; the table holds MAX_USER_QUEUE_HANDLES slots. */
+#define FIRST_USER_QUEUE_HANDLE 5
+#define MAX_USER_QUEUE_HANDLES 124
+
+/* Tags placed in the upper 32 bits of a work item key, see get_item_key(). */
+#define WAIT_ITEM_KEY_MASK 0x82000000
+#define SCHEDULED_ITEM_KEY_MASK 0x80000000
+
+/* Source of the lower 32 bits of generated keys, advanced atomically. */
+static LONG next_item_key;
+
+/* Composes a work item key: 'mask' fills the upper 32 bits, 'key' is OR-ed
+ * into the lower part. */
+static RTWQWORKITEM_KEY get_item_key(DWORD mask, DWORD key)
+{
+    RTWQWORKITEM_KEY item_key = (RTWQWORKITEM_KEY)mask;
+
+    item_key <<= 32;
+    item_key |= key;
+
+    return item_key;
+}
+
+/* Produces a fresh key tagged with 'mask'; the low part comes from an
+ * atomically incremented global counter. */
+static RTWQWORKITEM_KEY generate_item_key(DWORD mask)
+{
+    LONG counter = InterlockedIncrement(&next_item_key);
+
+    return get_item_key(mask, counter);
+}
+
+struct queue_handle /* One slot of the user queue handle table. */
+{
+ void *obj; /* Queue object while in use (unlock_user_queue() casts it to struct queue *); reused as the free-list link once released. */
+ LONG refcount; /* Lock count; get_queue_obj() treats 0 as "slot not in use". */
+ WORD generation; /* Must match the low word of a handle for that handle to resolve. */
+};
+
+static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES]; /* Indexed by HIWORD(handle) - FIRST_USER_QUEUE_HANDLE. */
+static struct queue_handle *next_free_user_queue; /* Head of the list of released slots, chained through 'obj'. */
+
static CRITICAL_SECTION queues_section;
static CRITICAL_SECTION_DEBUG queues_critsect_debug =
{
@@ -38,6 +66,33 @@ static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0,
static LONG platform_lock;
+/* Maps a user queue handle to its table slot. The high word selects the slot
+ * (offset by FIRST_USER_QUEUE_HANDLE), the low word must equal the slot's
+ * generation. Returns NULL for out-of-range, unused or stale handles. */
+static struct queue_handle *get_queue_obj(DWORD handle)
+{
+    /* Unsigned arithmetic: high words below the first user handle wrap
+     * around and fail the range check. */
+    unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE;
+    struct queue_handle *slot;
+
+    if (idx >= MAX_USER_QUEUE_HANDLES)
+        return NULL;
+
+    slot = &user_queues[idx];
+    if (!slot->refcount)
+        return NULL;
+
+    if (LOWORD(handle) != slot->generation)
+        return NULL;
+
+    return slot;
+}
+
+/* Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */
+enum rtwq_callback_queue_id
+{
+ RTWQ_CALLBACK_QUEUE_UNDEFINED = 0x00000000,
+ RTWQ_CALLBACK_QUEUE_STANDARD = 0x00000001,
+ RTWQ_CALLBACK_QUEUE_RT = 0x00000002,
+ RTWQ_CALLBACK_QUEUE_IO = 0x00000003,
+ RTWQ_CALLBACK_QUEUE_TIMER = 0x00000004,
+ RTWQ_CALLBACK_QUEUE_MULTITHREADED = 0x00000005,
+ RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 0x00000007, /* 0x6 has no named id here; get_system_queue() returns NULL for it. */
+ RTWQ_CALLBACK_QUEUE_PRIVATE_MASK = 0xffff0000, /* Any of these bits set: lock_user_queue()/unlock_user_queue() treat the id as a user queue handle. */
+ RTWQ_CALLBACK_QUEUE_ALL = 0xffffffff,
+};
+
enum system_queue_index
{
SYS_QUEUE_STANDARD = 0,
@@ -81,10 +136,40 @@ struct queue
static struct queue system_queues[SYS_QUEUE_COUNT];
+/* Returns the system queue slot for a well-known queue id, or NULL when the
+ * id does not name a system queue. Slots are addressed as id - 1. */
+static struct queue *get_system_queue(DWORD queue_id)
+{
+    switch (queue_id)
+    {
+        case RTWQ_CALLBACK_QUEUE_STANDARD:
+        case RTWQ_CALLBACK_QUEUE_RT:
+        case RTWQ_CALLBACK_QUEUE_IO:
+        case RTWQ_CALLBACK_QUEUE_TIMER:
+        case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
+        case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
+            break;
+        default:
+            return NULL;
+    }
+
+    return &system_queues[queue_id - 1];
+}
+
static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
{
}
+/* Allocates a zero-initialized work item bound to 'queue'. The item takes its
+ * own reference on 'result', starts with a refcount of 1 and an initialized
+ * (empty) list entry.
+ *
+ * Returns NULL if the allocation fails; callers in this file translate that
+ * into E_OUTOFMEMORY. */
+static struct work_item * alloc_work_item(struct queue *queue, IRtwqAsyncResult *result)
+{
+    struct work_item *item;
+
+    /* Do not touch the item when the heap is exhausted. */
+    if (!(item = heap_alloc_zero(sizeof(*item))))
+        return NULL;
+
+    item->result = result;
+    IRtwqAsyncResult_AddRef(item->result);
+    item->refcount = 1;
+    item->queue = queue;
+    list_init(&item->entry);
+
+    return item;
+}
+
static void release_work_item(struct work_item *item)
{
if (InterlockedDecrement(&item->refcount) == 0)
@@ -94,6 +179,12 @@ static void release_work_item(struct work_item *item)
}
}
+/* Atomically adds one reference to 'item' and returns the same pointer. */
+static struct work_item *grab_work_item(struct work_item *item)
+{
+    struct work_item *ref = item;
+
+    InterlockedIncrement(&ref->refcount);
+
+    return ref;
+}
+
static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
{
TP_CALLBACK_ENVIRON_V3 env;
@@ -125,6 +216,255 @@ static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
}
+/* Resolves a queue id to a queue object.
+ *
+ * queue_id: either one of the system queue ids or a user queue handle.
+ * ret:      receives the queue, or NULL on failure.
+ *
+ * Returns RTWQ_E_SHUTDOWN when the standard system queue has no pool,
+ * RTWQ_E_INVALID_WORKQUEUE when the id resolves to nothing, S_OK otherwise.
+ * System queues other than the standard one are initialized on first use. */
+static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
+{
+    struct queue *queue = get_system_queue(queue_id);
+    RTWQ_WORKQUEUE_TYPE queue_type;
+    struct queue_handle *entry;
+
+    *ret = NULL;
+
+    if (!system_queues[SYS_QUEUE_STANDARD].pool)
+        return RTWQ_E_SHUTDOWN;
+
+    if (queue && queue->pool)
+    {
+        *ret = queue;
+        return S_OK;
+    }
+    else if (queue)
+    {
+        EnterCriticalSection(&queues_section);
+        /* The unlocked test above can be passed by several threads at once;
+         * re-check under the lock so the queue is only initialized once. */
+        if (!queue->pool)
+        {
+            switch (queue_id)
+            {
+                case RTWQ_CALLBACK_QUEUE_IO:
+                case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
+                case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
+                    queue_type = RTWQ_MULTITHREADED_WORKQUEUE;
+                    break;
+                default:
+                    queue_type = RTWQ_STANDARD_WORKQUEUE;
+            }
+            init_work_queue(queue_type, queue);
+        }
+        LeaveCriticalSection(&queues_section);
+        *ret = queue;
+        return S_OK;
+    }
+
+    /* Handles user queues. */
+    if ((entry = get_queue_obj(queue_id)))
+        *ret = entry->obj;
+
+    return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
+}
+
+/* Adds a reference to a user queue. Ids without any private-mask bits are
+ * accepted as-is and return S_OK without taking the lock. Returns
+ * RTWQ_E_INVALID_WORKQUEUE when the handle does not resolve to a live slot. */
+static HRESULT lock_user_queue(DWORD queue)
+{
+    struct queue_handle *slot;
+    HRESULT hr;
+
+    if ((queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK) == 0)
+        return S_OK;
+
+    EnterCriticalSection(&queues_section);
+    slot = get_queue_obj(queue);
+    if (!slot || !slot->refcount)
+        hr = RTWQ_E_INVALID_WORKQUEUE;
+    else
+    {
+        slot->refcount++;
+        hr = S_OK;
+    }
+    LeaveCriticalSection(&queues_section);
+
+    return hr;
+}
+
+/* Tears down an initialized queue: closes the cleanup group members and the
+ * pool, drops every item still on the pending list, then deletes the queue
+ * lock. Does nothing for a queue without a pool. */
+static void shutdown_queue(struct queue *queue)
+{
+    struct work_item *cur, *next;
+
+    if (queue->pool)
+    {
+        CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
+        CloseThreadpool(queue->pool);
+        queue->pool = NULL;
+
+        EnterCriticalSection(&queue->cs);
+        LIST_FOR_EACH_ENTRY_SAFE(cur, next, &queue->pending_items, struct work_item, entry)
+        {
+            /* Unlink first, the release below may be the last reference. */
+            list_remove(&cur->entry);
+            release_work_item(cur);
+        }
+        LeaveCriticalSection(&queue->cs);
+
+        DeleteCriticalSection(&queue->cs);
+    }
+}
+
+/* Drops one reference from a user queue. When the count reaches zero the
+ * queue is shut down and freed, and its slot is pushed onto the free list.
+ * Ids without private-mask bits return S_OK untouched; unresolvable handles
+ * return RTWQ_E_INVALID_WORKQUEUE. */
+static HRESULT unlock_user_queue(DWORD queue)
+{
+    struct queue_handle *slot;
+    HRESULT hr;
+
+    if ((queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK) == 0)
+        return S_OK;
+
+    EnterCriticalSection(&queues_section);
+    slot = get_queue_obj(queue);
+    if (!slot || !slot->refcount)
+        hr = RTWQ_E_INVALID_WORKQUEUE;
+    else
+    {
+        slot->refcount--;
+        if (!slot->refcount)
+        {
+            shutdown_queue((struct queue *)slot->obj);
+            heap_free(slot->obj);
+            /* 'obj' doubles as the free-list link for released slots. */
+            slot->obj = next_free_user_queue;
+            next_free_user_queue = slot;
+        }
+        hr = S_OK;
+    }
+    LeaveCriticalSection(&queues_section);
+
+    return hr;
+}
+
+/* Thread pool work callback: invokes the callback stored in the item's async
+ * result, then drops the reference the work object held on the item. */
+static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
+{
+    struct work_item *work_item = context;
+    RTWQASYNCRESULT *result_data = (RTWQASYNCRESULT *)work_item->result;
+
+    TRACE("result object %p.\n", result_data);
+
+    IRtwqAsyncCallback_Invoke(result_data->pCallback, work_item->result);
+
+    release_work_item(work_item);
+}
+
+/* Wraps 'result' in a work item and submits it to the queue's thread pool.
+ *
+ * priority: 0 selects the normal callback environment, negative values the
+ *           low one, positive values the high one.
+ *
+ * Returns S_OK once the work is submitted, E_OUTOFMEMORY if the item or the
+ * thread pool work object cannot be created. */
+static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
+{
+    TP_CALLBACK_PRIORITY callback_priority;
+    struct work_item *item;
+    TP_WORK *work_object;
+
+    if (!(item = alloc_work_item(queue, result)))
+        return E_OUTOFMEMORY;
+
+    if (priority == 0)
+        callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
+    else if (priority < 0)
+        callback_priority = TP_CALLBACK_PRIORITY_LOW;
+    else
+        callback_priority = TP_CALLBACK_PRIORITY_HIGH;
+    work_object = CreateThreadpoolWork(standard_queue_worker, item,
+            (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
+    if (!work_object)
+    {
+        /* Nothing will ever run the worker, so give back the reference
+         * taken by alloc_work_item() instead of leaking the item. */
+        release_work_item(item);
+        return E_OUTOFMEMORY;
+    }
+    SubmitThreadpoolWork(work_object);
+
+    TRACE("dispatched %p.\n", result);
+
+    return S_OK;
+}
+
+/* Resolves 'queue_id' and submits 'result' to that queue. Any failure from
+ * grab_queue() is returned unchanged. */
+static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IRtwqAsyncResult *result)
+{
+    struct queue *target;
+    HRESULT hr = grab_queue(queue_id, &target);
+
+    if (SUCCEEDED(hr))
+        hr = queue_submit_item(target, priority, result);
+
+    return hr;
+}
+
+/* Queues 'result' on the queue requested by its callback's GetParameters().
+ * Falls back to the standard queue when GetParameters() fails or when the
+ * requested user queue cannot be locked. The queue stays locked only for
+ * the duration of the submission. */
+static HRESULT invoke_async_callback(IRtwqAsyncResult *result)
+{
+    RTWQASYNCRESULT *data = (RTWQASYNCRESULT *)result;
+    DWORD flags, target = RTWQ_CALLBACK_QUEUE_STANDARD;
+    HRESULT hr;
+
+    hr = IRtwqAsyncCallback_GetParameters(data->pCallback, &flags, &target);
+    if (FAILED(hr))
+        target = RTWQ_CALLBACK_QUEUE_STANDARD;
+
+    hr = lock_user_queue(target);
+    if (FAILED(hr))
+        target = RTWQ_CALLBACK_QUEUE_STANDARD;
+
+    hr = queue_put_work_item(target, 0, result);
+
+    /* No-op for system queue ids, which carry no private-mask bits. */
+    unlock_user_queue(target);
+
+    return hr;
+}
+
+/* Takes an item off its queue's pending list, if it is still marked pending
+ * (non-zero key): unlinks it, clears the key and drops the reference held by
+ * the list. All of it happens under the queue lock. */
+static void queue_release_pending_item(struct work_item *item)
+{
+    struct queue *owner = item->queue;
+
+    EnterCriticalSection(&owner->cs);
+    if (item->key != 0)
+    {
+        list_remove(&item->entry);
+        item->key = 0;
+        release_work_item(item);
+    }
+    LeaveCriticalSection(&owner->cs);
+}
+
+/* Thread pool wait callback for items submitted without a key: forwards the
+ * result to its callback queue and releases the item. */
+static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
+        TP_WAIT_RESULT wait_result)
+{
+    struct work_item *waiting = context;
+
+    TRACE("result object %p.\n", waiting->result);
+
+    invoke_async_callback(waiting->result);
+
+    release_work_item(waiting);
+}
+
+/* Thread pool wait callback for items submitted with a key: removes the item
+ * from the pending list first, then forwards the result to its callback
+ * queue and releases the item. */
+static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
+        TP_WAIT_RESULT wait_result)
+{
+    struct work_item *waiting = context;
+
+    TRACE("result object %p.\n", waiting->result);
+
+    queue_release_pending_item(waiting);
+
+    invoke_async_callback(waiting->result);
+
+    release_work_item(waiting);
+}
+
+/* Assigns a freshly generated key (tagged with 'mask') to the item, returns
+ * it through 'key', and appends the item to its queue's pending list. The
+ * list holds its own reference on the item. */
+static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key)
+{
+    struct queue *owner;
+
+    *key = generate_item_key(mask);
+    item->key = *key;
+
+    owner = item->queue;
+    EnterCriticalSection(&owner->cs);
+    list_add_tail(&owner->pending_items, &item->entry);
+    grab_work_item(item);
+    LeaveCriticalSection(&owner->cs);
+}
+
+/* Creates a work item for 'result' and arms a thread pool wait on 'event'.
+ *
+ * key: optional. When non-NULL the item is put on the queue's pending list
+ *      and the generated key is returned through it; the cancelable
+ *      callback is used in that case.
+ *
+ * 'priority' is not consulted here; the wait always uses the normal-priority
+ * callback environment.
+ *
+ * Returns S_OK once the wait is armed, E_OUTOFMEMORY if the item or the wait
+ * object cannot be created. On failure '*key' is left untouched. */
+static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IRtwqAsyncResult *result,
+        RTWQWORKITEM_KEY *key)
+{
+    PTP_WAIT_CALLBACK callback;
+    struct work_item *item;
+
+    if (!(item = alloc_work_item(queue, result)))
+        return E_OUTOFMEMORY;
+
+    callback = key ? waiting_item_cancelable_callback : waiting_item_callback;
+
+    /* Create the wait object before publishing the item, so that a failure
+     * here never leaves a dead item on the pending list. The callback cannot
+     * run before SetThreadpoolWait() below. */
+    item->u.wait_object = CreateThreadpoolWait(callback, item,
+            (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
+    if (!item->u.wait_object)
+    {
+        release_work_item(item);
+        return E_OUTOFMEMORY;
+    }
+
+    if (key)
+        queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
+
+    SetThreadpoolWait(item->u.wait_object, event, NULL);
+
+    TRACE("dispatched %p.\n", result);
+
+    return S_OK;
+}
+
struct async_result
{
RTWQASYNCRESULT result;
@@ -342,28 +682,6 @@ HRESULT WINAPI RtwqStartup(void)
return S_OK;
}
-static void shutdown_queue(struct queue *queue)
-{
- struct work_item *item, *item2;
-
- if (!queue->pool)
- return;
-
- CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
- CloseThreadpool(queue->pool);
- queue->pool = NULL;
-
- EnterCriticalSection(&queue->cs);
- LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
- {
- list_remove(&item->entry);
- release_work_item(item);
- }
- LeaveCriticalSection(&queue->cs);
-
- DeleteCriticalSection(&queue->cs);
-}
-
static void shutdown_system_queues(void)
{
unsigned int i;
@@ -390,3 +708,32 @@ HRESULT WINAPI RtwqShutdown(void)
return S_OK;
}
+
+/* Exported entry point: schedules 'result' to be dispatched once 'event' is
+ * signaled. The wait is placed on the timer system queue. Returns whatever
+ * grab_queue() or queue_submit_wait() report. */
+HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key)
+{
+    struct queue *timer_queue;
+    HRESULT hr;
+
+    TRACE("%p, %d, %p, %p.\n", event, priority, result, key);
+
+    hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &timer_queue);
+    if (SUCCEEDED(hr))
+        hr = queue_submit_wait(timer_queue, event, priority, result, key);
+
+    return hr;
+}
+
+/* Exported entry point: traces the call and forwards to lock_user_queue(). */
+HRESULT WINAPI RtwqLockWorkQueue(DWORD queue)
+{
+    HRESULT hr;
+
+    TRACE("%#x.\n", queue);
+
+    hr = lock_user_queue(queue);
+    return hr;
+}
+
+/* Exported entry point: traces the call and forwards to unlock_user_queue(). */
+HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue)
+{
+    HRESULT hr;
+
+    TRACE("%#x.\n", queue);
+
+    hr = unlock_user_queue(queue);
+    return hr;
+}
diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec
index 8c352593ff..47e1c1b2ae 100644
--- a/dlls/rtworkq/rtworkq.spec
+++ b/dlls/rtworkq/rtworkq.spec
@@ -17,9 +17,9 @@
@ stub RtwqJoinWorkQueue
@ stdcall RtwqLockPlatform()
@ stub RtwqLockSharedWorkQueue
-@ stub RtwqLockWorkQueue
+@ stdcall RtwqLockWorkQueue(long)
@ stub RtwqPutMultipleWaitingWorkItem
-@ stub RtwqPutWaitingWorkItem
+@ stdcall RtwqPutWaitingWorkItem(long long ptr ptr)
@ stub RtwqPutWorkItem
@ stub RtwqRegisterPlatformEvents
@ stub RtwqRegisterPlatformWithMMCSS
@@ -32,6 +32,6 @@
@ stdcall RtwqStartup()
@ stub RtwqUnjoinWorkQueue
@ stdcall RtwqUnlockPlatform()
-@ stub RtwqUnlockWorkQueue
+@ stdcall RtwqUnlockWorkQueue(long)
@ stub RtwqUnregisterPlatformEvents
@ stub RtwqUnregisterPlatformFromMMCSS
diff --git a/include/rtworkq.idl b/include/rtworkq.idl
index c2e8237bba..a899376e92 100644
--- a/include/rtworkq.idl
+++ b/include/rtworkq.idl
@@ -52,6 +52,15 @@ interface IRtwqAsyncCallback : IUnknown
HRESULT Invoke([in] IRtwqAsyncResult *result);
}
+/* RTWQ error codes are offsets from 0xc00d0000. The macro argument is
+ * parenthesized so that expression arguments expand as a single operand. */
+cpp_quote("#define RTWQ_E_ERROR(x) ((HRESULT)(0xc00d0000L+(x)))")
+cpp_quote("#define RTWQ_E_BUFFERTOOSMALL RTWQ_E_ERROR(14001)")
+cpp_quote("#define RTWQ_E_NOT_INITIALIZED RTWQ_E_ERROR(14006)")
+cpp_quote("#define RTWQ_E_UNEXPECTED RTWQ_E_ERROR(14011)")
+cpp_quote("#define RTWQ_E_NOT_FOUND RTWQ_E_ERROR(14037)")
+cpp_quote("#define RTWQ_E_OPERATION_CANCELLED RTWQ_E_ERROR(14061)")
+cpp_quote("#define RTWQ_E_INVALID_WORKQUEUE RTWQ_E_ERROR(14079)")
+cpp_quote("#define RTWQ_E_SHUTDOWN RTWQ_E_ERROR(16005)")
+
cpp_quote("#ifdef __WINESRC__")
cpp_quote("typedef struct tagRTWQASYNCRESULT")
cpp_quote("{")
@@ -68,6 +77,9 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);")
cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);")
+cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);")
+cpp_quote("HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key);")
cpp_quote("HRESULT WINAPI RtwqShutdown(void);")
cpp_quote("HRESULT WINAPI RtwqStartup(void);")
cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")
+cpp_quote("HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue);")
--
2.24.1
More information about the wine-devel
mailing list