[PATCH v2 4/5] winegstreamer: Support zero-copy output using the allocator.

Rémi Bernon wine at gitlab.winehq.org
Wed Jun 8 04:09:38 CDT 2022


From: Rémi Bernon <rbernon at codeweavers.com>

This is done through a custom allocator, which borrows memory from the
reading thread and maps it instead of the allocated memory.

We cannot use the buffer pool to share wrapped buffers, because some
decoders will hold on to the acquired buffers longer than they should, and
we cannot remove our memory from them as long as they keep a reference.

Swapping the memory on map should be safe.

Signed-off-by: Rémi Bernon <rbernon at codeweavers.com>
---
 dlls/winegstreamer/mfplat.c       |   6 ++
 dlls/winegstreamer/unix_private.h |   7 +-
 dlls/winegstreamer/unixlib.h      |   1 +
 dlls/winegstreamer/wg_allocator.c | 135 ++++++++++++++++++++++++++++--
 dlls/winegstreamer/wg_transform.c |  98 +++++++++++++++++++---
 5 files changed, 230 insertions(+), 17 deletions(-)

diff --git a/dlls/winegstreamer/mfplat.c b/dlls/winegstreamer/mfplat.c
index 28a3fc20ead..40199706759 100644
--- a/dlls/winegstreamer/mfplat.c
+++ b/dlls/winegstreamer/mfplat.c
@@ -999,6 +999,12 @@ void wg_sample_release(struct wg_sample *wg_sample)
 {
     struct mf_sample *mf_sample = CONTAINING_RECORD(wg_sample, struct mf_sample, wg_sample);
 
+    if (InterlockedOr(&wg_sample->refcount, 0))
+    {
+        ERR("Sample %p is still in use, trouble ahead!\n", wg_sample);
+        return;
+    }
+
     IMFMediaBuffer_Unlock(mf_sample->media_buffer);
     IMFMediaBuffer_Release(mf_sample->media_buffer);
     IMFSample_Release(mf_sample->sample);
diff --git a/dlls/winegstreamer/unix_private.h b/dlls/winegstreamer/unix_private.h
index 16615ef0833..e9f472986ae 100644
--- a/dlls/winegstreamer/unix_private.h
+++ b/dlls/winegstreamer/unix_private.h
@@ -37,7 +37,12 @@ extern NTSTATUS wg_transform_destroy(void *args) DECLSPEC_HIDDEN;
 extern NTSTATUS wg_transform_push_data(void *args) DECLSPEC_HIDDEN;
 extern NTSTATUS wg_transform_read_data(void *args) DECLSPEC_HIDDEN;
 
-extern GstAllocator *wg_allocator_create(void) DECLSPEC_HIDDEN;
+/* wg_allocator_release_sample can be used to release any sample that was requested. */
+typedef struct wg_sample *(*wg_allocator_request_sample_cb)(gsize size, void *context);
+extern GstAllocator *wg_allocator_create(wg_allocator_request_sample_cb request_sample,
+        void *request_sample_context) DECLSPEC_HIDDEN;
 extern void wg_allocator_destroy(GstAllocator *allocator) DECLSPEC_HIDDEN;
+extern void wg_allocator_release_sample(GstAllocator *allocator, struct wg_sample *sample,
+        bool discard_data) DECLSPEC_HIDDEN;
 
 #endif /* __WINE_WINEGSTREAMER_UNIX_PRIVATE_H */
diff --git a/dlls/winegstreamer/unixlib.h b/dlls/winegstreamer/unixlib.h
index f334a168bd1..860a8ab2a52 100644
--- a/dlls/winegstreamer/unixlib.h
+++ b/dlls/winegstreamer/unixlib.h
@@ -128,6 +128,7 @@ struct wg_sample
     /* timestamp and duration are in 100-nanosecond units. */
     UINT64 pts;
     UINT64 duration;
+    LONG refcount; /* unix refcount */
     UINT32 flags;
     UINT32 max_size;
     UINT32 size;
diff --git a/dlls/winegstreamer/wg_allocator.c b/dlls/winegstreamer/wg_allocator.c
index 90dada288ae..c31751ce83f 100644
--- a/dlls/winegstreamer/wg_allocator.c
+++ b/dlls/winegstreamer/wg_allocator.c
@@ -33,20 +33,33 @@
 
 #include "unix_private.h"
 
+#include "wine/list.h"
+
 GST_DEBUG_CATEGORY_EXTERN(wine);
 #define GST_CAT_DEFAULT wine
 
 typedef struct
 {
     GstMemory parent;
+    struct list entry;
 
     GstMemory *unix_memory;
     GstMapInfo unix_map_info;
+
+    struct wg_sample *sample;
+    gsize written;
 } WgMemory;
 
 typedef struct
 {
     GstAllocator parent;
+
+    wg_allocator_request_sample_cb request_sample;
+    void *request_sample_context;
+
+    pthread_mutex_t mutex;
+    pthread_cond_t release_cond;
+    struct list memory_list;
 } WgAllocator;
 
 typedef struct
@@ -58,6 +71,7 @@ G_DEFINE_TYPE(WgAllocator, wg_allocator, GST_TYPE_ALLOCATOR);
 
 static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize maxsize)
 {
+    WgAllocator *allocator = (WgAllocator *)gst_memory->allocator;
     WgMemory *memory = (WgMemory *)gst_memory;
 
     if (gst_memory->parent)
@@ -65,7 +79,19 @@ static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize
 
     GST_LOG("memory %p, info %p, maxsize %#zx", memory, info, maxsize);
 
-    info->data = memory->unix_map_info.data;
+    pthread_mutex_lock(&allocator->mutex);
+
+    if (!memory->sample)
+        info->data = memory->unix_map_info.data;
+    else
+    {
+        InterlockedIncrement(&memory->sample->refcount);
+        info->data = memory->sample->data;
+    }
+    if (info->flags & GST_MAP_WRITE)
+        memory->written = max(memory->written, maxsize);
+
+    pthread_mutex_unlock(&allocator->mutex);
 
     GST_INFO("Mapped memory %p to %p", memory, info->data);
     return info->data;
@@ -73,12 +99,23 @@ static gpointer wg_allocator_map(GstMemory *gst_memory, GstMapInfo *info, gsize
 
 static void wg_allocator_unmap(GstMemory *gst_memory, GstMapInfo *info)
 {
+    WgAllocator *allocator = (WgAllocator *)gst_memory->allocator;
     WgMemory *memory = (WgMemory *)gst_memory;
 
     if (gst_memory->parent)
         return wg_allocator_unmap(gst_memory->parent, info);
 
     GST_LOG("memory %p, info %p", memory, info);
+
+    pthread_mutex_lock(&allocator->mutex);
+
+    if (memory->sample && info->data == memory->sample->data)
+    {
+        InterlockedDecrement(&memory->sample->refcount);
+        pthread_cond_signal(&allocator->release_cond);
+    }
+
+    pthread_mutex_unlock(&allocator->mutex);
 }
 
 static void wg_allocator_init(WgAllocator *allocator)
@@ -91,6 +128,10 @@ static void wg_allocator_init(WgAllocator *allocator)
     allocator->parent.mem_unmap_full = wg_allocator_unmap;
 
     GST_OBJECT_FLAG_SET(allocator, GST_ALLOCATOR_FLAG_CUSTOM_ALLOC);
+
+    pthread_mutex_init(&allocator->mutex, NULL);
+    pthread_cond_init(&allocator->release_cond, NULL);
+    list_init(&allocator->memory_list);
 }
 
 static void wg_allocator_finalize(GObject *object)
@@ -99,6 +140,9 @@ static void wg_allocator_finalize(GObject *object)
 
     GST_LOG("allocator %p", allocator);
 
+    pthread_cond_destroy(&allocator->release_cond);
+    pthread_mutex_destroy(&allocator->mutex);
+
     G_OBJECT_CLASS(wg_allocator_parent_class)->finalize(object);
 }
 
@@ -116,8 +160,15 @@ static GstMemory *wg_allocator_alloc(GstAllocator *gst_allocator, gsize size,
     memory->unix_memory = gst_allocator_alloc(NULL, size, params);
     gst_memory_map(memory->unix_memory, &memory->unix_map_info, GST_MAP_WRITE);
 
-    GST_INFO("Allocated memory %p, unix_memory %p, data %p", memory, memory->unix_memory,
-            memory->unix_map_info.data);
+    pthread_mutex_lock(&allocator->mutex);
+
+    memory->sample = allocator->request_sample(size, allocator->request_sample_context);
+    list_add_tail(&allocator->memory_list, &memory->entry);
+
+    pthread_mutex_unlock(&allocator->mutex);
+
+    GST_INFO("Allocated memory %p, sample %p, unix_memory %p, data %p", memory,
+            memory->sample, memory->unix_memory, memory->unix_map_info.data);
     return (GstMemory *)memory;
 }
 
@@ -128,6 +179,16 @@ static void wg_allocator_free(GstAllocator *gst_allocator, GstMemory *gst_memory
 
     GST_LOG("allocator %p, memory %p", allocator, memory);
 
+    pthread_mutex_lock(&allocator->mutex);
+
+    if (memory->sample)
+        InterlockedDecrement(&memory->sample->refcount);
+    memory->sample = NULL;
+
+    list_remove(&memory->entry);
+
+    pthread_mutex_unlock(&allocator->mutex);
+
     gst_memory_unmap(memory->unix_memory, &memory->unix_map_info);
     gst_memory_unref(memory->unix_memory);
     g_slice_free(WgMemory, memory);
@@ -145,18 +206,82 @@ static void wg_allocator_class_init(WgAllocatorClass *klass)
     root_class->finalize = wg_allocator_finalize;
 }
 
-GstAllocator *wg_allocator_create(void)
+GstAllocator *wg_allocator_create(wg_allocator_request_sample_cb request_sample, void *request_sample_context)
+{
+    WgAllocator *allocator;
+
+    if (!(allocator = g_object_new(wg_allocator_get_type(), NULL)))
+        return NULL;
+
+    allocator->request_sample = request_sample;
+    allocator->request_sample_context = request_sample_context;
+    return GST_ALLOCATOR(allocator);
+}
+
+static void release_memory_sample(WgAllocator *allocator, WgMemory *memory, bool discard_data)
 {
-    return g_object_new(wg_allocator_get_type(), NULL);
+    struct wg_sample *sample;
+
+    if (!(sample = memory->sample))
+        return;
+
+    while (sample->refcount > 1)
+    {
+        GST_WARNING("Waiting for sample %p to be unmapped", sample);
+        pthread_cond_wait(&allocator->release_cond, &allocator->mutex);
+    }
+    InterlockedDecrement(&sample->refcount);
+
+    if (memory->written && !discard_data)
+    {
+        GST_WARNING("Copying %#zx bytes from sample %p, back to memory %p", memory->written, sample, memory);
+        memcpy(memory->unix_map_info.data, memory->sample->data, memory->written);
+    }
+
+    memory->sample = NULL;
+    GST_INFO("Released sample %p from memory %p", sample, memory);
 }
 
 void wg_allocator_destroy(GstAllocator *gst_allocator)
 {
     WgAllocator *allocator = (WgAllocator *)gst_allocator;
+    WgMemory *memory;
 
     GST_LOG("allocator %p", allocator);
 
+    pthread_mutex_lock(&allocator->mutex);
+    LIST_FOR_EACH_ENTRY(memory, &allocator->memory_list, WgMemory, entry)
+        release_memory_sample(allocator, memory, true);
+    pthread_mutex_unlock(&allocator->mutex);
+
     g_object_unref(allocator);
 
     GST_INFO("Destroyed buffer allocator %p", allocator);
 }
+
+static WgMemory *find_sample_memory(WgAllocator *allocator, struct wg_sample *sample)
+{
+    WgMemory *memory;
+
+    LIST_FOR_EACH_ENTRY(memory, &allocator->memory_list, WgMemory, entry)
+        if (memory->sample == sample)
+            return memory;
+
+    return NULL;
+}
+
+void wg_allocator_release_sample(GstAllocator *gst_allocator, struct wg_sample *sample,
+        bool discard_data)
+{
+    WgAllocator *allocator = (WgAllocator *)gst_allocator;
+    WgMemory *memory;
+
+    GST_LOG("allocator %p, sample %p, discard_data %u", allocator, sample, discard_data);
+
+    pthread_mutex_lock(&allocator->mutex);
+    if ((memory = find_sample_memory(allocator, sample)))
+        release_memory_sample(allocator, memory, discard_data);
+    else if (sample->refcount)
+        GST_ERROR("Couldn't find memory for sample %p", sample);
+    pthread_mutex_unlock(&allocator->mutex);
+}
diff --git a/dlls/winegstreamer/wg_transform.c b/dlls/winegstreamer/wg_transform.c
index c87536e5fbb..31d50e389cf 100644
--- a/dlls/winegstreamer/wg_transform.c
+++ b/dlls/winegstreamer/wg_transform.c
@@ -54,6 +54,7 @@ struct wg_transform
     GstBufferList *input;
     guint input_max_length;
     guint output_plane_align;
+    struct wg_sample *output_wg_sample;
     GstAtomicQueue *output_queue;
     GstSample *output_sample;
     bool output_caps_changed;
@@ -305,6 +306,20 @@ static bool transform_append_element(struct wg_transform *transform, GstElement
     return success;
 }
 
+static struct wg_sample *transform_request_sample(gsize size, void *context)
+{
+    struct wg_transform *transform = context;
+    struct wg_sample *sample;
+
+    GST_LOG("size %#zx, context %p", size, transform);
+
+    sample = InterlockedExchangePointer((void **)&transform->output_wg_sample, NULL);
+    if (!sample || sample->max_size < size)
+        return NULL;
+
+    return sample;
+}
+
 NTSTATUS wg_transform_create(void *args)
 {
     struct wg_transform_create_params *params = args;
@@ -329,7 +344,7 @@ NTSTATUS wg_transform_create(void *args)
         goto out;
     if (!(transform->output_queue = gst_atomic_queue_new(8)))
         goto out;
-    if (!(transform->allocator = wg_allocator_create()))
+    if (!(transform->allocator = wg_allocator_create(transform_request_sample, transform)))
         goto out;
     transform->input_max_length = 1;
     transform->output_plane_align = 0;
@@ -622,10 +637,22 @@ static bool copy_buffer(GstBuffer *buffer, GstCaps *caps, struct wg_sample *samp
 static NTSTATUS read_transform_output_data(GstBuffer *buffer, GstCaps *caps, gsize plane_align,
         struct wg_sample *sample)
 {
+    bool ret, needs_copy;
     gsize total_size;
-    bool ret;
+    GstMapInfo info;
+
+    if (!gst_buffer_map(buffer, &info, GST_MAP_READ))
+    {
+        GST_ERROR("Failed to map buffer %p", buffer);
+        sample->size = 0;
+        return STATUS_UNSUCCESSFUL;
+    }
+    needs_copy = info.data != sample->data;
+    gst_buffer_unmap(buffer, &info);
 
-    if (is_caps_video(caps))
+    if ((ret = !needs_copy))
+        total_size = sample->size = info.size;
+    else if (is_caps_video(caps))
         ret = copy_video_buffer(buffer, caps, plane_align, sample, &total_size);
     else
         ret = copy_buffer(buffer, caps, sample, &total_size);
@@ -657,7 +684,18 @@ static NTSTATUS read_transform_output_data(GstBuffer *buffer, GstCaps *caps, gsi
     if (!GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT))
         sample->flags |= WG_SAMPLE_FLAG_SYNC_POINT;
 
-    GST_INFO("Copied %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
+    if (needs_copy)
+    {
+        if (is_caps_video(caps))
+            GST_WARNING("Copied %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
+        else
+            GST_INFO("Copied %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
+    }
+    else if (sample->flags & WG_SAMPLE_FLAG_INCOMPLETE)
+        GST_ERROR("Partial read %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
+    else
+        GST_INFO("Read %u bytes, sample %p, flags %#x", sample->size, sample, sample->flags);
+
     return STATUS_SUCCESS;
 }
 
@@ -667,23 +705,38 @@ NTSTATUS wg_transform_read_data(void *args)
     struct wg_transform *transform = params->transform;
     struct wg_sample *sample = params->sample;
     struct wg_format *format = params->format;
-    GstBufferList *input = transform->input;
+    GstFlowReturn ret = GST_FLOW_OK;
     GstBuffer *output_buffer;
+    GstBufferList *input;
     GstCaps *output_caps;
-    GstFlowReturn ret;
+    bool discard_data;
     NTSTATUS status;
 
+    /* Provide the sample for transform_request_sample to pick it up */
+    InterlockedIncrement(&sample->refcount);
+    InterlockedExchangePointer((void **)&transform->output_wg_sample, sample);
+
     if (!gst_buffer_list_length(transform->input))
         GST_DEBUG("Not input buffer queued");
-    else if (!(transform->input = gst_buffer_list_new()))
+    else if ((input = gst_buffer_list_new()))
+    {
+        ret = gst_pad_push_list(transform->my_src, transform->input);
+        transform->input = input;
+    }
+    else
     {
         GST_ERROR("Failed to allocate new input queue");
-        gst_buffer_list_unref(input);
-        return STATUS_NO_MEMORY;
+        ret = GST_FLOW_ERROR;
     }
-    else if ((ret = gst_pad_push_list(transform->my_src, input)))
+
+    /* Remove the sample so transform_request_sample cannot use it */
+    if (InterlockedExchangePointer((void **)&transform->output_wg_sample, NULL))
+        InterlockedDecrement(&sample->refcount);
+
+    if (ret)
     {
         GST_ERROR("Failed to push transform input, error %d", ret);
+        wg_allocator_release_sample(transform->allocator, sample, false);
         return STATUS_UNSUCCESSFUL;
     }
 
@@ -692,6 +745,7 @@ NTSTATUS wg_transform_read_data(void *args)
         sample->size = 0;
         params->result = MF_E_TRANSFORM_NEED_MORE_INPUT;
         GST_INFO("Cannot read %u bytes, no output available", sample->max_size);
+        wg_allocator_release_sample(transform->allocator, sample, false);
         return STATUS_SUCCESS;
     }
 
@@ -730,19 +784,41 @@ NTSTATUS wg_transform_read_data(void *args)
 
         params->result = MF_E_TRANSFORM_STREAM_CHANGE;
         GST_INFO("Format changed detected, returning no output");
+        wg_allocator_release_sample(transform->allocator, sample, false);
         return STATUS_SUCCESS;
     }
 
     if ((status = read_transform_output_data(output_buffer, output_caps,
                 transform->output_plane_align, sample)))
+    {
+        wg_allocator_release_sample(transform->allocator, sample, false);
         return status;
+    }
 
-    if (!(sample->flags & WG_SAMPLE_FLAG_INCOMPLETE))
+    if (sample->flags & WG_SAMPLE_FLAG_INCOMPLETE)
+        discard_data = false;
+    else
     {
+        /* Taint the buffer memory to make sure it cannot be reused by the buffer pool,
+         * so that the pool always requests new memory from the allocator, and so we can
+         * then always provide output sample memory to achieve zero-copy.
+         *
+         * However, some decoders keep a reference to the buffer they passed downstream,
+         * to re-use it later. In this case, it will not be possible to do zero-copy,
+         * and we should copy the data back to the buffer and leave it unchanged.
+         *
+         * Some other plugins make assumptions that the returned buffer will always have
+         * at least one memory attached, so we cannot just remove it and need to replace the
+         * memory instead.
+         */
+        if ((discard_data = gst_buffer_is_writable(output_buffer)))
+            gst_buffer_replace_all_memory(output_buffer, gst_allocator_alloc(NULL, 0, NULL));
+
         gst_sample_unref(transform->output_sample);
         transform->output_sample = NULL;
     }
 
     params->result = S_OK;
+    wg_allocator_release_sample(transform->allocator, sample, discard_data);
     return STATUS_SUCCESS;
 }
-- 
GitLab


https://gitlab.winehq.org/wine/wine/-/merge_requests/197



More information about the wine-devel mailing list