[PATCH 5/5] winegstreamer: Manage our own streaming thread.

Zebediah Figura z.figura12 at gmail.com
Tue Jan 19 21:35:38 CST 2021


This is a rather large and complex change. It comprises several parts:

(1) Instead of directly sending EOS, segment, and sample events to the
    downstream DirectShow sink, first queue them in a local single-slot buffer
    (i.e. "pin->event").

(2) Spawn a separate thread per source pin (i.e. "stream_thread") which consumes
    said events and sends them downstream.

(3) When flushing or stopping, explicitly wait for this thread to pause or stop
    (respectively).

There are a few reasons for this:

(1) It reduces the number of Unix -> PE callbacks we need to make, easing PE
    conversion. This is not a great advantage *a priori*, and may not be worth a
    similar dedicated "handler" thread for most modules, but winegstreamer is
    different—we were already marshalling these calls onto another thread, and
    now that marshalling can go away (almost).

(2) Because GStreamer can only do pad negotiation (and hence autoplugging) while
    running (in contrast to DirectShow, which can do it while stopped), we
    currently have to renegotiate every time the pipeline is started. Most
    applications don't start the graph more than once, but even a single start
    requires two negotiations, and startup time is demonstrably too high. It
    would be nice to keep the graph in PAUSED state all of the time, but this is
    difficult to do without more fine-grained control over the streaming thread.
    [In particular, we cannot reliably wait for all samples to be delivered
    except by stopping the GStreamer pipeline.]

Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
 dlls/winegstreamer/gst_private.h |   1 +
 dlls/winegstreamer/gstdemux.c    | 341 ++++++++++++++++++++++++++-----
 2 files changed, 290 insertions(+), 52 deletions(-)

diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h
index e591a95f3ca..9cd3c8adff5 100644
--- a/dlls/winegstreamer/gst_private.h
+++ b/dlls/winegstreamer/gst_private.h
@@ -22,6 +22,7 @@
 #define __GST_PRIVATE_INCLUDED__
 
 #include <stdarg.h>
+#include <stdbool.h>
 #include <stdio.h>
 #include <gst/gst.h>
 #include <gst/video/video.h>
diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index 4a0a95c68f3..30f6fbca168 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -60,6 +60,13 @@ struct parser
 
     LONGLONG filesize;
 
+    CRITICAL_SECTION cs;
+
+    /* FIXME: It would be nice to avoid duplicating these with strmbase.
+     * However, synchronization is tricky; we need access to be protected by a
+     * separate lock. */
+    bool streaming;
+
     BOOL initial, ignore_flush;
     GstElement *container;
     GstPad *my_src, *their_sink;
@@ -74,6 +81,24 @@ struct parser
     HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt);
 };
 
+enum parser_event_type
+{
+    PARSER_EVENT_NONE = 0,
+    PARSER_EVENT_BUFFER,
+    PARSER_EVENT_EOS,
+    PARSER_EVENT_SEGMENT,
+};
+
+struct parser_event
+{
+    enum parser_event_type type;
+    union
+    {
+        GstBuffer *buffer;
+        GstEvent *segment;
+    } u;
+};
+
 struct parser_source
 {
     struct strmbase_source pin;
@@ -85,6 +110,11 @@ struct parser_source
     GstSegment *segment;
     GstCaps *caps;
     SourceSeeking seek;
+
+    CONDITION_VARIABLE event_cv, event_empty_cv, flushing_cv, flush_stop_cv;
+    bool flushing, thread_blocked;
+    struct parser_event event;
+    HANDLE thread;
 };
 
 static inline struct parser *impl_from_strmbase_filter(struct strmbase_filter *iface)
@@ -657,49 +687,62 @@ static gboolean event_src(GstPad *pad, GstObject *parent, GstEvent *event)
     return ret;
 }
 
+static GstFlowReturn queue_stream_event(struct parser_source *pin, const struct parser_event *event)
+{
+    struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
+
+    EnterCriticalSection(&filter->cs);
+    while (filter->streaming && !pin->flushing && pin->event.type != PARSER_EVENT_NONE)
+        SleepConditionVariableCS(&pin->event_empty_cv, &filter->cs, INFINITE);
+    if (!filter->streaming || pin->flushing)
+    {
+        LeaveCriticalSection(&filter->cs);
+        TRACE("Filter is flushing; discarding event.\n");
+        return GST_FLOW_FLUSHING;
+    }
+    pin->event = *event;
+    LeaveCriticalSection(&filter->cs);
+    WakeConditionVariable(&pin->event_cv);
+    TRACE("Event queued.\n");
+    return GST_FLOW_OK;
+}
+
 static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
 {
     struct parser_source *pin = gst_pad_get_element_private(pad);
+    struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
 
     TRACE("pin %p, type \"%s\".\n", pin, GST_EVENT_TYPE_NAME(event));
 
-    switch (event->type) {
-        case GST_EVENT_SEGMENT: {
-            gdouble rate, applied_rate;
-            gint64 stop, pos;
-            const GstSegment *segment;
-
-            gst_event_parse_segment(event, &segment);
-
-            pos = segment->position;
-            stop = segment->stop;
-            rate = segment->rate;
-            applied_rate = segment->applied_rate;
-
-            if (segment->format != GST_FORMAT_TIME)
+    switch (event->type)
+    {
+        case GST_EVENT_SEGMENT:
+            if (pin->pin.pin.peer)
             {
-                FIXME("Unhandled format \"%s\".\n", gst_format_get_name(segment->format));
-                break;
+                struct parser_event stream_event;
+
+                stream_event.type = PARSER_EVENT_SEGMENT;
+                stream_event.u.segment = event;
+                if (queue_stream_event(pin, &stream_event) == GST_FLOW_OK)
+                {
+                    /* Transfer our reference to the event to the thread. */
+                    return TRUE;
+                }
             }
-
-            gst_segment_copy_into(segment, pin->segment);
-
-            pos /= 100;
-
-            if (stop > 0)
-                stop /= 100;
-
-            if (pin->pin.pin.peer)
-                IPin_NewSegment(pin->pin.pin.peer, pos, stop, rate*applied_rate);
-
             break;
-        }
+
         case GST_EVENT_EOS:
             if (pin->pin.pin.peer)
-                IPin_EndOfStream(pin->pin.pin.peer);
+            {
+                struct parser_event stream_event;
+
+                stream_event.type = PARSER_EVENT_EOS;
+                queue_stream_event(pin, &stream_event);
+            }
             else
                 SetEvent(pin->eos_event);
             break;
+
         case GST_EVENT_FLUSH_START:
             if (impl_from_strmbase_filter(pin->pin.pin.filter)->ignore_flush) {
                 /* gst-plugins-base prior to 1.7 contains a bug which causes
@@ -713,13 +756,53 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
                 break;
             }
             if (pin->pin.pin.peer)
+            {
                 IPin_BeginFlush(pin->pin.pin.peer);
+
+                EnterCriticalSection(&filter->cs);
+
+                pin->flushing = true;
+                WakeConditionVariable(&pin->event_cv);
+                WakeConditionVariable(&pin->event_empty_cv);
+                /* Wait for the thread to pause itself, to ensure that no stale
+                 * samples are sent. */
+                while (!pin->thread_blocked)
+                    SleepConditionVariableCS(&pin->flushing_cv, &filter->cs, INFINITE);
+
+                /* And flush out any buffered event. */
+                switch (pin->event.type)
+                {
+                    case PARSER_EVENT_NONE:
+                    case PARSER_EVENT_EOS:
+                        break;
+
+                    case PARSER_EVENT_BUFFER:
+                        gst_buffer_unref(pin->event.u.buffer);
+                        break;
+
+                    case PARSER_EVENT_SEGMENT:
+                        gst_event_unref(pin->event.u.segment);
+                        break;
+                }
+                pin->event.type = PARSER_EVENT_NONE;
+
+                LeaveCriticalSection(&filter->cs);
+            }
             break;
+
         case GST_EVENT_FLUSH_STOP:
             gst_segment_init(pin->segment, GST_FORMAT_TIME);
             if (pin->pin.pin.peer)
+            {
+                EnterCriticalSection(&filter->cs);
+                pin->flushing = false;
+                LeaveCriticalSection(&filter->cs);
+                WakeConditionVariable(&pin->flush_stop_cv);
+
                 IPin_EndFlush(pin->pin.pin.peer);
+            }
             break;
+
         case GST_EVENT_CAPS:
         {
             GstCaps *caps;
@@ -729,6 +812,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
             SetEvent(pin->caps_event);
             break;
         }
+
         default:
             WARN("Ignoring \"%s\" event.\n", GST_EVENT_TYPE_NAME(event));
     }
@@ -806,6 +890,36 @@ static DWORD CALLBACK push_data(LPVOID iface)
     return 0;
 }
 
+static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *buffer)
+{
+    struct parser_source *pin = gst_pad_get_element_private(pad);
+    struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
+    struct parser_event stream_event;
+    GstFlowReturn ret;
+
+    TRACE("pad %p, pin %p, buffer %p.\n", pad, pin, buffer);
+
+    if (filter->initial)
+    {
+        gst_buffer_unref(buffer);
+        return GST_FLOW_OK;
+    }
+
+    if (!pin->pin.pin.peer)
+    {
+        gst_buffer_unref(buffer);
+        return GST_FLOW_NOT_LINKED;
+    }
+
+    stream_event.type = PARSER_EVENT_BUFFER;
+    stream_event.u.buffer = buffer;
+    /* Transfer our reference to the buffer to the thread. */
+    if ((ret = queue_stream_event(pin, &stream_event)) != GST_FLOW_OK)
+        gst_buffer_unref(buffer);
+    return ret;
+}
+
+/* Fill and send a single IMediaSample. */
 static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample,
         GstBuffer *buf, GstMapInfo *info, gsize offset, gsize size, DWORD bytes_per_second)
 {
@@ -869,21 +983,15 @@ static HRESULT send_sample(struct parser_source *pin, IMediaSample *sample,
     return hr;
 }
 
-static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *buf)
+/* Send a single GStreamer buffer (splitting it into multiple IMediaSamples if
+ * necessary). */
+static void send_buffer(struct parser_source *pin, GstBuffer *buf)
 {
-    struct parser_source *pin = gst_pad_get_element_private(pad);
-    struct parser *This = impl_from_strmbase_filter(pin->pin.pin.filter);
-    HRESULT hr = S_OK;
+    HRESULT hr;
+    BYTE *ptr = NULL;
     IMediaSample *sample;
     GstMapInfo info;
 
-    TRACE("%p %p\n", pad, buf);
-
-    if (This->initial) {
-        gst_buffer_unref(buf);
-        return GST_FLOW_OK;
-    }
-
     gst_buffer_map(buf, &info, GST_MAP_READ);
 
     if (IsEqualGUID(&pin->pin.pin.mt.formattype, &FORMAT_WaveFormatEx)
@@ -937,14 +1045,93 @@ static GstFlowReturn got_data_sink(GstPad *pad, GstObject *parent, GstBuffer *bu
     gst_buffer_unmap(buf, &info);
 
     gst_buffer_unref(buf);
+}
 
-    if (hr == VFW_E_NOT_CONNECTED)
-        return GST_FLOW_NOT_LINKED;
+/* Per-pin streaming thread: drain pin->event, passing buffers to send_buffer()
+ * and EOS/segment events to the peer pin, until filter->streaming is cleared. */
+static DWORD CALLBACK stream_thread(void *arg)
+{
+    struct parser_source *pin = arg;
+    struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
 
-    if (FAILED(hr))
-        return GST_FLOW_FLUSHING;
+    TRACE("Starting streaming thread for pin %p.\n", pin);
 
-    return GST_FLOW_OK;
+    for (;;)
+    {
+        struct parser_event event;
+
+        EnterCriticalSection(&filter->cs);
+
+        while (filter->streaming && !pin->flushing && pin->event.type == PARSER_EVENT_NONE)
+            SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE);
+
+        if (pin->flushing)
+        {
+            TRACE("Filter is flushing; pausing thread.\n");
+            pin->thread_blocked = true;
+            WakeConditionVariable(&pin->flushing_cv);
+            do
+                SleepConditionVariableCS(&pin->flush_stop_cv, &filter->cs, INFINITE);
+            while (pin->flushing);
+            pin->thread_blocked = false;
+            TRACE("Filter is no longer flushing; resuming thread.\n");
+        }
+
+        if (!filter->streaming)
+        {
+            LeaveCriticalSection(&filter->cs);
+            break;
+        }
+
+        if (!pin->event.type)
+        {
+            LeaveCriticalSection(&filter->cs);
+            continue;
+        }
+
+        event = pin->event;
+        pin->event.type = PARSER_EVENT_NONE;
+        WakeConditionVariable(&pin->event_empty_cv);
+
+        LeaveCriticalSection(&filter->cs);
+
+        TRACE("Got event of type %#x.\n", event.type);
+
+        switch (event.type)
+        {
+            case PARSER_EVENT_BUFFER:
+                send_buffer(pin, event.u.buffer);
+                break;
+
+            case PARSER_EVENT_EOS:
+                IPin_EndOfStream(pin->pin.pin.peer);
+                break;
+
+            case PARSER_EVENT_SEGMENT:
+            {
+                const GstSegment *segment;
+
+                gst_event_parse_segment(event.u.segment, &segment);
+                if (segment->format == GST_FORMAT_TIME)
+                {
+                    gst_segment_copy_into(segment, pin->segment);
+                    IPin_NewSegment(pin->pin.pin.peer, segment->position / 100,
+                            segment->stop / 100, segment->rate * segment->applied_rate);
+                }
+                else
+                    FIXME("Unhandled format \"%s\".\n", gst_format_get_name(segment->format));
+                /* event_sink() gave us its reference; drop it on every path. */
+                gst_event_unref(event.u.segment);
+                break;
+            }
+
+            case PARSER_EVENT_NONE:
+                assert(0);
+        }
+    }
+
+    TRACE("Streaming stopped; exiting.\n");
+    return 0;
 }
 
 static GstFlowReturn request_buffer_src(GstPad *pad, GstObject *parent, guint64 ofs, guint len, GstBuffer **buffer)
@@ -1478,6 +1665,9 @@ static void parser_destroy(struct strmbase_filter *iface)
         gst_bus_set_sync_handler(filter->bus, NULL, NULL, NULL);
         gst_object_unref(filter->bus);
     }
+
+    filter->cs.DebugInfo->Spare[0] = 0;
+    DeleteCriticalSection(&filter->cs);
     strmbase_sink_cleanup(&filter->sink);
     strmbase_filter_cleanup(&filter->filter);
     heap_free(filter);
@@ -1493,12 +1683,21 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface)
     if (!filter->container)
         return S_OK;
 
+    EnterCriticalSection(&filter->cs);
+    filter->streaming = true;
+    LeaveCriticalSection(&filter->cs);
+
     for (i = 0; i < filter->source_count; ++i)
     {
         HRESULT hr;
 
-        if (filter->sources[i]->pin.pin.peer && FAILED(hr = IMemAllocator_Commit(filter->sources[i]->pin.pAllocator)))
+        if (!filter->sources[i]->pin.pin.peer)
+            continue;
+
+        if (FAILED(hr = IMemAllocator_Commit(filter->sources[i]->pin.pAllocator)))
             ERR("Failed to commit allocator, hr %#x.\n", hr);
+
+        filter->sources[i]->thread = CreateThread(NULL, 0, stream_thread, filter->sources[i], 0, NULL);
     }
 
     if (filter->no_more_pads_event)
@@ -1544,6 +1743,21 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
     if (!filter->container)
         return S_OK;
 
+    EnterCriticalSection(&filter->cs);
+    filter->streaming = false;
+    LeaveCriticalSection(&filter->cs);
+
+    for (i = 0; i < filter->source_count; ++i)
+    {
+        struct parser_source *pin = filter->sources[i];
+
+        if (!pin->pin.pin.peer)
+            continue;
+
+        WakeConditionVariable(&pin->event_cv);
+        WakeConditionVariable(&pin->event_empty_cv);
+    }
+
     filter->ignore_flush = TRUE;
     if ((ret = gst_element_set_state(filter->container, GST_STATE_READY)) == GST_STATE_CHANGE_FAILURE)
     {
@@ -1555,8 +1769,16 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
 
     for (i = 0; i < filter->source_count; ++i)
     {
-        if (filter->sources[i]->pin.pin.peer)
-            IMemAllocator_Decommit(filter->sources[i]->pin.pAllocator);
+        struct parser_source *pin = filter->sources[i];
+
+        if (!pin->pin.pin.peer)
+            continue;
+
+        IMemAllocator_Decommit(pin->pin.pAllocator);
+
+        WaitForSingleObject(pin->thread, INFINITE);
+        CloseHandle(pin->thread);
+        pin->thread = NULL;
     }
 
     return S_OK;
@@ -1797,6 +2019,13 @@ static BOOL parser_init_gstreamer(void)
     return TRUE;
 }
 
+static void parser_init_common(struct parser *object)
+{
+    object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL);
+    InitializeCriticalSection(&object->cs);
+    object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": parser.cs");
+}
+
 HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out)
 {
     struct parser *object;
@@ -1809,11 +2038,12 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out)
     if (!(object = heap_alloc_zero(sizeof(*object))))
         return E_OUTOFMEMORY;
 
+    parser_init_common(object);
+
     strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, wcsInputPinName, &sink_ops, NULL);
 
     object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL);
-    object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL);
     object->init_gst = decodebin_parser_init_gst;
     object->source_query_accept = decodebin_parser_source_query_accept;
     object->source_get_media_type = decodebin_parser_source_get_media_type;
@@ -2203,6 +2433,10 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
     pin->IQualityControl_iface.lpVtbl = &GSTOutPin_QualityControl_Vtbl;
     strmbase_seeking_init(&pin->seek, &GST_Seeking_Vtbl, GST_ChangeStop,
             GST_ChangeCurrent, GST_ChangeRate);
+    InitializeConditionVariable(&pin->event_cv);
+    InitializeConditionVariable(&pin->event_empty_cv);
+    InitializeConditionVariable(&pin->flushing_cv);
+    InitializeConditionVariable(&pin->flush_stop_cv);
     BaseFilterImpl_IncrementPinVersion(&filter->filter);
 
     sprintf(pad_name, "qz_sink_%u", filter->source_count);
@@ -2444,10 +2678,11 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out)
     if (!(object = heap_alloc_zero(sizeof(*object))))
         return E_OUTOFMEMORY;
 
+    parser_init_common(object);
+
     strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, sink_name, &wave_parser_sink_ops, NULL);
     object->init_gst = wave_parser_init_gst;
-    object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL);
     object->source_query_accept = wave_parser_source_query_accept;
     object->source_get_media_type = wave_parser_source_get_media_type;
 
@@ -2568,10 +2803,11 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out)
     if (!(object = heap_alloc_zero(sizeof(*object))))
         return E_OUTOFMEMORY;
 
+    parser_init_common(object);
+
     strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, sink_name, &avi_splitter_sink_ops, NULL);
     object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL);
-    object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL);
     object->init_gst = avi_splitter_init_gst;
     object->source_query_accept = avi_splitter_source_query_accept;
     object->source_get_media_type = avi_splitter_source_get_media_type;
@@ -2725,12 +2961,13 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out)
     if (!(object = heap_alloc_zero(sizeof(*object))))
         return E_OUTOFMEMORY;
 
+    parser_init_common(object);
+
     strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops);
     strmbase_sink_init(&object->sink, &object->filter, sink_name, &mpeg_splitter_sink_ops, NULL);
     object->IAMStreamSelect_iface.lpVtbl = &stream_select_vtbl;
 
     object->duration_event = CreateEventW(NULL, FALSE, FALSE, NULL);
-    object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL);
     object->init_gst = mpeg_splitter_init_gst;
     object->source_query_accept = mpeg_splitter_source_query_accept;
     object->source_get_media_type = mpeg_splitter_source_get_media_type;
-- 
2.30.0




More information about the wine-devel mailing list