[PATCH v2 3/3] winegstreamer: Replace source pad interface with GstAppSrc.

Zebediah Figura zfigura at codeweavers.com
Mon Sep 13 12:57:50 CDT 2021


On 9/10/21 12:04 PM, Derek Lesho wrote:
> Signed-off-by: Derek Lesho <dlesho at codeweavers.com>
> ---
> v2:
> - Remodel read_request to only store a flag and the request size.
> - Fix race condition documented at https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937
> - Send error message to bus to indicate a read error, instead of using an EOS.  (gst_app_src_create will now unblock on shutdown).
> ---
>   dlls/winegstreamer/wg_parser.c | 395 +++++++++------------------------
>   1 file changed, 99 insertions(+), 296 deletions(-)
> 
> diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
> index fc3ea49d0a7..68b26d7093c 100644
> --- a/dlls/winegstreamer/wg_parser.c
> +++ b/dlls/winegstreamer/wg_parser.c
> @@ -33,6 +33,7 @@
>   #include <gst/gst.h>
>   #include <gst/video/video.h>
>   #include <gst/audio/audio.h>
> +#include <gst/app/gstappsrc.h>
>   
>   /* GStreamer callbacks may be called on threads not created by Wine, and
>    * therefore cannot access the Wine TEB. This means that we must use GStreamer
> @@ -49,28 +50,22 @@ struct wg_parser
>       struct wg_parser_stream **streams;
>       unsigned int stream_count;
>   
> -    GstElement *container, *decodebin;
> +    GstElement *container, *appsrc, *decodebin;
>       GstBus *bus;
> -    GstPad *my_src, *their_sink;
>   
> -    guint64 file_size, start_offset, next_offset, stop_offset;
> +    guint64 file_size;
>       guint64 next_pull_offset;
>   
> -    pthread_t push_thread;
> -
>       pthread_mutex_t mutex;
>   
>       pthread_cond_t init_cond;
>       bool no_more_pads, has_duration, error;
>   
> -    pthread_cond_t read_cond, read_done_cond;
> +    pthread_cond_t read_cond;
>       struct
>       {
> -        void *data;
> -        uint64_t offset;
>           uint32_t size;
> -        bool done;
> -        bool ret;
> +        bool pending_read;
>       } read_request;
>   
>       bool flushing, sink_connected;

I would personally be inclined to either keep "read_request.offset" 
rather than "next_pull_offset", or get rid of the read_request 
structure. Probably the former.

> @@ -522,7 +517,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
>   {
>       pthread_mutex_lock(&parser->mutex);
>   
> -    while (parser->sink_connected && !parser->read_request.data)
> +    while (parser->sink_connected && !parser->read_request.pending_read)
>           pthread_cond_wait(&parser->read_cond, &parser->mutex);
>   
>       if (!parser->sink_connected)
> @@ -531,7 +526,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
>           return false;
>       }
>   
> -    *offset = parser->read_request.offset;
> +    *offset = parser->next_pull_offset;
>       *size = parser->read_request.size;
>   
>       pthread_mutex_unlock(&parser->mutex);
> @@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
>   static void CDECL wg_parser_push_data(struct wg_parser *parser,
>           const void *data, uint32_t size)
>   {
> +    GstBuffer *buffer;
> +    GstFlowReturn ret;
> +    GError *error;
> +    GstMessage *message;
> +
> +    if (!data)
> +    {
> +        pthread_mutex_lock(&parser->mutex);
> +
> +        error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset);
> +        message = gst_message_new_error(NULL, error, "");
> +        if (!gst_bus_post(parser->bus, message))
> +        {
> +            GST_ERROR("Failed to post error message to bus!\n");
> +            gst_message_unref(message);
> +        }

Is there a point in posting a message to the bus?

> +        parser->read_request.pending_read = false;

This is fine temporarily. Ultimately I think we should be terminating 
the client-side thread on error, and if possible not calling 
wg_parser_push_data() at all.

> +
> +        pthread_mutex_unlock(&parser->mutex);
> +        return;
> +    }
> +
> +    if (!size)
> +    {
> +        pthread_mutex_lock(&parser->mutex);
> +
> +        g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
> +        parser->read_request.pending_read = false;
> +
> +        pthread_mutex_unlock(&parser->mutex);
> +        return;
> +    }
> +
> +    /*  We could avoid this extra copy using gst_buffer_new_wrapped.
> +        However, PE wouldn't know when to release the buffer allocations as the buffer
> +        objects are queued, so we'd have to create a ring buffer the size of the gstappsrc
> +        queue on the PE side and validate that we don't overrun on the unix side.  I'm not
> +        yet convinced that trying to reduce copies of compressed data is worth the
> +        complexity.  */
> +    buffer = gst_buffer_new_and_alloc(size);
> +    gst_buffer_fill(buffer, 0, data, size);
> +
>       pthread_mutex_lock(&parser->mutex);
> -    parser->read_request.size = size;
> -    parser->read_request.done = true;
> -    parser->read_request.ret = !!data;
> -    if (data)
> -        memcpy(parser->read_request.data, data, size);
> -    parser->read_request.data = NULL;
> +    assert(parser->read_request.pending_read);
> +
> +    GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset;
> +    g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
> +
> +    /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
> +    assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
> +    if (ret == GST_FLOW_OK)
> +        parser->next_pull_offset += size;
> +    else
> +        gst_buffer_unref(buffer);

According to my reading of the source code, GstAppSrc will take the 
reference even if it returns GST_FLOW_FLUSHING. That's a bit unintuitive 
and should probably be clearly documented on the GStreamer side, but 
anyway...

> +
> +    parser->read_request.pending_read = false;
>       pthread_mutex_unlock(&parser->mutex);
> -    pthread_cond_signal(&parser->read_done_cond);
>   }
>   
>   static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)



More information about the wine-devel mailing list