[PATCH 3/5] winegstreamer: Replace source pad interface with GstAppSrc.
Derek Lesho
dlesho at codeweavers.com
Wed Sep 8 14:25:52 CDT 2021
On 9/6/21 4:14 PM, Zebediah Figura wrote:
>
>> pthread_cond_wait(&parser->read_cond, &parser->mutex);
>> if (!parser->sink_connected)
>> @@ -541,15 +537,47 @@ static bool CDECL
>> wg_parser_get_next_read_offset(struct wg_parser *parser,
>> static void CDECL wg_parser_push_data(struct wg_parser *parser,
>> const void *data, uint32_t size)
>> {
>> + GstBuffer *buffer;
>> + GstFlowReturn ret;
>> +
>> + if (!data)
>> + {
>> + /* premature EOS should trigger an error path */
>> + pthread_mutex_lock(&parser->mutex);
>> + parser->read_request.offset = -1;
>> + g_signal_emit_by_name(G_OBJECT(parser->appsrc),
>> "end-of-stream", &ret);
>> + pthread_mutex_unlock(&parser->mutex);
>> + return;
>> + }
>
> Hmm, this is as much a commentary on 2/5, but, I feel like EOS should
> be signaled in a consistent way, probably by passing a short (or even
> zero) size with a valid buffer.
>
> The other way we can get here is error condition, and I'm not even
> sure we want to call wg_parser_push_data() in that case. The GStreamer
> side can't exactly do anything useful with that information.
Well, we need some way to not block forever upon an error.
Traditionally, we used GStreamer error propagation for this; if we
returned an error in getrange_cb, any of the blocking initialization
functions would unblock and we could cleanup, and after initialization,
we'd get an error in get_event. Absent this, we need another mechanism
to invoke a cleanup. wg_parser_disconnect(/wg_parser_destroy later) in
read_thread could work if we made sure that no other threads use the
objects after destruction, and for patch 2 we unblock getrange_cb upon
disconnection. Is this what you were going for?
>
> In any case we shouldn't have to call
> g_signal_emit_by_name("end-of-stream") more than once in this function.
>
>> +
>> + /* We could avoid this extra copy using gst_buffer_new_wrapped.
>> + However, PE wouldn't know when to release the buffer
>> allocations as the buffer
>> + objects are queued, so we'd have to create a ring buffer the
>> size of the gstappsrc
>> + queue on the PE side and validate that we don't overrun on
>> the unix side. I'm not
>> + yet convinced that trying to reduce copies of compressed
>> data is worth the
>> + complexity. */
>> + buffer = gst_buffer_new_and_alloc(size);
>> + gst_buffer_fill(buffer, 0, data, size);
>> +
>> pthread_mutex_lock(&parser->mutex);
>> - parser->read_request.size = size;
>> - parser->read_request.done = true;
>> - parser->read_request.ret = !!data;
>> - if (data)
>> - memcpy(parser->read_request.data, data, size);
>> - parser->read_request.data = NULL;
>> +
>> + assert(parser->read_request.offset != -1);
>> + GST_BUFFER_OFFSET(buffer) = parser->read_request.offset;
>> + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer",
>> buffer, &ret);
>> + parser->read_request.offset = -1;
>> +
>> + /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
>> + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
>> + if (ret == GST_FLOW_OK)
>> + parser->next_pull_offset += size;
>> + else
>> + gst_buffer_unref(buffer);
>> +
>> + assert(parser->next_pull_offset <= parser->file_size);
>> + if (parser->next_pull_offset == parser->file_size)
>> + g_signal_emit_by_name(G_OBJECT(parser->appsrc),
>> "end-of-stream", &ret);
>
> The awkward thing about this is that either we might pass the wrong
> size to wg_parser_get_next_read_offset(), or appsrc guarantees that we
> don't but we're not validating it (i.e. we're only validating the
> offset). For what it's worth, I'm not sure the documentation
> guarantees either one :-/
>
> Either way, this should probably live in
> wg_parser_get_next_read_offset() and not here.
My impression from "EOS should be signaled in a consistent way, probably
by passing a short (or even zero) size with a valid buffer. " was that
we should be determining EOS on the client side of the interface, and
push_data seems a more natural fit for something determined on the
client side, not to mention that in a future push mode, we will have to
be determining EOS on the client side.
>
>> +static void src_need_data(GstElement *appsrc, guint length, gpointer
>> user)
>> {
>>
>> + struct wg_parser *parser = user;
>> pthread_mutex_lock(&parser->mutex);
>>
>> + /* Sometimes GstAppSrc sends identical need-data requests when
>> it is woken up,
>> + we can rely on push-buffer not having completed (and hence
>> offset being -1)
>> + because they are blocked on the internal mutex held by the
>> pulling thread
>> + calling this callback. */
>> + if (parser->read_request.offset != -1)
>> {
>
> Well, sure, but do we really need this if block at all?
Yes, we do. Without it, if there a spurious wakeup of appsrc's wait
loop in gst_app_src_create (get_range) [1], and the "push-buffer" signal
is signaled just after the loop's g_cond_wait reacquires the mutex on
return [1], we could have "push-buffer" blocked on acquisition of the
mutex [2] held by gst_app_src_create. The function would block before
pushing the buffer to the internal queue. Then, gst_app_src_create
continues and, seeing as there are no buffers in the queue, unlocks the
mutex and calls need_data. Here we have our race:
- if the need_data's code is called first, read_request's offset and
size of overwritten with the same values. Then, the code waiting on the
mutex pushes the buffer responding to our request, and everything
continues as normal (as if another need_data never occurred).
(Prepare for the text wall 😁)
- If the push-buffer code waiting on the mutex continues first, the
internal buffer queue has the buffer written to it, and the push-buffer
signal returns. Afterwards, read_request.offset is set to the sentinel
value indicating that the request has been responded to, and that a new
need-data is awaited, push_data returns. Then, need_data is run,
acquiring the mutex too late to catch the previous buffer send, and it
requests a read of the same size directly following the prior read
request, since the offset had been updated by push_data.
gst_app_src_create then sees the pushed buffer and returns it back to
the getrange-related function, only for another getrange to come in with
an offset not directly following the last request's. While the client
code is reading the invalid request's data, src_seek_data is called.
Right now we have an assert to make sure that there is no active request
when seek_data is called, but if we didn't, we'd just set
next_pull_offset to the seek's value. Then, need_data would be called
again, but all it would do is update the size to the new request's
size. After this, push_data (in response to the previous request) would
be called and, for simplicity's sake, if the size of the two requests
were identical, a push-buffer would be called, src_seek_data would pick
up and have no way of knowing that the data is coming from after
previous request's location in the file, which could cause any number of
errors. If we keep around both next_pull_offset and
read_request.offset, we could compare them in push_data and discard the
buffer if they don't match, but this seems a lot more complicated than
catching this case earlier on and preventing all this confusion.
1:
https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/gstappsrc.c#L1774
2:
https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/gstappsrc.c#L2526
>> +static gboolean src_seek_data(GstElement *appsrc, guint64 offset,
>> gpointer user)
>> {
>> + struct wg_parser *parser = user;
>> + pthread_mutex_lock(&parser->mutex);
>> + assert(parser->read_request.offset == -1);
>
> Is this guaranteed, though?
Do you mean by the publicly documented interface or the actual code?
Looking at the code, in random access mode, it seems that the answer is
yes. getrange turns into a seek and need-data combo and getrange would
only be called after the previous buffer is sent. And for push mode, I
don't think we'll ever get a seek request (and if we did there'd be no
way to service it, as MFTs aren't seekable). Either way though, the
assert probably isn't too necessary and is only there to increase
understanding, if can easily be left out if you'd like.
>
>
More information about the wine-devel
mailing list