[PATCH 1/5] quartz/filesource: Use a completion port to handle asynchronous requests.

Zebediah Figura z.figura12 at gmail.com
Wed Jan 29 19:56:59 CST 2020


This simplifies the code and allows us to handle more than 63 requests at once.

Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
 dlls/quartz/filesource.c       | 376 +++++++++++++--------------------
 dlls/quartz/tests/filesource.c |  26 ++-
 2 files changed, 173 insertions(+), 229 deletions(-)

diff --git a/dlls/quartz/filesource.c b/dlls/quartz/filesource.c
index 8b7a0fde925..7ea67ccc12f 100644
--- a/dlls/quartz/filesource.c
+++ b/dlls/quartz/filesource.c
@@ -46,12 +46,12 @@ static const AM_MEDIA_TYPE default_mt =
     NULL
 };
 
-typedef struct DATAREQUEST
+struct request
 {
-    IMediaSample *pSample;
-    DWORD_PTR dwUserData;
+    IMediaSample *sample;
+    DWORD_PTR cookie;
     OVERLAPPED ovl;
-} DATAREQUEST;
+};
 
 typedef struct AsyncReader
 {
@@ -64,15 +64,12 @@ typedef struct AsyncReader
     LPOLESTR pszFileName;
     AM_MEDIA_TYPE *pmt;
     ALLOCATOR_PROPERTIES allocProps;
-    HANDLE file;
-    BOOL flushing;
-    unsigned int queued_number;
-    unsigned int samples;
-    unsigned int oldest_sample;
+    HANDLE file, port, io_thread;
     CRITICAL_SECTION sample_cs;
-    DATAREQUEST *sample_list;
-    /* Have a handle for every sample, and then one more as flushing handle */
-    HANDLE *handle_list;
+    BOOL flushing;
+    struct request *requests;
+    unsigned int max_requests;
+    CONDITION_VARIABLE sample_cv;
 } AsyncReader;
 
 static const struct strmbase_source_ops source_ops;
@@ -352,18 +349,23 @@ static void async_reader_destroy(struct strmbase_filter *iface)
 
         IPin_Disconnect(&filter->source.pin.IPin_iface);
 
-        CoTaskMemFree(filter->sample_list);
-        if (filter->handle_list)
+        if (filter->requests)
         {
-            for (i = 0; i <= filter->samples; ++i)
-                CloseHandle(filter->handle_list[i]);
-            CoTaskMemFree(filter->handle_list);
+            for (i = 0; i < filter->max_requests; ++i)
+                CloseHandle(filter->requests[i].ovl.hEvent);
+            CoTaskMemFree(filter->requests);
         }
         CloseHandle(filter->file);
         filter->sample_cs.DebugInfo->Spare[0] = 0;
         DeleteCriticalSection(&filter->sample_cs);
         strmbase_source_cleanup(&filter->source);
     }
+
+    PostQueuedCompletionStatus(filter->port, 0, 1, NULL);
+    WaitForSingleObject(filter->io_thread, INFINITE);
+    CloseHandle(filter->io_thread);
+    CloseHandle(filter->port);
+
     CoTaskMemFree(filter->pszFileName);
     if (filter->pmt)
         DeleteMediaType(filter->pmt);
@@ -392,6 +394,42 @@ static const struct strmbase_filter_ops filter_ops =
     .filter_query_interface = async_reader_query_interface,
 };
 
+static DWORD CALLBACK io_thread(void *arg)
+{
+    AsyncReader *filter = arg;
+    struct request *req;
+    OVERLAPPED *ovl;
+    ULONG_PTR key;
+    DWORD size;
+    BOOL ret;
+
+    for (;;)
+    {
+        ret = GetQueuedCompletionStatus(filter->port, &size, &key, &ovl, INFINITE);
+
+        if (ret && key)
+            break;
+
+        EnterCriticalSection(&filter->sample_cs);
+
+        req = CONTAINING_RECORD(ovl, struct request, ovl);
+        TRACE("Got sample %u.\n", req - filter->requests);
+        assert(req >= filter->requests && req < filter->requests + filter->max_requests);
+
+        if (ret)
+            WakeConditionVariable(&filter->sample_cv);
+        else
+        {
+            ERR("GetQueuedCompletionStatus() returned failure, error %u.\n", GetLastError());
+            req->sample = NULL;
+        }
+
+        LeaveCriticalSection(&filter->sample_cs);
+    }
+
+    return 0;
+}
+
 HRESULT AsyncReader_create(IUnknown *outer, void **out)
 {
     AsyncReader *pAsyncRead;
@@ -410,6 +448,12 @@ HRESULT AsyncReader_create(IUnknown *outer, void **out)
     pAsyncRead->pszFileName = NULL;
     pAsyncRead->pmt = NULL;
 
+    InitializeCriticalSection(&pAsyncRead->sample_cs);
+    pAsyncRead->sample_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": FileAsyncReader.sample_cs");
+    InitializeConditionVariable(&pAsyncRead->sample_cv);
+    pAsyncRead->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+    pAsyncRead->io_thread = CreateThread(NULL, 0, io_thread, pAsyncRead, 0, NULL);
+
     *out = &pAsyncRead->filter.IUnknown_inner;
 
     TRACE("-- created at %p\n", pAsyncRead);
@@ -470,11 +514,7 @@ static HRESULT WINAPI FileSource_Load(IFileSourceFilter * iface, LPCOLESTR pszFi
 
     This->file = hFile;
     This->flushing = FALSE;
-    This->sample_list = NULL;
-    This->handle_list = NULL;
-    This->queued_number = 0;
-    InitializeCriticalSection(&This->sample_cs);
-    This->sample_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": FileAsyncReader.sample_cs");
+    This->requests = NULL;
 
     if (This->pmt)
         DeleteMediaType(This->pmt);
@@ -705,42 +745,33 @@ static HRESULT WINAPI FileAsyncReader_RequestAllocator(IAsyncReader * iface, IMe
 done:
     if (SUCCEEDED(hr))
     {
-        CoTaskMemFree(This->sample_list);
-        if (This->handle_list)
+        if (This->requests)
         {
-            int x;
-            for (x = 0; x <= This->samples; ++x)
-                CloseHandle(This->handle_list[x]);
-            CoTaskMemFree(This->handle_list);
+            unsigned int i;
+
+            for (i = 0; i < This->max_requests; ++i)
+                CloseHandle(This->requests[i].ovl.hEvent);
+            CoTaskMemFree(This->requests);
         }
 
-        This->samples = pProps->cBuffers;
-        This->oldest_sample = 0;
-        TRACE("Samples: %u\n", This->samples);
-        This->sample_list = CoTaskMemAlloc(sizeof(This->sample_list[0]) * pProps->cBuffers);
-        This->handle_list = CoTaskMemAlloc(sizeof(HANDLE) * pProps->cBuffers * 2);
+        This->max_requests = pProps->cBuffers;
+        TRACE("Maximum request count: %u.\n", This->max_requests);
+        This->requests = CoTaskMemAlloc(sizeof(This->requests[0]) * pProps->cBuffers);
 
-        if (This->sample_list && This->handle_list)
+        if (This->requests)
         {
             int x;
-            ZeroMemory(This->sample_list, sizeof(This->sample_list[0]) * pProps->cBuffers);
-            for (x = 0; x < This->samples; ++x)
-            {
-                This->sample_list[x].ovl.hEvent = This->handle_list[x] = CreateEventW(NULL, 0, 0, NULL);
-                if (x + 1 < This->samples)
-                    This->handle_list[This->samples + 1 + x] = This->handle_list[x];
-            }
-            This->handle_list[This->samples] = CreateEventW(NULL, 1, 0, NULL);
+            ZeroMemory(This->requests, sizeof(This->requests[0]) * pProps->cBuffers);
+            for (x = 0; x < This->max_requests; ++x)
+                This->requests[x].ovl.hEvent = CreateEventW(NULL, 0, 0, NULL);
             This->allocProps = *pProps;
         }
         else
         {
             hr = E_OUTOFMEMORY;
-            CoTaskMemFree(This->sample_list);
-            CoTaskMemFree(This->handle_list);
-            This->samples = 0;
-            This->sample_list = NULL;
-            This->handle_list = NULL;
+            CoTaskMemFree(This->requests);
+            This->max_requests = 0;
+            This->requests = NULL;
         }
     }
 
@@ -755,215 +786,107 @@ done:
     return hr;
 }
 
-/* we could improve the Request/WaitForNext mechanism by allowing out of order samples.
- * however, this would be quite complicated to do and may be a bit error prone */
-static HRESULT WINAPI FileAsyncReader_Request(IAsyncReader * iface, IMediaSample * pSample, DWORD_PTR dwUser)
+static HRESULT WINAPI FileAsyncReader_Request(IAsyncReader *iface, IMediaSample *sample, DWORD_PTR cookie)
 {
-    AsyncReader *This = impl_from_IAsyncReader(iface);
-    HRESULT hr = S_OK;
-    REFERENCE_TIME Start;
-    REFERENCE_TIME Stop;
-    LPBYTE pBuffer = NULL;
+    AsyncReader *filter = impl_from_IAsyncReader(iface);
+    REFERENCE_TIME start, end;
+    struct request *req;
+    unsigned int i;
+    HRESULT hr;
+    BYTE *data;
 
-    TRACE("%p->(%p, %lx)\n", This, pSample, dwUser);
+    TRACE("filter %p, sample %p, cookie %#lx.\n", filter, sample, cookie);
 
-    if (!pSample)
+    if (!sample)
         return E_POINTER;
 
-    /* get start and stop positions in bytes */
-    if (SUCCEEDED(hr))
-        hr = IMediaSample_GetTime(pSample, &Start, &Stop);
+    if (FAILED(hr = IMediaSample_GetTime(sample, &start, &end)))
+        return hr;
 
-    if (SUCCEEDED(hr))
-        hr = IMediaSample_GetPointer(pSample, &pBuffer);
+    if (FAILED(hr = IMediaSample_GetPointer(sample, &data)))
+        return hr;
 
-    EnterCriticalSection(&This->sample_cs);
-    if (This->flushing)
+    EnterCriticalSection(&filter->sample_cs);
+    if (filter->flushing)
     {
-        LeaveCriticalSection(&This->sample_cs);
+        LeaveCriticalSection(&filter->sample_cs);
         return VFW_E_WRONG_STATE;
     }
 
-    if (SUCCEEDED(hr))
+    for (i = 0; i < filter->max_requests; ++i)
     {
-        DWORD dwLength = (DWORD) BYTES_FROM_MEDIATIME(Stop - Start);
-        DATAREQUEST *pDataRq;
-        int x;
-
-        /* Try to insert above the waiting sample if possible */
-        for (x = This->oldest_sample; x < This->samples; ++x)
-        {
-            if (!This->sample_list[x].pSample)
-                break;
-        }
-
-        if (x >= This->samples)
-            for (x = 0; x < This->oldest_sample; ++x)
-            {
-                if (!This->sample_list[x].pSample)
-                    break;
-            }
-
-        /* There must be a sample we have found */
-        assert(x < This->samples);
-        ++This->queued_number;
-
-        pDataRq = This->sample_list + x;
-
-        pDataRq->ovl.u.s.Offset = (DWORD) BYTES_FROM_MEDIATIME(Start);
-        pDataRq->ovl.u.s.OffsetHigh = (DWORD)(BYTES_FROM_MEDIATIME(Start) >> (sizeof(DWORD) * 8));
-        pDataRq->dwUserData = dwUser;
-
-        /* we violate traditional COM rules here by maintaining
-         * a reference to the sample, but not calling AddRef, but
-         * that's what MSDN says to do */
-        pDataRq->pSample = pSample;
+        if (!filter->requests[i].sample)
+            break;
+    }
+    assert(i < filter->max_requests);
+    req = &filter->requests[i];
 
-        /* this is definitely not how it is implemented on Win9x
-         * as they do not support async reads on files, but it is
-         * sooo much easier to use this than messing around with threads!
-         */
-        if (!ReadFile(This->file, pBuffer, dwLength, NULL, &pDataRq->ovl))
-            hr = HRESULT_FROM_WIN32(GetLastError());
+    req->ovl.u.s.Offset = BYTES_FROM_MEDIATIME(start);
+    req->ovl.u.s.OffsetHigh = BYTES_FROM_MEDIATIME(start) >> 32;
+    /* No reference is taken. */
 
-        /* ERROR_IO_PENDING is not actually an error since this is what we want! */
-        if (hr == HRESULT_FROM_WIN32(ERROR_IO_PENDING))
-            hr = S_OK;
+    if (ReadFile(filter->file, data, BYTES_FROM_MEDIATIME(end - start), NULL, &req->ovl)
+            || GetLastError() == ERROR_IO_PENDING)
+    {
+        hr = S_OK;
+        req->sample = sample;
+        req->cookie = cookie;
     }
+    else
+        hr = HRESULT_FROM_WIN32(GetLastError());
 
-    LeaveCriticalSection(&This->sample_cs);
-
-    TRACE("-- %x\n", hr);
+    LeaveCriticalSection(&filter->sample_cs);
     return hr;
 }
 
-static HRESULT WINAPI FileAsyncReader_WaitForNext(IAsyncReader * iface, DWORD dwTimeout, IMediaSample ** ppSample, DWORD_PTR * pdwUser)
+static HRESULT WINAPI FileAsyncReader_WaitForNext(IAsyncReader *iface,
+        DWORD timeout, IMediaSample **sample, DWORD_PTR *cookie)
 {
-    AsyncReader *This = impl_from_IAsyncReader(iface);
-    HRESULT hr = S_OK;
-    DWORD buffer = ~0;
-
-    TRACE("%p->(%u, %p, %p)\n", This, dwTimeout, ppSample, pdwUser);
-
-    *ppSample = NULL;
-    *pdwUser = 0;
+    AsyncReader *filter = impl_from_IAsyncReader(iface);
+    unsigned int i;
 
-    EnterCriticalSection(&This->sample_cs);
-    if (!This->flushing)
-    {
-        LONG oldest = This->oldest_sample;
+    TRACE("filter %p, timeout %u, sample %p, cookie %p.\n", filter, timeout, sample, cookie);
 
-        if (!This->queued_number)
-        {
-            /* It could be that nothing is queued right now, but that can be fixed */
-            WARN("Called without samples in queue and not flushing!!\n");
-        }
-        LeaveCriticalSection(&This->sample_cs);
+    *sample = NULL;
+    *cookie = 0;
 
-        /* wait for an object to read, or time out */
-        buffer = WaitForMultipleObjectsEx(This->samples+1, This->handle_list + oldest, FALSE, dwTimeout, TRUE);
+    EnterCriticalSection(&filter->sample_cs);
 
-        EnterCriticalSection(&This->sample_cs);
-        if (buffer <= This->samples)
+    do
+    {
+        if (filter->flushing)
         {
-            /* Re-scale the buffer back to normal */
-            buffer += oldest;
-
-            /* Uh oh, we overshot the flusher handle, renormalize it back to 0..Samples-1 */
-            if (buffer > This->samples)
-                buffer -= This->samples + 1;
-            assert(buffer <= This->samples);
+            LeaveCriticalSection(&filter->sample_cs);
+            return VFW_E_WRONG_STATE;
         }
 
-        if (buffer >= This->samples)
+        for (i = 0; i < filter->max_requests; ++i)
         {
-            if (buffer != This->samples)
-            {
-                FIXME("Returned: %u (%08x)\n", buffer, GetLastError());
-                hr = VFW_E_TIMEOUT;
-            }
-            else
-                hr = VFW_E_WRONG_STATE;
-            buffer = ~0;
-        }
-        else
-            --This->queued_number;
-    }
+            struct request *req = &filter->requests[i];
+            DWORD size;
 
-    if (This->flushing && buffer == ~0)
-    {
-        for (buffer = 0; buffer < This->samples; ++buffer)
-        {
-            if (This->sample_list[buffer].pSample)
+            if (req->sample && GetOverlappedResult(filter->file, &req->ovl, &size, FALSE))
             {
-                ResetEvent(This->handle_list[buffer]);
-                break;
-            }
-        }
-        if (buffer == This->samples)
-        {
-            assert(!This->queued_number);
-            hr = VFW_E_TIMEOUT;
-        }
-        else
-        {
-            --This->queued_number;
-            hr = S_OK;
-        }
-    }
-
-    if (SUCCEEDED(hr))
-    {
-        REFERENCE_TIME rtStart, rtStop;
-        DATAREQUEST *pDataRq = This->sample_list + buffer;
-        DWORD dwBytes = 0;
-
-        /* get any errors */
-        if (!This->flushing && !GetOverlappedResult(This->file, &pDataRq->ovl, &dwBytes, FALSE))
-            hr = HRESULT_FROM_WIN32(GetLastError());
-
-        /* Return the sample no matter what so it can be destroyed */
-        *ppSample = pDataRq->pSample;
-        *pdwUser = pDataRq->dwUserData;
-
-        if (This->flushing)
-            hr = VFW_E_WRONG_STATE;
-
-        if (FAILED(hr))
-            dwBytes = 0;
+                REFERENCE_TIME start, end;
 
-        /* Set the time on the sample */
-        IMediaSample_SetActualDataLength(pDataRq->pSample, dwBytes);
+                IMediaSample_SetActualDataLength(req->sample, size);
+                start = MEDIATIME_FROM_BYTES(((ULONGLONG)req->ovl.u.s.OffsetHigh << 32) + req->ovl.u.s.Offset);
+                end = start + MEDIATIME_FROM_BYTES(size);
+                IMediaSample_SetTime(req->sample, &start, &end);
 
-        rtStart = (DWORD64)pDataRq->ovl.u.s.Offset + ((DWORD64)pDataRq->ovl.u.s.OffsetHigh << 32);
-        rtStart = MEDIATIME_FROM_BYTES(rtStart);
-        rtStop = rtStart + MEDIATIME_FROM_BYTES(dwBytes);
+                *sample = req->sample;
+                *cookie = req->cookie;
+                req->sample = NULL;
 
-        IMediaSample_SetTime(pDataRq->pSample, &rtStart, &rtStop);
-
-        This->sample_list[buffer].pSample = NULL;
-        assert(This->oldest_sample < This->samples);
-
-        if (buffer == This->oldest_sample)
-        {
-            LONG x;
-            for (x = This->oldest_sample + 1; x < This->samples; ++x)
-                if (This->sample_list[x].pSample)
-                    break;
-            if (x >= This->samples)
-                for (x = 0; x < This->oldest_sample; ++x)
-                    if (This->sample_list[x].pSample)
-                        break;
-            if (This->oldest_sample == x)
-                /* No samples found, reset to 0 */
-                x = 0;
-            This->oldest_sample = x;
+                LeaveCriticalSection(&filter->sample_cs);
+                TRACE("Returning sample %u.\n", i);
+                return S_OK;
+            }
         }
-    }
-    LeaveCriticalSection(&This->sample_cs);
+    } while (SleepConditionVariableCS(&filter->sample_cv, &filter->sample_cs, timeout));
 
-    TRACE("-- %x\n", hr);
-    return hr;
+    LeaveCriticalSection(&filter->sample_cs);
+    return VFW_E_TIMEOUT;
 }
 
 static BOOL sync_read(HANDLE file, LONGLONG offset, LONG length, BYTE *buffer, DWORD *read_len)
@@ -971,7 +894,7 @@ static BOOL sync_read(HANDLE file, LONGLONG offset, LONG length, BYTE *buffer, D
     OVERLAPPED ovl = {0};
     BOOL ret;
 
-    ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
+    ovl.hEvent = (HANDLE)((ULONG_PTR)CreateEventW(NULL, TRUE, FALSE, NULL) | 1);
     ovl.u.s.Offset = (DWORD)offset;
     ovl.u.s.OffsetHigh = offset >> 32;
 
@@ -1062,14 +985,17 @@ static HRESULT WINAPI FileAsyncReader_Length(IAsyncReader *iface, LONGLONG *tota
 static HRESULT WINAPI FileAsyncReader_BeginFlush(IAsyncReader * iface)
 {
     AsyncReader *filter = impl_from_IAsyncReader(iface);
+    unsigned int i;
 
     TRACE("iface %p.\n", iface);
 
     EnterCriticalSection(&filter->sample_cs);
 
     filter->flushing = TRUE;
+    for (i = 0; i < filter->max_requests; ++i)
+        filter->requests[i].sample = NULL;
     CancelIoEx(filter->file, NULL);
-    SetEvent(filter->handle_list[filter->samples]);
+    WakeAllConditionVariable(&filter->sample_cv);
 
     LeaveCriticalSection(&filter->sample_cs);
 
@@ -1079,16 +1005,12 @@ static HRESULT WINAPI FileAsyncReader_BeginFlush(IAsyncReader * iface)
 static HRESULT WINAPI FileAsyncReader_EndFlush(IAsyncReader * iface)
 {
     AsyncReader *filter = impl_from_IAsyncReader(iface);
-    int x;
 
     TRACE("iface %p.\n", iface);
 
     EnterCriticalSection(&filter->sample_cs);
 
-    ResetEvent(filter->handle_list[filter->samples]);
     filter->flushing = FALSE;
-    for (x = 0; x < filter->samples; ++x)
-        assert(!filter->sample_list[x].pSample);
 
     LeaveCriticalSection(&filter->sample_cs);
 
diff --git a/dlls/quartz/tests/filesource.c b/dlls/quartz/tests/filesource.c
index 918605da6d4..df0db1b3b53 100644
--- a/dlls/quartz/tests/filesource.c
+++ b/dlls/quartz/tests/filesource.c
@@ -916,12 +916,23 @@ static void test_request(IAsyncReader *reader, IMemAllocator *allocator)
     IMediaSample_Release(sample2);
 }
 
+static DWORD CALLBACK wait_thread(void *arg)
+{
+    IAsyncReader *reader = arg;
+    IMediaSample *sample;
+    DWORD_PTR cookie;
+    HRESULT hr = IAsyncReader_WaitForNext(reader, 2000, &sample, &cookie);
+    ok(hr == VFW_E_WRONG_STATE, "Got hr %#x.\n", hr);
+    return 0;
+}
+
 static void test_flush(IAsyncReader *reader, IMemAllocator *allocator)
 {
     REFERENCE_TIME start_time, end_time;
     IMediaSample *sample, *ret_sample;
     BYTE buffer[20], *data;
     DWORD_PTR cookie;
+    HANDLE thread;
     HRESULT hr;
     int i;
 
@@ -973,12 +984,23 @@ static void test_flush(IAsyncReader *reader, IMemAllocator *allocator)
     for (i = 0; i < 512; i++)
         ok(data[i] == i % 111, "Got wrong byte %02x at %u.\n", data[i], i);
 
+    thread = CreateThread(NULL, 0, wait_thread, reader, 0, NULL);
+    ok(WaitForSingleObject(thread, 100) == WAIT_TIMEOUT, "Expected timeout.\n");
+
+    hr = IAsyncReader_BeginFlush(reader);
+    ok(hr == S_OK, "Got hr %#x.\n", hr);
+    ok(!WaitForSingleObject(thread, 1000), "Wait timed out.\n");
+    CloseHandle(thread);
+
+    hr = IAsyncReader_EndFlush(reader);
+    ok(hr == S_OK, "Got hr %#x.\n", hr);
+
     IMediaSample_Release(sample);
 }
 
 static void test_async_reader(void)
 {
-    ALLOCATOR_PROPERTIES req_props = {2, 1024, 512, 0}, ret_props;
+    ALLOCATOR_PROPERTIES req_props = {100, 1024, 512, 0}, ret_props;
     IBaseFilter *filter = create_file_source();
     IFileSourceFilter *filesource;
     LONGLONG length, available;
@@ -1048,7 +1070,7 @@ static void test_async_reader(void)
     ret_props = req_props;
     hr = IAsyncReader_RequestAllocator(reader, NULL, &ret_props, &allocator);
     ok(hr == S_OK, "Got hr %#x.\n", hr);
-    ok(ret_props.cBuffers == 2, "Got %d buffers.\n", ret_props.cBuffers);
+    ok(ret_props.cBuffers == 100, "Got %d buffers.\n", ret_props.cBuffers);
     ok(ret_props.cbBuffer == 1024, "Got size %d.\n", ret_props.cbBuffer);
     ok(ret_props.cbAlign == 512, "Got alignment %d.\n", ret_props.cbAlign);
     ok(ret_props.cbPrefix == 0, "Got prefix %d.\n", ret_props.cbPrefix);
-- 
2.25.0




More information about the wine-devel mailing list