[PATCH 3/5] winegstreamer: Replace source pad interface with GstAppSrc.

Zebediah Figura zfigura at codeweavers.com
Mon Sep 6 15:14:34 CDT 2021


On 9/1/21 4:05 PM, Derek Lesho wrote:
> Signed-off-by: Derek Lesho <dlesho at codeweavers.com>
> ---
>   dlls/winegstreamer/wg_parser.c | 379 +++++++--------------------------
>   1 file changed, 81 insertions(+), 298 deletions(-)
> 
> diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
> index 5de8ba84ed3..7f2627235f1 100644
> --- a/dlls/winegstreamer/wg_parser.c
> +++ b/dlls/winegstreamer/wg_parser.c
> @@ -33,6 +33,7 @@
>   #include <gst/gst.h>
>   #include <gst/video/video.h>
>   #include <gst/audio/audio.h>
> +#include <gst/app/gstappsrc.h>
>   
>   /* GStreamer callbacks may be called on threads not created by Wine, and
>    * therefore cannot access the Wine TEB. This means that we must use GStreamer
> @@ -49,28 +50,23 @@ struct wg_parser
>       struct wg_parser_stream **streams;
>       unsigned int stream_count;
>   
> -    GstElement *container, *decodebin;
> +    GstElement *container, *appsrc, *decodebin;
>       GstBus *bus;
> -    GstPad *my_src, *their_sink;
>   
> -    guint64 file_size, start_offset, next_offset, stop_offset;
> +    guint64 file_size;
>       guint64 next_pull_offset;
>   
> -    pthread_t push_thread;
> -
>       pthread_mutex_t mutex;
>   
>       pthread_cond_t init_cond;
>       bool no_more_pads, has_duration, error;
>   
> -    pthread_cond_t read_cond, read_done_cond;
> +    pthread_cond_t read_cond;
>       struct
>       {
>           void *data;

This field should be unused now, and as far as I can tell it is.

>           uint64_t offset;
>           uint32_t size;
> -        bool done;
> -        bool ret;
>       } read_request;
>   
>       bool flushing, sink_connected;
> @@ -522,7 +518,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
>   {
>       pthread_mutex_lock(&parser->mutex);
>   
> -    while (parser->sink_connected && !parser->read_request.data)
> +    while (parser->sink_connected && parser->read_request.offset == -1)

"offset" is unsigned. I don't necessarily object to using a sentinel 
value, but it should probably be UINT64_MAX in that case.

That said... now we rather awkwardly duplicate the offset into 
next_pull_offset and read_request.offset. As far as I can tell, either 
these are identical, or read_request.offset is -1, which makes me think 
that one of them should just be a boolean flag.

(Actually, this isn't quite true, because I think you don't update 
read_request.offset unless we get need-data, but you still return it 
from wg_parser_get_next_read_offset(). That means that if we don't get 
need-data we'll end up sending the same buffer offset twice, which, 
while not apparently invalid, is probably not what we wanted to do. So 
there's another reason we probably want one offset and a boolean flag.)

>           pthread_cond_wait(&parser->read_cond, &parser->mutex);
>   
>       if (!parser->sink_connected)
> @@ -541,15 +537,47 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
>   static void CDECL wg_parser_push_data(struct wg_parser *parser,
>           const void *data, uint32_t size)
>   {
> +    GstBuffer *buffer;
> +    GstFlowReturn ret;
> +
> +    if (!data)
> +    {
> +        /* premature EOS should trigger an error path */
> +        pthread_mutex_lock(&parser->mutex);
> +        parser->read_request.offset = -1;
> +        g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
> +        pthread_mutex_unlock(&parser->mutex);
> +        return;
> +    }

Hmm, this is as much a commentary on 2/5 as on this patch, but I feel 
like EOS should be signaled in a consistent way, probably by passing a 
short (or even zero) size with a valid buffer.

The other way we can get here is an error condition, and I'm not even 
sure we want to call wg_parser_push_data() in that case. The GStreamer 
side can't exactly do anything useful with that information.

In any case we shouldn't have to call 
g_signal_emit_by_name("end-of-stream") more than once in this function.

> +
> +    /*  We could avoid this extra copy using gst_buffer_new_wrapped.
> +        However, PE wouldn't know when to release the buffer allocations as the buffer
> +        objects are queued, so we'd have to create a ring buffer the size of the gstappsrc
> +        queue on the PE side and validate that we don't overrun on the unix side.  I'm not
> +        yet convinced that trying to reduce copies of compressed data is worth the
> +        complexity.  */
> +    buffer = gst_buffer_new_and_alloc(size);
> +    gst_buffer_fill(buffer, 0, data, size);
> +
>       pthread_mutex_lock(&parser->mutex);
> -    parser->read_request.size = size;
> -    parser->read_request.done = true;
> -    parser->read_request.ret = !!data;
> -    if (data)
> -        memcpy(parser->read_request.data, data, size);
> -    parser->read_request.data = NULL;
> +
> +    assert(parser->read_request.offset != -1);
> +    GST_BUFFER_OFFSET(buffer) = parser->read_request.offset;
> +    g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
> +    parser->read_request.offset = -1;
> +
> +    /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
> +    assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
> +    if (ret == GST_FLOW_OK)
> +        parser->next_pull_offset += size;
> +    else
> +        gst_buffer_unref(buffer);
> +
> +    assert(parser->next_pull_offset <= parser->file_size);
> +    if (parser->next_pull_offset == parser->file_size)
> +        g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);

The awkward thing about this is that either we might pass the wrong size 
to wg_parser_get_next_read_offset(), or appsrc guarantees that we don't 
but we're not validating it (i.e. we're only validating the offset). For 
what it's worth, I'm not sure the documentation guarantees either one :-/

Either way, this should probably live in 
wg_parser_get_next_read_offset() and not here.

> +
>       pthread_mutex_unlock(&parser->mutex);
> -    pthread_cond_signal(&parser->read_done_cond);
>   }
>   
>   static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)
> @@ -1180,198 +1208,41 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user)
>       g_free(name);
>   }
>   
> -static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
> -        guint64 offset, guint size, GstBuffer **buffer)
> +static void src_need_data(GstElement *appsrc, guint length, gpointer user)
>   {
> -    struct wg_parser *parser = gst_pad_get_element_private(pad);
> -    GstBuffer *new_buffer = NULL;
> -    GstMapInfo map_info;
> -    bool ret;
> -
> -    GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer);
> -
> -    if (offset == GST_BUFFER_OFFSET_NONE)
> -        offset = parser->next_pull_offset;
> -    parser->next_pull_offset = offset + size;
> -    if (offset >= parser->file_size)
> -        return GST_FLOW_EOS;
> -    if (offset + size >= parser->file_size)
> -        size = parser->file_size - offset;
> -
> -    if (!*buffer)
> -        *buffer = new_buffer = gst_buffer_new_and_alloc(size);
> -
> -    gst_buffer_map(*buffer, &map_info, GST_MAP_WRITE);
> +    struct wg_parser *parser = user;
>   
>       pthread_mutex_lock(&parser->mutex);
>   
> -    assert(!parser->read_request.data);
> -    parser->read_request.data = map_info.data;
> -    parser->read_request.offset = offset;
> -    parser->read_request.size = size;
> -    parser->read_request.done = false;
> -    pthread_cond_signal(&parser->read_cond);
> -
> -    /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
> -     * the upstream pin to flush if necessary. We should never be blocked on
> -     * read_thread() not running. */
> -
> -    while (!parser->read_request.done)
> -        pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
> -
> -    ret = parser->read_request.ret;
> -    gst_buffer_set_size(*buffer, parser->read_request.size);
> -
> -    pthread_mutex_unlock(&parser->mutex);
> -
> -    gst_buffer_unmap(*buffer, &map_info);
> -
> -    GST_LOG("Request returned %d.", ret);
> -
> -    if (!ret && new_buffer)
> -        gst_buffer_unref(new_buffer);
> -
> -    return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
> -}
> -
> -static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
> -{
> -    struct wg_parser *parser = gst_pad_get_element_private(pad);
> -    GstFormat format;
> -
> -    GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query));
> -
> -    switch (GST_QUERY_TYPE(query))
> -    {
> -        case GST_QUERY_DURATION:
> -            gst_query_parse_duration(query, &format, NULL);
> -            if (format == GST_FORMAT_PERCENT)
> -            {
> -                gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX);
> -                return TRUE;
> -            }
> -            else if (format == GST_FORMAT_BYTES)
> -            {
> -                gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size);
> -                return TRUE;
> -            }
> -            return FALSE;
> -
> -        case GST_QUERY_SEEKING:
> -            gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
> -            if (format != GST_FORMAT_BYTES)
> -            {
> -                GST_WARNING("Cannot seek using format \"%s\".", gst_format_get_name(format));
> -                return FALSE;
> -            }
> -            gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size);
> -            return TRUE;
> -
> -        case GST_QUERY_SCHEDULING:
> -            gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
> -            gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
> -            gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL);
> -            return TRUE;
> -
> -        default:
> -            GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query));
> -            return FALSE;
> -    }
> -}
> -
> -static void *push_data(void *arg)
> -{
> -    struct wg_parser *parser = arg;
> -    GstBuffer *buffer;
> -    guint max_size;
> -
> -    GST_DEBUG("Starting push thread.");
> -
> -    if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL)))
> -    {
> -        GST_ERROR("Failed to allocate memory.");
> -        return NULL;
> -    }
> -
> -    max_size = parser->stop_offset ? parser->stop_offset : parser->file_size;
> -
> -    for (;;)
> +    /* Sometimes GstAppSrc sends identical need-data requests when it is woken up,
> +       we can rely on push-buffer not having completed (and hence offset being -1)
> +       because they are blocked on the internal mutex held by the pulling thread
> +       calling this callback. */
> +    if (parser->read_request.offset != -1)
>       {

Well, sure, but do we really need this "if" block at all?

> -        ULONG size;
> -        int ret;
> -
> -        if (parser->next_offset >= max_size)
> -            break;
> -        size = min(16384, max_size - parser->next_offset);
> -
> -        if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0)
> -        {
> -            GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret));
> -            break;
> -        }
> -
> -        parser->next_offset += size;
> -
> -        buffer->duration = buffer->pts = -1;
> -        if ((ret = gst_pad_push(parser->my_src, buffer)) < 0)
> -        {
> -            GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret));
> -            break;
> -        }
> +        pthread_mutex_unlock(&parser->mutex);
> +        return;
>       }
> +    parser->read_request.offset = parser->next_pull_offset;
> +    parser->read_request.size = length;
>   
> -    gst_buffer_unref(buffer);
> -
> -    gst_pad_push_event(parser->my_src, gst_event_new_eos());
> -
> -    GST_DEBUG("Stopping push thread.");
> +    pthread_cond_signal(&parser->read_cond);
>   
> -    return NULL;
> +    pthread_mutex_unlock(&parser->mutex);
>   }
>   
> -static gboolean activate_push(GstPad *pad, gboolean activate)
> +static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user)
>   {
> -    struct wg_parser *parser = gst_pad_get_element_private(pad);
> -
> -    if (!activate)
> -    {
> -        if (parser->push_thread)
> -        {
> -            pthread_join(parser->push_thread, NULL);
> -            parser->push_thread = 0;
> -        }
> -    }
> -    else if (!parser->push_thread)
> -    {
> -        int ret;
> +    struct wg_parser *parser = user;
>   
> -        if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser)))
> -        {
> -            GST_ERROR("Failed to create push thread: %s", strerror(errno));
> -            parser->push_thread = 0;
> -            return FALSE;
> -        }
> -    }
> -    return TRUE;
> -}
> +    pthread_mutex_lock(&parser->mutex);
>   
> -static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate)
> -{
> -    struct wg_parser *parser = gst_pad_get_element_private(pad);
> +    assert(parser->read_request.offset == -1);

Is this guaranteed, though?

> +    parser->next_pull_offset = offset;
>   
> -    GST_DEBUG("%s source pad for parser %p in %s mode.",
> -            activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode));
> +    pthread_mutex_unlock(&parser->mutex);
>   
> -    switch (mode)
> -    {
> -        case GST_PAD_MODE_PULL:
> -            return TRUE;
> -        case GST_PAD_MODE_PUSH:
> -            return activate_push(pad, activate);
> -        case GST_PAD_MODE_NONE:
> -            break;
> -    }
> -    return FALSE;
> +    return true;
>   }
>   
>   static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user)




More information about the wine-devel mailing list