[PATCH 5/9] rtworkq: Initialize queue with specific set of functionality.

Nikolay Sivov nsivov at codeweavers.com
Tue Feb 11 03:19:07 CST 2020


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/rtworkq/queue.c | 162 ++++++++++++++++++++++++++++---------------
 1 file changed, 108 insertions(+), 54 deletions(-)

diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index 38d376342d..e5026b6298 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -16,6 +16,8 @@
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
  */
 
+#include <assert.h>
+
 #define COBJMACROS
 #define NONAMELESSUNION
 
@@ -136,13 +138,25 @@ static const TP_CALLBACK_PRIORITY priorities[] =
     TP_CALLBACK_PRIORITY_LOW,
 };
 
+struct queue;
+struct queue_desc;
+
+struct queue_ops
+{
+    HRESULT (*init)(const struct queue_desc *desc, struct queue *queue);
+    BOOL (*shutdown)(struct queue *queue);
+    void (*submit)(struct queue *queue, struct work_item *item);
+};
+
 struct queue_desc
 {
     RTWQ_WORKQUEUE_TYPE queue_type;
+    const struct queue_ops *ops;
 };
 
 struct queue
 {
+    const struct queue_ops *ops;
     TP_POOL *pool;
     TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
     CRITICAL_SECTION cs;
@@ -171,6 +185,88 @@ static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *gr
 {
 }
 
+static HRESULT pool_queue_init(const struct queue_desc *desc, struct queue *queue)
+{
+    TP_CALLBACK_ENVIRON_V3 env;
+    unsigned int max_thread, i;
+
+    queue->pool = CreateThreadpool(NULL);
+
+    memset(&env, 0, sizeof(env));
+    env.Version = 3;
+    env.Size = sizeof(env);
+    env.Pool = queue->pool;
+    env.CleanupGroup = CreateThreadpoolCleanupGroup();
+    env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
+    env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
+    for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
+    {
+        queue->envs[i] = env;
+        queue->envs[i].CallbackPriority = priorities[i];
+    }
+    list_init(&queue->pending_items);
+    InitializeCriticalSection(&queue->cs);
+
+    max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
+
+    SetThreadpoolThreadMinimum(queue->pool, 1);
+    SetThreadpoolThreadMaximum(queue->pool, max_thread);
+
+    if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE)
+        FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
+
+    return S_OK;
+}
+
+static BOOL pool_queue_shutdown(struct queue *queue)
+{
+    if (!queue->pool)
+        return FALSE;
+
+    CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
+    CloseThreadpool(queue->pool);
+    queue->pool = NULL;
+
+    return TRUE;
+}
+
+static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
+{
+    struct work_item *item = context;
+    RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
+
+    TRACE("result object %p.\n", result);
+
+    IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
+
+    IUnknown_Release(&item->IUnknown_iface);
+}
+
+static void pool_queue_submit(struct queue *queue, struct work_item *item)
+{
+    TP_CALLBACK_PRIORITY callback_priority;
+    TP_WORK *work_object;
+
+    if (item->priority == 0)
+        callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
+    else if (item->priority < 0)
+        callback_priority = TP_CALLBACK_PRIORITY_LOW;
+    else
+        callback_priority = TP_CALLBACK_PRIORITY_HIGH;
+    work_object = CreateThreadpoolWork(standard_queue_worker, item,
+            (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
+    SubmitThreadpoolWork(work_object);
+
+    TRACE("dispatched %p.\n", item->result);
+}
+
+static const struct queue_ops pool_queue_ops =
+{
+    pool_queue_init,
+    pool_queue_shutdown,
+    pool_queue_submit,
+};
+
 static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
 {
     if (IsEqualIID(riid, &IID_IUnknown))
@@ -235,33 +331,14 @@ static struct work_item * alloc_work_item(struct queue *queue, LONG priority, IR
 
 static void init_work_queue(const struct queue_desc *desc, struct queue *queue)
 {
-    TP_CALLBACK_ENVIRON_V3 env;
-    unsigned int max_thread, i;
-
-    queue->pool = CreateThreadpool(NULL);
+    assert(desc->ops != NULL);
 
-    memset(&env, 0, sizeof(env));
-    env.Version = 3;
-    env.Size = sizeof(env);
-    env.Pool = queue->pool;
-    env.CleanupGroup = CreateThreadpoolCleanupGroup();
-    env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
-    env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
-    for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
+    queue->ops = desc->ops;
+    if (SUCCEEDED(queue->ops->init(desc, queue)))
     {
-        queue->envs[i] = env;
-        queue->envs[i].CallbackPriority = priorities[i];
+        list_init(&queue->pending_items);
+        InitializeCriticalSection(&queue->cs);
     }
-    list_init(&queue->pending_items);
-    InitializeCriticalSection(&queue->cs);
-
-    max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
-
-    SetThreadpoolThreadMinimum(queue->pool, 1);
-    SetThreadpoolThreadMaximum(queue->pool, max_thread);
-
-    if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE)
-        FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
 }
 
 static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
@@ -297,6 +374,7 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
         }
 
         desc.queue_type = queue_type;
+        desc.ops = &pool_queue_ops;
         init_work_queue(&desc, queue);
         LeaveCriticalSection(&queues_section);
         *ret = queue;
@@ -333,13 +411,9 @@ static void shutdown_queue(struct queue *queue)
 {
     struct work_item *item, *item2;
 
-    if (!queue->pool)
+    if (!queue->ops || !queue->ops->shutdown(queue))
         return;
 
-    CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
-    CloseThreadpool(queue->pool);
-    queue->pool = NULL;
-
     EnterCriticalSection(&queue->cs);
     LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
     {
@@ -349,6 +423,8 @@ static void shutdown_queue(struct queue *queue)
     LeaveCriticalSection(&queue->cs);
 
     DeleteCriticalSection(&queue->cs);
+
+    memset(queue, 0, sizeof(*queue));
 }
 
 static HRESULT unlock_user_queue(DWORD queue)
@@ -376,38 +452,14 @@ static HRESULT unlock_user_queue(DWORD queue)
     return hr;
 }
 
-static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
-{
-    struct work_item *item = context;
-    RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
-
-    TRACE("result object %p.\n", result);
-
-    IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
-
-    IUnknown_Release(&item->IUnknown_iface);
-}
-
 static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
 {
-    TP_CALLBACK_PRIORITY callback_priority;
     struct work_item *item;
-    TP_WORK *work_object;
 
     if (!(item = alloc_work_item(queue, priority, result)))
         return E_OUTOFMEMORY;
 
-    if (priority == 0)
-        callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
-    else if (priority < 0)
-        callback_priority = TP_CALLBACK_PRIORITY_LOW;
-    else
-        callback_priority = TP_CALLBACK_PRIORITY_HIGH;
-    work_object = CreateThreadpoolWork(standard_queue_worker, item,
-            (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
-    SubmitThreadpoolWork(work_object);
-
-    TRACE("dispatched %p.\n", result);
+    queue->ops->submit(queue, item);
 
     return S_OK;
 }
@@ -862,6 +914,7 @@ static void init_system_queues(void)
     }
 
     desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
+    desc.ops = &pool_queue_ops;
     init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
 
     LeaveCriticalSection(&queues_section);
@@ -1114,6 +1167,7 @@ HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queu
     TRACE("%d, %p.\n", queue_type, queue);
 
     desc.queue_type = queue_type;
+    desc.ops = &pool_queue_ops;
     return alloc_user_queue(&desc, queue);
 }
 
-- 
2.25.0




More information about the wine-devel mailing list