[PATCH 4/5] winegstreamer: Keep the stream in paused state for its entire lifetime.

Zebediah Figura z.figura12 at gmail.com
Wed Jan 20 18:58:52 CST 2021


This noticeably improves stream startup time. The process of typefinding and
negotiation takes about 70 ms for a simple test file on one machine, and it is
currently done twice: once when the filter is connected, and again every time
the stream is started. This patch keeps the pipeline paused after the first
pass and replaces the second instance with a simple seek, which takes less
than 10 ms.

Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
 dlls/winegstreamer/gstdemux.c | 194 ++++++----------------------------
 1 file changed, 32 insertions(+), 162 deletions(-)

diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index d532d4b35bb..e7a4740b566 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -67,7 +67,6 @@ struct parser
      * separate lock. */
     bool streaming, flushing;
 
-    BOOL initial, ignore_flush;
     GstElement *container;
     GstPad *my_src, *their_sink;
     GstBus *bus;
@@ -682,9 +681,9 @@ static GstFlowReturn queue_stream_event(struct parser_source *pin, const struct
     struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
 
     EnterCriticalSection(&filter->cs);
-    while (filter->streaming && !pin->flushing && pin->event.type != PARSER_EVENT_NONE)
+    while (!pin->flushing && pin->event.type != PARSER_EVENT_NONE)
         SleepConditionVariableCS(&pin->event_empty_cv, &filter->cs, INFINITE);
-    if (!filter->streaming || pin->flushing)
+    if (pin->flushing)
     {
         LeaveCriticalSection(&filter->cs);
         TRACE("Filter is flushing; discarding event.\n");
@@ -734,17 +733,6 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
             break;
 
         case GST_EVENT_FLUSH_START:
-            if (impl_from_strmbase_filter(pin->pin.pin.filter)->ignore_flush) {
-                /* gst-plugins-base prior to 1.7 contains a bug which causes
-                 * our sink pins to receive a flush-start event when the
-                 * decodebin changes from PAUSED to READY (including
-                 * PLAYING->PAUSED->READY), but no matching flush-stop event is
-                 * sent. See <gst-plugins-base.git:60bad4815db966a8e4). Here we
-                 * unset the flushing flag to avoid the problem. */
-                TRACE("Working around gst <1.7 bug, ignoring FLUSH_START\n");
-                GST_PAD_UNSET_FLUSHING (pad);
-                break;
-            }
             if (pin->pin.pin.peer)
             {
                 EnterCriticalSection(&filter->cs);
@@ -872,13 +860,12 @@ static DWORD CALLBACK push_data(LPVOID iface)
 static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *buffer)
 {
     struct parser_source *pin = gst_pad_get_element_private(pad);
-    struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
     struct parser_event stream_event;
     GstFlowReturn ret;
 
     TRACE("pad %p, pin %p, buffer %p.\n", pad, pin, buffer);
 
-    if (filter->initial || !pin->pin.pin.peer)
+    if (!pin->pin.pin.peer)
     {
         gst_buffer_unref(buffer);
         return GST_FLOW_OK;
@@ -1143,30 +1130,6 @@ static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64
     return GST_FLOW_OK;
 }
 
-static DWORD CALLBACK push_data_init(LPVOID iface)
-{
-    struct parser *This = iface;
-    DWORD64 ofs = 0;
-
-    TRACE("Starting..\n");
-    for (;;) {
-        GstBuffer *buf;
-        GstFlowReturn ret = request_buffer_src(This->my_src, NULL, ofs, 4096, &buf);
-        if (ret < 0) {
-            ERR("Obtaining buffer returned: %i\n", ret);
-            break;
-        }
-        ret = gst_pad_push(This->my_src, buf);
-        ofs += 4096;
-        if (ret)
-            TRACE("Sending returned: %i\n", ret);
-        if (ret < 0)
-            break;
-    }
-    TRACE("Stopping..\n");
-    return 0;
-}
-
 static void removed_decoded_pad(GstElement *bin, GstPad *pad, gpointer user)
 {
     struct parser *filter = user;
@@ -1344,39 +1307,12 @@ out:
 static void existing_new_pad(GstElement *bin, GstPad *pad, gpointer user)
 {
     struct parser *This = user;
-    unsigned int i;
-    int ret;
 
     TRACE("%p %p %p\n", This, bin, pad);
 
     if (gst_pad_is_linked(pad))
         return;
 
-    /* Still holding our own lock */
-    if (This->initial) {
-        init_new_decoded_pad(bin, pad, This);
-        return;
-    }
-
-    for (i = 0; i < This->source_count; ++i)
-    {
-        struct parser_source *pin = This->sources[i];
-        if (!pin->their_src) {
-            gst_segment_init(pin->segment, GST_FORMAT_TIME);
-
-            if (pin->post_sink)
-                ret = gst_pad_link(pad, pin->post_sink);
-            else
-                ret = gst_pad_link(pad, pin->my_sink);
-
-            if (ret >= 0) {
-                pin->their_src = pad;
-                gst_object_ref(pin->their_src);
-                TRACE("Relinked\n");
-                return;
-            }
-        }
-    }
     init_new_decoded_pad(bin, pad, This);
 }
 
@@ -1428,23 +1364,18 @@ static gboolean activate_push(GstPad *pad, gboolean activate)
     EnterCriticalSection(&This->filter.filter_cs);
     if (!activate) {
         TRACE("Deactivating\n");
-        if (!This->initial)
-            IAsyncReader_BeginFlush(This->reader);
+        IAsyncReader_BeginFlush(This->reader);
         if (This->push_thread) {
             WaitForSingleObject(This->push_thread, -1);
             CloseHandle(This->push_thread);
             This->push_thread = NULL;
         }
-        if (!This->initial)
-            IAsyncReader_EndFlush(This->reader);
+        IAsyncReader_EndFlush(This->reader);
         if (This->filter.state == State_Stopped)
             This->nextofs = This->start;
     } else if (!This->push_thread) {
         TRACE("Activating\n");
-        if (This->initial)
-            This->push_thread = CreateThread(NULL, 0, push_data_init, This, 0, NULL);
-        else
-            This->push_thread = CreateThread(NULL, 0, push_data, This, 0, NULL);
+        This->push_thread = CreateThread(NULL, 0, push_data, This, 0, NULL);
     }
     LeaveCriticalSection(&This->filter.filter_cs);
     return TRUE;
@@ -1557,10 +1488,8 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin)
 
     This->start = This->nextofs = This->nextpullofs = This->stop = 0;
 
-    This->initial = TRUE;
     if (!This->init_gst(This))
         return E_FAIL;
-    This->initial = FALSE;
 
     This->nextofs = This->nextpullofs = 0;
     return S_OK;
@@ -1650,8 +1579,8 @@ static void parser_destroy(struct strmbase_filter *iface)
 static HRESULT parser_init_stream(struct strmbase_filter *iface)
 {
     struct parser *filter = impl_from_strmbase_filter(iface);
+    GstSeekType stop_type = GST_SEEK_TYPE_NONE;
     const SourceSeeking *seeking;
-    GstStateChangeReturn ret;
     unsigned int i;
 
     if (!filter->container)
@@ -1661,6 +1590,17 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface)
     filter->streaming = true;
     LeaveCriticalSection(&filter->cs);
 
+    /* DirectShow retains the old seek positions, but resets to them every time
+     * it transitions from stopped -> paused. */
+
+    seeking = &filter->sources[0]->seek;
+    if (seeking->llStop && seeking->llStop != seeking->llDuration)
+        stop_type = GST_SEEK_TYPE_SET;
+    gst_pad_push_event(filter->sources[0]->my_sink, gst_event_new_seek(
+            seeking->dRate, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
+            GST_SEEK_TYPE_SET, seeking->llCurrent * 100,
+            stop_type, seeking->llStop * 100));
+
     for (i = 0; i < filter->source_count; ++i)
     {
         HRESULT hr;
@@ -1674,44 +1614,12 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface)
         filter->sources[i]->thread = CreateThread(NULL, 0, stream_thread, filter->sources[i], 0, NULL);
     }
 
-    if (filter->no_more_pads_event)
-        ResetEvent(filter->no_more_pads_event);
-
-    if ((ret = gst_element_set_state(filter->container, GST_STATE_PAUSED)) == GST_STATE_CHANGE_FAILURE)
-    {
-        ERR("Failed to pause stream.\n");
-        return E_FAIL;
-    }
-
-    /* Make sure that all of our pads are connected before returning, lest we
-     * e.g. try to seek and fail. */
-    if (filter->no_more_pads_event)
-        WaitForSingleObject(filter->no_more_pads_event, INFINITE);
-
-    seeking = &filter->sources[0]->seek;
-
-    /* GStreamer can't seek while stopped, and it resets position to the
-     * beginning of the stream every time it is stopped. */
-    if (seeking->llCurrent)
-    {
-        GstSeekType stop_type = GST_SEEK_TYPE_NONE;
-
-        if (seeking->llStop && seeking->llStop != seeking->llDuration)
-            stop_type = GST_SEEK_TYPE_SET;
-
-        gst_pad_push_event(filter->sources[0]->my_sink, gst_event_new_seek(
-                seeking->dRate, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
-                GST_SEEK_TYPE_SET, seeking->llCurrent * 100,
-                stop_type, seeking->llStop * 100));
-    }
-
     return S_OK;
 }
 
 static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
 {
     struct parser *filter = impl_from_strmbase_filter(iface);
-    GstStateChangeReturn ret;
     unsigned int i;
 
     if (!filter->container)
@@ -1729,17 +1637,7 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
             continue;
 
         WakeConditionVariable(&pin->event_cv);
-        WakeConditionVariable(&pin->event_empty_cv);
-    }
-
-    filter->ignore_flush = TRUE;
-    if ((ret = gst_element_set_state(filter->container, GST_STATE_READY)) == GST_STATE_CHANGE_FAILURE)
-    {
-        ERR("Failed to pause stream.\n");
-        return E_FAIL;
     }
-    gst_element_get_state(filter->container, NULL, NULL, GST_CLOCK_TIME_NONE);
-    filter->ignore_flush = FALSE;
 
     for (i = 0; i < filter->source_count; ++i)
     {
@@ -1758,33 +1656,12 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
     return S_OK;
 }
 
-static HRESULT parser_wait_state(struct strmbase_filter *iface, DWORD timeout)
-{
-    struct parser *filter = impl_from_strmbase_filter(iface);
-    GstStateChangeReturn ret;
-
-    if (!filter->container)
-        return S_OK;
-
-    ret = gst_element_get_state(filter->container, NULL, NULL,
-            timeout == INFINITE ? GST_CLOCK_TIME_NONE : timeout * 1000000);
-    if (ret == GST_STATE_CHANGE_FAILURE)
-    {
-        ERR("Failed to get state.\n");
-        return E_FAIL;
-    }
-    else if (ret == GST_STATE_CHANGE_ASYNC)
-        return VFW_S_STATE_INTERMEDIATE;
-    return S_OK;
-}
-
 static const struct strmbase_filter_ops filter_ops =
 {
     .filter_get_pin = parser_get_pin,
     .filter_destroy = parser_destroy,
     .filter_init_stream = parser_init_stream,
     .filter_cleanup_stream = parser_cleanup_stream,
-    .filter_wait_state = parser_wait_state,
 };
 
 static inline struct parser *impl_from_strmbase_sink(struct strmbase_sink *iface)
@@ -1895,11 +1772,6 @@ static BOOL decodebin_parser_init_gst(struct parser *filter)
             return FALSE;
     }
 
-    filter->ignore_flush = TRUE;
-    gst_element_set_state(filter->container, GST_STATE_READY);
-    gst_element_get_state(filter->container, NULL, NULL, -1);
-    filter->ignore_flush = FALSE;
-
     return TRUE;
 }
 
@@ -2396,6 +2268,10 @@ static HRESULT WINAPI GSTOutPin_DecideBufferSize(struct strmbase_source *iface,
         buffer_size = format->nAvgBytesPerSec;
     }
 
+    gst_pad_push_event(pin->my_sink, gst_event_new_reconfigure());
+    /* We do need to drop any buffers that might have been sent with the old
+     * caps, but this will be handled in parser_init_stream(). */
+
     props->cBuffers = max(props->cBuffers, 1);
     props->cbBuffer = max(props->cbBuffer, buffer_size);
     props->cbAlign = max(props->cbAlign, 1);
@@ -2495,6 +2371,16 @@ static HRESULT GST_RemoveOutputPins(struct parser *This)
 
     if (!This->container)
         return S_OK;
+
+    /* Unblock all of our streams. */
+    EnterCriticalSection(&This->cs);
+    for (i = 0; i < This->source_count; ++i)
+    {
+        This->sources[i]->flushing = true;
+        WakeConditionVariable(&This->sources[i]->event_empty_cv);
+    }
+    LeaveCriticalSection(&This->cs);
+
     gst_element_set_state(This->container, GST_STATE_NULL);
     gst_pad_unlink(This->my_src, This->their_sink);
     gst_object_unref(This->my_src);
@@ -2671,11 +2557,6 @@ static BOOL wave_parser_init_gst(struct parser *filter)
     if (WaitForMultipleObjects(2, events, FALSE, INFINITE))
         return FALSE;
 
-    filter->ignore_flush = TRUE;
-    gst_element_set_state(filter->container, GST_STATE_READY);
-    gst_element_get_state(filter->container, NULL, NULL, -1);
-    filter->ignore_flush = FALSE;
-
     return TRUE;
 }
 
@@ -2796,11 +2677,6 @@ static BOOL avi_splitter_init_gst(struct parser *filter)
             return FALSE;
     }
 
-    filter->ignore_flush = TRUE;
-    gst_element_set_state(filter->container, GST_STATE_READY);
-    gst_element_get_state(filter->container, NULL, NULL, -1);
-    filter->ignore_flush = FALSE;
-
     return TRUE;
 }
 
@@ -2930,11 +2806,6 @@ static BOOL mpeg_splitter_init_gst(struct parser *filter)
     if (WaitForMultipleObjects(2, events, FALSE, INFINITE))
         return FALSE;
 
-    filter->ignore_flush = TRUE;
-    gst_element_set_state(filter->container, GST_STATE_READY);
-    gst_element_get_state(filter->container, NULL, NULL, -1);
-    filter->ignore_flush = FALSE;
-
     return TRUE;
 }
 
@@ -2981,7 +2852,6 @@ static const struct strmbase_filter_ops mpeg_splitter_ops =
     .filter_destroy = parser_destroy,
     .filter_init_stream = parser_init_stream,
     .filter_cleanup_stream = parser_cleanup_stream,
-    .filter_wait_state = parser_wait_state,
 };
 
 HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out)
-- 
2.30.0




More information about the wine-devel mailing list