Daniel Lehman : msvcrt: Implement Concurrency::event.

Alexandre Julliard julliard at winehq.org
Mon Mar 20 17:16:45 CDT 2017


Module: wine
Branch: master
Commit: dfd28affe34f8a87e251fdd02e590c425b90ca6f
URL:    http://source.winehq.org/git/wine.git/?a=commit;h=dfd28affe34f8a87e251fdd02e590c425b90ca6f

Author: Daniel Lehman <dlehman at esri.com>
Date:   Mon Dec 12 16:44:44 2016 -0800

msvcrt: Implement Concurrency::event.

Signed-off-by: Daniel Lehman <dlehman at esri.com>
Signed-off-by: Piotr Caban <piotr at codeweavers.com>
Signed-off-by: Alexandre Julliard <julliard at winehq.org>

---

 dlls/msvcrt/lock.c   | 190 +++++++++++++++++++++++++++++++++++++++++++++++----
 dlls/msvcrt/msvcrt.h |  15 ++++
 2 files changed, 193 insertions(+), 12 deletions(-)

diff --git a/dlls/msvcrt/lock.c b/dlls/msvcrt/lock.c
index d9fe05d..8ae9511 100644
--- a/dlls/msvcrt/lock.c
+++ b/dlls/msvcrt/lock.c
@@ -550,19 +550,128 @@ unsigned int __cdecl _GetConcurrency(void)
     return val;
 }
 
+#define EVT_RUNNING     (void*)1
+#define EVT_WAITING     NULL
+
+struct thread_wait;
+typedef struct thread_wait_entry
+{
+    struct thread_wait *wait;
+    struct thread_wait_entry *next;
+    struct thread_wait_entry *prev;
+} thread_wait_entry;
+
+typedef struct thread_wait
+{
+    void *signaled;
+    int pending_waits;
+    thread_wait_entry entries[1];
+} thread_wait;
+
 typedef struct
 {
-    volatile void *wait;
-    void *reset;
+    thread_wait_entry *waiters;
+    INT_PTR signaled;
     critical_section cs;
 } event;
 
+static inline PLARGE_INTEGER evt_timeout(PLARGE_INTEGER pTime, unsigned int timeout)
+{
+    if(timeout == COOPERATIVE_TIMEOUT_INFINITE) return NULL;
+    pTime->QuadPart = (ULONGLONG)timeout * -10000;
+    return pTime;
+}
+
+static void evt_add_queue(thread_wait_entry **head, thread_wait_entry *entry)
+{
+    if(*head) {
+        entry->next = *head;
+        entry->prev = (*head)->prev;
+        (*head)->prev->next = entry;
+        (*head)->prev = entry;
+    } else {
+        entry->next = entry;
+        entry->prev = entry;
+        *head = entry;
+    }
+}
+
+static void evt_remove(thread_wait_entry **head, thread_wait_entry *entry)
+{
+    entry->next->prev = entry->prev;
+    entry->prev->next = entry->next;
+    if(*head == entry)
+        *head = entry->next == entry ? NULL : entry->next;
+}
+
+static MSVCRT_size_t evt_end_wait(thread_wait *wait, event **events, int count)
+{
+    MSVCRT_size_t i, ret = COOPERATIVE_WAIT_TIMEOUT;
+
+    for(i = 0; i < count; i++) {
+        critical_section_lock(&events[i]->cs);
+        if(events[i] == wait->signaled) ret = i;
+        evt_remove(&events[i]->waiters, &wait->entries[i]);
+        critical_section_unlock(&events[i]->cs);
+    }
+
+    return ret;
+}
+
+static inline int evt_transition(void **state, void *from, void *to)
+{
+    return InterlockedCompareExchangePointer(state, to, from) == from;
+}
+
+static MSVCRT_size_t evt_wait(thread_wait *wait, event **events, int count, MSVCRT_bool wait_all, unsigned int timeout)
+{
+    int i;
+    NTSTATUS status;
+    LARGE_INTEGER ntto;
+
+    wait->signaled = EVT_RUNNING;
+    wait->pending_waits = wait_all ? count : 1;
+    for(i = 0; i < count; i++) {
+        wait->entries[i].wait = wait;
+
+        critical_section_lock(&events[i]->cs);
+        evt_add_queue(&events[i]->waiters, &wait->entries[i]);
+        if(events[i]->signaled) {
+            if(!InterlockedDecrement(&wait->pending_waits)) {
+                wait->signaled = events[i];
+                critical_section_unlock(&events[i]->cs);
+
+                return evt_end_wait(wait, events, i+1);
+            }
+        }
+        critical_section_unlock(&events[i]->cs);
+    }
+
+    if(!timeout)
+        return evt_end_wait(wait, events, count);
+
+    if(!evt_transition(&wait->signaled, EVT_RUNNING, EVT_WAITING))
+        return evt_end_wait(wait, events, count);
+
+    status = NtWaitForKeyedEvent(keyed_event, wait, 0, evt_timeout(&ntto, timeout));
+
+    if(status && !evt_transition(&wait->signaled, EVT_WAITING, EVT_RUNNING))
+        NtWaitForKeyedEvent(keyed_event, wait, 0, NULL);
+
+    return evt_end_wait(wait, events, count);
+}
+
 /* ??0event@Concurrency@@QAE@XZ */
 /* ??0event@Concurrency@@QEAA@XZ */
 DEFINE_THISCALL_WRAPPER(event_ctor, 4)
 event* __thiscall event_ctor(event *this)
 {
-    FIXME("(%p) stub\n", this);
+    TRACE("(%p)\n", this);
+
+    this->waiters = NULL;
+    this->signaled = FALSE;
+    critical_section_ctor(&this->cs);
+
     return this;
 }
 
@@ -571,7 +680,10 @@ event* __thiscall event_ctor(event *this)
 DEFINE_THISCALL_WRAPPER(event_dtor, 4)
 void __thiscall event_dtor(event *this)
 {
-    FIXME("(%p) stub\n", this);
+    TRACE("(%p)\n", this);
+    critical_section_dtor(&this->cs);
+    if(this->waiters)
+        ERR("there's a wait on destroyed event\n");
 }
 
 /* ?reset@event@Concurrency@@QAEXXZ */
@@ -579,7 +691,22 @@ void __thiscall event_dtor(event *this)
 DEFINE_THISCALL_WRAPPER(event_reset, 4)
 void __thiscall event_reset(event *this)
 {
-    FIXME("(%p) stub\n", this);
+    thread_wait_entry *entry;
+
+    TRACE("(%p)\n", this);
+
+    critical_section_lock(&this->cs);
+    if(this->signaled) {
+        this->signaled = FALSE;
+        if(this->waiters) {
+            entry = this->waiters;
+            do {
+                InterlockedIncrement(&entry->wait->pending_waits);
+                entry = entry->next;
+            } while (entry != this->waiters);
+        }
+    }
+    critical_section_unlock(&this->cs);
 }
 
 /* ?set@event@Concurrency@@QAEXXZ */
@@ -587,26 +714,65 @@ void __thiscall event_reset(event *this)
 DEFINE_THISCALL_WRAPPER(event_set, 4)
 void __thiscall event_set(event *this)
 {
-    FIXME("(%p) stub\n", this);
+    thread_wait_entry *entry;
+
+    TRACE("(%p)\n", this);
+
+    critical_section_lock(&this->cs);
+    if(!this->signaled) {
+        this->signaled = TRUE;
+        if(this->waiters) {
+            entry = this->waiters;
+            do {
+                if(!InterlockedDecrement(&entry->wait->pending_waits)) {
+                    if(InterlockedExchangePointer(&entry->wait->signaled, this) == EVT_WAITING)
+                        NtReleaseKeyedEvent(keyed_event, entry->wait, 0, NULL);
+                }
+                entry = entry->next;
+            } while (entry != this->waiters);
+        }
+    }
+    critical_section_unlock(&this->cs);
 }
 
 /* ?wait@event@Concurrency@@QAEII@Z */
 /* ?wait@event@Concurrency@@QEAA_KI@Z */
 DEFINE_THISCALL_WRAPPER(event_wait, 8)
-size_t __thiscall event_wait(event *this, unsigned int timeout)
+MSVCRT_size_t __thiscall event_wait(event *this, unsigned int timeout)
 {
-    FIXME("(%p %u) stub\n", this, timeout);
-    return COOPERATIVE_WAIT_TIMEOUT;
+    thread_wait wait;
+    MSVCRT_size_t signaled;
+
+    TRACE("(%p %u)\n", this, timeout);
+
+    critical_section_lock(&this->cs);
+    signaled = this->signaled;
+    critical_section_unlock(&this->cs);
+
+    if(!timeout) return signaled ? 0 : COOPERATIVE_WAIT_TIMEOUT;
+    return signaled ? 0 : evt_wait(&wait, &this, 1, FALSE, timeout);
 }
 
 /* ?wait_for_multiple@event@Concurrency@@SAIPAPAV12@I_NI@Z */
 /* ?wait_for_multiple@event@Concurrency@@SA_KPEAPEAV12@_K_NI@Z */
 int __cdecl event_wait_for_multiple(event **events, MSVCRT_size_t count, MSVCRT_bool wait_all, unsigned int timeout)
 {
-    FIXME("(%p %ld %d %u) stub\n", events, count, wait_all, timeout);
-    return COOPERATIVE_WAIT_TIMEOUT;
-}
+    thread_wait *wait;
+    MSVCRT_size_t ret;
+
+    TRACE("(%p %ld %d %u)\n", events, count, wait_all, timeout);
+
+    if(count == 0)
+        return 0;
 
+    wait = heap_alloc(FIELD_OFFSET(thread_wait, entries[count]));
+    if(!wait)
+        throw_bad_alloc("bad allocation");
+    ret = evt_wait(wait, events, count, wait_all, timeout);
+    heap_free(wait);
+
+    return ret;
+}
 #endif
 
 #if _MSVCR_VER >= 110
diff --git a/dlls/msvcrt/msvcrt.h b/dlls/msvcrt/msvcrt.h
index 3caa43c..cee8113 100644
--- a/dlls/msvcrt/msvcrt.h
+++ b/dlls/msvcrt/msvcrt.h
@@ -1181,6 +1181,7 @@ extern char* __cdecl __unDName(char *,const char*,int,malloc_func_t,free_func_t,
 
 #define UCRTBASE_SCANF_MASK                              (0x0007)
 
+#define COOPERATIVE_TIMEOUT_INFINITE ((unsigned int)-1)
 #define COOPERATIVE_WAIT_TIMEOUT     ~0
 
 typedef enum {
@@ -1380,4 +1381,18 @@ typedef struct {
     _FPIEEE_VALUE Result;
 } _FPIEEE_RECORD, *_PFPIEEE_RECORD;
 
+static inline void* __WINE_ALLOC_SIZE(1) heap_alloc(size_t len)
+{
+    return HeapAlloc(GetProcessHeap(), 0, len);
+}
+
+static inline void* __WINE_ALLOC_SIZE(1) heap_alloc_zero(size_t len)
+{
+    return HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, len);
+}
+
+static inline BOOL heap_free(void *mem)
+{
+    return HeapFree(GetProcessHeap(), 0, mem);
+}
 #endif /* __WINE_MSVCRT_H */




More information about the wine-cvs mailing list