[PATCH 07/10] mfplat: Implement waiting items with ability to cancel.

Nikolay Sivov nsivov at codeweavers.com
Fri Mar 1 00:59:36 CST 2019


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/mfplat/mfplat.spec    |   1 +
 dlls/mfplat/queue.c        | 288 +++++++++++++++++++++++++++++--------
 dlls/mfplat/tests/mfplat.c |   5 +-
 3 files changed, 233 insertions(+), 61 deletions(-)

diff --git a/dlls/mfplat/mfplat.spec b/dlls/mfplat/mfplat.spec
index a218087f6a..c43746ec7b 100644
--- a/dlls/mfplat/mfplat.spec
+++ b/dlls/mfplat/mfplat.spec
@@ -120,6 +120,7 @@
 @ stub MFJoinIoPort
 @ stdcall MFLockPlatform()
 @ stdcall MFLockWorkQueue(long)
+@ stdcall MFPutWaitingWorkItem(long long ptr ptr)
 @ stdcall MFPutWorkItem(long ptr ptr)
 @ stdcall MFPutWorkItemEx(long ptr)
 @ stub MFRecordError
diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c
index 8c7cf195e5..1434645f49 100644
--- a/dlls/mfplat/queue.c
+++ b/dlls/mfplat/queue.c
@@ -25,6 +25,7 @@
 
 #include "wine/debug.h"
 #include "wine/heap.h"
+#include "wine/list.h"
 
 #include "mfplat_private.h"
 
@@ -33,16 +34,34 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
 #define FIRST_USER_QUEUE_HANDLE 5
 #define MAX_USER_QUEUE_HANDLES 124
 
+#define WAIT_ITEM_KEY_MASK      (0x82000000)
+/* Source of the low dword of generated keys; only modified through InterlockedIncrement(). */
+static LONG next_item_key;
+/* Returns 'mask' in the high dword, a counter in the low one; the DWORD cast stops sign extension into the mask. */
+static MFWORKITEM_KEY generate_item_key(DWORD mask)
+{
+    return ((MFWORKITEM_KEY)mask << 32) | (DWORD)InterlockedIncrement(&next_item_key);
+}
+
 struct work_item
 {
+    struct list entry; /* link in queue->pending_items, see queue_mark_item_pending() */
     LONG refcount;
     IMFAsyncResult *result;
+    struct queue *queue; /* queue that was passed to alloc_work_item() */
+    MFWORKITEM_KEY key; /* set by queue_mark_item_pending(), reset to 0 by queue_release_pending_item() */
+    union
+    {
+        TP_WAIT *wait_object; /* thread pool wait created in queue_submit_wait() */
+    } u;
 };
 
 struct queue
 {
     TP_POOL *pool;
     TP_CALLBACK_ENVIRON env;
+    CRITICAL_SECTION cs; /* held around every access to pending_items */
+    struct list pending_items; /* work items that were given a key and can still be cancelled */
 };
 
 struct queue_handle
@@ -121,6 +140,8 @@ static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *r
     item->result = result;
     IMFAsyncResult_AddRef(item->result);
     item->refcount = 1;
+    item->queue = queue;
+    list_init(&item->entry);
 
     return item;
 }
@@ -143,6 +164,8 @@ static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *que
     queue->env.Pool = queue->pool;
     queue->env.CleanupGroup = CreateThreadpoolCleanupGroup();
     queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
+    list_init(&queue->pending_items);
+    InitializeCriticalSection(&queue->cs);
 
     max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4;
 
@@ -210,14 +233,68 @@ void init_system_queues(void)
     LeaveCriticalSection(&queues_section);
 }
 
+static HRESULT lock_user_queue(DWORD queue) /* Adds one reference to the user queue with id 'queue'. */
+{
+    HRESULT hr = MF_E_INVALID_WORKQUEUE;
+    struct queue_handle *entry;
+
+    if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
+        return S_OK; /* ids without the private mask bits are accepted without taking a reference */
+
+    EnterCriticalSection(&queues_section);
+    entry = get_queue_obj(queue);
+    if (entry && entry->refcount) /* an entry with a zero refcount is treated as an invalid queue */
+    {
+        entry->refcount++;
+        hr = S_OK;
+    }
+    LeaveCriticalSection(&queues_section);
+    return hr;
+}
+
+static HRESULT unlock_user_queue(DWORD queue) /* Drops one reference from the user queue with id 'queue'. */
+{
+    HRESULT hr = MF_E_INVALID_WORKQUEUE;
+    struct queue_handle *entry;
+
+    if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
+        return S_OK; /* mirrors lock_user_queue(): nothing was referenced for these ids */
+
+    EnterCriticalSection(&queues_section);
+    entry = get_queue_obj(queue);
+    if (entry && entry->refcount)
+    {
+        if (--entry->refcount == 0)
+        {
+            entry->obj = next_free_user_queue; /* NOTE(review): previous obj is overwritten, not shut down here - confirm */
+            next_free_user_queue = entry; /* the slot becomes the head of the free list */
+        }
+        hr = S_OK;
+    }
+    LeaveCriticalSection(&queues_section);
+    return hr;
+}
+
 static void shutdown_queue(struct queue *queue)
 {
+    struct work_item *item, *item2; /* item2 is the lookahead cursor of the _SAFE iteration below */
+
     if (!queue->pool)
         return;
 
     CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL);
     CloseThreadpool(queue->pool);
     queue->pool = NULL;
+    /* Unlink every item still pending and drop the reference queue_mark_item_pending() took for the list. */
+    EnterCriticalSection(&queue->cs);
+    LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
+    {
+        list_remove(&item->entry);
+        release_work_item(item);
+    }
+    LeaveCriticalSection(&queue->cs);
+
+    DeleteCriticalSection(&queue->cs); /* reached once per init: the pool check above returns early afterwards */
 }
 
 void shutdown_system_queues(void)
@@ -234,6 +311,11 @@ void shutdown_system_queues(void)
     LeaveCriticalSection(&queues_section);
 }
 
+static void grab_work_item(struct work_item *item) /* Atomically adds one reference to 'item'. */
+{
+    InterlockedIncrement(&item->refcount);
+}
+
 static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
 {
     struct work_item *item = context;
@@ -275,6 +357,124 @@ static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result)
     return hr;
 }
 
+static HRESULT invoke_async_callback(IMFAsyncResult *result) /* Queues 'result' on the queue its callback requests. */
+{
+    MFASYNCRESULT *result_data = (MFASYNCRESULT *)result;
+    DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags;
+    HRESULT hr;
+
+    if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
+        queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
+    /* Keep a user queue referenced while submitting; an id that cannot be locked falls back to the standard queue. */
+    if (FAILED(lock_user_queue(queue)))
+        queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
+
+    hr = queue_put_work_item(queue, result);
+
+    unlock_user_queue(queue); /* returns S_OK without doing anything for ids that lack the private mask bits */
+
+    return hr;
+}
+
+static void queue_release_pending_item(struct work_item *item) /* Unlinks 'item' from its queue's pending list. */
+{
+    EnterCriticalSection(&item->queue->cs);
+    if (item->key) /* a zero key means the item was already released through this function */
+    {
+        list_remove(&item->entry);
+        item->key = 0;
+        release_work_item(item); /* drops the reference taken in queue_mark_item_pending() */
+    }
+    LeaveCriticalSection(&item->queue->cs); /* NOTE(review): reads item after the release; assumes a reference remains */
+}
+
+static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
+        TP_WAIT_RESULT wait_result) /* Wait callback used by queue_submit_wait() for items submitted without a key. */
+{
+    struct work_item *item = context;
+
+    TRACE("result object %p.\n", item->result);
+
+    invoke_async_callback(item->result); /* the returned HRESULT is ignored */
+
+    release_work_item(item); /* presumably the initial reference set in alloc_work_item() */
+}
+
+static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
+        TP_WAIT_RESULT wait_result) /* Wait callback used by queue_submit_wait() for items submitted with a key. */
+{
+    struct work_item *item = context;
+
+    TRACE("result object %p.\n", item->result);
+
+    queue_release_pending_item(item); /* off the pending list: queue_cancel_item() can no longer find this item */
+
+    invoke_async_callback(item->result); /* the returned HRESULT is ignored */
+
+    release_work_item(item); /* presumably the initial reference set in alloc_work_item() */
+}
+
+static void queue_mark_item_pending(DWORD mask, struct work_item *item, MFWORKITEM_KEY *key) /* 'key' must not be NULL. */
+{
+    *key = generate_item_key(mask); /* the same key is returned to the caller and stored in the item */
+    item->key = *key;
+
+    EnterCriticalSection(&item->queue->cs);
+    list_add_tail(&item->queue->pending_items, &item->entry);
+    grab_work_item(item); /* reference owned by the pending list; dropped in queue_release_pending_item() */
+    LeaveCriticalSection(&item->queue->cs);
+}
+
+static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IMFAsyncResult *result,
+        MFWORKITEM_KEY *key)
+{
+    struct work_item *item;
+
+    if (!(item = alloc_work_item(queue, result)))
+        return E_OUTOFMEMORY;
+
+    /* Waits on 'event' for 'result'; a non-NULL 'key' makes the item cancelable. 'priority' is unused. */
+    item->u.wait_object = CreateThreadpoolWait(key ? waiting_item_cancelable_callback : waiting_item_callback,
+            item, &queue->env);
+    if (!item->u.wait_object)
+    {
+        release_work_item(item); /* not on the pending list yet, so this is the only reference */
+        return E_OUTOFMEMORY;
+    }
+
+    if (key)
+        queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
+    /* Arm the wait last: the callback cannot run before the item is on the pending list. */
+    SetThreadpoolWait(item->u.wait_object, event, NULL);
+    TRACE("dispatched %p.\n", result);
+    return S_OK;
+}
+
+static HRESULT queue_cancel_item(struct queue *queue, MFWORKITEM_KEY key) /* MF_E_NOT_FOUND if no pending item has 'key'. */
+{
+    HRESULT hr = MF_E_NOT_FOUND;
+    struct work_item *item;
+
+    EnterCriticalSection(&queue->cs);
+    LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry)
+    {
+        if (item->key == key)
+        {
+            key >>= 32; /* keep only the mask dword; 'key' is not compared again because the loop exits below */
+            if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK)
+                CloseThreadpoolWait(item->u.wait_object); /* NOTE(review): callback-owned item reference looks leaked */
+            else
+                WARN("Unknown item key mask %#x.\n", (DWORD)key);
+            queue_release_pending_item(item); /* enters item->queue->cs again while queue->cs is held here */
+            hr = S_OK;
+            break;
+        }
+    }
+    LeaveCriticalSection(&queue->cs);
+
+    return hr;
+}
+
 static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id)
 {
     struct queue_handle *entry;
@@ -313,48 +513,6 @@ static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_
     return S_OK;
 }
 
-static HRESULT lock_user_queue(DWORD queue)
-{
-    HRESULT hr = MF_E_INVALID_WORKQUEUE;
-    struct queue_handle *entry;
-
-    if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
-        return S_OK;
-
-    EnterCriticalSection(&queues_section);
-    entry = get_queue_obj(queue);
-    if (entry && entry->refcount)
-    {
-        entry->refcount++;
-        hr = S_OK;
-    }
-    LeaveCriticalSection(&queues_section);
-    return hr;
-}
-
-static HRESULT unlock_user_queue(DWORD queue)
-{
-    HRESULT hr = MF_E_INVALID_WORKQUEUE;
-    struct queue_handle *entry;
-
-    if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
-        return S_OK;
-
-    EnterCriticalSection(&queues_section);
-    entry = get_queue_obj(queue);
-    if (entry && entry->refcount)
-    {
-        if (--entry->refcount == 0)
-        {
-            entry->obj = next_free_user_queue;
-            next_free_user_queue = entry;
-        }
-        hr = S_OK;
-    }
-    LeaveCriticalSection(&queues_section);
-    return hr;
-}
-
 struct async_result
 {
     MFASYNCRESULT result;
@@ -606,23 +764,9 @@ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result)
  */
 HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result)
 {
-    MFASYNCRESULT *result_data = (MFASYNCRESULT *)result;
-    DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags;
-    HRESULT hr;
-
     TRACE("%p.\n", result);
 
-    if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
-        queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
-
-    if (FAILED(MFLockWorkQueue(queue)))
-        queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
-
-    hr = MFPutWorkItemEx(queue, result);
-
-    MFUnlockWorkQueue(queue);
-
-    return hr;
+    return invoke_async_callback(result); /* same helper the waiting item callbacks call */
 }
 
 static HRESULT schedule_work_item(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key)
@@ -662,12 +806,38 @@ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, I
     return hr;
 }
 
+/***********************************************************************
+ *      MFPutWaitingWorkItem (mfplat.@)
+ */
+HRESULT WINAPI MFPutWaitingWorkItem(HANDLE event, LONG priority, IMFAsyncResult *result, MFWORKITEM_KEY *key)
+{
+    struct queue *queue;
+    HRESULT hr;
+
+    TRACE("%p, %d, %p, %p.\n", event, priority, result, key);
+    /* Waiting items are always submitted to the MFASYNC_CALLBACK_QUEUE_TIMER queue; 'key' may be NULL. */
+    if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
+        return hr;
+
+    hr = queue_submit_wait(queue, event, priority, result, key);
+
+    return hr;
+}
+
 /***********************************************************************
  *      MFCancelWorkItem (mfplat.@)
  */
 HRESULT WINAPI MFCancelWorkItem(MFWORKITEM_KEY key)
 {
-    FIXME("%s.\n", wine_dbgstr_longlong(key));
+    struct queue *queue;
+    HRESULT hr;
 
-    return E_NOTIMPL;
+    TRACE("%s.\n", wine_dbgstr_longlong(key));
+    /* Only the timer queue is searched; MFPutWaitingWorkItem() submits its items to that queue. */
+    if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
+        return hr;
+
+    hr = queue_cancel_item(queue, key); /* MF_E_NOT_FOUND when no pending item carries 'key' */
+
+    return hr;
 }
diff --git a/dlls/mfplat/tests/mfplat.c b/dlls/mfplat/tests/mfplat.c
index 790eeaf6d9..cd55b3973b 100644
--- a/dlls/mfplat/tests/mfplat.c
+++ b/dlls/mfplat/tests/mfplat.c
@@ -1060,12 +1060,11 @@ todo_wine
     ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
 
     hr = MFCancelWorkItem(key);
-todo_wine
     ok(hr == MF_E_NOT_FOUND || broken(hr == S_OK) /* < win10 */, "Unexpected hr %#x.\n", hr);
 
     if (!pMFPutWaitingWorkItem)
     {
-        skip("Waiting items are not supported.\n");
+        win_skip("Waiting items are not supported.\n");
         return;
     }
 
@@ -1087,9 +1086,11 @@ todo_wine
     IMFAsyncResult_Release(result);
 
     hr = MFScheduleWorkItem(&callback, NULL, -5000, &key);
+todo_wine
     ok(hr == S_OK, "Failed to schedule item, hr %#x.\n", hr);
 
     hr = MFCancelWorkItem(key);
+todo_wine
     ok(hr == S_OK, "Failed to cancel item, hr %#x.\n", hr);
 
     hr = MFShutdown();
-- 
2.20.1




More information about the wine-devel mailing list