[PATCH 2/6] winegstreamer: Manage our own thread for read requests in the media source.
Zebediah Figura
z.figura12 at gmail.com
Thu Feb 18 17:01:24 CST 2021
Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
dlls/winegstreamer/media_source.c | 139 ++++++++++++++++++++++++++----
1 file changed, 121 insertions(+), 18 deletions(-)
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c
index ffbf8041ed8..6fb2c7f6f48 100644
--- a/dlls/winegstreamer/media_source.c
+++ b/dlls/winegstreamer/media_source.c
@@ -113,6 +113,21 @@ struct media_source
HANDLE no_more_pads_event;
uint64_t file_size, next_pull_offset;
+
+ HANDLE read_thread;
+ bool read_thread_shutdown;
+
+ pthread_mutex_t mutex;
+ pthread_cond_t read_cond, read_done_cond;
+ struct
+ {
+ void *data;
+ uint64_t offset;
+ uint32_t size;
+ bool done;
+ bool ret;
+ } read_request;
+ bool shutdown;
};
static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface)
@@ -442,42 +457,109 @@ GstFlowReturn bytestream_wrapper_pull(GstPad *pad, GstObject *parent, guint64 of
GstBuffer **buf)
{
struct media_source *source = gst_pad_get_element_private(pad);
- IMFByteStream *byte_stream = source->byte_stream;
GstBuffer *new_buffer = NULL;
- ULONG bytes_read;
GstMapInfo info;
- BOOL is_eof;
- HRESULT hr;
+ bool ret;
TRACE("requesting %u bytes at %s from source %p into buffer %p\n", len, wine_dbgstr_longlong(ofs), source, *buf);
if (ofs == GST_BUFFER_OFFSET_NONE)
ofs = source->next_pull_offset;
source->next_pull_offset = ofs + len;
-
- if (FAILED(IMFByteStream_SetCurrentPosition(byte_stream, ofs)))
- return GST_FLOW_ERROR;
-
- if (FAILED(IMFByteStream_IsEndOfStream(byte_stream, &is_eof)))
- return GST_FLOW_ERROR;
- if (is_eof)
+ if (ofs >= source->file_size)
return GST_FLOW_EOS;
+ if (ofs + len >= source->file_size)
+ len = source->file_size - ofs;
if (!(*buf))
*buf = new_buffer = gst_buffer_new_and_alloc(len);
gst_buffer_map(*buf, &info, GST_MAP_WRITE);
- hr = IMFByteStream_Read(byte_stream, info.data, len, &bytes_read);
+
+ pthread_mutex_lock(&source->mutex);
+
+ assert(!source->read_request.data);
+ source->read_request.data = info.data;
+ source->read_request.offset = ofs;
+ source->read_request.size = len;
+ source->read_request.done = false;
+ pthread_cond_signal(&source->read_cond);
+
+ /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
+ * the upstream pin to flush if necessary. We should never be blocked on
+ * read_thread() not running. */
+
+ while (!source->read_request.done)
+ pthread_cond_wait(&source->read_done_cond, &source->mutex);
+
+ ret = source->read_request.ret;
+
+ pthread_mutex_unlock(&source->mutex);
+
gst_buffer_unmap(*buf, &info);
- gst_buffer_set_size(*buf, bytes_read);
+ if (!ret && new_buffer)
+ gst_buffer_unref(new_buffer);
+ return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
+}
+
+static bool get_read_request(struct media_source *source, void **data, uint64_t *offset, uint32_t *size)
+{
+ pthread_mutex_lock(&source->mutex);
+
+ while (!source->shutdown && !source->read_request.data)
+ pthread_cond_wait(&source->read_cond, &source->mutex);
+
+ if (source->shutdown)
+ {
+ pthread_mutex_unlock(&source->mutex);
+ return false;
+ }
+
+ *data = source->read_request.data;
+ *offset = source->read_request.offset;
+ *size = source->read_request.size;
+
+ pthread_mutex_unlock(&source->mutex);
+ return true;
+}
+
+static void complete_read_request(struct media_source *source, bool ret)
+{
+ pthread_mutex_lock(&source->mutex);
+ source->read_request.done = true;
+ source->read_request.ret = ret;
+ source->read_request.data = NULL;
+ pthread_mutex_unlock(&source->mutex);
+ pthread_cond_signal(&source->read_done_cond);
+}
- if (FAILED(hr))
+static DWORD CALLBACK read_thread(void *arg)
+{
+ struct media_source *source = arg;
+ IMFByteStream *byte_stream = source->byte_stream;
+
+ TRACE("Starting read thread for media source %p.\n", source);
+
+ while (!source->read_thread_shutdown)
{
- if (new_buffer)
- gst_buffer_unref(new_buffer);
- return GST_FLOW_ERROR;
+ uint64_t offset;
+ ULONG ret_size;
+ uint32_t size;
+ HRESULT hr;
+ void *data;
+
+ if (!get_read_request(source, &data, &offset, &size))
+ continue;
+
+ if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset)))
+ hr = IMFByteStream_Read(byte_stream, data, size, &ret_size);
+ if (SUCCEEDED(hr) && ret_size != size)
+ ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size);
+ complete_read_request(source, SUCCEEDED(hr));
}
- return GST_FLOW_OK;
+
+ TRACE("Media source is shutting down; exiting.\n");
+ return 0;
}
static gboolean bytestream_query(GstPad *pad, GstObject *parent, GstQuery *query)
@@ -1164,6 +1246,21 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
IMFMediaStream_Release(&stream->IMFMediaStream_iface);
}
+ if (source->read_thread)
+ {
+ source->read_thread_shutdown = true;
+ pthread_mutex_lock(&source->mutex);
+ source->shutdown = true;
+ pthread_mutex_unlock(&source->mutex);
+ pthread_cond_signal(&source->read_cond);
+ WaitForSingleObject(source->read_thread, INFINITE);
+ CloseHandle(source->read_thread);
+ }
+
+ pthread_mutex_destroy(&source->mutex);
+ pthread_cond_destroy(&source->read_cond);
+ pthread_cond_destroy(&source->read_done_cond);
+
if (source->stream_count)
heap_free(source->streams);
@@ -1284,6 +1381,12 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_
if (FAILED(hr = MFAllocateWorkQueue(&object->async_commands_queue)))
goto fail;
+ pthread_mutex_init(&object->mutex, NULL);
+ pthread_cond_init(&object->read_cond, NULL);
+ pthread_cond_init(&object->read_done_cond, NULL);
+
+ object->read_thread = CreateThread(NULL, 0, read_thread, object, 0, NULL);
+
object->container = gst_bin_new(NULL);
object->bus = gst_bus_new();
gst_bus_set_sync_handler(object->bus, mf_src_bus_watch_wrapper, object, NULL);
--
2.30.1
More information about the wine-devel mailing list