[PATCH v2 3/3] winegstreamer: Replace source pad interface with GstAppSrc.
Zebediah Figura
zfigura at codeweavers.com
Mon Sep 13 14:24:57 CDT 2021
On 9/13/21 1:08 PM, Derek Lesho wrote:
>
> On 9/13/21 1:57 PM, Zebediah Figura wrote:
>>> @@ -541,15 +536,63 @@ static bool CDECL
>>> wg_parser_get_next_read_offset(struct wg_parser *parser,
>>> static void CDECL wg_parser_push_data(struct wg_parser *parser,
>>> const void *data, uint32_t size)
>>> {
>>> + GstBuffer *buffer;
>>> + GstFlowReturn ret;
>>> + GError *error;
>>> + GstMessage *message;
>>> +
>>> + if (!data)
>>> + {
>>> + pthread_mutex_lock(&parser->mutex);
>>> +
>>> + error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED,
>>> "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT,
>>> parser->next_pull_offset);
>>> + message = gst_message_new_error(NULL, error, "");
>>> + if (!gst_bus_post(parser->bus, message))
>>> + {
>>> + GST_ERROR("Failed to post error message to bus!\n");
>>> + gst_message_unref(message);
>>> + }
>>
>> Is there a point in posting a message to the bus?
> The point would be to have a unified error path when something goes
> wrong. (whether the error be GStreamer internal or a read failure)
>>
Oh, I guess we would probably need to unblock initialization here; I
forgot that we did something other than just printing an error. Okay,
that makes sense.
I don't think there's any point checking for failure from
gst_bus_post(), though.
>>> + parser->read_request.pending_read = false;
>>
>> This is fine temporarily. Ultimately I think we should be terminating
>> the client-side thread on error, and if possible not calling
>> wg_parser_push_data() at all.
> It would be a little tricky to do so given that this thread exists
> throughout initialization, and not signaling the error to GStreamer
> would mean we have to add yet another shutdown path that doesn't
> interfere with the others.
>>
>>> +
>>> + pthread_mutex_unlock(&parser->mutex);
>>> + return;
>>> + }
>>> +
>>> + if (!size)
>>> + {
>>> + pthread_mutex_lock(&parser->mutex);
>>> +
>>> + g_signal_emit_by_name(G_OBJECT(parser->appsrc),
>>> "end-of-stream", &ret);
>>> + parser->read_request.pending_read = false;
>>> +
>>> + pthread_mutex_unlock(&parser->mutex);
>>> + return;
>>> + }
>>> +
>>> + /* We could avoid this extra copy using gst_buffer_new_wrapped.
>>> + However, PE wouldn't know when to release the buffer
>>> allocations as the buffer
>>> + objects are queued, so we'd have to create a ring buffer the
>>> size of the gstappsrc
>>> + queue on the PE side and validate that we don't overrun on
>>> the unix side. I'm not
>>> + yet convinced that trying to reduce copies of compressed
>>> data is worth the
>>> + complexity. */
>>> + buffer = gst_buffer_new_and_alloc(size);
>>> + gst_buffer_fill(buffer, 0, data, size);
>>> +
>>> pthread_mutex_lock(&parser->mutex);
>>> - parser->read_request.size = size;
>>> - parser->read_request.done = true;
>>> - parser->read_request.ret = !!data;
>>> - if (data)
>>> - memcpy(parser->read_request.data, data, size);
>>> - parser->read_request.data = NULL;
>>> + assert(parser->read_request.pending_read);
>>> +
>>> + GST_BUFFER_OFFSET(buffer) = parser->next_pull_offset;
>>> + g_signal_emit_by_name(G_OBJECT(parser->appsrc), "push-buffer",
>>> buffer, &ret);
>>> +
>>> + /* In random-access mode, GST_FLOW_EOS shouldn't be returned */
>>> + assert(ret == GST_FLOW_OK || ret == GST_FLOW_FLUSHING);
>>> + if (ret == GST_FLOW_OK)
>>> + parser->next_pull_offset += size;
>>> + else
>>> + gst_buffer_unref(buffer);
>>
>> According to my reading of the source code, GstAppSrc will take the
>> reference even if it returns GST_FLOW_FLUSHING. That's a bit
>> unintuitive and should probably be clearly documented on the GStreamer
>> side, but anyway...
> Good catch, thanks.
>>
>>> +
>>> + parser->read_request.pending_read = false;
>>> pthread_mutex_unlock(&parser->mutex);
>>> - pthread_cond_signal(&parser->read_done_cond);
>>> }
>>> static void CDECL wg_parser_set_unlimited_buffering(struct
>>> wg_parser *parser)
>>
>
More information about the wine-devel
mailing list