[PATCH 3/9] rtworkq: Add RtwqStartup()/RtwqShutdown().
Nikolay Sivov
nsivov at codeweavers.com
Tue Feb 4 23:44:07 CST 2020
Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
dlls/rtworkq/queue.c | 173 ++++++++++++++++++++++++++++++++++++++
dlls/rtworkq/rtworkq.spec | 4 +-
include/rtworkq.idl | 11 +++
3 files changed, 186 insertions(+), 2 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index 7897ef5dc9..31a17c68ae 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -23,11 +23,108 @@
#include "rtworkq.h"
#include "wine/debug.h"
#include "wine/heap.h"
+#include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
+/* Lock taken by init_system_queues() and shutdown_system_queues() around
+ * their accesses to the system_queues array. Statically initialized, with a
+ * debug record naming the section. */
+static CRITICAL_SECTION queues_section;
+static CRITICAL_SECTION_DEBUG queues_critsect_debug =
+{
+ 0, 0, &queues_section,
+ { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") }
+};
+static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 };
+
static LONG platform_lock;
+/* Slot numbers in the system_queues array. Only SYS_QUEUE_STANDARD is
+ * referenced by the code in this patch. */
+enum system_queue_index
+{
+ SYS_QUEUE_STANDARD = 0,
+ SYS_QUEUE_RT,
+ SYS_QUEUE_IO,
+ SYS_QUEUE_TIMER,
+ SYS_QUEUE_MULTITHREADED,
+ SYS_QUEUE_DO_NOT_USE,
+ SYS_QUEUE_LONG_FUNCTION,
+ SYS_QUEUE_COUNT, /* number of slots, used as the size of system_queues */
+};
+
+/* A reference counted item that can be linked into a queue's pending_items list. */
+struct work_item
+{
+ struct list entry; /* linkage used when walking queue->pending_items */
+ LONG refcount; /* decremented by release_work_item(); the item is freed at zero */
+ IRtwqAsyncResult *result; /* released when the item is freed */
+ struct queue *queue; /* NOTE(review): presumably the owning queue; not used in this patch */
+ RTWQWORKITEM_KEY key; /* NOTE(review): not used in this patch */
+ union
+ {
+ TP_WAIT *wait_object;
+ TP_TIMER *timer_object;
+ } u; /* NOTE(review): thread pool object for waiting/scheduled items; not used in this patch */
+};
+
+/* Callback priorities; init_work_queue() sets up one callback environment
+ * per entry, in this order. */
+static const TP_CALLBACK_PRIORITY priorities[] =
+{
+ TP_CALLBACK_PRIORITY_HIGH,
+ TP_CALLBACK_PRIORITY_NORMAL,
+ TP_CALLBACK_PRIORITY_LOW,
+};
+
+/* A work queue with its own thread pool. */
+struct queue
+{
+ TP_POOL *pool; /* set by init_work_queue(); NULL is treated as "not initialized" */
+ TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)]; /* one environment per entry of priorities[] */
+ CRITICAL_SECTION cs; /* held while pending_items is walked in shutdown_queue() */
+ struct list pending_items; /* list of struct work_item, linked through their entry field */
+};
+
+/* System queues, indexed by enum system_queue_index. */
+static struct queue system_queues[SYS_QUEUE_COUNT];
+
+/* Installed as the CleanupGroupCancelCallback of every queue environment by
+ * init_work_queue(). Currently a no-op. */
+static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
+{
+}
+
+/* Drop one reference to a work item. The final release also releases the
+ * item's async result and frees the item itself. */
+static void release_work_item(struct work_item *item)
+{
+    /* Still referenced elsewhere, nothing to destroy yet. */
+    if (InterlockedDecrement(&item->refcount))
+        return;
+
+    IRtwqAsyncResult_Release(item->result);
+    heap_free(item);
+}
+
+/* Set up a queue: create its thread pool, fill in one callback environment
+ * per priority, and initialize the pending item list and its lock.
+ * Results of the pool and cleanup group creation calls are not checked. */
+static void init_work_queue(RTWQ_WORKQUEUE_TYPE queue_type, struct queue *queue)
+{
+    TP_CALLBACK_ENVIRON_V3 template_env;
+    unsigned int thread_limit, idx;
+
+    queue->pool = CreateThreadpool(NULL);
+
+    /* All environments share the pool and the cleanup group, only the priority differs. */
+    memset(&template_env, 0, sizeof(template_env));
+    template_env.Version = 3;
+    template_env.Size = sizeof(template_env);
+    template_env.Pool = queue->pool;
+    template_env.CleanupGroup = CreateThreadpoolCleanupGroup();
+    template_env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
+    template_env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
+
+    for (idx = 0; idx < ARRAY_SIZE(queue->envs); ++idx)
+    {
+        queue->envs[idx] = template_env;
+        queue->envs[idx].CallbackPriority = priorities[idx];
+    }
+
+    list_init(&queue->pending_items);
+    InitializeCriticalSection(&queue->cs);
+
+    /* Standard and window queues are limited to a single thread, other types may use up to four. */
+    switch (queue_type)
+    {
+        case RTWQ_STANDARD_WORKQUEUE:
+        case RTWQ_WINDOW_WORKQUEUE:
+            thread_limit = 1;
+            break;
+        default:
+            thread_limit = 4;
+            break;
+    }
+
+    SetThreadpoolThreadMinimum(queue->pool, 1);
+    SetThreadpoolThreadMaximum(queue->pool, thread_limit);
+
+    if (queue_type == RTWQ_WINDOW_WORKQUEUE)
+        FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
+}
+
struct async_result
{
RTWQASYNCRESULT result;
@@ -217,3 +314,79 @@ HRESULT WINAPI RtwqUnlockPlatform(void)
return S_OK;
}
+
+/* Create the standard system queue unless it already exists. The check and
+ * the initialization both happen under queues_section. */
+static void init_system_queues(void)
+{
+    struct queue *standard = &system_queues[SYS_QUEUE_STANDARD];
+
+    EnterCriticalSection(&queues_section);
+
+    /* Always initialize standard queue, keep the rest lazy. */
+    if (!standard->pool)
+        init_work_queue(RTWQ_STANDARD_WORKQUEUE, standard);
+
+    LeaveCriticalSection(&queues_section);
+}
+
+/* Increment platform_lock; when the counter goes from 0 to 1 the system
+ * queues are initialized. Always returns S_OK. */
+HRESULT WINAPI RtwqStartup(void)
+{
+    LONG refs = InterlockedIncrement(&platform_lock);
+
+    if (refs == 1)
+        init_system_queues();
+
+    return S_OK;
+}
+
+/* Tear down an initialized queue: release the thread pool objects created by
+ * init_work_queue() and drop every item still linked on the pending list.
+ * Does nothing when the queue was never initialized (pool is NULL). */
+static void shutdown_queue(struct queue *queue)
+{
+    struct work_item *item, *item2;
+
+    if (!queue->pool)
+        return;
+
+    /* Every entry of envs[] was copied from the same template, so envs[0]
+     * refers to the one cleanup group shared by all of them. */
+    CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
+    /* Releasing the members does not free the group object itself; close it
+     * as well now that it is empty, otherwise it leaks on every shutdown. */
+    CloseThreadpoolCleanupGroup(queue->envs[0].CleanupGroup);
+    CloseThreadpool(queue->pool);
+    queue->pool = NULL;
+
+    EnterCriticalSection(&queue->cs);
+    LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
+    {
+        list_remove(&item->entry);
+        release_work_item(item);
+    }
+    LeaveCriticalSection(&queue->cs);
+
+    DeleteCriticalSection(&queue->cs);
+}
+
+/* Shut down every slot of system_queues while holding queues_section;
+ * slots that were never initialized are skipped by shutdown_queue(). */
+static void shutdown_system_queues(void)
+{
+    struct queue *queue;
+
+    EnterCriticalSection(&queues_section);
+
+    for (queue = system_queues; queue != system_queues + ARRAY_SIZE(system_queues); ++queue)
+        shutdown_queue(queue);
+
+    LeaveCriticalSection(&queues_section);
+}
+
+/* Decrement platform_lock; when the counter goes from 1 to 0 the system
+ * queues are shut down. Calls made while the counter is not positive are
+ * ignored. Always returns S_OK. */
+HRESULT WINAPI RtwqShutdown(void)
+{
+    LONG count = platform_lock, prev;
+
+    /* Test and decrement as a single atomic step. A separate check followed
+     * by an unconditional decrement lets concurrent callers push the counter
+     * below zero, after which RtwqStartup() no longer sees the 0 -> 1
+     * transition. */
+    while (count > 0)
+    {
+        prev = InterlockedCompareExchange(&platform_lock, count - 1, count);
+        if (prev == count)
+        {
+            if (count == 1)
+                shutdown_system_queues();
+            break;
+        }
+        /* Lost a race with another thread, retry with the value it left. */
+        count = prev;
+    }
+
+    return S_OK;
+}
diff --git a/dlls/rtworkq/rtworkq.spec b/dlls/rtworkq/rtworkq.spec
index 900aa5f230..8c352593ff 100644
--- a/dlls/rtworkq/rtworkq.spec
+++ b/dlls/rtworkq/rtworkq.spec
@@ -28,8 +28,8 @@
@ stub RtwqSetDeadline
@ stub RtwqSetDeadline2
@ stub RtwqSetLongRunning
-@ stub RtwqShutdown
-@ stub RtwqStartup
+@ stdcall RtwqShutdown()
+@ stdcall RtwqStartup()
@ stub RtwqUnjoinWorkQueue
@ stdcall RtwqUnlockPlatform()
@ stub RtwqUnlockWorkQueue
diff --git a/include/rtworkq.idl b/include/rtworkq.idl
index d22b09ca50..c2e8237bba 100644
--- a/include/rtworkq.idl
+++ b/include/rtworkq.idl
@@ -18,6 +18,15 @@
import "unknwn.idl";
+/* Work queue types. */
+typedef enum
+{
+ RTWQ_STANDARD_WORKQUEUE = 0,
+ RTWQ_WINDOW_WORKQUEUE = 1,
+ RTWQ_MULTITHREADED_WORKQUEUE = 2,
+} RTWQ_WORKQUEUE_TYPE;
+
+typedef unsigned __int64 RTWQWORKITEM_KEY;
+
[
object,
uuid(ac6b7889-0740-4d51-8619-905994a55cc6),
@@ -59,4 +68,6 @@ cpp_quote("} RTWQASYNCRESULT;")
cpp_quote("HRESULT WINAPI RtwqCreateAsyncResult(IUnknown *object, IRtwqAsyncCallback *callback, IUnknown *state, IRtwqAsyncResult **result);")
cpp_quote("HRESULT WINAPI RtwqLockPlatform(void);")
+cpp_quote("HRESULT WINAPI RtwqShutdown(void);")
+cpp_quote("HRESULT WINAPI RtwqStartup(void);")
cpp_quote("HRESULT WINAPI RtwqUnlockPlatform(void);")
--
2.24.1
More information about the wine-devel
mailing list