From 44d603b75fe10d51377c20e4c3c2a1bf031c4fcf Mon Sep 17 00:00:00 2001
From: Daniel Lehman
Date: Mon, 12 Dec 2016 16:44:44 -0800
Subject: [PATCH 2/3] msvcrt: Implement Concurrency::event

wait_for_multiple can handle more than WaitForMultipleObjects' limit of 64
a separate thread is used to handle threads that wait on multiple handles
  and is loosely modeled after wineserver's select request handler
a very simple command structure is used to communicate with it
  over a single shared pipe

Signed-off-by: Daniel Lehman
---
 dlls/msvcrt/lock.c   | 407 +++++++++++++++++++++++++++++++++++++++++++++++++--
 dlls/msvcrt/main.c   |   3 +
 dlls/msvcrt/msvcrt.h |  18 +++
 3 files changed, 417 insertions(+), 11 deletions(-)

diff --git a/dlls/msvcrt/lock.c b/dlls/msvcrt/lock.c
index 83b5116..8434fc0 100644
--- a/dlls/msvcrt/lock.c
+++ b/dlls/msvcrt/lock.c
@@ -20,8 +20,11 @@
 #include "wine/port.h"
 
 #include <stdarg.h>
+#include <errno.h>
+#include <poll.h>
 
 #include "wine/debug.h"
+#include "wine/list.h"
 #include "windef.h"
 #include "winbase.h"
 #include "winternl.h"
@@ -551,19 +554,329 @@ unsigned int __cdecl _GetConcurrency(void)
     return val;
 }
 
+/* write end of the command pipe; stays 0 until evt_thread_start succeeds */
+static int event_fd;
+static ULONGLONG event_now;
+static struct list event_timeouts = LIST_INIT(event_timeouts);
+static RTL_RUN_ONCE evt_init_once = RTL_RUN_ONCE_INIT;
+
 typedef struct
 {
-    volatile void *wait;
-    void *reset;
+    volatile ULONG_PTR signaled;
+    struct list *waiters;
     critical_section cs;
 } event;
 
+struct thread_wait;
+typedef struct
+{
+    struct thread_wait *wait;
+    struct list entry;
+} thread_wait_entry;
+
+typedef struct thread_wait
+{
+    int count;
+    int wait_all;
+    size_t result;
+    event **events;
+    ULONGLONG timeout;
+    struct list entry;
+    thread_wait_entry entries[1];
+} thread_wait;
+
+enum
+{
+    EC_WAIT,
+    EC_WAKE,
+    EC_QUIT
+};
+
+typedef struct
+{
+    int cmd;
+    void *arg;
+} event_cmd;
+
+/* allocate and initialize a wait request for count events; calls throw_bad_alloc on failure */
+static thread_wait *evt_thread_wait_alloc(int count, event **evts, int wait_all, size_t timeout)
+{
+    thread_wait *wait;
+    int i;
+
+    wait = heap_alloc(FIELD_OFFSET(thread_wait, entries[count]));
+    if(!wait)
+        throw_bad_alloc("allocating wait for event");
+
+    wait->count = count;
+    wait->wait_all = wait_all;
+    wait->result = -1;
+    wait->events = evts;
+    wait->timeout = timeout;
+    for(i = 0; i < count; i++) {
+        wait->entries[i].wait = wait;
+        list_init(&wait->entries[i].entry);
+    }
+
+    return wait;
+}
+
+/* unlink wait from the waiter list of each of its events and, if it has a timeout, from event_timeouts */
+static void evt_remove_queue(thread_wait *wait)
+{
+    int i;
+
+    for(i = 0; i < wait->count; i++) {
+        list_remove(&wait->entries[i].entry);
+        if(wait->events[i]->waiters == &wait->entries[i].entry)
+            wait->events[i]->waiters = list_head(wait->events[i]->waiters);
+    }
+    if(wait->timeout != COOPERATIVE_TIMEOUT_INFINITE)
+        list_remove(&wait->entry);
+}
+
+/* store the wait result and release the thread blocked on the keyed event for wait */
+static inline void evt_release_wait(thread_wait *wait, int result)
+{
+    wait->result = result;
+    NtReleaseKeyedEvent(keyed_event, wait, 0, NULL);
+}
+
+/* complete expired waits with COOPERATIVE_WAIT_TIMEOUT; returns the ms left until the next timeout, or -1 when none is queued */
+static size_t evt_get_timeout(void)
+{
+    thread_wait *wait;
+    struct list *head;
+    int i;
+
+    while((head = list_head(&event_timeouts))) {
+        wait = LIST_ENTRY(head, thread_wait, entry);
+        if(wait->timeout <= event_now) {
+            for(i = 0; i < wait->count; i++)
+                critical_section_lock(&wait->events[i]->cs);
+            evt_remove_queue(wait);
+            for(i = 0; i < wait->count; i++)
+                critical_section_unlock(&wait->events[i]->cs);
+
+            evt_release_wait(wait, COOPERATIVE_WAIT_TIMEOUT);
+        }
+        else
+            return wait->timeout - event_now;
+    }
+
+    return -1;
+}
+
+/* queue wait on each of its events and, if it has a timeout, on event_timeouts sorted by expiry */
+static void evt_add_wait_queue(thread_wait *wait)
+{
+    thread_wait *cur;
+    int i;
+
+    for(i = 0; i < wait->count; i++) {
+        if(wait->events[i]->waiters)
+            list_add_tail(wait->events[i]->waiters, &wait->entries[i].entry);
+        else
+            wait->events[i]->waiters = &wait->entries[i].entry;
+    }
+
+    if(wait->timeout == COOPERATIVE_TIMEOUT_INFINITE)
+        return;
+
+    /* convert relative timeout to absolute */
+    wait->timeout = event_now + wait->timeout;
+    LIST_FOR_EACH_ENTRY(cur, &event_timeouts, thread_wait, entry) {
+        if(wait->timeout < cur->timeout) {
+            list_add_before(&cur->entry, &wait->entry);
+            return;
+        }
+    }
+
+    list_add_tail(&event_timeouts, &wait->entry);
+}
+
+/* returns the index of a signaled event (count-1 for a satisfied wait_all), or -1 if wait is not satisfied */
+static int evt_check_wait(thread_wait *wait)
+{
+    int i;
+
+    if(wait->wait_all) {
+        for(i = 0; i < wait->count; i++) {
+            if(!wait->events[i]->signaled)
+                return -1;
+        }
+        return wait->count-1;
+    } else {
+        for(i = 0; i < wait->count; i++) {
+            if(wait->events[i]->signaled)
+                return i;
+        }
+        return -1;
+    }
+}
+
+/* EC_WAIT: complete the wait right away if it is already satisfied, otherwise queue it */
+static void evt_cmd_wait(thread_wait *wait)
+{
+    int i, ret;
+
+    for(i = 0; i < wait->count; i++)
+        critical_section_lock(&wait->events[i]->cs);
+    ret = evt_check_wait(wait);
+    if(ret == -1)
+        evt_add_wait_queue(wait);
+    for(i = 0; i < wait->count; i++)
+        critical_section_unlock(&wait->events[i]->cs);
+
+    if(ret != -1)
+        evt_release_wait(wait, ret);
+}
+
+/* EC_WAKE: release every wait queued on evt that is now satisfied */
+static void evt_cmd_wake(event *evt)
+{
+    int i, ret;
+    struct list *ptr;
+    thread_wait *wait;
+    thread_wait_entry *cur;
+
+    critical_section_lock(&evt->cs);
+    if(!evt->waiters) {
+        critical_section_unlock(&evt->cs);
+        return;
+    }
+
+    ptr = evt->waiters;
+    do {
+        cur = LIST_ENTRY(ptr, thread_wait_entry, entry);
+        wait = cur->wait;
+
+        for(i = 0; i < wait->count; i++)
+            if(evt != wait->events[i])
+                critical_section_lock(&wait->events[i]->cs);
+
+        ret = evt_check_wait(wait);
+        if(ret != -1)
+            evt_remove_queue(wait);
+
+        for(i = 0; i < wait->count; i++)
+            if(evt != wait->events[i])
+                critical_section_unlock(&wait->events[i]->cs);
+
+        if(ret != -1) {
+            evt_release_wait(wait, ret);
+
+            /* restart at head of the list since a
+               wake up can change the wait queue */
+            ptr = evt->waiters;
+        } else {
+            ptr = list_next(evt->waiters, ptr);
+        }
+    } while (ptr);
+    critical_section_unlock(&evt->cs);
+}
+
+/* worker: polls the read end of the pipe, dispatches commands and expires timed waits */
+static DWORD WINAPI evt_thread(void *arg)
+{
+    struct pollfd fd;
+    ULONGLONG timeout;
+    event_cmd cmd;
+    int rc, done;
+    int read_fd;
+
+    read_fd = (int)(INT_PTR)arg;
+    fd.fd = read_fd;
+    fd.events = POLLIN;
+    fd.revents = 0;
+
+    done = 0;
+    while(!done) {
+        timeout = evt_get_timeout();
+        rc = poll(&fd, 1, timeout);
+        event_now = GetTickCount64();
+        if(rc > 0) {
+            while((rc = read(read_fd, &cmd, sizeof(cmd))) == sizeof(cmd)) {
+                switch(cmd.cmd) {
+                case EC_WAIT:
+                    evt_cmd_wait(cmd.arg);
+                    break;
+                case EC_WAKE:
+                    evt_cmd_wake(cmd.arg);
+                    break;
+                case EC_QUIT:
+                    done = 1;
+                    break;
+                }
+            }
+        } else if (rc < 0) {
+            if(errno == EINTR) {
+                TRACE("poll interrupted\n");
+                continue;
+            } else {
+                ERR("poll %i %s\n", rc, strerror(errno));
+                break;
+            }
+        } else {
+            /* timed out: fall-through and process timeouts from list */
+        }
+    }
+
+    close(read_fd);
+    close(event_fd);
+    return 0;
+}
+
+/* run-once initializer: creates the command pipe and queues evt_thread as a work item */
+static BOOL WINAPI evt_thread_start(INIT_ONCE *init_once, void *parm, void **ctx)
+{
+    int rc, fds[2];
+
+    rc = pipe(fds);
+    if(rc) return FALSE;
+
+    rc = fcntl(fds[0], F_SETFL, O_NONBLOCK);
+    if(rc) goto error;
+
+    if(!QueueUserWorkItem(evt_thread, (void*)(INT_PTR)fds[0], WT_EXECUTELONGFUNCTION))
+        goto error;
+
+    event_fd = fds[1];
+    return TRUE;
+
+error:
+    close(fds[0]);
+    close(fds[1]);
+    return FALSE;
+}
+
+/* tell the event thread, if one was started, to quit */
+void msvcrt_term_event(void)
+{
+    event_cmd cmd;
+    int rc;
+
+    if(!event_fd)
+        return;
+
+    cmd.cmd = EC_QUIT;
+    cmd.arg = NULL;
+    rc = write(event_fd, &cmd, sizeof(cmd));
+    if(rc < 0)
+        ERR("failed to send QUIT: %d %d\n", event_fd, errno);
+}
+
 /* ??0event@Concurrency@@QAE@XZ */
 /* ??0event@Concurrency@@QEAA@XZ */
 DEFINE_THISCALL_WRAPPER(event_ctor, 4)
 event* __thiscall event_ctor(event *this)
 {
-    FIXME("(%p) stub\n", this);
+    TRACE("(%p)\n", this);
+
+    this->signaled = 0;
+    this->waiters = NULL;
+    critical_section_ctor(&this->cs); /* implicitly init keyed_event */
+
     return this;
 }
 
@@ -572,7 +885,9 @@ event* __thiscall event_ctor(event *this)
 DEFINE_THISCALL_WRAPPER(event_dtor, 4)
 void __thiscall event_dtor(event *this)
 {
-    FIXME("(%p) stub\n", this);
+    TRACE("(%p)\n", this);
+
+    critical_section_dtor(&this->cs);
 }
 
 /* ?reset@event@Concurrency@@QAEXXZ */
@@ -580,7 +895,11 @@ void __thiscall event_dtor(event *this)
 DEFINE_THISCALL_WRAPPER(event_reset, 4)
 void __thiscall event_reset(event *this)
 {
-    FIXME("(%p) stub\n", this);
+    TRACE("(%p)\n", this);
+
+    critical_section_lock(&this->cs);
+    this->signaled = 0;
+    critical_section_unlock(&this->cs);
 }
 
 /* ?set@event@Concurrency@@QAEXXZ */
@@ -588,7 +907,58 @@ void __thiscall event_reset(event *this)
 DEFINE_THISCALL_WRAPPER(event_set, 4)
 void __thiscall event_set(event *this)
 {
-    FIXME("(%p) stub\n", this);
+    event_cmd cmd;
+    struct list *waiters;
+    int rc;
+
+    TRACE("(%p)\n", this);
+
+    critical_section_lock(&this->cs);
+    this->signaled = 1;
+    waiters = this->waiters;
+    critical_section_unlock(&this->cs);
+
+    if(waiters) {
+        cmd.cmd = EC_WAKE;
+        cmd.arg = this;
+        rc = write(event_fd, &cmd, sizeof(cmd));
+        if(rc < 0)
+            ERR("failed to send WAKE: %d %d\n", event_fd, errno);
+    }
+}
+
+/* hand a wait request to the event thread and block until it completes; returns COOPERATIVE_WAIT_TIMEOUT if the request cannot be delivered */
+static size_t evt_wait(event **events, unsigned int count, MSVCRT_bool wait_all, unsigned int timeout)
+{
+    thread_wait *wait;
+    event_cmd cmd;
+    size_t res;
+    int rc;
+
+    if(!event_fd) {
+        InitOnceExecuteOnce(&evt_init_once, evt_thread_start, NULL, NULL);
+        if(!event_fd) {
+            ERR("failed to set up event thread\n");
+            return COOPERATIVE_WAIT_TIMEOUT;
+        }
+    }
+
+    wait = evt_thread_wait_alloc(count, events, wait_all, timeout);
+    cmd.cmd = EC_WAIT;
+    cmd.arg = wait;
+    rc = write(event_fd, &cmd, sizeof(cmd));
+    if(rc < 0) {
+        /* the event thread never received the request, so nothing would ever release us */
+        ERR("failed to send WAIT: %d %d\n", event_fd, errno);
+        heap_free(wait);
+        return COOPERATIVE_WAIT_TIMEOUT;
+    }
+    NtWaitForKeyedEvent(keyed_event, wait, 0, NULL);
+
+    res = wait->result;
+    heap_free(wait);
+
+    return res;
 }
 
 /* ?wait@event@Concurrency@@QAEII@Z */
@@ -596,16 +966,31 @@ void __thiscall event_set(event *this)
 DEFINE_THISCALL_WRAPPER(event_wait, 8)
 size_t __thiscall event_wait(event *this, unsigned int timeout)
 {
-    FIXME("(%p %u) stub\n", this, timeout);
-    return COOPERATIVE_WAIT_TIMEOUT;
+    TRACE("(%p %u)\n", this, timeout);
+
+    critical_section_lock(&this->cs);
+    if(this->signaled) {
+        critical_section_unlock(&this->cs);
+        return 0;
+    }
+    critical_section_unlock(&this->cs);
+
+    return evt_wait(&this, 1, FALSE, timeout);
 }
 
 /* ?wait_for_multiple@event@Concurrency@@SAIPAPAV12@I_NI@Z */
 /* ?wait_for_multiple@event@Concurrency@@SA_KPEAPEAV12@_K_NI@Z */
-int __cdecl event_wait_for_multiple(event **events, unsigned int count, MSVCRT_bool wait_all, unsigned int timeout)
+int __cdecl event_wait_for_multiple(event **events, MSVCRT_size_t count, MSVCRT_bool wait_all, unsigned int timeout)
 {
-    FIXME("(%p %u %d %u) stub\n", events, count, wait_all, timeout);
-    return COOPERATIVE_WAIT_TIMEOUT;
+    TRACE("(%p %lu %d %u)\n", events, (unsigned long)count, wait_all, timeout);
+
+    if(count == 0)
+        return 0;
+
+    if(count == 1)
+        return event_wait(events[0], timeout);
+
+    return evt_wait(events, count, wait_all, timeout);
 }
 #endif
 
diff --git a/dlls/msvcrt/main.c b/dlls/msvcrt/main.c
index a930a32..31c1d4a 100644
--- a/dlls/msvcrt/main.c
+++ b/dlls/msvcrt/main.c
@@ -127,6 +127,9 @@ BOOL WINAPI DllMain(HINSTANCE hinstDLL, DWORD fdwReason, LPVOID lpvReserved)
     break;
   case DLL_PROCESS_DETACH:
     msvcrt_free_io();
+#if _MSVCR_VER >= 100
+    msvcrt_term_event();
+#endif
     if (lpvReserved) break;
     msvcrt_free_popen_data();
     msvcrt_free_locks();
diff --git a/dlls/msvcrt/msvcrt.h b/dlls/msvcrt/msvcrt.h
index 1a2c750..6114487 100644
--- a/dlls/msvcrt/msvcrt.h
+++ b/dlls/msvcrt/msvcrt.h
@@ -329,6 +329,10 @@ extern void msvcrt_free_popen_data(void) DECLSPEC_HIDDEN;
 extern BOOL msvcrt_init_heap(void) DECLSPEC_HIDDEN;
 extern void msvcrt_destroy_heap(void) DECLSPEC_HIDDEN;
 
+#if _MSVCR_VER >= 100
+extern void msvcrt_term_event(void) DECLSPEC_HIDDEN;
+#endif
+
 extern unsigned msvcrt_create_io_inherit_block(WORD*, BYTE**) DECLSPEC_HIDDEN;
 
 extern unsigned int __cdecl _control87(unsigned int, unsigned int);
@@ -1378,4 +1382,18 @@ typedef struct {
     _FPIEEE_VALUE Result;
 } _FPIEEE_RECORD, *_PFPIEEE_RECORD;
 
+static inline void* __WINE_ALLOC_SIZE(1) heap_alloc(size_t len)
+{
+    return HeapAlloc(GetProcessHeap(), 0, len);
+}
+
+static inline void* __WINE_ALLOC_SIZE(1) heap_alloc_zero(size_t len)
+{
+    return HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, len);
+}
+
+static inline BOOL heap_free(void *mem)
+{
+    return HeapFree(GetProcessHeap(), 0, mem);
+}
 #endif /* __WINE_MSVCRT_H */
-- 
1.9.5