Zebediah Figura : winegstreamer: Flush connected pins directly in SetPositions().

Alexandre Julliard julliard at winehq.org
Thu Jan 21 16:19:14 CST 2021


Module: wine
Branch: master
Commit: ef79e902a61e25845b3fac16bc51294b370030d4
URL:    https://source.winehq.org/git/wine.git/?a=commit;h=ef79e902a61e25845b3fac16bc51294b370030d4

Author: Zebediah Figura <z.figura12 at gmail.com>
Date:   Wed Jan 20 18:58:49 2021 -0600

winegstreamer: Flush connected pins directly in SetPositions().

Do so instead of propagating GStreamer flush events to the corresponding DirectShow pins.

This is mainly to avoid more callbacks from GStreamer and to further separate
the Win32 and Unix code.

Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
Signed-off-by: Alexandre Julliard <julliard at winehq.org>

---

 dlls/winegstreamer/gstdemux.c | 130 ++++++++++++++++++++++++++++--------------
 1 file changed, 86 insertions(+), 44 deletions(-)

diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index 949acd012aa..9cf2379cb2c 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -65,7 +65,7 @@ struct parser
     /* FIXME: It would be nice to avoid duplicating these with strmbase.
      * However, synchronization is tricky; we need access to be protected by a
      * separate lock. */
-    bool streaming;
+    bool streaming, flushing;
 
     BOOL initial, ignore_flush;
     GstElement *container;
@@ -111,8 +111,9 @@ struct parser_source
     GstCaps *caps;
     SourceSeeking seek;
 
-    CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv;
-    bool flushing, thread_blocked;
+    CRITICAL_SECTION flushing_cs;
+    CONDITION_VARIABLE event_cv, event_empty_cv;
+    bool flushing;
     struct parser_event event;
     HANDLE thread;
 };
@@ -630,8 +631,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event)
         tevent = gst_event_new_flush_start();
         gst_event_set_seqnum(tevent, seqnum);
         gst_pad_push_event(This->my_src, tevent);
-        if (This->reader)
-            IAsyncReader_BeginFlush(This->reader);
         if (thread)
             gst_pad_set_active(This->my_src, 1);
     }
@@ -643,8 +642,6 @@ static gboolean gst_base_src_perform_seek(struct parser *This, GstEvent *event)
         tevent = gst_event_new_flush_stop(TRUE);
         gst_event_set_seqnum(tevent, seqnum);
         gst_pad_push_event(This->my_src, tevent);
-        if (This->reader)
-            IAsyncReader_EndFlush(This->reader);
         if (thread)
             gst_pad_set_active(This->my_src, 1);
     }
@@ -659,25 +656,18 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event)
 
     TRACE("filter %p, type \"%s\".\n", This, GST_EVENT_TYPE_NAME(event));
 
-    switch (event->type) {
+    switch (event->type)
+    {
         case GST_EVENT_SEEK:
             ret = gst_base_src_perform_seek(This, event);
             break;
+
         case GST_EVENT_FLUSH_START:
-            EnterCriticalSection(&This->filter.filter_cs);
-            if (This->reader)
-                IAsyncReader_BeginFlush(This->reader);
-            LeaveCriticalSection(&This->filter.filter_cs);
-            break;
         case GST_EVENT_FLUSH_STOP:
-            EnterCriticalSection(&This->filter.filter_cs);
-            if (This->reader)
-                IAsyncReader_EndFlush(This->reader);
-            LeaveCriticalSection(&This->filter.filter_cs);
-            break;
         case GST_EVENT_QOS:
         case GST_EVENT_RECONFIGURE:
             break;
+
         default:
             WARN("Ignoring \"%s\" event.\n", GST_EVENT_TYPE_NAME(event));
             ret = FALSE;
@@ -757,19 +747,11 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
             }
             if (pin->pin.pin.peer)
             {
-                IPin_BeginFlush(pin->pin.pin.peer);
-
                 EnterCriticalSection(&filter->cs);
 
                 pin->flushing = true;
-                WakeConditionVariable(&pin->event_cv);
                 WakeConditionVariable(&pin->event_empty_cv);
-                /* Wait for the thread to pause itself, to ensure that no stale
-                 * samples are sent. */
-                while (!pin->thread_blocked)
-                    SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE);
 
-                /* And flush out any buffered event. */
                 switch (pin->event.type)
                 {
                     case PARSER_EVENT_NONE:
@@ -797,9 +779,6 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
                 EnterCriticalSection(&filter->cs);
                 pin->flushing = false;
                 LeaveCriticalSection(&filter->cs);
-                WakeConditionVariable(&pin->flush_stop_cv);
-
-                IPin_EndFlush(pin->pin.pin.peer);
             }
             break;
 
@@ -1057,32 +1036,31 @@ static DWORD CALLBACK stream_thread(void *arg)
     {
         struct parser_event event;
 
+        EnterCriticalSection(&pin->flushing_cs);
         EnterCriticalSection(&filter->cs);
 
-        while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE)
+        while (filter->streaming && !filter->flushing && pin->event.type == PARSER_EVENT_NONE)
             SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE);
 
-        if (pin->flushing)
+        if (filter->flushing)
         {
-            TRACE("Filter is flushing; pausing thread.\n");
-            pin->thread_blocked = true;
-            WakeConditionVariable(&pin->flushing_cv);
-            do
-                SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE);
-            while (pin->flushing);
-            pin->thread_blocked = false;
-            TRACE("Filter is no longer flushing; resuming thread.\n");
+            LeaveCriticalSection(&filter->cs);
+            LeaveCriticalSection(&pin->flushing_cs);
+            TRACE("Filter is flushing.\n");
+            continue;
         }
 
         if (!filter->streaming)
         {
             LeaveCriticalSection(&filter->cs);
+            LeaveCriticalSection(&pin->flushing_cs);
             break;
         }
 
         if (!pin->event.type)
         {
             LeaveCriticalSection(&filter->cs);
+            LeaveCriticalSection(&pin->flushing_cs);
             continue;
         }
 
@@ -1127,6 +1105,8 @@ static DWORD CALLBACK stream_thread(void *arg)
             case PARSER_EVENT_NONE:
                 assert(0);
         }
+
+        LeaveCriticalSection(&pin->flushing_cs);
     }
 
     TRACE("Streaming stopped; exiting.\n");
@@ -2153,7 +2133,10 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
 {
     GstSeekType current_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET;
     struct parser_source *pin = impl_from_IMediaSeeking(iface);
+    struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
     GstSeekFlags flags = 0;
+    HRESULT hr = S_OK;
+    int i;
 
     TRACE("pin %p, current %s, current_flags %#x, stop %s, stop_flags %#x.\n",
             pin, current ? debugstr_time(*current) : "<null>", current_flags,
@@ -2161,9 +2144,40 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
 
     mark_wine_thread();
 
-    SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
     if (pin->pin.pin.filter->state == State_Stopped)
+    {
+        SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
         return S_OK;
+    }
+
+    if (!(current_flags & AM_SEEKING_NoFlush))
+    {
+        EnterCriticalSection(&filter->cs);
+        filter->flushing = true;
+        LeaveCriticalSection(&filter->cs);
+
+        for (i = 0; i < filter->source_count; ++i)
+        {
+            if (filter->sources[i]->pin.pin.peer)
+            {
+                WakeConditionVariable(&pin->event_cv);
+                IPin_BeginFlush(filter->sources[i]->pin.pin.peer);
+            }
+        }
+
+        if (filter->reader)
+            IAsyncReader_BeginFlush(filter->reader);
+    }
+
+    /* Acquire the flushing locks. This blocks the streaming threads, and
+     * ensures the seek is serialized between flushes. */
+    for (i = 0; i < filter->source_count; ++i)
+    {
+        if (filter->sources[i]->pin.pin.peer)
+            EnterCriticalSection(&pin->flushing_cs);
+    }
+
+    SourceSeekingImpl_SetPositions(iface, current, current_flags, stop, stop_flags);
 
     if (current_flags & AM_SEEKING_SeekToKeyFrame)
         flags |= GST_SEEK_FLAG_KEY_UNIT;
@@ -2182,9 +2196,33 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
     {
         ERR("Failed to seek (current %s, stop %s).\n",
                 debugstr_time(pin->seek.llCurrent), debugstr_time(pin->seek.llStop));
-        return E_FAIL;
+        hr = E_FAIL;
     }
-    return S_OK;
+
+    if (!(current_flags & AM_SEEKING_NoFlush))
+    {
+        EnterCriticalSection(&filter->cs);
+        filter->flushing = false;
+        LeaveCriticalSection(&filter->cs);
+
+        for (i = 0; i < filter->source_count; ++i)
+        {
+            if (filter->sources[i]->pin.pin.peer)
+                IPin_EndFlush(filter->sources[i]->pin.pin.peer);
+        }
+
+        if (filter->reader)
+            IAsyncReader_EndFlush(filter->reader);
+    }
+
+    /* Release the flushing locks. */
+    for (i = filter->source_count - 1; i >= 0; --i)
+    {
+        if (filter->sources[i]->pin.pin.peer)
+            LeaveCriticalSection(&pin->flushing_cs);
+    }
+
+    return hr;
 }
 
 static const IMediaSeekingVtbl GST_Seeking_Vtbl =
@@ -2397,6 +2435,9 @@ static void free_source_pin(struct parser_source *pin)
     CloseHandle(pin->eos_event);
     gst_segment_free(pin->segment);
 
+    pin->flushing_cs.DebugInfo->Spare[0] = 0;
+    DeleteCriticalSection(&pin->flushing_cs);
+
     strmbase_seeking_cleanup(&pin->seek);
     strmbase_source_cleanup(&pin->pin);
     heap_free(pin);
@@ -2434,10 +2475,11 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
             GST_ChangeCurrent, GST_ChangeRate);
     InitializeConditionVariable(&pin->event_cv);
     InitializeConditionVariable(&pin->event_empty_cv);
-    InitializeConditionVariable(&pin->flushing_cv);
-    InitializeConditionVariable(&pin->flush_stop_cv);
     BaseFilterImpl_IncrementPinVersion(&filter->filter);
 
+    InitializeCriticalSection(&pin->flushing_cs);
+    pin->flushing_cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": pin.flushing_cs");
+
     sprintf(pad_name, "qz_sink_%u", filter->source_count);
     pin->my_sink = gst_pad_new(pad_name, GST_PAD_SINK);
     gst_pad_set_element_private(pin->my_sink, pin);




More information about the wine-cvs mailing list