[PATCH 1/5] quartz/filesource: Use a completion port to handle asynchronous requests.
Zebediah Figura
z.figura12 at gmail.com
Wed Jan 29 19:56:59 CST 2020
Thus simplifying the code and allowing us to handle more than 63 at once.
Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
dlls/quartz/filesource.c | 376 +++++++++++++--------------------
dlls/quartz/tests/filesource.c | 26 ++-
2 files changed, 173 insertions(+), 229 deletions(-)
diff --git a/dlls/quartz/filesource.c b/dlls/quartz/filesource.c
index 8b7a0fde925..7ea67ccc12f 100644
--- a/dlls/quartz/filesource.c
+++ b/dlls/quartz/filesource.c
@@ -46,12 +46,12 @@ static const AM_MEDIA_TYPE default_mt =
NULL
};
-typedef struct DATAREQUEST
+struct request
{
- IMediaSample *pSample;
- DWORD_PTR dwUserData;
+ IMediaSample *sample;
+ DWORD_PTR cookie;
OVERLAPPED ovl;
-} DATAREQUEST;
+};
typedef struct AsyncReader
{
@@ -64,15 +64,12 @@ typedef struct AsyncReader
LPOLESTR pszFileName;
AM_MEDIA_TYPE *pmt;
ALLOCATOR_PROPERTIES allocProps;
- HANDLE file;
- BOOL flushing;
- unsigned int queued_number;
- unsigned int samples;
- unsigned int oldest_sample;
+ HANDLE file, port, io_thread;
CRITICAL_SECTION sample_cs;
- DATAREQUEST *sample_list;
- /* Have a handle for every sample, and then one more as flushing handle */
- HANDLE *handle_list;
+ BOOL flushing;
+ struct request *requests;
+ unsigned int max_requests;
+ CONDITION_VARIABLE sample_cv;
} AsyncReader;
static const struct strmbase_source_ops source_ops;
@@ -352,18 +349,23 @@ static void async_reader_destroy(struct strmbase_filter *iface)
IPin_Disconnect(&filter->source.pin.IPin_iface);
- CoTaskMemFree(filter->sample_list);
- if (filter->handle_list)
+ if (filter->requests)
{
- for (i = 0; i <= filter->samples; ++i)
- CloseHandle(filter->handle_list[i]);
- CoTaskMemFree(filter->handle_list);
+ for (i = 0; i < filter->max_requests; ++i)
+ CloseHandle(filter->requests[i].ovl.hEvent);
+ CoTaskMemFree(filter->requests);
}
CloseHandle(filter->file);
filter->sample_cs.DebugInfo->Spare[0] = 0;
DeleteCriticalSection(&filter->sample_cs);
strmbase_source_cleanup(&filter->source);
}
+
+ PostQueuedCompletionStatus(filter->port, 0, 1, NULL);
+ WaitForSingleObject(filter->io_thread, INFINITE);
+ CloseHandle(filter->io_thread);
+ CloseHandle(filter->port);
+
CoTaskMemFree(filter->pszFileName);
if (filter->pmt)
DeleteMediaType(filter->pmt);
@@ -392,6 +394,42 @@ static const struct strmbase_filter_ops filter_ops =
.filter_query_interface = async_reader_query_interface,
};
+static DWORD CALLBACK io_thread(void *arg)
+{
+ AsyncReader *filter = arg;
+ struct request *req;
+ OVERLAPPED *ovl;
+ ULONG_PTR key;
+ DWORD size;
+ BOOL ret;
+
+ for (;;)
+ {
+ ret = GetQueuedCompletionStatus(filter->port, &size, &key, &ovl, INFINITE);
+
+ if (ret && key)
+ break;
+
+ EnterCriticalSection(&filter->sample_cs);
+
+ req = CONTAINING_RECORD(ovl, struct request, ovl);
+ TRACE("Got sample %u.\n", req - filter->requests);
+ assert(req >= filter->requests && req < filter->requests + filter->max_requests);
+
+ if (ret)
+ WakeConditionVariable(&filter->sample_cv);
+ else
+ {
+ ERR("GetQueuedCompletionStatus() returned failure, error %u.\n", GetLastError());
+ req->sample = NULL;
+ }
+
+ LeaveCriticalSection(&filter->sample_cs);
+ }
+
+ return 0;
+}
+
HRESULT AsyncReader_create(IUnknown *outer, void **out)
{
AsyncReader *pAsyncRead;
@@ -410,6 +448,12 @@ HRESULT AsyncReader_create(IUnknown *outer, void **out)
pAsyncRead->pszFileName = NULL;
pAsyncRead->pmt = NULL;
+ InitializeCriticalSection(&pAsyncRead->sample_cs);
+ pAsyncRead->sample_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": FileAsyncReader.sample_cs");
+ InitializeConditionVariable(&pAsyncRead->sample_cv);
+ pAsyncRead->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ pAsyncRead->io_thread = CreateThread(NULL, 0, io_thread, pAsyncRead, 0, NULL);
+
*out = &pAsyncRead->filter.IUnknown_inner;
TRACE("-- created at %p\n", pAsyncRead);
@@ -470,11 +514,7 @@ static HRESULT WINAPI FileSource_Load(IFileSourceFilter * iface, LPCOLESTR pszFi
This->file = hFile;
This->flushing = FALSE;
- This->sample_list = NULL;
- This->handle_list = NULL;
- This->queued_number = 0;
- InitializeCriticalSection(&This->sample_cs);
- This->sample_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": FileAsyncReader.sample_cs");
+ This->requests = NULL;
if (This->pmt)
DeleteMediaType(This->pmt);
@@ -705,42 +745,33 @@ static HRESULT WINAPI FileAsyncReader_RequestAllocator(IAsyncReader * iface, IMe
done:
if (SUCCEEDED(hr))
{
- CoTaskMemFree(This->sample_list);
- if (This->handle_list)
+ if (This->requests)
{
- int x;
- for (x = 0; x <= This->samples; ++x)
- CloseHandle(This->handle_list[x]);
- CoTaskMemFree(This->handle_list);
+ unsigned int i;
+
+ for (i = 0; i < This->max_requests; ++i)
+ CloseHandle(This->requests[i].ovl.hEvent);
+ CoTaskMemFree(This->requests);
}
- This->samples = pProps->cBuffers;
- This->oldest_sample = 0;
- TRACE("Samples: %u\n", This->samples);
- This->sample_list = CoTaskMemAlloc(sizeof(This->sample_list[0]) * pProps->cBuffers);
- This->handle_list = CoTaskMemAlloc(sizeof(HANDLE) * pProps->cBuffers * 2);
+ This->max_requests = pProps->cBuffers;
+ TRACE("Maximum request count: %u.\n", This->max_requests);
+ This->requests = CoTaskMemAlloc(sizeof(This->requests[0]) * pProps->cBuffers);
- if (This->sample_list && This->handle_list)
+ if (This->requests)
{
int x;
- ZeroMemory(This->sample_list, sizeof(This->sample_list[0]) * pProps->cBuffers);
- for (x = 0; x < This->samples; ++x)
- {
- This->sample_list[x].ovl.hEvent = This->handle_list[x] = CreateEventW(NULL, 0, 0, NULL);
- if (x + 1 < This->samples)
- This->handle_list[This->samples + 1 + x] = This->handle_list[x];
- }
- This->handle_list[This->samples] = CreateEventW(NULL, 1, 0, NULL);
+ ZeroMemory(This->requests, sizeof(This->requests[0]) * pProps->cBuffers);
+ for (x = 0; x < This->max_requests; ++x)
+ This->requests[x].ovl.hEvent = CreateEventW(NULL, 0, 0, NULL);
This->allocProps = *pProps;
}
else
{
hr = E_OUTOFMEMORY;
- CoTaskMemFree(This->sample_list);
- CoTaskMemFree(This->handle_list);
- This->samples = 0;
- This->sample_list = NULL;
- This->handle_list = NULL;
+ CoTaskMemFree(This->requests);
+ This->max_requests = 0;
+ This->requests = NULL;
}
}
@@ -755,215 +786,107 @@ done:
return hr;
}
-/* we could improve the Request/WaitForNext mechanism by allowing out of order samples.
- * however, this would be quite complicated to do and may be a bit error prone */
-static HRESULT WINAPI FileAsyncReader_Request(IAsyncReader * iface, IMediaSample * pSample, DWORD_PTR dwUser)
+static HRESULT WINAPI FileAsyncReader_Request(IAsyncReader *iface, IMediaSample *sample, DWORD_PTR cookie)
{
- AsyncReader *This = impl_from_IAsyncReader(iface);
- HRESULT hr = S_OK;
- REFERENCE_TIME Start;
- REFERENCE_TIME Stop;
- LPBYTE pBuffer = NULL;
+ AsyncReader *filter = impl_from_IAsyncReader(iface);
+ REFERENCE_TIME start, end;
+ struct request *req;
+ unsigned int i;
+ HRESULT hr;
+ BYTE *data;
- TRACE("%p->(%p, %lx)\n", This, pSample, dwUser);
+ TRACE("filter %p, sample %p, cookie %#lx.\n", filter, sample, cookie);
- if (!pSample)
+ if (!sample)
return E_POINTER;
- /* get start and stop positions in bytes */
- if (SUCCEEDED(hr))
- hr = IMediaSample_GetTime(pSample, &Start, &Stop);
+ if (FAILED(hr = IMediaSample_GetTime(sample, &start, &end)))
+ return hr;
- if (SUCCEEDED(hr))
- hr = IMediaSample_GetPointer(pSample, &pBuffer);
+ if (FAILED(hr = IMediaSample_GetPointer(sample, &data)))
+ return hr;
- EnterCriticalSection(&This->sample_cs);
- if (This->flushing)
+ EnterCriticalSection(&filter->sample_cs);
+ if (filter->flushing)
{
- LeaveCriticalSection(&This->sample_cs);
+ LeaveCriticalSection(&filter->sample_cs);
return VFW_E_WRONG_STATE;
}
- if (SUCCEEDED(hr))
+ for (i = 0; i < filter->max_requests; ++i)
{
- DWORD dwLength = (DWORD) BYTES_FROM_MEDIATIME(Stop - Start);
- DATAREQUEST *pDataRq;
- int x;
-
- /* Try to insert above the waiting sample if possible */
- for (x = This->oldest_sample; x < This->samples; ++x)
- {
- if (!This->sample_list[x].pSample)
- break;
- }
-
- if (x >= This->samples)
- for (x = 0; x < This->oldest_sample; ++x)
- {
- if (!This->sample_list[x].pSample)
- break;
- }
-
- /* There must be a sample we have found */
- assert(x < This->samples);
- ++This->queued_number;
-
- pDataRq = This->sample_list + x;
-
- pDataRq->ovl.u.s.Offset = (DWORD) BYTES_FROM_MEDIATIME(Start);
- pDataRq->ovl.u.s.OffsetHigh = (DWORD)(BYTES_FROM_MEDIATIME(Start) >> (sizeof(DWORD) * 8));
- pDataRq->dwUserData = dwUser;
-
- /* we violate traditional COM rules here by maintaining
- * a reference to the sample, but not calling AddRef, but
- * that's what MSDN says to do */
- pDataRq->pSample = pSample;
+ if (!filter->requests[i].sample)
+ break;
+ }
+ assert(i < filter->max_requests);
+ req = &filter->requests[i];
- /* this is definitely not how it is implemented on Win9x
- * as they do not support async reads on files, but it is
- * sooo much easier to use this than messing around with threads!
- */
- if (!ReadFile(This->file, pBuffer, dwLength, NULL, &pDataRq->ovl))
- hr = HRESULT_FROM_WIN32(GetLastError());
+ req->ovl.u.s.Offset = BYTES_FROM_MEDIATIME(start);
+ req->ovl.u.s.OffsetHigh = BYTES_FROM_MEDIATIME(start) >> 32;
+ /* No reference is taken. */
- /* ERROR_IO_PENDING is not actually an error since this is what we want! */
- if (hr == HRESULT_FROM_WIN32(ERROR_IO_PENDING))
- hr = S_OK;
+ if (ReadFile(filter->file, data, BYTES_FROM_MEDIATIME(end - start), NULL, &req->ovl)
+ || GetLastError() == ERROR_IO_PENDING)
+ {
+ hr = S_OK;
+ req->sample = sample;
+ req->cookie = cookie;
}
+ else
+ hr = HRESULT_FROM_WIN32(GetLastError());
- LeaveCriticalSection(&This->sample_cs);
-
- TRACE("-- %x\n", hr);
+ LeaveCriticalSection(&filter->sample_cs);
return hr;
}
-static HRESULT WINAPI FileAsyncReader_WaitForNext(IAsyncReader * iface, DWORD dwTimeout, IMediaSample ** ppSample, DWORD_PTR * pdwUser)
+static HRESULT WINAPI FileAsyncReader_WaitForNext(IAsyncReader *iface,
+ DWORD timeout, IMediaSample **sample, DWORD_PTR *cookie)
{
- AsyncReader *This = impl_from_IAsyncReader(iface);
- HRESULT hr = S_OK;
- DWORD buffer = ~0;
-
- TRACE("%p->(%u, %p, %p)\n", This, dwTimeout, ppSample, pdwUser);
-
- *ppSample = NULL;
- *pdwUser = 0;
+ AsyncReader *filter = impl_from_IAsyncReader(iface);
+ unsigned int i;
- EnterCriticalSection(&This->sample_cs);
- if (!This->flushing)
- {
- LONG oldest = This->oldest_sample;
+ TRACE("filter %p, timeout %u, sample %p, cookie %p.\n", filter, timeout, sample, cookie);
- if (!This->queued_number)
- {
- /* It could be that nothing is queued right now, but that can be fixed */
- WARN("Called without samples in queue and not flushing!!\n");
- }
- LeaveCriticalSection(&This->sample_cs);
+ *sample = NULL;
+ *cookie = 0;
- /* wait for an object to read, or time out */
- buffer = WaitForMultipleObjectsEx(This->samples+1, This->handle_list + oldest, FALSE, dwTimeout, TRUE);
+ EnterCriticalSection(&filter->sample_cs);
- EnterCriticalSection(&This->sample_cs);
- if (buffer <= This->samples)
+ do
+ {
+ if (filter->flushing)
{
- /* Re-scale the buffer back to normal */
- buffer += oldest;
-
- /* Uh oh, we overshot the flusher handle, renormalize it back to 0..Samples-1 */
- if (buffer > This->samples)
- buffer -= This->samples + 1;
- assert(buffer <= This->samples);
+ LeaveCriticalSection(&filter->sample_cs);
+ return VFW_E_WRONG_STATE;
}
- if (buffer >= This->samples)
+ for (i = 0; i < filter->max_requests; ++i)
{
- if (buffer != This->samples)
- {
- FIXME("Returned: %u (%08x)\n", buffer, GetLastError());
- hr = VFW_E_TIMEOUT;
- }
- else
- hr = VFW_E_WRONG_STATE;
- buffer = ~0;
- }
- else
- --This->queued_number;
- }
+ struct request *req = &filter->requests[i];
+ DWORD size;
- if (This->flushing && buffer == ~0)
- {
- for (buffer = 0; buffer < This->samples; ++buffer)
- {
- if (This->sample_list[buffer].pSample)
+ if (req->sample && GetOverlappedResult(filter->file, &req->ovl, &size, FALSE))
{
- ResetEvent(This->handle_list[buffer]);
- break;
- }
- }
- if (buffer == This->samples)
- {
- assert(!This->queued_number);
- hr = VFW_E_TIMEOUT;
- }
- else
- {
- --This->queued_number;
- hr = S_OK;
- }
- }
-
- if (SUCCEEDED(hr))
- {
- REFERENCE_TIME rtStart, rtStop;
- DATAREQUEST *pDataRq = This->sample_list + buffer;
- DWORD dwBytes = 0;
-
- /* get any errors */
- if (!This->flushing && !GetOverlappedResult(This->file, &pDataRq->ovl, &dwBytes, FALSE))
- hr = HRESULT_FROM_WIN32(GetLastError());
-
- /* Return the sample no matter what so it can be destroyed */
- *ppSample = pDataRq->pSample;
- *pdwUser = pDataRq->dwUserData;
-
- if (This->flushing)
- hr = VFW_E_WRONG_STATE;
-
- if (FAILED(hr))
- dwBytes = 0;
+ REFERENCE_TIME start, end;
- /* Set the time on the sample */
- IMediaSample_SetActualDataLength(pDataRq->pSample, dwBytes);
+ IMediaSample_SetActualDataLength(req->sample, size);
+ start = MEDIATIME_FROM_BYTES(((ULONGLONG)req->ovl.u.s.OffsetHigh << 32) + req->ovl.u.s.Offset);
+ end = start + MEDIATIME_FROM_BYTES(size);
+ IMediaSample_SetTime(req->sample, &start, &end);
- rtStart = (DWORD64)pDataRq->ovl.u.s.Offset + ((DWORD64)pDataRq->ovl.u.s.OffsetHigh << 32);
- rtStart = MEDIATIME_FROM_BYTES(rtStart);
- rtStop = rtStart + MEDIATIME_FROM_BYTES(dwBytes);
+ *sample = req->sample;
+ *cookie = req->cookie;
+ req->sample = NULL;
- IMediaSample_SetTime(pDataRq->pSample, &rtStart, &rtStop);
-
- This->sample_list[buffer].pSample = NULL;
- assert(This->oldest_sample < This->samples);
-
- if (buffer == This->oldest_sample)
- {
- LONG x;
- for (x = This->oldest_sample + 1; x < This->samples; ++x)
- if (This->sample_list[x].pSample)
- break;
- if (x >= This->samples)
- for (x = 0; x < This->oldest_sample; ++x)
- if (This->sample_list[x].pSample)
- break;
- if (This->oldest_sample == x)
- /* No samples found, reset to 0 */
- x = 0;
- This->oldest_sample = x;
+ LeaveCriticalSection(&filter->sample_cs);
+ TRACE("Returning sample %u.\n", i);
+ return S_OK;
+ }
}
- }
- LeaveCriticalSection(&This->sample_cs);
+ } while (SleepConditionVariableCS(&filter->sample_cv, &filter->sample_cs, timeout));
- TRACE("-- %x\n", hr);
- return hr;
+ LeaveCriticalSection(&filter->sample_cs);
+ return VFW_E_TIMEOUT;
}
static BOOL sync_read(HANDLE file, LONGLONG offset, LONG length, BYTE *buffer, DWORD *read_len)
@@ -971,7 +894,7 @@ static BOOL sync_read(HANDLE file, LONGLONG offset, LONG length, BYTE *buffer, D
OVERLAPPED ovl = {0};
BOOL ret;
- ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
+ ovl.hEvent = (HANDLE)((ULONG_PTR)CreateEventW(NULL, TRUE, FALSE, NULL) | 1);
ovl.u.s.Offset = (DWORD)offset;
ovl.u.s.OffsetHigh = offset >> 32;
@@ -1062,14 +985,17 @@ static HRESULT WINAPI FileAsyncReader_Length(IAsyncReader *iface, LONGLONG *tota
static HRESULT WINAPI FileAsyncReader_BeginFlush(IAsyncReader * iface)
{
AsyncReader *filter = impl_from_IAsyncReader(iface);
+ unsigned int i;
TRACE("iface %p.\n", iface);
EnterCriticalSection(&filter->sample_cs);
filter->flushing = TRUE;
+ for (i = 0; i < filter->max_requests; ++i)
+ filter->requests[i].sample = NULL;
CancelIoEx(filter->file, NULL);
- SetEvent(filter->handle_list[filter->samples]);
+ WakeAllConditionVariable(&filter->sample_cv);
LeaveCriticalSection(&filter->sample_cs);
@@ -1079,16 +1005,12 @@ static HRESULT WINAPI FileAsyncReader_BeginFlush(IAsyncReader * iface)
static HRESULT WINAPI FileAsyncReader_EndFlush(IAsyncReader * iface)
{
AsyncReader *filter = impl_from_IAsyncReader(iface);
- int x;
TRACE("iface %p.\n", iface);
EnterCriticalSection(&filter->sample_cs);
- ResetEvent(filter->handle_list[filter->samples]);
filter->flushing = FALSE;
- for (x = 0; x < filter->samples; ++x)
- assert(!filter->sample_list[x].pSample);
LeaveCriticalSection(&filter->sample_cs);
diff --git a/dlls/quartz/tests/filesource.c b/dlls/quartz/tests/filesource.c
index 918605da6d4..df0db1b3b53 100644
--- a/dlls/quartz/tests/filesource.c
+++ b/dlls/quartz/tests/filesource.c
@@ -916,12 +916,23 @@ static void test_request(IAsyncReader *reader, IMemAllocator *allocator)
IMediaSample_Release(sample2);
}
+static DWORD CALLBACK wait_thread(void *arg)
+{
+ IAsyncReader *reader = arg;
+ IMediaSample *sample;
+ DWORD_PTR cookie;
+ HRESULT hr = IAsyncReader_WaitForNext(reader, 2000, &sample, &cookie);
+ ok(hr == VFW_E_WRONG_STATE, "Got hr %#x.\n", hr);
+ return 0;
+}
+
static void test_flush(IAsyncReader *reader, IMemAllocator *allocator)
{
REFERENCE_TIME start_time, end_time;
IMediaSample *sample, *ret_sample;
BYTE buffer[20], *data;
DWORD_PTR cookie;
+ HANDLE thread;
HRESULT hr;
int i;
@@ -973,12 +984,23 @@ static void test_flush(IAsyncReader *reader, IMemAllocator *allocator)
for (i = 0; i < 512; i++)
ok(data[i] == i % 111, "Got wrong byte %02x at %u.\n", data[i], i);
+ thread = CreateThread(NULL, 0, wait_thread, reader, 0, NULL);
+ ok(WaitForSingleObject(thread, 100) == WAIT_TIMEOUT, "Expected timeout.\n");
+
+ hr = IAsyncReader_BeginFlush(reader);
+ ok(hr == S_OK, "Got hr %#x.\n", hr);
+ ok(!WaitForSingleObject(thread, 1000), "Wait timed out.\n");
+ CloseHandle(thread);
+
+ hr = IAsyncReader_EndFlush(reader);
+ ok(hr == S_OK, "Got hr %#x.\n", hr);
+
IMediaSample_Release(sample);
}
static void test_async_reader(void)
{
- ALLOCATOR_PROPERTIES req_props = {2, 1024, 512, 0}, ret_props;
+ ALLOCATOR_PROPERTIES req_props = {100, 1024, 512, 0}, ret_props;
IBaseFilter *filter = create_file_source();
IFileSourceFilter *filesource;
LONGLONG length, available;
@@ -1048,7 +1070,7 @@ static void test_async_reader(void)
ret_props = req_props;
hr = IAsyncReader_RequestAllocator(reader, NULL, &ret_props, &allocator);
ok(hr == S_OK, "Got hr %#x.\n", hr);
- ok(ret_props.cBuffers == 2, "Got %d buffers.\n", ret_props.cBuffers);
+ ok(ret_props.cBuffers == 100, "Got %d buffers.\n", ret_props.cBuffers);
ok(ret_props.cbBuffer == 1024, "Got size %d.\n", ret_props.cbBuffer);
ok(ret_props.cbAlign == 512, "Got alignment %d.\n", ret_props.cbAlign);
ok(ret_props.cbPrefix == 0, "Got prefix %d.\n", ret_props.cbPrefix);
--
2.25.0
More information about the wine-devel
mailing list