[PATCH 3/4] mf: Handle EndOfStream events within Media Session.

Nikolay Sivov nsivov at codeweavers.com
Mon Apr 13 11:05:57 CDT 2020


Signed-off-by: Nikolay Sivov <nsivov at codeweavers.com>
---
 dlls/mf/session.c | 260 ++++++++++++++++++++++++++++------------------
 1 file changed, 157 insertions(+), 103 deletions(-)

diff --git a/dlls/mf/session.c b/dlls/mf/session.c
index bc4cac6ef0..85bf4e9cba 100644
--- a/dlls/mf/session.c
+++ b/dlls/mf/session.c
@@ -127,6 +127,11 @@ struct transform_stream
     unsigned int requests;
 };
 
+enum topo_node_flags
+{
+    TOPO_NODE_END_OF_STREAM = 0x1,
+};
+
 struct topo_node
 {
     struct list entry;
@@ -134,6 +139,7 @@ struct topo_node
     TOPOID node_id;
     IMFTopologyNode *node;
     enum object_state state;
+    unsigned int flags;
     union
     {
         IMFMediaStream *source_stream;
@@ -651,17 +657,20 @@ static void session_set_caps(struct media_session *session, DWORD caps)
     IMFMediaEvent_Release(event);
 }
 
+static void transform_release_sample(struct sample *sample)
+{
+    list_remove(&sample->entry);
+    if (sample->sample)
+        IMFSample_Release(sample->sample);
+    heap_free(sample);
+}
+
 static void transform_stream_drop_samples(struct transform_stream *stream)
 {
     struct sample *sample, *sample2;
 
     LIST_FOR_EACH_ENTRY_SAFE(sample, sample2, &stream->samples, struct sample, entry)
-    {
-        list_remove(&sample->entry);
-        if (sample->sample)
-            IMFSample_Release(sample->sample);
-        heap_free(sample);
-    }
+        transform_release_sample(sample);
 }
 
 static void release_topo_node(struct topo_node *node)
@@ -2320,9 +2329,22 @@ static DWORD transform_node_get_stream_id(struct topo_node *node, BOOL output, D
     return map ? map[index] : index;
 }
 
-static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, IMFSample **sample)
+static struct sample *transform_create_sample(IMFSample *sample)
+{
+    struct sample *sample_entry = heap_alloc_zero(sizeof(*sample_entry));
+
+    if (sample_entry)
+    {
+        sample_entry->sample = sample;
+        if (sample_entry->sample)
+            IMFSample_AddRef(sample_entry->sample);
+    }
+
+    return sample_entry;
+}
+
+static HRESULT transform_node_pull_samples(struct topo_node *node)
 {
-    struct list *list = &node->u.transform.outputs[output].samples;
     MFT_OUTPUT_STREAM_INFO stream_info;
     MFT_OUTPUT_DATA_BUFFER *buffers;
     struct sample *queued_sample;
@@ -2330,17 +2352,6 @@ static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, I
     unsigned int i;
     HRESULT hr;
 
-    *sample = NULL;
-
-    if (!list_empty(list))
-    {
-        queued_sample = LIST_ENTRY(list_head(list), struct sample, entry);
-        list_remove(&queued_sample->entry);
-        *sample = queued_sample->sample;
-        heap_free(queued_sample);
-        return S_OK;
-    }
-
     if (!(buffers = heap_calloc(node->u.transform.output_count, sizeof(*buffers))))
         return E_OUTOFMEMORY;
 
@@ -2350,68 +2361,63 @@ static HRESULT transform_node_get_sample(struct topo_node *node, DWORD output, I
         buffers[i].pSample = NULL;
         buffers[i].dwStatus = 0;
         buffers[i].pEvents = NULL;
-    }
 
-    memset(&stream_info, 0, sizeof(stream_info));
-    if (FAILED(hr = IMFTransform_GetOutputStreamInfo(node->object.transform, buffers[output].dwStreamID, &stream_info)))
-        goto exit;
+        memset(&stream_info, 0, sizeof(stream_info));
+        if (FAILED(hr = IMFTransform_GetOutputStreamInfo(node->object.transform, buffers[i].dwStreamID, &stream_info)))
+            break;
 
-    if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES))
-    {
-        IMFMediaBuffer *buffer = NULL;
+        if (!(stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES))
+        {
+            IMFMediaBuffer *buffer = NULL;
 
-        hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer);
-        if (SUCCEEDED(hr))
-            hr = MFCreateSample(&buffers[output].pSample);
+            hr = MFCreateAlignedMemoryBuffer(stream_info.cbSize, stream_info.cbAlignment, &buffer);
+            if (SUCCEEDED(hr))
+                hr = MFCreateSample(&buffers[i].pSample);
 
-        if (SUCCEEDED(hr))
-            hr = IMFSample_AddBuffer(buffers[output].pSample, buffer);
+            if (SUCCEEDED(hr))
+                hr = IMFSample_AddBuffer(buffers[i].pSample, buffer);
 
-        if (buffer)
-            IMFMediaBuffer_Release(buffer);
+            if (buffer)
+                IMFMediaBuffer_Release(buffer);
 
-        if (FAILED(hr))
-            goto exit;
+            if (FAILED(hr))
+                break;
+        }
     }
 
-    hr = IMFTransform_ProcessOutput(node->object.transform, 0, node->u.transform.output_count, buffers, &status);
-    if (hr == S_OK)
-    {
-        /* Collect returned samples for all streams. */
-        for (i = 0; i < node->u.transform.output_count; ++i)
-        {
-            if (buffers[i].pEvents)
-                IMFCollection_Release(buffers[i].pEvents);
+    if (SUCCEEDED(hr))
+        hr = IMFTransform_ProcessOutput(node->object.transform, 0, node->u.transform.output_count, buffers, &status);
 
-            if (!buffers[i].pSample)
-                continue;
+    /* Collect returned samples for all streams. */
+    for (i = 0; i < node->u.transform.output_count; ++i)
+    {
+        if (buffers[i].pEvents)
+            IMFCollection_Release(buffers[i].pEvents);
 
-            if (i == output)
-            {
-                *sample = buffers[i].pSample;
-            }
-            else
-            {
-                queued_sample = heap_alloc(sizeof(*queued_sample));
-                queued_sample->sample = buffers[i].pSample;
-                list_add_tail(&node->u.transform.outputs[i].samples, &queued_sample->entry);
-            }
+        if (SUCCEEDED(hr) && !(buffers[i].dwStatus & MFT_OUTPUT_DATA_BUFFER_NO_SAMPLE))
+        {
+            queued_sample = transform_create_sample(buffers[i].pSample);
+            list_add_tail(&node->u.transform.outputs[i].samples, &queued_sample->entry);
         }
+
+        if (buffers[i].pSample)
+            IMFSample_Release(buffers[i].pSample);
     }
 
-exit:
     heap_free(buffers);
 
     return hr;
 }
 
-static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, DWORD input,
+static void session_deliver_sample_to_node(struct media_session *session, IMFTopologyNode *node, unsigned int input,
         IMFSample *sample)
 {
+    struct sample *sample_entry, *sample_entry2;
     DWORD stream_id, downstream_input;
     IMFTopologyNode *downstream_node;
     struct topo_node *topo_node;
     MF_TOPOLOGY_TYPE node_type;
+    BOOL drain = FALSE;
     TOPOID node_id;
     unsigned int i;
     HRESULT hr;
@@ -2424,48 +2430,92 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop
     switch (node_type)
     {
         case MF_TOPOLOGY_OUTPUT_NODE:
-            if (topo_node->u.sink.requests)
+            if (sample)
+            {
+                if (topo_node->u.sink.requests)
+                {
+                    if (FAILED(hr = IMFStreamSink_ProcessSample(topo_node->object.sink_stream, sample)))
+                        WARN("Stream sink failed to process sample, hr %#x.\n", hr);
+                    topo_node->u.sink.requests--;
+                }
+            }
+            else if (FAILED(hr = IMFStreamSink_PlaceMarker(topo_node->object.sink_stream, MFSTREAMSINK_MARKER_ENDOFSEGMENT,
+                    NULL, NULL)))
             {
-                if (FAILED(hr = IMFStreamSink_ProcessSample(topo_node->object.sink_stream, sample)))
-                    WARN("Sample delivery failed, hr %#x.\n", hr);
-                topo_node->u.sink.requests--;
+                WARN("Failed to place sink marker, hr %#x.\n", hr);
             }
             break;
         case MF_TOPOLOGY_TRANSFORM_NODE:
 
-            stream_id = transform_node_get_stream_id(topo_node, FALSE, input);
+            transform_node_pull_samples(topo_node);
+
+            sample_entry = transform_create_sample(sample);
+            list_add_tail(&topo_node->u.transform.inputs[input].samples, &sample_entry->entry);
 
-            hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id, sample, 0);
-            if (hr == MF_E_NOTACCEPTING)
+            for (i = 0; i < topo_node->u.transform.input_count; ++i)
             {
-                struct sample *sample_entry = heap_alloc(sizeof(*sample_entry));
-                sample_entry->sample = sample;
-                list_add_tail(&topo_node->u.transform.inputs[input].samples, &sample_entry->entry);
+                stream_id = transform_node_get_stream_id(topo_node, FALSE, i);
+                LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.inputs[i].samples,
+                        struct sample, entry)
+                {
+                    if (sample_entry->sample)
+                    {
+                        if ((hr = IMFTransform_ProcessInput(topo_node->object.transform, stream_id,
+                                sample_entry->sample, 0)) == MF_E_NOTACCEPTING)
+                            break;
+                        if (FAILED(hr))
+                            WARN("Failed to process input for stream %u/%u, hr %#x.\n", i, stream_id, hr);
+                        transform_release_sample(sample_entry);
+                    }
+                    else
+                    {
+                        transform_stream_drop_samples(&topo_node->u.transform.inputs[i]);
+                        drain = TRUE;
+                    }
+                }
             }
-            else if (hr == S_OK)
+
+            if (drain)
+            {
+                if (FAILED(hr = IMFTransform_ProcessMessage(topo_node->object.transform, MFT_MESSAGE_COMMAND_DRAIN, 0)))
+                    WARN("Drain command failed for transform, hr %#x.\n", hr);
+            }
+
+            transform_node_pull_samples(topo_node);
+
+            /* Remaining unprocessed input has been discarded, now queue markers for every output. */
+            if (drain)
             {
-                /* Check if we need new output is available, push it down. */
                 for (i = 0; i < topo_node->u.transform.output_count; ++i)
+                {
+                     if ((sample_entry = transform_create_sample(NULL)))
+                         list_add_tail(&topo_node->u.transform.outputs[i].samples, &sample_entry->entry);
+                }
+            }
+
+            /* Push down all available output. */
+            for (i = 0; i < topo_node->u.transform.output_count; ++i)
+            {
+                if (FAILED(IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input)))
+                {
+                    WARN("Failed to get connected node for output %u.\n", i);
+                    continue;
+                }
+
+                LIST_FOR_EACH_ENTRY_SAFE(sample_entry, sample_entry2, &topo_node->u.transform.outputs[i].samples,
+                        struct sample, entry)
                 {
                     if (!topo_node->u.transform.outputs[i].requests)
-                        continue;
+                        break;
 
-                    sample = NULL;
-                    transform_node_get_sample(topo_node, i, &sample);
-                    if (sample)
-                    {
-                        if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, i, &downstream_node, &downstream_input)))
-                        {
-                            session_deliver_sample_to_node(session, downstream_node, downstream_input, sample);
-                            topo_node->u.transform.outputs[i].requests--;
-                            IMFTopologyNode_Release(downstream_node);
-                        }
-                        IMFSample_Release(sample);
-                    }
+                    session_deliver_sample_to_node(session, downstream_node, downstream_input, sample_entry->sample);
+                    topo_node->u.transform.outputs[i].requests--;
+
+                    transform_release_sample(sample_entry);
                 }
+
+                IMFTopologyNode_Release(downstream_node);
             }
-            else
-                WARN("Transform failed to process input sample, hr %#x.\n", hr);
 
             break;
         case MF_TOPOLOGY_TEE_NODE:
@@ -2479,10 +2529,10 @@ static void session_deliver_sample_to_node(struct media_session *session, IMFTop
 static HRESULT session_request_sample_from_node(struct media_session *session, IMFTopologyNode *node, DWORD output)
 {
     IMFTopologyNode *downstream_node, *upstream_node;
-    MF_TOPOLOGY_TYPE node_type;
-    IMFSample *sample = NULL;
+    unsigned int downstream_input, upstream_output;
     struct topo_node *topo_node;
-    DWORD downstream_input, upstream_output;
+    MF_TOPOLOGY_TYPE node_type;
+    struct sample *sample;
     TOPOID node_id;
     HRESULT hr;
 
@@ -2499,20 +2549,9 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I
             break;
         case MF_TOPOLOGY_TRANSFORM_NODE:
 
-            hr = transform_node_get_sample(topo_node, output, &sample);
-            if (sample)
-            {
-                if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input)))
-                {
-                    session_deliver_sample_to_node(session, downstream_node, downstream_input, sample);
-                    IMFTopologyNode_Release(downstream_node);
-                }
-                IMFSample_Release(sample);
-            }
-
-            /* Forward request to upstream node. */
-            if (hr == MF_E_TRANSFORM_NEED_MORE_INPUT || (hr == S_OK && !sample))
+            if (list_empty(&topo_node->u.transform.outputs[output].samples))
             {
+                /* Forward request to upstream node. */
                 if (SUCCEEDED(hr = IMFTopologyNode_GetInput(node, 0 /* FIXME */, &upstream_node, &upstream_output)))
                 {
                     if (SUCCEEDED(hr = session_request_sample_from_node(session, upstream_node, upstream_output)))
@@ -2520,6 +2559,17 @@ static HRESULT session_request_sample_from_node(struct media_session *session, I
                     IMFTopologyNode_Release(upstream_node);
                 }
             }
+            else
+            {
+                if (SUCCEEDED(hr = IMFTopologyNode_GetOutput(node, output, &downstream_node, &downstream_input)))
+                {
+                    sample = LIST_ENTRY(list_head(&topo_node->u.transform.outputs[output].samples), struct sample, entry);
+                    session_deliver_sample_to_node(session, downstream_node, downstream_input, sample->sample);
+                    transform_release_sample(sample);
+                    IMFTopologyNode_Release(downstream_node);
+                }
+            }
+
             break;
         case MF_TOPOLOGY_TEE_NODE:
             FIXME("Unhandled upstream node type %d.\n", node_type);
@@ -2567,7 +2617,7 @@ static void session_deliver_sample(struct media_session *session, IMFMediaStream
     DWORD downstream_input;
     HRESULT hr;
 
-    if (value->vt != VT_UNKNOWN || !value->punkVal)
+    if (value && (value->vt != VT_UNKNOWN || !value->punkVal))
     {
         WARN("Unexpected value type %d.\n", value->vt);
         return;
@@ -2585,13 +2635,16 @@ static void session_deliver_sample(struct media_session *session, IMFMediaStream
     if (!source_node)
         return;
 
+    if (!value)
+        source_node->flags |= TOPO_NODE_END_OF_STREAM;
+
     if (FAILED(hr = IMFTopologyNode_GetOutput(source_node->node, 0, &downstream_node, &downstream_input)))
     {
         WARN("Failed to get downstream node connection, hr %#x.\n", hr);
         return;
     }
 
-    session_deliver_sample_to_node(session, downstream_node, downstream_input, (IMFSample *)value->punkVal);
+    session_deliver_sample_to_node(session, downstream_node, downstream_input, value ? (IMFSample *)value->punkVal : NULL);
     IMFTopologyNode_Release(downstream_node);
 }
 
@@ -2712,9 +2765,10 @@ static HRESULT WINAPI session_events_callback_Invoke(IMFAsyncCallback *iface, IM
 
             break;
         case MEMediaSample:
+        case MEEndOfStream:
 
             EnterCriticalSection(&session->cs);
-            session_deliver_sample(session, (IMFMediaStream *)event_source, &value);
+            session_deliver_sample(session, (IMFMediaStream *)event_source, event_type == MEMediaSample ? &value : NULL);
             LeaveCriticalSection(&session->cs);
 
             break;
-- 
2.25.1




More information about the wine-devel mailing list