Zebediah Figura : winegstreamer: Manage our own thread for read requests in the media source.

Alexandre Julliard julliard at winehq.org
Fri Feb 19 17:20:07 CST 2021


Module: wine
Branch: master
Commit: b44d3a3908b949332487635b65ab24a98370484a
URL:    https://source.winehq.org/git/wine.git/?a=commit;h=b44d3a3908b949332487635b65ab24a98370484a

Author: Zebediah Figura <z.figura12 at gmail.com>
Date:   Thu Feb 18 17:01:24 2021 -0600

winegstreamer: Manage our own thread for read requests in the media source.

Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
Signed-off-by: Alexandre Julliard <julliard at winehq.org>

---

 dlls/winegstreamer/media_source.c | 139 +++++++++++++++++++++++++++++++++-----
 1 file changed, 121 insertions(+), 18 deletions(-)

diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c
index ffbf8041ed8..6fb2c7f6f48 100644
--- a/dlls/winegstreamer/media_source.c
+++ b/dlls/winegstreamer/media_source.c
@@ -113,6 +113,21 @@ struct media_source
     HANDLE no_more_pads_event;
 
     uint64_t file_size, next_pull_offset;
+
+    HANDLE read_thread;
+    bool read_thread_shutdown;
+
+    pthread_mutex_t mutex;
+    pthread_cond_t read_cond, read_done_cond;
+    struct
+    {
+        void *data;
+        uint64_t offset;
+        uint32_t size;
+        bool done;
+        bool ret;
+    } read_request;
+    bool shutdown;
 };
 
 static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface)
@@ -442,42 +457,109 @@ GstFlowReturn bytestream_wrapper_pull(GstPad *pad, GstObject *parent, guint64 of
         GstBuffer **buf)
 {
     struct media_source *source = gst_pad_get_element_private(pad);
-    IMFByteStream *byte_stream = source->byte_stream;
     GstBuffer *new_buffer = NULL;
-    ULONG bytes_read;
     GstMapInfo info;
-    BOOL is_eof;
-    HRESULT hr;
+    bool ret;
 
     TRACE("requesting %u bytes at %s from source %p into buffer %p\n", len, wine_dbgstr_longlong(ofs), source, *buf);
 
     if (ofs == GST_BUFFER_OFFSET_NONE)
         ofs = source->next_pull_offset;
     source->next_pull_offset = ofs + len;
-
-    if (FAILED(IMFByteStream_SetCurrentPosition(byte_stream, ofs)))
-        return GST_FLOW_ERROR;
-
-    if (FAILED(IMFByteStream_IsEndOfStream(byte_stream, &is_eof)))
-        return GST_FLOW_ERROR;
-    if (is_eof)
+    if (ofs >= source->file_size)
         return GST_FLOW_EOS;
+    if (ofs + len >= source->file_size)
+        len = source->file_size - ofs;
 
     if (!(*buf))
         *buf = new_buffer = gst_buffer_new_and_alloc(len);
     gst_buffer_map(*buf, &info, GST_MAP_WRITE);
-    hr = IMFByteStream_Read(byte_stream, info.data, len, &bytes_read);
+
+    pthread_mutex_lock(&source->mutex);
+
+    assert(!source->read_request.data);
+    source->read_request.data = info.data;
+    source->read_request.offset = ofs;
+    source->read_request.size = len;
+    source->read_request.done = false;
+    pthread_cond_signal(&source->read_cond);
+
+    /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
+     * the upstream pin to flush if necessary. We should never be blocked on
+     * read_thread() not running. */
+
+    while (!source->read_request.done)
+        pthread_cond_wait(&source->read_done_cond, &source->mutex);
+
+    ret = source->read_request.ret;
+
+    pthread_mutex_unlock(&source->mutex);
+
     gst_buffer_unmap(*buf, &info);
 
-    gst_buffer_set_size(*buf, bytes_read);
+    if (!ret && new_buffer)
+        gst_buffer_unref(new_buffer);
+    return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
+}
+
+static bool get_read_request(struct media_source *source, void **data, uint64_t *offset, uint32_t *size)
+{
+    pthread_mutex_lock(&source->mutex);
+
+    while (!source->shutdown && !source->read_request.data)
+        pthread_cond_wait(&source->read_cond, &source->mutex);
+
+    if (source->shutdown)
+    {
+        pthread_mutex_unlock(&source->mutex);
+        return false;
+    }
+
+    *data = source->read_request.data;
+    *offset = source->read_request.offset;
+    *size = source->read_request.size;
+
+    pthread_mutex_unlock(&source->mutex);
+    return true;
+}
+
+static void complete_read_request(struct media_source *source, bool ret)
+{
+    pthread_mutex_lock(&source->mutex);
+    source->read_request.done = true;
+    source->read_request.ret = ret;
+    source->read_request.data = NULL;
+    pthread_mutex_unlock(&source->mutex);
+    pthread_cond_signal(&source->read_done_cond);
+}
 
-    if (FAILED(hr))
+static DWORD CALLBACK read_thread(void *arg)
+{
+    struct media_source *source = arg;
+    IMFByteStream *byte_stream = source->byte_stream;
+
+    TRACE("Starting read thread for media source %p.\n", source);
+
+    while (!source->read_thread_shutdown)
     {
-        if (new_buffer)
-            gst_buffer_unref(new_buffer);
-        return GST_FLOW_ERROR;
+        uint64_t offset;
+        ULONG ret_size;
+        uint32_t size;
+        HRESULT hr;
+        void *data;
+
+        if (!get_read_request(source, &data, &offset, &size))
+            continue;
+
+        if (SUCCEEDED(hr = IMFByteStream_SetCurrentPosition(byte_stream, offset)))
+            hr = IMFByteStream_Read(byte_stream, data, size, &ret_size);
+        if (SUCCEEDED(hr) && ret_size != size)
+            ERR("Unexpected short read: requested %u bytes, got %u.\n", size, ret_size);
+        complete_read_request(source, SUCCEEDED(hr));
     }
-    return GST_FLOW_OK;
+
+    TRACE("Media source is shutting down; exiting.\n");
+    return 0;
 }
 
 static gboolean bytestream_query(GstPad *pad, GstObject *parent, GstQuery *query)
@@ -1164,6 +1246,21 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
         IMFMediaStream_Release(&stream->IMFMediaStream_iface);
     }
 
+    if (source->read_thread)
+    {
+        source->read_thread_shutdown = true;
+        pthread_mutex_lock(&source->mutex);
+        source->shutdown = true;
+        pthread_mutex_unlock(&source->mutex);
+        pthread_cond_signal(&source->read_cond);
+        WaitForSingleObject(source->read_thread, INFINITE);
+        CloseHandle(source->read_thread);
+    }
+
+    pthread_mutex_destroy(&source->mutex);
+    pthread_cond_destroy(&source->read_cond);
+    pthread_cond_destroy(&source->read_done_cond);
+
     if (source->stream_count)
         heap_free(source->streams);
 
@@ -1284,6 +1381,12 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_
     if (FAILED(hr = MFAllocateWorkQueue(&object->async_commands_queue)))
         goto fail;
 
+    pthread_mutex_init(&object->mutex, NULL);
+    pthread_cond_init(&object->read_cond, NULL);
+    pthread_cond_init(&object->read_done_cond, NULL);
+
+    object->read_thread = CreateThread(NULL, 0, read_thread, object, 0, NULL);
+
     object->container = gst_bin_new(NULL);
     object->bus = gst_bus_new();
     gst_bus_set_sync_handler(object->bus, mf_src_bus_watch_wrapper, object, NULL);




More information about the wine-cvs mailing list