[PATCH v2 2/6] winegstreamer: Merge wg_parser_disconnect and wg_parser_destroy.

Derek Lesho dlesho at codeweavers.com
Fri Sep 17 14:58:54 CDT 2021


Signed-off-by: Derek Lesho <dlesho at codeweavers.com>
---
v2:
- Removed sink_connected.
- Fixed a race condition in which pad_removed_cb, when called for the decodebin sink pad, would crash because stream_count is no longer reset to 0.
---
 dlls/winegstreamer/gst_private.h   |   1 -
 dlls/winegstreamer/media_source.c  |  17 +--
 dlls/winegstreamer/quartz_parser.c |  39 +++----
 dlls/winegstreamer/wg_parser.c     | 159 +++++++++++------------------
 4 files changed, 80 insertions(+), 136 deletions(-)

diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h
index 49e06b31369..22d9547ed72 100644
--- a/dlls/winegstreamer/gst_private.h
+++ b/dlls/winegstreamer/gst_private.h
@@ -165,7 +165,6 @@ struct unix_funcs
     void (CDECL *wg_parser_destroy)(struct wg_parser *parser);
 
     HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size);
-    void (CDECL *wg_parser_disconnect)(struct wg_parser *parser);
 
     void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser);
     void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c
index 825bad8da27..6c2bf92e2a2 100644
--- a/dlls/winegstreamer/media_source.c
+++ b/dlls/winegstreamer/media_source.c
@@ -103,7 +103,6 @@ struct media_source
     LONGLONG start_time;
 
     HANDLE read_thread;
-    bool read_thread_shutdown;
 };
 
 static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface)
@@ -538,7 +537,7 @@ static DWORD CALLBACK read_thread(void *arg)
 
     TRACE("Starting read thread for media source %p.\n", source);
 
-    while (!source->read_thread_shutdown)
+    for (;;)
     {
         uint64_t offset;
         ULONG ret_size;
@@ -546,7 +545,7 @@ static DWORD CALLBACK read_thread(void *arg)
         HRESULT hr;
 
         if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size))
-            continue;
+            break;
 
         if (offset >= file_size)
             size = 0;
@@ -1234,9 +1233,8 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
 
     source->state = SOURCE_SHUTDOWN;
 
-    unix_funcs->wg_parser_disconnect(source->wg_parser);
+    unix_funcs->wg_parser_destroy(source->wg_parser);
 
-    source->read_thread_shutdown = true;
     WaitForSingleObject(source->read_thread, INFINITE);
     CloseHandle(source->read_thread);
 
@@ -1257,8 +1255,6 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
         IMFMediaStream_Release(&stream->IMFMediaStream_iface);
     }
 
-    unix_funcs->wg_parser_destroy(source->wg_parser);
-
     free(source->streams);
 
     MFUnlockWorkQueue(source->async_commands_queue);
@@ -1426,16 +1422,13 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_
         free(stream);
     }
     free(object->streams);
-    if (stream_count != UINT_MAX)
-        unix_funcs->wg_parser_disconnect(object->wg_parser);
+    if (object->wg_parser)
+        unix_funcs->wg_parser_destroy(object->wg_parser);
     if (object->read_thread)
     {
-        object->read_thread_shutdown = true;
         WaitForSingleObject(object->read_thread, INFINITE);
         CloseHandle(object->read_thread);
     }
-    if (object->wg_parser)
-        unix_funcs->wg_parser_destroy(object->wg_parser);
     if (object->async_commands_queue)
         MFUnlockWorkQueue(object->async_commands_queue);
     if (object->event_queue)
diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c
index a8e7e3d979f..bf69a881d57 100644
--- a/dlls/winegstreamer/quartz_parser.c
+++ b/dlls/winegstreamer/quartz_parser.c
@@ -59,6 +59,7 @@ struct parser
 
     HANDLE read_thread;
 
+    struct wg_parser * (*parser_create)(void);
     BOOL (*init_gst)(struct parser *filter);
     HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt);
     HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt);
@@ -793,14 +794,14 @@ static DWORD CALLBACK read_thread(void *arg)
 
     TRACE("Starting read thread for filter %p.\n", filter);
 
-    while (filter->sink_connected)
+    for(;;)
     {
         uint64_t offset;
         uint32_t size;
         HRESULT hr;
 
         if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size))
-            continue;
+            break;
 
         if (offset >= file_size)
             size = 0;
@@ -869,8 +870,6 @@ static void parser_destroy(struct strmbase_filter *iface)
         IAsyncReader_Release(filter->reader);
     filter->reader = NULL;
 
-    unix_funcs->wg_parser_destroy(filter->wg_parser);
-
     strmbase_sink_cleanup(&filter->sink);
     strmbase_filter_cleanup(&filter->filter);
     free(filter);
@@ -975,6 +974,12 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons
 
     IAsyncReader_Length(filter->reader, &file_size, &unused);
 
+    if (!(filter->wg_parser = filter->parser_create()))
+    {
+        hr = E_OUTOFMEMORY;
+        goto err;
+    }
+
     filter->sink_connected = true;
     filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL);
 
@@ -1115,11 +1120,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out)
     if (!(object = calloc(1, sizeof(*object))))
         return E_OUTOFMEMORY;
 
-    if (!(object->wg_parser = unix_funcs->wg_decodebin_parser_create()))
-    {
-        free(object);
-        return E_OUTOFMEMORY;
-    }
+    object->parser_create = unix_funcs->wg_decodebin_parser_create;
 
     strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL);
@@ -1550,7 +1551,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This)
     if (!This->sink_connected)
         return S_OK;
 
-    unix_funcs->wg_parser_disconnect(This->wg_parser);
+    unix_funcs->wg_parser_destroy(This->wg_parser);
 
     /* read_thread() needs to stay alive to service any read requests GStreamer
      * sends, so we can only shut it down after GStreamer stops. */
@@ -1646,11 +1647,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out)
     if (!(object = calloc(1, sizeof(*object))))
         return E_OUTOFMEMORY;
 
-    if (!(object->wg_parser = unix_funcs->wg_wave_parser_create()))
-    {
-        free(object);
-        return E_OUTOFMEMORY;
-    }
+    object->parser_create = unix_funcs->wg_wave_parser_create;
 
     strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL);
@@ -1732,11 +1729,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out)
     if (!(object = calloc(1, sizeof(*object))))
         return E_OUTOFMEMORY;
 
-    if (!(object->wg_parser = unix_funcs->wg_avi_parser_create()))
-    {
-        free(object);
-        return E_OUTOFMEMORY;
-    }
+    object->parser_create = unix_funcs->wg_avi_parser_create;
 
     strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL);
@@ -1839,11 +1832,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out)
     if (!(object = calloc(1, sizeof(*object))))
         return E_OUTOFMEMORY;
 
-    if (!(object->wg_parser = unix_funcs->wg_mpeg_audio_parser_create()))
-    {
-        free(object);
-        return E_OUTOFMEMORY;
-    }
+    object->parser_create = unix_funcs->wg_mpeg_audio_parser_create;
 
     strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops);
     strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL);
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
index 6b6b033b879..08823d52101 100644
--- a/dlls/winegstreamer/wg_parser.c
+++ b/dlls/winegstreamer/wg_parser.c
@@ -55,8 +55,8 @@ struct wg_parser
 
     pthread_mutex_t mutex;
 
-    pthread_cond_t init_cond;
-    bool no_more_pads, has_duration, error;
+    pthread_cond_t state_cond;
+    bool no_more_pads, has_duration, error, close_reader;
 
     pthread_cond_t read_cond;
     struct
@@ -66,7 +66,7 @@ struct wg_parser
         bool pending;
     } read_request;
 
-    bool flushing, sink_connected;
+    bool flushing;
 };
 
 struct wg_parser_stream
@@ -515,11 +515,13 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
 {
     pthread_mutex_lock(&parser->mutex);
 
-    while (parser->sink_connected && !parser->read_request.pending)
+    while (!parser->close_reader && !parser->read_request.pending)
         pthread_cond_wait(&parser->read_cond, &parser->mutex);
 
-    if (!parser->sink_connected)
+    if (parser->close_reader)
     {
+        parser->close_reader = false;
+        pthread_cond_signal(&parser->state_cond);
         pthread_mutex_unlock(&parser->mutex);
         return false;
     }
@@ -543,13 +545,10 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser,
     {
         pthread_mutex_lock(&parser->mutex);
 
-        if (parser->sink_connected)
-        {
-            error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset);
-            message = gst_message_new_error(NULL, error, "");
-            gst_bus_post(parser->bus, message);
-            parser->read_request.pending = false;
-        }
+        error = g_error_new(G_FILE_ERROR, G_FILE_ERROR_FAILED, "WG-Parser client failed to read data at offset %" G_GUINT64_FORMAT, parser->read_request.offset);
+        message = gst_message_new_error(NULL, error, "");
+        gst_bus_post(parser->bus, message);
+        parser->read_request.pending = false;
 
         pthread_mutex_unlock(&parser->mutex);
         return;
@@ -559,8 +558,7 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser,
     {
         pthread_mutex_lock(&parser->mutex);
 
-        if (parser->sink_connected)
-            g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
+        g_signal_emit_by_name(G_OBJECT(parser->appsrc), "end-of-stream", &ret);
         parser->read_request.pending = false;
 
         pthread_mutex_unlock(&parser->mutex);
@@ -577,14 +575,6 @@ static void CDECL wg_parser_push_data(struct wg_parser *parser,
     gst_buffer_fill(buffer, 0, data, size);
 
     pthread_mutex_lock(&parser->mutex);
-
-    if (!parser->sink_connected)
-    {
-        pthread_mutex_unlock(&parser->mutex);
-        gst_buffer_unref(buffer);
-        return;
-    }
-
     assert(parser->read_request.pending);
 
     GST_BUFFER_OFFSET(buffer) = parser->read_request.offset;
@@ -798,7 +788,7 @@ static void no_more_pads_cb(GstElement *element, gpointer user)
     pthread_mutex_lock(&parser->mutex);
     parser->no_more_pads = true;
     pthread_mutex_unlock(&parser->mutex);
-    pthread_cond_signal(&parser->init_cond);
+    pthread_cond_signal(&parser->state_cond);
 }
 
 static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream,
@@ -885,7 +875,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
                 pthread_mutex_lock(&parser->mutex);
                 stream->eos = true;
                 pthread_mutex_unlock(&parser->mutex);
-                pthread_cond_signal(&parser->init_cond);
+                pthread_cond_signal(&parser->state_cond);
             }
             break;
 
@@ -936,7 +926,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
             wg_format_from_caps(&stream->preferred_format, caps);
             stream->has_caps = true;
             pthread_mutex_unlock(&parser->mutex);
-            pthread_cond_signal(&parser->init_cond);
+            pthread_cond_signal(&parser->state_cond);
             break;
         }
 
@@ -1228,6 +1218,9 @@ static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user)
     unsigned int i;
     char *name;
 
+    if (!GST_PAD_IS_SRC(pad))
+        return;
+
     GST_LOG("parser %p, element %p, pad %p.", parser, element, pad);
 
     for (i = 0; i < parser->stream_count; ++i)
@@ -1322,7 +1315,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use
         pthread_mutex_lock(&parser->mutex);
         parser->error = true;
         pthread_mutex_unlock(&parser->mutex);
-        pthread_cond_signal(&parser->init_cond);
+        pthread_cond_signal(&parser->state_cond);
         break;
 
     case GST_MESSAGE_WARNING:
@@ -1337,7 +1330,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use
         pthread_mutex_lock(&parser->mutex);
         parser->has_duration = true;
         pthread_mutex_unlock(&parser->mutex);
-        pthread_cond_signal(&parser->init_cond);
+        pthread_cond_signal(&parser->state_cond);
         break;
 
     default:
@@ -1352,8 +1345,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
     unsigned int i;
     int ret;
 
-    parser->sink_connected = true;
-
     if (!parser->bus)
     {
         parser->bus = gst_bus_new();
@@ -1376,24 +1367,24 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
     parser->error = false;
 
     if (!parser->init_gst(parser))
-        goto out;
+        return E_FAIL;
 
     gst_element_set_state(parser->container, GST_STATE_PAUSED);
     ret = gst_element_get_state(parser->container, NULL, NULL, -1);
     if (ret == GST_STATE_CHANGE_FAILURE)
     {
         GST_ERROR("Failed to play stream.\n");
-        goto out;
+        return E_FAIL;
     }
 
     pthread_mutex_lock(&parser->mutex);
 
     while (!parser->no_more_pads && !parser->error)
-        pthread_cond_wait(&parser->init_cond, &parser->mutex);
+        pthread_cond_wait(&parser->state_cond, &parser->mutex);
     if (parser->error)
     {
         pthread_mutex_unlock(&parser->mutex);
-        goto out;
+        return E_FAIL;
     }
 
     for (i = 0; i < parser->stream_count; ++i)
@@ -1402,7 +1393,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
         gint64 duration;
 
         while (!stream->has_caps && !parser->error)
-            pthread_cond_wait(&parser->init_cond, &parser->mutex);
+            pthread_cond_wait(&parser->state_cond, &parser->mutex);
 
         /* GStreamer doesn't actually provide any guarantees about when duration
          * is available, even for seekable streams. It's basically built for
@@ -1433,7 +1424,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
             if (parser->error)
             {
                 pthread_mutex_unlock(&parser->mutex);
-                goto out;
+                return E_FAIL;
             }
             if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration))
             {
@@ -1461,7 +1452,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
             }
             else
             {
-                pthread_cond_wait(&parser->init_cond, &parser->mutex);
+                pthread_cond_wait(&parser->state_cond, &parser->mutex);
             }
         }
     }
@@ -1469,62 +1460,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
     pthread_mutex_unlock(&parser->mutex);
 
     return S_OK;
-
-out:
-    if (parser->container)
-        gst_element_set_state(parser->container, GST_STATE_NULL);
-
-    for (i = 0; i < parser->stream_count; ++i)
-        free_stream(parser->streams[i]);
-    parser->stream_count = 0;
-    free(parser->streams);
-    parser->streams = NULL;
-
-    if (parser->container)
-    {
-        gst_element_set_bus(parser->container, NULL);
-        gst_object_unref(parser->container);
-        parser->container = NULL;
-    }
-
-    pthread_mutex_lock(&parser->mutex);
-    parser->sink_connected = false;
-    pthread_mutex_unlock(&parser->mutex);
-    pthread_cond_signal(&parser->read_cond);
-
-    return E_FAIL;
-}
-
-static void CDECL wg_parser_disconnect(struct wg_parser *parser)
-{
-    unsigned int i;
-
-    /* Unblock all of our streams. */
-    pthread_mutex_lock(&parser->mutex);
-    for (i = 0; i < parser->stream_count; ++i)
-    {
-        parser->streams[i]->flushing = true;
-        pthread_cond_signal(&parser->streams[i]->event_empty_cond);
-    }
-    pthread_mutex_unlock(&parser->mutex);
-
-    gst_element_set_state(parser->container, GST_STATE_NULL);
-
-    pthread_mutex_lock(&parser->mutex);
-    parser->sink_connected = false;
-    pthread_mutex_unlock(&parser->mutex);
-    pthread_cond_signal(&parser->read_cond);
-
-    for (i = 0; i < parser->stream_count; ++i)
-        free_stream(parser->streams[i]);
-
-    parser->stream_count = 0;
-    free(parser->streams);
-    parser->streams = NULL;
-
-    gst_element_set_bus(parser->container, NULL);
-    gst_object_unref(parser->container);
-    parser->container = NULL;
 }
 
 static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
@@ -1656,7 +1591,7 @@ static struct wg_parser *wg_parser_create(void)
         return NULL;
 
     pthread_mutex_init(&parser->mutex, NULL);
-    pthread_cond_init(&parser->init_cond, NULL);
+    pthread_cond_init(&parser->state_cond, NULL);
     pthread_cond_init(&parser->read_cond, NULL);
     parser->flushing = true;
 
@@ -1702,14 +1637,43 @@ static struct wg_parser * CDECL wg_wave_parser_create(void)
 
 static void CDECL wg_parser_destroy(struct wg_parser *parser)
 {
-    if (parser->bus)
+    unsigned int i;
+
+    pthread_mutex_lock(&parser->mutex);
+    parser->close_reader = true;
+    pthread_cond_signal(&parser->read_cond);
+    while (parser->close_reader)
+        pthread_cond_wait(&parser->state_cond, &parser->mutex);
+    pthread_mutex_unlock(&parser->mutex);
+
+    /* Unblock all of our streams. */
+    pthread_mutex_lock(&parser->mutex);
+    for (i = 0; i < parser->stream_count; ++i)
+    {
+        parser->streams[i]->flushing = true;
+        pthread_cond_signal(&parser->streams[i]->event_empty_cond);
+    }
+    pthread_mutex_unlock(&parser->mutex);
+
+    if (parser->container)
+        gst_element_set_state(parser->container, GST_STATE_NULL);
+
+    for (i = 0; i < parser->stream_count; ++i)
+        free_stream(parser->streams[i]);
+
+    free(parser->streams);
+
+    if (parser->container)
     {
-        gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL);
-        gst_object_unref(parser->bus);
+        gst_element_set_bus(parser->container, NULL);
+        gst_object_unref(parser->container);
     }
 
+    gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL);
+    gst_object_unref(parser->bus);
+
     pthread_mutex_destroy(&parser->mutex);
-    pthread_cond_destroy(&parser->init_cond);
+    pthread_cond_destroy(&parser->state_cond);
     pthread_cond_destroy(&parser->read_cond);
 
     free(parser);
@@ -1724,7 +1688,6 @@ static const struct unix_funcs funcs =
     wg_parser_destroy,
 
     wg_parser_connect,
-    wg_parser_disconnect,
 
     wg_parser_begin_flush,
     wg_parser_end_flush,
-- 
2.33.0




More information about the wine-devel mailing list