[PATCH 3/5] winegstreamer: Replace source pad interface with GstAppSrc.
Derek Lesho
dlesho at codeweavers.com
Wed Sep 1 16:05:56 CDT 2021
Signed-off-by: Derek Lesho <dlesho at codeweavers.com>
---
dlls/winegstreamer/wg_parser.c | 379 +++++++--------------------------
1 file changed, 81 insertions(+), 298 deletions(-)
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
index 5de8ba84ed3..7f2627235f1 100644
--- a/dlls/winegstreamer/wg_parser.c
+++ b/dlls/winegstreamer/wg_parser.c
@@ -33,6 +33,7 @@
#include <gst/gst.h>
#include <gst/video/video.h>
#include <gst/audio/audio.h>
+#include <gst/app/gstappsrc.h>
/* GStreamer callbacks may be called on threads not created by Wine, and
* therefore cannot access the Wine TEB. This means that we must use GStreamer
@@ -49,28 +50,23 @@ struct wg_parser
struct wg_parser_stream **streams;
unsigned int stream_count;
- GstElement *container, *decodebin;
+ GstElement *container, *appsrc, *decodebin;
GstBus *bus;
- GstPad *my_src, *their_sink;
- guint64 file_size, start_offset, next_offset, stop_offset;
+ guint64 file_size;
guint64 next_pull_offset;
- pthread_t push_thread;
-
pthread_mutex_t mutex;
pthread_cond_t init_cond;
bool no_more_pads, has_duration, error;
- pthread_cond_t read_cond, read_done_cond;
+ pthread_cond_t read_cond;
struct
{
void *data;
uint64_t offset;
uint32_t size;
- bool done;
- bool ret;
} read_request;
bool flushing, sink_connected;
@@ -522,7 +518,7 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
{
pthread_mutex_lock(&parser->mutex);
- while (parser->sink_connected && !parser->read_request.data)
+ while (parser->sink_connected && parser->read_request.offset == -1)
pthread_cond_wait(&parser->read_cond, &parser->mutex);
if (!parser->sink_connected)
@@ -541,15 +537,47 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
+/* Complete the pending read request: queue `size` bytes from `data` on the appsrc at
+ * read_request.offset, or signal end-of-stream when `data` is NULL. End-of-stream is
+ * also signalled once next_pull_offset reaches file_size. */
static void CDECL wg_parser_push_data(struct wg_parser *parser,
const void *data, uint32_t size)
{
+ GstBuffer *buffer;
+ GstFlowReturn ret;
+
+ if (!data)
+ {
+ /* No data could be supplied: a premature EOS should trigger an error path. */
+ pthread_mutex_lock(&parser->mutex);
+ parser->read_request.offset = -1;
+ g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
+ pthread_mutex_unlock(&parser->mutex);
+ return;
+ }
+
+ /* Copy rather than use gst_buffer_new_wrapped(): buffers are queued, so the PE side
+ could not tell when to release its memory without a ring buffer sized to the appsrc
+ queue; that complexity is not yet justified for copies of compressed data. */
+ buffer = gst_buffer_new_and_alloc(size);
+ gst_buffer_fill(buffer, 0, data, size);
+
pthread_mutex_lock(&parser->mutex);
- parser->read_request.size = size;
- parser->read_request.done = true;
- parser->read_request.ret = !!data;
- if (data)
- memcpy(parser->read_request.data, data, size);
- parser->read_request.data = NULL;
+
+ assert(parser->read_request.offset != -1);
+ GST_BUFFER_OFFSET(buffer) = parser->read_request.offset;
+ g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", buffer, &ret);
+ parser->read_request.offset = -1;
+
+ /* "push-buffer" takes its own reference, not ownership: always drop ours. */
+ gst_buffer_unref(buffer);
+ /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
+ assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
+ if (ret == GST_FLOW_OK)
+ parser->next_pull_offset += size;
+
+ assert(parser->next_pull_offset <= parser->file_size);
+ if (parser->next_pull_offset == parser->file_size)
+ g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
+
pthread_mutex_unlock(&parser->mutex);
- pthread_cond_signal(&parser->read_done_cond);
}
static void CDECL wg_parser_set_unlimited_buffering(struct wg_parser *parser)
@@ -1180,198 +1208,41 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user)
g_free(name);
}
-static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
- guint64 offset, guint size, GstBuffer **buffer)
+static void src_need_data(GstElement *appsrc, guint length, gpointer user)
{
- struct wg_parser *parser = gst_pad_get_element_private(pad);
- GstBuffer *new_buffer = NULL;
- GstMapInfo map_info;
- bool ret;
-
- GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, length %u, buffer %p.", pad, offset, size, *buffer);
-
- if (offset == GST_BUFFER_OFFSET_NONE)
- offset = parser->next_pull_offset;
- parser->next_pull_offset = offset + size;
- if (offset >= parser->file_size)
- return GST_FLOW_EOS;
- if (offset + size >= parser->file_size)
- size = parser->file_size - offset;
-
- if (!*buffer)
- *buffer = new_buffer = gst_buffer_new_and_alloc(size);
-
- gst_buffer_map(*buffer, &map_info, GST_MAP_WRITE);
+ struct wg_parser *parser = user;
pthread_mutex_lock(&parser->mutex);
- assert(!parser->read_request.data);
- parser->read_request.data = map_info.data;
- parser->read_request.offset = offset;
- parser->read_request.size = size;
- parser->read_request.done = false;
- pthread_cond_signal(&parser->read_cond);
-
- /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
- * the upstream pin to flush if necessary. We should never be blocked on
- * read_thread() not running. */
-
- while (!parser->read_request.done)
- pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
-
- ret = parser->read_request.ret;
- gst_buffer_set_size(*buffer, parser->read_request.size);
-
- pthread_mutex_unlock(&parser->mutex);
-
- gst_buffer_unmap(*buffer, &map_info);
-
- GST_LOG("Request returned %d.", ret);
-
- if (!ret && new_buffer)
- gst_buffer_unref(new_buffer);
-
- return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
-}
-
-static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
-{
- struct wg_parser *parser = gst_pad_get_element_private(pad);
- GstFormat format;
-
- GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query));
-
- switch (GST_QUERY_TYPE(query))
- {
- case GST_QUERY_DURATION:
- gst_query_parse_duration(query, &format, NULL);
- if (format == GST_FORMAT_PERCENT)
- {
- gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX);
- return TRUE;
- }
- else if (format == GST_FORMAT_BYTES)
- {
- gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size);
- return TRUE;
- }
- return FALSE;
-
- case GST_QUERY_SEEKING:
- gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
- if (format != GST_FORMAT_BYTES)
- {
- GST_WARNING("Cannot seek using format \"%s\".", gst_format_get_name(format));
- return FALSE;
- }
- gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size);
- return TRUE;
-
- case GST_QUERY_SCHEDULING:
- gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
- gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
- gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL);
- return TRUE;
-
- default:
- GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query));
- return FALSE;
- }
-}
-
-static void *push_data(void *arg)
-{
- struct wg_parser *parser = arg;
- GstBuffer *buffer;
- guint max_size;
-
- GST_DEBUG("Starting push thread.");
-
- if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL)))
- {
- GST_ERROR("Failed to allocate memory.");
- return NULL;
- }
-
- max_size = parser->stop_offset ? parser->stop_offset : parser->file_size;
-
- for (;;)
+ /* Sometimes GstAppSrc sends identical need-data requests when it is woken up.
+ A request is still pending if offset is not -1: push-buffer, which resets it,
+ cannot have completed because it is blocked on the internal mutex held by the
+ pulling thread calling this callback. */
+ if (parser->read_request.offset != -1)
{
- ULONG size;
- int ret;
-
- if (parser->next_offset >= max_size)
- break;
- size = min(16384, max_size - parser->next_offset);
-
- if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0)
- {
- GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret));
- break;
- }
-
- parser->next_offset += size;
-
- buffer->duration = buffer->pts = -1;
- if ((ret = gst_pad_push(parser->my_src, buffer)) < 0)
- {
- GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret));
- break;
- }
+ pthread_mutex_unlock(&parser->mutex);
+ return;
}
+ parser->read_request.offset = parser->next_pull_offset;
+ parser->read_request.size = length;
- gst_buffer_unref(buffer);
-
- gst_pad_push_event(parser->my_src, gst_event_new_eos());
-
- GST_DEBUG("Stopping push thread.");
+ pthread_cond_signal(&parser->read_cond);
- return NULL;
+ pthread_mutex_unlock(&parser->mutex);
}
-static gboolean activate_push(GstPad *pad, gboolean activate)
+static gboolean src_seek_data(GstElement *appsrc, guint64 offset, gpointer user)
{
- struct wg_parser *parser = gst_pad_get_element_private(pad);
-
- if (!activate)
- {
- if (parser->push_thread)
- {
- pthread_join(parser->push_thread, NULL);
- parser->push_thread = 0;
- }
- }
- else if (!parser->push_thread)
- {
- int ret;
+ struct wg_parser *parser = user;
- if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser)))
- {
- GST_ERROR("Failed to create push thread: %s", strerror(errno));
- parser->push_thread = 0;
- return FALSE;
- }
- }
- return TRUE;
-}
+ pthread_mutex_lock(&parser->mutex);
-static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate)
-{
- struct wg_parser *parser = gst_pad_get_element_private(pad);
+ assert(parser->read_request.offset == -1);
+ parser->next_pull_offset = offset;
- GST_DEBUG("%s source pad for parser %p in %s mode.",
- activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode));
+ pthread_mutex_unlock(&parser->mutex);
- switch (mode)
- {
- case GST_PAD_MODE_PULL:
- return TRUE;
- case GST_PAD_MODE_PUSH:
- return activate_push(pad, activate);
- case GST_PAD_MODE_NONE:
- break;
- }
- return FALSE;
+ return true;
}
static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user)
@@ -1418,85 +1289,8 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use
return GST_BUS_DROP;
}
-static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event)
-{
- BOOL thread = !!parser->push_thread;
- GstSeekType cur_type, stop_type;
- GstFormat seek_format;
- GstEvent *flush_event;
- GstSeekFlags flags;
- gint64 cur, stop;
- guint32 seqnum;
- gdouble rate;
-
- gst_event_parse_seek(event, &rate, &seek_format, &flags,
- &cur_type, &cur, &stop_type, &stop);
-
- if (seek_format != GST_FORMAT_BYTES)
- {
- GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(seek_format));
- return FALSE;
- }
-
- seqnum = gst_event_get_seqnum(event);
-
- /* send flush start */
- if (flags & GST_SEEK_FLAG_FLUSH)
- {
- flush_event = gst_event_new_flush_start();
- gst_event_set_seqnum(flush_event, seqnum);
- gst_pad_push_event(parser->my_src, flush_event);
- if (thread)
- gst_pad_set_active(parser->my_src, 1);
- }
-
- parser->next_offset = parser->start_offset = cur;
-
- /* and prepare to continue streaming */
- if (flags & GST_SEEK_FLAG_FLUSH)
- {
- flush_event = gst_event_new_flush_stop(TRUE);
- gst_event_set_seqnum(flush_event, seqnum);
- gst_pad_push_event(parser->my_src, flush_event);
- if (thread)
- gst_pad_set_active(parser->my_src, 1);
- }
-
- return TRUE;
-}
-
-static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
-{
- struct wg_parser *parser = gst_pad_get_element_private(pad);
- gboolean ret = TRUE;
-
- GST_LOG("parser %p, type \"%s\".", parser, GST_EVENT_TYPE_NAME(event));
-
- switch (event->type)
- {
- case GST_EVENT_SEEK:
- ret = src_perform_seek(parser, event);
- break;
-
- case GST_EVENT_FLUSH_START:
- case GST_EVENT_FLUSH_STOP:
- case GST_EVENT_QOS:
- case GST_EVENT_RECONFIGURE:
- break;
-
- default:
- GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
- ret = FALSE;
- break;
- }
- gst_event_unref(event);
- return ret;
-}
-
static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_size)
{
- GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src",
- GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
unsigned int i;
parser->file_size = file_size;
@@ -1511,14 +1305,15 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
parser->container = gst_bin_new(NULL);
gst_element_set_bus(parser->container, parser->bus);
- parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src");
- gst_pad_set_getrange_function(parser->my_src, src_getrange_cb);
- gst_pad_set_query_function(parser->my_src, src_query_cb);
- gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb);
- gst_pad_set_event_function(parser->my_src, src_event_cb);
- gst_pad_set_element_private(parser->my_src, parser);
+ if (!(parser->appsrc = create_element("appsrc", "base")))
+ return E_FAIL;
+ gst_bin_add(GST_BIN(parser->container), parser->appsrc);
+
+ g_object_set(parser->appsrc, "stream-type", GST_APP_STREAM_TYPE_RANDOM_ACCESS, NULL);
+ g_object_set(parser->appsrc, "size", parser->file_size, NULL);
+ g_signal_connect(parser->appsrc, "need-data", G_CALLBACK(src_need_data), parser);
+ g_signal_connect(parser->appsrc, "seek-data", G_CALLBACK(src_seek_data), parser);
- parser->start_offset = parser->next_offset = parser->stop_offset = 0;
parser->next_pull_offset = 0;
if (!parser->init_gst(parser))
@@ -1598,7 +1393,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
pthread_mutex_unlock(&parser->mutex);
- parser->next_offset = 0;
return S_OK;
}
@@ -1640,10 +1434,6 @@ static void CDECL wg_parser_disconnect(struct wg_parser *parser)
pthread_mutex_unlock(&parser->mutex);
gst_element_set_state(parser->container, GST_STATE_NULL);
- gst_pad_unlink(parser->my_src, parser->their_sink);
- gst_object_unref(parser->my_src);
- gst_object_unref(parser->their_sink);
- parser->my_src = parser->their_sink = NULL;
pthread_mutex_lock(&parser->mutex);
parser->sink_connected = false;
@@ -1678,15 +1468,13 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser);
g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink");
-
pthread_mutex_lock(&parser->mutex);
parser->no_more_pads = parser->error = false;
pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
+ if (!gst_element_link(parser->appsrc, parser->decodebin))
{
- GST_ERROR("Failed to link pads, error %d.\n", ret);
+ GST_ERROR("Failed to link app source.\n");
return FALSE;
}
@@ -1725,15 +1513,13 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser)
g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser);
g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
- parser->their_sink = gst_element_get_static_pad(element, "sink");
-
pthread_mutex_lock(&parser->mutex);
parser->no_more_pads = parser->error = false;
pthread_mutex_unlock(&parser->mutex);
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
+ if (!gst_element_link(parser->appsrc, element))
{
- GST_ERROR("Failed to link pads, error %d.\n", ret);
+ GST_ERROR("Failed to link app source.\n");
return FALSE;
}
@@ -1769,10 +1555,9 @@ static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink");
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
+ if (!gst_element_link(parser->appsrc, element))
{
- GST_ERROR("Failed to link sink pads, error %d.\n", ret);
+ GST_ERROR("Failed to link app source.\n");
return FALSE;
}
@@ -1809,10 +1594,9 @@ static BOOL wave_parser_init_gst(struct wg_parser *parser)
gst_bin_add(GST_BIN(parser->container), element);
- parser->their_sink = gst_element_get_static_pad(element, "sink");
- if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
+ if (!gst_element_link(parser->appsrc, element))
{
- GST_ERROR("Failed to link sink pads, error %d.\n", ret);
+ GST_ERROR("Failed to link app source.\n");
return FALSE;
}
@@ -1849,8 +1633,8 @@ static struct wg_parser *wg_parser_create(void)
pthread_mutex_init(&parser->mutex, NULL);
pthread_cond_init(&parser->init_cond, NULL);
pthread_cond_init(&parser->read_cond, NULL);
- pthread_cond_init(&parser->read_done_cond, NULL);
parser->flushing = true;
+ parser->read_request.offset = -1;
GST_DEBUG("Created winegstreamer parser %p.\n", parser);
return parser;
@@ -1903,7 +1687,6 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser)
pthread_mutex_destroy(&parser->mutex);
pthread_cond_destroy(&parser->init_cond);
pthread_cond_destroy(&parser->read_cond);
- pthread_cond_destroy(&parser->read_done_cond);
free(parser);
}
--
2.33.0
More information about the wine-devel
mailing list