[5/8] vcomp: Add multithreaded implementation for _vcomp_fork.

Sebastian Lackner sebastian at fds-team.de
Tue Jul 14 14:41:10 CDT 2015


---
 dlls/vcomp/main.c |  180 +++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 172 insertions(+), 8 deletions(-)

diff --git a/dlls/vcomp/main.c b/dlls/vcomp/main.c
index 03c085b..20aeb4f 100644
--- a/dlls/vcomp/main.c
+++ b/dlls/vcomp/main.c
@@ -4,6 +4,7 @@
  *
  * Copyright 2011 Austin English
  * Copyright 2012 Dan Kegel
+ * Copyright 2015 Sebastian Lackner
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -23,22 +24,52 @@
 #include "config.h"
 
 #include <stdarg.h>
+#include <assert.h>
 
 #include "windef.h"
 #include "winbase.h"
 #include "wine/debug.h"
+#include "wine/list.h"
 
 WINE_DEFAULT_DEBUG_CHANNEL(vcomp);
 
+static struct list vcomp_idle_threads = LIST_INIT(vcomp_idle_threads); /* workers waiting for a new team */
 static DWORD   vcomp_context_tls = TLS_OUT_OF_INDEXES;
+static HMODULE vcomp_module; /* module handle of this DLL, stored by DllMain */
 static int     vcomp_max_threads;
 static int     vcomp_num_threads;
 static BOOL    vcomp_nested_fork = FALSE;
 
+static RTL_CRITICAL_SECTION vcomp_section; /* serializes access to the idle list and team state */
+static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
+{
+    0, 0, &vcomp_section,
+    { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
+      0, 0, { (DWORD_PTR)(__FILE__ ": vcomp_section") }
+};
+static RTL_CRITICAL_SECTION vcomp_section = { &critsect_debug, -1, 0, 0, 0, 0 };
+
 struct vcomp_thread_data
 {
+    struct vcomp_team_data  *team; /* team the thread currently belongs to, NULL if none */
     int                     thread_num;
     int                     fork_threads;
+
+    /* only used for concurrent tasks */
+    struct list             entry; /* link in a team's worker list or in vcomp_idle_threads */
+    CONDITION_VARIABLE      cond; /* woken by _vcomp_fork when the thread is assigned to a team */
+};
+
+struct vcomp_team_data
+{
+    CONDITION_VARIABLE      cond; /* woken once finished_threads reaches num_threads */
+    int                     num_threads; /* team size, including the forking thread */
+    int                     finished_threads; /* threads that have returned from the callback */
+
+    /* callback arguments */
+    int                     nargs;
+    void                    *wrapper;
+    __ms_va_list            valist;
 };
 
 void CDECL _vcomp_fork_call_wrapper(void *wrapper, int nargs, __ms_va_list args);
@@ -145,8 +176,9 @@ static struct vcomp_thread_data *vcomp_init_thread_data(void)
         ExitProcess(1);
     }
 
-    thread_data->thread_num = 0;
-    thread_data->fork_threads = 0;
+    thread_data->team           = NULL;
+    thread_data->thread_num     = 0;
+    thread_data->fork_threads   = 0;
 
     vcomp_set_thread_data(thread_data);
     return thread_data;
@@ -187,8 +219,9 @@ int CDECL omp_get_num_procs(void)
 
 int CDECL omp_get_num_threads(void)
 {
-    TRACE("stub\n");
-    return 1;
+    struct vcomp_team_data *team_data = vcomp_init_thread_data()->team;
+    TRACE("()\n");
+    return team_data ? team_data->num_threads : 1; /* no team: only the calling thread is running */
 }
 
 int CDECL omp_get_thread_num(void)
@@ -244,15 +277,145 @@ void CDECL _vcomp_single_end(void)
     TRACE("stub\n");
 }
 
+static DWORD WINAPI _vcomp_fork_worker(void *param) /* thread procedure for threads spawned by _vcomp_fork */
+{
+    struct vcomp_thread_data *thread_data = param; /* allocated by _vcomp_fork, freed before this thread exits */
+    vcomp_set_thread_data(thread_data);
+
+    TRACE("starting worker thread for %p\n", thread_data);
+
+    EnterCriticalSection(&vcomp_section);
+    for (;;)
+    {
+        struct vcomp_team_data *team = thread_data->team;
+        if (team != NULL)
+        {
+            LeaveCriticalSection(&vcomp_section); /* the callback runs without holding the lock */
+            _vcomp_fork_call_wrapper(team->wrapper, team->nargs, team->valist);
+            EnterCriticalSection(&vcomp_section);
+
+            thread_data->team = NULL;
+            list_remove(&thread_data->entry); /* leave the team's worker list ... */
+            list_add_tail(&vcomp_idle_threads, &thread_data->entry); /* ... and become available for reuse */
+            if (++team->finished_threads >= team->num_threads)
+                WakeAllConditionVariable(&team->cond); /* everyone is done: wake the thread waiting in _vcomp_fork */
+        }
+
+        if (!SleepConditionVariableCS(&thread_data->cond, &vcomp_section, 5000) &&
+            GetLastError() == ERROR_TIMEOUT && !thread_data->team)
+        {
+            break; /* timed out without being assigned to a new team: shut down */
+        }
+    }
+    list_remove(&thread_data->entry); /* team is NULL here, so the entry is on vcomp_idle_threads */
+    LeaveCriticalSection(&vcomp_section);
+
+    TRACE("terminating worker thread for %p\n", thread_data);
+
+    HeapFree(GetProcessHeap(), 0, thread_data);
+    vcomp_set_thread_data(NULL); /* do not leave a dangling pointer in the TLS slot */
+    FreeLibraryAndExitThread(vcomp_module, 0); /* pairs with the GetModuleHandleExW call in _vcomp_fork */
+    return 0; /* not reached */
+}
+
 void WINAPIV _vcomp_fork(BOOL ifval, int nargs, void *wrapper, ...)
 {
-    __ms_va_list valist;
+    struct vcomp_thread_data *prev_thread_data = vcomp_init_thread_data(); /* restored after the callback returns */
+    struct vcomp_thread_data thread_data; /* calling thread's data for this fork (thread_num 0) */
+    struct vcomp_team_data team_data; /* on this stack frame; workers point at it until they finish */
+    int num_threads;
 
     TRACE("(%d, %d, %p, ...)\n", ifval, nargs, wrapper);
 
-    __ms_va_start(valist, wrapper);
-    _vcomp_fork_call_wrapper(wrapper, nargs, valist);
-    __ms_va_end(valist);
+    if (!ifval)
+        num_threads = 1; /* ifval is FALSE: run on the calling thread only */
+    else if (prev_thread_data->team && !vcomp_nested_fork)
+        num_threads = 1; /* already inside a team and nested forks are disabled */
+    else if (prev_thread_data->fork_threads)
+        num_threads = prev_thread_data->fork_threads; /* non-zero per-thread value overrides the global one */
+    else
+        num_threads = vcomp_num_threads;
+
+    InitializeConditionVariable(&team_data.cond);
+    team_data.num_threads       = 1; /* counts the calling thread; grows as workers join */
+    team_data.finished_threads  = 0;
+    team_data.nargs             = nargs;
+    team_data.wrapper           = wrapper;
+    __ms_va_start(team_data.valist, wrapper);
+
+    thread_data.team            = &team_data;
+    thread_data.thread_num      = 0;
+    thread_data.fork_threads    = 0;
+    list_init(&thread_data.entry); /* serves as the head of this team's worker list */
+    InitializeConditionVariable(&thread_data.cond);
+
+    if (num_threads > 1)
+    {
+        struct list *ptr;
+        EnterCriticalSection(&vcomp_section);
+
+        /* reuse existing threads (if any) */
+        while (team_data.num_threads < num_threads && (ptr = list_head(&vcomp_idle_threads)))
+        {
+            struct vcomp_thread_data *data = LIST_ENTRY(ptr, struct vcomp_thread_data, entry);
+            data->team          = &team_data;
+            data->thread_num    = team_data.num_threads++;
+            data->fork_threads  = 0;
+            list_remove(&data->entry); /* move from the idle list ... */
+            list_add_tail(&thread_data.entry, &data->entry); /* ... to this team's worker list */
+            WakeAllConditionVariable(&data->cond); /* the worker sleeps on its own condition variable */
+        }
+
+        /* spawn additional threads */
+        while (team_data.num_threads < num_threads)
+        {
+            struct vcomp_thread_data *data;
+            HMODULE module;
+            HANDLE thread;
+
+            data = HeapAlloc(GetProcessHeap(), 0, sizeof(*data));
+            if (!data) break; /* out of memory: continue with the threads gathered so far */
+
+            data->team          = &team_data;
+            data->thread_num    = team_data.num_threads;
+            data->fork_threads  = 0;
+            InitializeConditionVariable(&data->cond);
+
+            thread = CreateThread(NULL, 0, _vcomp_fork_worker, data, 0, NULL);
+            if (!thread)
+            {
+                HeapFree(GetProcessHeap(), 0, data);
+                break; /* same fallback as above: run with a smaller team */
+            }
+
+            GetModuleHandleExW(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS,
+                               (const WCHAR *)vcomp_module, &module); /* module reference for the worker; result is not checked */
+            team_data.num_threads++; /* only counted once the thread actually exists */
+            list_add_tail(&thread_data.entry, &data->entry);
+            CloseHandle(thread); /* the handle is not used afterwards */
+        }
+
+        LeaveCriticalSection(&vcomp_section);
+    }
+
+    vcomp_set_thread_data(&thread_data); /* the calling thread runs the callback as team member 0 */
+    _vcomp_fork_call_wrapper(team_data.wrapper, team_data.nargs, team_data.valist);
+    vcomp_set_thread_data(prev_thread_data);
+    prev_thread_data->fork_threads = 0; /* reset after every fork, so a non-zero value affects one fork only */
+
+    if (team_data.num_threads > 1)
+    {
+        EnterCriticalSection(&vcomp_section);
+
+        team_data.finished_threads++; /* account for the calling thread itself */
+        while (team_data.finished_threads < team_data.num_threads)
+            SleepConditionVariableCS(&team_data.cond, &vcomp_section, INFINITE); /* loop re-checks the counter after each wakeup */
+
+        LeaveCriticalSection(&vcomp_section);
+        assert(list_empty(&thread_data.entry)); /* every worker moved itself back to the idle list */
+    }
+
+    __ms_va_end(team_data.valist);
 }
 
 BOOL WINAPI DllMain(HINSTANCE instance, DWORD reason, LPVOID reserved)
@@ -275,6 +438,7 @@ BOOL WINAPI DllMain(HINSTANCE instance, DWORD reason, LPVOID reserved)
             }
 
             GetSystemInfo(&sysinfo);
+            vcomp_module      = instance;
             vcomp_max_threads = sysinfo.dwNumberOfProcessors;
             vcomp_num_threads = sysinfo.dwNumberOfProcessors;
             break;
-- 
2.4.5



More information about the wine-patches mailing list