[PATCH] Revert "winegstreamer: Replace source pad interface with GstAppSrc."

Zebediah Figura zfigura at codeweavers.com
Thu Sep 23 23:10:28 CDT 2021


This reverts commit 1aa359a100bae859b278007e8bf90673eebd7db0.

appsrc suffers from a rather problematic race condition surrounding flushes [1].
Essentially, it's possible for a flushing seek to begin and end between
wg_parser_get_next_read_offset() and wg_parser_push_data(). The race condition is
not easy to fix, and in light of it, it's not clear whether we want to use appsrc.

[1] https://www.winehq.org/pipermail/wine-devel/2021-September/196043.html

Wine-Bug: https://bugs.winehq.org/show_bug.cgi?id=51774
Signed-off-by: Zebediah Figura <zfigura at codeweavers.com>
---
Since the bug won't be fixed before Wine 6.18, and since the way forward may not
use appsrc anyway, revert the patch for now.

 dlls/winegstreamer/wg_parser.c | 429 ++++++++++++++++++++++++---------
 1 file changed, 310 insertions(+), 119 deletions(-)

diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
index 6b6b033b879..f0815e37689 100644
--- a/dlls/winegstreamer/wg_parser.c
+++ b/dlls/winegstreamer/wg_parser.c
@@ -33,7 +33,6 @@
 #include <gst/gst.h>
 #include <gst/video/video.h>
 #include <gst/audio/audio.h>
-#include <gst/app/gstappsrc.h>
 
 /* GStreamer callbacks may be called on threads not created by Wine, and
  * therefore cannot access the Wine TEB. This means that we must use GStreamer
@@ -50,20 +49,28 @@ struct wg_parser
     struct wg_parser_stream **streams;
     unsigned int stream_count;
 
-    GstElement *container, *appsrc, *decodebin;
+    GstElement *container, *decodebin;
     GstBus *bus;
+    GstPad *my_src, *their_sink;
+
+    guint64 file_size, start_offset, next_offset, stop_offset;
+    guint64 next_pull_offset;
+
+    pthread_t push_thread;
 
     pthread_mutex_t mutex;
 
     pthread_cond_t init_cond;
     bool no_more_pads, has_duration, error;
 
-    pthread_cond_t read_cond;
+    pthread_cond_t read_cond, read_done_cond;
     struct
     {
+        void *data;
         uint64_t offset;
         uint32_t size;
-        bool pending;
+        bool done;
+        bool ret;
     } read_request;
 
     bool flushing, sink_connected;
@@ -515,7 +522,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
 {
     pthread_mutex_lock(&parser->mutex);
 
-    while (parser->sink_connected && !parser->read_request.pending)
+    while (parser->sink_connected && !parser->read_request.data)
         pthread_cond_wait(&parser->read_cond, &parser->mutex);
 
     if (!parser->sink_connected)
@@ -534,69 +541,15 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
 static void CDECL wg_parser_push_data(struct wg_parser *parser,
         const void *data, uint32_t size)
 {
-    GstMessage *message;
-    GstFlowReturn ret;
-    GstBuffer *buffer;
-    GError *error;
-
-    if (!data)
-    {
-        pthread_mutex_lock(&parser->mutex);
-
-        if (parser->sink_connected)
-        {
-            error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset);
-            message = gst_message_new_error(NULL, error, "");
-            gst_bus_post(parser->bus, message);
-            parser->read_request.pending = false;
-        }
-
-        pthread_mutex_unlock(&parser->mutex);
-        return;
-    }
-
-    if (!size)
-    {
-        pthread_mutex_lock(&parser->mutex);
-
-        if (parser->sink_connected)
-            g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
-        parser->read_request.pending = false;
-
-        pthread_mutex_unlock(&parser->mutex);
-        return;
-    }
-
-    /* We will always perform an extra blit here. We can avoid this in some
-     * cases by wrapping a client-allocated buffer using
-     * gst_buffer_new_wrapped(). However, releasing the memory is non-trivial,
-     * since GStreamer will hold onto a reference for an arbitrarily long
-     * period of time. Until there's evidence to suggest that the blit causes a
-     * performance problem, leave it alone. */
-    buffer = gst_buffer_new_and_alloc(size);
-    gst_buffer_fill(buffer, 0, data, size);
-
     pthread_mutex_lock(&parser->mutex);
-
-    if (!parser->sink_connected)
-    {
-        pthread_mutex_unlock(&parser->mutex);
-        gst_buffer_unref(buffer);
-        return;
-    }
-
-    assert(parser->read_request.pending);
-
-    GST_BUFFER_OFFSET(buffer) = parser->read_request.offset;
-    g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
-
-    /* In random-access mode, GST_FLOW_EOS shouldn't be returned. */
-    assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
-    if (ret == GST_FLOW_OK)
-        parser->read_request.offset += size;
-
-    parser->read_request.pending = false;
+    parser->read_request.size = size;
+    parser->read_request.done = true;
+    parser->read_request.ret = !!data;
+    if (data)
+        memcpy(parser->read_request.data, data, size);
+    parser->read_request.data = NULL;
     pthread_mutex_unlock(&parser->mutex);
+    pthread_cond_signal(&parser->read_done_cond);
 }
 
 static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)
@@ -1251,56 +1204,196 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user)
     g_free(name);
 }
 
-static void src_need_data(GstElement *appsrc, guint length, gpointer user)
+static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
+        guint64 offset, guint size, GstBuffer **buffer)
 {
-    struct wg_parser *parser = user;
-    guint64 queued_bytes;
+    struct wg_parser *parser = gst_pad_get_element_private(pad);
+    GstBuffer *new_buffer = NULL;
+    GstMapInfo map_info;
+    bool ret;
+
+    GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer);
+
+    if (offset == GST_BUFFER_OFFSET_NONE)
+        offset = parser->next_pull_offset;
+    parser->next_pull_offset = offset + size;
+
+    if (!*buffer)
+        *buffer = new_buffer = gst_buffer_new_and_alloc(size);
+
+    gst_buffer_map(*buffer, &map_info, GST_MAP_WRITE);
 
     pthread_mutex_lock(&parser->mutex);
 
-    /* As of GStreamer 1.18, appsrc suffers from a race condition. When in
-     * random access mode (and when underlyingly in pull mode), appsrc may
-     * spuriously send multiple requests for the same offset and length. If it
-     * receives the same buffer twice (or consecutive buffers), it will blindly
-     * queue them and satisfy subsequent getrange requests from downstream
-     * elements with the wrong buffers.
-     *
-     * Internally, this function is called inside of a loop, which also pops
-     * buffers from the internal queue. Accordingly we can safely treat this
-     * request as spurious by checking if we have already sent data; since the
-     * data is consumed by this thread it will not have been consumed yet.
-     *
-     * The bug is documented in greater detail here:
-     *
-     * https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937
-     */
-    g_object_get(G_OBJECT(appsrc), "current-level-bytes", &queued_bytes, NULL);
-    if (queued_bytes)
-    {
-        pthread_mutex_unlock(&parser->mutex);
-        return;
-    }
-
-    parser->read_request.pending = true;
-    parser->read_request.size = length;
-
+    assert(!parser->read_request.data);
+    parser->read_request.data = map_info.data;
+    parser->read_request.offset = offset;
+    parser->read_request.size = size;
+    parser->read_request.done = false;
     pthread_cond_signal(&parser->read_cond);
 
+    /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
+     * the upstream pin to flush if necessary. We should never be blocked on
+     * read_thread() not running. */
+
+    while (!parser->read_request.done)
+        pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
+
+    ret = parser->read_request.ret;
+    gst_buffer_set_size(*buffer, parser->read_request.size);
+
     pthread_mutex_unlock(&parser->mutex);
+
+    gst_buffer_unmap(*buffer, &map_info);
+
+    GST_LOG("Request returned %d.", ret);
+
+    if ((!ret || !size) && new_buffer)
+        gst_buffer_unref(new_buffer);
+
+    if (ret)
+        return size ? GST_FLOW_OK : GST_FLOW_EOS;
+    return GST_FLOW_ERROR;
 }
 
-static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user)
+static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
 {
-    struct wg_parser *parser = user;
+    struct wg_parser *parser = gst_pad_get_element_private(pad);
+    GstFormat format;
 
-    pthread_mutex_lock(&parser->mutex);
+    GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query));
 
-    assert(!parser->read_request.pending);
-    parser->read_request.offset = offset;
+    switch (GST_QUERY_TYPE(query))
+    {
+        case GST_QUERY_DURATION:
+            gst_query_parse_duration(query, &format, NULL);
+            if (format == GST_FORMAT_PERCENT)
+            {
+                gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX);
+                return TRUE;
+            }
+            else if (format == GST_FORMAT_BYTES)
+            {
+                gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size);
+                return TRUE;
+            }
+            return FALSE;
 
-    pthread_mutex_unlock(&parser->mutex);
+        case GST_QUERY_SEEKING:
+            gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
+            if (format != GST_FORMAT_BYTES)
+            {
+                GST_WARNING("Cannot seek using format \"%s\".", gst_format_get_name(format));
+                return FALSE;
+            }
+            gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size);
+            return TRUE;
 
-    return true;
+        case GST_QUERY_SCHEDULING:
+            gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
+            gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
+            gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL);
+            return TRUE;
+
+        default:
+            GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query));
+            return FALSE;
+    }
+}
+
+static void *push_data(void *arg) /* push thread entry point; arg is the struct wg_parser; always returns NULL */
+{
+    struct wg_parser *parser = arg;
+    GstBuffer *buffer;
+    guint64 max_size; /* 64-bit like stop_offset and file_size, so offsets of 4 GiB or more are not truncated */
+
+    GST_DEBUG("Starting push thread.");
+
+    if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL)))
+    {
+        GST_ERROR("Failed to allocate memory.");
+        return NULL;
+    }
+
+    max_size = parser->stop_offset ? parser->stop_offset : parser->file_size; /* a zero stop_offset means "up to file_size" */
+
+    for (;;)
+    {
+        ULONG size;
+        int ret;
+
+        if (parser->next_offset >= max_size)
+            break;
+        size = min(16384, max_size - parser->next_offset); /* at most 16384 bytes per iteration */
+
+        if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0)
+        {
+            GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret));
+            break;
+        }
+
+        parser->next_offset += size;
+
+        buffer->duration = buffer->pts = -1; /* both set to -1 (presumably GST_CLOCK_TIME_NONE, i.e. unset) */
+        if ((ret = gst_pad_push(parser->my_src, buffer)) < 0) /* NOTE(review): gst_pad_push() is documented to take ownership of the buffer, yet it is reused next iteration and unreffed below - confirm */
+        {
+            GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret));
+            break;
+        }
+    }
+
+    gst_buffer_unref(buffer);
+
+    gst_pad_push_event(parser->my_src, gst_event_new_eos()); /* EOS is sent whenever the loop exits, including after an error */
+
+    GST_DEBUG("Stopping push thread.");
+
+    return NULL;
+}
+
+static gboolean activate_push(GstPad *pad, gboolean activate) /* starts (activate) or joins (!activate) the push thread; FALSE only if thread creation fails */
+{
+    struct wg_parser *parser = gst_pad_get_element_private(pad);
+
+    if (!activate)
+    {
+        if (parser->push_thread)
+        {
+            pthread_join(parser->push_thread, NULL); /* blocks until push_data() returns */
+            parser->push_thread = 0;
+        }
+    }
+    else if (!parser->push_thread) /* activating while a thread already exists does nothing */
+    {
+        int ret;
+
+        if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser)))
+        {
+            GST_ERROR("Failed to create push thread: %s", strerror(ret)); /* pthread_create() returns the error number instead of setting errno */
+            parser->push_thread = 0;
+            return FALSE;
+        }
+    }
+    return TRUE;
+}
+
+static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) /* installed as the activatemode function of my_src */
+{
+    struct wg_parser *parser = gst_pad_get_element_private(pad);
+
+    GST_DEBUG("%s source pad for parser %p in %s mode.",
+            activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode));
+
+    switch (mode)
+    {
+        case GST_PAD_MODE_PULL:
+            return TRUE; /* pull mode needs no setup or teardown here */
+        case GST_PAD_MODE_PUSH:
+            return activate_push(pad, activate); /* starts or joins the push thread */
+        case GST_PAD_MODE_NONE:
+            break;
+    }
+    return FALSE; /* GST_PAD_MODE_NONE and any unlisted mode are rejected */
 }
 
 static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user)
@@ -1347,11 +1440,89 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use
     return GST_BUS_DROP;
 }
 
+static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event) /* handles GST_EVENT_SEEK for src_event_cb(); returns FALSE for non-byte formats */
+{
+    BOOL thread = !!parser->push_thread; /* whether a push thread exists when the seek arrives */
+    GstSeekType cur_type, stop_type;
+    GstFormat seek_format;
+    GstEvent *flush_event;
+    GstSeekFlags flags;
+    gint64 cur, stop;
+    guint32 seqnum;
+    gdouble rate;
+
+    gst_event_parse_seek(event, &rate, &seek_format, &flags,
+                         &cur_type, &cur, &stop_type, &stop);
+
+    if (seek_format != GST_FORMAT_BYTES) /* only byte-offset seeks are handled */
+    {
+        GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(seek_format));
+        return FALSE;
+    }
+
+    seqnum = gst_event_get_seqnum(event); /* both flush events below reuse the seek's seqnum */
+
+    /* send flush start */
+    if (flags & GST_SEEK_FLAG_FLUSH)
+    {
+        flush_event = gst_event_new_flush_start();
+        gst_event_set_seqnum(flush_event, seqnum);
+        gst_pad_push_event(parser->my_src, flush_event);
+        if (thread)
+            gst_pad_set_active(parser->my_src, 1);
+    }
+
+    parser->next_offset = parser->start_offset = cur; /* only cur is applied; rate, stop and the seek types are parsed but unused */
+
+    /* and prepare to continue streaming */
+    if (flags & GST_SEEK_FLAG_FLUSH)
+    {
+        flush_event = gst_event_new_flush_stop(TRUE);
+        gst_event_set_seqnum(flush_event, seqnum);
+        gst_pad_push_event(parser->my_src, flush_event);
+        if (thread)
+            gst_pad_set_active(parser->my_src, 1);
+    }
+
+    return TRUE;
+}
+
+static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event) /* installed as the event function of my_src */
+{
+    struct wg_parser *parser = gst_pad_get_element_private(pad);
+    gboolean ret = TRUE;
+
+    GST_LOG("parser %p, type \"%s\".", parser, GST_EVENT_TYPE_NAME(event));
+
+    switch (event->type)
+    {
+        case GST_EVENT_SEEK:
+            ret = src_perform_seek(parser, event);
+            break;
+
+        case GST_EVENT_FLUSH_START:
+        case GST_EVENT_FLUSH_STOP:
+        case GST_EVENT_QOS:
+        case GST_EVENT_RECONFIGURE:
+            break; /* accepted (ret stays TRUE) with no further action */
+
+        default:
+            GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
+            ret = FALSE;
+            break;
+    }
+    gst_event_unref(event); /* the event is unreffed here on every path */
+    return ret;
+}
+
 static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size)
 {
+    GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src",
+            GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
     unsigned int i;
     int ret;
 
+    parser->file_size = file_size;
     parser->sink_connected = true;
 
     if (!parser->bus)
@@ -1363,16 +1534,15 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
     parser->container = gst_bin_new(NULL);
     gst_element_set_bus(parser->container, parser->bus);
 
-    if (!(parser->appsrc = create_element("appsrc", "base")))
-        return E_FAIL;
-    gst_bin_add(GST_BIN(parser->container), parser->appsrc);
+    parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src");
+    gst_pad_set_getrange_function(parser->my_src, src_getrange_cb);
+    gst_pad_set_query_function(parser->my_src, src_query_cb);
+    gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb);
+    gst_pad_set_event_function(parser->my_src, src_event_cb);
+    gst_pad_set_element_private(parser->my_src, parser);
 
-    g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL);
-    g_object_set(parser->appsrc, "size", file_size, NULL);
-    g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser);
-    g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser);
-
-    parser->read_request.offset = 0;
+    parser->start_offset = parser->next_offset = parser->stop_offset = 0;
+    parser->next_pull_offset = 0;
     parser->error = false;
 
     if (!parser->init_gst(parser))
@@ -1468,11 +1638,18 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
 
     pthread_mutex_unlock(&parser->mutex);
 
+    parser->next_offset = 0;
     return S_OK;
 
 out:
     if (parser->container)
         gst_element_set_state(parser->container, GST_STATE_NULL);
+    if (parser->their_sink)
+    {
+        gst_pad_unlink(parser->my_src, parser->their_sink);
+        gst_object_unref(parser->their_sink);
+        parser->my_src = parser->their_sink = NULL;
+    }
 
     for (i = 0; i < parser->stream_count; ++i)
         free_stream(parser->streams[i]);
@@ -1509,6 +1686,10 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser)
     pthread_mutex_unlock(&parser->mutex);
 
     gst_element_set_state(parser->container, GST_STATE_NULL);
+    gst_pad_unlink(parser->my_src, parser->their_sink);
+    gst_object_unref(parser->my_src);
+    gst_object_unref(parser->their_sink);
+    parser->my_src = parser->their_sink = NULL;
 
     pthread_mutex_lock(&parser->mutex);
     parser->sink_connected = false;
@@ -1530,6 +1711,7 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser)
 static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
 {
     GstElement *element;
+    int ret;
 
     if (!(element = create_element("decodebin", "base")))
         return FALSE;
@@ -1542,13 +1724,15 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
     g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser);
     g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
 
+    parser->their_sink = gst_element_get_static_pad(element, "sink");
+
     pthread_mutex_lock(&parser->mutex);
     parser->no_more_pads = false;
     pthread_mutex_unlock(&parser->mutex);
 
-    if (!gst_element_link(parser->appsrc, parser->decodebin))
+    if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
     {
-        GST_ERROR("Failed to link appsrc.\n");
+        GST_ERROR("Failed to link pads, error %d.\n", ret);
         return FALSE;
     }
 
@@ -1558,6 +1742,7 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
 static BOOL avi_parser_init_gst(struct wg_parser *parser)
 {
     GstElement *element;
+    int ret;
 
     if (!(element = create_element("avidemux", "good")))
         return FALSE;
@@ -1568,13 +1753,15 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser)
     g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser);
     g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
 
+    parser->their_sink = gst_element_get_static_pad(element, "sink");
+
     pthread_mutex_lock(&parser->mutex);
     parser->no_more_pads = false;
     pthread_mutex_unlock(&parser->mutex);
 
-    if (!gst_element_link(parser->appsrc, element))
+    if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
     {
-        GST_ERROR("Failed to link appsrc.\n");
+        GST_ERROR("Failed to link pads, error %d.\n", ret);
         return FALSE;
     }
 
@@ -1592,9 +1779,10 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
 
     gst_bin_add(GST_BIN(parser->container), element);
 
-    if (!gst_element_link(parser->appsrc, element))
+    parser->their_sink = gst_element_get_static_pad(element, "sink");
+    if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
     {
-        GST_ERROR("Failed to link appsrc.\n");
+        GST_ERROR("Failed to link sink pads, error %d.\n", ret);
         return FALSE;
     }
 
@@ -1625,9 +1813,10 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser)
 
     gst_bin_add(GST_BIN(parser->container), element);
 
-    if (!gst_element_link(parser->appsrc, element))
+    parser->their_sink = gst_element_get_static_pad(element, "sink");
+    if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
     {
-        GST_ERROR("Failed to link appsrc.\n");
+        GST_ERROR("Failed to link sink pads, error %d.\n", ret);
         return FALSE;
     }
 
@@ -1658,6 +1847,7 @@ static struct wg_parser *wg_parser_create(void)
     pthread_mutex_init(&parser->mutex, NULL);
     pthread_cond_init(&parser->init_cond, NULL);
     pthread_cond_init(&parser->read_cond, NULL);
+    pthread_cond_init(&parser->read_done_cond, NULL);
     parser->flushing = true;
 
     GST_DEBUG("Created winegstreamer parser %p.\n", parser);
@@ -1711,6 +1901,7 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser)
     pthread_mutex_destroy(&parser->mutex);
     pthread_cond_destroy(&parser->init_cond);
     pthread_cond_destroy(&parser->read_cond);
+    pthread_cond_destroy(&parser->read_done_cond);
 
     free(parser);
 }
-- 
2.33.0




More information about the wine-devel mailing list