[PATCH 2/2] winegstreamer: Always run gstreamer callbacks on a Wine thread

Andrew Eikum aeikum at codeweavers.com
Mon Jan 11 14:13:13 CST 2016


This gets winegstreamer with gstreamer-0.10 working for me again,
fixing Bug 30557. However, gstreamer-0.10 is no longer developed and
some distros don't ship it anymore.

I have more patches on top of this one to move winegstreamer to the
gstreamer-1.0 API. See Bug 31836. In order to avoid dependency churn
for packagers, I think it makes sense to apply them both during the
same release, so I intend to send those patches after this one is
accepted. If you prefer to review them all, I can send them all at
once.

Signed-off-by: Andrew Eikum <aeikum at codeweavers.com>
---
 dlls/winegstreamer/Makefile.in   |   2 +-
 dlls/winegstreamer/glibthread.c  | 390 ---------------------------------------
 dlls/winegstreamer/gst_cbs.c     | 289 +++++++++++++++++++++++++++++
 dlls/winegstreamer/gst_cbs.h     | 204 ++++++++++++++++++++
 dlls/winegstreamer/gst_private.h |   8 +-
 dlls/winegstreamer/gstdemux.c    | 264 +++++++++++++++++++++++---
 dlls/winegstreamer/gsttffilter.c |  45 +++--
 dlls/winegstreamer/main.c        |   3 +-
 8 files changed, 770 insertions(+), 435 deletions(-)
 delete mode 100644 dlls/winegstreamer/glibthread.c
 create mode 100644 dlls/winegstreamer/gst_cbs.c
 create mode 100644 dlls/winegstreamer/gst_cbs.h

diff --git a/dlls/winegstreamer/Makefile.in b/dlls/winegstreamer/Makefile.in
index 0c14ffd..fecabe8 100644
--- a/dlls/winegstreamer/Makefile.in
+++ b/dlls/winegstreamer/Makefile.in
@@ -4,7 +4,7 @@ EXTRAINCL = $(GSTREAMER_CFLAGS)
 EXTRALIBS = $(GSTREAMER_LIBS) $(PTHREAD_LIBS)
 
 C_SRCS = \
-	glibthread.c \
+	gst_cbs.c \
 	gstdemux.c \
 	gsttffilter.c \
 	main.c
diff --git a/dlls/winegstreamer/glibthread.c b/dlls/winegstreamer/glibthread.c
deleted file mode 100644
index 0d829a0..0000000
--- a/dlls/winegstreamer/glibthread.c
+++ /dev/null
@@ -1,390 +0,0 @@
-/* GLIB - Library of useful routines for C programming
- * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
- *
- * Wine GStreamer integration
- * Copyright 2010 Aric Stewart, CodeWeavers
- *
- * gthread.c: solaris thread system implementation
- * Copyright 1998-2001 Sebastian Wilhelmi; University of Karlsruhe
- * Copyright 2001 Hans Breuer
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
- */
-
-/*
- * Modified by the GLib Team and others 1997-2000.  See the AUTHORS
- * file for a list of people on the GLib Team.  See the ChangeLog
- * files for a list of changes.  These files are distributed with
- * GLib at ftp://ftp.gtk.org/pub/gtk/.
- */
-
-#include "config.h"
-
-#ifdef HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
-
-#include <glib.h>
-
-#include <errno.h>
-#include <stdarg.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "windef.h"
-#include "winbase.h"
-#include "winnls.h"
-#include "wine/debug.h"
-
-WINE_DEFAULT_DEBUG_CHANNEL(gstreamer);
-
-static gchar *
-g_win32_error_message (gint error)
-{
-  gchar *retval;
-  WCHAR *msg = NULL;
-  int nchars;
-
-  FormatMessageW (FORMAT_MESSAGE_ALLOCATE_BUFFER
-          |FORMAT_MESSAGE_IGNORE_INSERTS
-          |FORMAT_MESSAGE_FROM_SYSTEM,
-          NULL, error, 0,
-          (LPWSTR) &msg, 0, NULL);
-  if (msg != NULL)
-    {
-      nchars = WideCharToMultiByte(CP_UTF8, 0, msg, -1, NULL, 0, NULL, NULL);
-
-      if (nchars > 2 && msg[nchars-1] == '\n' && msg[nchars-2] == '\r')
-          msg[nchars-2] = '\0';
-
-      retval = g_utf16_to_utf8 (msg, -1, NULL, NULL, NULL);
-
-      LocalFree (msg);
-    }
-  else
-    retval = g_strdup ("");
-
-  return retval;
-}
-
-static gint g_thread_priority_map [G_THREAD_PRIORITY_URGENT + 1] = {
-    THREAD_PRIORITY_BELOW_NORMAL,
-    THREAD_PRIORITY_NORMAL,
-    THREAD_PRIORITY_ABOVE_NORMAL,
-    THREAD_PRIORITY_HIGHEST
-};
-
-static DWORD g_thread_self_tls;
-
-/* A "forward" declaration of this structure */
-static GThreadFunctions g_thread_functions_for_glib_use_default;
-
-typedef struct _GThreadData GThreadData;
-struct _GThreadData
-{
-  GThreadFunc func;
-  gpointer data;
-  HANDLE thread;
-  gboolean joinable;
-};
-
-static GMutex *
-g_mutex_new_posix_impl (void)
-{
-  GMutex *result = (GMutex *) g_new (pthread_mutex_t, 1);
-  pthread_mutex_init ((pthread_mutex_t *) result, NULL);
-  return result;
-}
-
-static void
-g_mutex_free_posix_impl (GMutex * mutex)
-{
-  pthread_mutex_destroy ((pthread_mutex_t *) mutex);
-  g_free (mutex);
-}
-
-/* NOTE: the functions g_mutex_lock and g_mutex_unlock may not use
-   functions from gmem.c and gmessages.c; */
-
-/* pthread_mutex_lock, pthread_mutex_unlock can be taken directly, as
-   signature and semantic are right, but without error check then!!!!,
-   we might want to change this therefore. */
-
-static gboolean
-g_mutex_trylock_posix_impl (GMutex * mutex)
-{
-  int result;
-
-  result = pthread_mutex_trylock ((pthread_mutex_t *) mutex);
-
-  if (result == EBUSY)
-    return FALSE;
-
-  if (result) ERR("pthread_mutex_trylock %x\n",result);
-  return TRUE;
-}
-
-static GCond *
-g_cond_new_posix_impl (void)
-{
-  GCond *result = (GCond *) g_new (pthread_cond_t, 1);
-  pthread_cond_init ((pthread_cond_t *) result, NULL);
-  return result;
-}
-
-/* pthread_cond_signal, pthread_cond_broadcast and pthread_cond_wait
-   can be taken directly, as signature and semantic are right, but
-   without error check then!!!!, we might want to change this
-   therefore. */
-
-#define G_NSEC_PER_SEC 1000000000
-
-static gboolean
-g_cond_timed_wait_posix_impl (GCond * cond,
-                              GMutex * entered_mutex,
-                              GTimeVal * abs_time)
-{
-  int result;
-  struct timespec end_time;
-  gboolean timed_out;
-
-  g_return_val_if_fail (cond != NULL, FALSE);
-  g_return_val_if_fail (entered_mutex != NULL, FALSE);
-
-  if (!abs_time)
-    {
-      result = pthread_cond_wait ((pthread_cond_t *)cond,
-                                  (pthread_mutex_t *) entered_mutex);
-      timed_out = FALSE;
-    }
-  else
-    {
-      end_time.tv_sec = abs_time->tv_sec;
-      end_time.tv_nsec = abs_time->tv_usec * (G_NSEC_PER_SEC / G_USEC_PER_SEC);
-
-      g_return_val_if_fail (end_time.tv_nsec < G_NSEC_PER_SEC, TRUE);
-
-      result = pthread_cond_timedwait ((pthread_cond_t *) cond,
-                                       (pthread_mutex_t *) entered_mutex,
-                                       &end_time);
-      timed_out = (result == ETIMEDOUT);
-    }
-
-  if (!timed_out)
-    if (result) ERR("pthread_cond_timedwait %x\n",result);
-
-  return !timed_out;
-}
-
-static void
-g_cond_free_posix_impl (GCond * cond)
-{
-  pthread_cond_destroy ((pthread_cond_t *) cond);
-  g_free (cond);
-}
-
-static GPrivate *
-g_private_new_posix_impl (GDestroyNotify destructor)
-{
-  GPrivate *result = (GPrivate *) g_new (pthread_key_t, 1);
-  pthread_key_create ((pthread_key_t *) result, destructor);
-  return result;
-}
-
-/* NOTE: the functions g_private_get and g_private_set may not use
-   functions from gmem.c and gmessages.c */
-
-static void
-g_private_set_posix_impl (GPrivate * private_key, gpointer value)
-{
-  if (!private_key)
-    return;
-  pthread_setspecific (*(pthread_key_t *) private_key, value);
-}
-
-static gpointer
-g_private_get_posix_impl (GPrivate * private_key)
-{
-  if (!private_key)
-    return NULL;
-  return pthread_getspecific (*(pthread_key_t *) private_key);
-}
-
-static void
-g_thread_set_priority_win32_impl (gpointer thread, GThreadPriority priority)
-{
-  GThreadData *target = *(GThreadData **)thread;
-
-  g_return_if_fail ((int)priority >= G_THREAD_PRIORITY_LOW);
-  g_return_if_fail ((int)priority <= G_THREAD_PRIORITY_URGENT);
-
-  SetThreadPriority (target->thread, g_thread_priority_map [priority]);
-}
-
-static void
-g_thread_self_win32_impl (gpointer thread)
-{
-  GThreadData *self = TlsGetValue (g_thread_self_tls);
-
-  if (!self)
-    {
-      /* This should only happen for the main thread! */
-      HANDLE handle = GetCurrentThread ();
-      HANDLE process = GetCurrentProcess ();
-      self = g_new (GThreadData, 1);
-      DuplicateHandle (process, handle, process, &self->thread, 0, FALSE,
-                        DUPLICATE_SAME_ACCESS);
-      TlsSetValue (g_thread_self_tls, self);
-      self->func = NULL;
-      self->data = NULL;
-      self->joinable = FALSE;
-    }
-
-  *(GThreadData **)thread = self;
-}
-
-static void
-g_thread_exit_win32_impl (void)
-{
-  GThreadData *self = TlsGetValue (g_thread_self_tls);
-
-  if (self)
-    {
-      if (!self->joinable)
-      {
-        CloseHandle (self->thread);
-        g_free (self);
-      }
-      TlsSetValue (g_thread_self_tls, NULL);
-    }
-
-  ExitThread (0);
-}
-
-static guint __stdcall
-g_thread_proxy (gpointer data)
-{
-  GThreadData *self = (GThreadData*) data;
-
-  TlsSetValue (g_thread_self_tls, self);
-
-  self->func (self->data);
-
-  g_thread_exit_win32_impl ();
-
-  g_assert_not_reached ();
-
-  return 0;
-}
-
-static void
-g_thread_create_win32_impl (GThreadFunc func,
-                            gpointer data,
-                            gulong stack_size,
-                            gboolean joinable,
-                            gboolean bound,
-                            GThreadPriority priority,
-                            gpointer thread,
-                            GError **error)
-{
-  guint ignore;
-  GThreadData *retval;
-
-  g_return_if_fail (func);
-  g_return_if_fail ((int)priority >= G_THREAD_PRIORITY_LOW);
-  g_return_if_fail ((int)priority <= G_THREAD_PRIORITY_URGENT);
-
-  retval = g_new(GThreadData, 1);
-  retval->func = func;
-  retval->data = data;
-
-  retval->joinable = joinable;
-
-  retval->thread = (HANDLE) CreateThread (NULL, stack_size, g_thread_proxy,
-                                          retval, 0, &ignore);
-
-  if (retval->thread == NULL)
-    {
-      gchar *win_error = g_win32_error_message (GetLastError ());
-      g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
-                   "Error creating thread: %s", win_error);
-      g_free (retval);
-      g_free (win_error);
-      return;
-    }
-
-  *(GThreadData **)thread = retval;
-
-  g_thread_set_priority_win32_impl (thread, priority);
-}
-
-static void
-g_thread_yield_win32_impl (void)
-{
-  Sleep(0);
-}
-
-static void
-g_thread_join_win32_impl (gpointer thread)
-{
-  GThreadData *target = *(GThreadData **)thread;
-
-  g_return_if_fail (target->joinable);
-
-  WaitForSingleObject (target->thread, INFINITE);
-
-  CloseHandle (target->thread);
-  g_free (target);
-}
-
-static GThreadFunctions g_thread_functions_for_glib_use_default =
-{
-  /* Posix functions here for speed */
-  g_mutex_new_posix_impl,
-  (void (*)(GMutex *)) pthread_mutex_lock,
-  g_mutex_trylock_posix_impl,
-  (void (*)(GMutex *)) pthread_mutex_unlock,
-  g_mutex_free_posix_impl,
-  g_cond_new_posix_impl,
-  (void (*)(GCond *)) pthread_cond_signal,
-  (void (*)(GCond *)) pthread_cond_broadcast,
-  (void (*)(GCond *, GMutex *)) pthread_cond_wait,
-  g_cond_timed_wait_posix_impl,
-  g_cond_free_posix_impl,
-  g_private_new_posix_impl,
-  g_private_get_posix_impl,
-  g_private_set_posix_impl,
-  /* win32 function required here */
-  g_thread_create_win32_impl,       /* thread */
-  g_thread_yield_win32_impl,
-  g_thread_join_win32_impl,
-  g_thread_exit_win32_impl,
-  g_thread_set_priority_win32_impl,
-  g_thread_self_win32_impl,
-  NULL                             /* no equal function necessary */
-};
-
-void g_thread_impl_init (void)
-{
-  static gboolean beenhere = FALSE;
-
-  if (beenhere)
-    return;
-
-  beenhere = TRUE;
-
-  g_thread_self_tls = TlsAlloc ();
-  g_thread_init(&g_thread_functions_for_glib_use_default);
-}
diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c
new file mode 100644
index 0000000..c445104
--- /dev/null
+++ b/dlls/winegstreamer/gst_cbs.c
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2015 Andrew Eikum for CodeWeavers
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+#include "config.h"
+
+#include <gst/app/gstappsink.h>
+#include <gst/app/gstappsrc.h>
+#include <gst/app/gstappbuffer.h>
+#include <gst/gstutils.h>
+
+#include "wine/list.h"
+
+#include "gst_cbs.h"
+
+/* gstreamer calls our callbacks from threads that Wine did not create. Some
+ * callbacks execute code which requires Wine to have created the thread
+ * (critical sections, debug logging, dshow client code). Since gstreamer can't
+ * provide an API to override its thread creation, we have to intercept all
+ * callbacks in code which avoids the Wine thread requirement, and then
+ * dispatch those callbacks on a thread that is known to be created by Wine.
+ *
+ * This file must not contain any code that depends on the Wine TEB!
+ */
+
+static void call_cb(struct cb_data *cbdata)
+{
+    pthread_mutex_init(&cbdata->lock, NULL);
+    pthread_cond_init(&cbdata->cond, NULL);
+    cbdata->finished = 0;
+
+    if(is_wine_thread()){
+        /* The thread which triggered gstreamer to call this callback may
+         * already hold a critical section. If so, executing the callback on a
+         * worker thread can cause a deadlock. If  we are already on a Wine
+         * thread, then there is no need to run this callback on a worker
+         * thread anyway, which avoids the deadlock issue. */
+        perform_cb(NULL, cbdata);
+
+        pthread_cond_destroy(&cbdata->cond);
+        pthread_mutex_destroy(&cbdata->lock);
+
+        return;
+    }
+
+    pthread_mutex_lock(&cb_list_lock);
+
+    list_add_tail(&cb_list, &cbdata->entry);
+    pthread_cond_broadcast(&cb_list_cond);
+
+    pthread_mutex_lock(&cbdata->lock);
+
+    pthread_mutex_unlock(&cb_list_lock);
+
+    while(!cbdata->finished)
+        pthread_cond_wait(&cbdata->cond, &cbdata->lock);
+
+    pthread_mutex_unlock(&cbdata->lock);
+
+    pthread_cond_destroy(&cbdata->cond);
+    pthread_mutex_destroy(&cbdata->lock);
+}
+
+GstBusSyncReply watch_bus_wrapper(GstBus *bus, GstMessage *msg, gpointer user)
+{
+    struct cb_data cbdata = { WATCH_BUS,
+        { .watch_bus_data = { bus, msg, user} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.watch_bus_data.ret;
+}
+
+void existing_new_pad_wrapper(GstElement *bin, GstPad *pad, gboolean last,
+        gpointer user)
+{
+    struct cb_data cbdata = { EXISTING_NEW_PAD,
+        { .existing_new_pad_data = {bin, pad, last, user} }
+    };
+
+    call_cb(&cbdata);
+}
+
+gboolean check_get_range_wrapper(GstPad *pad)
+{
+    struct cb_data cbdata = { CHECK_GET_RANGE,
+        { .check_get_range_data = {pad} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.check_get_range_data.ret;
+}
+
+gboolean query_function_wrapper(GstPad *pad, GstQuery *query)
+{
+    struct cb_data cbdata = { QUERY_FUNCTION,
+        { .query_function_data = {pad, query} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.query_function_data.ret;
+}
+
+gboolean activate_push_wrapper(GstPad *pad, gboolean activate)
+{
+    struct cb_data cbdata = { ACTIVATE_PUSH,
+        { .activate_push_data = {pad, activate} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.activate_push_data.ret;
+}
+
+void no_more_pads_wrapper(GstElement *decodebin, gpointer user)
+{
+    struct cb_data cbdata = { NO_MORE_PADS,
+        { .no_more_pads_data = {decodebin, user} }
+    };
+
+    call_cb(&cbdata);
+}
+
+GstFlowReturn request_buffer_src_wrapper(GstPad *pad, guint64 ofs, guint len,
+        GstBuffer **buf)
+{
+    struct cb_data cbdata = { REQUEST_BUFFER_SRC,
+        { .request_buffer_src_data = {pad, ofs, len, buf} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.request_buffer_src_data.ret;
+}
+
+gboolean event_src_wrapper(GstPad *pad, GstEvent *event)
+{
+    struct cb_data cbdata = { EVENT_SRC,
+        { .event_src_data = {pad, event} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.event_src_data.ret;
+}
+
+gboolean event_sink_wrapper(GstPad *pad, GstEvent *event)
+{
+    struct cb_data cbdata = { EVENT_SINK,
+        { .event_sink_data = {pad, event} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.event_sink_data.ret;
+}
+
+GstFlowReturn request_buffer_sink_wrapper(GstPad *pad, guint64 ofs, guint size,
+        GstCaps *caps, GstBuffer **buf)
+{
+    struct cb_data cbdata = { REQUEST_BUFFER_SINK,
+        { .request_buffer_sink_data = {pad, ofs, size, caps, buf} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.request_buffer_sink_data.ret;
+}
+
+gboolean accept_caps_sink_wrapper(GstPad *pad, GstCaps *caps)
+{
+    struct cb_data cbdata = { ACCEPT_CAPS_SINK,
+        { .accept_caps_sink_data = {pad, caps} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.accept_caps_sink_data.ret;
+}
+
+gboolean setcaps_sink_wrapper(GstPad *pad, GstCaps *caps)
+{
+    struct cb_data cbdata = { SETCAPS_SINK,
+        { .setcaps_sink_data = {pad, caps} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.setcaps_sink_data.ret;
+}
+
+GstFlowReturn got_data_sink_wrapper(GstPad *pad, GstBuffer *buf)
+{
+    struct cb_data cbdata = { GOT_DATA_SINK,
+        { .got_data_sink_data = {pad, buf} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.got_data_sink_data.ret;
+}
+
+GstFlowReturn got_data_wrapper(GstPad *pad, GstBuffer *buf)
+{
+    struct cb_data cbdata = { GOT_DATA,
+        { .got_data_data = {pad, buf} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.got_data_data.ret;
+}
+
+GstFlowReturn request_buffer_wrapper(GstPad *pad, guint64 ofs, guint size,
+        GstCaps *caps, GstBuffer **buf)
+{
+    struct cb_data cbdata = { REQUEST_BUFFER,
+        { .request_buffer_data = {pad, ofs, size, caps, buf} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.request_buffer_data.ret;
+}
+
+void removed_decoded_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user)
+{
+    struct cb_data cbdata = { REMOVED_DECODED_PAD,
+        { .removed_decoded_pad_data = {bin, pad, user} }
+    };
+
+    call_cb(&cbdata);
+}
+
+GstAutoplugSelectResult autoplug_blacklist_wrapper(GstElement *bin, GstPad *pad,
+        GstCaps *caps, GstElementFactory *fact, gpointer user)
+{
+    struct cb_data cbdata = { AUTOPLUG_BLACKLIST,
+        { .autoplug_blacklist_data = {bin, pad, caps, fact, user} }
+    };
+
+    call_cb(&cbdata);
+
+    return cbdata.u.autoplug_blacklist_data.ret;
+}
+
+void unknown_type_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user)
+{
+    struct cb_data cbdata = { UNKNOWN_TYPE,
+        { .unknown_type_data = {bin, pad, caps, user} }
+    };
+
+    call_cb(&cbdata);
+}
+
+void release_sample_wrapper(gpointer data)
+{
+    struct cb_data cbdata = { RELEASE_SAMPLE,
+        { .release_sample_data = {data} }
+    };
+
+    call_cb(&cbdata);
+}
+
+void Gstreamer_transform_pad_added_wrapper(GstElement *filter, GstPad *pad, gpointer user)
+{
+    struct cb_data cbdata = { TRANSFORM_PAD_ADDED,
+        { .transform_pad_added_data = {filter, pad, user} }
+    };
+
+    call_cb(&cbdata);
+}
diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h
new file mode 100644
index 0000000..adb99a1
--- /dev/null
+++ b/dlls/winegstreamer/gst_cbs.h
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2015 Andrew Eikum for CodeWeavers
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+#ifndef GST_CBS_H
+#define GST_CBS_H
+
+#include "wine/list.h"
+#include "windef.h"
+#include <pthread.h>
+
+typedef enum {
+  GST_AUTOPLUG_SELECT_TRY,
+  GST_AUTOPLUG_SELECT_EXPOSE,
+  GST_AUTOPLUG_SELECT_SKIP
+} GstAutoplugSelectResult;
+
+enum CB_TYPE {
+    WATCH_BUS,
+    EXISTING_NEW_PAD,
+    CHECK_GET_RANGE,
+    QUERY_FUNCTION,
+    ACTIVATE_PUSH,
+    NO_MORE_PADS,
+    REQUEST_BUFFER_SRC,
+    EVENT_SRC,
+    EVENT_SINK,
+    REQUEST_BUFFER_SINK,
+    ACCEPT_CAPS_SINK,
+    SETCAPS_SINK,
+    GOT_DATA_SINK,
+    GOT_DATA,
+    REQUEST_BUFFER,
+    REMOVED_DECODED_PAD,
+    AUTOPLUG_BLACKLIST,
+    UNKNOWN_TYPE,
+    RELEASE_SAMPLE,
+    TRANSFORM_PAD_ADDED
+};
+
+struct cb_data {
+    enum CB_TYPE type;
+    union {
+        struct watch_bus_data {
+            GstBus *bus;
+            GstMessage *msg;
+            gpointer user;
+            GstBusSyncReply ret;
+        } watch_bus_data;
+        struct existing_new_pad_data {
+            GstElement *bin;
+            GstPad *pad;
+            gboolean last;
+            gpointer user;
+        } existing_new_pad_data;
+        struct check_get_range_data {
+            GstPad *pad;
+            gboolean ret;
+        } check_get_range_data;
+        struct query_function_data {
+            GstPad *pad;
+            GstQuery *query;
+            gboolean ret;
+        } query_function_data;
+        struct activate_push_data {
+            GstPad *pad;
+            gboolean activate;
+            gboolean ret;
+        } activate_push_data;
+        struct no_more_pads_data {
+            GstElement *decodebin;
+            gpointer user;
+        } no_more_pads_data;
+        struct request_buffer_src_data {
+            GstPad *pad;
+            guint64 ofs;
+            guint len;
+            GstBuffer **buf;
+            GstFlowReturn ret;
+        } request_buffer_src_data;
+        struct event_src_data {
+            GstPad *pad;
+            GstEvent *event;
+            gboolean ret;
+        } event_src_data;
+        struct event_sink_data {
+            GstPad *pad;
+            GstEvent *event;
+            gboolean ret;
+        } event_sink_data;
+        struct request_buffer_sink_data {
+            GstPad *pad;
+            guint64 ofs;
+            guint size;
+            GstCaps *caps;
+            GstBuffer **buf;
+            GstFlowReturn ret;
+        } request_buffer_sink_data;
+        struct accept_caps_sink_data {
+            GstPad *pad;
+            GstCaps *caps;
+            gboolean ret;
+        } accept_caps_sink_data;
+        struct setcaps_sink_data {
+            GstPad *pad;
+            GstCaps *caps;
+            gboolean ret;
+        } setcaps_sink_data;
+        struct got_data_sink_data {
+            GstPad *pad;
+            GstBuffer *buf;
+            GstFlowReturn ret;
+        } got_data_sink_data;
+        struct got_data_data {
+            GstPad *pad;
+            GstBuffer *buf;
+            GstFlowReturn ret;
+        } got_data_data;
+        struct request_buffer_data {
+            GstPad *pad;
+            guint64 ofs;
+            guint size;
+            GstCaps *caps;
+            GstBuffer **buf;
+            GstFlowReturn ret;
+        } request_buffer_data;
+        struct removed_decoded_pad_data {
+            GstElement *bin;
+            GstPad *pad;
+            gpointer user;
+        } removed_decoded_pad_data;
+        struct autoplug_blacklist_data {
+            GstElement *bin;
+            GstPad *pad;
+            GstCaps *caps;
+            GstElementFactory *fact;
+            gpointer user;
+            GstAutoplugSelectResult ret;
+        } autoplug_blacklist_data;
+        struct unknown_type_data {
+            GstElement *bin;
+            GstPad *pad;
+            GstCaps *caps;
+            gpointer user;
+        } unknown_type_data;
+        struct release_sample_data {
+            gpointer data;
+        } release_sample_data;
+        struct transform_pad_added_data {
+            GstElement *filter;
+            GstPad *pad;
+            gpointer user;
+        } transform_pad_added_data;
+    } u;
+
+    int finished;
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+    struct list entry;
+};
+
+extern pthread_mutex_t cb_list_lock DECLSPEC_HIDDEN;
+extern pthread_cond_t cb_list_cond DECLSPEC_HIDDEN;
+extern struct list cb_list DECLSPEC_HIDDEN;
+void CALLBACK perform_cb(TP_CALLBACK_INSTANCE *instance, void *user) DECLSPEC_HIDDEN;
+BOOL is_wine_thread(void) DECLSPEC_HIDDEN;
+void mark_wine_thread(void) DECLSPEC_HIDDEN;
+
+GstBusSyncReply watch_bus_wrapper(GstBus *bus, GstMessage *msg, gpointer user) DECLSPEC_HIDDEN;
+void existing_new_pad_wrapper(GstElement *bin, GstPad *pad, gboolean last, gpointer user) DECLSPEC_HIDDEN;
+gboolean check_get_range_wrapper(GstPad *pad) DECLSPEC_HIDDEN;
+gboolean query_function_wrapper(GstPad *pad, GstQuery *query) DECLSPEC_HIDDEN;
+gboolean activate_push_wrapper(GstPad *pad, gboolean activate) DECLSPEC_HIDDEN;
+void no_more_pads_wrapper(GstElement *decodebin, gpointer user) DECLSPEC_HIDDEN;
+GstFlowReturn request_buffer_src_wrapper(GstPad *pad, guint64 ofs, guint len, GstBuffer **buf) DECLSPEC_HIDDEN;
+gboolean event_src_wrapper(GstPad *pad, GstEvent *event) DECLSPEC_HIDDEN;
+gboolean event_sink_wrapper(GstPad *pad, GstEvent *event) DECLSPEC_HIDDEN;
+GstFlowReturn request_buffer_sink_wrapper(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
+gboolean accept_caps_sink_wrapper(GstPad *pad, GstCaps *caps) DECLSPEC_HIDDEN;
+gboolean setcaps_sink_wrapper(GstPad *pad, GstCaps *caps) DECLSPEC_HIDDEN;
+GstFlowReturn got_data_sink_wrapper(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
+GstFlowReturn got_data_wrapper(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
+GstFlowReturn request_buffer_wrapper(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
+void removed_decoded_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
+GstAutoplugSelectResult autoplug_blacklist_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user) DECLSPEC_HIDDEN;
+void unknown_type_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user) DECLSPEC_HIDDEN;
+void release_sample_wrapper(gpointer data) DECLSPEC_HIDDEN;
+void Gstreamer_transform_pad_added_wrapper(GstElement *filter, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
+
+#endif
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h
index d041d64..2f145ed 100644
--- a/dlls/winegstreamer/gst_private.h
+++ b/dlls/winegstreamer/gst_private.h
@@ -42,6 +42,12 @@ IUnknown * CALLBACK Gstreamer_Mp3_create(IUnknown *pUnkOuter, HRESULT *phr);
 IUnknown * CALLBACK Gstreamer_YUV_create(IUnknown *pUnkOuter, HRESULT *phr);
 IUnknown * CALLBACK Gstreamer_Splitter_create(IUnknown *pUnkOuter, HRESULT *phr);
 
-void g_thread_impl_init(void);
 DWORD Gstreamer_init(void);
+
+GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
+GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
+void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
+
+void start_dispatch_thread(void) DECLSPEC_HIDDEN;
+
 #endif /* __GST_PRIVATE_INCLUDED__ */
diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index c17fc2f..82cc958 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -27,6 +27,7 @@
 
 #include "gst_private.h"
 #include "gst_guids.h"
+#include "gst_cbs.h"
 
 #include "vfwmsgs.h"
 #include "amvideo.h"
@@ -44,6 +45,8 @@
 
 WINE_DEFAULT_DEBUG_CHANNEL(gstreamer);
 
+static pthread_key_t wine_gst_key;
+
 typedef struct GSTOutPin GSTOutPin;
 typedef struct GSTInPin {
     BasePin pin;
@@ -97,6 +100,17 @@ static HRESULT WINAPI GST_ChangeCurrent(IMediaSeeking *iface);
 static HRESULT WINAPI GST_ChangeStop(IMediaSeeking *iface);
 static HRESULT WINAPI GST_ChangeRate(IMediaSeeking *iface);
 
+void mark_wine_thread(void)
+{
+    /* set it to non-NULL to indicate that this is a Wine thread */
+    pthread_setspecific(wine_gst_key, &wine_gst_key);
+}
+
+BOOL is_wine_thread(void)
+{
+    return pthread_getspecific(wine_gst_key) != NULL;
+}
+
 static gboolean amt_from_gst_caps_audio(GstCaps *caps, AM_MEDIA_TYPE *amt) {
     WAVEFORMATEXTENSIBLE *wfe;
     WAVEFORMATEX *wfx;
@@ -466,7 +480,7 @@ static DWORD CALLBACK push_data(LPVOID iface) {
         }
 
         IMediaSample_GetPointer(buf, &data);
-        gstbuf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(buf), release_sample, buf);
+        gstbuf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(buf), release_sample_wrapper, buf);
         if (!gstbuf) {
             IMediaSample_Release(buf);
             break;
@@ -621,7 +635,7 @@ static GstFlowReturn request_buffer_sink(GstPad *pad, guint64 ofs, guint size, G
     }
     IMediaSample_SetActualDataLength(sample, size);
     IMediaSample_GetPointer(sample, &ptr);
-    *buf = gst_app_buffer_new(ptr, size, release_sample, sample);
+    *buf = gst_app_buffer_new(ptr, size, release_sample_wrapper, sample);
     if (!*buf) {
         IMediaSample_Release(sample);
         ERR("Out of memory\n");
@@ -682,7 +696,8 @@ static DWORD CALLBACK push_data_init(LPVOID iface) {
     return 0;
 }
 
-static void removed_decoded_pad(GstElement *bin, GstPad *pad, GSTImpl *This) {
+static void removed_decoded_pad(GstElement *bin, GstPad *pad, gpointer user) {
+    GSTImpl *This = (GSTImpl*)user;
     int x;
     GSTOutPin *pin;
 
@@ -728,11 +743,11 @@ static void init_new_decoded_pad(GstElement *bin, GstPad *pad, gboolean last, GS
     typename = gst_structure_get_name(arg);
 
     mypad = gst_pad_new(NULL, GST_PAD_SINK);
-    gst_pad_set_chain_function(mypad, got_data_sink);
-    gst_pad_set_event_function(mypad, event_sink);
-    gst_pad_set_bufferalloc_function(mypad, request_buffer_sink);
-    gst_pad_set_acceptcaps_function(mypad, accept_caps_sink);
-    gst_pad_set_setcaps_function(mypad, setcaps_sink);
+    gst_pad_set_chain_function(mypad, got_data_sink_wrapper);
+    gst_pad_set_event_function(mypad, event_sink_wrapper);
+    gst_pad_set_bufferalloc_function(mypad, request_buffer_sink_wrapper);
+    gst_pad_set_acceptcaps_function(mypad, accept_caps_sink_wrapper);
+    gst_pad_set_setcaps_function(mypad, setcaps_sink_wrapper);
 
     if (!strcmp(typename, "audio/x-raw-int") ||
         !strcmp(typename, "audio/x-raw-float")) {
@@ -766,7 +781,8 @@ static void init_new_decoded_pad(GstElement *bin, GstPad *pad, gboolean last, GS
     }
 }
 
-static void existing_new_pad(GstElement *bin, GstPad *pad, gboolean last, GSTImpl *This) {
+static void existing_new_pad(GstElement *bin, GstPad *pad, gboolean last, gpointer user) {
+    GSTImpl *This = (GSTImpl*)user;
     int x;
 
     if (gst_pad_is_linked(pad))
@@ -858,18 +874,13 @@ static gboolean activate_push(GstPad *pad, gboolean activate) {
     return TRUE;
 }
 
-static void no_more_pads(GstElement *decodebin, GSTImpl *This) {
+static void no_more_pads(GstElement *decodebin, gpointer user) {
+    GSTImpl *This = (GSTImpl*)user;
     TRACE("Done\n");
     SetEvent(This->event);
 }
 
-typedef enum {
-  GST_AUTOPLUG_SELECT_TRY,
-  GST_AUTOPLUG_SELECT_EXPOSE,
-  GST_AUTOPLUG_SELECT_SKIP
-} GstAutoplugSelectResult;
-
-static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, GSTImpl *This) {
+static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user) {
     const char *name = gst_element_factory_get_longname(fact);
 
     if (strstr(name, "Player protection")) {
@@ -904,7 +915,7 @@ static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer data) {
     return GST_BUS_DROP;
 }
 
-static void unknown_type(GstElement *bin, GstPad *pad, GstCaps *caps, GSTImpl *This) {
+static void unknown_type(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user) {
     gchar *strcaps = gst_caps_to_string(caps);
     FIXME("Could not find a filter for caps: %s\n", strcaps);
     g_free(strcaps);
@@ -928,7 +939,7 @@ static HRESULT GST_Connect(GSTInPin *pPin, IPin *pConnectPin, ALLOCATOR_PROPERTI
 
     if (!This->bus) {
         This->bus = gst_bus_new();
-        gst_bus_set_sync_handler(This->bus, watch_bus, This);
+        gst_bus_set_sync_handler(This->bus, watch_bus_wrapper, This);
     }
 
     This->gstfilter = gst_element_factory_make("decodebin2", NULL);
@@ -938,21 +949,21 @@ static HRESULT GST_Connect(GSTInPin *pPin, IPin *pConnectPin, ALLOCATOR_PROPERTI
         return E_FAIL;
     }
     gst_element_set_bus(This->gstfilter, This->bus);
-    g_signal_connect(This->gstfilter, "new-decoded-pad", G_CALLBACK(existing_new_pad), This);
-    g_signal_connect(This->gstfilter, "pad-removed", G_CALLBACK(removed_decoded_pad), This);
-    g_signal_connect(This->gstfilter, "autoplug-select", G_CALLBACK(autoplug_blacklist), This);
-    g_signal_connect(This->gstfilter, "unknown-type", G_CALLBACK(unknown_type), This);
+    g_signal_connect(This->gstfilter, "new-decoded-pad", G_CALLBACK(existing_new_pad_wrapper), This);
+    g_signal_connect(This->gstfilter, "pad-removed", G_CALLBACK(removed_decoded_pad_wrapper), This);
+    g_signal_connect(This->gstfilter, "autoplug-select", G_CALLBACK(autoplug_blacklist_wrapper), This);
+    g_signal_connect(This->gstfilter, "unknown-type", G_CALLBACK(unknown_type_wrapper), This);
 
     This->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src");
-    gst_pad_set_getrange_function(This->my_src, request_buffer_src);
-    gst_pad_set_checkgetrange_function(This->my_src, check_get_range);
-    gst_pad_set_query_function(This->my_src, query_function);
-    gst_pad_set_activatepush_function(This->my_src, activate_push);
-    gst_pad_set_event_function(This->my_src, event_src);
+    gst_pad_set_getrange_function(This->my_src, request_buffer_src_wrapper);
+    gst_pad_set_checkgetrange_function(This->my_src, check_get_range_wrapper);
+    gst_pad_set_query_function(This->my_src, query_function_wrapper);
+    gst_pad_set_activatepush_function(This->my_src, activate_push_wrapper);
+    gst_pad_set_event_function(This->my_src, event_src_wrapper);
     gst_pad_set_element_private (This->my_src, This);
     This->their_sink = gst_element_get_static_pad(This->gstfilter, "sink");
 
-    g_signal_connect(This->gstfilter, "no-more-pads", G_CALLBACK(no_more_pads), This);
+    g_signal_connect(This->gstfilter, "no-more-pads", G_CALLBACK(no_more_pads_wrapper), This);
     ret = gst_pad_link(This->my_src, This->their_sink);
     gst_object_unref(This->their_sink);
     if (ret < 0) {
@@ -1042,6 +1053,8 @@ IUnknown * CALLBACK Gstreamer_Splitter_create(IUnknown *punkout, HRESULT *phr) {
         return NULL;
     }
 
+    mark_wine_thread();
+
     This = CoTaskMemAlloc(sizeof(*This));
     obj = (IUnknown*)This;
     if (!This)
@@ -1150,6 +1163,8 @@ static HRESULT WINAPI GST_Stop(IBaseFilter *iface) {
 
     TRACE("()\n");
 
+    mark_wine_thread();
+
     if (This->gstfilter)
         gst_element_set_state(This->gstfilter, GST_STATE_READY);
     return S_OK;
@@ -1165,6 +1180,8 @@ static HRESULT WINAPI GST_Pause(IBaseFilter *iface) {
     if (!This->gstfilter)
         return VFW_E_NOT_CONNECTED;
 
+    mark_wine_thread();
+
     gst_element_get_state(This->gstfilter, &now, NULL, -1);
     if (now == GST_STATE_PAUSED)
         return S_OK;
@@ -1187,6 +1204,8 @@ static HRESULT WINAPI GST_Run(IBaseFilter *iface, REFERENCE_TIME tStart) {
 
     TRACE("(%s)\n", wine_dbgstr_longlong(tStart));
 
+    mark_wine_thread();
+
     if (!This->gstfilter)
         return VFW_E_NOT_CONNECTED;
 
@@ -1236,6 +1255,8 @@ static HRESULT WINAPI GST_GetState(IBaseFilter *iface, DWORD dwMilliSecsTimeout,
 
     TRACE("(%d, %p)\n", dwMilliSecsTimeout, pState);
 
+    mark_wine_thread();
+
     if (!This->gstfilter) {
         *pState = State_Stopped;
         return S_OK;
@@ -1290,6 +1311,7 @@ static HRESULT WINAPI GST_ChangeRate(IMediaSeeking *iface) {
     GSTOutPin *This = impl_from_IMediaSeeking(iface);
     GstEvent *ev = gst_event_new_seek(This->seek.dRate, GST_FORMAT_TIME, 0, GST_SEEK_TYPE_NONE, -1, GST_SEEK_TYPE_NONE, -1);
     TRACE("(%p) New rate %g\n", iface, This->seek.dRate);
+    mark_wine_thread();
     gst_pad_push_event(This->my_sink, ev);
     return S_OK;
 }
@@ -1316,6 +1338,8 @@ static HRESULT WINAPI GST_Seeking_GetCurrentPosition(IMediaSeeking *iface, REFER
     if (!pos)
         return E_POINTER;
 
+    mark_wine_thread();
+
     if (!This->their_src) {
         *pos = This->seek.llCurrent;
         TRACE("Cached value\n");
@@ -1352,6 +1376,8 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, REFERENCE_T
     GstSeekType curtype, stoptype;
     GstEvent *e;
 
+    mark_wine_thread();
+
     if (!This->seek.llDuration)
         return E_NOTIMPL;
 
@@ -1425,6 +1451,7 @@ static ULONG WINAPI GST_QualityControl_Release(IQualityControl *iface)
 static HRESULT WINAPI GST_QualityControl_Notify(IQualityControl *iface, IBaseFilter *sender, Quality qm) {
     GSTOutPin *pin = impl_from_IQualityControl(iface);
     REFERENCE_TIME late = qm.Late;
+    mark_wine_thread();
     if (qm.Late < 0 && -qm.Late > qm.TimeStamp)
         late = -qm.TimeStamp;
     gst_pad_push_event(pin->my_sink, gst_event_new_qos(1000./qm.Proportion, late*100, qm.TimeStamp*100));
@@ -1474,6 +1501,8 @@ static ULONG WINAPI GSTOutPin_Release(IPin *iface) {
     ULONG refCount = InterlockedDecrement(&This->pin.pin.refCount);
     TRACE("(%p)->() Release from %d\n", iface, refCount + 1);
 
+    mark_wine_thread();
+
     if (!refCount) {
         if (This->their_src)
             gst_pad_unlink(This->their_src, This->my_sink);
@@ -1608,6 +1637,7 @@ static HRESULT GST_RemoveOutputPins(GSTImpl *This) {
     ULONG i;
     GSTOutPin **ppOldPins = This->ppPins;
     TRACE("(%p)\n", This);
+    mark_wine_thread();
 
     if (!This->gstfilter)
         return S_OK;
@@ -1657,6 +1687,8 @@ static HRESULT WINAPI GSTInPin_ReceiveConnection(IPin *iface, IPin *pReceivePin,
     TRACE("(%p/%p)->(%p, %p)\n", This, iface, pReceivePin, pmt);
     dump_AM_MEDIA_TYPE(pmt);
 
+    mark_wine_thread();
+
     EnterCriticalSection(This->pin.pCritSec);
     if (!This->pin.pConnectedTo) {
         ALLOCATOR_PROPERTIES props;
@@ -1711,6 +1743,8 @@ static HRESULT WINAPI GSTInPin_Disconnect(IPin *iface) {
     FILTER_STATE state;
     TRACE("()\n");
 
+    mark_wine_thread();
+
     hr = IBaseFilter_GetState(This->pin.pinInfo.pFilter, INFINITE, &state);
     EnterCriticalSection(This->pin.pCritSec);
     if (This->pin.pConnectedTo) {
@@ -1830,3 +1864,175 @@ static const IPinVtbl GST_InputPin_Vtbl = {
     GSTInPin_EndFlush,
     GSTInPin_NewSegment
 };
+
+pthread_mutex_t cb_list_lock = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t cb_list_cond = PTHREAD_COND_INITIALIZER;
+struct list cb_list = LIST_INIT(cb_list);
+
+void CALLBACK perform_cb(TP_CALLBACK_INSTANCE *instance, void *user)
+{
+    struct cb_data *cbdata = user;
+
+    TRACE("got cb type: 0x%x\n", cbdata->type);
+
+    switch(cbdata->type)
+    {
+    case WATCH_BUS:
+        {
+            struct watch_bus_data *data = &cbdata->u.watch_bus_data;
+            cbdata->u.watch_bus_data.ret = watch_bus(data->bus, data->msg, data->user);
+            break;
+        }
+    case EXISTING_NEW_PAD:
+        {
+            struct existing_new_pad_data *data = &cbdata->u.existing_new_pad_data;
+            existing_new_pad(data->bin, data->pad, data->last, data->user);
+            break;
+        }
+    case CHECK_GET_RANGE:
+        {
+            struct check_get_range_data *data = &cbdata->u.check_get_range_data;
+            cbdata->u.check_get_range_data.ret = check_get_range(data->pad);
+            break;
+        }
+    case QUERY_FUNCTION:
+        {
+            struct query_function_data *data = &cbdata->u.query_function_data;
+            cbdata->u.query_function_data.ret = query_function(data->pad, data->query);
+            break;
+        }
+    case ACTIVATE_PUSH:
+        {
+            struct activate_push_data *data = &cbdata->u.activate_push_data;
+            cbdata->u.activate_push_data.ret = activate_push(data->pad, data->activate);
+            break;
+        }
+    case NO_MORE_PADS:
+        {
+            struct no_more_pads_data *data = &cbdata->u.no_more_pads_data;
+            no_more_pads(data->decodebin, data->user);
+            break;
+        }
+    case REQUEST_BUFFER_SRC:
+        {
+            struct request_buffer_src_data *data = &cbdata->u.request_buffer_src_data;
+            cbdata->u.request_buffer_src_data.ret = request_buffer_src(data->pad,
+                    data->ofs, data->len, data->buf);
+            break;
+        }
+    case EVENT_SRC:
+        {
+            struct event_src_data *data = &cbdata->u.event_src_data;
+            cbdata->u.event_src_data.ret = event_src(data->pad, data->event);
+            break;
+        }
+    case EVENT_SINK:
+        {
+            struct event_sink_data *data = &cbdata->u.event_sink_data;
+            cbdata->u.event_sink_data.ret = event_sink(data->pad, data->event);
+            break;
+        }
+    case REQUEST_BUFFER_SINK:
+        {
+            struct request_buffer_sink_data *data = &cbdata->u.request_buffer_sink_data;
+            cbdata->u.request_buffer_sink_data.ret = request_buffer_sink(data->pad,
+                    data->ofs, data->size, data->caps, data->buf);
+            break;
+        }
+    case ACCEPT_CAPS_SINK:
+        {
+            struct accept_caps_sink_data *data = &cbdata->u.accept_caps_sink_data;
+            cbdata->u.accept_caps_sink_data.ret = accept_caps_sink(data->pad, data->caps);
+            break;
+        }
+    case SETCAPS_SINK:
+        {
+            struct setcaps_sink_data *data = &cbdata->u.setcaps_sink_data;
+            cbdata->u.setcaps_sink_data.ret = setcaps_sink(data->pad, data->caps);
+            break;
+        }
+    case GOT_DATA_SINK:
+        {
+            struct got_data_sink_data *data = &cbdata->u.got_data_sink_data;
+            cbdata->u.got_data_sink_data.ret = got_data_sink(data->pad, data->buf);
+            break;
+        }
+    case GOT_DATA:
+        {
+            struct got_data_data *data = &cbdata->u.got_data_data;
+            cbdata->u.got_data_data.ret = got_data(data->pad, data->buf);
+            break;
+        }
+    case REQUEST_BUFFER:
+        {
+            struct request_buffer_data *data = &cbdata->u.request_buffer_data;
+            cbdata->u.request_buffer_data.ret = request_buffer(data->pad,
+                    data->ofs, data->size, data->caps, data->buf);
+            break;
+        }
+    case REMOVED_DECODED_PAD:
+        {
+            struct removed_decoded_pad_data *data = &cbdata->u.removed_decoded_pad_data;
+            removed_decoded_pad(data->bin, data->pad, data->user);
+            break;
+        }
+    case AUTOPLUG_BLACKLIST:
+        {
+            struct autoplug_blacklist_data *data = &cbdata->u.autoplug_blacklist_data;
+            cbdata->u.autoplug_blacklist_data.ret = autoplug_blacklist(data->bin,
+                    data->pad, data->caps, data->fact, data->user);
+            break;
+        }
+    case UNKNOWN_TYPE:
+        {
+            struct unknown_type_data *data = &cbdata->u.unknown_type_data;
+            unknown_type(data->bin, data->pad, data->caps, data->user);
+            break;
+        }
+    case RELEASE_SAMPLE:
+        {
+            struct release_sample_data *data = &cbdata->u.release_sample_data;
+            release_sample(data->data);
+            break;
+        }
+    case TRANSFORM_PAD_ADDED:
+        {
+            struct transform_pad_added_data *data = &cbdata->u.transform_pad_added_data;
+            Gstreamer_transform_pad_added(data->filter, data->pad, data->user);
+            break;
+        }
+    }
+
+    pthread_mutex_lock(&cbdata->lock);
+    cbdata->finished = 1;
+    pthread_cond_broadcast(&cbdata->cond);
+    pthread_mutex_unlock(&cbdata->lock);
+}
+
+static DWORD WINAPI dispatch_thread(void *user)
+{
+    struct cb_data *cbdata;
+
+    pthread_mutex_lock(&cb_list_lock);
+
+    while(1){
+        pthread_cond_wait(&cb_list_cond, &cb_list_lock);
+
+        while(!list_empty(&cb_list)){
+            cbdata = LIST_ENTRY(list_head(&cb_list), struct cb_data, entry);
+            list_remove(&cbdata->entry);
+
+            TrySubmitThreadpoolCallback(&perform_cb, cbdata, NULL);
+        }
+    }
+
+    pthread_mutex_unlock(&cb_list_lock);
+
+    return 0;
+}
+
+void start_dispatch_thread(void)
+{
+    pthread_key_create(&wine_gst_key, NULL);
+    CreateThread(NULL, 0, &dispatch_thread, NULL, 0, NULL);
+}
diff --git a/dlls/winegstreamer/gsttffilter.c b/dlls/winegstreamer/gsttffilter.c
index 28f5d5d..ed5457b 100644
--- a/dlls/winegstreamer/gsttffilter.c
+++ b/dlls/winegstreamer/gsttffilter.c
@@ -28,6 +28,7 @@
 
 #include "gst_private.h"
 #include "gst_guids.h"
+#include "gst_cbs.h"
 
 #include "uuids.h"
 #include "mmreg.h"
@@ -125,6 +126,8 @@ static HRESULT WINAPI Gstreamer_transform_ProcessBegin(TransformFilter *iface) {
     GstTfImpl *This = (GstTfImpl*)iface;
     int ret;
 
+    mark_wine_thread();
+
     ret = gst_element_set_state(This->filter, GST_STATE_PLAYING);
     TRACE("Returned: %i\n", ret);
     return S_OK;
@@ -146,12 +149,7 @@ static HRESULT WINAPI Gstreamer_transform_DecideBufferSize(TransformFilter *tf,
     return IMemAllocator_SetProperties(pAlloc, ppropInputRequest, &actual);
 }
 
-static void release_sample(void *data) {
-    TRACE("Releasing %p\n", data);
-    IMediaSample_Release((IMediaSample *)data);
-}
-
-static GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
+GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
     GstTfImpl *This = gst_pad_get_element_private(pad);
     IMediaSample *sample = GST_APP_BUFFER(buf)->priv;
     REFERENCE_TIME tStart, tStop;
@@ -188,7 +186,7 @@ static GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
     return GST_FLOW_OK;
 }
 
-static GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) {
+GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) {
     GstTfImpl *This = gst_pad_get_element_private(pad);
     IMediaSample *sample;
     BYTE *ptr;
@@ -202,7 +200,7 @@ static GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCap
     }
     IMediaSample_SetActualDataLength(sample, size);
     IMediaSample_GetPointer(sample, &ptr);
-    *buf = gst_app_buffer_new(ptr, size, release_sample, sample);
+    *buf = gst_app_buffer_new(ptr, size, release_sample_wrapper, sample);
 
     if (!*buf) {
         IMediaSample_Release(sample);
@@ -224,9 +222,11 @@ static HRESULT WINAPI Gstreamer_transform_ProcessData(TransformFilter *iface, IM
     int ret;
     TRACE("Reading %p\n", sample);
 
+    mark_wine_thread();
+
     EnterCriticalSection(&This->tf.csReceive);
     IMediaSample_GetPointer(sample, &data);
-    buf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(sample), release_sample, sample);
+    buf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(sample), release_sample_wrapper, sample);
     if (!buf) {
         LeaveCriticalSection(&This->tf.csReceive);
         return S_OK;
@@ -267,6 +267,8 @@ static HRESULT WINAPI Gstreamer_transform_ProcessEnd(TransformFilter *iface) {
     GstTfImpl *This = (GstTfImpl*)iface;
     int ret;
 
+    mark_wine_thread();
+
     LeaveCriticalSection(&This->tf.csReceive);
     ret = gst_element_set_state(This->filter, GST_STATE_READY);
     EnterCriticalSection(&This->tf.csReceive);
@@ -274,8 +276,9 @@ static HRESULT WINAPI Gstreamer_transform_ProcessEnd(TransformFilter *iface) {
     return S_OK;
 }
 
-static void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, GstTfImpl *This)
+void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, gpointer user)
 {
+    GstTfImpl *This = (GstTfImpl*)user;
     int ret;
     if (!GST_PAD_IS_SRC(pad))
         return;
@@ -294,6 +297,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
     BOOL done = FALSE, found = FALSE;
     int ret;
 
+    mark_wine_thread();
+
     This->filter = gst_element_factory_make(This->gstreamer_name, NULL);
     if (!This->filter) {
         FIXME("Could not make %s filter\n", This->gstreamer_name);
@@ -303,8 +308,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
     gst_pad_set_element_private (This->my_src, This);
 
     This->my_sink = gst_pad_new(NULL, GST_PAD_SINK);
-    gst_pad_set_chain_function(This->my_sink, got_data);
-    gst_pad_set_bufferalloc_function(This->my_sink, request_buffer);
+    gst_pad_set_chain_function(This->my_sink, got_data_wrapper);
+    gst_pad_set_bufferalloc_function(This->my_sink, request_buffer_wrapper);
     gst_pad_set_element_private (This->my_sink, This);
 
     ret = gst_pad_set_caps(This->my_src, capsin);
@@ -362,7 +367,7 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
     gst_iterator_free(it);
     found = !!This->their_src;
     if (!found)
-        g_signal_connect(This->filter, "pad-added", G_CALLBACK(Gstreamer_transform_pad_added), This);
+        g_signal_connect(This->filter, "pad-added", G_CALLBACK(Gstreamer_transform_pad_added_wrapper), This);
     ret = gst_pad_link(This->my_src, This->their_sink);
     if (ret < 0) {
         WARN("Failed to link with %i\n", ret);
@@ -382,6 +387,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
 static HRESULT WINAPI Gstreamer_transform_Cleanup(TransformFilter *tf, PIN_DIRECTION dir) {
     GstTfImpl *This = (GstTfImpl*)tf;
 
+    mark_wine_thread();
+
     if (dir == PINDIR_INPUT)
     {
         if (This->filter) {
@@ -405,6 +412,7 @@ static HRESULT WINAPI Gstreamer_transform_Cleanup(TransformFilter *tf, PIN_DIREC
 static HRESULT WINAPI Gstreamer_transform_EndOfStream(TransformFilter *iface) {
     GstTfImpl *This = (GstTfImpl*)iface;
     TRACE("%p\n", This);
+    mark_wine_thread();
 
     gst_pad_push_event(This->my_src, gst_event_new_eos());
     return S_OK;
@@ -413,6 +421,7 @@ static HRESULT WINAPI Gstreamer_transform_EndOfStream(TransformFilter *iface) {
 static HRESULT WINAPI Gstreamer_transform_BeginFlush(TransformFilter *iface) {
     GstTfImpl *This = (GstTfImpl*)iface;
     TRACE("%p\n", This);
+    mark_wine_thread();
 
     gst_pad_push_event(This->my_src, gst_event_new_flush_start());
     return S_OK;
@@ -421,6 +430,7 @@ static HRESULT WINAPI Gstreamer_transform_BeginFlush(TransformFilter *iface) {
 static HRESULT WINAPI Gstreamer_transform_EndFlush(TransformFilter *iface) {
     GstTfImpl *This = (GstTfImpl*)iface;
     TRACE("%p\n", This);
+    mark_wine_thread();
 
     gst_pad_push_event(This->my_src, gst_event_new_flush_stop());
     return S_OK;
@@ -429,6 +439,7 @@ static HRESULT WINAPI Gstreamer_transform_EndFlush(TransformFilter *iface) {
 static HRESULT WINAPI Gstreamer_transform_NewSegment(TransformFilter *iface, REFERENCE_TIME tStart, REFERENCE_TIME tStop, double dRate) {
     GstTfImpl *This = (GstTfImpl*)iface;
     TRACE("%p\n", This);
+    mark_wine_thread();
 
     gst_pad_push_event(This->my_src, gst_event_new_new_segment_full(1,
                        1.0, dRate, GST_FORMAT_TIME, 0, tStop <= tStart ? -1 : tStop * 100, tStart*100));
@@ -438,6 +449,7 @@ static HRESULT WINAPI Gstreamer_transform_NewSegment(TransformFilter *iface, REF
 static HRESULT WINAPI Gstreamer_transform_QOS(TransformFilter *iface, IBaseFilter *sender, Quality qm) {
     GstTfImpl *This = (GstTfImpl*)iface;
     REFERENCE_TIME late = qm.Late;
+    mark_wine_thread();
     if (qm.Late < 0 && -qm.Late > qm.TimeStamp)
         late = -qm.TimeStamp;
     gst_pad_push_event(This->my_sink, gst_event_new_qos(1000. / qm.Proportion, late * 100, qm.TimeStamp * 100));
@@ -480,6 +492,8 @@ static HRESULT WINAPI Gstreamer_Mp3_SetMediaType(TransformFilter *tf, PIN_DIRECT
     HRESULT hr;
     int layer;
 
+    mark_wine_thread();
+
     if (dir != PINDIR_INPUT)
         return S_OK;
 
@@ -572,6 +586,7 @@ IUnknown * CALLBACK Gstreamer_Mp3_create(IUnknown *punkout, HRESULT *phr)
         *phr = E_FAIL;
         return NULL;
     }
+    mark_wine_thread();
     plugin = Gstreamer_FindMatch("audio/mpeg, mpegversion=(int) 1");
     if (!plugin)
     {
@@ -620,6 +635,8 @@ static HRESULT WINAPI Gstreamer_YUV_SetMediaType(TransformFilter *tf, PIN_DIRECT
     int avgtime;
     LONG width, height;
 
+    mark_wine_thread();
+
     if (dir != PINDIR_INPUT)
         return S_OK;
 
@@ -737,6 +754,8 @@ static HRESULT WINAPI Gstreamer_AudioConvert_SetMediaType(TransformFilter *tf, P
     BOOL inisfloat = FALSE;
     int indepth;
 
+    mark_wine_thread();
+
     if (dir != PINDIR_INPUT)
         return S_OK;
 
diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c
index 4f0e84f..aa0673a 100644
--- a/dlls/winegstreamer/main.c
+++ b/dlls/winegstreamer/main.c
@@ -254,7 +254,6 @@ DWORD Gstreamer_init(void) {
         argv[0] = argv0;
         argv[1] = argv1;
         argv[2] = NULL;
-        g_thread_impl_init();
         inited = gst_init_check(&argc, &argv, &err);
         HeapFree(GetProcessHeap(), 0, argv);
         if (err) {
@@ -269,6 +268,8 @@ DWORD Gstreamer_init(void) {
                                (LPCWSTR)hInst, &newhandle);
             if (!newhandle)
                 ERR("Could not pin module %p\n", hInst);
+
+            start_dispatch_thread();
         }
     }
     return inited;
-- 
2.7.0




More information about the wine-patches mailing list