[RFC PATCH 4/5] winegstreamer: Merge wg_parser_disconnect and wg_parser_destroy.

Derek Lesho dlesho at codeweavers.com
Wed Sep 1 16:05:57 CDT 2021


Signed-off-by: Derek Lesho <dlesho at codeweavers.com>
---
 dlls/winegstreamer/gst_private.h   |   1 -
 dlls/winegstreamer/media_source.c  |  13 ++--
 dlls/winegstreamer/quartz_parser.c |  39 ++++-------
 dlls/winegstreamer/wg_parser.c     | 100 +++++++++++++++--------------
 4 files changed, 69 insertions(+), 84 deletions(-)

diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h
index 9943682facb..36d88f7b723 100644
--- a/dlls/winegstreamer/gst_private.h
+++ b/dlls/winegstreamer/gst_private.h
@@ -164,7 +164,6 @@ struct unix_funcs
     void (CDECL *wg_parser_destroy)(struct wg_parser *parser);
 
     HRESULT (CDECL *wg_parser_connect)(struct wg_parser *parser, uint64_t file_size);
-    void (CDECL *wg_parser_disconnect)(struct wg_parser *parser);
 
     void (CDECL *wg_parser_begin_flush)(struct wg_parser *parser);
     void (CDECL *wg_parser_end_flush)(struct wg_parser *parser);
diff --git a/dlls/winegstreamer/media_source.c b/dlls/winegstreamer/media_source.c
index abd9a220a7f..75b3b399f4b 100644
--- a/dlls/winegstreamer/media_source.c
+++ b/dlls/winegstreamer/media_source.c
@@ -103,7 +103,6 @@ struct media_source
     LONGLONG start_time;
 
     HANDLE read_thread;
-    bool read_thread_shutdown;
 };
 
 static inline struct media_stream *impl_from_IMFMediaStream(IMFMediaStream *iface)
@@ -533,7 +532,7 @@ static DWORD CALLBACK read_thread(void *arg)
 
     TRACE("Starting read thread for media source %p.\n", source);
 
-    while (!source->read_thread_shutdown)
+    for(;;)
     {
         uint64_t offset;
         ULONG ret_size;
@@ -542,7 +541,7 @@ static DWORD CALLBACK read_thread(void *arg)
         void *data;
 
         if (!unix_funcs->wg_parser_get_next_read_offset(source->wg_parser, &offset, &size))
-            continue;
+            break;
 
         data = malloc(size);
         ret_size = 0;
@@ -1218,12 +1217,10 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
 
     source->state = SOURCE_SHUTDOWN;
 
-    if (source->stream_count)
-        unix_funcs->wg_parser_disconnect(source->wg_parser);
+    unix_funcs->wg_parser_destroy(source->wg_parser);
 
     if (source->read_thread)
     {
-        source->read_thread_shutdown = true;
         WaitForSingleObject(source->read_thread, INFINITE);
         CloseHandle(source->read_thread);
     }
@@ -1254,8 +1251,6 @@ static HRESULT WINAPI media_source_Shutdown(IMFMediaSource *iface)
         IMFMediaStream_Release(&stream->IMFMediaStream_iface);
     }
 
-    unix_funcs->wg_parser_destroy(source->wg_parser);
-
     if (source->streams)
         free(source->streams);
 
@@ -1402,7 +1397,7 @@ static HRESULT media_source_constructor(IMFByteStream *bytestream, struct media_
     fail:
     WARN("Failed to construct MFMediaSource, hr %#x.\n", hr);
 
-    if (object->wg_parser)
+    if (parser)
         IMFMediaSource_Shutdown(&object->IMFMediaSource_iface);
     free(descriptors);
     IMFMediaSource_Release(&object->IMFMediaSource_iface);
diff --git a/dlls/winegstreamer/quartz_parser.c b/dlls/winegstreamer/quartz_parser.c
index c85cbe4cf44..3e35e7d1503 100644
--- a/dlls/winegstreamer/quartz_parser.c
+++ b/dlls/winegstreamer/quartz_parser.c
@@ -59,6 +59,7 @@ struct parser
 
     HANDLE read_thread;
 
+    struct wg_parser * (*parser_create)(void);
     BOOL (*init_gst)(struct parser *filter);
     HRESULT (*source_query_accept)(struct parser_source *pin, const AM_MEDIA_TYPE *mt);
     HRESULT (*source_get_media_type)(struct parser_source *pin, unsigned int index, AM_MEDIA_TYPE *mt);
@@ -788,7 +789,7 @@ static DWORD CALLBACK read_thread(void *arg)
 
     TRACE("Starting read thread for filter %p.\n", filter);
 
-    while (filter->sink_connected)
+    for(;;)
     {
         uint64_t offset;
         uint32_t size;
@@ -796,7 +797,7 @@ static DWORD CALLBACK read_thread(void *arg)
         void *data;
 
         if (!unix_funcs->wg_parser_get_next_read_offset(filter->wg_parser, &offset, &size))
-            continue;
+            break;
         data = malloc(size);
         hr = IAsyncReader_SyncRead(filter->reader, offset, size, data);
         if (FAILED(hr))
@@ -853,8 +854,6 @@ static void parser_destroy(struct strmbase_filter *iface)
         IAsyncReader_Release(filter->reader);
     filter->reader = NULL;
 
-    unix_funcs->wg_parser_destroy(filter->wg_parser);
-
     strmbase_sink_cleanup(&filter->sink);
     strmbase_filter_cleanup(&filter->filter);
     free(filter);
@@ -959,6 +958,12 @@ static HRESULT parser_sink_connect(struct strmbase_sink *iface, IPin *peer, cons
 
     IAsyncReader_Length(filter->reader, &file_size, &unused);
 
+    if (!(filter->wg_parser = filter->parser_create()))
+    {
+        hr = E_OUTOFMEMORY;
+        goto err;
+    }
+
     filter->sink_connected = true;
     filter->read_thread = CreateThread(NULL, 0, read_thread, filter, 0, NULL);
 
@@ -1096,11 +1101,7 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out)
     if (!(object = calloc(1, sizeof(*object))))
         return E_OUTOFMEMORY;
 
-    if (!(object->wg_parser = unix_funcs->wg_decodebin_parser_create()))
-    {
-        free(object);
-        return E_OUTOFMEMORY;
-    }
+    object->parser_create = unix_funcs->wg_decodebin_parser_create;
 
     strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, L"input pin", &sink_ops, NULL);
@@ -1531,7 +1532,7 @@ static HRESULT GST_RemoveOutputPins(struct parser *This)
     if (!This->sink_connected)
         return S_OK;
 
-    unix_funcs->wg_parser_disconnect(This->wg_parser);
+    unix_funcs->wg_parser_destroy(This->wg_parser);
 
     /* read_thread() needs to stay alive to service any read requests GStreamer
      * sends, so we can only shut it down after GStreamer stops. */
@@ -1627,11 +1628,7 @@ HRESULT wave_parser_create(IUnknown *outer, IUnknown **out)
     if (!(object = calloc(1, sizeof(*object))))
         return E_OUTOFMEMORY;
 
-    if (!(object->wg_parser = unix_funcs->wg_wave_parser_create()))
-    {
-        free(object);
-        return E_OUTOFMEMORY;
-    }
+    object->parser_create = unix_funcs->wg_wave_parser_create;
 
     strmbase_filter_init(&object->filter, outer, &CLSID_WAVEParser, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, L"input pin", &wave_parser_sink_ops, NULL);
@@ -1713,11 +1710,7 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out)
     if (!(object = calloc(1, sizeof(*object))))
         return E_OUTOFMEMORY;
 
-    if (!(object->wg_parser = unix_funcs->wg_avi_parser_create()))
-    {
-        free(object);
-        return E_OUTOFMEMORY;
-    }
+    object->parser_create = unix_funcs->wg_avi_parser_create;
 
     strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, L"input pin", &avi_splitter_sink_ops, NULL);
@@ -1820,11 +1813,7 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out)
     if (!(object = calloc(1, sizeof(*object))))
         return E_OUTOFMEMORY;
 
-    if (!(object->wg_parser = unix_funcs->wg_mpeg_audio_parser_create()))
-    {
-        free(object);
-        return E_OUTOFMEMORY;
-    }
+    object->parser_create = unix_funcs->wg_mpeg_audio_parser_create;
 
     strmbase_filter_init(&object->filter, outer, &CLSID_MPEG1Splitter, &mpeg_splitter_ops);
     strmbase_sink_init(&object->sink, &object->filter, L"Input", &mpeg_splitter_sink_ops, NULL);
diff --git a/dlls/winegstreamer/wg_parser.c b/dlls/winegstreamer/wg_parser.c
index 7f2627235f1..d29298632a4 100644
--- a/dlls/winegstreamer/wg_parser.c
+++ b/dlls/winegstreamer/wg_parser.c
@@ -58,8 +58,8 @@ struct wg_parser
 
     pthread_mutex_t mutex;
 
-    pthread_cond_t init_cond;
-    bool no_more_pads, has_duration, error;
+    pthread_cond_t state_cond;
+    bool no_more_pads, has_duration, error, shutdown;
 
     pthread_cond_t read_cond;
     struct
@@ -518,12 +518,13 @@ static bool CDECL wg_parser_get_next_read_offset(struct wg_parser *parser,
 {
     pthread_mutex_lock(&parser->mutex);
 
-    while (parser->sink_connected && parser->read_request.offset == -1)
+    while (!parser->shutdown && parser->read_request.offset == -1)
         pthread_cond_wait(&parser->read_cond, &parser->mutex);
 
-    if (!parser->sink_connected)
+    if (parser->shutdown)
     {
         pthread_mutex_unlock(&parser->mutex);
+        pthread_cond_signal(&parser->state_cond);
         return false;
     }
 
@@ -779,7 +780,7 @@ static void no_more_pads_cb(GstElement *element, gpointer user)
     pthread_mutex_lock(&parser->mutex);
     parser->no_more_pads = true;
     pthread_mutex_unlock(&parser->mutex);
-    pthread_cond_signal(&parser->init_cond);
+    pthread_cond_signal(&parser->state_cond);
 }
 
 static GstFlowReturn queue_stream_event(struct wg_parser_stream *stream,
@@ -866,7 +867,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
                 pthread_mutex_lock(&parser->mutex);
                 stream->eos = true;
                 pthread_mutex_unlock(&parser->mutex);
-                pthread_cond_signal(&parser->init_cond);
+                pthread_cond_signal(&parser->state_cond);
             }
             break;
 
@@ -917,7 +918,7 @@ static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
             wg_format_from_caps(&stream->preferred_format, caps);
             stream->has_caps = true;
             pthread_mutex_unlock(&parser->mutex);
-            pthread_cond_signal(&parser->init_cond);
+            pthread_cond_signal(&parser->state_cond);
             break;
         }
 
@@ -1264,7 +1265,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use
         pthread_mutex_lock(&parser->mutex);
         parser->error = true;
         pthread_mutex_unlock(&parser->mutex);
-        pthread_cond_signal(&parser->init_cond);
+        pthread_cond_signal(&parser->state_cond);
         break;
 
     case GST_MESSAGE_WARNING:
@@ -1279,7 +1280,7 @@ static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer use
         pthread_mutex_lock(&parser->mutex);
         parser->has_duration = true;
         pthread_mutex_unlock(&parser->mutex);
-        pthread_cond_signal(&parser->init_cond);
+        pthread_cond_signal(&parser->state_cond);
         break;
 
     default:
@@ -1294,7 +1295,6 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
     unsigned int i;
 
     parser->file_size = file_size;
-    parser->sink_connected = true;
 
     if (!parser->bus)
     {
@@ -1327,7 +1327,7 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
         gint64 duration;
 
         while (!stream->has_caps && !parser->error)
-            pthread_cond_wait(&parser->init_cond, &parser->mutex);
+            pthread_cond_wait(&parser->state_cond, &parser->mutex);
 
         /* GStreamer doesn't actually provide any guarantees about when duration
          * is available, even for seekable streams. It's basically built for
@@ -1386,13 +1386,14 @@ static HRESULT CDECL wg_parser_connect(struct wg_parser *parser, uint64_t file_s
             }
             else
             {
-                pthread_cond_wait(&parser->init_cond, &parser->mutex);
+                pthread_cond_wait(&parser->state_cond, &parser->mutex);
             }
         }
     }
 
     pthread_mutex_unlock(&parser->mutex);
 
+    parser->sink_connected = true;
     return S_OK;
 }
 
@@ -1420,38 +1421,6 @@ static void free_stream(struct wg_parser_stream *stream)
     free(stream);
 }
 
-static void CDECL wg_parser_disconnect(struct wg_parser *parser)
-{
-    unsigned int i;
-
-    /* Unblock all of our streams. */
-    pthread_mutex_lock(&parser->mutex);
-    for (i = 0; i < parser->stream_count; ++i)
-    {
-        parser->streams[i]->flushing = true;
-        pthread_cond_signal(&parser->streams[i]->event_empty_cond);
-    }
-    pthread_mutex_unlock(&parser->mutex);
-
-    gst_element_set_state(parser->container, GST_STATE_NULL);
-
-    pthread_mutex_lock(&parser->mutex);
-    parser->sink_connected = false;
-    pthread_mutex_unlock(&parser->mutex);
-    pthread_cond_signal(&parser->read_cond);
-
-    for (i = 0; i < parser->stream_count; ++i)
-        free_stream(parser->streams[i]);
-
-    parser->stream_count = 0;
-    free(parser->streams);
-    parser->streams = NULL;
-
-    gst_element_set_bus(parser->container, NULL);
-    gst_object_unref(parser->container);
-    parser->container = NULL;
-}
-
 static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
 {
     GstElement *element;
@@ -1488,7 +1457,7 @@ static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
 
     pthread_mutex_lock(&parser->mutex);
     while (!parser->no_more_pads && !parser->error)
-        pthread_cond_wait(&parser->init_cond, &parser->mutex);
+        pthread_cond_wait(&parser->state_cond, &parser->mutex);
     if (parser->error)
     {
         pthread_mutex_unlock(&parser->mutex);
@@ -1533,7 +1502,7 @@ static BOOL avi_parser_init_gst(struct wg_parser *parser)
 
     pthread_mutex_lock(&parser->mutex);
     while (!parser->no_more_pads && !parser->error)
-        pthread_cond_wait(&parser->init_cond, &parser->mutex);
+        pthread_cond_wait(&parser->state_cond, &parser->mutex);
     if (parser->error)
     {
         pthread_mutex_unlock(&parser->mutex);
@@ -1631,7 +1600,7 @@ static struct wg_parser *wg_parser_create(void)
         return NULL;
 
     pthread_mutex_init(&parser->mutex, NULL);
-    pthread_cond_init(&parser->init_cond, NULL);
+    pthread_cond_init(&parser->state_cond, NULL);
     pthread_cond_init(&parser->read_cond, NULL);
     parser->flushing = true;
     parser->read_request.offset = -1;
@@ -1678,6 +1647,40 @@ static struct wg_parser * CDECL wg_wave_parser_create(void)
 
 static void CDECL wg_parser_destroy(struct wg_parser *parser)
 {
+    unsigned int i;
+
+    /* Shut down the read thread first to avoid a post-shutdown push_data. */
+    pthread_mutex_lock(&parser->mutex);
+    parser->shutdown = true;
+    pthread_cond_signal(&parser->read_cond);
+    pthread_cond_wait(&parser->state_cond, &parser->mutex);
+    pthread_mutex_unlock(&parser->mutex);
+
+    if (parser->sink_connected)
+    {
+        /* Unblock all of our streams. */
+        pthread_mutex_lock(&parser->mutex);
+        for (i = 0; i < parser->stream_count; ++i)
+        {
+            parser->streams[i]->flushing = true;
+            pthread_cond_signal(&parser->streams[i]->event_empty_cond);
+        }
+        pthread_mutex_unlock(&parser->mutex);
+
+        gst_element_set_state(parser->container, GST_STATE_NULL);
+
+        for (i = 0; i < parser->stream_count; ++i)
+            free_stream(parser->streams[i]);
+
+        parser->stream_count = 0;
+        free(parser->streams);
+        parser->streams = NULL;
+
+        gst_element_set_bus(parser->container, NULL);
+        gst_object_unref(parser->container);
+        parser->container = NULL;
+    }
+
     if (parser->bus)
     {
         gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL);
@@ -1685,7 +1688,7 @@ static void CDECL wg_parser_destroy(struct wg_parser *parser)
     }
 
     pthread_mutex_destroy(&parser->mutex);
-    pthread_cond_destroy(&parser->init_cond);
+    pthread_cond_destroy(&parser->state_cond);
     pthread_cond_destroy(&parser->read_cond);
 
     free(parser);
@@ -1700,7 +1703,6 @@ static const struct unix_funcs funcs =
     wg_parser_destroy,
 
     wg_parser_connect,
-    wg_parser_disconnect,
 
     wg_parser_begin_flush,
     wg_parser_end_flush,
-- 
2.33.0




More information about the wine-devel mailing list