[v2 PATCH 05/10] mfplat: Add basic support for submitting work items.

Nikolay Sivov nsivov at codeweavers.com
Fri Mar 1 02:03:10 CST 2019


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/mfplat/queue.c | 150 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 147 insertions(+), 3 deletions(-)

diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c
index 3b599840cb..8c7cf195e5 100644
--- a/dlls/mfplat/queue.c
+++ b/dlls/mfplat/queue.c
@@ -33,9 +33,16 @@ WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
 #define FIRST_USER_QUEUE_HANDLE 5
 #define MAX_USER_QUEUE_HANDLES 124
 
+struct work_item
+{
+    LONG refcount;
+    IMFAsyncResult *result;
+};
+
 struct queue
 {
     TP_POOL *pool;
+    TP_CALLBACK_ENVIRON env;
 };
 
 struct queue_handle
@@ -86,9 +93,104 @@ enum system_queue_index
 
 static struct queue system_queues[SYS_QUEUE_COUNT];
 
+static struct queue *get_system_queue(DWORD queue_id)
+{
+    switch (queue_id)
+    {
+        case MFASYNC_CALLBACK_QUEUE_STANDARD:
+        case MFASYNC_CALLBACK_QUEUE_RT:
+        case MFASYNC_CALLBACK_QUEUE_IO:
+        case MFASYNC_CALLBACK_QUEUE_TIMER:
+        case MFASYNC_CALLBACK_QUEUE_MULTITHREADED:
+        case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION:
+            return &system_queues[queue_id - 1];
+        default:
+            return NULL;
+    }
+}
+
+static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
+{
+}
+
+static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *result)
+{
+    struct work_item *item;
+
+    item = heap_alloc_zero(sizeof(*item));
+    item->result = result;
+    IMFAsyncResult_AddRef(item->result);
+    item->refcount = 1;
+
+    return item;
+}
+
+static void release_work_item(struct work_item *item)
+{
+    if (InterlockedDecrement(&item->refcount) == 0)
+    {
+         IMFAsyncResult_Release(item->result);
+         heap_free(item);
+    }
+}
+
 static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue)
 {
+    unsigned int max_thread;
+
     queue->pool = CreateThreadpool(NULL);
+    queue->env.Version = 1;
+    queue->env.Pool = queue->pool;
+    queue->env.CleanupGroup = CreateThreadpoolCleanupGroup();
+    queue->env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
+
+    max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4;
+
+    SetThreadpoolThreadMinimum(queue->pool, 1);
+    SetThreadpoolThreadMaximum(queue->pool, max_thread);
+
+    if (queue_type == MF_WINDOW_WORKQUEUE)
+        FIXME("MF_WINDOW_WORKQUEUE is not supported.\n");
+}
+
+static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
+{
+    struct queue *queue = get_system_queue(queue_id);
+    MFASYNC_WORKQUEUE_TYPE queue_type;
+    struct queue_handle *entry;
+
+    if (!system_queues[SYS_QUEUE_STANDARD].pool)
+        return MF_E_SHUTDOWN;
+
+    if (queue && queue->pool)
+    {
+        *ret = queue;
+        return S_OK;
+    }
+    else if (queue)
+    {
+        EnterCriticalSection(&queues_section);
+        switch (queue_id)
+        {
+            case MFASYNC_CALLBACK_QUEUE_IO:
+            case MFASYNC_CALLBACK_QUEUE_MULTITHREADED:
+            case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION:
+                queue_type = MF_MULTITHREADED_WORKQUEUE;
+                break;
+            default:
+                queue_type = MF_STANDARD_WORKQUEUE;
+        }
+        init_work_queue(queue_type, queue);
+        LeaveCriticalSection(&queues_section);
+        *ret = queue;
+        return S_OK;
+    }
+
+    /* Handles user queues. */
+    if ((entry = get_queue_obj(queue_id)))
+        *ret = entry->obj;
+
+    return *ret ? S_OK : MF_E_INVALID_WORKQUEUE;
 }
 
 void init_system_queues(void)
@@ -113,6 +215,7 @@ static void shutdown_queue(struct queue *queue)
     if (!queue->pool)
         return;
 
+    CloseThreadpoolCleanupGroupMembers(queue->env.CleanupGroup, TRUE, NULL);
     CloseThreadpool(queue->pool);
     queue->pool = NULL;
 }
@@ -131,6 +234,47 @@ void shutdown_system_queues(void)
     LeaveCriticalSection(&queues_section);
 }
 
+static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
+{
+    struct work_item *item = context;
+    MFASYNCRESULT *result = (MFASYNCRESULT *)item->result;
+
+    TRACE("result object %p.\n", result);
+
+    IMFAsyncCallback_Invoke(result->pCallback, item->result);
+
+    release_work_item(item);
+}
+
+static HRESULT queue_submit_item(struct queue *queue, IMFAsyncResult *result)
+{
+    struct work_item *item;
+    TP_WORK *work_object;
+
+    if (!(item = alloc_work_item(queue, result)))
+        return E_OUTOFMEMORY;
+
+    work_object = CreateThreadpoolWork(standard_queue_worker, item, &queue->env);
+    SubmitThreadpoolWork(work_object);
+
+    TRACE("dispatched %p.\n", result);
+
+    return S_OK;
+}
+
+static HRESULT queue_put_work_item(DWORD queue_id, IMFAsyncResult *result)
+{
+    struct queue *queue;
+    HRESULT hr;
+
+    if (FAILED(hr = grab_queue(queue_id, &queue)))
+        return hr;
+
+    hr = queue_submit_item(queue, result);
+
+    return hr;
+}
+
 static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id)
 {
     struct queue_handle *entry;
@@ -440,7 +584,7 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *
     if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result)))
         return hr;
 
-    hr = MFPutWorkItemEx(queue, result);
+    hr = queue_put_work_item(queue, result);
 
     IMFAsyncResult_Release(result);
 
@@ -452,9 +596,9 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *
  */
 HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result)
 {
-    FIXME("%#x, %p\n", queue, result);
+    TRACE("%#x, %p\n", queue, result);
 
-    return E_NOTIMPL;
+    return queue_put_work_item(queue, result);
 }
 
 /***********************************************************************
-- 
2.20.1




More information about the wine-devel mailing list