[PATCH 5/9] rtworkq: Initialize queue with specific set of functionality.
Nikolay Sivov
nsivov at codeweavers.com
Tue Feb 11 03:19:07 CST 2020
Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
dlls/rtworkq/queue.c | 162 ++++++++++++++++++++++++++++---------------
1 file changed, 108 insertions(+), 54 deletions(-)
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index 38d376342d..e5026b6298 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -16,6 +16,8 @@
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
*/
+#include <assert.h>
+
#define COBJMACROS
#define NONAMELESSUNION
@@ -136,13 +138,25 @@ static const TP_CALLBACK_PRIORITY priorities[] =
TP_CALLBACK_PRIORITY_LOW,
};
+struct queue;
+struct queue_desc;
+
+/* Table of backend operations a queue is bound to when it is initialized. */
+struct queue_ops
+{
+ /* Called by init_work_queue(); the result is tested with SUCCEEDED(). */
+ HRESULT (*init)(const struct queue_desc *desc, struct queue *queue);
+ /* Called by shutdown_queue(); a FALSE result makes it return without further cleanup. */
+ BOOL (*shutdown)(struct queue *queue);
+ /* Called by queue_submit_item() with the freshly allocated work item. */
+ void (*submit)(struct queue *queue, struct work_item *item);
+};
+
+/* Describes the queue to set up; passed to init_work_queue(). */
struct queue_desc
{
RTWQ_WORKQUEUE_TYPE queue_type;
+ /* Operations table copied into the queue by init_work_queue(); asserted to be non-NULL there. */
+ const struct queue_ops *ops;
};
struct queue
{
+ const struct queue_ops *ops;
TP_POOL *pool;
TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
CRITICAL_SECTION cs;
@@ -171,6 +185,88 @@ static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *gr
{
}
+/*
+ * Thread pool backend initializer (the 'init' entry of pool_queue_ops).
+ *
+ * Creates a dedicated thread pool and fills one callback environment per
+ * entry of priorities[]; all environments share the same pool and the same
+ * cleanup group. Standard and window queues are limited to a single thread,
+ * every other queue type to four.
+ *
+ * The pending item list and the queue critical section are deliberately not
+ * initialized here: init_work_queue() does that once this callback succeeds,
+ * and initializing a critical section twice without deleting it in between
+ * is undefined behaviour.
+ *
+ * Always returns S_OK.
+ */
+static HRESULT pool_queue_init(const struct queue_desc *desc, struct queue *queue)
+{
+    TP_CALLBACK_ENVIRON_V3 env;
+    unsigned int max_thread, i;
+
+    queue->pool = CreateThreadpool(NULL);
+
+    memset(&env, 0, sizeof(env));
+    env.Version = 3;
+    env.Size = sizeof(env);
+    env.Pool = queue->pool;
+    env.CleanupGroup = CreateThreadpoolCleanupGroup();
+    env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
+    env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
+    for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
+    {
+        /* Same template for every slot, only the priority differs. */
+        queue->envs[i] = env;
+        queue->envs[i].CallbackPriority = priorities[i];
+    }
+
+    max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
+
+    SetThreadpoolThreadMinimum(queue->pool, 1);
+    SetThreadpoolThreadMaximum(queue->pool, max_thread);
+
+    if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE)
+        FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
+
+    return S_OK;
+}
+
+/*
+ * Thread pool backend teardown (the 'shutdown' entry of pool_queue_ops).
+ *
+ * Returns FALSE when the queue has no pool. Otherwise closes the members of
+ * the cleanup group stored in envs[0], closes the pool, clears queue->pool
+ * and returns TRUE.
+ */
+static BOOL pool_queue_shutdown(struct queue *queue)
+{
+    if (queue->pool)
+    {
+        CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
+        CloseThreadpool(queue->pool);
+        queue->pool = NULL;
+
+        return TRUE;
+    }
+
+    return FALSE;
+}
+
+/*
+ * Thread pool work callback. The context is the submitted work item: the
+ * callback stored in its result object is invoked with that result, then the
+ * reference on the work item is released.
+ */
+static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
+{
+    struct work_item *submitted = context;
+    RTWQASYNCRESULT *async_result = (RTWQASYNCRESULT *)submitted->result;
+
+    TRACE("result object %p.\n", async_result);
+
+    IRtwqAsyncCallback_Invoke(async_result->pCallback, submitted->result);
+
+    /* Drop the reference on the item now that the callback has run. */
+    IUnknown_Release(&submitted->IUnknown_iface);
+}
+
+/*
+ * Thread pool backend submit (the 'submit' entry of pool_queue_ops).
+ *
+ * Maps the item priority to a thread pool callback priority (zero -> normal,
+ * negative -> low, positive -> high) and posts the item to the callback
+ * environment selected by that priority. On success the worker callback
+ * releases the item reference after it has run.
+ *
+ * If the work object cannot be created the item is released here instead,
+ * since the worker will never run for it.
+ */
+static void pool_queue_submit(struct queue *queue, struct work_item *item)
+{
+    TP_CALLBACK_PRIORITY callback_priority;
+    TP_WORK *work_object;
+
+    if (item->priority == 0)
+        callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
+    else if (item->priority < 0)
+        callback_priority = TP_CALLBACK_PRIORITY_LOW;
+    else
+        callback_priority = TP_CALLBACK_PRIORITY_HIGH;
+
+    /* NOTE(review): indexing envs[] by the enum value assumes priorities[] is
+     * laid out in TP_CALLBACK_PRIORITY order - confirm against its definition. */
+    work_object = CreateThreadpoolWork(standard_queue_worker, item,
+            (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
+    if (!work_object)
+    {
+        /* Never hand a NULL work object to SubmitThreadpoolWork(); give back
+         * the reference the worker would otherwise have released. */
+        WARN("Failed to create work object for %p.\n", item->result);
+        IUnknown_Release(&item->IUnknown_iface);
+        return;
+    }
+    SubmitThreadpoolWork(work_object);
+
+    TRACE("dispatched %p.\n", item->result);
+}
+
+/* Thread pool backed implementation of queue_ops; entries are positional: init, shutdown, submit. */
+static const struct queue_ops pool_queue_ops =
+{
+ pool_queue_init,
+ pool_queue_shutdown,
+ pool_queue_submit,
+};
+
static HRESULT WINAPI work_item_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
{
if (IsEqualIID(riid, &IID_IUnknown))
@@ -235,33 +331,14 @@ static struct work_item * alloc_work_item(struct queue *queue, LONG priority, IR
+/* Binds the queue to desc->ops (which must not be NULL) and runs the backend
+ * init callback; when that succeeds, the pending item list and the queue
+ * critical section are initialized. */
static void init_work_queue(const struct queue_desc *desc, struct queue *queue)
{
- TP_CALLBACK_ENVIRON_V3 env;
- unsigned int max_thread, i;
-
- queue->pool = CreateThreadpool(NULL);
+ assert(desc->ops != NULL);
- memset(&env, 0, sizeof(env));
- env.Version = 3;
- env.Size = sizeof(env);
- env.Pool = queue->pool;
- env.CleanupGroup = CreateThreadpoolCleanupGroup();
- env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
- env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
- for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
+ queue->ops = desc->ops;
+ if (SUCCEEDED(queue->ops->init(desc, queue)))
{
- queue->envs[i] = env;
- queue->envs[i].CallbackPriority = priorities[i];
+ list_init(&queue->pending_items);
+ InitializeCriticalSection(&queue->cs);
}
- list_init(&queue->pending_items);
- InitializeCriticalSection(&queue->cs);
-
- max_thread = (desc->queue_type == RTWQ_STANDARD_WORKQUEUE || desc->queue_type == RTWQ_WINDOW_WORKQUEUE) ? 1 : 4;
-
- SetThreadpoolThreadMinimum(queue->pool, 1);
- SetThreadpoolThreadMaximum(queue->pool, max_thread);
-
- if (desc->queue_type == RTWQ_WINDOW_WORKQUEUE)
- FIXME("RTWQ_WINDOW_WORKQUEUE is not supported.\n");
}
static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
@@ -297,6 +374,7 @@ static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
}
desc.queue_type = queue_type;
+ desc.ops = &pool_queue_ops;
init_work_queue(&desc, queue);
LeaveCriticalSection(&queues_section);
*ret = queue;
@@ -333,13 +411,9 @@ static void shutdown_queue(struct queue *queue)
{
struct work_item *item, *item2;
- if (!queue->pool)
+ if (!queue->ops || !queue->ops->shutdown(queue))
return;
- CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
- CloseThreadpool(queue->pool);
- queue->pool = NULL;
-
EnterCriticalSection(&queue->cs);
LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
{
@@ -349,6 +423,8 @@ static void shutdown_queue(struct queue *queue)
LeaveCriticalSection(&queue->cs);
DeleteCriticalSection(&queue->cs);
+
+ memset(queue, 0, sizeof(*queue));
}
static HRESULT unlock_user_queue(DWORD queue)
@@ -376,38 +452,14 @@ static HRESULT unlock_user_queue(DWORD queue)
return hr;
}
-static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
-{
- struct work_item *item = context;
- RTWQASYNCRESULT *result = (RTWQASYNCRESULT *)item->result;
-
- TRACE("result object %p.\n", result);
-
- IRtwqAsyncCallback_Invoke(result->pCallback, item->result);
-
- IUnknown_Release(&item->IUnknown_iface);
-}
-
static HRESULT queue_submit_item(struct queue *queue, LONG priority, IRtwqAsyncResult *result)
{
- TP_CALLBACK_PRIORITY callback_priority;
struct work_item *item;
- TP_WORK *work_object;
if (!(item = alloc_work_item(queue, priority, result)))
return E_OUTOFMEMORY;
- if (priority == 0)
- callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
- else if (priority < 0)
- callback_priority = TP_CALLBACK_PRIORITY_LOW;
- else
- callback_priority = TP_CALLBACK_PRIORITY_HIGH;
- work_object = CreateThreadpoolWork(standard_queue_worker, item,
- (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
- SubmitThreadpoolWork(work_object);
-
- TRACE("dispatched %p.\n", result);
+ queue->ops->submit(queue, item);
return S_OK;
}
@@ -862,6 +914,7 @@ static void init_system_queues(void)
}
desc.queue_type = RTWQ_STANDARD_WORKQUEUE;
+ desc.ops = &pool_queue_ops;
init_work_queue(&desc, &system_queues[SYS_QUEUE_STANDARD]);
LeaveCriticalSection(&queues_section);
@@ -1114,6 +1167,7 @@ HRESULT WINAPI RtwqAllocateWorkQueue(RTWQ_WORKQUEUE_TYPE queue_type, DWORD *queu
TRACE("%d, %p.\n", queue_type, queue);
desc.queue_type = queue_type;
+ desc.ops = &pool_queue_ops;
return alloc_user_queue(&desc, queue);
}
--
2.25.0
More information about the wine-devel
mailing list