[PATCH 9/9] mfplat: Forward async queue functionality to rtworkq.
Nikolay Sivov
nsivov at codeweavers.com
Tue Feb 4 23:44:13 CST 2020
Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
dlls/mfplat/Makefile.in | 2 +-
dlls/mfplat/main.c | 44 +-
dlls/mfplat/mfplat.spec | 18 +-
dlls/mfplat/mfplat_private.h | 4 -
dlls/mfplat/queue.c | 1041 +---------------------------------
dlls/rtworkq/queue.c | 11 +
6 files changed, 48 insertions(+), 1072 deletions(-)
diff --git a/dlls/mfplat/Makefile.in b/dlls/mfplat/Makefile.in
index f3484fa611..cdb2b81334 100644
--- a/dlls/mfplat/Makefile.in
+++ b/dlls/mfplat/Makefile.in
@@ -1,6 +1,6 @@
MODULE = mfplat.dll
IMPORTLIB = mfplat
-IMPORTS = advapi32 ole32 mfuuid propsys
+IMPORTS = advapi32 ole32 mfuuid propsys rtworkq
EXTRADLLFLAGS = -mno-cygwin
diff --git a/dlls/mfplat/main.c b/dlls/mfplat/main.c
index c8beeda973..e96a972909 100644
--- a/dlls/mfplat/main.c
+++ b/dlls/mfplat/main.c
@@ -40,6 +40,7 @@
#include "mfreadwrite.h"
#include "propvarutil.h"
#include "strsafe.h"
+#include "rtworkq.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
@@ -64,8 +65,6 @@ static HRESULT heap_strdupW(const WCHAR *str, WCHAR **dest)
return hr;
}
-static LONG platform_lock;
-
struct local_handler
{
struct list entry;
@@ -1128,10 +1127,7 @@ HRESULT WINAPI MFStartup(ULONG version, DWORD flags)
if (version != MF_VERSION_XP && version != MF_VERSION_WIN7)
return MF_E_BAD_STARTUP_VERSION;
- if (InterlockedIncrement(&platform_lock) == 1)
- {
- init_system_queues();
- }
+ RtwqStartup();
return S_OK;
}
@@ -1143,45 +1139,11 @@ HRESULT WINAPI MFShutdown(void)
{
TRACE("\n");
- if (platform_lock <= 0)
- return S_OK;
-
- if (InterlockedExchangeAdd(&platform_lock, -1) == 1)
- {
- shutdown_system_queues();
- }
-
- return S_OK;
-}
-
-/***********************************************************************
- * MFLockPlatform (mfplat.@)
- */
-HRESULT WINAPI MFLockPlatform(void)
-{
- InterlockedIncrement(&platform_lock);
-
- return S_OK;
-}
-
-/***********************************************************************
- * MFUnlockPlatform (mfplat.@)
- */
-HRESULT WINAPI MFUnlockPlatform(void)
-{
- if (InterlockedDecrement(&platform_lock) == 0)
- {
- shutdown_system_queues();
- }
+ RtwqShutdown();
return S_OK;
}
-BOOL is_platform_locked(void)
-{
- return platform_lock > 0;
-}
-
/***********************************************************************
* MFCopyImage (mfplat.@)
*/
diff --git a/dlls/mfplat/mfplat.spec b/dlls/mfplat/mfplat.spec
index a13c192caf..ba835b694a 100644
--- a/dlls/mfplat/mfplat.spec
+++ b/dlls/mfplat/mfplat.spec
@@ -15,9 +15,9 @@
@ stub GetAMSubtypeFromD3DFormat
@ stub GetD3DFormatFromMFSubtype
@ stub LFGetGlobalPool
-@ stdcall MFAddPeriodicCallback(ptr ptr ptr)
+@ stdcall MFAddPeriodicCallback(ptr ptr ptr) rtworkq.RtwqAddPeriodicCallback
@ stdcall MFAllocateWorkQueue(ptr)
-@ stdcall MFAllocateWorkQueueEx(long ptr)
+@ stdcall MFAllocateWorkQueueEx(long ptr) rtworkq.RtwqAllocateWorkQueue
@ stub MFAppendCollection
@ stub MFAverageTimePerFrameToFrameRate
@ stdcall MFBeginCreateFile(long long long wstr ptr ptr ptr)
@@ -28,7 +28,7 @@
@ stub MFCalculateBitmapImageSize
@ stdcall MFCalculateImageSize(ptr long long ptr)
@ stdcall MFCancelCreateFile(ptr)
-@ stdcall MFCancelWorkItem(int64)
+@ stdcall MFCancelWorkItem(int64) rtworkq.RtwqCancelWorkItem
@ stdcall MFCompareFullToPartialMediaType(ptr ptr)
@ stub MFCompareSockaddrAddresses
@ stub MFConvertColorInfoFromDXVA
@@ -121,9 +121,9 @@
@ stub MFInitVideoFormat_RGB
@ stdcall MFInvokeCallback(ptr)
@ stub MFJoinIoPort
-@ stdcall MFLockPlatform()
-@ stdcall MFLockWorkQueue(long)
-@ stdcall MFPutWaitingWorkItem(long long ptr ptr)
+@ stdcall MFLockPlatform() rtworkq.RtwqLockPlatform
+@ stdcall MFLockWorkQueue(long) rtworkq.RtwqLockWorkQueue
+@ stdcall MFPutWaitingWorkItem(long long ptr ptr) rtworkq.RtwqPutWaitingWorkItem
@ stdcall MFPutWorkItem(long ptr ptr)
@ stdcall MFPutWorkItem2(long long ptr ptr)
@ stdcall MFPutWorkItemEx(long ptr)
@@ -131,7 +131,7 @@
@ stub MFRecordError
@ stdcall MFRegisterLocalByteStreamHandler(wstr wstr ptr)
@ stdcall MFRegisterLocalSchemeHandler(wstr ptr)
-@ stdcall MFRemovePeriodicCallback(long)
+@ stdcall MFRemovePeriodicCallback(long) rtworkq.RtwqRemovePeriodicCallback
@ stdcall MFScheduleWorkItem(ptr ptr int64 ptr)
@ stdcall MFScheduleWorkItemEx(ptr int64 ptr)
@ stub MFSerializeAttributesToStream
@@ -154,8 +154,8 @@
@ stub MFTraceError
@ stub MFTraceFuncEnter
@ stub MFUnblockThread
-@ stdcall MFUnlockPlatform()
-@ stdcall MFUnlockWorkQueue(long)
+@ stdcall MFUnlockPlatform() rtworkq.RtwqUnlockPlatform
+@ stdcall MFUnlockWorkQueue(long) rtworkq.RtwqUnlockWorkQueue
@ stdcall MFUnwrapMediaType(ptr ptr)
@ stub MFValidateMediaTypeSize
@ stdcall MFWrapMediaType(ptr ptr ptr ptr)
diff --git a/dlls/mfplat/mfplat_private.h b/dlls/mfplat/mfplat_private.h
index 6418afcb97..af8583daee 100644
--- a/dlls/mfplat/mfplat_private.h
+++ b/dlls/mfplat/mfplat_private.h
@@ -87,10 +87,6 @@ extern HRESULT attributes_GetItemByIndex(struct attributes *object, UINT32 index
PROPVARIANT *value) DECLSPEC_HIDDEN;
extern HRESULT attributes_CopyAllItems(struct attributes *object, IMFAttributes *dest) DECLSPEC_HIDDEN;
-extern void init_system_queues(void) DECLSPEC_HIDDEN;
-extern void shutdown_system_queues(void) DECLSPEC_HIDDEN;
-extern BOOL is_platform_locked(void) DECLSPEC_HIDDEN;
-
static inline BOOL mf_array_reserve(void **elements, size_t *capacity, size_t count, size_t size)
{
size_t new_capacity, max_capacity;
diff --git a/dlls/mfplat/queue.c b/dlls/mfplat/queue.c
index 31b281a9a3..c0279f1744 100644
--- a/dlls/mfplat/queue.c
+++ b/dlls/mfplat/queue.c
@@ -25,781 +25,10 @@
#include "wine/list.h"
#include "mfplat_private.h"
+#include "rtworkq.h"
WINE_DEFAULT_DEBUG_CHANNEL(mfplat);
-#define FIRST_USER_QUEUE_HANDLE 5
-#define MAX_USER_QUEUE_HANDLES 124
-
-#define WAIT_ITEM_KEY_MASK (0x82000000)
-#define SCHEDULED_ITEM_KEY_MASK (0x80000000)
-
-static LONG next_item_key;
-
-static MFWORKITEM_KEY get_item_key(DWORD mask, DWORD key)
-{
- return ((MFWORKITEM_KEY)mask << 32) | key;
-}
-
-static MFWORKITEM_KEY generate_item_key(DWORD mask)
-{
- return get_item_key(mask, InterlockedIncrement(&next_item_key));
-}
-
-struct work_item
-{
- struct list entry;
- LONG refcount;
- IMFAsyncResult *result;
- struct queue *queue;
- MFWORKITEM_KEY key;
- union
- {
- TP_WAIT *wait_object;
- TP_TIMER *timer_object;
- } u;
-};
-
-static const TP_CALLBACK_PRIORITY priorities[] =
-{
- TP_CALLBACK_PRIORITY_HIGH,
- TP_CALLBACK_PRIORITY_NORMAL,
- TP_CALLBACK_PRIORITY_LOW,
-};
-
-struct queue
-{
- TP_POOL *pool;
- TP_CALLBACK_ENVIRON_V3 envs[ARRAY_SIZE(priorities)];
- CRITICAL_SECTION cs;
- struct list pending_items;
-};
-
-struct queue_handle
-{
- void *obj;
- LONG refcount;
- WORD generation;
-};
-
-static struct queue_handle user_queues[MAX_USER_QUEUE_HANDLES];
-static struct queue_handle *next_free_user_queue;
-static struct queue_handle *next_unused_user_queue = user_queues;
-static WORD queue_generation;
-
-static CRITICAL_SECTION queues_section;
-static CRITICAL_SECTION_DEBUG queues_critsect_debug =
-{
- 0, 0, &queues_section,
- { &queues_critsect_debug.ProcessLocksList, &queues_critsect_debug.ProcessLocksList },
- 0, 0, { (DWORD_PTR)(__FILE__ ": queues_section") }
-};
-static CRITICAL_SECTION queues_section = { &queues_critsect_debug, -1, 0, 0, 0, 0 };
-
-static struct queue_handle *get_queue_obj(DWORD handle)
-{
- unsigned int idx = HIWORD(handle) - FIRST_USER_QUEUE_HANDLE;
-
- if (idx < MAX_USER_QUEUE_HANDLES && user_queues[idx].refcount)
- {
- if (LOWORD(handle) == user_queues[idx].generation)
- return &user_queues[idx];
- }
-
- return NULL;
-}
-
-enum system_queue_index
-{
- SYS_QUEUE_STANDARD = 0,
- SYS_QUEUE_RT,
- SYS_QUEUE_IO,
- SYS_QUEUE_TIMER,
- SYS_QUEUE_MULTITHREADED,
- SYS_QUEUE_DO_NOT_USE,
- SYS_QUEUE_LONG_FUNCTION,
- SYS_QUEUE_COUNT,
-};
-
-static struct queue system_queues[SYS_QUEUE_COUNT];
-
-static struct queue *get_system_queue(DWORD queue_id)
-{
- switch (queue_id)
- {
- case MFASYNC_CALLBACK_QUEUE_STANDARD:
- case MFASYNC_CALLBACK_QUEUE_RT:
- case MFASYNC_CALLBACK_QUEUE_IO:
- case MFASYNC_CALLBACK_QUEUE_TIMER:
- case MFASYNC_CALLBACK_QUEUE_MULTITHREADED:
- case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION:
- return &system_queues[queue_id - 1];
- default:
- return NULL;
- }
-}
-
-static void CALLBACK standard_queue_cleanup_callback(void *object_data, void *group_data)
-{
-}
-
-static struct work_item * alloc_work_item(struct queue *queue, IMFAsyncResult *result)
-{
- struct work_item *item;
-
- item = heap_alloc_zero(sizeof(*item));
- item->result = result;
- IMFAsyncResult_AddRef(item->result);
- item->refcount = 1;
- item->queue = queue;
- list_init(&item->entry);
-
- return item;
-}
-
-static void release_work_item(struct work_item *item)
-{
- if (InterlockedDecrement(&item->refcount) == 0)
- {
- IMFAsyncResult_Release(item->result);
- heap_free(item);
- }
-}
-
-static void init_work_queue(MFASYNC_WORKQUEUE_TYPE queue_type, struct queue *queue)
-{
- TP_CALLBACK_ENVIRON_V3 env;
- unsigned int max_thread, i;
-
- queue->pool = CreateThreadpool(NULL);
-
- memset(&env, 0, sizeof(env));
- env.Version = 3;
- env.Size = sizeof(env);
- env.Pool = queue->pool;
- env.CleanupGroup = CreateThreadpoolCleanupGroup();
- env.CleanupGroupCancelCallback = standard_queue_cleanup_callback;
- env.CallbackPriority = TP_CALLBACK_PRIORITY_NORMAL;
- for (i = 0; i < ARRAY_SIZE(queue->envs); ++i)
- {
- queue->envs[i] = env;
- queue->envs[i].CallbackPriority = priorities[i];
- }
- list_init(&queue->pending_items);
- InitializeCriticalSection(&queue->cs);
-
- max_thread = (queue_type == MF_STANDARD_WORKQUEUE || queue_type == MF_WINDOW_WORKQUEUE) ? 1 : 4;
-
- SetThreadpoolThreadMinimum(queue->pool, 1);
- SetThreadpoolThreadMaximum(queue->pool, max_thread);
-
- if (queue_type == MF_WINDOW_WORKQUEUE)
- FIXME("MF_WINDOW_WORKQUEUE is not supported.\n");
-}
-
-static HRESULT grab_queue(DWORD queue_id, struct queue **ret)
-{
- struct queue *queue = get_system_queue(queue_id);
- MFASYNC_WORKQUEUE_TYPE queue_type;
- struct queue_handle *entry;
-
- *ret = NULL;
-
- if (!system_queues[SYS_QUEUE_STANDARD].pool)
- return MF_E_SHUTDOWN;
-
- if (queue && queue->pool)
- {
- *ret = queue;
- return S_OK;
- }
- else if (queue)
- {
- EnterCriticalSection(&queues_section);
- switch (queue_id)
- {
- case MFASYNC_CALLBACK_QUEUE_IO:
- case MFASYNC_CALLBACK_QUEUE_MULTITHREADED:
- case MFASYNC_CALLBACK_QUEUE_LONG_FUNCTION:
- queue_type = MF_MULTITHREADED_WORKQUEUE;
- break;
- default:
- queue_type = MF_STANDARD_WORKQUEUE;
- }
- init_work_queue(queue_type, queue);
- LeaveCriticalSection(&queues_section);
- *ret = queue;
- return S_OK;
- }
-
- /* Handles user queues. */
- if ((entry = get_queue_obj(queue_id)))
- *ret = entry->obj;
-
- return *ret ? S_OK : MF_E_INVALID_WORKQUEUE;
-}
-
-void init_system_queues(void)
-{
- /* Always initialize standard queue, keep the rest lazy. */
-
- EnterCriticalSection(&queues_section);
-
- if (system_queues[SYS_QUEUE_STANDARD].pool)
- {
- LeaveCriticalSection(&queues_section);
- return;
- }
-
- init_work_queue(MF_STANDARD_WORKQUEUE, &system_queues[SYS_QUEUE_STANDARD]);
-
- LeaveCriticalSection(&queues_section);
-}
-
-static HRESULT lock_user_queue(DWORD queue)
-{
- HRESULT hr = MF_E_INVALID_WORKQUEUE;
- struct queue_handle *entry;
-
- if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
- return S_OK;
-
- EnterCriticalSection(&queues_section);
- entry = get_queue_obj(queue);
- if (entry && entry->refcount)
- {
- entry->refcount++;
- hr = S_OK;
- }
- LeaveCriticalSection(&queues_section);
- return hr;
-}
-
-static void shutdown_queue(struct queue *queue)
-{
- struct work_item *item, *item2;
-
- if (!queue->pool)
- return;
-
- CloseThreadpoolCleanupGroupMembers(queue->envs[0].CleanupGroup, TRUE, NULL);
- CloseThreadpool(queue->pool);
- queue->pool = NULL;
-
- EnterCriticalSection(&queue->cs);
- LIST_FOR_EACH_ENTRY_SAFE(item, item2, &queue->pending_items, struct work_item, entry)
- {
- list_remove(&item->entry);
- release_work_item(item);
- }
- LeaveCriticalSection(&queue->cs);
-
- DeleteCriticalSection(&queue->cs);
-}
-
-static HRESULT unlock_user_queue(DWORD queue)
-{
- HRESULT hr = MF_E_INVALID_WORKQUEUE;
- struct queue_handle *entry;
-
- if (!(queue & MFASYNC_CALLBACK_QUEUE_PRIVATE_MASK))
- return S_OK;
-
- EnterCriticalSection(&queues_section);
- entry = get_queue_obj(queue);
- if (entry && entry->refcount)
- {
- if (--entry->refcount == 0)
- {
- shutdown_queue((struct queue *)entry->obj);
- heap_free(entry->obj);
- entry->obj = next_free_user_queue;
- next_free_user_queue = entry;
- }
- hr = S_OK;
- }
- LeaveCriticalSection(&queues_section);
- return hr;
-}
-
-void shutdown_system_queues(void)
-{
- unsigned int i;
-
- EnterCriticalSection(&queues_section);
-
- for (i = 0; i < ARRAY_SIZE(system_queues); ++i)
- {
- shutdown_queue(&system_queues[i]);
- }
-
- LeaveCriticalSection(&queues_section);
-}
-
-static struct work_item *grab_work_item(struct work_item *item)
-{
- InterlockedIncrement(&item->refcount);
- return item;
-}
-
-static void CALLBACK standard_queue_worker(TP_CALLBACK_INSTANCE *instance, void *context, TP_WORK *work)
-{
- struct work_item *item = context;
- MFASYNCRESULT *result = (MFASYNCRESULT *)item->result;
-
- TRACE("result object %p.\n", result);
-
- IMFAsyncCallback_Invoke(result->pCallback, item->result);
-
- release_work_item(item);
-}
-
-static HRESULT queue_submit_item(struct queue *queue, LONG priority, IMFAsyncResult *result)
-{
- TP_CALLBACK_PRIORITY callback_priority;
- struct work_item *item;
- TP_WORK *work_object;
-
- if (!(item = alloc_work_item(queue, result)))
- return E_OUTOFMEMORY;
-
- if (priority == 0)
- callback_priority = TP_CALLBACK_PRIORITY_NORMAL;
- else if (priority < 0)
- callback_priority = TP_CALLBACK_PRIORITY_LOW;
- else
- callback_priority = TP_CALLBACK_PRIORITY_HIGH;
- work_object = CreateThreadpoolWork(standard_queue_worker, item,
- (TP_CALLBACK_ENVIRON *)&queue->envs[callback_priority]);
- SubmitThreadpoolWork(work_object);
-
- TRACE("dispatched %p.\n", result);
-
- return S_OK;
-}
-
-static HRESULT queue_put_work_item(DWORD queue_id, LONG priority, IMFAsyncResult *result)
-{
- struct queue *queue;
- HRESULT hr;
-
- if (FAILED(hr = grab_queue(queue_id, &queue)))
- return hr;
-
- hr = queue_submit_item(queue, priority, result);
-
- return hr;
-}
-
-static HRESULT invoke_async_callback(IMFAsyncResult *result)
-{
- MFASYNCRESULT *result_data = (MFASYNCRESULT *)result;
- DWORD queue = MFASYNC_CALLBACK_QUEUE_STANDARD, flags;
- HRESULT hr;
-
- if (FAILED(IMFAsyncCallback_GetParameters(result_data->pCallback, &flags, &queue)))
- queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
-
- if (FAILED(lock_user_queue(queue)))
- queue = MFASYNC_CALLBACK_QUEUE_STANDARD;
-
- hr = queue_put_work_item(queue, 0, result);
-
- unlock_user_queue(queue);
-
- return hr;
-}
-
-static void queue_release_pending_item(struct work_item *item)
-{
- EnterCriticalSection(&item->queue->cs);
- if (item->key)
- {
- list_remove(&item->entry);
- item->key = 0;
- release_work_item(item);
- }
- LeaveCriticalSection(&item->queue->cs);
-}
-
-static void CALLBACK waiting_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
- TP_WAIT_RESULT wait_result)
-{
- struct work_item *item = context;
-
- TRACE("result object %p.\n", item->result);
-
- invoke_async_callback(item->result);
-
- release_work_item(item);
-}
-
-static void CALLBACK waiting_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_WAIT *wait,
- TP_WAIT_RESULT wait_result)
-{
- struct work_item *item = context;
-
- TRACE("result object %p.\n", item->result);
-
- queue_release_pending_item(item);
-
- invoke_async_callback(item->result);
-
- release_work_item(item);
-}
-
-static void CALLBACK scheduled_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
-{
- struct work_item *item = context;
-
- TRACE("result object %p.\n", item->result);
-
- invoke_async_callback(item->result);
-
- release_work_item(item);
-}
-
-static void CALLBACK scheduled_item_cancelable_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
-{
- struct work_item *item = context;
-
- TRACE("result object %p.\n", item->result);
-
- queue_release_pending_item(item);
-
- invoke_async_callback(item->result);
-
- release_work_item(item);
-}
-
-static void CALLBACK periodic_item_callback(TP_CALLBACK_INSTANCE *instance, void *context, TP_TIMER *timer)
-{
- struct work_item *item = grab_work_item(context);
-
- invoke_async_callback(item->result);
-
- release_work_item(item);
-}
-
-static void queue_mark_item_pending(DWORD mask, struct work_item *item, MFWORKITEM_KEY *key)
-{
- *key = generate_item_key(mask);
- item->key = *key;
-
- EnterCriticalSection(&item->queue->cs);
- list_add_tail(&item->queue->pending_items, &item->entry);
- grab_work_item(item);
- LeaveCriticalSection(&item->queue->cs);
-}
-
-static HRESULT queue_submit_wait(struct queue *queue, HANDLE event, LONG priority, IMFAsyncResult *result,
- MFWORKITEM_KEY *key)
-{
- PTP_WAIT_CALLBACK callback;
- struct work_item *item;
-
- if (!(item = alloc_work_item(queue, result)))
- return E_OUTOFMEMORY;
-
- if (key)
- {
- queue_mark_item_pending(WAIT_ITEM_KEY_MASK, item, key);
- callback = waiting_item_cancelable_callback;
- }
- else
- callback = waiting_item_callback;
-
- item->u.wait_object = CreateThreadpoolWait(callback, item,
- (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
- SetThreadpoolWait(item->u.wait_object, event, NULL);
-
- TRACE("dispatched %p.\n", result);
-
- return S_OK;
-}
-
-static HRESULT queue_submit_timer(struct queue *queue, IMFAsyncResult *result, INT64 timeout, DWORD period,
- MFWORKITEM_KEY *key)
-{
- PTP_TIMER_CALLBACK callback;
- struct work_item *item;
- FILETIME filetime;
- LARGE_INTEGER t;
-
- if (!(item = alloc_work_item(queue, result)))
- return E_OUTOFMEMORY;
-
- if (key)
- {
- queue_mark_item_pending(SCHEDULED_ITEM_KEY_MASK, item, key);
- }
-
- if (period)
- callback = periodic_item_callback;
- else
- callback = key ? scheduled_item_cancelable_callback : scheduled_item_callback;
-
- t.QuadPart = timeout * 1000 * 10;
- filetime.dwLowDateTime = t.u.LowPart;
- filetime.dwHighDateTime = t.u.HighPart;
-
- item->u.timer_object = CreateThreadpoolTimer(callback, item,
- (TP_CALLBACK_ENVIRON *)&queue->envs[TP_CALLBACK_PRIORITY_NORMAL]);
- SetThreadpoolTimer(item->u.timer_object, &filetime, period, 0);
-
- TRACE("dispatched %p.\n", result);
-
- return S_OK;
-}
-
-static HRESULT queue_cancel_item(struct queue *queue, MFWORKITEM_KEY key)
-{
- HRESULT hr = MF_E_NOT_FOUND;
- struct work_item *item;
-
- EnterCriticalSection(&queue->cs);
- LIST_FOR_EACH_ENTRY(item, &queue->pending_items, struct work_item, entry)
- {
- if (item->key == key)
- {
- key >>= 32;
- if ((key & WAIT_ITEM_KEY_MASK) == WAIT_ITEM_KEY_MASK)
- CloseThreadpoolWait(item->u.wait_object);
- else if ((key & SCHEDULED_ITEM_KEY_MASK) == SCHEDULED_ITEM_KEY_MASK)
- CloseThreadpoolTimer(item->u.timer_object);
- else
- WARN("Unknown item key mask %#x.\n", (DWORD)key);
- queue_release_pending_item(item);
- hr = S_OK;
- break;
- }
- }
- LeaveCriticalSection(&queue->cs);
-
- return hr;
-}
-
-static HRESULT alloc_user_queue(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue_id)
-{
- struct queue_handle *entry;
- struct queue *queue;
- unsigned int idx;
-
- *queue_id = MFASYNC_CALLBACK_QUEUE_UNDEFINED;
-
- if (!is_platform_locked())
- return MF_E_SHUTDOWN;
-
- queue = heap_alloc_zero(sizeof(*queue));
- if (!queue)
- return E_OUTOFMEMORY;
- init_work_queue(queue_type, queue);
-
- EnterCriticalSection(&queues_section);
-
- entry = next_free_user_queue;
- if (entry)
- next_free_user_queue = entry->obj;
- else if (next_unused_user_queue < user_queues + MAX_USER_QUEUE_HANDLES)
- entry = next_unused_user_queue++;
- else
- {
- LeaveCriticalSection(&queues_section);
- heap_free(queue);
- WARN("Out of user queue handles.\n");
- return E_OUTOFMEMORY;
- }
-
- entry->refcount = 1;
- entry->obj = queue;
- if (++queue_generation == 0xffff) queue_generation = 1;
- entry->generation = queue_generation;
- idx = entry - user_queues + FIRST_USER_QUEUE_HANDLE;
- *queue_id = (idx << 16) | entry->generation;
-
- LeaveCriticalSection(&queues_section);
-
- return S_OK;
-}
-
-struct async_result
-{
- MFASYNCRESULT result;
- LONG refcount;
- IUnknown *object;
- IUnknown *state;
-};
-
-static struct async_result *impl_from_IMFAsyncResult(IMFAsyncResult *iface)
-{
- return CONTAINING_RECORD(iface, struct async_result, result.AsyncResult);
-}
-
-static HRESULT WINAPI async_result_QueryInterface(IMFAsyncResult *iface, REFIID riid, void **obj)
-{
- TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), obj);
-
- if (IsEqualIID(riid, &IID_IMFAsyncResult) ||
- IsEqualIID(riid, &IID_IUnknown))
- {
- *obj = iface;
- IMFAsyncResult_AddRef(iface);
- return S_OK;
- }
-
- *obj = NULL;
- WARN("Unsupported interface %s.\n", debugstr_guid(riid));
- return E_NOINTERFACE;
-}
-
-static ULONG WINAPI async_result_AddRef(IMFAsyncResult *iface)
-{
- struct async_result *result = impl_from_IMFAsyncResult(iface);
- ULONG refcount = InterlockedIncrement(&result->refcount);
-
- TRACE("%p, %u.\n", iface, refcount);
-
- return refcount;
-}
-
-static ULONG WINAPI async_result_Release(IMFAsyncResult *iface)
-{
- struct async_result *result = impl_from_IMFAsyncResult(iface);
- ULONG refcount = InterlockedDecrement(&result->refcount);
-
- TRACE("%p, %u.\n", iface, refcount);
-
- if (!refcount)
- {
- if (result->result.pCallback)
- IMFAsyncCallback_Release(result->result.pCallback);
- if (result->object)
- IUnknown_Release(result->object);
- if (result->state)
- IUnknown_Release(result->state);
- if (result->result.hEvent)
- CloseHandle(result->result.hEvent);
- heap_free(result);
-
- MFUnlockPlatform();
- }
-
- return refcount;
-}
-
-static HRESULT WINAPI async_result_GetState(IMFAsyncResult *iface, IUnknown **state)
-{
- struct async_result *result = impl_from_IMFAsyncResult(iface);
-
- TRACE("%p, %p.\n", iface, state);
-
- if (!result->state)
- return E_POINTER;
-
- *state = result->state;
- IUnknown_AddRef(*state);
-
- return S_OK;
-}
-
-static HRESULT WINAPI async_result_GetStatus(IMFAsyncResult *iface)
-{
- struct async_result *result = impl_from_IMFAsyncResult(iface);
-
- TRACE("%p.\n", iface);
-
- return result->result.hrStatusResult;
-}
-
-static HRESULT WINAPI async_result_SetStatus(IMFAsyncResult *iface, HRESULT status)
-{
- struct async_result *result = impl_from_IMFAsyncResult(iface);
-
- TRACE("%p, %#x.\n", iface, status);
-
- result->result.hrStatusResult = status;
-
- return S_OK;
-}
-
-static HRESULT WINAPI async_result_GetObject(IMFAsyncResult *iface, IUnknown **object)
-{
- struct async_result *result = impl_from_IMFAsyncResult(iface);
-
- TRACE("%p, %p.\n", iface, object);
-
- if (!result->object)
- return E_POINTER;
-
- *object = result->object;
- IUnknown_AddRef(*object);
-
- return S_OK;
-}
-
-static IUnknown * WINAPI async_result_GetStateNoAddRef(IMFAsyncResult *iface)
-{
- struct async_result *result = impl_from_IMFAsyncResult(iface);
-
- TRACE("%p.\n", iface);
-
- return result->state;
-}
-
-static const IMFAsyncResultVtbl async_result_vtbl =
-{
- async_result_QueryInterface,
- async_result_AddRef,
- async_result_Release,
- async_result_GetState,
- async_result_GetStatus,
- async_result_SetStatus,
- async_result_GetObject,
- async_result_GetStateNoAddRef,
-};
-
-static HRESULT create_async_result(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out)
-{
- struct async_result *result;
-
- if (!out)
- return E_INVALIDARG;
-
- result = heap_alloc_zero(sizeof(*result));
- if (!result)
- return E_OUTOFMEMORY;
-
- MFLockPlatform();
-
- result->result.AsyncResult.lpVtbl = &async_result_vtbl;
- result->refcount = 1;
- result->object = object;
- if (result->object)
- IUnknown_AddRef(result->object);
- result->result.pCallback = callback;
- if (result->result.pCallback)
- IMFAsyncCallback_AddRef(result->result.pCallback);
- result->state = state;
- if (result->state)
- IUnknown_AddRef(result->state);
-
- *out = &result->result.AsyncResult;
-
- TRACE("Created async result object %p.\n", *out);
-
- return S_OK;
-}
-
-/***********************************************************************
- * MFCreateAsyncResult (mfplat.@)
- */
-HRESULT WINAPI MFCreateAsyncResult(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out)
-{
- TRACE("%p, %p, %p, %p.\n", object, callback, state, out);
-
- return create_async_result(object, callback, state, out);
-}
-
/***********************************************************************
* MFAllocateWorkQueue (mfplat.@)
*/
@@ -807,37 +36,7 @@ HRESULT WINAPI MFAllocateWorkQueue(DWORD *queue)
{
TRACE("%p.\n", queue);
- return alloc_user_queue(MF_STANDARD_WORKQUEUE, queue);
-}
-
-/***********************************************************************
- * MFAllocateWorkQueueEx (mfplat.@)
- */
-HRESULT WINAPI MFAllocateWorkQueueEx(MFASYNC_WORKQUEUE_TYPE queue_type, DWORD *queue)
-{
- TRACE("%d, %p.\n", queue_type, queue);
-
- return alloc_user_queue(queue_type, queue);
-}
-
-/***********************************************************************
- * MFLockWorkQueue (mfplat.@)
- */
-HRESULT WINAPI MFLockWorkQueue(DWORD queue)
-{
- TRACE("%#x.\n", queue);
-
- return lock_user_queue(queue);
-}
-
-/***********************************************************************
- * MFUnlockWorkQueue (mfplat.@)
- */
-HRESULT WINAPI MFUnlockWorkQueue(DWORD queue)
-{
- TRACE("%#x.\n", queue);
-
- return unlock_user_queue(queue);
+ return RtwqAllocateWorkQueue(RTWQ_STANDARD_WORKQUEUE, queue);
}
/***********************************************************************
@@ -853,7 +52,7 @@ HRESULT WINAPI MFPutWorkItem(DWORD queue, IMFAsyncCallback *callback, IUnknown *
if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result)))
return hr;
- hr = queue_put_work_item(queue, 0, result);
+ hr = MFPutWorkItemEx2(queue, 0, result);
IMFAsyncResult_Release(result);
@@ -873,7 +72,7 @@ HRESULT WINAPI MFPutWorkItem2(DWORD queue, LONG priority, IMFAsyncCallback *call
if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result)))
return hr;
- hr = queue_put_work_item(queue, priority, result);
+ hr = MFPutWorkItemEx2(queue, priority, result);
IMFAsyncResult_Release(result);
@@ -887,7 +86,7 @@ HRESULT WINAPI MFPutWorkItemEx(DWORD queue, IMFAsyncResult *result)
{
TRACE("%#x, %p\n", queue, result);
- return queue_put_work_item(queue, 0, result);
+ return MFPutWorkItemEx2(queue, 0, result);
}
/***********************************************************************
@@ -897,46 +96,11 @@ HRESULT WINAPI MFPutWorkItemEx2(DWORD queue, LONG priority, IMFAsyncResult *resu
{
TRACE("%#x, %d, %p\n", queue, priority, result);
- return queue_put_work_item(queue, priority, result);
+ return RtwqPutWorkItem(queue, priority, (IRtwqAsyncResult *)result);
}
/***********************************************************************
- * MFInvokeCallback (mfplat.@)
- */
-HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result)
-{
- TRACE("%p.\n", result);
-
- return invoke_async_callback(result);
-}
-
-static HRESULT schedule_work_item(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key)
-{
- struct queue *queue;
- HRESULT hr;
-
- if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
- return hr;
-
- TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);
-
- hr = queue_submit_timer(queue, result, timeout, 0, key);
-
- return hr;
-}
-
-/***********************************************************************
- * MFScheduleWorkItemEx (mfplat.@)
- */
-HRESULT WINAPI MFScheduleWorkItemEx(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key)
-{
- TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);
-
- return schedule_work_item(result, timeout, key);
-}
-
-/***********************************************************************
- * MFScheduleWorkItemEx (mfplat.@)
+ * MFScheduleWorkItem (mfplat.@)
*/
HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, INT64 timeout, MFWORKITEM_KEY *key)
{
@@ -948,7 +112,7 @@ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, I
if (FAILED(hr = MFCreateAsyncResult(NULL, callback, state, &result)))
return hr;
- hr = schedule_work_item(result, timeout, key);
+ hr = MFScheduleWorkItemEx(result, timeout, key);
IMFAsyncResult_Release(result);
@@ -956,200 +120,43 @@ HRESULT WINAPI MFScheduleWorkItem(IMFAsyncCallback *callback, IUnknown *state, I
}
/***********************************************************************
- * MFPutWaitingWorkItem (mfplat.@)
+ * MFScheduleWorkItemEx (mfplat.@)
*/
-HRESULT WINAPI MFPutWaitingWorkItem(HANDLE event, LONG priority, IMFAsyncResult *result, MFWORKITEM_KEY *key)
+HRESULT WINAPI MFScheduleWorkItemEx(IMFAsyncResult *result, INT64 timeout, MFWORKITEM_KEY *key)
{
- struct queue *queue;
- HRESULT hr;
-
- TRACE("%p, %d, %p, %p.\n", event, priority, result, key);
-
- if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
- return hr;
-
- hr = queue_submit_wait(queue, event, priority, result, key);
+ TRACE("%p, %s, %p.\n", result, wine_dbgstr_longlong(timeout), key);
- return hr;
+ return RtwqScheduleWorkItem((IRtwqAsyncResult *)result, timeout, key);
}
/***********************************************************************
- * MFCancelWorkItem (mfplat.@)
+ * MFInvokeCallback (mfplat.@)
*/
-HRESULT WINAPI MFCancelWorkItem(MFWORKITEM_KEY key)
+HRESULT WINAPI MFInvokeCallback(IMFAsyncResult *result)
{
- struct queue *queue;
- HRESULT hr;
-
- TRACE("%s.\n", wine_dbgstr_longlong(key));
-
- if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
- return hr;
-
- hr = queue_cancel_item(queue, key);
-
- return hr;
-}
+ TRACE("%p.\n", result);
-static DWORD get_timer_period(void)
-{
- return 10;
+ return RtwqInvokeCallback((IRtwqAsyncResult *)result);
}
/***********************************************************************
- * MFGetTimerPeriodicity (mfplat.@)
+ * MFCreateAsyncResult (mfplat.@)
*/
-HRESULT WINAPI MFGetTimerPeriodicity(DWORD *period)
-{
- TRACE("%p.\n", period);
-
- *period = get_timer_period();
-
- return S_OK;
-}
-
-struct periodic_callback
-{
- IMFAsyncCallback IMFAsyncCallback_iface;
- LONG refcount;
- MFPERIODICCALLBACK callback;
-};
-
-static struct periodic_callback *impl_from_IMFAsyncCallback(IMFAsyncCallback *iface)
-{
- return CONTAINING_RECORD(iface, struct periodic_callback, IMFAsyncCallback_iface);
-}
-
-static HRESULT WINAPI periodic_callback_QueryInterface(IMFAsyncCallback *iface, REFIID riid, void **obj)
-{
- if (IsEqualIID(riid, &IID_IMFAsyncCallback) ||
- IsEqualIID(riid, &IID_IUnknown))
- {
- *obj = iface;
- IMFAsyncCallback_AddRef(iface);
- return S_OK;
- }
-
- *obj = NULL;
- return E_NOINTERFACE;
-}
-
-static ULONG WINAPI periodic_callback_AddRef(IMFAsyncCallback *iface)
-{
- struct periodic_callback *callback = impl_from_IMFAsyncCallback(iface);
- ULONG refcount = InterlockedIncrement(&callback->refcount);
-
- TRACE("%p, %u.\n", iface, refcount);
-
- return refcount;
-}
-
-static ULONG WINAPI periodic_callback_Release(IMFAsyncCallback *iface)
-{
- struct periodic_callback *callback = impl_from_IMFAsyncCallback(iface);
- ULONG refcount = InterlockedDecrement(&callback->refcount);
-
- TRACE("%p, %u.\n", iface, refcount);
-
- if (!refcount)
- heap_free(callback);
-
- return refcount;
-}
-
-static HRESULT WINAPI periodic_callback_GetParameters(IMFAsyncCallback *iface, DWORD *flags, DWORD *queue)
-{
- return E_NOTIMPL;
-}
-
-static HRESULT WINAPI periodic_callback_Invoke(IMFAsyncCallback *iface, IMFAsyncResult *result)
-{
- struct periodic_callback *callback = impl_from_IMFAsyncCallback(iface);
- IUnknown *context = NULL;
-
- if (FAILED(IMFAsyncResult_GetObject(result, &context)))
- WARN("Expected object to be set for result object.\n");
-
- callback->callback(context);
-
- if (context)
- IUnknown_Release(context);
-
- return S_OK;
-}
-
-static const IMFAsyncCallbackVtbl periodic_callback_vtbl =
-{
- periodic_callback_QueryInterface,
- periodic_callback_AddRef,
- periodic_callback_Release,
- periodic_callback_GetParameters,
- periodic_callback_Invoke,
-};
-
-static HRESULT create_periodic_callback_obj(MFPERIODICCALLBACK callback, IMFAsyncCallback **out)
+HRESULT WINAPI MFCreateAsyncResult(IUnknown *object, IMFAsyncCallback *callback, IUnknown *state, IMFAsyncResult **out)
{
- struct periodic_callback *object;
-
- object = heap_alloc(sizeof(*object));
- if (!object)
- return E_OUTOFMEMORY;
-
- object->IMFAsyncCallback_iface.lpVtbl = &periodic_callback_vtbl;
- object->refcount = 1;
- object->callback = callback;
-
- *out = &object->IMFAsyncCallback_iface;
+ TRACE("%p, %p, %p, %p.\n", object, callback, state, out);
- return S_OK;
+ return RtwqCreateAsyncResult(object, (IRtwqAsyncCallback *)callback, state, (IRtwqAsyncResult **)out);
}
/***********************************************************************
- * MFAddPeriodicCallback (mfplat.@)
+ * MFGetTimerPeriodicity (mfplat.@)
*/
-HRESULT WINAPI MFAddPeriodicCallback(MFPERIODICCALLBACK callback, IUnknown *context, DWORD *key)
+HRESULT WINAPI MFGetTimerPeriodicity(DWORD *period)
{
- IMFAsyncCallback *periodic_callback;
- MFWORKITEM_KEY workitem_key;
- IMFAsyncResult *result;
- struct queue *queue;
- HRESULT hr;
-
- TRACE("%p, %p, %p.\n", callback, context, key);
-
- if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
- return hr;
-
- if (FAILED(hr = create_periodic_callback_obj(callback, &periodic_callback)))
- return hr;
-
- hr = create_async_result(context, periodic_callback, NULL, &result);
- IMFAsyncCallback_Release(periodic_callback);
- if (FAILED(hr))
- return hr;
-
- hr = queue_submit_timer(queue, result, 0, get_timer_period(), key ? &workitem_key : NULL);
-
- IMFAsyncResult_Release(result);
+ TRACE("%p.\n", period);
- if (key)
- *key = workitem_key;
+ *period = 10;
return S_OK;
}
-
-/***********************************************************************
- * MFRemovePeriodicCallback (mfplat.@)
- */
-HRESULT WINAPI MFRemovePeriodicCallback(DWORD key)
-{
- struct queue *queue;
- HRESULT hr;
-
- TRACE("%#x.\n", key);
-
- if (FAILED(hr = grab_queue(MFASYNC_CALLBACK_QUEUE_TIMER, &queue)))
- return hr;
-
- return queue_cancel_item(queue, get_item_key(SCHEDULED_ITEM_KEY_MASK, key));
-}
diff --git a/dlls/rtworkq/queue.c b/dlls/rtworkq/queue.c
index a6aac6abda..ffe2fd1ae0 100644
--- a/dlls/rtworkq/queue.c
+++ b/dlls/rtworkq/queue.c
@@ -1071,3 +1071,14 @@ HRESULT WINAPI RtwqUnlockWorkQueue(DWORD queue)
return unlock_user_queue(queue);
}
+
+BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD reason, void *reserved)
+{
+ switch (reason)
+ {
+ case DLL_PROCESS_ATTACH:
+ DisableThreadLibraryCalls(hinstDLL);
+ break;
+ }
+ return TRUE;
+}
--
2.24.1
More information about the wine-devel mailing list