Zebediah Figura : winegstreamer: Use pthread condition variables to queue stream events.

Alexandre Julliard julliard at winehq.org
Mon Jan 25 16:42:05 CST 2021


Module: wine
Branch: master
Commit: 54e012b5b7ca55f4b6866a39f4c98292d544e48c
URL:    https://source.winehq.org/git/wine.git/?a=commit;h=54e012b5b7ca55f4b6866a39f4c98292d544e48c

Author: Zebediah Figura <z.figura12 at gmail.com>
Date:   Sat Jan 23 12:43:44 2021 -0600

winegstreamer: Use pthread condition variables to queue stream events.

Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
Signed-off-by: Alexandre Julliard <julliard at winehq.org>

---

 dlls/winegstreamer/gstdemux.c | 73 +++++++++++++++++++++----------------------
 1 file changed, 35 insertions(+), 38 deletions(-)

diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index 22df838e4df..d16a8d28dd4 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -60,8 +60,6 @@ struct parser
 
     LONGLONG filesize;
 
-    CRITICAL_SECTION cs;
-
     /* FIXME: It would be nice to avoid duplicating these with strmbase.
      * However, synchronization is tricky; we need access to be protected by a
      * separate lock. */
@@ -117,7 +115,7 @@ struct parser_source
     SourceSeeking seek;
 
     CRITICAL_SECTION flushing_cs;
-    CONDITION_VARIABLE event_cv, event_empty_cv;
+    pthread_cond_t event_cond, event_empty_cond;
     bool flushing, eos;
     struct parser_event event;
     HANDLE thread;
@@ -686,18 +684,18 @@ static GstFlowReturn queue_stream_event(struct parser_source *pin, const struct
 {
     struct parser *filter = impl_from_strmbase_filter(pin->pin.pin.filter);
 
-    EnterCriticalSection(&filter->cs);
+    pthread_mutex_lock(&filter->mutex);
     while (!pin->flushing && pin->event.type != PARSER_EVENT_NONE)
-        SleepConditionVariableCS(&pin->event_empty_cv, &filter->cs, INFINITE);
+        pthread_cond_wait(&pin->event_empty_cond, &filter->mutex);
     if (pin->flushing)
     {
-        LeaveCriticalSection(&filter->cs);
+        pthread_mutex_unlock(&filter->mutex);
         TRACE("Filter is flushing; discarding event.\n");
         return GST_FLOW_FLUSHING;
     }
     pin->event = *event;
-    LeaveCriticalSection(&filter->cs);
-    WakeConditionVariable(&pin->event_cv);
+    pthread_mutex_unlock(&filter->mutex);
+    pthread_cond_signal(&pin->event_cond);
     TRACE("Event queued.\n");
     return GST_FLOW_OK;
 }
@@ -755,10 +753,10 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
         case GST_EVENT_FLUSH_START:
             if (pin->pin.pin.peer)
             {
-                EnterCriticalSection(&filter->cs);
+                pthread_mutex_lock(&filter->mutex);
 
                 pin->flushing = true;
-                WakeConditionVariable(&pin->event_empty_cv);
+                pthread_cond_signal(&pin->event_empty_cond);
 
                 switch (pin->event.type)
                 {
@@ -773,7 +771,7 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
                 }
                 pin->event.type = PARSER_EVENT_NONE;
 
-                LeaveCriticalSection(&filter->cs);
+                pthread_mutex_unlock(&filter->mutex);
             }
             break;
 
@@ -781,9 +779,9 @@ static gboolean event_sink(GstPad *pad, GstObject *parent, GstEvent *event)
             gst_segment_init(pin->segment, GST_FORMAT_TIME);
             if (pin->pin.pin.peer)
             {
-                EnterCriticalSection(&filter->cs);
+                pthread_mutex_lock(&filter->mutex);
                 pin->flushing = false;
-                LeaveCriticalSection(&filter->cs);
+                pthread_mutex_unlock(&filter->mutex);
             }
             break;
 
@@ -1037,14 +1035,14 @@ static DWORD CALLBACK stream_thread(void *arg)
         struct parser_event event;
 
         EnterCriticalSection(&pin->flushing_cs);
-        EnterCriticalSection(&filter->cs);
+        pthread_mutex_lock(&filter->mutex);
 
         while (!filter->flushing && pin->event.type == PARSER_EVENT_NONE)
-            SleepConditionVariableCS(&pin->event_cv, &filter->cs, INFINITE);
+            pthread_cond_wait(&pin->event_cond, &filter->mutex);
 
         if (filter->flushing)
         {
-            LeaveCriticalSection(&filter->cs);
+            pthread_mutex_unlock(&filter->mutex);
             LeaveCriticalSection(&pin->flushing_cs);
             TRACE("Filter is flushing.\n");
             continue;
@@ -1052,16 +1050,16 @@ static DWORD CALLBACK stream_thread(void *arg)
 
         if (!pin->event.type)
         {
-            LeaveCriticalSection(&filter->cs);
+            pthread_mutex_unlock(&filter->mutex);
             LeaveCriticalSection(&pin->flushing_cs);
             continue;
         }
 
         event = pin->event;
         pin->event.type = PARSER_EVENT_NONE;
-        WakeConditionVariable(&pin->event_empty_cv);
+        pthread_cond_signal(&pin->event_empty_cond);
 
-        LeaveCriticalSection(&filter->cs);
+        pthread_mutex_unlock(&filter->mutex);
 
         TRACE("Got event of type %#x.\n", event.type);
 
@@ -1598,8 +1596,6 @@ static void parser_destroy(struct strmbase_filter *iface)
     pthread_cond_destroy(&filter->init_cond);
     pthread_mutex_destroy(&filter->mutex);
 
-    filter->cs.DebugInfo->Spare[0] = 0;
-    DeleteCriticalSection(&filter->cs);
     strmbase_sink_cleanup(&filter->sink);
     strmbase_filter_cleanup(&filter->filter);
     heap_free(filter);
@@ -1616,9 +1612,9 @@ static HRESULT parser_init_stream(struct strmbase_filter *iface)
         return S_OK;
 
     filter->streaming = true;
-    EnterCriticalSection(&filter->cs);
+    pthread_mutex_lock(&filter->mutex);
     filter->flushing = false;
-    LeaveCriticalSection(&filter->cs);
+    pthread_mutex_unlock(&filter->mutex);
 
     /* DirectShow retains the old seek positions, but resets to them every time
      * it transitions from stopped -> paused. */
@@ -1656,9 +1652,9 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
         return S_OK;
 
     filter->streaming = false;
-    EnterCriticalSection(&filter->cs);
+    pthread_mutex_lock(&filter->mutex);
     filter->flushing = true;
-    LeaveCriticalSection(&filter->cs);
+    pthread_mutex_unlock(&filter->mutex);
 
     for (i = 0; i < filter->source_count; ++i)
     {
@@ -1667,7 +1663,7 @@ static HRESULT parser_cleanup_stream(struct strmbase_filter *iface)
         if (!pin->pin.pin.peer)
             continue;
 
-        WakeConditionVariable(&pin->event_cv);
+        pthread_cond_signal(&pin->event_cond);
     }
 
     for (i = 0; i < filter->source_count; ++i)
@@ -1893,8 +1889,6 @@ static void parser_init_common(struct parser *object)
 {
     pthread_mutex_init(&object->mutex, NULL);
     pthread_cond_init(&object->init_cond, NULL);
-    InitializeCriticalSection(&object->cs);
-    object->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ": parser.cs");
     object->flushing = true;
 }
 
@@ -2044,15 +2038,15 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
 
     if (!(current_flags & AM_SEEKING_NoFlush))
     {
-        EnterCriticalSection(&filter->cs);
+        pthread_mutex_lock(&filter->mutex);
         filter->flushing = true;
-        LeaveCriticalSection(&filter->cs);
+        pthread_mutex_unlock(&filter->mutex);
 
         for (i = 0; i < filter->source_count; ++i)
         {
             if (filter->sources[i]->pin.pin.peer)
             {
-                WakeConditionVariable(&pin->event_cv);
+                pthread_cond_signal(&pin->event_cond);
                 IPin_BeginFlush(filter->sources[i]->pin.pin.peer);
             }
         }
@@ -2093,9 +2087,9 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface,
 
     if (!(current_flags & AM_SEEKING_NoFlush))
     {
-        EnterCriticalSection(&filter->cs);
+        pthread_mutex_lock(&filter->mutex);
         filter->flushing = false;
-        LeaveCriticalSection(&filter->cs);
+        pthread_mutex_unlock(&filter->mutex);
 
         for (i = 0; i < filter->source_count; ++i)
         {
@@ -2329,6 +2323,9 @@ static void free_source_pin(struct parser_source *pin)
     gst_object_unref(pin->my_sink);
     gst_segment_free(pin->segment);
 
+    pthread_cond_destroy(&pin->event_cond);
+    pthread_cond_destroy(&pin->event_empty_cond);
+
     pin->flushing_cs.DebugInfo->Spare[0] = 0;
     DeleteCriticalSection(&pin->flushing_cs);
 
@@ -2365,8 +2362,8 @@ static struct parser_source *create_pin(struct parser *filter, const WCHAR *name
     pin->IQualityControl_iface.lpVtbl = &GSTOutPin_QualityControl_Vtbl;
     strmbase_seeking_init(&pin->seek, &GST_Seeking_Vtbl, GST_ChangeStop,
             GST_ChangeCurrent, GST_ChangeRate);
-    InitializeConditionVariable(&pin->event_cv);
-    InitializeConditionVariable(&pin->event_empty_cv);
+    pthread_cond_init(&pin->event_cond, NULL);
+    pthread_cond_init(&pin->event_empty_cond, NULL);
     BaseFilterImpl_IncrementPinVersion(&filter->filter);
 
     InitializeCriticalSection(&pin->flushing_cs);
@@ -2394,13 +2391,13 @@ static HRESULT GST_RemoveOutputPins(struct parser *This)
         return S_OK;
 
     /* Unblock all of our streams. */
-    EnterCriticalSection(&This->cs);
+    pthread_mutex_lock(&This->mutex);
     for (i = 0; i < This->source_count; ++i)
     {
         This->sources[i]->flushing = true;
-        WakeConditionVariable(&This->sources[i]->event_empty_cv);
+        pthread_cond_signal(&This->sources[i]->event_empty_cond);
     }
-    LeaveCriticalSection(&This->cs);
+    pthread_mutex_unlock(&This->mutex);
 
     gst_element_set_state(This->container, GST_STATE_NULL);
     gst_pad_unlink(This->my_src, This->their_sink);




More information about the wine-cvs mailing list