[PATCH 5/6] mfreadwrite: Implement async reader mode.

Nikolay Sivov nsivov at codeweavers.com
Mon Mar 23 08:22:15 CDT 2020


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/mfreadwrite/main.c | 616 +++++++++++++++++++++++++++++-----------
 1 file changed, 447 insertions(+), 169 deletions(-)

diff --git a/dlls/mfreadwrite/main.c b/dlls/mfreadwrite/main.c
index 5cb3af84f5..fd5c7d3ebc 100644
--- a/dlls/mfreadwrite/main.c
+++ b/dlls/mfreadwrite/main.c
@@ -113,6 +113,23 @@ struct media_stream
     BOOL selected;
     BOOL presented;
     DWORD flags;
+    unsigned int requests;
+};
+
+enum source_reader_async_op /* Kind of command carried by a queued async work item. */
+{
+    SOURCE_READER_ASYNC_READ, /* Queued by ReadSample() when an async callback is set. */
+    SOURCE_READER_ASYNC_FLUSH, /* Queued by Flush() when an async callback is set. */
+    SOURCE_READER_ASYNC_SAMPLE_READY, /* Queued when a response arrives for a stream with pending requests. */
+};
+
+struct source_reader_async_command /* Work item state handed to the async commands callback. */
+{
+    IUnknown IUnknown_iface;
+    LONG refcount;
+    enum source_reader_async_op op;
+    unsigned int flags; /* ReadSample() control flags; set for read commands. */
+    unsigned int stream_index; /* Target stream; read and flush commands carry the caller's index value. */
 };
 
 struct source_reader
@@ -120,6 +137,7 @@ struct source_reader
     IMFSourceReader IMFSourceReader_iface;
     IMFAsyncCallback source_events_callback;
     IMFAsyncCallback stream_events_callback;
+    IMFAsyncCallback async_commands_callback;
     LONG refcount;
     IMFMediaSource *source;
     IMFPresentationDescriptor *descriptor;
@@ -154,11 +172,74 @@ static struct source_reader *impl_from_stream_callback_IMFAsyncCallback(IMFAsync
     return CONTAINING_RECORD(iface, struct source_reader, stream_events_callback);
 }
 
+static struct source_reader *impl_from_async_commands_callback_IMFAsyncCallback(IMFAsyncCallback *iface)
+{ /* Maps the embedded async_commands_callback interface back to its owning reader. */
+    return CONTAINING_RECORD(iface, struct source_reader, async_commands_callback);
+}
+
+static struct source_reader_async_command *impl_from_async_command_IUnknown(IUnknown *iface)
+{ /* Maps the embedded IUnknown interface back to its async command object. */
+    return CONTAINING_RECORD(iface, struct source_reader_async_command, IUnknown_iface);
+}
+
 static inline struct sink_writer *impl_from_IMFSinkWriter(IMFSinkWriter *iface)
 {
     return CONTAINING_RECORD(iface, struct sink_writer, IMFSinkWriter_iface);
 }
 
+/* IUnknown::QueryInterface() for async commands; IUnknown is the only interface exposed. */
+static HRESULT WINAPI source_reader_async_command_QueryInterface(IUnknown *iface, REFIID riid, void **obj)
+{
+    if (!IsEqualIID(riid, &IID_IUnknown))
+    {
+        WARN("Unsupported interface %s.\n", debugstr_guid(riid));
+        *obj = NULL;
+        return E_NOINTERFACE;
+    }
+
+    *obj = iface;
+    IUnknown_AddRef(iface);
+
+    return S_OK;
+}
+
+static ULONG WINAPI source_reader_async_command_AddRef(IUnknown *iface)
+{
+    struct source_reader_async_command *command = impl_from_async_command_IUnknown(iface);
+    return InterlockedIncrement(&command->refcount); /* Returns the new reference count. */
+}
+
+/* Drops one reference to the command and frees it once the last reference is gone. */
+static ULONG WINAPI source_reader_async_command_Release(IUnknown *iface)
+{
+    struct source_reader_async_command *command = impl_from_async_command_IUnknown(iface);
+    ULONG refcount = InterlockedDecrement(&command->refcount);
+
+    if (!refcount)
+        heap_free(command);
+
+    return refcount;
+}
+
+static const IUnknownVtbl source_reader_async_command_vtbl =
+{
+    source_reader_async_command_QueryInterface,
+    source_reader_async_command_AddRef,
+    source_reader_async_command_Release,
+};
+
+/*
+ * Allocates a zero-initialized command object for the given operation.
+ *
+ * On success *ret holds a single reference owned by the caller, who fills in
+ * the remaining fields and releases the object after queueing it.
+ * Returns E_OUTOFMEMORY if the allocation fails, in which case *ret is untouched.
+ */
+static HRESULT source_reader_create_async_op(enum source_reader_async_op op, struct source_reader_async_command **ret)
+{
+    struct source_reader_async_command *command;
+
+    if (!(command = heap_alloc_zero(sizeof(*command))))
+        return E_OUTOFMEMORY;
+
+    command->IUnknown_iface.lpVtbl = &source_reader_async_command_vtbl;
+    /* Callers release the object after MFPutWorkItem(), so it has to start with one reference. */
+    command->refcount = 1;
+    command->op = op;
+
+    *ret = command;
+
+    return S_OK;
+}
+
 static HRESULT media_event_get_object(IMFMediaEvent *event, REFIID riid, void **obj)
 {
     PROPVARIANT value;
@@ -203,7 +284,7 @@ static HRESULT media_stream_get_id(IMFMediaStream *stream, DWORD *id)
     return hr;
 }
 
-static HRESULT WINAPI source_reader_source_events_callback_QueryInterface(IMFAsyncCallback *iface,
+static HRESULT WINAPI source_reader_callback_QueryInterface(IMFAsyncCallback *iface,
         REFIID riid, void **obj)
 {
     TRACE("%p, %s, %p.\n", iface, debugstr_guid(riid), obj);
@@ -233,12 +314,65 @@ static ULONG WINAPI source_reader_source_events_callback_Release(IMFAsyncCallbac
     return IMFSourceReader_Release(&reader->IMFSourceReader_iface);
 }
 
-static HRESULT WINAPI source_reader_source_events_callback_GetParameters(IMFAsyncCallback *iface,
+static HRESULT WINAPI source_reader_callback_GetParameters(IMFAsyncCallback *iface,
         DWORD *flags, DWORD *queue)
 {
     return E_NOTIMPL;
 }
 
+/*
+ * Queues a response for the stream and, when a read request is outstanding,
+ * signals it: in async mode a SOURCE_READER_ASYNC_SAMPLE_READY work item is
+ * submitted, otherwise threads waiting on the stream's sample_event are woken.
+ * One outstanding request is consumed per queued response.
+ *
+ * The queued response takes its own reference to the sample, if there is one.
+ * NOTE(review): callers appear to hold stream->cs while calling this - confirm.
+ */
+static void source_reader_queue_response(struct source_reader *reader, struct media_stream *stream, HRESULT status,
+        DWORD stream_flags, LONGLONG timestamp, IMFSample *sample)
+{
+    struct source_reader_async_command *command;
+    struct stream_response *response;
+    HRESULT hr;
+
+    /* Without a response object there is nothing to deliver; the request stays pending. */
+    if (!(response = heap_alloc_zero(sizeof(*response))))
+    {
+        WARN("Failed to allocate a response.\n");
+        return;
+    }
+
+    response->status = status;
+    response->stream_index = stream->index;
+    response->stream_flags = stream_flags;
+    response->timestamp = timestamp;
+    response->sample = sample;
+    if (response->sample)
+        IMFSample_AddRef(response->sample);
+
+    list_add_tail(&stream->responses, &response->entry);
+
+    if (stream->requests)
+    {
+        if (reader->async_callback)
+        {
+            if (SUCCEEDED(source_reader_create_async_op(SOURCE_READER_ASYNC_SAMPLE_READY, &command)))
+            {
+                command->stream_index = stream->index;
+                if (FAILED(hr = MFPutWorkItem(MFASYNC_CALLBACK_QUEUE_STANDARD, &reader->async_commands_callback,
+                        &command->IUnknown_iface)))
+                    WARN("Failed to submit async result, hr %#x.\n", hr);
+                IUnknown_Release(&command->IUnknown_iface);
+            }
+        }
+        else
+            WakeAllConditionVariable(&stream->sample_event);
+
+        stream->requests--;
+    }
+}
+
+/*
+ * Asks the media stream for a sample unless the stream object is not available
+ * yet or a request is already outstanding; in those cases S_OK is returned and
+ * nothing is done. STREAM_FLAG_SAMPLE_REQUESTED is set only if the request succeeds.
+ */
+static HRESULT source_reader_request_sample(struct source_reader *reader, struct media_stream *stream)
+{
+    HRESULT hr;
+
+    if (!stream->stream || (stream->flags & STREAM_FLAG_SAMPLE_REQUESTED))
+        return S_OK;
+
+    hr = IMFMediaStream_RequestSample(stream->stream, NULL);
+    if (SUCCEEDED(hr))
+        stream->flags |= STREAM_FLAG_SAMPLE_REQUESTED;
+    else
+        WARN("Sample request failed, hr %#x.\n", hr);
+
+    return hr;
+}
+
 static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IMFMediaEvent *event)
 {
     IMFMediaStream *stream;
@@ -274,8 +408,10 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM
                     WARN("Failed to subscribe to stream events, hr %#x.\n", hr);
                 }
 
-                /* Wake so any waiting ReadSample() calls have a chance to make requests. */
-                WakeAllConditionVariable(&reader->streams[i].sample_event);
+                EnterCriticalSection(&reader->streams[i].cs);
+                if (reader->streams[i].requests)
+                    source_reader_request_sample(reader, &reader->streams[i]);
+                LeaveCriticalSection(&reader->streams[i].cs);
             }
             break;
         }
@@ -358,10 +494,10 @@ static HRESULT WINAPI source_reader_source_events_callback_Invoke(IMFAsyncCallba
 
 static const IMFAsyncCallbackVtbl source_events_callback_vtbl =
 {
-    source_reader_source_events_callback_QueryInterface,
+    source_reader_callback_QueryInterface,
     source_reader_source_events_callback_AddRef,
     source_reader_source_events_callback_Release,
-    source_reader_source_events_callback_GetParameters,
+    source_reader_callback_GetParameters,
     source_reader_source_events_callback_Invoke,
 };
 
@@ -377,24 +513,7 @@ static ULONG WINAPI source_reader_stream_events_callback_Release(IMFAsyncCallbac
     return IMFSourceReader_Release(&reader->IMFSourceReader_iface);
 }
 
-static void source_reader_queue_response(struct media_stream *stream, HRESULT status, DWORD stream_index,
-        DWORD stream_flags, LONGLONG timestamp, IMFSample *sample)
-{
-    struct stream_response *response;
-
-    response = heap_alloc_zero(sizeof(*response));
-    response->status = status;
-    response->stream_index = stream_index;
-    response->stream_flags = stream_flags;
-    response->timestamp = timestamp;
-    response->sample = sample;
-    if (response->sample)
-        IMFSample_AddRef(response->sample);
-
-    list_add_tail(&stream->responses, &response->entry);
-}
-
-static HRESULT source_reader_pull_stream_samples(struct media_stream *stream)
+static HRESULT source_reader_pull_stream_samples(struct source_reader *reader, struct media_stream *stream)
 {
     MFT_OUTPUT_STREAM_INFO stream_info = { 0 };
     MFT_OUTPUT_DATA_BUFFER out_buffer;
@@ -435,7 +554,7 @@ static HRESULT source_reader_pull_stream_samples(struct media_stream *stream)
         if (FAILED(IMFSample_GetSampleTime(out_buffer.pSample, &timestamp)))
             WARN("Sample time wasn't set.\n");
 
-        source_reader_queue_response(stream, S_OK /* FIXME */, stream->index, 0, timestamp, out_buffer.pSample);
+        source_reader_queue_response(reader, stream, S_OK /* FIXME */, 0, timestamp, out_buffer.pSample);
         if (out_buffer.pSample)
             IMFSample_Release(out_buffer.pSample);
         if (out_buffer.pEvents)
@@ -445,7 +564,8 @@ static HRESULT source_reader_pull_stream_samples(struct media_stream *stream)
     return hr;
 }
 
-static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSample *sample)
+static HRESULT source_reader_process_sample(struct source_reader *reader, struct media_stream *stream,
+        IMFSample *sample)
 {
     LONGLONG timestamp;
     HRESULT hr;
@@ -456,13 +576,13 @@ static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSamp
         if (FAILED(IMFSample_GetSampleTime(sample, &timestamp)))
             WARN("Sample time wasn't set.\n");
 
-        source_reader_queue_response(stream, S_OK, stream->index, 0, timestamp, sample);
+        source_reader_queue_response(reader, stream, S_OK, 0, timestamp, sample);
         return S_OK;
     }
 
     /* It's assumed that decoder has 1 input and 1 output, both id's are 0. */
 
-    hr = source_reader_pull_stream_samples(stream);
+    hr = source_reader_pull_stream_samples(reader, stream);
     if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT)
     {
         if (FAILED(hr = IMFTransform_ProcessInput(stream->decoder, 0, sample, 0)))
@@ -471,7 +591,7 @@ static HRESULT source_reader_process_sample(struct media_stream *stream, IMFSamp
             return hr;
         }
 
-        if ((hr = source_reader_pull_stream_samples(stream)) == MF_E_TRANSFORM_NEED_MORE_INPUT)
+        if ((hr = source_reader_pull_stream_samples(reader, stream)) == MF_E_TRANSFORM_NEED_MORE_INPUT)
             return S_OK;
     }
     else
@@ -507,17 +627,17 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader,
     {
         if (id == reader->streams[i].id)
         {
+            /* FIXME: propagate processing errors? */
+
             EnterCriticalSection(&reader->streams[i].cs);
 
             reader->streams[i].flags &= ~STREAM_FLAG_SAMPLE_REQUESTED;
-            hr = source_reader_process_sample(&reader->streams[i], sample);
+            hr = source_reader_process_sample(reader, &reader->streams[i], sample);
+            if (reader->streams[i].requests)
+                source_reader_request_sample(reader, &reader->streams[i]);
 
             LeaveCriticalSection(&reader->streams[i].cs);
 
-            /* FIXME: propagate processing errors? */
-
-            WakeAllConditionVariable(&reader->streams[i].sample_event);
-
             break;
         }
     }
@@ -564,10 +684,12 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
                     if (stream->decoder && SUCCEEDED(IMFTransform_ProcessMessage(stream->decoder,
                             MFT_MESSAGE_COMMAND_DRAIN, 0)))
                     {
-                        if ((hr = source_reader_pull_stream_samples(stream)) != MF_E_TRANSFORM_NEED_MORE_INPUT)
+                        if ((hr = source_reader_pull_stream_samples(reader, stream)) != MF_E_TRANSFORM_NEED_MORE_INPUT)
                             WARN("Failed to pull pending samples, hr %#x.\n", hr);
                     }
 
+                    source_reader_queue_response(reader, stream, S_OK, MF_SOURCE_READERF_ENDOFSTREAM, 0, NULL);
+
                     break;
                 case MEStreamSeeked:
                 case MEStreamStarted:
@@ -579,9 +701,8 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
                     timestamp = SUCCEEDED(hr) ? value.u.hVal.QuadPart : 0;
                     PropVariantClear(&value);
 
-                    source_reader_queue_response(stream, hr, stream->index, MF_SOURCE_READERF_STREAMTICK, timestamp, NULL);
+                    source_reader_queue_response(reader, stream, hr, MF_SOURCE_READERF_STREAMTICK, timestamp, NULL);
 
-                    WakeAllConditionVariable(&stream->sample_event);
                     break;
                 default:
                     ;
@@ -589,8 +710,6 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
 
             LeaveCriticalSection(&stream->cs);
 
-            WakeAllConditionVariable(&stream->sample_event);
-
             break;
         }
     }
@@ -644,13 +763,248 @@ static HRESULT WINAPI source_reader_stream_events_callback_Invoke(IMFAsyncCallba
 
 static const IMFAsyncCallbackVtbl stream_events_callback_vtbl =
 {
-    source_reader_source_events_callback_QueryInterface,
+    source_reader_callback_QueryInterface,
     source_reader_stream_events_callback_AddRef,
     source_reader_stream_events_callback_Release,
-    source_reader_source_events_callback_GetParameters,
+    source_reader_callback_GetParameters,
     source_reader_stream_events_callback_Invoke,
 };
 
+static ULONG WINAPI source_reader_async_commands_callback_AddRef(IMFAsyncCallback *iface)
+{
+    struct source_reader *reader = impl_from_async_commands_callback_IMFAsyncCallback(iface);
+    return IMFSourceReader_AddRef(&reader->IMFSourceReader_iface); /* Forwarded to the owning reader. */
+}
+
+static ULONG WINAPI source_reader_async_commands_callback_Release(IMFAsyncCallback *iface)
+{
+    struct source_reader *reader = impl_from_async_commands_callback_IMFAsyncCallback(iface);
+    return IMFSourceReader_Release(&reader->IMFSourceReader_iface); /* Forwarded to the owning reader. */
+}
+
+/* Detaches and returns the response at the head of the stream's queue, or NULL if the queue is empty. */
+static struct stream_response *media_stream_pop_response(struct media_stream *stream)
+{
+    struct list *first = list_head(&stream->responses);
+    struct stream_response *response;
+
+    if (!first)
+        return NULL;
+
+    response = LIST_ENTRY(first, struct stream_response, entry);
+    list_remove(&response->entry);
+
+    return response;
+}
+
+static void source_reader_release_response(struct stream_response *response)
+{ /* Frees a response, dropping its sample reference if it has one. */
+    if (response->sample)
+        IMFSample_Release(response->sample);
+    heap_free(response);
+}
+
+/*
+ * Reads the selection state of a stream from the reader's presentation descriptor.
+ * Any failure to look up the stream is reported as MF_E_INVALIDSTREAMNUMBER.
+ */
+static HRESULT source_reader_get_stream_selection(const struct source_reader *reader, DWORD index, BOOL *selected)
+{
+    IMFStreamDescriptor *descriptor;
+    HRESULT hr;
+
+    hr = IMFPresentationDescriptor_GetStreamDescriptorByIndex(reader->descriptor, index, selected, &descriptor);
+    if (SUCCEEDED(hr))
+    {
+        /* Only the selection flag is of interest, the descriptor itself is not kept. */
+        IMFStreamDescriptor_Release(descriptor);
+        return S_OK;
+    }
+
+    return MF_E_INVALIDSTREAMNUMBER;
+}
+
+/*
+ * Starts the media source if it is not started yet, or calls Start() again when
+ * the stream selection in the presentation descriptor no longer matches what
+ * was last presented to the source. After a successful Start() the 'presented'
+ * state of every stream is updated from its cached selection.
+ *
+ * Returns S_OK when nothing had to be done, otherwise the result of the
+ * selection query or of IMFMediaSource::Start().
+ */
+static HRESULT source_reader_start_source(struct source_reader *reader)
+{
+    BOOL selection_changed = FALSE;
+    PROPVARIANT position;
+    HRESULT hr = S_OK;
+    DWORD i;
+
+    if (reader->source_state == SOURCE_STATE_STARTED)
+    {
+        /* Refresh the cached selection of every stream; stopping at the first
+         * difference would copy stale values into 'presented' below. */
+        for (i = 0; i < reader->stream_count; ++i)
+        {
+            if (FAILED(hr = source_reader_get_stream_selection(reader, i, &reader->streams[i].selected)))
+                return hr;
+            if (reader->streams[i].selected ^ reader->streams[i].presented)
+                selection_changed = TRUE;
+        }
+    }
+
+    position.u.hVal.QuadPart = 0;
+    if (reader->source_state != SOURCE_STATE_STARTED || selection_changed)
+    {
+        /* An already started source gets VT_EMPTY, a first start uses the VT_I8 position set above. */
+        position.vt = reader->source_state == SOURCE_STATE_STARTED ? VT_EMPTY : VT_I8;
+
+        /* Update cached stream selection if descriptor was accepted. */
+        if (SUCCEEDED(hr = IMFMediaSource_Start(reader->source, reader->descriptor, &GUID_NULL, &position)))
+        {
+            for (i = 0; i < reader->stream_count; ++i)
+                reader->streams[i].presented = reader->streams[i].selected;
+        }
+    }
+
+    return hr;
+}
+
+/*
+ * Fills the read result for a stream from its response queue.
+ *
+ * If a response is queued, it is consumed and its values are returned; the
+ * caller receives its own reference to the sample, if any. If the queue is
+ * empty, an empty S_OK result is returned, flagged as end of stream when the
+ * stream has reached EOS.
+ *
+ * Returns TRUE only when nothing was queued, the stream is not at EOS and
+ * MF_SOURCE_READER_CONTROLF_DRAIN was not specified, that is when the caller
+ * should request a new sample.
+ */
+static BOOL source_reader_get_read_result(struct source_reader *reader, struct media_stream *stream, DWORD flags,
+        HRESULT *status, DWORD *stream_index, DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample)
+{
+    struct stream_response *response;
+
+    if (!list_empty(&stream->responses))
+    {
+        response = media_stream_pop_response(stream);
+
+        *status = response->status;
+        *stream_index = stream->index;
+        *stream_flags = response->stream_flags;
+        *timestamp = response->timestamp;
+        if ((*sample = response->sample))
+            IMFSample_AddRef(*sample);
+
+        source_reader_release_response(response);
+
+        return FALSE;
+    }
+
+    *status = S_OK;
+    *stream_index = stream->index;
+    *timestamp = 0;
+    *sample = NULL;
+
+    if (stream->state == STREAM_STATE_EOS)
+    {
+        *stream_flags = MF_SOURCE_READERF_ENDOFSTREAM;
+        return FALSE;
+    }
+
+    *stream_flags = 0;
+
+    return !(flags & MF_SOURCE_READER_CONTROLF_DRAIN);
+}
+
+/*
+ * Resolves the stream index given to a read request to an actual stream index.
+ *
+ * The first-video and first-audio pseudo indices map to the indices cached in
+ * the reader, any other value is used as is. Requests for any stream are not
+ * supported and fail with E_NOTIMPL without touching *stream_index.
+ * Returns MF_E_INVALIDREQUEST if the resolved stream is not selected, or the
+ * failure from the selection lookup.
+ */
+static HRESULT source_reader_get_stream_read_index(struct source_reader *reader, DWORD index, DWORD *stream_index)
+{
+    BOOL selected;
+    HRESULT hr;
+
+    if (index == MF_SOURCE_READER_ANY_STREAM)
+    {
+        FIXME("Non-specific requests are not supported.\n");
+        return E_NOTIMPL;
+    }
+
+    if (index == MF_SOURCE_READER_FIRST_VIDEO_STREAM)
+        *stream_index = reader->first_video_stream_index;
+    else if (index == MF_SOURCE_READER_FIRST_AUDIO_STREAM)
+        *stream_index = reader->first_audio_stream_index;
+    else
+        *stream_index = index;
+
+    /* Can't read from deselected streams. */
+    hr = source_reader_get_stream_selection(reader, *stream_index, &selected);
+    if (SUCCEEDED(hr) && !selected)
+        hr = MF_E_INVALIDREQUEST;
+
+    return hr;
+}
+
+/*
+ * Work queue callback that executes commands queued by the reader in async mode.
+ *
+ * SOURCE_READER_ASYNC_READ either delivers an already available result to the
+ * application callback right away, or registers a pending request and asks the
+ * stream for a sample. A failure to start the source is delivered to the
+ * application callback as an error result.
+ * SOURCE_READER_ASYNC_SAMPLE_READY delivers the next queued response of a stream.
+ * Flushing is not implemented.
+ *
+ * Returns the failure code if the command state can't be retrieved or the
+ * requested stream can't be read, S_OK otherwise.
+ */
+static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallback *iface, IMFAsyncResult *result)
+{
+    struct source_reader *reader = impl_from_async_commands_callback_IMFAsyncCallback(iface);
+    struct source_reader_async_command *command;
+    struct stream_response *response;
+    DWORD stream_index, stream_flags;
+    BOOL request_sample = FALSE;
+    struct media_stream *stream;
+    IMFSample *sample = NULL;
+    LONGLONG timestamp = 0;
+    HRESULT hr, status;
+    IUnknown *state;
+
+    if (FAILED(hr = IMFAsyncResult_GetState(result, &state)))
+        return hr;
+
+    command = impl_from_async_command_IUnknown(state);
+
+    switch (command->op)
+    {
+        case SOURCE_READER_ASYNC_READ:
+            if (FAILED(hr = source_reader_get_stream_read_index(reader, command->stream_index, &stream_index)))
+            {
+                /* The state reference has to be dropped on this path too. */
+                IUnknown_Release(state);
+                return hr;
+            }
+
+            stream = &reader->streams[stream_index];
+
+            EnterCriticalSection(&stream->cs);
+
+            if (SUCCEEDED(hr = source_reader_start_source(reader)))
+            {
+                request_sample = source_reader_get_read_result(reader, stream, command->flags, &status, &stream_index,
+                        &stream_flags, &timestamp, &sample);
+
+                if (request_sample)
+                {
+                    stream->requests++;
+                    source_reader_request_sample(reader, stream);
+                    /* FIXME: set error stream/reader state on request failure */
+                }
+            }
+            else
+            {
+                /* No result was produced; report the start failure instead of uninitialized values. */
+                status = hr;
+                stream_flags = MF_SOURCE_READERF_ERROR;
+            }
+
+            LeaveCriticalSection(&stream->cs);
+
+            if (!request_sample)
+                IMFSourceReaderCallback_OnReadSample(reader->async_callback, status, stream_index, stream_flags,
+                        timestamp, sample);
+
+            if (sample)
+                IMFSample_Release(sample);
+
+            break;
+
+        case SOURCE_READER_ASYNC_SAMPLE_READY:
+            stream = &reader->streams[command->stream_index];
+
+            /* Only the queue access is done under the lock, the application callback runs outside of it. */
+            EnterCriticalSection(&stream->cs);
+            response = media_stream_pop_response(stream);
+            LeaveCriticalSection(&stream->cs);
+
+            if (response)
+            {
+                IMFSourceReaderCallback_OnReadSample(reader->async_callback, response->status, response->stream_index,
+                        response->stream_flags, response->timestamp, response->sample);
+                source_reader_release_response(response);
+            }
+
+            break;
+        case SOURCE_READER_ASYNC_FLUSH:
+            FIXME("Async flushing is not implemented.\n");
+            break;
+        default:
+            ;
+    }
+
+    IUnknown_Release(state);
+
+    return S_OK;
+}
+
+static const IMFAsyncCallbackVtbl async_commands_callback_vtbl =
+{ /* QueryInterface() and GetParameters() are the implementations shared with the other reader callbacks. */
+    source_reader_callback_QueryInterface,
+    source_reader_async_commands_callback_AddRef,
+    source_reader_async_commands_callback_Release,
+    source_reader_callback_GetParameters,
+    source_reader_async_commands_callback_Invoke,
+};
+
 static HRESULT WINAPI src_reader_QueryInterface(IMFSourceReader *iface, REFIID riid, void **out)
 {
     struct source_reader *reader = impl_from_IMFSourceReader(iface);
@@ -683,13 +1037,6 @@ static ULONG WINAPI src_reader_AddRef(IMFSourceReader *iface)
     return refcount;
 }
 
-static void source_reader_release_response(struct stream_response *response)
-{
-    if (response->sample)
-        IMFSample_Release(response->sample);
-    heap_free(response);
-}
-
 static ULONG WINAPI src_reader_Release(IMFSourceReader *iface)
 {
     struct source_reader *reader = impl_from_IMFSourceReader(iface);
@@ -735,17 +1082,6 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface)
     return refcount;
 }
 
-static HRESULT source_reader_get_stream_selection(const struct source_reader *reader, DWORD index, BOOL *selected)
-{
-    IMFStreamDescriptor *sd;
-
-    if (FAILED(IMFPresentationDescriptor_GetStreamDescriptorByIndex(reader->descriptor, index, selected, &sd)))
-        return MF_E_INVALIDSTREAMNUMBER;
-    IMFStreamDescriptor_Release(sd);
-
-    return S_OK;
-}
-
 static HRESULT WINAPI src_reader_GetStreamSelection(IMFSourceReader *iface, DWORD index, BOOL *selected)
 {
     struct source_reader *reader = impl_from_IMFSourceReader(iface);
@@ -1132,101 +1468,36 @@ static HRESULT WINAPI src_reader_SetCurrentPosition(IMFSourceReader *iface, REFG
     return IMFMediaSource_Start(reader->source, reader->descriptor, format, position);
 }
 
-static struct stream_response *media_stream_pop_response(struct media_stream *stream)
-{
-    struct stream_response *response = NULL;
-    struct list *head;
-
-    if ((head = list_head(&stream->responses)))
-    {
-        response = LIST_ENTRY(head, struct stream_response, entry);
-        list_remove(&response->entry);
-    }
-
-    return response;
-}
-
-static HRESULT source_reader_start_source(struct source_reader *reader)
-{
-    BOOL selection_changed = FALSE;
-    PROPVARIANT position;
-    HRESULT hr = S_OK;
-    DWORD i;
-
-    if (reader->source_state == SOURCE_STATE_STARTED)
-    {
-        for (i = 0; i < reader->stream_count; ++i)
-        {
-            if (FAILED(hr = source_reader_get_stream_selection(reader, i, &reader->streams[i].selected)))
-                return hr;
-            selection_changed = reader->streams[i].selected ^ reader->streams[i].presented;
-            if (selection_changed)
-                break;
-        }
-    }
-
-    position.u.hVal.QuadPart = 0;
-    if (reader->source_state != SOURCE_STATE_STARTED || selection_changed)
-    {
-        position.vt = reader->source_state == SOURCE_STATE_STARTED ? VT_EMPTY : VT_I8;
-
-        /* Update cached stream selection if descriptor was accepted. */
-        if (SUCCEEDED(hr = IMFMediaSource_Start(reader->source, reader->descriptor, &GUID_NULL, &position)))
-        {
-            for (i = 0; i < reader->stream_count; ++i)
-                reader->streams[i].presented = reader->streams[i].selected;
-        }
-    }
-
-    return hr;
-}
-
 static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD index, DWORD flags, DWORD *actual_index,
         DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample)
 {
-    struct stream_response *response;
+    unsigned int actual_index_tmp;
     struct media_stream *stream;
+    LONGLONG timestamp_tmp;
+    BOOL request_sample;
     DWORD stream_index;
     HRESULT hr = S_OK;
-    BOOL selected;
 
     if (!stream_flags || !sample)
         return E_POINTER;
 
     *sample = NULL;
 
-    if (timestamp)
-        *timestamp = 0;
-
-    switch (index)
-    {
-        case MF_SOURCE_READER_FIRST_VIDEO_STREAM:
-            stream_index = reader->first_video_stream_index;
-            break;
-        case MF_SOURCE_READER_FIRST_AUDIO_STREAM:
-            stream_index = reader->first_audio_stream_index;
-            break;
-        case MF_SOURCE_READER_ANY_STREAM:
-            FIXME("Non-specific requests are not supported.\n");
-            return E_NOTIMPL;
-        default:
-            stream_index = index;
-    }
+    if (!timestamp)
+        timestamp = &timestamp_tmp;
 
-    /* Can't read from deselected streams. */
-    if (SUCCEEDED(hr = source_reader_get_stream_selection(reader, stream_index, &selected)) && !selected)
-        hr = MF_E_INVALIDREQUEST;
+    if (!actual_index)
+        actual_index = &actual_index_tmp;
 
-    if (FAILED(hr))
+    if (FAILED(hr = source_reader_get_stream_read_index(reader, index, &stream_index)))
     {
+        *actual_index = index;
         *stream_flags = MF_SOURCE_READERF_ERROR;
-        if (actual_index)
-            *actual_index = index;
+        *timestamp = 0;
         return hr;
     }
 
-    if (actual_index)
-        *actual_index = stream_index;
+    *actual_index = stream_index;
 
     stream = &reader->streams[stream_index];
 
@@ -1234,57 +1505,36 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind
 
     if (SUCCEEDED(hr = source_reader_start_source(reader)))
     {
-        if (!(flags & MF_SOURCE_READER_CONTROLF_DRAIN))
+        request_sample = source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags,
+               timestamp, sample);
+
+        if (request_sample)
         {
             while (list_empty(&stream->responses) && stream->state != STREAM_STATE_EOS)
             {
-                if (stream->stream && !(stream->flags & STREAM_FLAG_SAMPLE_REQUESTED))
-                {
-                    if (FAILED(hr = IMFMediaStream_RequestSample(stream->stream, NULL)))
-                        WARN("Sample request failed, hr %#x.\n", hr);
-                    else
-                        stream->flags |= STREAM_FLAG_SAMPLE_REQUESTED;
-                }
+                stream->requests++;
+                if (FAILED(hr = source_reader_request_sample(reader, stream)))
+                    WARN("Failed to request a sample, hr %#x.\n", hr);
                 SleepConditionVariableCS(&stream->sample_event, &stream->cs, INFINITE);
             }
-        }
 
-        if ((response = media_stream_pop_response(stream)))
-        {
-            *stream_flags = response->stream_flags;
-            if (timestamp)
-                *timestamp = response->timestamp;
-            *sample = response->sample;
-            if (*sample)
-                IMFSample_AddRef(*sample);
-            hr = response->status;
-            source_reader_release_response(response);
-        }
-        else
-        {
-            *stream_flags = list_empty(&stream->responses) && stream->state == STREAM_STATE_EOS ?
-                    MF_SOURCE_READERF_ENDOFSTREAM : 0;
+            source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags,
+                   timestamp, sample);
         }
     }
 
     LeaveCriticalSection(&stream->cs);
 
-    TRACE("Got sample %p, flags %#x.\n", *sample, *stream_flags);
+    TRACE("Stream %u, got sample %p, flags %#x.\n", *actual_index, *sample, *stream_flags);
 
     return hr;
 }
 
-static HRESULT source_reader_read_sample_async(struct source_reader *reader, DWORD index, DWORD flags)
-{
-    FIXME("Async mode is not implemented.\n");
-
-    return E_NOTIMPL;
-}
-
 static HRESULT WINAPI src_reader_ReadSample(IMFSourceReader *iface, DWORD index, DWORD flags, DWORD *actual_index,
         DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample)
 {
     struct source_reader *reader = impl_from_IMFSourceReader(iface);
+    struct source_reader_async_command *command;
     HRESULT hr;
 
     TRACE("%p, %#x, %#x, %p, %p, %p, %p\n", iface, index, flags, actual_index, stream_flags, timestamp, sample);
@@ -1294,7 +1544,14 @@ static HRESULT WINAPI src_reader_ReadSample(IMFSourceReader *iface, DWORD index,
         if (actual_index || stream_flags || timestamp || sample)
             return E_INVALIDARG;
 
-        hr = source_reader_read_sample_async(reader, index, flags);
+        if (FAILED(hr = source_reader_create_async_op(SOURCE_READER_ASYNC_READ, &command)))
+            return hr;
+
+        command->stream_index = index;
+        command->flags = flags;
+
+        hr = MFPutWorkItem(MFASYNC_CALLBACK_QUEUE_STANDARD, &reader->async_commands_callback, &command->IUnknown_iface);
+        IUnknown_Release(&command->IUnknown_iface);
     }
     else
         hr = source_reader_read_sample(reader, index, flags, actual_index, stream_flags, timestamp, sample);
@@ -1304,9 +1561,29 @@ static HRESULT WINAPI src_reader_ReadSample(IMFSourceReader *iface, DWORD index,
 
 static HRESULT WINAPI src_reader_Flush(IMFSourceReader *iface, DWORD index)
 {
+    struct source_reader *reader = impl_from_IMFSourceReader(iface);
+    struct source_reader_async_command *command;
+    HRESULT hr;
+
     FIXME("%p, %#x.\n", iface, index);
 
-    return E_NOTIMPL;
+    if (reader->async_callback) /* Async mode: hand the flush over to a work queue. */
+    {
+        if (FAILED(hr = source_reader_create_async_op(SOURCE_READER_ASYNC_FLUSH, &command)))
+            return hr;
+
+        command->stream_index = index;
+
+        hr = MFPutWorkItem(MFASYNC_CALLBACK_QUEUE_MULTITHREADED, &reader->async_commands_callback,
+                &command->IUnknown_iface); /* NOTE(review): ReadSample() uses the STANDARD queue - confirm. */
+        IUnknown_Release(&command->IUnknown_iface); /* Drop the local reference to the command. */
+    }
+    else
+    {
+        hr = E_NOTIMPL; /* Synchronous flushing is not implemented. */
+    }
+
+    return hr;
 }
 
 static HRESULT WINAPI src_reader_GetServiceForStream(IMFSourceReader *iface, DWORD index, REFGUID service,
@@ -1466,6 +1743,7 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri
     object->IMFSourceReader_iface.lpVtbl = &srcreader_vtbl;
     object->source_events_callback.lpVtbl = &source_events_callback_vtbl;
     object->stream_events_callback.lpVtbl = &stream_events_callback_vtbl;
+    object->async_commands_callback.lpVtbl = &async_commands_callback_vtbl;
     object->refcount = 1;
     object->source = source;
     IMFMediaSource_AddRef(object->source);
-- 
2.25.1




More information about the wine-devel mailing list