[PATCH 3/5] winegstreamer: Replace source pad interface with GstAppSrc.

Derek Lesho dlesho at codeweavers.com
Wed Sep 8 14:25:52 CDT 2021


On 9/6/21 4:14 PM, Zebediah Figura wrote:

>
>> pthread_cond_wait(&parser->read_cond, &parser->mutex);
>>         if (!parser->sink_connected)
>> @@ -541,15 +537,47 @@ static bool CDECL 
>> wg_parser_get_next_read_offset(struct wg_parser *parser,
>>   static void CDECL wg_parser_push_data(struct wg_parser *parser,
>>           const void *data, uint32_t size)
>>   {
>> +    GstBuffer *buffer;
>> +    GstFlowReturn ret;
>> +
>> +    if (!data)
>> +    {
>> +        /* premature EOS should trigger an error path */
>> +        pthread_mutex_lock(&parser->mutex);
>> +        parser->read_request.offset = -1;
>> +        g_signal_emit_by_name(G_OBJECT(parser->appsrc), 
>> "end-of-stream", &ret);
>> +        pthread_mutex_unlock(&parser->mutex);
>> +        return;
>> +    }
>
> Hmm, this is as much a commentary on 2/5, but, I feel like EOS should 
> be signaled in a consistent way, probably by passing a short (or even 
> zero) size with a valid buffer.
>
> The other way we can get here is error condition, and I'm not even 
> sure we want to call wg_parser_push_data() in that case. The GStreamer 
> side can't exactly do anything useful with that information.
Well, we need some way to avoid blocking forever on an error. 
Traditionally, we used GStreamer error propagation for this: if we 
returned an error in getrange_cb, any of the blocking initialization 
functions would unblock and we could clean up, and after initialization 
we'd get an error in get_event.  Absent this, we need another mechanism 
to invoke a cleanup.  Calling wg_parser_disconnect (or, later, 
wg_parser_destroy) in read_thread could work if we made sure that no 
other threads use the objects after destruction and, for patch 2, that 
we unblock getrange_cb upon disconnection.  Is this what you were going for?
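
To make the "unblock on disconnection" idea concrete, here is a minimal 
sketch using plain pthreads.  It is not the winegstreamer code: 
struct sketch_parser, sketch_disconnect and sketch_wait_for_data are 
hypothetical names, and the only assumption is that every blocking wait 
rechecks a "disconnected" flag that the read thread sets on error.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for the relevant fields of struct wg_parser. */
struct sketch_parser
{
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    bool data_ready;    /* a read request has been answered */
    bool disconnected;  /* set on error so that waiters give up */
};

static void sketch_init(struct sketch_parser *p)
{
    int rc;

    rc = pthread_mutex_init(&p->mutex, NULL);
    assert(!rc);
    rc = pthread_cond_init(&p->cond, NULL);
    assert(!rc);
    (void)rc;
    p->data_ready = false;
    p->disconnected = false;
}

/* Would be called from the read thread when the read fails. */
static void sketch_disconnect(struct sketch_parser *p)
{
    pthread_mutex_lock(&p->mutex);
    p->disconnected = true;
    /* Wake every waiter so that each one can observe the flag. */
    pthread_cond_broadcast(&p->cond);
    pthread_mutex_unlock(&p->mutex);
}

/* Returns true if data arrived, false if the parser was disconnected. */
static bool sketch_wait_for_data(struct sketch_parser *p)
{
    bool ok;

    pthread_mutex_lock(&p->mutex);
    while (!p->data_ready && !p->disconnected)
        pthread_cond_wait(&p->cond, &p->mutex);
    ok = p->data_ready;
    p->data_ready = false;
    pthread_mutex_unlock(&p->mutex);
    return ok;
}
```

The waiter never touches the parser after seeing the flag, which is the 
property we'd need before destroying the object from another thread.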
>
> In any case we shouldn't have to call 
> g_signal_emit_by_name("end-of-stream") more than once in this function.
>
>> +
>> +    /*  We could avoid this extra copy using gst_buffer_new_wrapped.
>> +        However, PE wouldn't know when to release the buffer 
>> allocations as the buffer
>> +        objects are queued, so we'd have to create a ring buffer the 
>> size of the gstappsrc
>> +        queue on the PE side and validate that we don't overrun on 
>> the unix side.  I'm not
>> +        yet convinced that trying to reduce copies of compressed 
>> data is worth the
>> +        complexity.  */
>> +    buffer = gst_buffer_new_and_alloc(size);
>> +    gst_buffer_fill(buffer, 0, data, size);
>> +
>>       pthread_mutex_lock(&parser->mutex);
>> -    parser->read_request.size = size;
>> -    parser->read_request.done = true;
>> -    parser->read_request.ret = !!data;
>> -    if (data)
>> -        memcpy(parser->read_request.data, data, size);
>> -    parser->read_request.data = NULL;
>> +
>> +    assert(parser->read_request.offset != -1);
>> +    GST_BUFFER_OFFSET(buffer) = parser->read_request.offset;
>> +    g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer", 
>> buffer, &ret);
>> +    parser->read_request.offset = -1;
>> +
>> +    /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
>> +    assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
>> +    if (ret == GST_FLOW_OK)
>> +        parser->next_pull_offset += size;
>> +    else
>> +        gst_buffer_unref(buffer);
>> +
>> +    assert(parser->next_pull_offset <= parser->file_size);
>> +    if (parser->next_pull_offset == parser->file_size)
>> +        g_signal_emit_by_name(G_OBJECT(parser->appsrc), 
>> "end-of-stream", &ret);
>
> The awkward thing about this is that either we might pass the wrong 
> size to wg_parser_get_next_read_offset(), or appsrc guarantees that we 
> don't but we're not validating it (i.e. we're only validating the 
> offset). For what it's worth, I'm not sure the documentation 
> guarantees either one :-/
>
> Either way, this should probably live in 
> wg_parser_get_next_read_offset() and not here.
My impression from "EOS should be signaled in a consistent way, probably 
by passing a short (or even zero) size with a valid buffer." was that 
we should be determining EOS on the client side of the interface, and 
push_data seems a more natural fit for something determined on the 
client side, not to mention that in a future push mode we will have to 
determine EOS on the client side anyway.
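
As a rough sketch of what deciding EOS in push_data could look like, 
assuming we treat either a short (or zero-size) push or reaching 
file_size as end of stream.  struct sketch_stream and 
sketch_account_push are hypothetical names, not part of the patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the offset bookkeeping in struct wg_parser. */
struct sketch_stream
{
    uint64_t next_pull_offset;
    uint64_t file_size;
};

/* Accounts for one pushed chunk.  Returns true when the caller should
 * emit "end-of-stream" afterwards: either the client delivered less than
 * was requested, or the whole file has now been pushed. */
static bool sketch_account_push(struct sketch_stream *s,
        uint32_t requested, uint32_t pushed)
{
    assert(pushed <= requested);
    s->next_pull_offset += pushed;
    assert(s->next_pull_offset <= s->file_size);
    return pushed < requested || s->next_pull_offset == s->file_size;
}
```

With this shape there is a single place in the function that decides 
whether to emit "end-of-stream".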
>
>> +static void src_need_data(GstElement *appsrc, guint length, gpointer 
>> user)
>>   {
>>
>> +    struct wg_parser *parser = user;
>>         pthread_mutex_lock(&parser->mutex);
>>
>> +    /* Sometimes GstAppSrc sends identical need-data requests when 
>> it is woken up,
>> +       we can rely on push-buffer not having completed (and hence 
>> offset being -1)
>> +       because they are blocked on the internal mutex held by the 
>> pulling thread
>> +       calling this callback. */
>> +    if (parser->read_request.offset != -1)
>>       {
>
> Well, sure, but do we really need this if block at all?

Yes, we do.  Without it, if there is a spurious wakeup of appsrc's wait 
loop in gst_app_src_create (get_range) [1], and the "push-buffer" signal 
is emitted just after the loop's g_cond_wait reacquires the mutex on 
return [1], we could have "push-buffer" blocked on acquisition of the 
mutex [2] held by gst_app_src_create.  "push-buffer" would block before 
pushing the buffer to the internal queue.  Then gst_app_src_create 
continues and, seeing that there are no buffers in the queue, unlocks the 
mutex and calls need_data.  Here we have our race:

- If need_data's code is called first, read_request's offset and 
size are overwritten with the same values. Then, the code waiting on the 
mutex pushes the buffer responding to our request, and everything 
continues as normal (as if another need_data never occurred).

(Prepare for the text wall 😁)

- If the push-buffer code waiting on the mutex continues first, the 
internal buffer queue has the buffer written to it, and the push-buffer 
signal returns.  Afterwards, read_request.offset is set to the sentinel 
value indicating that the request has been responded to and that a new 
need-data is awaited, and push_data returns.  Then, need_data is run, 
acquiring the mutex too late to catch the previous buffer send, and it 
requests a read of the same size directly following the prior read 
request, since the offset had been updated by push_data.  
gst_app_src_create then sees the pushed buffer and returns it back to 
the getrange-related function, only for another getrange to come in with 
an offset not directly following the last request's.  While the client 
code is reading the invalid request's data, src_seek_data is called. 
Right now we have an assert to make sure that there is no active request 
when seek_data is called, but if we didn't, we'd just set 
next_pull_offset to the seek's value.  Then, need_data would be called 
again, but all it would do is update the size to the new request's 
size.  After this, push_data (in response to the previous request) would 
be called and, for simplicity's sake, if the size of the two requests 
were identical, a push-buffer would be called, src_seek_data would pick 
up and have no way of knowing that the data is coming from after the 
previous request's location in the file, which could cause any number of 
errors.  If we keep around both next_pull_offset and 
read_request.offset, we could compare them in push_data and discard the 
buffer if they don't match, but this seems a lot more complicated than 
catching this case earlier on and preventing all this confusion.

1: 
https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/gstappsrc.c#L1774

2: 
https://github.com/GStreamer/gst-plugins-base/blob/master/gst-libs/gst/app/gstappsrc.c#L2526
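
For comparison, the "keep both offsets and discard on mismatch" 
alternative mentioned above could be sketched roughly as follows.  This 
is a single-threaded model with hypothetical names (struct sketch_state, 
NO_REQUEST standing in for offset == -1), and re-issuing the request at 
the seek target after a discard is my own assumption about how we would 
recover:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NO_REQUEST UINT64_MAX  /* plays the role of offset == -1 */

struct sketch_state
{
    uint64_t next_pull_offset;  /* where appsrc expects the next buffer */
    uint64_t request_offset;    /* NO_REQUEST when nothing is outstanding */
    uint32_t request_size;
};

/* Models need-data.  While a request is outstanding only the size is
 * updated; otherwise a new request is recorded. */
static void sketch_need_data(struct sketch_state *s, uint32_t length)
{
    if (s->request_offset == NO_REQUEST)
        s->request_offset = s->next_pull_offset;
    s->request_size = length;
}

/* Models seek-data without the assert on an outstanding request. */
static void sketch_seek_data(struct sketch_state *s, uint64_t offset)
{
    s->next_pull_offset = offset;
}

/* Models push_data.  Returns true if the buffer was accepted and false
 * if it was discarded because it answers a request made before a seek. */
static bool sketch_push_data(struct sketch_state *s, uint32_t size)
{
    assert(s->request_offset != NO_REQUEST);
    if (s->request_offset != s->next_pull_offset)
    {
        /* Stale data: ask the client again, at the seek target. */
        s->request_offset = s->next_pull_offset;
        return false;
    }
    s->next_pull_offset += size;
    s->request_offset = NO_REQUEST;
    return true;
}
```

Even in this reduced form there is an extra round trip to the client 
for every discarded buffer, which is part of why catching the duplicate 
need-data early seems simpler to me.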

>> +static gboolean src_seek_data(GstElement *appsrc, guint64 offset, 
>> gpointer user)
>>   {
>> +    struct wg_parser *parser = user;
>> +    pthread_mutex_lock(&parser->mutex);
>> +    assert(parser->read_request.offset == -1);
>
> Is this guaranteed, though?
Do you mean by the publicly documented interface or the actual code?  
Looking at the code, in random access mode, it seems that the answer is 
yes.  getrange turns into a seek and need-data combo and getrange would 
only be called after the previous buffer is sent. And for push mode, I 
don't think we'll ever get a seek request (and if we did there'd be no 
way to service it, as MFTs aren't seekable).  Either way though, the 
assert isn't strictly necessary and is only there to aid 
understanding; it can easily be left out if you'd like.
>
>


