[PATCH 1/5] mfreadwrite: Use single reader lock for all streams.

Nikolay Sivov nsivov at codeweavers.com
Tue Mar 31 07:46:34 CDT 2020


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/mfreadwrite/reader.c | 228 ++++++++++++++++++++++----------------
 1 file changed, 130 insertions(+), 98 deletions(-)

diff --git a/dlls/mfreadwrite/reader.c b/dlls/mfreadwrite/reader.c
index 9a7c5e5fd7..b8e1ab0106 100644
--- a/dlls/mfreadwrite/reader.c
+++ b/dlls/mfreadwrite/reader.c
@@ -108,9 +108,6 @@ struct media_stream
     IMFTransform *decoder;
     DWORD id;
     unsigned int index;
-    CRITICAL_SECTION cs;
-    CONDITION_VARIABLE sample_event;
-    struct list responses;
     enum media_stream_state state;
     BOOL selected;
     BOOL presented;
@@ -150,7 +147,9 @@ struct source_reader
     enum media_source_state source_state;
     struct media_stream *streams;
     DWORD stream_count;
+    struct list responses;
     CRITICAL_SECTION cs;
+    CONDITION_VARIABLE sample_event;
 };
 
 static inline struct source_reader *impl_from_IMFSourceReader(IMFSourceReader *iface)
@@ -327,7 +326,7 @@ static void source_reader_queue_response(struct source_reader *reader, struct me
     if (response->sample)
         IMFSample_AddRef(response->sample);
 
-    list_add_tail(&stream->responses, &response->entry);
+    list_add_tail(&reader->responses, &response->entry);
 
     if (stream->requests)
     {
@@ -343,7 +342,7 @@ static void source_reader_queue_response(struct source_reader *reader, struct me
             }
         }
         else
-            WakeAllConditionVariable(&stream->sample_event);
+            WakeAllConditionVariable(&reader->sample_event);
 
         stream->requests--;
     }
@@ -386,12 +385,15 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM
         return hr;
     }
 
+    EnterCriticalSection(&reader->cs);
+
     for (i = 0; i < reader->stream_count; ++i)
     {
         if (id == reader->streams[i].id)
         {
-            if (!InterlockedCompareExchangePointer((void **)&reader->streams[i].stream, stream, NULL))
+            if (!reader->streams[i].stream)
             {
+                reader->streams[i].stream = stream;
                 IMFMediaStream_AddRef(reader->streams[i].stream);
                 if (FAILED(hr = IMFMediaStream_BeginGetEvent(stream, &reader->stream_events_callback,
                         (IUnknown *)stream)))
@@ -399,10 +401,8 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM
                     WARN("Failed to subscribe to stream events, hr %#x.\n", hr);
                 }
 
-                EnterCriticalSection(&reader->streams[i].cs);
                 if (reader->streams[i].requests)
                     source_reader_request_sample(reader, &reader->streams[i]);
-                LeaveCriticalSection(&reader->streams[i].cs);
             }
             break;
         }
@@ -411,6 +411,8 @@ static HRESULT source_reader_new_stream_handler(struct source_reader *reader, IM
     if (i == reader->stream_count)
         WARN("Stream with id %#x was not present in presentation descriptor.\n", id);
 
+    LeaveCriticalSection(&reader->cs);
+
     IMFMediaStream_Release(stream);
 
     return hr;
@@ -630,21 +632,19 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader,
         return hr;
     }
 
+    EnterCriticalSection(&reader->cs);
+
     for (i = 0; i < reader->stream_count; ++i)
     {
         if (id == reader->streams[i].id)
         {
             /* FIXME: propagate processing errors? */
 
-            EnterCriticalSection(&reader->streams[i].cs);
-
             reader->streams[i].flags &= ~STREAM_FLAG_SAMPLE_REQUESTED;
             hr = source_reader_process_sample(reader, &reader->streams[i], sample);
             if (reader->streams[i].requests)
                 source_reader_request_sample(reader, &reader->streams[i]);
 
-            LeaveCriticalSection(&reader->streams[i].cs);
-
             break;
         }
     }
@@ -652,6 +652,8 @@ static HRESULT source_reader_media_sample_handler(struct source_reader *reader,
     if (i == reader->stream_count)
         WARN("Stream with id %#x was not present in presentation descriptor.\n", id);
 
+    LeaveCriticalSection(&reader->cs);
+
     IMFSample_Release(sample);
 
     return hr;
@@ -675,14 +677,14 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
         return hr;
     }
 
+    EnterCriticalSection(&reader->cs);
+
     for (i = 0; i < reader->stream_count; ++i)
     {
         struct media_stream *stream = &reader->streams[i];
 
         if (id == stream->id)
         {
-            EnterCriticalSection(&stream->cs);
-
             switch (event_type)
             {
                 case MEEndOfStream:
@@ -716,12 +718,12 @@ static HRESULT source_reader_media_stream_state_handler(struct source_reader *re
                     ;
             }
 
-            LeaveCriticalSection(&stream->cs);
-
             break;
         }
     }
 
+    LeaveCriticalSection(&reader->cs);
+
     return S_OK;
 }
 
@@ -790,18 +792,33 @@ static ULONG WINAPI source_reader_async_commands_callback_Release(IMFAsyncCallba
     return IMFSourceReader_Release(&reader->IMFSourceReader_iface);
 }
 
-static struct stream_response *media_stream_pop_response(struct media_stream *stream)
+static struct stream_response *media_stream_pop_response(struct source_reader *reader, struct media_stream *stream)
 {
-    struct stream_response *response = NULL;
+    struct stream_response *response;
     struct list *head;
 
-    if ((head = list_head(&stream->responses)))
+    if (stream)
     {
-        response = LIST_ENTRY(head, struct stream_response, entry);
-        list_remove(&response->entry);
+        LIST_FOR_EACH_ENTRY(response, &reader->responses, struct stream_response, entry)
+        {
+            if (response->stream_index == stream->index)
+            {
+                list_remove(&response->entry);
+                return response;
+            }
+        }
+    }
+    else
+    {
+        if ((head = list_head(&reader->responses)))
+        {
+            response = LIST_ENTRY(head, struct stream_response, entry);
+            list_remove(&response->entry);
+            return response;
+        }
     }
 
-    return response;
+    return NULL;
 }
 
 static void source_reader_release_response(struct stream_response *response)
@@ -857,13 +874,38 @@ static HRESULT source_reader_start_source(struct source_reader *reader)
     return hr;
 }
 
+static BOOL source_reader_got_response_for_stream(struct source_reader *reader, struct media_stream *stream)
+{
+    struct stream_response *response;
+
+    LIST_FOR_EACH_ENTRY(response, &reader->responses, struct stream_response, entry)
+    {
+        if (response->stream_index == stream->index)
+            return TRUE;
+    }
+
+    return FALSE;
+}
+
 static BOOL source_reader_get_read_result(struct source_reader *reader, struct media_stream *stream, DWORD flags,
         HRESULT *status, DWORD *stream_index, DWORD *stream_flags, LONGLONG *timestamp, IMFSample **sample)
 {
     struct stream_response *response = NULL;
     BOOL request_sample = FALSE;
 
-    if (list_empty(&stream->responses))
+    if ((response = media_stream_pop_response(reader, stream)))
+    {
+        *status = response->status;
+        *stream_index = stream->index;
+        *stream_flags = response->stream_flags;
+        *timestamp = response->timestamp;
+        *sample = response->sample;
+        if (*sample)
+            IMFSample_AddRef(*sample);
+
+        source_reader_release_response(response);
+    }
+    else
     {
         *status = S_OK;
         *stream_index = stream->index;
@@ -880,20 +922,6 @@ static BOOL source_reader_get_read_result(struct source_reader *reader, struct m
             *stream_flags = 0;
         }
     }
-    else
-    {
-        response = media_stream_pop_response(stream);
-
-        *status = response->status;
-        *stream_index = stream->index;
-        *stream_flags = response->stream_flags;
-        *timestamp = response->timestamp;
-        *sample = response->sample;
-        if (*sample)
-            IMFSample_AddRef(*sample);
-
-        source_reader_release_response(response);
-    }
 
     return !request_sample;
 }
@@ -925,12 +953,19 @@ static HRESULT source_reader_get_stream_read_index(struct source_reader *reader,
     return hr;
 }
 
-static void source_reader_release_responses(struct media_stream *stream)
+static void source_reader_release_responses(struct source_reader *reader, struct media_stream *stream)
 {
     struct stream_response *ptr, *next;
 
-    LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &stream->responses, struct stream_response, entry)
+    LIST_FOR_EACH_ENTRY_SAFE(ptr, next, &reader->responses, struct stream_response, entry)
     {
+        if (stream && stream->index != ptr->stream_index &&
+                ptr->stream_index != MF_SOURCE_READER_FIRST_VIDEO_STREAM &&
+                ptr->stream_index != MF_SOURCE_READER_FIRST_AUDIO_STREAM &&
+                ptr->stream_index != MF_SOURCE_READER_ANY_STREAM)
+        {
+            continue;
+        }
         list_remove(&ptr->entry);
         source_reader_release_response(ptr);
     }
@@ -938,42 +973,46 @@ static void source_reader_release_responses(struct media_stream *stream)
 
+/* Drops queued responses for one stream and flushes its decoder. stream_index must be a valid
+ * index into reader->streams; source_reader_flush() calls this with reader->cs held. */
 static void source_reader_flush_stream(struct source_reader *reader, DWORD stream_index)
 {
     struct media_stream *stream = &reader->streams[stream_index];
 
-    EnterCriticalSection(&stream->cs);
-
-    source_reader_release_responses(stream);
+    source_reader_release_responses(reader, stream);
     if (stream->decoder)
         IMFTransform_ProcessMessage(stream->decoder, MFT_MESSAGE_COMMAND_FLUSH, 0);
     stream->requests = 0;
-
-    LeaveCriticalSection(&stream->cs);
 }
 
 static HRESULT source_reader_flush(struct source_reader *reader, unsigned int index)
 {
     unsigned int stream_index;
 
-    switch (index)
+    EnterCriticalSection(&reader->cs);
+
+    if (index == MF_SOURCE_READER_ALL_STREAMS)
     {
-        case MF_SOURCE_READER_FIRST_VIDEO_STREAM:
-            stream_index = reader->first_video_stream_index;
-            break;
-        case MF_SOURCE_READER_FIRST_AUDIO_STREAM:
-            stream_index = reader->first_audio_stream_index;
-            break;
-        case MF_SOURCE_READER_ALL_STREAMS:
-            for (stream_index = 0; stream_index < reader->stream_count; ++stream_index)
-            {
-                source_reader_flush_stream(reader, stream_index);
-            }
+        /* MF_SOURCE_READER_ALL_STREAMS is not an array index, flush each stream in turn. */
+        for (stream_index = 0; stream_index < reader->stream_count; ++stream_index)
+            source_reader_flush_stream(reader, stream_index);
+    }
+    else
+    {
+        switch (index)
+        {
+            case MF_SOURCE_READER_FIRST_VIDEO_STREAM:
+                stream_index = reader->first_video_stream_index;
+                break;
+            case MF_SOURCE_READER_FIRST_AUDIO_STREAM:
+                stream_index = reader->first_audio_stream_index;
+                break;
+            default:
+                stream_index = index;
+        }
 
-            break;
-        default:
-            stream_index = index;
+        source_reader_flush_stream(reader, stream_index);
     }
 
-    source_reader_flush_stream(reader, stream_index);
+    LeaveCriticalSection(&reader->cs);
 
     return S_OK;
 }
@@ -1002,9 +1037,9 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb
             if (FAILED(hr = source_reader_get_stream_read_index(reader, command->stream_index, &stream_index)))
                 return hr;
 
-            stream = &reader->streams[stream_index];
+            EnterCriticalSection(&reader->cs);
 
-            EnterCriticalSection(&stream->cs);
+            stream = &reader->streams[stream_index];
 
             if (SUCCEEDED(hr = source_reader_start_source(reader)))
             {
@@ -1017,7 +1052,7 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb
                 }
             }
 
-            LeaveCriticalSection(&stream->cs);
+            LeaveCriticalSection(&reader->cs);
 
             if (report_sample)
                 IMFSourceReaderCallback_OnReadSample(reader->async_callback, status, stream_index, stream_flags,
@@ -1029,11 +1064,10 @@ static HRESULT WINAPI source_reader_async_commands_callback_Invoke(IMFAsyncCallb
             break;
 
         case SOURCE_READER_ASYNC_SAMPLE_READY:
-            stream = &reader->streams[command->stream_index];
 
-            EnterCriticalSection(&stream->cs);
-            response = media_stream_pop_response(stream);
-            LeaveCriticalSection(&stream->cs);
+            EnterCriticalSection(&reader->cs);
+            response = media_stream_pop_response(reader, NULL);
+            LeaveCriticalSection(&reader->cs);
 
             if (response)
             {
@@ -1126,10 +1160,8 @@ static ULONG WINAPI src_reader_Release(IMFSourceReader *iface)
                 IMFMediaType_Release(stream->current);
             if (stream->decoder)
                 IMFTransform_Release(stream->decoder);
-            DeleteCriticalSection(&stream->cs);
-
-            source_reader_release_responses(stream);
         }
+        source_reader_release_responses(reader, NULL);
         heap_free(reader->streams);
         DeleteCriticalSection(&reader->cs);
         heap_free(reader);
@@ -1544,39 +1576,40 @@ static HRESULT source_reader_read_sample(struct source_reader *reader, DWORD ind
     if (!actual_index)
         actual_index = &actual_index_tmp;
 
-    if (FAILED(hr = source_reader_get_stream_read_index(reader, index, &stream_index)))
-    {
-        *actual_index = index;
-        *stream_flags = MF_SOURCE_READERF_ERROR;
-        *timestamp = 0;
-        return hr;
-    }
-
-    *actual_index = stream_index;
-
-    stream = &reader->streams[stream_index];
-
-    EnterCriticalSection(&stream->cs);
+    EnterCriticalSection(&reader->cs);
 
     if (SUCCEEDED(hr = source_reader_start_source(reader)))
     {
-        if (!source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags,
-               timestamp, sample))
+        if (SUCCEEDED(hr = source_reader_get_stream_read_index(reader, index, &stream_index)))
         {
-            while (list_empty(&stream->responses) && stream->state != STREAM_STATE_EOS)
+            *actual_index = stream_index;
+
+            stream = &reader->streams[stream_index];
+
+            if (!source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags,
+                   timestamp, sample))
             {
-                stream->requests++;
-                if (FAILED(hr = source_reader_request_sample(reader, stream)))
-                    WARN("Failed to request a sample, hr %#x.\n", hr);
-                SleepConditionVariableCS(&stream->sample_event, &stream->cs, INFINITE);
-            }
+                while (!source_reader_got_response_for_stream(reader, stream) && stream->state != STREAM_STATE_EOS)
+                {
+                    stream->requests++;
+                    if (FAILED(hr = source_reader_request_sample(reader, stream)))
+                        WARN("Failed to request a sample, hr %#x.\n", hr);
+                    SleepConditionVariableCS(&reader->sample_event, &reader->cs, INFINITE);
+                }
 
-            source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags,
-                   timestamp, sample);
+                source_reader_get_read_result(reader, stream, flags, &hr, actual_index, stream_flags,
+                       timestamp, sample);
+            }
+        }
+        else
+        {
+            *actual_index = index;
+            *stream_flags = MF_SOURCE_READERF_ERROR;
+            *timestamp = 0;
         }
     }
 
-    LeaveCriticalSection(&stream->cs);
+    LeaveCriticalSection(&reader->cs);
 
     TRACE("Stream %u, got sample %p, flags %#x.\n", *actual_index, *sample, *stream_flags);
 
@@ -1796,9 +1829,11 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri
     object->stream_events_callback.lpVtbl = &stream_events_callback_vtbl;
     object->async_commands_callback.lpVtbl = &async_commands_callback_vtbl;
     object->refcount = 1;
+    list_init(&object->responses);
     object->source = source;
     IMFMediaSource_AddRef(object->source);
     InitializeCriticalSection(&object->cs);
+    InitializeConditionVariable(&object->sample_event);
 
     if (FAILED(hr = IMFMediaSource_CreatePresentationDescriptor(object->source, &object->descriptor)))
         goto failed;
@@ -1845,9 +1880,6 @@ static HRESULT create_source_reader_from_source(IMFMediaSource *source, IMFAttri
             break;
 
         object->streams[i].index = i;
-        InitializeCriticalSection(&object->streams[i].cs);
-        InitializeConditionVariable(&object->streams[i].sample_event);
-        list_init(&object->streams[i].responses);
     }
 
     if (FAILED(hr))
-- 
2.25.1




More information about the wine-devel mailing list