[PATCH v2 3/3] winegstreamer: Replace source pad interface with GstAppSrc.

Zebediah Figura zfigura at codeweavers.com
Mon Sep 13 14:24:57 CDT 2021


On 9/13/21 1:08 PM, Derek Lesho wrote:
> 
> On 9/13/21 1:57 PM, Zebediah Figura wrote:
>>> @@ -541,15 +536,63 @@ static bool CDECL
>>> wg_parser_get_next_read_offset(struct wg_parser *parser,
>>>    static void CDECL wg_parser_push_data(struct wg_parser *parser,
>>>            const void *data, uint32_t size)
>>>    {
>>> +    GstBuffer *buffer;
>>> +    GstFlowReturn ret;
>>> +    GError *error;
>>> +    GstMessage *message;
>>> +
>>> +    if (!data)
>>> +    {
>>> +        pthread_mutex_lock(&parser->mutex);
>>> +
>>> +        error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED,
>>> "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT,
>>> parser->next_pull_offset);
>>> +        message = gst_message_new_error(NULL, error, "");
>>> +        if (!gst_bus_post(parser->bus, message))
>>> +        {
>>> +            GST_ERROR("Failed to post error message to bus!\n");
>>> +            gst_message_unref(message);
>>> +        }
>>
>> Is there a point in posting a message to the bus?
> The point would be to have a unified error path when something goes
> wrong.  (whether the error be GStreamer internal or a read failure)
>>

Oh, I guess we would probably need to unblock initialization here; I 
forgot that we did something other than just printing an error. Okay, 
that makes sense.

I don't think there's any point checking for failure from 
gst_bus_post(), though.

>>> + parser->read_request.pending_read = false;
>>
>> This is fine temporarily. Ultimately I think we should be terminating
>> the client-side thread on error, and if possible not calling
>> wg_parser_push_data() at all.
> It would be a little tricky to do so given that this thread exists
> throughout initialization, and not signaling the error to GStreamer
> would mean we have to add yet another shutdown path that doesn't
> interfere with the others.
>>
>>> +
>>> +        pthread_mutex_unlock(&parser->mutex);
>>> +        return;
>>> +    }
>>> +
>>> +    if (!size)
>>> +    {
>>> +        pthread_mutex_lock(&parser->mutex);
>>> +
>>> +        g_signal_emit_by_name(G_OBJECT(parser->appsrc),
>>> "end-of-stream", &ret);
>>> +        parser->read_request.pending_read = false;
>>> +
>>> +        pthread_mutex_unlock(&parser->mutex);
>>> +        return;
>>> +    }
>>> +
>>> +    /*  We could avoid this extra copy using gst_buffer_new_wrapped.
>>> +        However, PE wouldn't know when to release the buffer
>>> allocations as the buffer
>>> +        objects are queued, so we'd have to create a ring buffer the
>>> size of the gstappsrc
>>> +        queue on the PE side and validate that we don't overrun on
>>> the unix side.  I'm not
>>> +        yet convinced that trying to reduce copies of compressed
>>> data is worth the
>>> +        complexity.  */
>>> +    buffer = gst_buffer_new_and_alloc(size);
>>> +    gst_buffer_fill(buffer, 0, data, size);
>>> +
>>>        pthread_mutex_lock(&parser->mutex);
>>> -    parser->read_request.size = size;
>>> -    parser->read_request.done = true;
>>> -    parser->read_request.ret = !!data;
>>> -    if (data)
>>> -        memcpy(parser->read_request.data, data, size);
>>> -    parser->read_request.data = NULL;
>>> +    assert(parser->read_request.pending_read);
>>> +
>>> +    GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset;
>>> +    g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer",
>>> buffer, &ret);
>>> +
>>> +    /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
>>> +    assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
>>> +    if (ret == GST_FLOW_OK)
>>> +        parser->next_pull_offset += size;
>>> +    else
>>> +        gst_buffer_unref(buffer);
>>
>> According to my reading of the source code, GstAppSrc will take the
>> reference even if it returns GST_FLOW_FLUSHING. That's a bit
>> unintuitive and should probably be clearly documented on the GStreamer
>> side, but anyway...
> Good catch, thanks.
>>
>>> +
>>> +    parser->read_request.pending_read = false;
>>>        pthread_mutex_unlock(&parser->mutex);
>>> -    pthread_cond_signal(&parser->read_done_cond);
>>>    }
>>>      static void CDECL wg_parser_set_unlimited_buffering(struct
>>> wg_parser *parser)
>>
> 



More information about the wine-devel mailing list