[PATCH 3/9] rtworkq: Add RtwqStartup()/RtwqShutdown().

Nikolay Sivov nsivov at codeweavers.com
Tue Feb 4 23:44:07 CST 2020


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/rtworkq/queue.c      | 173 ++++++++++++++++++++++++++++++++++++++
 dlls/rtworkq/rtworkq.spec |   4 +-
 include/rtworkq.idl       |  11 +++
 3 files changed, 186 insertions(+), 2 deletions(-)

diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index 7897ef5dc9..31a17c68ae 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -23,11 +23,108 @@
 #include "rtworkq.h"
 #include "wine/debug.h"
 #include "wine/heap.h"
+#include "wine/list.h"
 
 WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
 
+static CRITICAL_SECTION queues_section;
+static CRITICAL_SECTION_DEBUG queues_critsect_debug =
+{
+    0, 0, &queues_section,
+    { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList },
+      0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") }
+};
+static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 };
+
 static LONG platform_lock;
 
+enum system_queue_index
+{
+    SYS_QUEUE_STANDARD = 0,
+    SYS_QUEUE_RT,
+    SYS_QUEUE_IO,
+    SYS_QUEUE_TIMER,
+    SYS_QUEUE_MULTITHREADED,
+    SYS_QUEUE_DO_NOT_USE,
+    SYS_QUEUE_LONG_FUNCTION,
+    SYS_QUEUE_COUNT,
+};
+
+struct work_item
+{
+    struct list entry;
+    LONG refcount;
+    IRtwqAsyncResult *result;
+    struct queue *queue;
+    RTWQWORKITEM_KEY key;
+    union
+    {
+        TP_WAIT *wait_object;
+        TP_TIMER *timer_object;
+    } u;
+};
+
+static const TP_CALLBACK_PRIORITY priorities[] =
+{
+    TP_CALLBACK_PRIORITY_HIGH,
+    TP_CALLBACK_PRIORITY_NORMAL,
+    TP_CALLBACK_PRIORITY_LOW,
+};
+
+struct queue
+{
+    TP_POOL *pool;
+    TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
+    CRITICAL_SECTION cs;
+    struct list pending_items;
+};
+
+static struct queue system_queues[SYS_QUEUE_COUNT];
+
+static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
+{
+}
+
+static void release_work_item(struct work_item *item)
+{
+    if (InterlockedDecrement(&item->refcount) == 0)
+    {
+         IRtwqAsyncResult_Release(item->result);
+         heap_free(item);
+    }
+}
+
+static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
+{
+    TP_CALLBACK_ENVIRON_V3 env;
+    unsigned int max_thread, i;
+
+    queue->pool = CreateThreadpool(NULL);
+
+    memset(&env, 0, sizeof(env));
+    env.Version = 3;
+    env.Size = sizeof(env);
+    env.Pool = queue->pool;
+    env.CleanupGroup = CreateThreadpoolCleanupGroup();
+    env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
+    env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
+    for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
+    {
+        queue->envs[i] = env;
+        queue->envs[i].CallbackPriority = priorities[i];
+    }
+    list_init(&queue->pending_items);
+    InitializeCriticalSection(&queue->cs);
+
+    max_thread = (queue_type == RTWQ_STANDARD_WORKQUEUE || queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
+
+    SetThreadpoolThreadMinimum(queue->pool, 1);
+    SetThreadpoolThreadMaximum(queue->pool, max_thread);
+
+    if (queue_type == RTWQ_WINDOW_WORKQUEUE)
+        FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
+}
+
 struct async_result
 {
     RTWQASYNCRESULT result;
@@ -217,3 +314,79 @@ HRESULT WINAPI RtwqUnlockPlatform(void)
 
     return S_OK;
 }
+
+static void init_system_queues(void)
+{
+    /* Always initialize standard queue, keep the rest lazy. */
+
+    EnterCriticalSection(&queues_section);
+
+    if (system_queues[SYS_QUEUE_STANDARD].pool)
+    {
+        LeaveCriticalSection(&queues_section);
+        return;
+    }
+
+    init_work_queue(RTWQ_STANDARD_WORKQUEUE, &system_queues[SYS_QUEUE_STANDARD]);
+
+    LeaveCriticalSection(&queues_section);
+}
+
+HRESULT WINAPI RtwqStartup(void)
+{
+    if (InterlockedIncrement(&platform_lock) == 1)
+    {
+        init_system_queues();
+    }
+
+    return S_OK;
+}
+
+static void shutdown_queue(struct queue *queue)
+{
+    struct work_item *item, *item2;
+
+    if (!queue->pool)
+        return;
+
+    CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
+    CloseThreadpool(queue->pool);
+    queue->pool = NULL;
+
+    EnterCriticalSection(&queue->cs);
+    LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
+    {
+        list_remove(&item->entry);
+        release_work_item(item);
+    }
+    LeaveCriticalSection(&queue->cs);
+
+    DeleteCriticalSection(&queue->cs);
+}
+
+static void shutdown_system_queues(void)
+{
+    unsigned int i;
+
+    EnterCriticalSection(&queues_section);
+
+    for (i = 0; i < ARRAY_SIZE(system_queues); ++i)
+    {
+        shutdown_queue(&system_queues[i]);
+    }
+
+    LeaveCriticalSection(&queues_section);
+}
+
+HRESULT WINAPI RtwqShutdown(void)
+{
+    if (platform_lock <= 0)
+        return S_OK;
+
+    if (InterlockedExchangeAdd(&platform_lock, -1) == 1)
+    {
+        shutdown_system_queues();
+    }
+
+    return S_OK;
+}
diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec
index 900aa5f230..8c352593ff 100644
--- a/dlls/rtworkq/rtworkq.spec
+++ b/dlls/rtworkq/rtworkq.spec
@@ -28,8 +28,8 @@
 @ stub RtwqSetDeadline
 @ stub RtwqSetDeadline2
 @ stub RtwqSetLongRunning
-@ stub RtwqShutdown
-@ stub RtwqStartup
+@ stdcall RtwqShutdown()
+@ stdcall RtwqStartup()
 @ stub RtwqUnjoinWorkQueue
 @ stdcall RtwqUnlockPlatform()
 @ stub RtwqUnlockWorkQueue
diff --git a/include/rtworkq.idl b/include/rtworkq.idl
index d22b09ca50..c2e8237bba 100644
--- a/include/rtworkq.idl
+++ b/include/rtworkq.idl
@@ -18,6 +18,15 @@
 
 import "unknwn.idl";
 
+typedef enum
+{
+    RTWQ_STANDARD_WORKQUEUE = 0,
+    RTWQ_WINDOW_WORKQUEUE = 1,
+    RTWQ_MULTITHREADED_WORKQUEUE = 2,
+} RTWQ_WORKQUEUE_TYPE;
+
+typedef unsigned __int64 RTWQWORKITEM_KEY;
+
 [
     object,
     uuid(ac6b7889-0740-4d51-8619-905994a55cc6),
@@ -59,4 +68,6 @@ cpp_quote("} RTWQASYNCRESULT;")
 
 cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);")
 cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);")
+cpp_quote("HRESULT WINAPI RtwqShutdown(void);")
+cpp_quote("HRESULT WINAPI RtwqStartup(void);")
 cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")
-- 
2.24.1




More information about the wine-devel mailing list