[PATCH v2 05/18] winegstreamer: Use the demuxer to establish IMFMediaStreams.

Derek Lesho dlesho at codeweavers.com
Wed Apr 1 17:05:26 CDT 2020


Signed-off-by: Derek Lesho <dlesho at codeweavers.com>
---
 dlls/winegstreamer/gst_cbs.c      |  44 +++
 dlls/winegstreamer/gst_cbs.h      |  13 +
 dlls/winegstreamer/media_source.c | 440 +++++++++++++++++++++++++++++-
 3 files changed, 496 insertions(+), 1 deletion(-)

diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c
index 51b017ded6..5038ea3397 100644
--- a/dlls/winegstreamer/gst_cbs.c
+++ b/dlls/winegstreamer/gst_cbs.c
@@ -372,3 +372,47 @@ GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpoin
 
     return cbdata.u.watch_bus_data.ret;
 }
+
+void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user)
+{
+    /* "pad-added" signal handler: bundle the signal arguments and forward them via call_cb(). */
+    struct cb_data request = { SOURCE_STREAM_ADDED };
+
+    request.u.pad_added_data.user = user;
+    request.u.pad_added_data.pad = pad;
+    request.u.pad_added_data.element = bin;
+    call_cb(&request);
+}
+
+void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user)
+{
+    /* "pad-removed" signal handler: bundle the signal arguments and forward them via call_cb(). */
+    struct cb_data request = { SOURCE_STREAM_REMOVED };
+
+    request.u.pad_removed_data.user = user;
+    request.u.pad_removed_data.pad = pad;
+    request.u.pad_removed_data.element = element;
+    call_cb(&request);
+}
+
+void source_all_streams_wrapper(GstElement *element, gpointer user)
+{
+    /* "no-more-pads" signal handler: bundle the signal arguments and forward them via call_cb(). */
+    struct cb_data request = { SOURCE_ALL_STREAMS };
+
+    request.u.no_more_pads_data.user = user;
+    request.u.no_more_pads_data.element = element;
+    call_cb(&request);
+}
+
+GstFlowReturn stream_new_sample_wrapper(GstElement *appsink, gpointer user)
+{
+    /* appsink "new-sample" handler: forward the arguments via call_cb() and relay the result. */
+    struct cb_data request = { STREAM_NEW_SAMPLE };
+
+    request.u.new_sample_data.user = user;
+    request.u.new_sample_data.appsink = appsink;
+    call_cb(&request);
+    /* ret is filled in by the STREAM_NEW_SAMPLE case of perform_cb_media_source(). */
+    return request.u.new_sample_data.ret;
+}
diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h
index 0d7acaf0b8..106368a064 100644
--- a/dlls/winegstreamer/gst_cbs.h
+++ b/dlls/winegstreamer/gst_cbs.h
@@ -49,6 +49,10 @@ enum CB_TYPE {
     ACTIVATE_BYTESTREAM_PAD_MODE,
     PROCESS_BYTESTREAM_PAD_EVENT,
     WATCH_SOURCE_BUS,
+    SOURCE_STREAM_ADDED,
+    SOURCE_STREAM_REMOVED,
+    SOURCE_ALL_STREAMS,
+    STREAM_NEW_SAMPLE,
     MEDIA_SOURCE_MAX,
 };
 
@@ -134,6 +138,11 @@ struct cb_data {
             GstQuery *query;
             gboolean ret;
         } query_sink_data;
+        struct new_sample_data {
+            GstElement *appsink;
+            gpointer user;
+            GstFlowReturn ret;
+        } new_sample_data;
     } u;
 
     int finished;
@@ -166,5 +175,9 @@ gboolean query_bytestream_wrapper(GstPad *pad, GstObject *parent, GstQuery *quer
 gboolean activate_bytestream_pad_mode_wrapper(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate) DECLSPEC_HIDDEN;
 gboolean process_bytestream_pad_event_wrapper(GstPad *pad, GstObject *parent, GstEvent *event) DECLSPEC_HIDDEN;
 GstBusSyncReply watch_source_bus_wrapper(GstBus *bus, GstMessage *message, gpointer user) DECLSPEC_HIDDEN;
+void source_stream_added_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
+void source_stream_removed_wrapper(GstElement *element, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
+void source_all_streams_wrapper(GstElement *element, gpointer user) DECLSPEC_HIDDEN;
+GstFlowReturn stream_new_sample_wrapper(GstElement *appsink, gpointer user) DECLSPEC_HIDDEN;
 
 #endif
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c
index d6380f059e..ae9880847c 100644
--- a/dlls/winegstreamer/media_source.c
+++ b/dlls/winegstreamer/media_source.c
@@ -5,6 +5,7 @@
 #include "gst_private.h"
 #include "gst_cbs.h"
 
+#include <assert.h>
 #include <stdarg.h>
 
 #define COBJMACROS
@@ -31,6 +32,28 @@ static struct source_desc
     }
 };
 
+struct media_source;
+
+struct media_stream
+{
+    IMFMediaStream IMFMediaStream_iface; /* COM interface; impl_from_IMFMediaStream() maps it back to this struct */
+    LONG ref; /* reference count, changed through InterlockedIncrement/InterlockedDecrement */
+    struct media_source *parent_source; /* owning source; AddRef'd by the constructor, released on teardown */
+    IMFMediaEventQueue *event_queue; /* backs the GetEvent/BeginGetEvent/EndGetEvent/QueueEvent methods */
+    IMFStreamDescriptor *descriptor; /* released on teardown; NOTE(review): never assigned in the code visible here */
+    GstElement *appsink; /* per-stream "appsink" element added to the source's container */
+    GstPad *their_src, *my_sink; /* demuxer pad feeding this stream / the appsink's "sink" pad */
+    /* usually reflects state of source */
+    enum
+    {
+        STREAM_INACTIVE, /* initial state set by media_stream_constructor() */
+        STREAM_ENABLED,
+        STREAM_PAUSED,
+        STREAM_RUNNING,
+        STREAM_SHUTDOWN, /* set by media_stream_teardown(); the interface methods then return MF_E_SHUTDOWN */
+    } state;
+};
+
 struct media_source
 {
     IMFMediaSource IMFMediaSource_iface;
@@ -52,8 +75,252 @@ struct media_source
         SOURCE_RUNNING,
         SOURCE_SHUTDOWN,
     } state;
+    CRITICAL_SECTION streams_cs;
+    HANDLE init_complete_event;
 };
 
+/* stream */
+
+static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface)
+{   /* iface is the IMFMediaStream_iface member of a struct media_stream; step back to that struct. */
+    return CONTAINING_RECORD(iface, struct media_stream, IMFMediaStream_iface);
+}
+
+static HRESULT WINAPI media_stream_QueryInterface(IMFMediaStream *iface, REFIID riid, void **out)
+{
+    struct media_stream *stream = impl_from_IMFMediaStream(iface);
+    BOOL supported;
+
+    TRACE("(%p)->(%s %p)\n", stream, debugstr_guid(riid), out);
+
+    /* One vtable serves IUnknown, IMFMediaEventGenerator and IMFMediaStream alike. */
+    supported = IsEqualIID(riid, &IID_IMFMediaStream) ||
+                IsEqualIID(riid, &IID_IMFMediaEventGenerator) ||
+                IsEqualIID(riid, &IID_IUnknown);
+    if (!supported)
+    {
+        FIXME("(%s, %p)\n", debugstr_guid(riid), out);
+        *out = NULL;
+        return E_NOINTERFACE;
+    }
+
+    *out = &stream->IMFMediaStream_iface;
+    IUnknown_AddRef((IUnknown*)*out);
+    return S_OK;
+}
+
+static ULONG WINAPI media_stream_AddRef(IMFMediaStream *iface)
+{
+    struct media_stream *stream = impl_from_IMFMediaStream(iface);
+    /* Atomically bump the count; the new value is traced and returned. */
+    ULONG refcount = InterlockedIncrement(&stream->ref);
+
+    TRACE("(%p) ref=%u\n", stream, refcount);
+    return refcount;
+}
+
+static ULONG WINAPI media_stream_Release(IMFMediaStream *iface)
+{
+    struct media_stream *stream = impl_from_IMFMediaStream(iface);
+    ULONG refcount = InterlockedDecrement(&stream->ref);
+
+    TRACE("(%p) ref=%u\n", stream, refcount);
+
+    if (refcount)
+        return refcount;
+
+    /* Last reference dropped. Only the memory is freed here; the held resources are
+     * released by media_stream_teardown(), which marks the stream STREAM_SHUTDOWN. */
+    if (stream->state != STREAM_SHUTDOWN)
+        ERR("incomplete cleanup\n");
+    heap_free(stream);
+    return 0;
+}
+
+static HRESULT WINAPI media_stream_GetEvent(IMFMediaStream *iface, DWORD flags, IMFMediaEvent **event)
+{
+    struct media_stream *stream = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%#x, %p)\n", stream, flags, event);
+
+    /* media_stream_teardown() releases the queue, so it is only used while not shut down. */
+    return stream->state == STREAM_SHUTDOWN
+            ? MF_E_SHUTDOWN
+            : IMFMediaEventQueue_GetEvent(stream->event_queue, flags, event);
+}
+
+static HRESULT WINAPI media_stream_BeginGetEvent(IMFMediaStream *iface, IMFAsyncCallback *callback, IUnknown *state)
+{
+    struct media_stream *stream = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%p, %p)\n", stream, callback, state);
+
+    /* Delegate to the event queue unless the stream has been torn down. */
+    if (stream->state != STREAM_SHUTDOWN)
+        return IMFMediaEventQueue_BeginGetEvent(stream->event_queue, callback, state);
+    return MF_E_SHUTDOWN;
+}
+
+static HRESULT WINAPI media_stream_EndGetEvent(IMFMediaStream *iface, IMFAsyncResult *result, IMFMediaEvent **event)
+{
+    struct media_stream *stream = impl_from_IMFMediaStream(iface);
+    HRESULT hr = MF_E_SHUTDOWN;
+
+    TRACE("(%p)->(%p, %p)\n", stream, result, event);
+
+    if (stream->state != STREAM_SHUTDOWN) /* the queue is gone once the stream is shut down */
+        hr = IMFMediaEventQueue_EndGetEvent(stream->event_queue, result, event);
+    return hr;
+}
+
+static HRESULT WINAPI media_stream_QueueEvent(IMFMediaStream *iface, MediaEventType event_type, REFGUID ext_type,
+        HRESULT hr, const PROPVARIANT *value)
+{
+    struct media_stream *stream = impl_from_IMFMediaStream(iface);
+
+    TRACE("(%p)->(%d, %s, %#x, %p)\n", stream, event_type, debugstr_guid(ext_type), hr, value);
+
+    /* Events are accepted only while the stream (and with it the queue) is still alive. */
+    return stream->state == STREAM_SHUTDOWN
+            ? MF_E_SHUTDOWN
+            : IMFMediaEventQueue_QueueEventParamVar(stream->event_queue, event_type, ext_type, hr, value);
+}
+
+static HRESULT WINAPI media_stream_GetMediaSource(IMFMediaStream *iface, IMFMediaSource **source)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+    TRACE("(%p)->(%p)\n", This, source);
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+    /* parent_source is referenced from construction until teardown; the caller gets its own reference. */
+    *source = &This->parent_source->IMFMediaSource_iface;
+    IMFMediaSource_AddRef(*source);
+    return S_OK;
+}
+
+static HRESULT WINAPI media_stream_GetStreamDescriptor(IMFMediaStream* iface, IMFStreamDescriptor **descriptor)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    /* Not implemented yet: reported as a FIXME stub so the missing functionality shows up in logs. */
+    FIXME("stub (%p)->(%p)\n", This, descriptor);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+    return E_NOTIMPL;
+}
+
+static HRESULT WINAPI media_stream_RequestSample(IMFMediaStream *iface, IUnknown *token)
+{
+    struct media_stream *This = impl_from_IMFMediaStream(iface);
+
+    /* Not implemented yet: reported as a FIXME stub so the missing functionality shows up in logs. */
+    FIXME("stub (%p)->(%p)\n", This, token);
+
+    if (This->state == STREAM_SHUTDOWN)
+        return MF_E_SHUTDOWN;
+    return E_NOTIMPL;
+}
+
+static const IMFMediaStreamVtbl media_stream_vtbl =
+{   /* Method table installed by media_stream_constructor(); entries follow the interface's slot order. */
+    media_stream_QueryInterface, /* IUnknown */
+    media_stream_AddRef,
+    media_stream_Release,
+    media_stream_GetEvent, /* IMFMediaEventGenerator */
+    media_stream_BeginGetEvent,
+    media_stream_EndGetEvent,
+    media_stream_QueueEvent,
+    media_stream_GetMediaSource, /* IMFMediaStream */
+    media_stream_GetStreamDescriptor,
+    media_stream_RequestSample
+};
+
+static GstFlowReturn stream_new_sample(GstElement *appsink, gpointer user)
+{
+    struct media_stream *This = (struct media_stream *) user;
+    GstSample *discard_sample = NULL;
+    TRACE("(%p) got sample\n", This);
+    /* Samples are not consumed yet: pull and drop them. "pull-sample" may yield NULL (EOS/stopped). */
+    g_signal_emit_by_name(This->appsink, "pull-sample", &discard_sample);
+    if (discard_sample)
+        gst_sample_unref(discard_sample);
+    return GST_FLOW_OK;
+}
+
+static void media_stream_teardown(struct media_stream *This)
+{
+    TRACE("(%p)\n", This);
+    /* Drops every reference the stream holds. The memory itself is freed by the callers via heap_free(). */
+    This->state = STREAM_SHUTDOWN;
+    /* With the state set, the IMFMediaStream methods return MF_E_SHUTDOWN instead of using these members. */
+    if (This->their_src) /* NOTE(review): stored from "pad-added" without a visible gst_object_ref - confirm balance */
+        gst_object_unref(GST_OBJECT(This->their_src));
+    if (This->my_sink)
+        gst_object_unref(GST_OBJECT(This->my_sink));
+    if (This->descriptor)
+        IMFStreamDescriptor_Release(This->descriptor);
+    if (This->event_queue)
+        IMFMediaEventQueue_Release(This->event_queue);
+    if (This->parent_source)
+        IMFMediaSource_Release(&This->parent_source->IMFMediaSource_iface);
+}
+
+static HRESULT media_stream_constructor(struct media_source *source, GstPad *pad, DWORD stream_id, struct media_stream **out_stream)
+{
+    /* Builds the media_stream for demuxer pad `pad`: a new appsink joins source->container and is linked
+     * to the pad. On success *out_stream receives the object (ref 1). stream_id is not used yet. */
+    GstPadLinkReturn link_ret;
+    struct media_stream *This;
+    HRESULT hr;
+
+    TRACE("(%p %p)->(%p)\n", source, pad, out_stream);
+
+    /* Nothing has been acquired yet, so a failed allocation can return directly. */
+    if (!(This = heap_alloc_zero(sizeof(*This))))
+        return E_OUTOFMEMORY;
+    This->state = STREAM_INACTIVE;
+
+    /* AddRef() returns the new reference count, not an HRESULT, so there is no failure to test. */
+    IMFMediaSource_AddRef(&source->IMFMediaSource_iface);
+    This->parent_source = source;
+    if (FAILED(hr = MFCreateEventQueue(&This->event_queue)))
+        goto fail;
+
+    if (!(This->appsink = gst_element_factory_make("appsink", NULL)))
+    {
+        hr = E_OUTOFMEMORY;
+        goto fail;
+    }
+    gst_bin_add(GST_BIN(This->parent_source->container), This->appsink);
+
+    g_object_set(This->appsink, "emit-signals", TRUE, NULL);
+    g_object_set(This->appsink, "sync", FALSE, NULL);
+    g_signal_connect(This->appsink, "new-sample", G_CALLBACK(stream_new_sample_wrapper), This);
+
+    This->their_src = pad;
+    This->my_sink = gst_element_get_static_pad(This->appsink, "sink");
+    /* Lets source_stream_removed() recover the stream from the pad alone. */
+    gst_pad_set_element_private(pad, This);
+
+    /* A failed link is reported but not fatal, as in source_stream_added()'s relink path. */
+    if ((link_ret = gst_pad_link(This->their_src, This->my_sink)) != GST_PAD_LINK_OK)
+        ERR("Error linking demuxer to stream %p, err = %d\n", This, link_ret);
+    gst_element_sync_state_with_parent(This->appsink);
+
+    This->IMFMediaStream_iface.lpVtbl = &media_stream_vtbl;
+    This->ref = 1;
+    TRACE("->(%p)\n", This);
+    *out_stream = This;
+    return S_OK;
+
+    fail:
+    WARN("Failed to construct media stream, hr %#x.\n", hr);
+    media_stream_teardown(This);
+    heap_free(This);
+    return hr;
+}
+
 /* source */
 
 static inline struct media_source *impl_from_IMFMediaSource(IMFMediaSource *iface)
@@ -235,6 +502,19 @@ static HRESULT media_source_teardown(struct media_source *This)
     if (This->byte_stream)
         IMFByteStream_Release(This->byte_stream);
 
+    for (unsigned int i = 0; i < This->stream_count; i++)
+    {
+        media_stream_teardown(This->streams[i]);
+        IMFMediaStream_Release(&This->streams[i]->IMFMediaStream_iface);
+    }
+
+    if (This->stream_count)
+        heap_free(This->streams);
+
+    if (This->init_complete_event)
+        CloseHandle(This->init_complete_event);
+    DeleteCriticalSection(&This->streams_cs);
+
     return S_OK;
 }
 
@@ -433,6 +713,115 @@ GstBusSyncReply watch_source_bus(GstBus *bus, GstMessage *message, gpointer user
     return GST_BUS_DROP;
 }
 
+static void source_stream_added(GstElement *element, GstPad *pad, gpointer user)
+{
+    /* Demuxer "pad-added" handler. A pad whose id matches an existing stream is relinked to that
+     * stream; otherwise a new media_stream is built and appended to source->streams. */
+    struct media_source *source = (struct media_source *) user;
+    struct media_stream **new_stream_array;
+    struct media_stream *stream;
+    const char *stream_id_string;
+    unsigned int stream_id = 0;
+    gchar *g_stream_id;
+
+    EnterCriticalSection(&source->streams_cs);
+    /* The id is the number after the first '/'. Both lookups may yield NULL and the scan may fail. */
+    g_stream_id = gst_pad_get_stream_id(pad);
+    stream_id_string = g_stream_id ? strstr(g_stream_id, "/") : NULL;
+    if (!stream_id_string || sscanf(stream_id_string, "/%03u", &stream_id) != 1)
+    {
+        ERR("Failed to parse the stream id of pad %p\n", pad);
+        g_free(g_stream_id);
+        goto leave;
+    }
+    TRACE("stream-id: %u\n", stream_id);
+    g_free(g_stream_id);
+
+    /* find existing stream */
+    for (unsigned int i = 0; i < source->stream_count; i++)
+    {
+        struct media_stream *existing_stream = source->streams[i];
+        DWORD existing_stream_id;
+        GstPadLinkReturn ret;
+
+        /* A stream without a descriptor has no identifier to compare (and NULL must not be called). */
+        if (!existing_stream->descriptor)
+            continue;
+        if (FAILED(IMFStreamDescriptor_GetStreamIdentifier(existing_stream->descriptor, &existing_stream_id)))
+            goto leave;
+        if (existing_stream_id != stream_id)
+            continue;
+        TRACE("Found existing stream %p\n", existing_stream);
+        if (!existing_stream->my_sink)
+        {
+            ERR("Couldn't find our sink\n");
+            goto leave;
+        }
+        existing_stream->their_src = pad;
+        gst_pad_set_element_private(pad, existing_stream);
+        if ((ret = gst_pad_link(existing_stream->their_src, existing_stream->my_sink)) != GST_PAD_LINK_OK)
+            ERR("Error linking demuxer to stream %p, err = %d\n", existing_stream, ret);
+        gst_element_sync_state_with_parent(existing_stream->appsink);
+        goto leave;
+    }
+
+    /* Reserve the slot first: failing after construction would strand an already linked stream. */
+    if (!(new_stream_array = heap_realloc(source->streams, (source->stream_count + 1) * (sizeof(*new_stream_array)))))
+    {
+        ERR("Failed to add stream to source\n");
+        goto leave;
+    }
+    source->streams = new_stream_array;
+    if (SUCCEEDED(media_stream_constructor(source, pad, stream_id, &stream)))
+        source->streams[source->stream_count++] = stream;
+    else if (!source->stream_count) /* media_source_teardown() only frees a non-empty array */
+    {
+        heap_free(source->streams);
+        source->streams = NULL;
+    }
+
+    leave:
+    LeaveCriticalSection(&source->streams_cs);
+}
+
+static void source_stream_removed(GstElement *element, GstPad *pad, gpointer user)
+{
+    /* Demuxer "pad-removed" handler: detaches the departing pad from the media_stream
+     * that was bound to it through the pad's element-private pointer. */
+    struct media_stream *owner;
+
+    /* Pads of any direction other than source are ignored. */
+    if (gst_pad_get_direction(pad) != GST_PAD_SRC)
+        return;
+
+    /* The private pointer is the stream, or NULL when none is attached. */
+    owner = (struct media_stream *) gst_pad_get_element_private(pad);
+    if (!owner)
+        return;
+
+    TRACE("Stream %p of Source %p removed\n", owner, owner->parent_source);
+    assert (owner->their_src == pad);
+
+    /* Break the link to our appsink pad and forget the pad in both directions. */
+    gst_pad_unlink(owner->their_src, owner->my_sink);
+    owner->their_src = NULL;
+    gst_pad_set_element_private(pad, NULL);
+}
+
+static void source_all_streams(GstElement *element, gpointer user)
+{
+    /* Demuxer "no-more-pads" handler: wakes media_source_constructor(), which blocks on
+     * init_complete_event while the source is being opened. */
+    struct media_source *src = (struct media_source *) user;
+
+    EnterCriticalSection(&src->streams_cs);
+    /* The event is signalled only while the source is still in the opening state. */
+    if (src->state == SOURCE_OPENING)
+        SetEvent(src->init_complete_event);
+
+    LeaveCriticalSection(&src->streams_cs);
+}
+
 static HRESULT media_source_constructor(IMFByteStream *bytestream, enum source_type type, struct media_source **out_media_source)
 {
     GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE(
@@ -510,11 +899,36 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, enum source_t
         goto fail;
     }
 
-    This->state = SOURCE_STOPPED;
+    g_signal_connect(This->demuxer, "pad-added", G_CALLBACK(source_stream_added_wrapper), This);
+    g_signal_connect(This->demuxer, "pad-removed", G_CALLBACK(source_stream_removed_wrapper), This);
+    g_signal_connect(This->demuxer, "no-more-pads", G_CALLBACK(source_all_streams_wrapper), This);
+
+    InitializeCriticalSection(&This->streams_cs);
+    This->init_complete_event = CreateEventA(NULL, TRUE, FALSE, NULL);
 
+    This->state = SOURCE_OPENING;
+
+    /* Setup interface early as the streams interact with us during initialization */
     This->IMFMediaSource_iface.lpVtbl = &IMFMediaSource_vtbl;
     This->ref = 1;
 
+    gst_element_set_state(This->container, GST_STATE_PAUSED);
+    ret = gst_element_get_state(This->container, NULL, NULL, -1);
+    if (ret == GST_STATE_CHANGE_FAILURE)
+    {
+        ERR("Failed to play source.\n");
+        hr = E_OUTOFMEMORY;
+        goto fail;
+    }
+
+    WaitForSingleObject(This->init_complete_event, INFINITE);
+    CloseHandle(This->init_complete_event);
+    This->init_complete_event = NULL;
+
+    gst_element_set_state(This->container, GST_STATE_READY);
+
+    This->state = SOURCE_STOPPED;
+
     *out_media_source = This;
     return S_OK;
 
@@ -1024,6 +1438,30 @@ void perform_cb_media_source(struct cb_data *cbdata)
             cbdata->u.watch_bus_data.ret = watch_source_bus(data->bus, data->msg, data->user);
             break;
         }
+    case SOURCE_STREAM_ADDED:
+        {
+            struct pad_added_data *data = &cbdata->u.pad_added_data;
+            source_stream_added(data->element, data->pad, data->user);
+            break;
+        }
+    case SOURCE_STREAM_REMOVED:
+        {
+            struct pad_removed_data *data = &cbdata->u.pad_removed_data;
+            source_stream_removed(data->element, data->pad, data->user);
+            break;
+        }
+    case SOURCE_ALL_STREAMS:
+        {
+            struct no_more_pads_data *data = &cbdata->u.no_more_pads_data;
+            source_all_streams(data->element, data->user);
+            break;
+        }
+    case STREAM_NEW_SAMPLE:
+        {
+            struct new_sample_data *data = &cbdata->u.new_sample_data;
+            cbdata->u.new_sample_data.ret = stream_new_sample(data->appsink, data->user);
+            break;
+        }
     default:
         {
             ERR("Wrong callback forwarder called\n");
-- 
2.26.0




More information about the wine-devel mailing list