[PATCH 1/5] winegstreamer: Flush connected pins directly in SetPositions().
Zebediah Figura
z.figura12 at gmail.com
Wed Jan 20 18:58:49 CST 2021
Instead of propagating GStreamer flush events to corresponding DirectShow pins.
This is mainly to avoid more callbacks from GStreamer and further separate the
Win32 and Unix code.
Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
dlls/winegstreamer/gstdemux.c | 130 ++++++++++++++++++++++------------
1 file changed, 86 insertions(+), 44 deletions(-)
diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index 30f6fbca168..a1be51472ba 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -65,7 +65,7 @@ struct parser
/* FIXME: It would be nice to avoid duplicating these with strmbase.
* However, synchronization is tricky; we need access to be protected by a
* separate lock. */
- bool streaming;
+ bool streaming, flushing;
BOOL initial, ignore_flush;
GstElement *container;
@@ -111,8 +111,9 @@ struct parser_source
GstCaps *caps;
SourceSeeking seek;
- CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv;
- bool flushing, thread_blocked;
+ CRITICAL_SECTION flushing_cs;
+ CONDITION_VARIABLE event_cv, event_empty_cv;
+ bool flushing;
struct parser_event event;
HANDLE thread;
};
@@ -630,8 +631,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event)
tevent = gst_event_new_flush_start();
gst_event_set_seqnum(tevent, seqnum);
gst_pad_push_event(This->my_src, tevent);
- if (This->reader)
- IAsyncReader_BeginFlush(This->reader);
if (thread)
gst_pad_set_active(This->my_src, 1);
}
@@ -643,8 +642,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event)
tevent = gst_event_new_flush_stop(TRUE);
gst_event_set_seqnum(tevent, seqnum);
gst_pad_push_event(This->my_src, tevent);
- if (This->reader)
- IAsyncReader_EndFlush(This->reader);
if (thread)
gst_pad_set_active(This->my_src, 1);
}
@@ -659,25 +656,18 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event)
TRACE("filter %p, type \"%s\".\n", This, GST_EVENT_TYPE_NAME(event));
- switch (event->type) {
+ switch (event->type)
+ {
case GST_EVENT_SEEK:
ret = gst_base_src_perform_seek(This, event);
break;
+
case GST_EVENT_FLUSH_START:
- EnterCriticalSection(&This->filter.filter_cs);
- if (This->reader)
- IAsyncReader_BeginFlush(This->reader);
- LeaveCriticalSection(&This->filter.filter_cs);
- break;
case GST_EVENT_FLUSH_STOP:
- EnterCriticalSection(&This->filter.filter_cs);
- if (This->reader)
- IAsyncReader_EndFlush(This->reader);
- LeaveCriticalSection(&This->filter.filter_cs);
- break;
case GST_EVENT_QOS:
case GST_EVENT_RECONFIGURE:
break;
+
default:
WARN("Ignoring \"%s\" event.\n", GST_EVENT_TYPE_NAME(event));
ret = FALSE;
@@ -757,19 +747,11 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
}
if (pin->pin.pin.peer)
{
- IPin_BeginFlush(pin->pin.pin.peer);
-
EnterCriticalSection(&filter->cs);
pin->flushing = true;
- WakeConditionVariable(&pin->event_cv);
WakeConditionVariable(&pin->event_empty_cv);
- /* Wait for the thread to pause itself, to ensure that no stale
- * samples are sent. */
- while (!pin->thread_blocked)
- SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE);
- /* And flush out any buffered event. */
switch (pin->event.type)
{
case PARSER_EVENT_NONE:
@@ -797,9 +779,6 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
EnterCriticalSection(&filter->cs);
pin->flushing = false;
LeaveCriticalSection(&filter->cs);
- WakeConditionVariable(&pin->flush_stop_cv);
-
- IPin_EndFlush(pin->pin.pin.peer);
}
break;
@@ -1058,32 +1037,31 @@ static DWORD CALLBACK stream_thread(void *arg)
{
struct parser_event event;
+ EnterCriticalSection(&pin->flushing_cs);
EnterCriticalSection(&filter->cs);
- while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE)
+ while (filter->streaming && !filter->flushing && pin->event.type == PARSER_EVENT_NONE)
SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE);
- if (pin->flushing)
+ if (filter->flushing)
{
- TRACE("Filter is flushing; pausing thread.\n");
- pin->thread_blocked = true;
- WakeConditionVariable(&pin->flushing_cv);
- do
- SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE);
- while (pin->flushing);
- pin->thread_blocked = false;
- TRACE("Filter is no longer flushing; resuming thread.\n");
+ LeaveCriticalSection(&filter->cs);
+ LeaveCriticalSection(&pin->flushing_cs);
+ TRACE("Filter is flushing.\n");
+ continue;
}
if (!filter->streaming)
{
LeaveCriticalSection(&filter->cs);
+ LeaveCriticalSection(&pin->flushing_cs);
break;
}
if (!pin->event.type)
{
LeaveCriticalSection(&filter->cs);
+ LeaveCriticalSection(&pin->flushing_cs);
continue;
}
@@ -1128,6 +1106,8 @@ static DWORD CALLBACK stream_thread(void *arg)
case PARSER_EVENT_NONE:
assert(0);
}
+
+ LeaveCriticalSection(&pin->flushing_cs);
}
TRACE("Streaming stopped; exiting.\n");
@@ -2154,7 +2134,10 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
{
GstSeekType current_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET;
struct parser_source *pin = impl_from_IMediaSeeking(iface);
+ struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
GstSeekFlags flags = 0;
+ HRESULT hr = S_OK;
+ int i;
TRACE("pin %p, current %s, current_flags %#x, stop %s, stop_flags %#x.\n",
pin, current ? debugstr_time(*current) : "<null>", current_flags,
@@ -2162,9 +2145,40 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
mark_wine_thread();
- SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
if (pin->pin.pin.filter->state == State_Stopped)
+ {
+ SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
return S_OK;
+ }
+
+ if (!(current_flags & AM_SEEKING_NoFlush))
+ {
+ EnterCriticalSection(&filter->cs);
+ filter->flushing = true;
+ LeaveCriticalSection(&filter->cs);
+
+ for (i = 0; i < filter->source_count; ++i)
+ {
+ if (filter->sources[i]->pin.pin.peer)
+ {
+ WakeConditionVariable(&pin->event_cv);
+ IPin_BeginFlush(filter->sources[i]->pin.pin.peer);
+ }
+ }
+
+ if (filter->reader)
+ IAsyncReader_BeginFlush(filter->reader);
+ }
+
+ /* Acquire the flushing locks. This blocks the streaming threads, and
+ * ensures the seek is serialized between flushes. */
+ for (i = 0; i < filter->source_count; ++i)
+ {
+ if (filter->sources[i]->pin.pin.peer)
+ EnterCriticalSection(&pin->flushing_cs);
+ }
+
+ SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
if (current_flags & AM_SEEKING_SeekToKeyFrame)
flags |= GST_SEEK_FLAG_KEY_UNIT;
@@ -2183,9 +2197,33 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
{
ERR("Failed to seek (current %s, stop %s).\n",
debugstr_time(pin->seek.llCurrent), debugstr_time(pin->seek.llStop));
- return E_FAIL;
+ hr = E_FAIL;
}
- return S_OK;
+
+ if (!(current_flags & AM_SEEKING_NoFlush))
+ {
+ EnterCriticalSection(&filter->cs);
+ filter->flushing = false;
+ LeaveCriticalSection(&filter->cs);
+
+ for (i = 0; i < filter->source_count; ++i)
+ {
+ if (filter->sources[i]->pin.pin.peer)
+ IPin_EndFlush(filter->sources[i]->pin.pin.peer);
+ }
+
+ if (filter->reader)
+ IAsyncReader_EndFlush(filter->reader);
+ }
+
+ /* Release the flushing locks. */
+ for (i = filter->source_count - 1; i >= 0; --i)
+ {
+ if (filter->sources[i]->pin.pin.peer)
+ LeaveCriticalSection(&pin->flushing_cs);
+ }
+
+ return hr;
}
static const IMediaSeekingVtbl GST_Seeking_Vtbl =
@@ -2398,6 +2436,9 @@ static void free_source_pin(struct parser_source *pin)
CloseHandle(pin->eos_event);
gst_segment_free(pin->segment);
+ pin->flushing_cs.DebugInfo->Spare[0] = 0;
+ DeleteCriticalSection(&pin->flushing_cs);
+
strmbase_seeking_cleanup(&pin->seek);
strmbase_source_cleanup(&pin->pin);
heap_free(pin);
@@ -2435,10 +2476,11 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
GST_ChangeCurrent, GST_ChangeRate);
InitializeConditionVariable(&pin->event_cv);
InitializeConditionVariable(&pin->event_empty_cv);
- InitializeConditionVariable(&pin->flushing_cv);
- InitializeConditionVariable(&pin->flush_stop_cv);
BaseFilterImpl_IncrementPinVersion(&filter->filter);
+ InitializeCriticalSection(&pin->flushing_cs);
+ pin->flushing_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": pin.flushing_cs");
+
sprintf(pad_name, "qz_sink_%u", filter->source_count);
pin->my_sink = gst_pad_new(pad_name, GST_PAD_SINK);
gst_pad_set_element_private(pin->my_sink, pin);
--
2.30.0
More information about the wine-devel
mailing list