[PATCH 3/5] winegstreamer: Use a pthread condition variable to wait for filter initialization.

Zebediah Figura z.figura12 at gmail.com
Thu Jan 21 17:22:53 CST 2021


Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
 dlls/winegstreamer/gstdemux.c | 120 ++++++++++++++++++++++------------
 1 file changed, 79 insertions(+), 41 deletions(-)

diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index d79622094f6..bab9804bdbd 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -71,7 +71,10 @@ struct parser
     GstPad *my_src, *their_sink;
     GstBus *bus;
     guint64 start, nextofs, nextpullofs, stop;
-    HANDLE no_more_pads_event, duration_event, error_event;
+
+    pthread_mutex_t mutex;
+    pthread_cond_t init_cond;
+    bool no_more_pads, has_duration, error;
 
     HANDLE push_thread;
 
@@ -109,14 +112,13 @@ struct parser_source
 
     GstPad *their_src, *post_sink, *post_src, *my_sink;
     GstElement *flip;
-    HANDLE caps_event, eos_event;
     GstSegment *segment;
     GstCaps *caps;
     SourceSeeking seek;
 
     CRITICAL_SECTION flushing_cs;
     CONDITION_VARIABLE event_cv, event_empty_cv;
-    bool flushing;
+    bool flushing, eos;
     struct parser_event event;
     HANDLE thread;
 };
@@ -742,7 +744,12 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
                 queue_stream_event(pin, &stream_event);
             }
             else
-                SetEvent(pin->eos_event);
+            {
+                pthread_mutex_lock(&filter->mutex);
+                pin->eos = true;
+                pthread_mutex_unlock(&filter->mutex);
+                pthread_cond_signal(&filter->init_cond);
+            }
             break;
 
         case GST_EVENT_FLUSH_START:
@@ -785,8 +792,10 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
             GstCaps *caps;
 
             gst_event_parse_caps(event, &caps);
+            pthread_mutex_lock(&filter->mutex);
             gst_caps_replace(&pin->caps, caps);
-            SetEvent(pin->caps_event);
+            pthread_mutex_unlock(&filter->mutex);
+            pthread_cond_signal(&filter->init_cond);
             break;
         }
 
@@ -1390,7 +1399,11 @@ static void no_more_pads(GstElement *decodebin, gpointer user)
 {
     struct parser *filter = user;
     TRACE("filter %p.\n", filter);
-    SetEvent(filter->no_more_pads_event);
+
+    pthread_mutex_lock(&filter->mutex);
+    filter->no_more_pads = true;
+    pthread_mutex_unlock(&filter->mutex);
+    pthread_cond_signal(&filter->init_cond);
 }
 
 static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user)
@@ -1428,8 +1441,12 @@ static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer data)
         ERR("%s\n", dbg_info);
         g_error_free(err);
         g_free(dbg_info);
-        SetEvent(filter->error_event);
+        pthread_mutex_lock(&filter->mutex);
+        filter->error = true;
+        pthread_mutex_unlock(&filter->mutex);
+        pthread_cond_signal(&filter->init_cond);
         break;
+
     case GST_MESSAGE_WARNING:
         gst_message_parse_warning(msg, &err, &dbg_info);
         WARN("%s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
@@ -1437,9 +1454,14 @@ static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer data)
         g_error_free(err);
         g_free(dbg_info);
         break;
+
     case GST_MESSAGE_DURATION_CHANGED:
-        SetEvent(filter->duration_event);
+        pthread_mutex_lock(&filter->mutex);
+        filter->has_duration = true;
+        pthread_mutex_unlock(&filter->mutex);
+        pthread_cond_signal(&filter->init_cond);
         break;
+
     default:
         break;
     }
@@ -1499,17 +1521,25 @@ static HRESULT GST_Connect(struct parser *This, IPin *pConnectPin)
     if (!This->init_gst(This))
         return E_FAIL;
 
+    pthread_mutex_lock(&This->mutex);
+
     for (i = 0; i < This->source_count; ++i)
     {
         struct parser_source *pin = This->sources[i];
-        const HANDLE events[2] = {pin->caps_event, This->error_event};
 
         pin->seek.llDuration = pin->seek.llStop = query_duration(pin->their_src);
         pin->seek.llCurrent = 0;
-        if (WaitForMultipleObjects(2, events, FALSE, INFINITE))
+        while (!pin->caps && !This->error)
+            pthread_cond_wait(&This->init_cond, &This->mutex);
+        if (This->error)
+        {
+            pthread_mutex_unlock(&This->mutex);
             return E_FAIL;
+        }
     }
 
+    pthread_mutex_unlock(&This->mutex);
+
     This->nextofs = This->nextpullofs = 0;
     return S_OK;
 }
@@ -1545,10 +1575,6 @@ static void parser_destroy(struct strmbase_filter *iface)
     struct parser *filter = impl_from_strmbase_filter(iface);
     HRESULT hr;
 
-    CloseHandle(filter->no_more_pads_event);
-    CloseHandle(filter->duration_event);
-    CloseHandle(filter->error_event);
-
     /* Don't need to clean up output pins, disconnecting input pin will do that */
     if (filter->sink.pin.peer)
     {
@@ -1568,6 +1594,9 @@ static void parser_destroy(struct strmbase_filter *iface)
         gst_object_unref(filter->bus);
     }
 
+    pthread_cond_destroy(&filter->init_cond);
+    pthread_mutex_destroy(&filter->mutex);
+
     filter->cs.DebugInfo->Spare[0] = 0;
     DeleteCriticalSection(&filter->cs);
     strmbase_sink_cleanup(&filter->sink);
@@ -1721,7 +1750,6 @@ static const struct strmbase_sink_ops sink_ops =
 static BOOL decodebin_parser_init_gst(struct parser *filter)
 {
     GstElement *element = gst_element_factory_make("decodebin", NULL);
-    HANDLE events[2];
     int ret;
 
     if (!element)
@@ -1739,7 +1767,10 @@ static BOOL decodebin_parser_init_gst(struct parser *filter)
     g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_wrapper), filter);
 
     filter->their_sink = gst_element_get_static_pad(element, "sink");
-    ResetEvent(filter->no_more_pads_event);
+
+    pthread_mutex_lock(&filter->mutex);
+    filter->no_more_pads = filter->error = false;
+    pthread_mutex_unlock(&filter->mutex);
 
     if ((ret = gst_pad_link(filter->my_src, filter->their_sink)) < 0)
     {
@@ -1755,11 +1786,15 @@ static BOOL decodebin_parser_init_gst(struct parser *filter)
         return FALSE;
     }
 
-    events[0] = filter->no_more_pads_event;
-    events[1] = filter->error_event;
-    if (WaitForMultipleObjects(2, events, FALSE, INFINITE))
+    pthread_mutex_lock(&filter->mutex);
+    while (!filter->no_more_pads && !filter->error)
+        pthread_cond_wait(&filter->init_cond, &filter->mutex);
+    if (filter->error)
+    {
+        pthread_mutex_unlock(&filter->mutex);
         return FALSE;
-
+    }
+    pthread_mutex_unlock(&filter->mutex);
     return TRUE;
 }
 
@@ -1855,7 +1890,8 @@ static BOOL parser_init_gstreamer(void)
 
 static void parser_init_common(struct parser *object)
 {
-    object->error_event = CreateEventW(NULL, TRUE, FALSE, NULL);
+    pthread_mutex_init(&object->mutex, NULL);
+    pthread_cond_init(&object->init_cond, NULL);
     InitializeCriticalSection(&object->cs);
     object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": parser.cs");
     object->flushing = true;
@@ -1878,7 +1914,6 @@ HRESULT decodebin_parser_create(IUnknown *outer, IUnknown **out)
     strmbase_filter_init(&object->filter, outer, &CLSID_decodebin_parser, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, wcsInputPinName, &sink_ops, NULL);
 
-    object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL);
     object->init_gst = decodebin_parser_init_gst;
     object->source_query_accept = decodebin_parser_source_query_accept;
     object->source_get_media_type = decodebin_parser_source_get_media_type;
@@ -2291,8 +2326,6 @@ static void free_source_pin(struct parser_source *pin)
         gst_object_unref(pin->their_src);
     }
     gst_object_unref(pin->my_sink);
-    CloseHandle(pin->caps_event);
-    CloseHandle(pin->eos_event);
     gst_segment_free(pin->segment);
 
     pin->flushing_cs.DebugInfo->Spare[0] = 0;
@@ -2326,8 +2359,6 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
         return NULL;
 
     strmbase_source_init(&pin->pin, &filter->filter, name, &source_ops);
-    pin->caps_event = CreateEventW(NULL, FALSE, FALSE, NULL);
-    pin->eos_event = CreateEventW(NULL, FALSE, FALSE, NULL);
     pin->segment = gst_segment_new();
     gst_segment_init(pin->segment, GST_FORMAT_TIME);
     pin->IQualityControl_iface.lpVtbl = &GSTOutPin_QualityControl_Vtbl;
@@ -2606,7 +2637,6 @@ static const struct strmbase_sink_ops avi_splitter_sink_ops =
 static BOOL avi_splitter_init_gst(struct parser *filter)
 {
     GstElement *element = gst_element_factory_make("avidemux", NULL);
-    HANDLE events[2];
     int ret;
 
     if (!element)
@@ -2623,7 +2653,10 @@ static BOOL avi_splitter_init_gst(struct parser *filter)
     g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_wrapper), filter);
 
     filter->their_sink = gst_element_get_static_pad(element, "sink");
-    ResetEvent(filter->no_more_pads_event);
+
+    pthread_mutex_lock(&filter->mutex);
+    filter->no_more_pads = filter->error = false;
+    pthread_mutex_unlock(&filter->mutex);
 
     if ((ret = gst_pad_link(filter->my_src, filter->their_sink)) < 0)
     {
@@ -2639,11 +2672,15 @@ static BOOL avi_splitter_init_gst(struct parser *filter)
         return FALSE;
     }
 
-    events[0] = filter->no_more_pads_event;
-    events[1] = filter->error_event;
-    if (WaitForMultipleObjects(2, events, FALSE, INFINITE))
+    pthread_mutex_lock(&filter->mutex);
+    while (!filter->no_more_pads && !filter->error)
+        pthread_cond_wait(&filter->init_cond, &filter->mutex);
+    if (filter->error)
+    {
+        pthread_mutex_unlock(&filter->mutex);
         return FALSE;
-
+    }
+    pthread_mutex_unlock(&filter->mutex);
     return TRUE;
 }
 
@@ -2686,7 +2723,6 @@ HRESULT avi_splitter_create(IUnknown *outer, IUnknown **out)
 
     strmbase_filter_init(&object->filter, outer, &CLSID_AviSplitter, &filter_ops);
     strmbase_sink_init(&object->sink, &object->filter, sink_name, &avi_splitter_sink_ops, NULL);
-    object->no_more_pads_event = CreateEventW(NULL, FALSE, FALSE, NULL);
     object->init_gst = avi_splitter_init_gst;
     object->source_query_accept = avi_splitter_source_query_accept;
     object->source_get_media_type = avi_splitter_source_get_media_type;
@@ -2721,8 +2757,6 @@ static BOOL mpeg_splitter_init_gst(struct parser *filter)
     static const WCHAR source_name[] = {'A','u','d','i','o',0};
     struct parser_source *pin;
     GstElement *element;
-    HANDLE events[3];
-    DWORD res;
     int ret;
 
     if (!(element = gst_element_factory_make("mpegaudioparse", NULL)))
@@ -2759,11 +2793,16 @@ static BOOL mpeg_splitter_init_gst(struct parser *filter)
         return FALSE;
     }
 
-    events[0] = filter->duration_event;
-    events[1] = filter->error_event;
-    events[2] = pin->eos_event;
-    res = WaitForMultipleObjects(3, events, FALSE, INFINITE);
-    return (res != 1);
+    pthread_mutex_lock(&filter->mutex);
+    while (!filter->has_duration && !filter->error && !pin->eos)
+        pthread_cond_wait(&filter->init_cond, &filter->mutex);
+    if (filter->error)
+    {
+        pthread_mutex_unlock(&filter->mutex);
+        return FALSE;
+    }
+    pthread_mutex_unlock(&filter->mutex);
+    return TRUE;
 }
 
 static HRESULT mpeg_splitter_source_query_accept(struct parser_source *pin, const AM_MEDIA_TYPE *mt)
@@ -2830,7 +2869,6 @@ HRESULT mpeg_splitter_create(IUnknown *outer, IUnknown **out)
     strmbase_sink_init(&object->sink, &object->filter, sink_name, &mpeg_splitter_sink_ops, NULL);
     object->IAMStreamSelect_iface.lpVtbl = &stream_select_vtbl;
 
-    object->duration_event = CreateEventW(NULL, FALSE, FALSE, NULL);
     object->init_gst = mpeg_splitter_init_gst;
     object->source_query_accept = mpeg_splitter_source_query_accept;
     object->source_get_media_type = mpeg_splitter_source_get_media_type;
-- 
2.30.0




More information about the wine-devel mailing list