[5/8] vcomp: Add multithreaded implementation for _vcomp_fork.
Sebastian Lackner
sebastian at fds-team.de
Tue Jul 14 14:41:10 CDT 2015
---
dlls/vcomp/main.c | 180 +++++++++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 172 insertions(+), 8 deletions(-)
diff --git a/dlls/vcomp/main.c b/dlls/vcomp/main.c
index 03c085b..20aeb4f 100644
--- a/dlls/vcomp/main.c
+++ b/dlls/vcomp/main.c
@@ -4,6 +4,7 @@
*
* Copyright 2011 Austin English
* Copyright 2012 Dan Kegel
+ * Copyright 2015 Sebastian Lackner
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -23,22 +24,52 @@
#include "config.h"
#include <stdarg.h>
+#include <assert.h>
#include "windef.h"
#include "winbase.h"
#include "wine/debug.h"
+#include "wine/list.h"
WINE_DEFAULT_DEBUG_CHANNEL(vcomp);
+static struct list vcomp_idle_threads = LIST_INIT(vcomp_idle_threads); /* workers append their entry here after finishing a team callback; _vcomp_fork takes entries from the head to reuse them */
static DWORD vcomp_context_tls = TLS_OUT_OF_INDEXES;
+static HMODULE vcomp_module; /* assigned from the DllMain instance argument; workers pass it to FreeLibraryAndExitThread */
static int vcomp_max_threads;
static int vcomp_num_threads;
static BOOL vcomp_nested_fork = FALSE;
+static RTL_CRITICAL_SECTION vcomp_section; /* tentative declaration so critsect_debug below can take its address */
+static RTL_CRITICAL_SECTION_DEBUG critsect_debug =
+{
+ 0, 0, &vcomp_section,
+ { &critsect_debug.ProcessLocksList, &critsect_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": vcomp_section") } /* name string attached to the lock */
+};
+static RTL_CRITICAL_SECTION vcomp_section = { &critsect_debug, -1, 0, 0, 0, 0 }; /* statically initialized; entered by _vcomp_fork and the worker threads around team and idle-list updates */
+
struct vcomp_thread_data
{
+ struct vcomp_team_data *team; /* team this thread is currently running for; NULL when it is not part of one (initial state, or an idle worker) */
int thread_num;
int fork_threads;
+
+ /* only used for concurrent tasks */
+ struct list entry; /* worker: link in either vcomp_idle_threads or the forking thread's list; forking thread: head of the list of workers attached to its team */
+ CONDITION_VARIABLE cond; /* a worker sleeps on this while idle; _vcomp_fork wakes it after assigning a team */
+};
+
+struct vcomp_team_data /* state shared by all threads of one _vcomp_fork call; _vcomp_fork keeps it in a local variable */
+{
+ CONDITION_VARIABLE cond; /* the forking thread sleeps on this; a worker wakes it once finished_threads reaches num_threads */
+ int num_threads; /* threads actually attached to the team, including the forking thread */
+ int finished_threads; /* incremented under vcomp_section by each thread once its callback has returned */
+
+ /* callback arguments */
+ int nargs; /* forwarded unchanged to _vcomp_fork_call_wrapper */
+ void *wrapper; /* callback passed to _vcomp_fork */
+ __ms_va_list valist; /* variadic arguments of the _vcomp_fork call; the same list is passed to every thread's wrapper call */
};
void CDECL _vcomp_fork_call_wrapper(void *wrapper, int nargs, __ms_va_list args);
@@ -145,8 +176,9 @@ static struct vcomp_thread_data *vcomp_init_thread_data(void)
ExitProcess(1);
}
- thread_data->thread_num = 0;
- thread_data->fork_threads = 0;
+ thread_data->team = NULL;
+ thread_data->thread_num = 0;
+ thread_data->fork_threads = 0;
vcomp_set_thread_data(thread_data);
return thread_data;
@@ -187,8 +219,9 @@ int CDECL omp_get_num_procs(void)
int CDECL omp_get_num_threads(void)
{
- TRACE("stub\n");
- return 1;
+ struct vcomp_team_data *team_data = vcomp_init_thread_data()->team; /* team of the calling thread, or NULL when it is not running inside a _vcomp_fork team */
+ TRACE("()\n");
+ return team_data ? team_data->num_threads : 1; /* size of the current team; 1 when the caller has no team */
}
int CDECL omp_get_thread_num(void)
@@ -244,15 +277,145 @@ void CDECL _vcomp_single_end(void)
TRACE("stub\n");
}
+static DWORD WINAPI _vcomp_fork_worker(void *param) /* Worker thread entry point: param is the heap-allocated vcomp_thread_data for this worker. Runs the team callback, parks on the idle list, and exits after an idle timeout. */
+{
+ struct vcomp_thread_data *thread_data = param;
+ vcomp_set_thread_data(thread_data); /* install as this thread's context (helper is defined outside this hunk) */
+
+ TRACE("starting worker thread for %p\n", thread_data);
+
+ EnterCriticalSection(&vcomp_section); /* held for the rest of the loop except while the callback itself runs */
+ for (;;)
+ {
+ struct vcomp_team_data *team = thread_data->team;
+ if (team != NULL)
+ {
+ LeaveCriticalSection(&vcomp_section); /* the callback runs without the lock */
+ _vcomp_fork_call_wrapper(team->wrapper, team->nargs, team->valist);
+ EnterCriticalSection(&vcomp_section);
+ /* callback done: detach from the team and move this worker to the idle list */
+ thread_data->team = NULL;
+ list_remove(&thread_data->entry);
+ list_add_tail(&vcomp_idle_threads, &thread_data->entry);
+ if (++team->finished_threads >= team->num_threads) /* the forking thread also increments this counter for itself */
+ WakeAllConditionVariable(&team->cond);
+ }
+ /* wait up to 5000 ms for _vcomp_fork to assign a new team */
+ if (!SleepConditionVariableCS(&thread_data->cond, &vcomp_section, 5000) &&
+ GetLastError() == ERROR_TIMEOUT && !thread_data->team) /* leave only on a timeout with no team assigned in the meantime; any other wakeup re-runs the loop */
+ {
+ break;
+ }
+ }
+ list_remove(&thread_data->entry); /* team is NULL on this path, so the entry should be on vcomp_idle_threads */
+ LeaveCriticalSection(&vcomp_section);
+
+ TRACE("terminating worker thread for %p\n", thread_data);
+
+ HeapFree(GetProcessHeap(), 0, thread_data); /* allocated by _vcomp_fork when the thread was spawned */
+ vcomp_set_thread_data(NULL);
+ FreeLibraryAndExitThread(vcomp_module, 0); /* pairs with the GetModuleHandleExW call made in _vcomp_fork for this thread */
+ return 0;
+}
+
void WINAPIV _vcomp_fork(BOOL ifval, int nargs, void *wrapper, ...)
{
- __ms_va_list valist;
+ struct vcomp_thread_data *prev_thread_data = vcomp_init_thread_data(); /* caller's own context; restored once the callback has run */
+ struct vcomp_thread_data thread_data; /* temporary context used by the calling thread while it runs as thread 0 of the team */
+ struct vcomp_team_data team_data; /* shared team state; worker threads reach it through their team pointer */
+ int num_threads; /* requested team size; team_data.num_threads holds what was actually attached */
TRACE("(%d, %d, %p, ...)\n", ifval, nargs, wrapper);
- __ms_va_start(valist, wrapper);
- _vcomp_fork_call_wrapper(wrapper, nargs, valist);
- __ms_va_end(valist);
+ if (!ifval)
+ num_threads = 1; /* ifval is false: run on the calling thread only */
+ else if (prev_thread_data->team && !vcomp_nested_fork)
+ num_threads = 1; /* caller is already in a team and nested forks are disabled */
+ else if (prev_thread_data->fork_threads)
+ num_threads = prev_thread_data->fork_threads; /* per-thread override (where it gets set is outside this hunk) */
+ else
+ num_threads = vcomp_num_threads; /* process-wide default */
+ /* the team starts with just the calling thread; num_threads grows as workers are attached */
+ InitializeConditionVariable(&team_data.cond);
+ team_data.num_threads = 1;
+ team_data.finished_threads = 0;
+ team_data.nargs = nargs;
+ team_data.wrapper = wrapper;
+ __ms_va_start(team_data.valist, wrapper);
+ /* context for the calling thread as thread 0 */
+ thread_data.team = &team_data;
+ thread_data.thread_num = 0;
+ thread_data.fork_threads = 0;
+ list_init(&thread_data.entry); /* doubles as the head of the list of workers attached to this team */
+ InitializeConditionVariable(&thread_data.cond);
+
+ if (num_threads > 1)
+ {
+ struct list *ptr;
+ EnterCriticalSection(&vcomp_section);
+
+ /* reuse existing threads (if any) */
+ while (team_data.num_threads < num_threads && (ptr = list_head(&vcomp_idle_threads)))
+ {
+ struct vcomp_thread_data *data = LIST_ENTRY(ptr, struct vcomp_thread_data, entry);
+ data->team = &team_data;
+ data->thread_num = team_data.num_threads++;
+ data->fork_threads = 0;
+ list_remove(&data->entry); /* take it off vcomp_idle_threads */
+ list_add_tail(&thread_data.entry, &data->entry);
+ WakeAllConditionVariable(&data->cond); /* the worker picks up data->team once this function leaves vcomp_section */
+ }
+
+ /* spawn additional threads */
+ while (team_data.num_threads < num_threads)
+ {
+ struct vcomp_thread_data *data;
+ HMODULE module;
+ HANDLE thread;
+
+ data = HeapAlloc(GetProcessHeap(), 0, sizeof(*data));
+ if (!data) break; /* allocation failed: go on with the threads attached so far */
+
+ data->team = &team_data;
+ data->thread_num = team_data.num_threads;
+ data->fork_threads = 0;
+ InitializeConditionVariable(&data->cond);
+
+ thread = CreateThread(NULL, 0, _vcomp_fork_worker, data, 0, NULL); /* the new worker blocks on vcomp_section until this function leaves it */
+ if (!thread)
+ {
+ HeapFree(GetProcessHeap(), 0, data);
+ break; /* thread creation failed: go on with the threads attached so far */
+ }
+ /* NOTE(review): the return value below is ignored and the module reference is taken only after the thread exists; confirm a failure here cannot unbalance the worker's FreeLibraryAndExitThread */
+ GetModuleHandleExW(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS,
+ (const WCHAR *)vcomp_module, &module);
+ team_data.num_threads++;
+ list_add_tail(&thread_data.entry, &data->entry);
+ CloseHandle(thread); /* the handle is not needed; completion is tracked through finished_threads */
+ }
+
+ LeaveCriticalSection(&vcomp_section);
+ }
+ /* run the callback on the calling thread as well */
+ vcomp_set_thread_data(&thread_data);
+ _vcomp_fork_call_wrapper(team_data.wrapper, team_data.nargs, team_data.valist);
+ vcomp_set_thread_data(prev_thread_data);
+ prev_thread_data->fork_threads = 0; /* the override is consumed by this fork */
+
+ if (team_data.num_threads > 1)
+ {
+ EnterCriticalSection(&vcomp_section);
+
+ team_data.finished_threads++; /* count the calling thread itself */
+ while (team_data.finished_threads < team_data.num_threads) /* condition is re-checked after every wakeup */
+ SleepConditionVariableCS(&team_data.cond, &vcomp_section, INFINITE);
+
+ LeaveCriticalSection(&vcomp_section);
+ assert(list_empty(&thread_data.entry)); /* each worker moves its entry to vcomp_idle_threads before it is counted as finished */
+ }
+
+ __ms_va_end(team_data.valist);
}
BOOL WINAPI DllMain(HINSTANCE instance, DWORD reason, LPVOID reserved)
@@ -275,6 +438,7 @@ BOOL WINAPI DllMain(HINSTANCE instance, DWORD reason, LPVOID reserved)
}
GetSystemInfo(&sysinfo);
+ vcomp_module = instance;
vcomp_max_threads = sysinfo.dwNumberOfProcessors;
vcomp_num_threads = sysinfo.dwNumberOfProcessors;
break;
--
2.4.5
More information about the wine-patches
mailing list