[PATCH 1/5] winegstreamer: Flush connected pins directly in SetPositions().

Zebediah Figura z.figura12 at gmail.com
Wed Jan 20 18:58:49 CST 2021


Flush the connected pins directly in SetPositions(), instead of propagating
GStreamer flush events to the corresponding DirectShow pins.

This is mainly to avoid more callbacks from GStreamer and further separate the
Win32 and Unix code.

Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
 dlls/winegstreamer/gstdemux.c | 130 ++++++++++++++++++++++------------
 1 file changed, 86 insertions(+), 44 deletions(-)

diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index 30f6fbca168..a1be51472ba 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -65,7 +65,7 @@ struct parser
     /* FIXME: It would be nice to avoid duplicating these with strmbase.
      * However, synchronization is tricky; we need access to be protected by a
      * separate lock. */
-    bool streaming;
+    bool streaming, flushing;
 
     BOOL initial, ignore_flush;
     GstElement *container;
@@ -111,8 +111,9 @@ struct parser_source
     GstCaps *caps;
     SourceSeeking seek;
 
-    CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv;
-    bool flushing, thread_blocked;
+    CRITICAL_SECTION flushing_cs;
+    CONDITION_VARIABLE event_cv, event_empty_cv;
+    bool flushing;
     struct parser_event event;
     HANDLE thread;
 };
@@ -630,8 +631,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event)
         tevent = gst_event_new_flush_start();
         gst_event_set_seqnum(tevent, seqnum);
         gst_pad_push_event(This->my_src, tevent);
-        if (This->reader)
-            IAsyncReader_BeginFlush(This->reader);
         if (thread)
             gst_pad_set_active(This->my_src, 1);
     }
@@ -643,8 +642,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event)
         tevent = gst_event_new_flush_stop(TRUE);
         gst_event_set_seqnum(tevent, seqnum);
         gst_pad_push_event(This->my_src, tevent);
-        if (This->reader)
-            IAsyncReader_EndFlush(This->reader);
         if (thread)
             gst_pad_set_active(This->my_src, 1);
     }
@@ -659,25 +656,18 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event)
 
     TRACE("filter %p, type \"%s\".\n", This, GST_EVENT_TYPE_NAME(event));
 
-    switch (event->type) {
+    switch (event->type)
+    {
         case GST_EVENT_SEEK:
             ret = gst_base_src_perform_seek(This, event);
             break;
+
         case GST_EVENT_FLUSH_START:
-            EnterCriticalSection(&This->filter.filter_cs);
-            if (This->reader)
-                IAsyncReader_BeginFlush(This->reader);
-            LeaveCriticalSection(&This->filter.filter_cs);
-            break;
         case GST_EVENT_FLUSH_STOP:
-            EnterCriticalSection(&This->filter.filter_cs);
-            if (This->reader)
-                IAsyncReader_EndFlush(This->reader);
-            LeaveCriticalSection(&This->filter.filter_cs);
-            break;
         case GST_EVENT_QOS:
         case GST_EVENT_RECONFIGURE:
             break;
+
         default:
             WARN("Ignoring \"%s\" event.\n", GST_EVENT_TYPE_NAME(event));
             ret = FALSE;
@@ -757,19 +747,11 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
             }
             if (pin->pin.pin.peer)
             {
-                IPin_BeginFlush(pin->pin.pin.peer);
-
                 EnterCriticalSection(&filter->cs);
 
                 pin->flushing = true;
-                WakeConditionVariable(&pin->event_cv);
                 WakeConditionVariable(&pin->event_empty_cv);
-                /* Wait for the thread to pause itself, to ensure that no stale
-                 * samples are sent. */
-                while (!pin->thread_blocked)
-                    SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE);
 
-                /* And flush out any buffered event. */
                 switch (pin->event.type)
                 {
                     case PARSER_EVENT_NONE:
@@ -797,9 +779,6 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
                 EnterCriticalSection(&filter->cs);
                 pin->flushing = false;
                 LeaveCriticalSection(&filter->cs);
-                WakeConditionVariable(&pin->flush_stop_cv);
-
-                IPin_EndFlush(pin->pin.pin.peer);
             }
             break;
 
@@ -1058,32 +1037,31 @@ static DWORD CALLBACK stream_thread(void *arg)
     {
         struct parser_event event;
 
+        EnterCriticalSection(&pin->flushing_cs);
         EnterCriticalSection(&filter->cs);
 
-        while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE)
+        while (filter->streaming && !filter->flushing && pin->event.type == PARSER_EVENT_NONE)
             SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE);
 
-        if (pin->flushing)
+        if (filter->flushing)
         {
-            TRACE("Filter is flushing; pausing thread.\n");
-            pin->thread_blocked = true;
-            WakeConditionVariable(&pin->flushing_cv);
-            do
-                SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE);
-            while (pin->flushing);
-            pin->thread_blocked = false;
-            TRACE("Filter is no longer flushing; resuming thread.\n");
+            LeaveCriticalSection(&filter->cs);
+            LeaveCriticalSection(&pin->flushing_cs);
+            TRACE("Filter is flushing.\n");
+            continue;
         }
 
         if (!filter->streaming)
         {
             LeaveCriticalSection(&filter->cs);
+            LeaveCriticalSection(&pin->flushing_cs);
             break;
         }
 
         if (!pin->event.type)
         {
             LeaveCriticalSection(&filter->cs);
+            LeaveCriticalSection(&pin->flushing_cs);
             continue;
         }
 
@@ -1128,6 +1106,8 @@ static DWORD CALLBACK stream_thread(void *arg)
             case PARSER_EVENT_NONE:
                 assert(0);
         }
+
+        LeaveCriticalSection(&pin->flushing_cs);
     }
 
     TRACE("Streaming stopped; exiting.\n");
@@ -2154,7 +2134,10 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
 {
     GstSeekType current_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET;
     struct parser_source *pin = impl_from_IMediaSeeking(iface);
+    struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
     GstSeekFlags flags = 0;
+    HRESULT hr = S_OK;
+    int i;
 
     TRACE("pin %p, current %s, current_flags %#x, stop %s, stop_flags %#x.\n",
             pin, current ? debugstr_time(*current) : "<null>", current_flags,
@@ -2162,9 +2145,40 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
 
     mark_wine_thread();
 
-    SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
     if (pin->pin.pin.filter->state == State_Stopped)
+    {
+        SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
         return S_OK;
+    }
+
+    if (!(current_flags & AM_SEEKING_NoFlush))
+    {
+        EnterCriticalSection(&filter->cs);
+        filter->flushing = true;
+        LeaveCriticalSection(&filter->cs);
+
+        for (i = 0; i < filter->source_count; ++i)
+        {
+            if (filter->sources[i]->pin.pin.peer)
+            {
+                WakeConditionVariable(&pin->event_cv);
+                IPin_BeginFlush(filter->sources[i]->pin.pin.peer);
+            }
+        }
+
+        if (filter->reader)
+            IAsyncReader_BeginFlush(filter->reader);
+    }
+
+    /* Acquire the flushing locks. This blocks the streaming threads, and
+     * ensures the seek is serialized between flushes. */
+    for (i = 0; i < filter->source_count; ++i)
+    {
+        if (filter->sources[i]->pin.pin.peer)
+            EnterCriticalSection(&pin->flushing_cs);
+    }
+
+    SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
 
     if (current_flags & AM_SEEKING_SeekToKeyFrame)
         flags |= GST_SEEK_FLAG_KEY_UNIT;
@@ -2183,9 +2197,33 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
     {
         ERR("Failed to seek (current %s, stop %s).\n",
                 debugstr_time(pin->seek.llCurrent), debugstr_time(pin->seek.llStop));
-        return E_FAIL;
+        hr = E_FAIL;
     }
-    return S_OK;
+
+    if (!(current_flags & AM_SEEKING_NoFlush))
+    {
+        EnterCriticalSection(&filter->cs);
+        filter->flushing = false;
+        LeaveCriticalSection(&filter->cs);
+
+        for (i = 0; i < filter->source_count; ++i)
+        {
+            if (filter->sources[i]->pin.pin.peer)
+                IPin_EndFlush(filter->sources[i]->pin.pin.peer);
+        }
+
+        if (filter->reader)
+            IAsyncReader_EndFlush(filter->reader);
+    }
+
+    /* Release the flushing locks. */
+    for (i = filter->source_count - 1; i >= 0; --i)
+    {
+        if (filter->sources[i]->pin.pin.peer)
+            LeaveCriticalSection(&pin->flushing_cs);
+    }
+
+    return hr;
 }
 
 static const IMediaSeekingVtbl GST_Seeking_Vtbl =
@@ -2398,6 +2436,9 @@ static void free_source_pin(struct parser_source *pin)
     CloseHandle(pin->eos_event);
     gst_segment_free(pin->segment);
 
+    pin->flushing_cs.DebugInfo->Spare[0] = 0;
+    DeleteCriticalSection(&pin->flushing_cs);
+
     strmbase_seeking_cleanup(&pin->seek);
     strmbase_source_cleanup(&pin->pin);
     heap_free(pin);
@@ -2435,10 +2476,11 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
             GST_ChangeCurrent, GST_ChangeRate);
     InitializeConditionVariable(&pin->event_cv);
     InitializeConditionVariable(&pin->event_empty_cv);
-    InitializeConditionVariable(&pin->flushing_cv);
-    InitializeConditionVariable(&pin->flush_stop_cv);
     BaseFilterImpl_IncrementPinVersion(&filter->filter);
 
+    InitializeCriticalSection(&pin->flushing_cs);
+    pin->flushing_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": pin.flushing_cs");
+
     sprintf(pad_name, "qz_sink_%u", filter->source_count);
     pin->my_sink = gst_pad_new(pad_name, GST_PAD_SINK);
     gst_pad_set_element_private(pin->my_sink, pin);
-- 
2.30.0




More information about the wine-devel mailing list