[PATCH 4/9] rtworkq: Add RtwqPutWaitingWorkItem().

Nikolay Sivov nsivov at codeweavers.com
Tue Feb 4 23:44:08 CST 2020


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/rtworkq/queue.c      | 391 +++++++++++++++++++++++++++++++++++---
 dlls/rtworkq/rtworkq.spec |   6 +-
 include/rtworkq.idl       |  12 ++
 3 files changed, 384 insertions(+), 25 deletions(-)

diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index 31a17c68ae..7d949e8e62 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -27,6 +27,34 @@
 
 WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
 
+#define FIRST_USER_QUEUE_HANDLE 5
+#define MAX_USER_QUEUE_HANDLES 124
+
+#define WAIT_ITEM_KEY_MASK      (0x82000000)
+#define SCHEDULED_ITEM_KEY_MASK (0x80000000)
+
+static LONG next_item_key;
+
+static RTWQWORKITEM_KEY get_item_key(DWORD mask, DWORD key)
+{
+    return ((RTWQWORKITEM_KEY)mask << 32) | key;
+}
+
+static RTWQWORKITEM_KEY generate_item_key(DWORD mask)
+{
+    return get_item_key(mask, InterlockedIncrement(&next_item_key));
+}
+
+struct queue_handle
+{
+    void *obj;
+    LONG refcount;
+    WORD generation;
+};
+
+static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES];
+static struct queue_handle *next_free_user_queue;
+
 static CRITICAL_SECTION queues_section;
 static CRITICAL_SECTION_DEBUG queues_critsect_debug =
 {
@@ -38,6 +66,33 @@ static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0,
 
 static LONG platform_lock;
 
+static struct queue_handle *get_queue_obj(DWORD handle)
+{
+    unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE;
+
+    if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount)
+    {
+        if (LOWORD(handle) == user_queues[idx].generation)
+            return &user_queues[idx];
+    }
+
+    return NULL;
+}
+
+/* Should be kept in sync with corresponding MFASYNC_CALLBACK_ constants. */
+enum rtwq_callback_queue_id
+{
+    RTWQ_CALLBACK_QUEUE_UNDEFINED     = 0x00000000,
+    RTWQ_CALLBACK_QUEUE_STANDARD      = 0x00000001,
+    RTWQ_CALLBACK_QUEUE_RT            = 0x00000002,
+    RTWQ_CALLBACK_QUEUE_IO            = 0x00000003,
+    RTWQ_CALLBACK_QUEUE_TIMER         = 0x00000004,
+    RTWQ_CALLBACK_QUEUE_MULTITHREADED = 0x00000005,
+    RTWQ_CALLBACK_QUEUE_LONG_FUNCTION = 0x00000007,
+    RTWQ_CALLBACK_QUEUE_PRIVATE_MASK  = 0xffff0000,
+    RTWQ_CALLBACK_QUEUE_ALL           = 0xffffffff,
+};
+
 enum system_queue_index
 {
     SYS_QUEUE_STANDARD = 0,
@@ -81,10 +136,40 @@ struct queue
 
 static struct queue system_queues[SYS_QUEUE_COUNT];
 
+static struct queue *get_system_queue(DWORD queue_id)
+{
+    switch (queue_id)
+    {
+        case RTWQ_CALLBACK_QUEUE_STANDARD:
+        case RTWQ_CALLBACK_QUEUE_RT:
+        case RTWQ_CALLBACK_QUEUE_IO:
+        case RTWQ_CALLBACK_QUEUE_TIMER:
+        case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
+        case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
+            return &system_queues[queue_id - 1];
+        default:
+            return NULL;
+    }
+}
+
 static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
 {
 }
 
+static struct work_item * alloc_work_item(struct queue *queue, IRtwqAsyncResult *result)
+{
+    struct work_item *item;
+
+    item = heap_alloc_zero(sizeof(*item));
+    item->result = result;
+    IRtwqAsyncResult_AddRef(item->result);
+    item->refcount = 1;
+    item->queue = queue;
+    list_init(&item->entry);
+
+    return item;
+}
+
 static void release_work_item(struct work_item *item)
 {
     if (InterlockedDecrement(&item->refcount) == 0)
@@ -94,6 +179,12 @@ static void release_work_item(struct work_item *item)
     }
 }
 
+static struct work_item *grab_work_item(struct work_item *item)
+{
+    InterlockedIncrement(&item->refcount);
+    return item;
+}
+
 static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
 {
     TP_CALLBACK_ENVIRON_V3 env;
@@ -125,6 +216,255 @@ static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
         FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
 }
 
+static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
+{
+    struct queue *queue = get_system_queue(queue_id);
+    RTWQ_WORKQUEUE_TYPE queue_type;
+    struct queue_handle *entry;
+
+    *ret = NULL;
+
+    if (!system_queues[SYS_QUEUE_STANDARD].pool)
+        return RTWQ_E_SHUTDOWN;
+
+    if (queue && queue->pool)
+    {
+        *ret = queue;
+        return S_OK;
+    }
+    else if (queue)
+    {
+        EnterCriticalSection(&queues_section);
+        switch (queue_id)
+        {
+            case RTWQ_CALLBACK_QUEUE_IO:
+            case RTWQ_CALLBACK_QUEUE_MULTITHREADED:
+            case RTWQ_CALLBACK_QUEUE_LONG_FUNCTION:
+                queue_type = RTWQ_MULTITHREADED_WORKQUEUE;
+                break;
+            default:
+                queue_type = RTWQ_STANDARD_WORKQUEUE;
+        }
+        init_work_queue(queue_type, queue);
+        LeaveCriticalSection(&queues_section);
+        *ret = queue;
+        return S_OK;
+    }
+
+    /* Handles user queues. */
+    if ((entry = get_queue_obj(queue_id)))
+        *ret = entry->obj;
+
+    return *ret ? S_OK : RTWQ_E_INVALID_WORKQUEUE;
+}
+
+static HRESULT lock_user_queue(DWORD queue)
+{
+    HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
+    struct queue_handle *entry;
+
+    if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
+        return S_OK;
+
+    EnterCriticalSection(&queues_section);
+    entry = get_queue_obj(queue);
+    if (entry && entry->refcount)
+    {
+        entry->refcount++;
+        hr = S_OK;
+    }
+    LeaveCriticalSection(&queues_section);
+    return hr;
+}
+
+static void shutdown_queue(struct queue *queue)
+{
+    struct work_item *item, *item2;
+
+    if (!queue->pool)
+        return;
+
+    CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
+    CloseThreadpool(queue->pool);
+    queue->pool = NULL;
+
+    EnterCriticalSection(&queue->cs);
+    LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
+    {
+        list_remove(&item->entry);
+        release_work_item(item);
+    }
+    LeaveCriticalSection(&queue->cs);
+
+    DeleteCriticalSection(&queue->cs);
+}
+
+static HRESULT unlock_user_queue(DWORD queue)
+{
+    HRESULT hr = RTWQ_E_INVALID_WORKQUEUE;
+    struct queue_handle *entry;
+
+    if (!(queue & RTWQ_CALLBACK_QUEUE_PRIVATE_MASK))
+        return S_OK;
+
+    EnterCriticalSection(&queues_section);
+    entry = get_queue_obj(queue);
+    if (entry && entry->refcount)
+    {
+        if (--entry->refcount == 0)
+        {
+            shutdown_queue((struct queue *)entry->obj);
+            heap_free(entry->obj);
+            entry->obj = next_free_user_queue;
+            next_free_user_queue = entry;
+        }
+        hr = S_OK;
+    }
+    LeaveCriticalSection(&queues_section);
+    return hr;
+}
+
+static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
+{
+    struct work_item *item = context;
+    RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
+
+    TRACE("result object %p.\n", result);
+
+    IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
+
+    release_work_item(item);
+}
+
+static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
+{
+    TP_CALLBACK_PRIORITY callback_priority;
+    struct work_item *item;
+    TP_WORK *work_object;
+
+    if (!(item = alloc_work_item(queue, result)))
+        return E_OUTOFMEMORY;
+
+    if (priority == 0)
+        callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
+    else if (priority < 0)
+        callback_priority = TP_CALLBACK_PRIORITY_LOW;
+    else
+        callback_priority = TP_CALLBACK_PRIORITY_HIGH;
+    work_object = CreateThreadpoolWork(standard_queue_worker, item,
+            (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
+    SubmitThreadpoolWork(work_object);
+
+    TRACE("dispatched %p.\n", result);
+
+    return S_OK;
+}
+
+static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IRtwqAsyncResult *result)
+{
+    struct queue *queue;
+    HRESULT hr;
+
+    if (FAILED(hr = grab_queue(queue_id, &queue)))
+        return hr;
+
+    return queue_submit_item(queue, priority, result);
+}
+
+static HRESULT invoke_async_callback(IRtwqAsyncResult *result)
+{
+    RTWQASYNCRESULT *result_data = (RTWQASYNCRESULT *)result;
+    DWORD queue = RTWQ_CALLBACK_QUEUE_STANDARD, flags;
+    HRESULT hr;
+
+    if (FAILED(IRtwqAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
+        queue = RTWQ_CALLBACK_QUEUE_STANDARD;
+
+    if (FAILED(lock_user_queue(queue)))
+        queue = RTWQ_CALLBACK_QUEUE_STANDARD;
+
+    hr = queue_put_work_item(queue, 0, result);
+
+    unlock_user_queue(queue);
+
+    return hr;
+}
+
+static void queue_release_pending_item(struct work_item *item)
+{
+    EnterCriticalSection(&item->queue->cs);
+    if (item->key)
+    {
+        list_remove(&item->entry);
+        item->key = 0;
+        release_work_item(item);
+    }
+    LeaveCriticalSection(&item->queue->cs);
+}
+
+static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
+        TP_WAIT_RESULT wait_result)
+{
+    struct work_item *item = context;
+
+    TRACE("result object %p.\n", item->result);
+
+    invoke_async_callback(item->result);
+
+    release_work_item(item);
+}
+
+static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
+        TP_WAIT_RESULT wait_result)
+{
+    struct work_item *item = context;
+
+    TRACE("result object %p.\n", item->result);
+
+    queue_release_pending_item(item);
+
+    invoke_async_callback(item->result);
+
+    release_work_item(item);
+}
+
+static void queue_mark_item_pending(DWORD mask, struct work_item *item, RTWQWORKITEM_KEY *key)
+{
+    *key = generate_item_key(mask);
+    item->key = *key;
+
+    EnterCriticalSection(&item->queue->cs);
+    list_add_tail(&item->queue->pending_items, &item->entry);
+    grab_work_item(item);
+    LeaveCriticalSection(&item->queue->cs);
+}
+
+static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IRtwqAsyncResult *result,
+        RTWQWORKITEM_KEY *key)
+{
+    PTP_WAIT_CALLBACK callback;
+    struct work_item *item;
+
+    if (!(item = alloc_work_item(queue, result)))
+        return E_OUTOFMEMORY;
+
+    if (key)
+    {
+        queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
+        callback = waiting_item_cancelable_callback;
+    }
+    else
+        callback = waiting_item_callback;
+
+    item->u.wait_object = CreateThreadpoolWait(callback, item,
+            (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
+    SetThreadpoolWait(item->u.wait_object, event, NULL);
+
+    TRACE("dispatched %p.\n", result);
+
+    return S_OK;
+}
+
 struct async_result
 {
     RTWQASYNCRESULT result;
@@ -342,28 +682,6 @@ HRESULT WINAPI RtwqStartup(void)
     return S_OK;
 }
 
-static void shutdown_queue(struct queue *queue)
-{
-    struct work_item *item, *item2;
-
-    if (!queue->pool)
-        return;
-
-    CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
-    CloseThreadpool(queue->pool);
-    queue->pool = NULL;
-
-    EnterCriticalSection(&queue->cs);
-    LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
-    {
-        list_remove(&item->entry);
-        release_work_item(item);
-    }
-    LeaveCriticalSection(&queue->cs);
-
-    DeleteCriticalSection(&queue->cs);
-}
-
 static void shutdown_system_queues(void)
 {
     unsigned int i;
@@ -390,3 +708,32 @@ HRESULT WINAPI RtwqShutdown(void)
 
     return S_OK;
 }
+
+HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key)
+{
+    struct queue *queue;
+    HRESULT hr;
+
+    TRACE("%p, %d, %p, %p.\n", event, priority, result, key);
+
+    if (FAILED(hr = grab_queue(RTWQ_CALLBACK_QUEUE_TIMER, &queue)))
+        return hr;
+
+    hr = queue_submit_wait(queue, event, priority, result, key);
+
+    return hr;
+}
+
+HRESULT WINAPI RtwqLockWorkQueue(DWORD queue)
+{
+    TRACE("%#x.\n", queue);
+
+    return lock_user_queue(queue);
+}
+
+HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue)
+{
+    TRACE("%#x.\n", queue);
+
+    return unlock_user_queue(queue);
+}
diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec
index 8c352593ff..47e1c1b2ae 100644
--- a/dlls/rtworkq/rtworkq.spec
+++ b/dlls/rtworkq/rtworkq.spec
@@ -17,9 +17,9 @@
 @ stub RtwqJoinWorkQueue
 @ stdcall RtwqLockPlatform()
 @ stub RtwqLockSharedWorkQueue
-@ stub RtwqLockWorkQueue
+@ stdcall RtwqLockWorkQueue(long)
 @ stub RtwqPutMultipleWaitingWorkItem
-@ stub RtwqPutWaitingWorkItem
+@ stdcall RtwqPutWaitingWorkItem(long long ptr ptr)
 @ stub RtwqPutWorkItem
 @ stub RtwqRegisterPlatformEvents
 @ stub RtwqRegisterPlatformWithMMCSS
@@ -32,6 +32,6 @@
 @ stdcall RtwqStartup()
 @ stub RtwqUnjoinWorkQueue
 @ stdcall RtwqUnlockPlatform()
-@ stub RtwqUnlockWorkQueue
+@ stdcall RtwqUnlockWorkQueue(long)
 @ stub RtwqUnregisterPlatformEvents
 @ stub RtwqUnregisterPlatformFromMMCSS
diff --git a/include/rtworkq.idl b/include/rtworkq.idl
index c2e8237bba..a899376e92 100644
--- a/include/rtworkq.idl
+++ b/include/rtworkq.idl
@@ -52,6 +52,15 @@ interface IRtwqAsyncCallback : IUnknown
     HRESULT Invoke([in] IRtwqAsyncResult *result);
 }
 
+cpp_quote("#define RTWQ_E_ERROR(x)            ((HRESULT)(0xc00d0000L+x))")
+cpp_quote("#define RTWQ_E_BUFFERTOOSMALL      RTWQ_E_ERROR(14001)")
+cpp_quote("#define RTWQ_E_NOT_INITIALIZED     RTWQ_E_ERROR(14006)")
+cpp_quote("#define RTWQ_E_UNEXPECTED          RTWQ_E_ERROR(14011)")
+cpp_quote("#define RTWQ_E_NOT_FOUND           RTWQ_E_ERROR(14037)")
+cpp_quote("#define RTWQ_E_OPERATION_CANCELLED RTWQ_E_ERROR(14061)")
+cpp_quote("#define RTWQ_E_INVALID_WORKQUEUE   RTWQ_E_ERROR(14079)")
+cpp_quote("#define RTWQ_E_SHUTDOWN            RTWQ_E_ERROR(16005)")
+
 cpp_quote("#ifdef __WINESRC__")
 cpp_quote("typedef struct tagRTWQASYNCRESULT")
 cpp_quote("{")
@@ -68,6 +77,9 @@ cpp_quote("} RTWQASYNCRESULT;")
 
 cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);")
 cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);")
+cpp_quote("HRESULT WINAPI RtwqLockWorkQueue(DWORD queue);")
+cpp_quote("HRESULT WINAPI RtwqPutWaitingWorkItem(HANDLE event, LONG priority, IRtwqAsyncResult *result, RTWQWORKITEM_KEY *key);")
 cpp_quote("HRESULT WINAPI RtwqShutdown(void);")
 cpp_quote("HRESULT WINAPI RtwqStartup(void);")
 cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")
+cpp_quote("HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue);")
-- 
2.24.1




More information about the wine-devel mailing list