From 728202a2355a24840f8fa02f6c59968a2f04caab Mon Sep 17 00:00:00 2001 From: Maarten Lankhorst Date: Fri, 18 Apr 2008 19:30:25 -0700 Subject: [PATCH] quartz: Optimize the processing thread to fetch one sample while playing another --- dlls/quartz/avisplit.c | 17 ++- dlls/quartz/mpegsplit.c | 4 +- dlls/quartz/parser.c | 2 +- dlls/quartz/parser.h | 2 +- dlls/quartz/pin.c | 361 +++++++++++++++++++++++++++++----------------- dlls/quartz/pin.h | 44 +++++-- dlls/quartz/waveparser.c | 2 +- 7 files changed, 275 insertions(+), 157 deletions(-) diff --git a/dlls/quartz/avisplit.c b/dlls/quartz/avisplit.c index a619dcf..10b6c43 100644 --- a/dlls/quartz/avisplit.c +++ b/dlls/quartz/avisplit.c @@ -100,7 +100,7 @@ static HRESULT AVISplitter_NextChunk(LONGLONG * pllCurrentChunkOffset, RIFFCHUNK return S_OK; } -static HRESULT AVISplitter_Sample(LPVOID iface, IMediaSample * pSample) +static HRESULT AVISplitter_Sample(LPVOID iface, IMediaSample * pSample, DWORD_PTR cookie) { AVISplitterImpl *This = (AVISplitterImpl *)iface; LPBYTE pbSrcStream = NULL; @@ -438,6 +438,7 @@ static HRESULT AVISplitter_ProcessOldIndex(AVISplitterImpl *This) IAsyncReader_SyncRead(pin->pReader, offset, sizeof(DWORD), (BYTE *)&temp); relative = (chunkid != temp); + TRACE("dwChunkId: %.4s\n", (char *)&chunkid); if (chunkid == mmioFOURCC('7','F','x','x') && ((char *)&temp)[0] == 'i' && ((char *)&temp)[1] == 'x') relative = FALSE; @@ -459,14 +460,16 @@ static HRESULT AVISplitter_ProcessOldIndex(AVISplitterImpl *This) debugstr_an((char *)&temp2, 4), (DWORD)((mov_pos + offset) >> 32), (DWORD)(mov_pos + offset)); relative = -1; } + else + TRACE("Scanned dwChunkId: %s\n", debugstr_an((char *)&temp2, 4)); } + else if (!relative) + TRACE("Scanned dwChunkId: %s\n", debugstr_an((char *)&temp, 4)); + TRACE("dwFlags: %08x\n", pAviOldIndex->aIndex[x].dwFlags); + TRACE("dwOffset (%s): %08x\n", relative ? "relative" : "absolute", offset); + TRACE("dwSize: %08x\n", pAviOldIndex->aIndex[x].dwSize); } - - TRACE("Scanned dwChunkId: %s\n", debugstr_an((char *)&temp, 4)); - TRACE("dwChunkId: %.4s\n", (char *)&chunkid); - TRACE("dwFlags: %08x\n", pAviOldIndex->aIndex[x].dwFlags); - TRACE("dwOffset (%s): %08x\n", relative ? "relative" : "absolute", offset); - TRACE("dwSize: %08x\n", pAviOldIndex->aIndex[x].dwSize); + else break; } if (relative == -1) diff --git a/dlls/quartz/mpegsplit.c b/dlls/quartz/mpegsplit.c index 9af8973..3c33075 100644 --- a/dlls/quartz/mpegsplit.c +++ b/dlls/quartz/mpegsplit.c @@ -116,7 +116,6 @@ static const DWORD tabsel_123[2][3][16] = { {0,8,16,24,32,40,48,56,64,80,96,112,128,144,160,} } }; - static HRESULT parse_header(BYTE *header, LONGLONG *plen, LONGLONG *pduration) { LONGLONG duration = *pduration; @@ -355,7 +354,7 @@ out_append: } -static HRESULT MPEGSplitter_process_sample(LPVOID iface, IMediaSample * pSample) +static HRESULT MPEGSplitter_process_sample(LPVOID iface, IMediaSample * pSample, DWORD_PTR cookie) { MPEGSplitterImpl *This = (MPEGSplitterImpl*)iface; BYTE *pbSrcStream; @@ -413,6 +412,7 @@ static HRESULT MPEGSplitter_process_sample(LPVOID iface, IMediaSample * pSample) goto fail; IMediaSample_SetSyncPoint(This->pCurrentSample, TRUE); IMediaSample_SetDiscontinuity(This->pCurrentSample, This->seek); + IMediaSample_SetPreroll(This->pCurrentSample, (This->seek && This->position > 0)); This->seek = FALSE; } hr = FillBuffer(This, &pbSrcStream, &cbSrcStream, This->pCurrentSample); diff --git a/dlls/quartz/parser.c b/dlls/quartz/parser.c index a0e14f1..28ce90b 100644 --- a/dlls/quartz/parser.c +++ b/dlls/quartz/parser.c @@ -91,7 +91,7 @@ HRESULT Parser_Create(ParserImpl* pParser, const CLSID* pClsid, PFN_PROCESS_SAMP MediaSeekingImpl_Init((IBaseFilter*)pParser, stop, current, rate, &pParser->mediaSeeking, &pParser->csFilter); pParser->mediaSeeking.lpVtbl = &Parser_Seeking_Vtbl; - hr = PullPin_Construct(&Parser_InputPin_Vtbl, &piInput, fnProcessSample, (LPVOID)pParser, fnQueryAccept, fnCleanup, &pParser->csFilter, (IPin **)&pParser->pInputPin); + hr = PullPin_Construct(&Parser_InputPin_Vtbl, &piInput, fnProcessSample, (LPVOID)pParser, fnQueryAccept, fnCleanup, NULL, &pParser->csFilter, (IPin **)&pParser->pInputPin); if (SUCCEEDED(hr)) { diff --git a/dlls/quartz/parser.h b/dlls/quartz/parser.h index 3ca342e..0c9d917 100644 --- a/dlls/quartz/parser.h +++ b/dlls/quartz/parser.h @@ -20,7 +20,7 @@ typedef struct ParserImpl ParserImpl; -typedef HRESULT (*PFN_PROCESS_SAMPLE) (LPVOID iface, IMediaSample * pSample); +typedef HRESULT (*PFN_PROCESS_SAMPLE) (LPVOID iface, IMediaSample * pSample, DWORD_PTR cookie); typedef HRESULT (*PFN_QUERY_ACCEPT) (LPVOID iface, const AM_MEDIA_TYPE * pmt); typedef HRESULT (*PFN_PRE_CONNECT) (IPin * iface, IPin * pConnectPin); typedef HRESULT (*PFN_CLEANUP) (LPVOID iface); diff --git a/dlls/quartz/pin.c b/dlls/quartz/pin.c index b641ff2..a7932e4 100644 --- a/dlls/quartz/pin.c +++ b/dlls/quartz/pin.c @@ -226,7 +226,7 @@ static HRESULT OutputPin_ConnectSpecific(IPin * iface, IPin * pReceivePin, const return hr; } -static HRESULT InputPin_Init(const IPinVtbl *InputPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC pSampleProc, LPVOID pUserData, +static HRESULT InputPin_Init(const IPinVtbl *InputPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PUSH pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, InputPin * pPinImpl) { TRACE("\n"); @@ -284,7 +284,7 @@ static HRESULT OutputPin_Init(const IPinVtbl *OutputPin_Vtbl, const PIN_INFO * p return S_OK; } -HRESULT InputPin_Construct(const IPinVtbl *InputPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, IPin ** ppPin) +HRESULT InputPin_Construct(const IPinVtbl *InputPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PUSH pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, IPin ** ppPin) { InputPin * pPinImpl; @@ -770,7 +770,7 @@ HRESULT WINAPI MemInputPin_Receive(IMemInputPin * iface, IMediaSample * pSample) /*TRACE("(%p/%p)->(%p)\n", This, iface, pSample);*/ EnterCriticalSection(This->pin.pCritSec); - if (!This->end_of_stream && !This->flushing && !This->end_of_stream) + if (!This->end_of_stream && !This->flushing) hr = This->fnSampleProc(This->pin.pUserData, pSample); else hr = S_FALSE; @@ -851,9 +851,9 @@ ULONG WINAPI OutputPin_Release(IPin * iface) { OutputPin *This = (OutputPin *)iface; ULONG refCount = InterlockedDecrement(&This->pin.refCount); - + TRACE("(%p)->() Release from %d\n", iface, refCount + 1); - + if (!refCount) { FreeMediaType(&This->pin.mtCurrent); @@ -1173,8 +1173,8 @@ HRESULT OutputPin_DeliverDisconnect(OutputPin * This) } -static HRESULT PullPin_Init(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC pSampleProc, LPVOID pUserData, - QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, PullPin * pPinImpl) +static HRESULT PullPin_Init(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, + QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, LPCRITICAL_SECTION pCritSec, PullPin * pPinImpl) { /* Common attributes */ pPinImpl->pin.lpVtbl = PullPin_Vtbl; @@ -1194,12 +1194,15 @@ static HRESULT PullPin_Init(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinI pPinImpl->pReader = NULL; pPinImpl->hThread = NULL; pPinImpl->hEventStateChanged = CreateEventW(NULL, TRUE, TRUE, NULL); + pPinImpl->thread_sleepy = CreateEventW(NULL, FALSE, FALSE, NULL); pPinImpl->rtStart = 0; pPinImpl->rtCurrent = 0; pPinImpl->rtStop = ((LONGLONG)0x7fffffff << 32) | 0xffffffff; pPinImpl->dRate = 1.0; - pPinImpl->state = State_Stopped; + pPinImpl->state = Req_Die; + pPinImpl->fnCustomRequest = pCustomRequest; + pPinImpl->stop_playback = 1; InitializeCriticalSection(&pPinImpl->thread_lock); pPinImpl->thread_lock.DebugInfo->Spare[0] = (DWORD_PTR)( __FILE__ ": PullPin.thread_lock"); @@ -1207,7 +1210,7 @@ static HRESULT PullPin_Init(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinI return S_OK; } -HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, IPin ** ppPin) +HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, LPCRITICAL_SECTION pCritSec, IPin ** ppPin) { PullPin * pPinImpl; @@ -1224,7 +1227,7 @@ HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInf if (!pPinImpl) return E_OUTOFMEMORY; - if (SUCCEEDED(PullPin_Init(PullPin_Vtbl, pPinInfo, pSampleProc, pUserData, pQueryAccept, pCleanUp, pCritSec, pPinImpl))) + if (SUCCEEDED(PullPin_Init(PullPin_Vtbl, pPinInfo, pSampleProc, pUserData, pQueryAccept, pCleanUp, pCustomRequest, pCritSec, pPinImpl))) { *ppPin = (IPin *)(&pPinImpl->pin.lpVtbl); return S_OK; @@ -1333,7 +1336,7 @@ HRESULT WINAPI PullPin_QueryInterface(IPin * iface, REFIID riid, LPVOID * ppv) return E_NOINTERFACE; } -ULONG WINAPI PullPin_Release(IPin * iface) +ULONG WINAPI PullPin_Release(IPin *iface) { PullPin *This = (PullPin *)iface; ULONG refCount = InterlockedDecrement(&This->pin.refCount); @@ -1346,6 +1349,7 @@ ULONG WINAPI PullPin_Release(IPin * iface) IMemAllocator_Release(This->pAlloc); if(This->pReader) IAsyncReader_Release(This->pReader); + CloseHandle(This->thread_sleepy); CloseHandle(This->hEventStateChanged); This->thread_lock.DebugInfo->Spare[0] = 0; DeleteCriticalSection(&This->thread_lock); @@ -1355,135 +1359,195 @@ ULONG WINAPI PullPin_Release(IPin * iface) return refCount; } -static DWORD WINAPI PullPin_Thread_Main(LPVOID pv) +static HRESULT PullPin_Standard_Request(PullPin *This, BOOL start) { - CoInitializeEx(NULL, COINIT_MULTITHREADED); + REFERENCE_TIME rtSampleStart; + REFERENCE_TIME rtSampleStop; + IMediaSample *sample = NULL; + HRESULT hr; - for (;;) - SleepEx(INFINITE, TRUE); + TRACE("Requesting sample!\n"); + + if (start) + This->rtNext = This->rtCurrent; + + if (This->rtNext >= This->rtStop) + /* Last sample has already been queued, request nothing more */ + return S_OK; + + hr = IMemAllocator_GetBuffer(This->pAlloc, &sample, NULL, NULL, 0); + + if (SUCCEEDED(hr)) + { + rtSampleStart = This->rtNext; + rtSampleStop = rtSampleStart + MEDIATIME_FROM_BYTES(IMediaSample_GetSize(sample)); + if (rtSampleStop > This->rtStop) + rtSampleStop = MEDIATIME_FROM_BYTES(ALIGNUP(BYTES_FROM_MEDIATIME(This->rtStop), This->cbAlign)); + hr = IMediaSample_SetTime(sample, &rtSampleStart, &rtSampleStop); + + This->rtCurrent = This->rtNext; + This->rtNext = rtSampleStop; + } + + if (SUCCEEDED(hr)) + hr = IAsyncReader_Request(This->pReader, sample, 0); + + return hr; } -static void CALLBACK PullPin_Thread_Process(ULONG_PTR iface) +static void CALLBACK PullPin_Flush(PullPin *This) { - PullPin *This = (PullPin *)iface; - HRESULT hr; + IMediaSample *pSample; - ALLOCATOR_PROPERTIES allocProps; + EnterCriticalSection(This->pin.pCritSec); + if (This->pReader) + { + /* Flush outstanding samples */ + IAsyncReader_BeginFlush(This->pReader); + for (;;) + { + DWORD_PTR dwUser; + + IAsyncReader_WaitForNext(This->pReader, 0, &pSample, &dwUser); + + if (!pSample) + break; + if (This->fnCustomRequest) + This->fnSampleProc(This->pin.pUserData, pSample, dwUser); + + IMediaSample_Release(pSample); + } + + IAsyncReader_EndFlush(This->pReader); + } + LeaveCriticalSection(This->pin.pCritSec); +} + +static void CALLBACK PullPin_Thread_Process(PullPin *This) +{ + HRESULT hr; + BOOL rejected = FALSE; + IMediaSample * pSample = NULL; + ALLOCATOR_PROPERTIES allocProps; EnterCriticalSection(This->pin.pCritSec); SetEvent(This->hEventStateChanged); - This->state = State_Running; LeaveCriticalSection(This->pin.pCritSec); hr = IMemAllocator_GetProperties(This->pAlloc, &allocProps); + This->cbAlign = allocProps.cbAlign; + if (This->rtCurrent < This->rtStart) - This->rtCurrent = MEDIATIME_FROM_BYTES(ALIGNDOWN(BYTES_FROM_MEDIATIME(This->rtStart), allocProps.cbAlign)); + This->rtCurrent = MEDIATIME_FROM_BYTES(ALIGNDOWN(BYTES_FROM_MEDIATIME(This->rtStart), This->cbAlign)); TRACE("Start\n"); if (This->rtCurrent >= This->rtStop) { - FIXME("Send an EndOfStream?\n"); + IPin_EndOfStream((IPin *)This); + return; } - else do + + /* There is no sample in our buffer */ + if (!This->fnCustomRequest) + hr = PullPin_Standard_Request(This, TRUE); + else + hr = This->fnCustomRequest(This->pin.pUserData); + + do { - /* FIXME: to improve performance by quite a bit this should be changed - * so that one sample is processed while one sample is fetched. However, - * it is harder to debug so for the moment it will stay as it is */ - IMediaSample * pSample = NULL; - REFERENCE_TIME rtSampleStart; - REFERENCE_TIME rtSampleStop; DWORD_PTR dwUser; TRACE("Process sample\n"); - hr = IMemAllocator_GetBuffer(This->pAlloc, &pSample, NULL, NULL, 0); + hr = IAsyncReader_WaitForNext(This->pReader, 10000, &pSample, &dwUser); - if (SUCCEEDED(hr)) + if (FAILED(hr)) + ERR("Queueing error: %x\n", hr); + + if (pSample) { - rtSampleStart = This->rtCurrent; - rtSampleStop = rtSampleStart + MEDIATIME_FROM_BYTES(IMediaSample_GetSize(pSample)); - if (rtSampleStop > This->rtStop) - rtSampleStop = MEDIATIME_FROM_BYTES(ALIGNUP(BYTES_FROM_MEDIATIME(This->rtStop), allocProps.cbAlign)); - hr = IMediaSample_SetTime(pSample, &rtSampleStart, &rtSampleStop); - This->rtCurrent = rtSampleStop; + if (!This->fnCustomRequest) + hr = PullPin_Standard_Request(This, FALSE); + else + hr = This->fnCustomRequest(This->pin.pUserData); } - if (SUCCEEDED(hr)) - hr = IAsyncReader_Request(This->pReader, pSample, (ULONG_PTR)0); + /* Return an empty sample on error to the implementation in case it does custom parsing, so it knows it's gone */ + if (SUCCEEDED(hr) || This->fnCustomRequest) + { + REFERENCE_TIME rtStart, rtStop; + IMediaSample_GetTime(pSample, &rtStart, &rtStop); - if (SUCCEEDED(hr)) - hr = IAsyncReader_WaitForNext(This->pReader, 1000, &pSample, &dwUser); + do + { + hr = This->fnSampleProc(This->pin.pUserData, pSample, dwUser); - if (SUCCEEDED(hr)) - { - rtSampleStop = rtSampleStart + MEDIATIME_FROM_BYTES(IMediaSample_GetActualDataLength(pSample)); - if (rtSampleStop > This->rtStop) - rtSampleStop = MEDIATIME_FROM_BYTES(ALIGNUP(BYTES_FROM_MEDIATIME(This->rtStop), allocProps.cbAlign)); - hr = IMediaSample_SetTime(pSample, &rtSampleStart, &rtSampleStop); - } + rejected = FALSE; - if (SUCCEEDED(hr)) - hr = This->fnSampleProc(This->pin.pUserData, pSample); + if (!This->fnCustomRequest) + { + if (This->rtCurrent == rtStart) + { + rejected = TRUE; + TRACE("DENIED!\n"); + Sleep(10); + /* Maybe it's transient? */ + } + /* rtNext = rtCurrent, because the next sample is already queued */ + else if (rtStop != This->rtCurrent && rtStop < This->rtStop) + { + WARN("Position changed! rtStop: %u, rtCurrent: %u\n", (DWORD)BYTES_FROM_MEDIATIME(rtStop), (DWORD)BYTES_FROM_MEDIATIME(This->rtCurrent)); + PullPin_Flush(This); + hr = PullPin_Standard_Request(This, TRUE); + } + } + } while (rejected && (This->rtCurrent < This->rtStop && hr == S_OK && !This->stop_playback)); + } else + { + /* FIXME: This is not well handled yet! */ ERR("Processing error: %x\n", hr); + } if (pSample) + { IMediaSample_Release(pSample); + pSample = NULL; + } } while (This->rtCurrent < This->rtStop && hr == S_OK && !This->stop_playback); - EnterCriticalSection(This->pin.pCritSec); - This->state = State_Paused; - LeaveCriticalSection(This->pin.pCritSec); - TRACE("End\n"); -} - -static void CALLBACK PullPin_Thread_Flush(ULONG_PTR iface) -{ - PullPin *This = (PullPin *)iface; - IMediaSample *pSample; - - EnterCriticalSection(This->pin.pCritSec); - if (This->pReader) + /* Sample was rejected, and we are asked to terminate */ + if (pSample) { - /* Flush outstanding samples */ - IAsyncReader_BeginFlush(This->pReader); - - for (;;) - { - IMemAllocator_GetBuffer(This->pAlloc, &pSample, NULL, NULL, 0); + IMediaSample_Release(pSample); + } - if (!pSample) - break; - IMediaSample_Release(pSample); - } + /* Can't reset state to Sleepy here because that might race, instead PauseProcessing will do that for us + * Flush remaining samples + */ + PullPin_Flush(This); - IAsyncReader_EndFlush(This->pReader); - } - LeaveCriticalSection(This->pin.pCritSec); + TRACE("End: %08x, %d\n", hr, This->stop_playback); } -static void CALLBACK PullPin_Thread_Pause(ULONG_PTR iface) +static void CALLBACK PullPin_Thread_Pause(PullPin *This) { - PullPin *This = (PullPin *)iface; - - TRACE("(%p/%p)->()\n", This, (LPVOID)iface); + TRACE("(%p)->()\n", This); EnterCriticalSection(This->pin.pCritSec); { - This->state = State_Paused; + This->state = Req_Sleepy; SetEvent(This->hEventStateChanged); } LeaveCriticalSection(This->pin.pCritSec); } -static void CALLBACK PullPin_Thread_Stop(ULONG_PTR iface) +static void CALLBACK PullPin_Thread_Stop(PullPin *This) { - PullPin *This = (PullPin *)iface; - - TRACE("(%p/%p)->()\n", This, (LPVOID)iface); + TRACE("(%p)->()\n", This); EnterCriticalSection(This->pin.pCritSec); { @@ -1495,7 +1559,6 @@ static void CALLBACK PullPin_Thread_Stop(ULONG_PTR iface) ERR("Allocator decommit failed with error %x. Possible memory leak\n", hr); SetEvent(This->hEventStateChanged); - This->state = State_Stopped; } LeaveCriticalSection(This->pin.pCritSec); @@ -1505,6 +1568,28 @@ static void CALLBACK PullPin_Thread_Stop(ULONG_PTR iface) ExitThread(0); } +static DWORD WINAPI PullPin_Thread_Main(LPVOID pv) +{ + PullPin *This = pv; + CoInitializeEx(NULL, COINIT_MULTITHREADED); + + for (;;) + { + WaitForSingleObject(This->thread_sleepy, INFINITE); + + TRACE("State: %d\n", This->state); + + switch (This->state) + { + case Req_Die: PullPin_Thread_Stop(This); break; + case Req_Run: PullPin_Thread_Process(This); break; + case Req_Pause: PullPin_Thread_Pause(This); break; + case Req_Sleepy: ERR("Should not be signalled with SLEEPY!\n"); break; + default: ERR("Unknown state request: %d\n", This->state); break; + } + } +} + HRESULT PullPin_InitProcessing(PullPin * This) { HRESULT hr = S_OK; @@ -1514,32 +1599,36 @@ HRESULT PullPin_InitProcessing(PullPin * This) /* if we are connected */ if (This->pAlloc) { + DWORD dwThreadId; + WaitForSingleObject(This->hEventStateChanged, INFINITE); EnterCriticalSection(This->pin.pCritSec); - if (This->state == State_Stopped) - { - DWORD dwThreadId; - assert(!This->hThread); - /* AddRef the filter to make sure it and it's pins will be around - * as long as the thread */ - IBaseFilter_AddRef(This->pin.pinInfo.pFilter); + assert(!This->hThread); + assert(This->state == Req_Die); + assert(This->stop_playback); + assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT); + This->state = Req_Sleepy; - This->hThread = CreateThread(NULL, 0, PullPin_Thread_Main, NULL, 0, &dwThreadId); - if (!This->hThread) - { - hr = HRESULT_FROM_WIN32(GetLastError()); - IBaseFilter_Release(This->pin.pinInfo.pFilter); - } + /* AddRef the filter to make sure it and it's pins will be around + * as long as the thread */ + IBaseFilter_AddRef(This->pin.pinInfo.pFilter); - if (SUCCEEDED(hr)) - { - hr = IMemAllocator_Commit(This->pAlloc); - This->state = State_Paused; - SetEvent(This->hEventStateChanged); - } + + This->hThread = CreateThread(NULL, 0, PullPin_Thread_Main, This, 0, &dwThreadId); + if (!This->hThread) + { + hr = HRESULT_FROM_WIN32(GetLastError()); + IBaseFilter_Release(This->pin.pinInfo.pFilter); + } + + if (SUCCEEDED(hr)) + { + hr = IMemAllocator_Commit(This->pAlloc); + + SetEvent(This->hEventStateChanged); + /* If assert fails, that means a command was not processed before the thread previously terminated */ } - else assert(This->hThread); LeaveCriticalSection(This->pin.pCritSec); } @@ -1557,11 +1646,15 @@ HRESULT PullPin_StartProcessing(PullPin * This) assert(This->hThread); PullPin_WaitForStateChange(This, INFINITE); - ResetEvent(This->hEventStateChanged); - This->stop_playback = 0; - if (!QueueUserAPC(PullPin_Thread_Process, This->hThread, (ULONG_PTR)This)) - return HRESULT_FROM_WIN32(GetLastError()); + assert(This->state == Req_Sleepy); + + /* Wake up! */ + assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT); + This->state = Req_Run; + This->stop_playback = 0; + ResetEvent(This->hEventStateChanged); + SetEvent(This->thread_sleepy); } return S_OK; @@ -1576,13 +1669,18 @@ HRESULT PullPin_PauseProcessing(PullPin * This) assert(This->hThread); PullPin_WaitForStateChange(This, INFINITE); + EnterCriticalSection(This->pin.pCritSec); + assert(!This->stop_playback); + assert(This->state == Req_Run|| This->state == Req_Sleepy); + + assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT); + This->state = Req_Pause; This->stop_playback = 1; - LeaveCriticalSection(This->pin.pCritSec); ResetEvent(This->hEventStateChanged); + SetEvent(This->thread_sleepy); - if (!QueueUserAPC(PullPin_Thread_Pause, This->hThread, (ULONG_PTR)This)) - return HRESULT_FROM_WIN32(GetLastError()); + LeaveCriticalSection(This->pin.pCritSec); } return S_OK; @@ -1593,16 +1691,17 @@ HRESULT PullPin_StopProcessing(PullPin * This) TRACE("(%p)->()\n", This); /* if we are alive */ - if (This->hThread) - { - PullPin_WaitForStateChange(This, INFINITE); + assert(This->hThread); - This->stop_playback = 1; - ResetEvent(This->hEventStateChanged); + PullPin_WaitForStateChange(This, INFINITE); - if (!QueueUserAPC(PullPin_Thread_Stop, This->hThread, (ULONG_PTR)This)) - return HRESULT_FROM_WIN32(GetLastError()); - } + assert(This->state == Req_Pause || This->state == Req_Sleepy); + + This->stop_playback = 1; + This->state = Req_Die; + assert(WaitForSingleObject(This->thread_sleepy, 0) == WAIT_TIMEOUT); + ResetEvent(This->hEventStateChanged); + SetEvent(This->thread_sleepy); return S_OK; } @@ -1624,7 +1723,7 @@ HRESULT WINAPI PullPin_EndOfStream(IPin * iface) HRESULT WINAPI PullPin_BeginFlush(IPin * iface) { PullPin *This = (PullPin *)iface; - TRACE("(%p)->()\n", iface); + TRACE("(%p)->()\n", This); EnterCriticalSection(This->pin.pCritSec); { @@ -1634,19 +1733,13 @@ HRESULT WINAPI PullPin_BeginFlush(IPin * iface) EnterCriticalSection(&This->thread_lock); { - if (This->state == State_Running) - PullPin_PauseProcessing(This); - PullPin_WaitForStateChange(This, INFINITE); - /* Workaround: The file async reader only cancels io on current thread - * only vista and newer have CancelIoEx, so try the thread context if available - * Anyone has some documentation on NtCancelFileIoEx? - */ - if (This->hThread) - QueueUserAPC(PullPin_Thread_Flush, This->hThread, (ULONG_PTR)This); - else - PullPin_Thread_Flush((ULONG_PTR)This); + if (This->hThread && !This->stop_playback) + { + PullPin_PauseProcessing(This); + PullPin_WaitForStateChange(This, INFINITE); + } } LeaveCriticalSection(&This->thread_lock); @@ -1670,7 +1763,7 @@ HRESULT WINAPI PullPin_EndFlush(IPin * iface) FILTER_STATE state; IBaseFilter_GetState(This->pin.pinInfo.pFilter, INFINITE, &state); - if (state == State_Running && This->state == State_Paused) + if (This->stop_playback && state == State_Running) PullPin_StartProcessing(This); PullPin_WaitForStateChange(This, INFINITE); diff --git a/dlls/quartz/pin.h b/dlls/quartz/pin.h index 19c5246..b28b52f 100644 --- a/dlls/quartz/pin.h +++ b/dlls/quartz/pin.h @@ -20,8 +20,12 @@ /* This function will process incoming samples to the pin. * Any return value valid in IMemInputPin::Receive is allowed here + * + * Cookie is the cookie that was set when requesting the buffer, if you don't + * implement custom requesting, you can safely ignore this */ -typedef HRESULT (* SAMPLEPROC)(LPVOID userdata, IMediaSample * pSample); +typedef HRESULT (* SAMPLEPROC_PUSH)(LPVOID userdata, IMediaSample * pSample); +typedef HRESULT (* SAMPLEPROC_PULL)(LPVOID userdata, IMediaSample * pSample, DWORD_PTR cookie); /* This function will determine whether a type is supported or not. * It is allowed to return any error value (within reason), as opposed @@ -42,6 +46,17 @@ typedef HRESULT (* PRECONNECTPROC)(IPin * iface, IPin * pConnectPin); */ typedef HRESULT (* CLEANUPPROC) (LPVOID userdata); +/* This function is called whenever a request for a new sample is made, + * If you implement it (it can be NULL for default behavior), you have to + * call IMemAllocator_GetBuffer and IMemAllocator_RequestBuffer + * This is useful if you want to request more then 1 buffer at simultaneously + * If PullPin->flushed is set, it means that all buffers queued previously are gone + * + * This will also cause the Sample Proc to be called with empty buffers to indicate + * failure in retrieving the sample. + */ +typedef HRESULT (* REQUESTPROC) (LPVOID userdata); + typedef struct IPinImpl { const struct IPinVtbl * lpVtbl; @@ -62,7 +77,7 @@ typedef struct InputPin const IMemInputPinVtbl * lpVtblMemInput; IMemAllocator * pAllocator; - SAMPLEPROC fnSampleProc; + SAMPLEPROC_PUSH fnSampleProc; CLEANUPPROC fnCleanProc; REFERENCE_TIME tStart; REFERENCE_TIME tStop; @@ -85,30 +100,37 @@ typedef struct PullPin /* inheritance C style! */ IPinImpl pin; + REFERENCE_TIME rtStart, rtCurrent, rtNext, rtStop; IAsyncReader * pReader; IMemAllocator * pAlloc; - SAMPLEPROC fnSampleProc; + SAMPLEPROC_PULL fnSampleProc; PRECONNECTPROC fnPreConnect; - HANDLE hThread; - HANDLE hEventStateChanged; + REQUESTPROC fnCustomRequest; CLEANUPPROC fnCleanProc; - REFERENCE_TIME rtStart; - REFERENCE_TIME rtStop; - REFERENCE_TIME rtCurrent; double dRate; - FILTER_STATE state; BOOL stop_playback; + DWORD cbAlign; /* Any code that touches the thread must hold the thread lock, * lock order: thread_lock and then the filter critical section + * also signal thread_sleepy so the thread knows to wake up */ CRITICAL_SECTION thread_lock; + HANDLE hThread; + DWORD requested_state; + HANDLE hEventStateChanged, thread_sleepy; + DWORD state; } PullPin; +#define Req_Sleepy 0 +#define Req_Die 1 +#define Req_Run 2 +#define Req_Pause 3 + /*** Constructors ***/ -HRESULT InputPin_Construct(const IPinVtbl *InputPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, IPin ** ppPin); +HRESULT InputPin_Construct(const IPinVtbl *InputPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PUSH pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, IPin ** ppPin); HRESULT OutputPin_Construct(const IPinVtbl *OutputPin_Vtbl, long outputpin_size, const PIN_INFO * pPinInfo, ALLOCATOR_PROPERTIES *props, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, LPCRITICAL_SECTION pCritSec, IPin ** ppPin); -HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, IPin ** ppPin); +HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, LPCRITICAL_SECTION pCritSec, IPin ** ppPin); /**************************/ /*** Pin Implementation ***/ diff --git a/dlls/quartz/waveparser.c b/dlls/quartz/waveparser.c index 5aef9ef..6b57a5f 100644 --- a/dlls/quartz/waveparser.c +++ b/dlls/quartz/waveparser.c @@ -72,7 +72,7 @@ static LONGLONG duration_to_bytepos(WAVEParserImpl *This, LONGLONG duration) return MEDIATIME_FROM_BYTES(bytepos); } -static HRESULT WAVEParser_Sample(LPVOID iface, IMediaSample * pSample) +static HRESULT WAVEParser_Sample(LPVOID iface, IMediaSample * pSample, DWORD_PTR cookie) { WAVEParserImpl *This = (WAVEParserImpl *)iface; LPBYTE pbSrcStream = NULL; -- 1.5.4.1