[PATCH 2/2] winegstreamer: Always run gstreamer callbacks on a Wine thread
Andrew Eikum
aeikum at codeweavers.com
Mon Jan 11 14:13:13 CST 2016
This gets winegstreamer with gstreamer-0.10 working for me again,
fixing Bug 30557. However, gstreamer-0.10 is no longer developed and
some distros don't ship it anymore.
I have more patches on top of this one to move winegstreamer to the
gstreamer-1.0 API. See Bug 31836. In order to avoid dependency churn
for packagers, I think it makes sense to apply them both during the
same release, so I intend to send those patches after this one is
accepted. If you prefer to review them all, I can send them all at
once.
Signed-off-by: Andrew Eikum <aeikum at codeweavers.com>
---
dlls/winegstreamer/Makefile.in | 2 +-
dlls/winegstreamer/glibthread.c | 390 ---------------------------------------
dlls/winegstreamer/gst_cbs.c | 289 +++++++++++++++++++++++++++++
dlls/winegstreamer/gst_cbs.h | 204 ++++++++++++++++++++
dlls/winegstreamer/gst_private.h | 8 +-
dlls/winegstreamer/gstdemux.c | 264 +++++++++++++++++++++++---
dlls/winegstreamer/gsttffilter.c | 45 +++--
dlls/winegstreamer/main.c | 3 +-
8 files changed, 770 insertions(+), 435 deletions(-)
delete mode 100644 dlls/winegstreamer/glibthread.c
create mode 100644 dlls/winegstreamer/gst_cbs.c
create mode 100644 dlls/winegstreamer/gst_cbs.h
diff --git a/dlls/winegstreamer/Makefile.in b/dlls/winegstreamer/Makefile.in
index 0c14ffd..fecabe8 100644
--- a/dlls/winegstreamer/Makefile.in
+++ b/dlls/winegstreamer/Makefile.in
@@ -4,7 +4,7 @@ EXTRAINCL = $(GSTREAMER_CFLAGS)
EXTRALIBS = $(GSTREAMER_LIBS) $(PTHREAD_LIBS)
C_SRCS = \
- glibthread.c \
+ gst_cbs.c \
gstdemux.c \
gsttffilter.c \
main.c
diff --git a/dlls/winegstreamer/glibthread.c b/dlls/winegstreamer/glibthread.c
deleted file mode 100644
index 0d829a0..0000000
--- a/dlls/winegstreamer/glibthread.c
+++ /dev/null
@@ -1,390 +0,0 @@
-/* GLIB - Library of useful routines for C programming
- * Copyright (C) 1995-1997 Peter Mattis, Spencer Kimball and Josh MacDonald
- *
- * Wine GStreamer integration
- * Copyright 2010 Aric Stewart, CodeWeavers
- *
- * gthread.c: solaris thread system implementation
- * Copyright 1998-2001 Sebastian Wilhelmi; University of Karlsruhe
- * Copyright 2001 Hans Breuer
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
- */
-
-/*
- * Modified by the GLib Team and others 1997-2000. See the AUTHORS
- * file for a list of people on the GLib Team. See the ChangeLog
- * files for a list of changes. These files are distributed with
- * GLib at ftp://ftp.gtk.org/pub/gtk/.
- */
-
-#include "config.h"
-
-#ifdef HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
-
-#include <glib.h>
-
-#include <errno.h>
-#include <stdarg.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#include "windef.h"
-#include "winbase.h"
-#include "winnls.h"
-#include "wine/debug.h"
-
-WINE_DEFAULT_DEBUG_CHANNEL(gstreamer);
-
-static gchar *
-g_win32_error_message (gint error)
-{
- gchar *retval;
- WCHAR *msg = NULL;
- int nchars;
-
- FormatMessageW (FORMAT_MESSAGE_ALLOCATE_BUFFER
- |FORMAT_MESSAGE_IGNORE_INSERTS
- |FORMAT_MESSAGE_FROM_SYSTEM,
- NULL, error, 0,
- (LPWSTR) &msg, 0, NULL);
- if (msg != NULL)
- {
- nchars = WideCharToMultiByte(CP_UTF8, 0, msg, -1, NULL, 0, NULL, NULL);
-
- if (nchars > 2 && msg[nchars-1] == '\n' && msg[nchars-2] == '\r')
- msg[nchars-2] = '\0';
-
- retval = g_utf16_to_utf8 (msg, -1, NULL, NULL, NULL);
-
- LocalFree (msg);
- }
- else
- retval = g_strdup ("");
-
- return retval;
-}
-
-static gint g_thread_priority_map [G_THREAD_PRIORITY_URGENT + 1] = {
- THREAD_PRIORITY_BELOW_NORMAL,
- THREAD_PRIORITY_NORMAL,
- THREAD_PRIORITY_ABOVE_NORMAL,
- THREAD_PRIORITY_HIGHEST
-};
-
-static DWORD g_thread_self_tls;
-
-/* A "forward" declaration of this structure */
-static GThreadFunctions g_thread_functions_for_glib_use_default;
-
-typedef struct _GThreadData GThreadData;
-struct _GThreadData
-{
- GThreadFunc func;
- gpointer data;
- HANDLE thread;
- gboolean joinable;
-};
-
-static GMutex *
-g_mutex_new_posix_impl (void)
-{
- GMutex *result = (GMutex *) g_new (pthread_mutex_t, 1);
- pthread_mutex_init ((pthread_mutex_t *) result, NULL);
- return result;
-}
-
-static void
-g_mutex_free_posix_impl (GMutex * mutex)
-{
- pthread_mutex_destroy ((pthread_mutex_t *) mutex);
- g_free (mutex);
-}
-
-/* NOTE: the functions g_mutex_lock and g_mutex_unlock may not use
- functions from gmem.c and gmessages.c; */
-
-/* pthread_mutex_lock, pthread_mutex_unlock can be taken directly, as
- signature and semantic are right, but without error check then!!!!,
- we might want to change this therefore. */
-
-static gboolean
-g_mutex_trylock_posix_impl (GMutex * mutex)
-{
- int result;
-
- result = pthread_mutex_trylock ((pthread_mutex_t *) mutex);
-
- if (result == EBUSY)
- return FALSE;
-
- if (result) ERR("pthread_mutex_trylock %x\n",result);
- return TRUE;
-}
-
-static GCond *
-g_cond_new_posix_impl (void)
-{
- GCond *result = (GCond *) g_new (pthread_cond_t, 1);
- pthread_cond_init ((pthread_cond_t *) result, NULL);
- return result;
-}
-
-/* pthread_cond_signal, pthread_cond_broadcast and pthread_cond_wait
- can be taken directly, as signature and semantic are right, but
- without error check then!!!!, we might want to change this
- therefore. */
-
-#define G_NSEC_PER_SEC 1000000000
-
-static gboolean
-g_cond_timed_wait_posix_impl (GCond * cond,
- GMutex * entered_mutex,
- GTimeVal * abs_time)
-{
- int result;
- struct timespec end_time;
- gboolean timed_out;
-
- g_return_val_if_fail (cond != NULL, FALSE);
- g_return_val_if_fail (entered_mutex != NULL, FALSE);
-
- if (!abs_time)
- {
- result = pthread_cond_wait ((pthread_cond_t *)cond,
- (pthread_mutex_t *) entered_mutex);
- timed_out = FALSE;
- }
- else
- {
- end_time.tv_sec = abs_time->tv_sec;
- end_time.tv_nsec = abs_time->tv_usec * (G_NSEC_PER_SEC / G_USEC_PER_SEC);
-
- g_return_val_if_fail (end_time.tv_nsec < G_NSEC_PER_SEC, TRUE);
-
- result = pthread_cond_timedwait ((pthread_cond_t *) cond,
- (pthread_mutex_t *) entered_mutex,
- &end_time);
- timed_out = (result == ETIMEDOUT);
- }
-
- if (!timed_out)
- if (result) ERR("pthread_cond_timedwait %x\n",result);
-
- return !timed_out;
-}
-
-static void
-g_cond_free_posix_impl (GCond * cond)
-{
- pthread_cond_destroy ((pthread_cond_t *) cond);
- g_free (cond);
-}
-
-static GPrivate *
-g_private_new_posix_impl (GDestroyNotify destructor)
-{
- GPrivate *result = (GPrivate *) g_new (pthread_key_t, 1);
- pthread_key_create ((pthread_key_t *) result, destructor);
- return result;
-}
-
-/* NOTE: the functions g_private_get and g_private_set may not use
- functions from gmem.c and gmessages.c */
-
-static void
-g_private_set_posix_impl (GPrivate * private_key, gpointer value)
-{
- if (!private_key)
- return;
- pthread_setspecific (*(pthread_key_t *) private_key, value);
-}
-
-static gpointer
-g_private_get_posix_impl (GPrivate * private_key)
-{
- if (!private_key)
- return NULL;
- return pthread_getspecific (*(pthread_key_t *) private_key);
-}
-
-static void
-g_thread_set_priority_win32_impl (gpointer thread, GThreadPriority priority)
-{
- GThreadData *target = *(GThreadData **)thread;
-
- g_return_if_fail ((int)priority >= G_THREAD_PRIORITY_LOW);
- g_return_if_fail ((int)priority <= G_THREAD_PRIORITY_URGENT);
-
- SetThreadPriority (target->thread, g_thread_priority_map [priority]);
-}
-
-static void
-g_thread_self_win32_impl (gpointer thread)
-{
- GThreadData *self = TlsGetValue (g_thread_self_tls);
-
- if (!self)
- {
- /* This should only happen for the main thread! */
- HANDLE handle = GetCurrentThread ();
- HANDLE process = GetCurrentProcess ();
- self = g_new (GThreadData, 1);
- DuplicateHandle (process, handle, process, &self->thread, 0, FALSE,
- DUPLICATE_SAME_ACCESS);
- TlsSetValue (g_thread_self_tls, self);
- self->func = NULL;
- self->data = NULL;
- self->joinable = FALSE;
- }
-
- *(GThreadData **)thread = self;
-}
-
-static void
-g_thread_exit_win32_impl (void)
-{
- GThreadData *self = TlsGetValue (g_thread_self_tls);
-
- if (self)
- {
- if (!self->joinable)
- {
- CloseHandle (self->thread);
- g_free (self);
- }
- TlsSetValue (g_thread_self_tls, NULL);
- }
-
- ExitThread (0);
-}
-
-static guint __stdcall
-g_thread_proxy (gpointer data)
-{
- GThreadData *self = (GThreadData*) data;
-
- TlsSetValue (g_thread_self_tls, self);
-
- self->func (self->data);
-
- g_thread_exit_win32_impl ();
-
- g_assert_not_reached ();
-
- return 0;
-}
-
-static void
-g_thread_create_win32_impl (GThreadFunc func,
- gpointer data,
- gulong stack_size,
- gboolean joinable,
- gboolean bound,
- GThreadPriority priority,
- gpointer thread,
- GError **error)
-{
- guint ignore;
- GThreadData *retval;
-
- g_return_if_fail (func);
- g_return_if_fail ((int)priority >= G_THREAD_PRIORITY_LOW);
- g_return_if_fail ((int)priority <= G_THREAD_PRIORITY_URGENT);
-
- retval = g_new(GThreadData, 1);
- retval->func = func;
- retval->data = data;
-
- retval->joinable = joinable;
-
- retval->thread = (HANDLE) CreateThread (NULL, stack_size, g_thread_proxy,
- retval, 0, &ignore);
-
- if (retval->thread == NULL)
- {
- gchar *win_error = g_win32_error_message (GetLastError ());
- g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
- "Error creating thread: %s", win_error);
- g_free (retval);
- g_free (win_error);
- return;
- }
-
- *(GThreadData **)thread = retval;
-
- g_thread_set_priority_win32_impl (thread, priority);
-}
-
-static void
-g_thread_yield_win32_impl (void)
-{
- Sleep(0);
-}
-
-static void
-g_thread_join_win32_impl (gpointer thread)
-{
- GThreadData *target = *(GThreadData **)thread;
-
- g_return_if_fail (target->joinable);
-
- WaitForSingleObject (target->thread, INFINITE);
-
- CloseHandle (target->thread);
- g_free (target);
-}
-
-static GThreadFunctions g_thread_functions_for_glib_use_default =
-{
- /* Posix functions here for speed */
- g_mutex_new_posix_impl,
- (void (*)(GMutex *)) pthread_mutex_lock,
- g_mutex_trylock_posix_impl,
- (void (*)(GMutex *)) pthread_mutex_unlock,
- g_mutex_free_posix_impl,
- g_cond_new_posix_impl,
- (void (*)(GCond *)) pthread_cond_signal,
- (void (*)(GCond *)) pthread_cond_broadcast,
- (void (*)(GCond *, GMutex *)) pthread_cond_wait,
- g_cond_timed_wait_posix_impl,
- g_cond_free_posix_impl,
- g_private_new_posix_impl,
- g_private_get_posix_impl,
- g_private_set_posix_impl,
- /* win32 function required here */
- g_thread_create_win32_impl, /* thread */
- g_thread_yield_win32_impl,
- g_thread_join_win32_impl,
- g_thread_exit_win32_impl,
- g_thread_set_priority_win32_impl,
- g_thread_self_win32_impl,
- NULL /* no equal function necessary */
-};
-
-void g_thread_impl_init (void)
-{
- static gboolean beenhere = FALSE;
-
- if (beenhere)
- return;
-
- beenhere = TRUE;
-
- g_thread_self_tls = TlsAlloc ();
- g_thread_init(&g_thread_functions_for_glib_use_default);
-}
diff --git a/dlls/winegstreamer/gst_cbs.c b/dlls/winegstreamer/gst_cbs.c
new file mode 100644
index 0000000..c445104
--- /dev/null
+++ b/dlls/winegstreamer/gst_cbs.c
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2015 Andrew Eikum for CodeWeavers
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+#include "config.h"
+
+#include <gst/app/gstappsink.h>
+#include <gst/app/gstappsrc.h>
+#include <gst/app/gstappbuffer.h>
+#include <gst/gstutils.h>
+
+#include "wine/list.h"
+
+#include "gst_cbs.h"
+
+/* gstreamer calls our callbacks from threads that Wine did not create. Some
+ * callbacks execute code which requires Wine to have created the thread
+ * (critical sections, debug logging, dshow client code). Since gstreamer can't
+ * provide an API to override its thread creation, we have to intercept all
+ * callbacks in code which avoids the Wine thread requirement, and then
+ * dispatch those callbacks on a thread that is known to be created by Wine.
+ *
+ * This file must not contain any code that depends on the Wine TEB!
+ */
+/* Calls perform_cb() directly on a Wine thread; otherwise queues cbdata on cb_list and blocks until finished is set. */
+static void call_cb(struct cb_data *cbdata)
+{
+ pthread_mutex_init(&cbdata->lock, NULL);
+ pthread_cond_init(&cbdata->cond, NULL);
+ cbdata->finished = 0;
+
+ if(is_wine_thread()){
+ /* The thread which triggered gstreamer to call this callback may
+ * already hold a critical section. If so, executing the callback on a
+ * worker thread can cause a deadlock. If we are already on a Wine
+ * thread, then there is no need to run this callback on a worker
+ * thread anyway, which avoids the deadlock issue. */
+ perform_cb(NULL, cbdata);
+
+ pthread_cond_destroy(&cbdata->cond);
+ pthread_mutex_destroy(&cbdata->lock);
+
+ return;
+ }
+ /* Not a Wine thread: append the request to cb_list and wake every waiter on cb_list_cond. */
+ pthread_mutex_lock(&cb_list_lock);
+
+ list_add_tail(&cb_list, &cbdata->entry);
+ pthread_cond_broadcast(&cb_list_cond);
+ /* NOTE(review): presumably perform_cb() takes cbdata->lock before setting finished, so locking it before dropping cb_list_lock avoids a missed wakeup - confirm. */
+ pthread_mutex_lock(&cbdata->lock);
+
+ pthread_mutex_unlock(&cb_list_lock);
+ /* Re-check the flag in a loop: pthread_cond_wait() may return spuriously. */
+ while(!cbdata->finished)
+ pthread_cond_wait(&cbdata->cond, &cbdata->lock);
+
+ pthread_mutex_unlock(&cbdata->lock);
+ /* Only the primitives initialised at the top are torn down; cbdata itself belongs to the caller. */
+ pthread_cond_destroy(&cbdata->cond);
+ pthread_mutex_destroy(&cbdata->lock);
+}
+
+/* Bus sync-handler trampoline: submits a WATCH_BUS request through call_cb() and returns its ret field. */
+GstBusSyncReply watch_bus_wrapper(GstBus *bus, GstMessage *msg, gpointer user)
+{
+    struct cb_data req = {
+        .type = WATCH_BUS,
+        .u.watch_bus_data = { .bus = bus, .msg = msg, .user = user },
+    };
+    call_cb(&req);
+    return req.u.watch_bus_data.ret;
+}
+
+/* "new-decoded-pad" signal trampoline: submits an EXISTING_NEW_PAD request through call_cb(). */
+void existing_new_pad_wrapper(GstElement *bin, GstPad *pad, gboolean last,
+        gpointer user)
+{
+    struct cb_data req = { .type = EXISTING_NEW_PAD,
+        .u.existing_new_pad_data = { .bin = bin, .pad = pad, .last = last, .user = user },
+    };
+    call_cb(&req);
+}
+
+/* Pad checkgetrange trampoline: submits a CHECK_GET_RANGE request through call_cb() and returns its ret field. */
+gboolean check_get_range_wrapper(GstPad *pad)
+{
+    struct cb_data req = {
+        .type = CHECK_GET_RANGE,
+        .u.check_get_range_data = { .pad = pad },
+    };
+    call_cb(&req);
+    return req.u.check_get_range_data.ret;
+}
+
+/* Pad query trampoline: submits a QUERY_FUNCTION request through call_cb() and returns its ret field. */
+gboolean query_function_wrapper(GstPad *pad, GstQuery *query)
+{
+    struct cb_data req = {
+        .type = QUERY_FUNCTION,
+        .u.query_function_data = { .pad = pad, .query = query },
+    };
+    call_cb(&req);
+    return req.u.query_function_data.ret;
+}
+
+/* Pad activatepush trampoline: submits an ACTIVATE_PUSH request through call_cb() and returns its ret field. */
+gboolean activate_push_wrapper(GstPad *pad, gboolean activate)
+{
+    struct cb_data req = {
+        .type = ACTIVATE_PUSH,
+        .u.activate_push_data = { .pad = pad, .activate = activate },
+    };
+    call_cb(&req);
+    return req.u.activate_push_data.ret;
+}
+
+/* "no-more-pads" signal trampoline: submits a NO_MORE_PADS request through call_cb(). */
+void no_more_pads_wrapper(GstElement *decodebin, gpointer user)
+{
+    struct cb_data req = { .type = NO_MORE_PADS,
+        .u.no_more_pads_data = { .decodebin = decodebin, .user = user },
+    };
+    call_cb(&req);
+}
+
+/* Pad getrange trampoline: submits a REQUEST_BUFFER_SRC request through call_cb() and returns its ret field. */
+GstFlowReturn request_buffer_src_wrapper(GstPad *pad, guint64 ofs, guint len,
+        GstBuffer **buf)
+{
+    struct cb_data req = {
+        .type = REQUEST_BUFFER_SRC,
+        .u.request_buffer_src_data = { .pad = pad, .ofs = ofs, .len = len, .buf = buf },
+    };
+    call_cb(&req);
+    return req.u.request_buffer_src_data.ret;
+}
+
+/* Source-pad event trampoline: submits an EVENT_SRC request through call_cb() and returns its ret field. */
+gboolean event_src_wrapper(GstPad *pad, GstEvent *event)
+{
+    struct cb_data req = {
+        .type = EVENT_SRC,
+        .u.event_src_data = { .pad = pad, .event = event },
+    };
+    call_cb(&req);
+    return req.u.event_src_data.ret;
+}
+
+/* Sink-pad event trampoline: submits an EVENT_SINK request through call_cb() and returns its ret field. */
+gboolean event_sink_wrapper(GstPad *pad, GstEvent *event)
+{
+    struct cb_data req = {
+        .type = EVENT_SINK,
+        .u.event_sink_data = { .pad = pad, .event = event },
+    };
+    call_cb(&req);
+    return req.u.event_sink_data.ret;
+}
+
+/* Sink-pad bufferalloc trampoline: submits a REQUEST_BUFFER_SINK request through call_cb() and returns its ret field. */
+GstFlowReturn request_buffer_sink_wrapper(GstPad *pad, guint64 ofs, guint size,
+        GstCaps *caps, GstBuffer **buf)
+{
+    struct cb_data req = {
+        .type = REQUEST_BUFFER_SINK,
+        .u.request_buffer_sink_data = { .pad = pad, .ofs = ofs, .size = size, .caps = caps, .buf = buf },
+    };
+    call_cb(&req);
+    return req.u.request_buffer_sink_data.ret;
+}
+
+/* Sink-pad acceptcaps trampoline: submits an ACCEPT_CAPS_SINK request through call_cb() and returns its ret field. */
+gboolean accept_caps_sink_wrapper(GstPad *pad, GstCaps *caps)
+{
+    struct cb_data req = {
+        .type = ACCEPT_CAPS_SINK,
+        .u.accept_caps_sink_data = { .pad = pad, .caps = caps },
+    };
+    call_cb(&req);
+    return req.u.accept_caps_sink_data.ret;
+}
+
+/* Sink-pad setcaps trampoline: submits a SETCAPS_SINK request through call_cb() and returns its ret field. */
+gboolean setcaps_sink_wrapper(GstPad *pad, GstCaps *caps)
+{
+    struct cb_data req = {
+        .type = SETCAPS_SINK,
+        .u.setcaps_sink_data = { .pad = pad, .caps = caps },
+    };
+    call_cb(&req);
+    return req.u.setcaps_sink_data.ret;
+}
+
+/* Sink-pad chain trampoline: submits a GOT_DATA_SINK request through call_cb() and returns its ret field. */
+GstFlowReturn got_data_sink_wrapper(GstPad *pad, GstBuffer *buf)
+{
+    struct cb_data req = {
+        .type = GOT_DATA_SINK,
+        .u.got_data_sink_data = { .pad = pad, .buf = buf },
+    };
+    call_cb(&req);
+    return req.u.got_data_sink_data.ret;
+}
+
+/* Submits (pad, buf) as a GOT_DATA request through call_cb() and returns the request's ret field. */
+GstFlowReturn got_data_wrapper(GstPad *pad, GstBuffer *buf)
+{
+    struct cb_data req = {
+        .type = GOT_DATA,
+        .u.got_data_data = { .pad = pad, .buf = buf },
+    };
+    call_cb(&req);
+    return req.u.got_data_data.ret;
+}
+
+/* Submits the buffer request as a REQUEST_BUFFER request through call_cb() and returns its ret field. */
+GstFlowReturn request_buffer_wrapper(GstPad *pad, guint64 ofs, guint size,
+        GstCaps *caps, GstBuffer **buf)
+{
+    struct cb_data req = {
+        .type = REQUEST_BUFFER,
+        .u.request_buffer_data = { .pad = pad, .ofs = ofs, .size = size, .caps = caps, .buf = buf },
+    };
+    call_cb(&req);
+    return req.u.request_buffer_data.ret;
+}
+
+/* "pad-removed" signal trampoline: submits a REMOVED_DECODED_PAD request through call_cb(). */
+void removed_decoded_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user)
+{
+    struct cb_data req = { .type = REMOVED_DECODED_PAD,
+        .u.removed_decoded_pad_data = { .bin = bin, .pad = pad, .user = user },
+    };
+    call_cb(&req);
+}
+
+/* "autoplug-select" signal trampoline: submits an AUTOPLUG_BLACKLIST request through call_cb() and returns its ret field. */
+GstAutoplugSelectResult autoplug_blacklist_wrapper(GstElement *bin, GstPad *pad,
+        GstCaps *caps, GstElementFactory *fact, gpointer user)
+{
+    struct cb_data req = {
+        .type = AUTOPLUG_BLACKLIST,
+        .u.autoplug_blacklist_data = { .bin = bin, .pad = pad, .caps = caps, .fact = fact, .user = user },
+    };
+    call_cb(&req);
+    return req.u.autoplug_blacklist_data.ret;
+}
+
+/* "unknown-type" signal trampoline: submits an UNKNOWN_TYPE request through call_cb(). */
+void unknown_type_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user)
+{
+    struct cb_data req = { .type = UNKNOWN_TYPE,
+        .u.unknown_type_data = { .bin = bin, .pad = pad, .caps = caps, .user = user },
+    };
+    call_cb(&req);
+}
+
+/* Callback handed to gst_app_buffer_new(): submits a RELEASE_SAMPLE request through call_cb(). */
+void release_sample_wrapper(gpointer data)
+{
+    struct cb_data req = { .type = RELEASE_SAMPLE,
+        .u.release_sample_data = { .data = data },
+    };
+    call_cb(&req);
+}
+
+/* Submits (filter, pad, user) as a TRANSFORM_PAD_ADDED request through call_cb(). */
+void Gstreamer_transform_pad_added_wrapper(GstElement *filter, GstPad *pad, gpointer user)
+{
+    struct cb_data req = { .type = TRANSFORM_PAD_ADDED,
+        .u.transform_pad_added_data = { .filter = filter, .pad = pad, .user = user },
+    };
+    call_cb(&req);
+}
diff --git a/dlls/winegstreamer/gst_cbs.h b/dlls/winegstreamer/gst_cbs.h
new file mode 100644
index 0000000..adb99a1
--- /dev/null
+++ b/dlls/winegstreamer/gst_cbs.h
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2015 Andrew Eikum for CodeWeavers
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+#ifndef GST_CBS_H
+#define GST_CBS_H
+
+#include "wine/list.h"
+#include "windef.h"
+#include <pthread.h>
+
+typedef enum {
+ GST_AUTOPLUG_SELECT_TRY,
+ GST_AUTOPLUG_SELECT_EXPOSE,
+ GST_AUTOPLUG_SELECT_SKIP
+} GstAutoplugSelectResult; /* return type of autoplug_blacklist_wrapper(), the "autoplug-select" handler; this patch moves it here from gstdemux.c */
+
+enum CB_TYPE { /* one tag per *_wrapper in gst_cbs.c; each wrapper stores its tag next to the same-named member of cb_data.u */
+ WATCH_BUS,
+ EXISTING_NEW_PAD,
+ CHECK_GET_RANGE,
+ QUERY_FUNCTION,
+ ACTIVATE_PUSH,
+ NO_MORE_PADS,
+ REQUEST_BUFFER_SRC,
+ EVENT_SRC,
+ EVENT_SINK,
+ REQUEST_BUFFER_SINK,
+ ACCEPT_CAPS_SINK,
+ SETCAPS_SINK,
+ GOT_DATA_SINK,
+ GOT_DATA,
+ REQUEST_BUFFER,
+ REMOVED_DECODED_PAD,
+ AUTOPLUG_BLACKLIST,
+ UNKNOWN_TYPE, /* tag used by unknown_type_wrapper() (the "unknown-type" signal), not a catch-all value */
+ RELEASE_SAMPLE,
+ TRANSFORM_PAD_ADDED
+};
+
+struct cb_data { /* one callback request; the wrappers in gst_cbs.c build it on their stack and pass it to call_cb() */
+ enum CB_TYPE type; /* set by the wrapper; names the member of u that it filled in */
+ union { /* per-callback arguments, plus a ret slot for callbacks that return a value */
+ struct watch_bus_data {
+ GstBus *bus;
+ GstMessage *msg;
+ gpointer user;
+ GstBusSyncReply ret;
+ } watch_bus_data;
+ struct existing_new_pad_data {
+ GstElement *bin;
+ GstPad *pad;
+ gboolean last;
+ gpointer user;
+ } existing_new_pad_data;
+ struct check_get_range_data {
+ GstPad *pad;
+ gboolean ret;
+ } check_get_range_data;
+ struct query_function_data {
+ GstPad *pad;
+ GstQuery *query;
+ gboolean ret;
+ } query_function_data;
+ struct activate_push_data {
+ GstPad *pad;
+ gboolean activate;
+ gboolean ret;
+ } activate_push_data;
+ struct no_more_pads_data {
+ GstElement *decodebin;
+ gpointer user;
+ } no_more_pads_data;
+ struct request_buffer_src_data {
+ GstPad *pad;
+ guint64 ofs;
+ guint len;
+ GstBuffer **buf;
+ GstFlowReturn ret;
+ } request_buffer_src_data;
+ struct event_src_data {
+ GstPad *pad;
+ GstEvent *event;
+ gboolean ret;
+ } event_src_data;
+ struct event_sink_data {
+ GstPad *pad;
+ GstEvent *event;
+ gboolean ret;
+ } event_sink_data;
+ struct request_buffer_sink_data {
+ GstPad *pad;
+ guint64 ofs;
+ guint size;
+ GstCaps *caps;
+ GstBuffer **buf;
+ GstFlowReturn ret;
+ } request_buffer_sink_data;
+ struct accept_caps_sink_data {
+ GstPad *pad;
+ GstCaps *caps;
+ gboolean ret;
+ } accept_caps_sink_data;
+ struct setcaps_sink_data {
+ GstPad *pad;
+ GstCaps *caps;
+ gboolean ret;
+ } setcaps_sink_data;
+ struct got_data_sink_data {
+ GstPad *pad;
+ GstBuffer *buf;
+ GstFlowReturn ret;
+ } got_data_sink_data;
+ struct got_data_data {
+ GstPad *pad;
+ GstBuffer *buf;
+ GstFlowReturn ret;
+ } got_data_data;
+ struct request_buffer_data {
+ GstPad *pad;
+ guint64 ofs;
+ guint size;
+ GstCaps *caps;
+ GstBuffer **buf;
+ GstFlowReturn ret;
+ } request_buffer_data;
+ struct removed_decoded_pad_data {
+ GstElement *bin;
+ GstPad *pad;
+ gpointer user;
+ } removed_decoded_pad_data;
+ struct autoplug_blacklist_data {
+ GstElement *bin;
+ GstPad *pad;
+ GstCaps *caps;
+ GstElementFactory *fact;
+ gpointer user;
+ GstAutoplugSelectResult ret;
+ } autoplug_blacklist_data;
+ struct unknown_type_data {
+ GstElement *bin;
+ GstPad *pad;
+ GstCaps *caps;
+ gpointer user;
+ } unknown_type_data;
+ struct release_sample_data {
+ gpointer data;
+ } release_sample_data;
+ struct transform_pad_added_data {
+ GstElement *filter;
+ GstPad *pad;
+ gpointer user;
+ } transform_pad_added_data;
+ } u;
+
+ int finished; /* cleared by call_cb(), which then waits on cond until this is non-zero */
+ pthread_mutex_t lock; /* initialised and destroyed by call_cb(); held while it waits on cond */
+ pthread_cond_t cond; /* call_cb() waits on this for finished to change */
+ struct list entry; /* links the request into cb_list */
+};
+
+extern pthread_mutex_t cb_list_lock DECLSPEC_HIDDEN; /* held by call_cb() while it appends to cb_list */
+extern pthread_cond_t cb_list_cond DECLSPEC_HIDDEN; /* broadcast by call_cb() after a request is appended */
+extern struct list cb_list DECLSPEC_HIDDEN; /* list of struct cb_data requests, linked through their entry member */
+void CALLBACK perform_cb(TP_CALLBACK_INSTANCE *instance, void *user) DECLSPEC_HIDDEN; /* call_cb() invokes this as perform_cb(NULL, cbdata) when already on a Wine thread */
+BOOL is_wine_thread(void) DECLSPEC_HIDDEN;
+void mark_wine_thread(void) DECLSPEC_HIDDEN;
+/* Trampolines defined in gst_cbs.c: each packs its arguments into a struct cb_data and passes it to call_cb(). */
+GstBusSyncReply watch_bus_wrapper(GstBus *bus, GstMessage *msg, gpointer user) DECLSPEC_HIDDEN;
+void existing_new_pad_wrapper(GstElement *bin, GstPad *pad, gboolean last, gpointer user) DECLSPEC_HIDDEN;
+gboolean check_get_range_wrapper(GstPad *pad) DECLSPEC_HIDDEN;
+gboolean query_function_wrapper(GstPad *pad, GstQuery *query) DECLSPEC_HIDDEN;
+gboolean activate_push_wrapper(GstPad *pad, gboolean activate) DECLSPEC_HIDDEN;
+void no_more_pads_wrapper(GstElement *decodebin, gpointer user) DECLSPEC_HIDDEN;
+GstFlowReturn request_buffer_src_wrapper(GstPad *pad, guint64 ofs, guint len, GstBuffer **buf) DECLSPEC_HIDDEN;
+gboolean event_src_wrapper(GstPad *pad, GstEvent *event) DECLSPEC_HIDDEN;
+gboolean event_sink_wrapper(GstPad *pad, GstEvent *event) DECLSPEC_HIDDEN;
+GstFlowReturn request_buffer_sink_wrapper(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
+gboolean accept_caps_sink_wrapper(GstPad *pad, GstCaps *caps) DECLSPEC_HIDDEN;
+gboolean setcaps_sink_wrapper(GstPad *pad, GstCaps *caps) DECLSPEC_HIDDEN;
+GstFlowReturn got_data_sink_wrapper(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
+GstFlowReturn got_data_wrapper(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
+GstFlowReturn request_buffer_wrapper(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
+void removed_decoded_pad_wrapper(GstElement *bin, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
+GstAutoplugSelectResult autoplug_blacklist_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user) DECLSPEC_HIDDEN;
+void unknown_type_wrapper(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user) DECLSPEC_HIDDEN;
+void release_sample_wrapper(gpointer data) DECLSPEC_HIDDEN;
+void Gstreamer_transform_pad_added_wrapper(GstElement *filter, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
+
+#endif
diff --git a/dlls/winegstreamer/gst_private.h b/dlls/winegstreamer/gst_private.h
index d041d64..2f145ed 100644
--- a/dlls/winegstreamer/gst_private.h
+++ b/dlls/winegstreamer/gst_private.h
@@ -42,6 +42,12 @@ IUnknown * CALLBACK Gstreamer_Mp3_create(IUnknown *pUnkOuter, HRESULT *phr);
IUnknown * CALLBACK Gstreamer_YUV_create(IUnknown *pUnkOuter, HRESULT *phr);
IUnknown * CALLBACK Gstreamer_Splitter_create(IUnknown *pUnkOuter, HRESULT *phr);
-void g_thread_impl_init(void);
DWORD Gstreamer_init(void);
+
+GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) DECLSPEC_HIDDEN;
+GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) DECLSPEC_HIDDEN;
+void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, gpointer user) DECLSPEC_HIDDEN;
+
+void start_dispatch_thread(void) DECLSPEC_HIDDEN;
+
#endif /* __GST_PRIVATE_INCLUDED__ */
diff --git a/dlls/winegstreamer/gstdemux.c b/dlls/winegstreamer/gstdemux.c
index c17fc2f..82cc958 100644
--- a/dlls/winegstreamer/gstdemux.c
+++ b/dlls/winegstreamer/gstdemux.c
@@ -27,6 +27,7 @@
#include "gst_private.h"
#include "gst_guids.h"
+#include "gst_cbs.h"
#include "vfwmsgs.h"
#include "amvideo.h"
@@ -44,6 +45,8 @@
WINE_DEFAULT_DEBUG_CHANNEL(gstreamer);
+static pthread_key_t wine_gst_key;
+
typedef struct GSTOutPin GSTOutPin;
typedef struct GSTInPin {
BasePin pin;
@@ -97,6 +100,17 @@ static HRESULT WINAPI GST_ChangeCurrent(IMediaSeeking *iface);
static HRESULT WINAPI GST_ChangeStop(IMediaSeeking *iface);
static HRESULT WINAPI GST_ChangeRate(IMediaSeeking *iface);
+/* Flags the calling thread as a Wine thread by storing a non-NULL value (the key's own address) under wine_gst_key. */
+void mark_wine_thread(void)
+{
+    pthread_setspecific(wine_gst_key, (const void *)&wine_gst_key);
+}
+
+BOOL is_wine_thread(void)
+{
+    return pthread_getspecific(wine_gst_key) ? TRUE : FALSE; /* non-NULL once mark_wine_thread() has run on this thread */
+}
+
static gboolean amt_from_gst_caps_audio(GstCaps *caps, AM_MEDIA_TYPE *amt) {
WAVEFORMATEXTENSIBLE *wfe;
WAVEFORMATEX *wfx;
@@ -466,7 +480,7 @@ static DWORD CALLBACK push_data(LPVOID iface) {
}
IMediaSample_GetPointer(buf, &data);
- gstbuf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(buf), release_sample, buf);
+ gstbuf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(buf), release_sample_wrapper, buf);
if (!gstbuf) {
IMediaSample_Release(buf);
break;
@@ -621,7 +635,7 @@ static GstFlowReturn request_buffer_sink(GstPad *pad, guint64 ofs, guint size, G
}
IMediaSample_SetActualDataLength(sample, size);
IMediaSample_GetPointer(sample, &ptr);
- *buf = gst_app_buffer_new(ptr, size, release_sample, sample);
+ *buf = gst_app_buffer_new(ptr, size, release_sample_wrapper, sample);
if (!*buf) {
IMediaSample_Release(sample);
ERR("Out of memory\n");
@@ -682,7 +696,8 @@ static DWORD CALLBACK push_data_init(LPVOID iface) {
return 0;
}
-static void removed_decoded_pad(GstElement *bin, GstPad *pad, GSTImpl *This) {
+static void removed_decoded_pad(GstElement *bin, GstPad *pad, gpointer user) {
+ GSTImpl *This = (GSTImpl*)user;
int x;
GSTOutPin *pin;
@@ -728,11 +743,11 @@ static void init_new_decoded_pad(GstElement *bin, GstPad *pad, gboolean last, GS
typename = gst_structure_get_name(arg);
mypad = gst_pad_new(NULL, GST_PAD_SINK);
- gst_pad_set_chain_function(mypad, got_data_sink);
- gst_pad_set_event_function(mypad, event_sink);
- gst_pad_set_bufferalloc_function(mypad, request_buffer_sink);
- gst_pad_set_acceptcaps_function(mypad, accept_caps_sink);
- gst_pad_set_setcaps_function(mypad, setcaps_sink);
+ gst_pad_set_chain_function(mypad, got_data_sink_wrapper);
+ gst_pad_set_event_function(mypad, event_sink_wrapper);
+ gst_pad_set_bufferalloc_function(mypad, request_buffer_sink_wrapper);
+ gst_pad_set_acceptcaps_function(mypad, accept_caps_sink_wrapper);
+ gst_pad_set_setcaps_function(mypad, setcaps_sink_wrapper);
if (!strcmp(typename, "audio/x-raw-int") ||
!strcmp(typename, "audio/x-raw-float")) {
@@ -766,7 +781,8 @@ static void init_new_decoded_pad(GstElement *bin, GstPad *pad, gboolean last, GS
}
}
-static void existing_new_pad(GstElement *bin, GstPad *pad, gboolean last, GSTImpl *This) {
+static void existing_new_pad(GstElement *bin, GstPad *pad, gboolean last, gpointer user) {
+ GSTImpl *This = (GSTImpl*)user;
int x;
if (gst_pad_is_linked(pad))
@@ -858,18 +874,13 @@ static gboolean activate_push(GstPad *pad, gboolean activate) {
return TRUE;
}
-static void no_more_pads(GstElement *decodebin, GSTImpl *This) {
+static void no_more_pads(GstElement *decodebin, gpointer user) {
+ GSTImpl *This = (GSTImpl*)user;
TRACE("Done\n");
SetEvent(This->event);
}
-typedef enum {
- GST_AUTOPLUG_SELECT_TRY,
- GST_AUTOPLUG_SELECT_EXPOSE,
- GST_AUTOPLUG_SELECT_SKIP
-} GstAutoplugSelectResult;
-
-static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, GSTImpl *This) {
+static GstAutoplugSelectResult autoplug_blacklist(GstElement *bin, GstPad *pad, GstCaps *caps, GstElementFactory *fact, gpointer user) {
const char *name = gst_element_factory_get_longname(fact);
if (strstr(name, "Player protection")) {
@@ -904,7 +915,7 @@ static GstBusSyncReply watch_bus(GstBus *bus, GstMessage *msg, gpointer data) {
return GST_BUS_DROP;
}
-static void unknown_type(GstElement *bin, GstPad *pad, GstCaps *caps, GSTImpl *This) {
+static void unknown_type(GstElement *bin, GstPad *pad, GstCaps *caps, gpointer user) {
gchar *strcaps = gst_caps_to_string(caps);
FIXME("Could not find a filter for caps: %s\n", strcaps);
g_free(strcaps);
@@ -928,7 +939,7 @@ static HRESULT GST_Connect(GSTInPin *pPin, IPin *pConnectPin, ALLOCATOR_PROPERTI
if (!This->bus) {
This->bus = gst_bus_new();
- gst_bus_set_sync_handler(This->bus, watch_bus, This);
+ gst_bus_set_sync_handler(This->bus, watch_bus_wrapper, This);
}
This->gstfilter = gst_element_factory_make("decodebin2", NULL);
@@ -938,21 +949,21 @@ static HRESULT GST_Connect(GSTInPin *pPin, IPin *pConnectPin, ALLOCATOR_PROPERTI
return E_FAIL;
}
gst_element_set_bus(This->gstfilter, This->bus);
- g_signal_connect(This->gstfilter, "new-decoded-pad", G_CALLBACK(existing_new_pad), This);
- g_signal_connect(This->gstfilter, "pad-removed", G_CALLBACK(removed_decoded_pad), This);
- g_signal_connect(This->gstfilter, "autoplug-select", G_CALLBACK(autoplug_blacklist), This);
- g_signal_connect(This->gstfilter, "unknown-type", G_CALLBACK(unknown_type), This);
+ g_signal_connect(This->gstfilter, "new-decoded-pad", G_CALLBACK(existing_new_pad_wrapper), This);
+ g_signal_connect(This->gstfilter, "pad-removed", G_CALLBACK(removed_decoded_pad_wrapper), This);
+ g_signal_connect(This->gstfilter, "autoplug-select", G_CALLBACK(autoplug_blacklist_wrapper), This);
+ g_signal_connect(This->gstfilter, "unknown-type", G_CALLBACK(unknown_type_wrapper), This);
This->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src");
- gst_pad_set_getrange_function(This->my_src, request_buffer_src);
- gst_pad_set_checkgetrange_function(This->my_src, check_get_range);
- gst_pad_set_query_function(This->my_src, query_function);
- gst_pad_set_activatepush_function(This->my_src, activate_push);
- gst_pad_set_event_function(This->my_src, event_src);
+ gst_pad_set_getrange_function(This->my_src, request_buffer_src_wrapper);
+ gst_pad_set_checkgetrange_function(This->my_src, check_get_range_wrapper);
+ gst_pad_set_query_function(This->my_src, query_function_wrapper);
+ gst_pad_set_activatepush_function(This->my_src, activate_push_wrapper);
+ gst_pad_set_event_function(This->my_src, event_src_wrapper);
gst_pad_set_element_private (This->my_src, This);
This->their_sink = gst_element_get_static_pad(This->gstfilter, "sink");
- g_signal_connect(This->gstfilter, "no-more-pads", G_CALLBACK(no_more_pads), This);
+ g_signal_connect(This->gstfilter, "no-more-pads", G_CALLBACK(no_more_pads_wrapper), This);
ret = gst_pad_link(This->my_src, This->their_sink);
gst_object_unref(This->their_sink);
if (ret < 0) {
@@ -1042,6 +1053,8 @@ IUnknown * CALLBACK Gstreamer_Splitter_create(IUnknown *punkout, HRESULT *phr) {
return NULL;
}
+ mark_wine_thread();
+
This = CoTaskMemAlloc(sizeof(*This));
obj = (IUnknown*)This;
if (!This)
@@ -1150,6 +1163,8 @@ static HRESULT WINAPI GST_Stop(IBaseFilter *iface) {
TRACE("()\n");
+ mark_wine_thread();
+
if (This->gstfilter)
gst_element_set_state(This->gstfilter, GST_STATE_READY);
return S_OK;
@@ -1165,6 +1180,8 @@ static HRESULT WINAPI GST_Pause(IBaseFilter *iface) {
if (!This->gstfilter)
return VFW_E_NOT_CONNECTED;
+ mark_wine_thread();
+
gst_element_get_state(This->gstfilter, &now, NULL, -1);
if (now == GST_STATE_PAUSED)
return S_OK;
@@ -1187,6 +1204,8 @@ static HRESULT WINAPI GST_Run(IBaseFilter *iface, REFERENCE_TIME tStart) {
TRACE("(%s)\n", wine_dbgstr_longlong(tStart));
+ mark_wine_thread();
+
if (!This->gstfilter)
return VFW_E_NOT_CONNECTED;
@@ -1236,6 +1255,8 @@ static HRESULT WINAPI GST_GetState(IBaseFilter *iface, DWORD dwMilliSecsTimeout,
TRACE("(%d, %p)\n", dwMilliSecsTimeout, pState);
+ mark_wine_thread();
+
if (!This->gstfilter) {
*pState = State_Stopped;
return S_OK;
@@ -1290,6 +1311,7 @@ static HRESULT WINAPI GST_ChangeRate(IMediaSeeking *iface) {
GSTOutPin *This = impl_from_IMediaSeeking(iface);
GstEvent *ev = gst_event_new_seek(This->seek.dRate, GST_FORMAT_TIME, 0, GST_SEEK_TYPE_NONE, -1, GST_SEEK_TYPE_NONE, -1);
TRACE("(%p) New rate %g\n", iface, This->seek.dRate);
+ mark_wine_thread();
gst_pad_push_event(This->my_sink, ev);
return S_OK;
}
@@ -1316,6 +1338,8 @@ static HRESULT WINAPI GST_Seeking_GetCurrentPosition(IMediaSeeking *iface, REFER
if (!pos)
return E_POINTER;
+ mark_wine_thread();
+
if (!This->their_src) {
*pos = This->seek.llCurrent;
TRACE("Cached value\n");
@@ -1352,6 +1376,8 @@ static HRESULT WINAPI GST_Seeking_SetPositions(IMediaSeeking *iface, REFERENCE_T
GstSeekType curtype, stoptype;
GstEvent *e;
+ mark_wine_thread();
+
if (!This->seek.llDuration)
return E_NOTIMPL;
@@ -1425,6 +1451,7 @@ static ULONG WINAPI GST_QualityControl_Release(IQualityControl *iface)
static HRESULT WINAPI GST_QualityControl_Notify(IQualityControl *iface, IBaseFilter *sender, Quality qm) {
GSTOutPin *pin = impl_from_IQualityControl(iface);
REFERENCE_TIME late = qm.Late;
+ mark_wine_thread();
if (qm.Late < 0 && -qm.Late > qm.TimeStamp)
late = -qm.TimeStamp;
gst_pad_push_event(pin->my_sink, gst_event_new_qos(1000./qm.Proportion, late*100, qm.TimeStamp*100));
@@ -1474,6 +1501,8 @@ static ULONG WINAPI GSTOutPin_Release(IPin *iface) {
ULONG refCount = InterlockedDecrement(&This->pin.pin.refCount);
TRACE("(%p)->() Release from %d\n", iface, refCount + 1);
+ mark_wine_thread();
+
if (!refCount) {
if (This->their_src)
gst_pad_unlink(This->their_src, This->my_sink);
@@ -1608,6 +1637,7 @@ static HRESULT GST_RemoveOutputPins(GSTImpl *This) {
ULONG i;
GSTOutPin **ppOldPins = This->ppPins;
TRACE("(%p)\n", This);
+ mark_wine_thread();
if (!This->gstfilter)
return S_OK;
@@ -1657,6 +1687,8 @@ static HRESULT WINAPI GSTInPin_ReceiveConnection(IPin *iface, IPin *pReceivePin,
TRACE("(%p/%p)->(%p, %p)\n", This, iface, pReceivePin, pmt);
dump_AM_MEDIA_TYPE(pmt);
+ mark_wine_thread();
+
EnterCriticalSection(This->pin.pCritSec);
if (!This->pin.pConnectedTo) {
ALLOCATOR_PROPERTIES props;
@@ -1711,6 +1743,8 @@ static HRESULT WINAPI GSTInPin_Disconnect(IPin *iface) {
FILTER_STATE state;
TRACE("()\n");
+ mark_wine_thread();
+
hr = IBaseFilter_GetState(This->pin.pinInfo.pFilter, INFINITE, &state);
EnterCriticalSection(This->pin.pCritSec);
if (This->pin.pConnectedTo) {
@@ -1830,3 +1864,175 @@ static const IPinVtbl GST_InputPin_Vtbl = {
GSTInPin_EndFlush,
GSTInPin_NewSegment
};
+
+pthread_mutex_t cb_list_lock = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t cb_list_cond = PTHREAD_COND_INITIALIZER;
+struct list cb_list = LIST_INIT(cb_list);
+
+void CALLBACK perform_cb(TP_CALLBACK_INSTANCE *instance, void *user)
+{ /* Threadpool work item (submitted by dispatch_thread): runs the single callback described by user. */
+ struct cb_data *cbdata = user; /* user is the queued struct cb_data request */
+
+ TRACE("got cb type: 0x%x\n", cbdata->type);
+
+ switch(cbdata->type) /* type selects the handler; its arguments are read from the matching member of cbdata->u */
+ {
+ case WATCH_BUS:
+ {
+ struct watch_bus_data *data = &cbdata->u.watch_bus_data;
+ cbdata->u.watch_bus_data.ret = watch_bus(data->bus, data->msg, data->user); /* handlers with a result store it in their union member's ret field */
+ break;
+ }
+ case EXISTING_NEW_PAD:
+ {
+ struct existing_new_pad_data *data = &cbdata->u.existing_new_pad_data;
+ existing_new_pad(data->bin, data->pad, data->last, data->user); /* void handler: nothing is stored back */
+ break;
+ }
+ case CHECK_GET_RANGE:
+ {
+ struct check_get_range_data *data = &cbdata->u.check_get_range_data;
+ cbdata->u.check_get_range_data.ret = check_get_range(data->pad);
+ break;
+ }
+ case QUERY_FUNCTION:
+ {
+ struct query_function_data *data = &cbdata->u.query_function_data;
+ cbdata->u.query_function_data.ret = query_function(data->pad, data->query);
+ break;
+ }
+ case ACTIVATE_PUSH:
+ {
+ struct activate_push_data *data = &cbdata->u.activate_push_data;
+ cbdata->u.activate_push_data.ret = activate_push(data->pad, data->activate);
+ break;
+ }
+ case NO_MORE_PADS:
+ {
+ struct no_more_pads_data *data = &cbdata->u.no_more_pads_data;
+ no_more_pads(data->decodebin, data->user);
+ break;
+ }
+ case REQUEST_BUFFER_SRC:
+ {
+ struct request_buffer_src_data *data = &cbdata->u.request_buffer_src_data;
+ cbdata->u.request_buffer_src_data.ret = request_buffer_src(data->pad,
+ data->ofs, data->len, data->buf);
+ break;
+ }
+ case EVENT_SRC:
+ {
+ struct event_src_data *data = &cbdata->u.event_src_data;
+ cbdata->u.event_src_data.ret = event_src(data->pad, data->event);
+ break;
+ }
+ case EVENT_SINK:
+ {
+ struct event_sink_data *data = &cbdata->u.event_sink_data;
+ cbdata->u.event_sink_data.ret = event_sink(data->pad, data->event);
+ break;
+ }
+ case REQUEST_BUFFER_SINK:
+ {
+ struct request_buffer_sink_data *data = &cbdata->u.request_buffer_sink_data;
+ cbdata->u.request_buffer_sink_data.ret = request_buffer_sink(data->pad,
+ data->ofs, data->size, data->caps, data->buf);
+ break;
+ }
+ case ACCEPT_CAPS_SINK:
+ {
+ struct accept_caps_sink_data *data = &cbdata->u.accept_caps_sink_data;
+ cbdata->u.accept_caps_sink_data.ret = accept_caps_sink(data->pad, data->caps);
+ break;
+ }
+ case SETCAPS_SINK:
+ {
+ struct setcaps_sink_data *data = &cbdata->u.setcaps_sink_data;
+ cbdata->u.setcaps_sink_data.ret = setcaps_sink(data->pad, data->caps);
+ break;
+ }
+ case GOT_DATA_SINK:
+ {
+ struct got_data_sink_data *data = &cbdata->u.got_data_sink_data;
+ cbdata->u.got_data_sink_data.ret = got_data_sink(data->pad, data->buf);
+ break;
+ }
+ case GOT_DATA:
+ {
+ struct got_data_data *data = &cbdata->u.got_data_data;
+ cbdata->u.got_data_data.ret = got_data(data->pad, data->buf);
+ break;
+ }
+ case REQUEST_BUFFER:
+ {
+ struct request_buffer_data *data = &cbdata->u.request_buffer_data;
+ cbdata->u.request_buffer_data.ret = request_buffer(data->pad,
+ data->ofs, data->size, data->caps, data->buf);
+ break;
+ }
+ case REMOVED_DECODED_PAD:
+ {
+ struct removed_decoded_pad_data *data = &cbdata->u.removed_decoded_pad_data;
+ removed_decoded_pad(data->bin, data->pad, data->user);
+ break;
+ }
+ case AUTOPLUG_BLACKLIST:
+ {
+ struct autoplug_blacklist_data *data = &cbdata->u.autoplug_blacklist_data;
+ cbdata->u.autoplug_blacklist_data.ret = autoplug_blacklist(data->bin,
+ data->pad, data->caps, data->fact, data->user);
+ break;
+ }
+ case UNKNOWN_TYPE:
+ {
+ struct unknown_type_data *data = &cbdata->u.unknown_type_data;
+ unknown_type(data->bin, data->pad, data->caps, data->user);
+ break;
+ }
+ case RELEASE_SAMPLE:
+ {
+ struct release_sample_data *data = &cbdata->u.release_sample_data;
+ release_sample(data->data);
+ break;
+ }
+ case TRANSFORM_PAD_ADDED:
+ {
+ struct transform_pad_added_data *data = &cbdata->u.transform_pad_added_data;
+ Gstreamer_transform_pad_added(data->filter, data->pad, data->user);
+ break;
+ }
+ } /* no default case: a type not listed above runs no handler and only signals completion below */
+
+ pthread_mutex_lock(&cbdata->lock); /* completion is published under the request's own lock */
+ cbdata->finished = 1;
+ pthread_cond_broadcast(&cbdata->cond); /* wakes any thread blocked on this request's condition variable; presumably the *_wrapper that queued it (not visible here) */
+ pthread_mutex_unlock(&cbdata->lock);
+}
+
+/* Dispatcher loop: removes every request queued on cb_list and submits it to
+ * the threadpool, where perform_cb() runs it.
+ *
+ * user is unused. The loop never exits, so the unlock and return below are
+ * not reached in practice. */
+static DWORD WINAPI dispatch_thread(void *user)
+{
+    struct cb_data *cbdata;
+
+    pthread_mutex_lock(&cb_list_lock);
+
+    while(1){
+        /* Check the list before every wait. Waiting unconditionally would
+         * miss a request that was queued (and cb_list_cond signalled) before
+         * this thread first blocked, and pthread_cond_wait() may also return
+         * spuriously, so the predicate has to be re-tested in a loop. */
+        while(list_empty(&cb_list))
+            pthread_cond_wait(&cb_list_cond, &cb_list_lock);
+
+        cbdata = LIST_ENTRY(list_head(&cb_list), struct cb_data, entry);
+        list_remove(&cbdata->entry);
+
+        /* If submission fails, perform_cb() never runs for this request and
+         * its finished flag is never set; at least make that visible. */
+        if(!TrySubmitThreadpoolCallback(&perform_cb, cbdata, NULL))
+            ERR("Failed to submit callback type 0x%x to the threadpool\n", cbdata->type);
+    }
+
+    pthread_mutex_unlock(&cb_list_lock);
+
+    return 0;
+}
+
+/* Creates the wine_gst_key thread-specific key and starts the thread that
+ * runs dispatch_thread(). Takes no arguments and returns nothing; a failure
+ * to create the thread is only logged. */
+void start_dispatch_thread(void)
+{
+    HANDLE thread;
+
+    pthread_key_create(&wine_gst_key, NULL);
+
+    thread = CreateThread(NULL, 0, &dispatch_thread, NULL, 0, NULL);
+    if(!thread){
+        ERR("Failed to create the callback dispatch thread\n");
+        return;
+    }
+
+    /* Nothing here waits on or joins the dispatcher, so release the handle
+     * right away instead of leaking it; the thread keeps running. */
+    CloseHandle(thread);
+}
diff --git a/dlls/winegstreamer/gsttffilter.c b/dlls/winegstreamer/gsttffilter.c
index 28f5d5d..ed5457b 100644
--- a/dlls/winegstreamer/gsttffilter.c
+++ b/dlls/winegstreamer/gsttffilter.c
@@ -28,6 +28,7 @@
#include "gst_private.h"
#include "gst_guids.h"
+#include "gst_cbs.h"
#include "uuids.h"
#include "mmreg.h"
@@ -125,6 +126,8 @@ static HRESULT WINAPI Gstreamer_transform_ProcessBegin(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
int ret;
+ mark_wine_thread();
+
ret = gst_element_set_state(This->filter, GST_STATE_PLAYING);
TRACE("Returned: %i\n", ret);
return S_OK;
@@ -146,12 +149,7 @@ static HRESULT WINAPI Gstreamer_transform_DecideBufferSize(TransformFilter *tf,
return IMemAllocator_SetProperties(pAlloc, ppropInputRequest, &actual);
}
-static void release_sample(void *data) {
- TRACE("Releasing %p\n", data);
- IMediaSample_Release((IMediaSample *)data);
-}
-
-static GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
+GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
GstTfImpl *This = gst_pad_get_element_private(pad);
IMediaSample *sample = GST_APP_BUFFER(buf)->priv;
REFERENCE_TIME tStart, tStop;
@@ -188,7 +186,7 @@ static GstFlowReturn got_data(GstPad *pad, GstBuffer *buf) {
return GST_FLOW_OK;
}
-static GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) {
+GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCaps *caps, GstBuffer **buf) {
GstTfImpl *This = gst_pad_get_element_private(pad);
IMediaSample *sample;
BYTE *ptr;
@@ -202,7 +200,7 @@ static GstFlowReturn request_buffer(GstPad *pad, guint64 ofs, guint size, GstCap
}
IMediaSample_SetActualDataLength(sample, size);
IMediaSample_GetPointer(sample, &ptr);
- *buf = gst_app_buffer_new(ptr, size, release_sample, sample);
+ *buf = gst_app_buffer_new(ptr, size, release_sample_wrapper, sample);
if (!*buf) {
IMediaSample_Release(sample);
@@ -224,9 +222,11 @@ static HRESULT WINAPI Gstreamer_transform_ProcessData(TransformFilter *iface, IM
int ret;
TRACE("Reading %p\n", sample);
+ mark_wine_thread();
+
EnterCriticalSection(&This->tf.csReceive);
IMediaSample_GetPointer(sample, &data);
- buf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(sample), release_sample, sample);
+ buf = gst_app_buffer_new(data, IMediaSample_GetActualDataLength(sample), release_sample_wrapper, sample);
if (!buf) {
LeaveCriticalSection(&This->tf.csReceive);
return S_OK;
@@ -267,6 +267,8 @@ static HRESULT WINAPI Gstreamer_transform_ProcessEnd(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
int ret;
+ mark_wine_thread();
+
LeaveCriticalSection(&This->tf.csReceive);
ret = gst_element_set_state(This->filter, GST_STATE_READY);
EnterCriticalSection(&This->tf.csReceive);
@@ -274,8 +276,9 @@ static HRESULT WINAPI Gstreamer_transform_ProcessEnd(TransformFilter *iface) {
return S_OK;
}
-static void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, GstTfImpl *This)
+void Gstreamer_transform_pad_added(GstElement *filter, GstPad *pad, gpointer user)
{
+ GstTfImpl *This = (GstTfImpl*)user;
int ret;
if (!GST_PAD_IS_SRC(pad))
return;
@@ -294,6 +297,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
BOOL done = FALSE, found = FALSE;
int ret;
+ mark_wine_thread();
+
This->filter = gst_element_factory_make(This->gstreamer_name, NULL);
if (!This->filter) {
FIXME("Could not make %s filter\n", This->gstreamer_name);
@@ -303,8 +308,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
gst_pad_set_element_private (This->my_src, This);
This->my_sink = gst_pad_new(NULL, GST_PAD_SINK);
- gst_pad_set_chain_function(This->my_sink, got_data);
- gst_pad_set_bufferalloc_function(This->my_sink, request_buffer);
+ gst_pad_set_chain_function(This->my_sink, got_data_wrapper);
+ gst_pad_set_bufferalloc_function(This->my_sink, request_buffer_wrapper);
gst_pad_set_element_private (This->my_sink, This);
ret = gst_pad_set_caps(This->my_src, capsin);
@@ -362,7 +367,7 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
gst_iterator_free(it);
found = !!This->their_src;
if (!found)
- g_signal_connect(This->filter, "pad-added", G_CALLBACK(Gstreamer_transform_pad_added), This);
+ g_signal_connect(This->filter, "pad-added", G_CALLBACK(Gstreamer_transform_pad_added_wrapper), This);
ret = gst_pad_link(This->my_src, This->their_sink);
if (ret < 0) {
WARN("Failed to link with %i\n", ret);
@@ -382,6 +387,8 @@ static HRESULT Gstreamer_transform_ConnectInput(GstTfImpl *This, const AM_MEDIA_
static HRESULT WINAPI Gstreamer_transform_Cleanup(TransformFilter *tf, PIN_DIRECTION dir) {
GstTfImpl *This = (GstTfImpl*)tf;
+ mark_wine_thread();
+
if (dir == PINDIR_INPUT)
{
if (This->filter) {
@@ -405,6 +412,7 @@ static HRESULT WINAPI Gstreamer_transform_Cleanup(TransformFilter *tf, PIN_DIREC
static HRESULT WINAPI Gstreamer_transform_EndOfStream(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
TRACE("%p\n", This);
+ mark_wine_thread();
gst_pad_push_event(This->my_src, gst_event_new_eos());
return S_OK;
@@ -413,6 +421,7 @@ static HRESULT WINAPI Gstreamer_transform_EndOfStream(TransformFilter *iface) {
static HRESULT WINAPI Gstreamer_transform_BeginFlush(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
TRACE("%p\n", This);
+ mark_wine_thread();
gst_pad_push_event(This->my_src, gst_event_new_flush_start());
return S_OK;
@@ -421,6 +430,7 @@ static HRESULT WINAPI Gstreamer_transform_BeginFlush(TransformFilter *iface) {
static HRESULT WINAPI Gstreamer_transform_EndFlush(TransformFilter *iface) {
GstTfImpl *This = (GstTfImpl*)iface;
TRACE("%p\n", This);
+ mark_wine_thread();
gst_pad_push_event(This->my_src, gst_event_new_flush_stop());
return S_OK;
@@ -429,6 +439,7 @@ static HRESULT WINAPI Gstreamer_transform_EndFlush(TransformFilter *iface) {
static HRESULT WINAPI Gstreamer_transform_NewSegment(TransformFilter *iface, REFERENCE_TIME tStart, REFERENCE_TIME tStop, double dRate) {
GstTfImpl *This = (GstTfImpl*)iface;
TRACE("%p\n", This);
+ mark_wine_thread();
gst_pad_push_event(This->my_src, gst_event_new_new_segment_full(1,
1.0, dRate, GST_FORMAT_TIME, 0, tStop <= tStart ? -1 : tStop * 100, tStart*100));
@@ -438,6 +449,7 @@ static HRESULT WINAPI Gstreamer_transform_NewSegment(TransformFilter *iface, REF
static HRESULT WINAPI Gstreamer_transform_QOS(TransformFilter *iface, IBaseFilter *sender, Quality qm) {
GstTfImpl *This = (GstTfImpl*)iface;
REFERENCE_TIME late = qm.Late;
+ mark_wine_thread();
if (qm.Late < 0 && -qm.Late > qm.TimeStamp)
late = -qm.TimeStamp;
gst_pad_push_event(This->my_sink, gst_event_new_qos(1000. / qm.Proportion, late * 100, qm.TimeStamp * 100));
@@ -480,6 +492,8 @@ static HRESULT WINAPI Gstreamer_Mp3_SetMediaType(TransformFilter *tf, PIN_DIRECT
HRESULT hr;
int layer;
+ mark_wine_thread();
+
if (dir != PINDIR_INPUT)
return S_OK;
@@ -572,6 +586,7 @@ IUnknown * CALLBACK Gstreamer_Mp3_create(IUnknown *punkout, HRESULT *phr)
*phr = E_FAIL;
return NULL;
}
+ mark_wine_thread();
plugin = Gstreamer_FindMatch("audio/mpeg, mpegversion=(int) 1");
if (!plugin)
{
@@ -620,6 +635,8 @@ static HRESULT WINAPI Gstreamer_YUV_SetMediaType(TransformFilter *tf, PIN_DIRECT
int avgtime;
LONG width, height;
+ mark_wine_thread();
+
if (dir != PINDIR_INPUT)
return S_OK;
@@ -737,6 +754,8 @@ static HRESULT WINAPI Gstreamer_AudioConvert_SetMediaType(TransformFilter *tf, P
BOOL inisfloat = FALSE;
int indepth;
+ mark_wine_thread();
+
if (dir != PINDIR_INPUT)
return S_OK;
diff --git a/dlls/winegstreamer/main.c b/dlls/winegstreamer/main.c
index 4f0e84f..aa0673a 100644
--- a/dlls/winegstreamer/main.c
+++ b/dlls/winegstreamer/main.c
@@ -254,7 +254,6 @@ DWORD Gstreamer_init(void) {
argv[0] = argv0;
argv[1] = argv1;
argv[2] = NULL;
- g_thread_impl_init();
inited = gst_init_check(&argc, &argv, &err);
HeapFree(GetProcessHeap(), 0, argv);
if (err) {
@@ -269,6 +268,8 @@ DWORD Gstreamer_init(void) {
(LPCWSTR)hInst, &newhandle);
if (!newhandle)
ERR("Could not pin module %p\n", hInst);
+
+ start_dispatch_thread();
}
}
return inited;
--
2.7.0
More information about the wine-patches
mailing list