[PATCH v2 3/3] winegstreamer: Replace source pad interface with GstAppSrc.
Zebediah Figura
zfigura at codeweavers.com
Mon Sep 13 12:57:50 CDT 2021
On 9/10/21 12:04 PM, Derek Lesho wrote:
> Signed-off-by: Derek Lesho <dlesho at codeweavers.com>
> ---
> v2:
> - Remodel read_request to only store a flag and the request size.
> - Fix race condition documented at https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/937
> - Send error message to bus to indicate a read error, instead of using an EOS. (gst_app_src_create will now unblock on shutdown).
> ---
> dlls/winegstreamer/wg_parser.c | 395 +++++++++------------------------
> 1 file changed, 99 insertions(+), 296 deletions(-)
>
> diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
> index fc3ea49d0a7..68b26d7093c 100644
> --- a/dlls/winegstreamer/wg_parser.c
> +++ b/dlls/winegstreamer/wg_parser.c
> @@ -33,6 +33,7 @@
> #include <gst/gst.h>
> #include <gst/video/video.h>
> #include <gst/audio/audio.h>
> +#include <gst/app/gstappsrc.h>
>
> /* GStreamer callbacks may be called on threads not created by Wine, and
> * therefore cannot access the Wine TEB. This means that we must use GStreamer
> @@ -49,28 +50,22 @@ struct wg_parser
> struct wg_parser_stream **streams;
> unsigned int stream_count;
>
> - GstElement *container, *decodebin;
> + GstElement *container, *appsrc, *decodebin;
> GstBus *bus;
> - GstPad *my_src, *their_sink;
>
> - guint64 file_size, start_offset, next_offset, stop_offset;
> + guint64 file_size;
> guint64 next_pull_offset;
>
> - pthread_t push_thread;
> -
> pthread_mutex_t mutex;
>
> pthread_cond_t init_cond;
> bool no_more_pads, has_duration, error;
>
> - pthread_cond_t read_cond, read_done_cond;
> + pthread_cond_t read_cond;
> struct
> {
> - void *data;
> - uint64_t offset;
> uint32_t size;
> - bool done;
> - bool ret;
> + bool pending_read;
> } read_request;
>
> bool flushing, sink_connected;
I would personally be inclined to either keep "read_request.offset"
rather than "next_pull_offset", or get rid of the read_request
structure. Probably the former.
> @@ -522,7 +517,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
> {
> pthread_mutex_lock(&parser->mutex);
>
> - while (parser->sink_connected && !parser->read_request.data)
> + while (parser->sink_connected && !parser->read_request.pending_read)
> pthread_cond_wait(&parser->read_cond, &parser->mutex);
>
> if (!parser->sink_connected)
> @@ -531,7 +526,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
> return false;
> }
>
> - *offset = parser->read_request.offset;
> + *offset = parser->next_pull_offset;
> *size = parser->read_request.size;
>
> pthread_mutex_unlock(&parser->mutex);
> @@ -541,15 +536,63 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
> static void CDECL wg_parser_push_data(struct wg_parser *parser,
> const void *data, uint32_t size)
> {
> + GstBuffer *buffer;
> + GstFlowReturn ret;
> + GError *error;
> + GstMessage *message;
> +
> + if (!data)
> + {
> + pthread_mutex_lock(&parser->mutex);
> +
> + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->next_pull_offset);
> + message = gst_message_new_error(NULL, error, "");
> + if (!gst_bus_post(parser->bus, message))
> + {
> + GST_ERROR("Failed to post error message to bus!\n");
> + gst_message_unref(message);
> + }
Is there a point in posting a message to the bus?
> + parser->read_request.pending_read = false;
This is fine temporarily. Ultimately I think we should be terminating
the client-side thread on error, and if possible not calling
wg_parser_push_data() at all.
> +
> + pthread_mutex_unlock(&parser->mutex);
> + return;
> + }
> +
> + if (!size)
> + {
> + pthread_mutex_lock(&parser->mutex);
> +
> + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
> + parser->read_request.pending_read = false;
> +
> + pthread_mutex_unlock(&parser->mutex);
> + return;
> + }
> +
> + /* We could avoid this extra copy using gst_buffer_new_wrapped.
> + However, PE wouldn't know when to release the buffer allocations as the buffer
> + objects are queued, so we'd have to create a ring buffer the size of the gstappsrc
> + queue on the PE side and validate that we don't overrun on the unix side. I'm not
> + yet convinced that trying to reduce copies of compressed data is worth the
> + complexity. */
> + buffer = gst_buffer_new_and_alloc(size);
> + gst_buffer_fill(buffer, 0, data, size);
> +
> pthread_mutex_lock(&parser->mutex);
> - parser->read_request.size = size;
> - parser->read_request.done = true;
> - parser->read_request.ret = !!data;
> - if (data)
> - memcpy(parser->read_request.data, data, size);
> - parser->read_request.data = NULL;
> + assert(parser->read_request.pending_read);
> +
> + GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset;
> + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
> +
> + /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
> + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
> + if (ret == GST_FLOW_OK)
> + parser->next_pull_offset += size;
> + else
> + gst_buffer_unref(buffer);
According to my reading of the source code, GstAppSrc will take the
reference even if it returns GST_FLOW_FLUSHING. That's a bit unintuitive
and should probably be clearly documented on the GStreamer side, but
anyway...
> +
> + parser->read_request.pending_read = false;
> pthread_mutex_unlock(&parser->mutex);
> - pthread_cond_signal(&parser->read_done_cond);
> }
>
> static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)
More information about the wine-devel
mailing list