[PATCH 3/5] winegstreamer: Replace source pad interface with GstAppSrc.

Zebediah Figura (she/her) zfigura at codeweavers.com
Wed Sep 8 17:21:58 CDT 2021


On 9/8/21 2:25 PM, Derek Lesho wrote:
> On 9/6/21 4:14 PM, Zebediah Figura wrote:
> 
>>
>>> pthread_cond_wait(&parser->read_cond, &parser->mutex);
>>>         if (!parser->sink_connected)
>>> @@ -541,15 +537,47 @@ static bool CDECL 
>>> wg_parser_get_next_read_offset(struct wg_parser *parser,
>>>   static void CDECL wg_parser_push_data(struct wg_parser *parser,
>>>           const void *data, uint32_t size)
>>>   {
>>> +    GstBuffer *buffer;
>>> +    GstFlowReturn ret;
>>> +
>>> +    if (!data)
>>> +    {
>>> +        /* premature EOS should trigger an error path */
>>> +        pthread_mutex_lock(&parser->mutex);
>>> +        parser->read_request.offset = -1;
>>> +        g_signal_emit_by_name(G_OBJECT(parser->appsrc), 
>>> "end-of-stream", &ret);
>>> +        pthread_mutex_unlock(&parser->mutex);
>>> +        return;
>>> +    }
>>
>> Hmm, this is as much a commentary on 2/5, but, I feel like EOS should 
>> be signaled in a consistent way, probably by passing a short (or even 
>> zero) size with a valid buffer.
>>
>> The other way we can get here is error condition, and I'm not even 
>> sure we want to call wg_parser_push_data() in that case. The GStreamer 
>> side can't exactly do anything useful with that information.
> Well, we need some way to not block forever upon an error. 
> Traditionally, we used GStreamer error propagation for this; if we 
> returned an error in getrange_cb, any of the blocking initialization 
> functions would unblock and we could cleanup, and after initialization, 
> we'd get an error in get_event.  Absent this, we need another mechanism 
> to invoke a cleanup. wg_parser_disconnect (or wg_parser_destroy later) in 
> read_thread could work if we made sure that no other threads use the 
> objects after destruction, and for patch 2 we unblock getrange_cb upon 
> disconnection.  Is this what you were going for?

Ah, right, we'd temporarily break things if we didn't signal read errors 
somehow. So that's probably a good idea to keep around, at least 
temporarily.

I could be satisfied with, say, passing a NULL buffer to signal error, 
and a valid buffer but short or zero size to signal short read.
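
As a rough sketch of that convention (classify_push() and the enum are 
hypothetical names for illustration, not something in the patch, and 
"requested" stands for the size of the outstanding read request):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical names, for illustration only. */
enum push_result
{
    PUSH_ERROR,      /* data == NULL: the read failed */
    PUSH_SHORT_READ, /* valid buffer, fewer bytes than requested (maybe zero) */
    PUSH_FULL_READ,  /* valid buffer, exactly the requested size */
};

/* Decide what a wg_parser_push_data()-style call means from its arguments. */
static enum push_result classify_push(const void *data, uint32_t size,
        uint32_t requested)
{
    if (!data)
        return PUSH_ERROR;

    /* Assumption: the client never hands back more than was asked for. */
    assert(size <= requested);

    if (size < requested)
        return PUSH_SHORT_READ;
    return PUSH_FULL_READ;
}
```

With that split, only the short read case would ever lead to signaling 
"end-of-stream", and it would be signaled in exactly one place.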

>>
>> In any case we shouldn't have to call 
>> g_signal_emit_by_name("end-of-stream") more than once in this function.
>>
>>> +
>>> +    /*  We could avoid this extra copy using gst_buffer_new_wrapped.
>>> +        However, PE wouldn't know when to release the buffer 
>>> allocations as the buffer
>>> +        objects are queued, so we'd have to create a ring buffer the 
>>> size of the gstappsrc
>>> +        queue on the PE side and validate that we don't overrun on 
>>> the unix side.  I'm not
>>> +        yet convinced that trying to reduce copies of compressed 
>>> data is worth the
>>> +        complexity.  */
>>> +    buffer = gst_buffer_new_and_alloc(size);
>>> +    gst_buffer_fill(buffer, 0, data, size);
>>> +
>>>       pthread_mutex_lock(&parser->mutex);
>>> -    parser->read_request.size = size;
>>> -    parser->read_request.done = true;
>>> -    parser->read_request.ret = !!data;
>>> -    if (data)
>>> -        memcpy(parser->read_request.data, data, size);
>>> -    parser->read_request.data = NULL;
>>> +
>>> +    assert(parser->read_request.offset != -1);
>>> +    GST_BUFFER_OFFSET(buffer) = parser->read_request.offset;
>>> +    g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", 
>>> buffer, &ret);
>>> +    parser->read_request.offset = -1;
>>> +
>>> +    /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
>>> +    assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
>>> +    if (ret == GST_FLOW_OK)
>>> +        parser->next_pull_offset += size;
>>> +    else
>>> +        gst_buffer_unref(buffer);
>>> +
>>> +    assert(parser->next_pull_offset <= parser->file_size);
>>> +    if (parser->next_pull_offset == parser->file_size)
>>> +        g_signal_emit_by_name(G_OBJECT(parser->appsrc), 
>>> "end-of-stream", &ret);
>>
>> The awkward thing about this is that either we might pass the wrong 
>> size to wg_parser_get_next_read_offset(), or appsrc guarantees that we 
>> don't but we're not validating it (i.e. we're only validating the 
>> offset). For what it's worth, I'm not sure the documentation 
>> guarantees either one :-/
>>
>> Either way, this should probably live in 
>> wg_parser_get_next_read_offset() and not here.
> My impression from "EOS should be signaled in a consistent way, probably 
> by passing a short (or even zero) size with a valid buffer. " was that 
> we should be determining EOS on the client side of the interface, and 
> push_data seems a more natural fit for something determined on the 
> client side, not to mention that in a future push mode, we will have to 
> be determining EOS on the client side.

Hmm, actually, you're probably right, we should signal EOS on the client 
side, especially if we want to support sources that don't have a fixed 
size (in theory these exist, although I've never seen one...)

Note though that quartz kind of awkwardly depends on the Unix library to 
do the right thing. So that would have to be fixed first.

>>
>>> +static void src_need_data(GstElement *appsrc, guint length, gpointer 
>>> user)
>>>   {
>>>
>>> +    struct wg_parser *parser = user;
>>>         pthread_mutex_lock(&parser->mutex);
>>>
>>> +    /* Sometimes GstAppSrc sends identical need-data requests when 
>>> it is woken up,
>>> +       we can rely on push-buffer not having completed (and hence 
>>> offset being -1)
>>> +       because they are blocked on the internal mutex held by the 
>>> pulling thread
>>> +       calling this callback. */
>>> +    if (parser->read_request.offset != -1)
>>>       {
>>
>> Well, sure, but do we really need this if block at all?
> 
> Yes, we do.  Without it, if there is a spurious wakeup of appsrc's wait 
> loop in gst_app_src_create (get_range) [1], and the "push-buffer" signal 
> is signaled just after the loop's g_cond_wait reacquires the mutex on 
> return [1], we could have "push-buffer" blocked on acquisition of the 
> mutex [2] held by gst_app_src_create.  The function would block before 
> pushing the buffer to the internal queue.  Then, gst_app_src_create 
> continues and, seeing as there are no buffers in the queue, unlocks the 
> mutex and calls need_data.  Here we have our race:
> 
> - if the need_data's code is called first, read_request's offset and 
> size are overwritten with the same values. Then, the code waiting on the 
> mutex pushes the buffer responding to our request, and everything 
> continues as normal (as if another need_data never occurred).
> 
> (Prepare for the text wall 😁)
> 
> - If the push-buffer code waiting on the mutex continues first, the 
> internal buffer queue has the buffer written to it, and the push-buffer 
> signal returns.  Afterwards, read_request.offset is set to the sentinel 
> value indicating that the request has been responded to, and that a new 
> need-data is awaited, push_data returns.  Then, need_data is run, 
> acquiring the mutex too late to catch the previous buffer send, and it 
> requests a read of the same size directly following the prior read 
> request, since the offset had been updated by push_data. 
> gst_app_src_create then sees the pushed buffer and returns it back to 
> the getrange-related function, only for another getrange to come in with 
> an offset not directly following the last request's.  While the client 
> code is reading the invalid request's data, src_seek_data is called. 
> Right now we have an assert to make sure that there is no active request 
> when seek_data is called, but if we didn't, we'd just set 
> next_pull_offset to the seek's value.  Then, need_data would be called 
> again, but all it would do is update the size to the new request's 
> size.  After this, push_data (in response to the previous request) would 
> be called and, for simplicity's sake, if the size of the two requests 
> were identical, a push-buffer would be called, src_seek_data would pick 
> up and have no way of knowing that the data is coming from after the 
> previous request's location in the file, which could cause any number of 
> errors.  If we keep around both next_pull_offset and 
> read_request.offset, we could compare them in push_data and discard the 
> buffer if they don't match, but this seems a lot more complicated than 
> catching this case earlier on and preventing all this confusion.
> 
> 1: 
> https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/gstappsrc.c#L1774 
> 
> 
> 2: 
> https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/gstappsrc.c#L2526 

Okay, so I was operating under the assumption that appsrc would drop 
buffers if the offset didn't match what it wanted, since that seemed 
like the only safe way to implement the behaviour the documentation 
describes. Apparently it doesn't. That means that I guess we can't send 
more than one buffer at a time, i.e. we really need to give it only what 
it asks for.

I'll have to submit a patch to GStreamer to clarify the documentation.

In effect I guess that means we should keep the need-data/enough-data 
handling introduced by this patch only for stream mode, i.e. push mode. For 
random access mode we should treat need-data as license to send only a 
single buffer. I.e. upon receiving wg_parser_push_data() I guess we 
should reset whatever flag or sentinel to nonsignaled, so that a 
subsequent wg_parser_get_next_read_offset() blocks.
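
Something like the following is what I have in mind, as a minimal sketch 
only. The struct and the model_*() functions are hypothetical stand-ins 
for the parser state and for src_need_data(), 
wg_parser_get_next_read_offset() and wg_parser_push_data(); none of the 
GStreamer calls are shown:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of the request state; names are illustrative. */
struct read_request_model
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool pending;    /* signaled: one request is outstanding */
    uint64_t offset;
    uint32_t size;
};

static void model_init(struct read_request_model *m)
{
    pthread_mutex_init(&m->mutex, NULL);
    pthread_cond_init(&m->cond, NULL);
    m->pending = false;
    m->offset = 0;
    m->size = 0;
}

/* need-data side: record a single request and wake the reader. */
static void model_need_data(struct read_request_model *m, uint64_t offset,
        uint32_t size)
{
    pthread_mutex_lock(&m->mutex);
    m->offset = offset;
    m->size = size;
    m->pending = true;
    pthread_cond_signal(&m->cond);
    pthread_mutex_unlock(&m->mutex);
}

/* Reader side: block until a request is outstanding, then report it. */
static void model_get_next_read_offset(struct read_request_model *m,
        uint64_t *offset, uint32_t *size)
{
    pthread_mutex_lock(&m->mutex);
    while (!m->pending)
        pthread_cond_wait(&m->cond, &m->mutex);
    *offset = m->offset;
    *size = m->size;
    pthread_mutex_unlock(&m->mutex);
}

/* Push side: answer the request and reset the flag to nonsignaled.
 * Returns false if there was no outstanding request to answer. */
static bool model_push_data(struct read_request_model *m)
{
    bool was_pending;

    pthread_mutex_lock(&m->mutex);
    was_pending = m->pending;
    m->pending = false;
    pthread_mutex_unlock(&m->mutex);
    return was_pending;
}
```

The point is only that each need-data licenses exactly one push: after 
model_push_data() the flag is clear, so the next 
model_get_next_read_offset() waits for a fresh need-data instead of 
reusing the old request.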

>>> +static gboolean src_seek_data(GstElement *appsrc, guint64 offset, 
>>> gpointer user)
>>>   {
>>> +    struct wg_parser *parser = user;
>>> +    pthread_mutex_lock(&parser->mutex);
>>> +    assert(parser->read_request.offset == -1);
>>
>> Is this guaranteed, though?
> Do you mean by the publicly documented interface or the actual code? 
> Looking at the code, in random access mode, it seems that the answer is 
> yes.  getrange turns into a seek and need-data combo and getrange would 
> only be called after the previous buffer is sent. And for push mode, I 
> don't think we'll ever get a seek request (and if we did there'd be no 
> way to service it, as MFTs aren't seekable).  Either way though, the 
> assert probably isn't too necessary and is only there to increase 
> understanding, it can easily be left out if you'd like.

I guess this is essentially addressed by my previous comment.


