[PATCH 2/3] ntdll: Implement threadpool I/O queues.

Zebediah Figura z.figura12 at gmail.com
Sat Apr 11 00:16:26 CDT 2020


Signed-off-by: Zebediah Figura <z.figura12 at gmail.com>
---
 dlls/ntdll/ntdll.spec         |   4 +
 dlls/ntdll/ntdll_misc.h       |   7 +
 dlls/ntdll/tests/threadpool.c | 179 ++++++++++++++++++
 dlls/ntdll/threadpool.c       | 335 +++++++++++++++++++++++++++++++++-
 include/winternl.h            |   8 +-
 5 files changed, 523 insertions(+), 10 deletions(-)

diff --git a/dlls/ntdll/ntdll.spec b/dlls/ntdll/ntdll.spec
index a43e419ab8..0be6ec6bfc 100644
--- a/dlls/ntdll/ntdll.spec
+++ b/dlls/ntdll/ntdll.spec
@@ -1060,6 +1060,7 @@
 @ stdcall RtlxUnicodeStringToAnsiSize(ptr) RtlUnicodeStringToAnsiSize
 @ stdcall RtlxUnicodeStringToOemSize(ptr) RtlUnicodeStringToOemSize
 @ stdcall TpAllocCleanupGroup(ptr)
+@ stdcall TpAllocIoCompletion(ptr ptr ptr ptr ptr)
 @ stdcall TpAllocPool(ptr ptr)
 @ stdcall TpAllocTimer(ptr ptr ptr ptr)
 @ stdcall TpAllocWait(ptr ptr ptr ptr)
@@ -1070,12 +1071,14 @@
 @ stdcall TpCallbackReleaseSemaphoreOnCompletion(ptr long long)
 @ stdcall TpCallbackSetEventOnCompletion(ptr long)
 @ stdcall TpCallbackUnloadDllOnCompletion(ptr ptr)
+@ stdcall TpCancelAsyncIoOperation(ptr)
 @ stdcall TpDisassociateCallback(ptr)
 @ stdcall TpIsTimerSet(ptr)
 @ stdcall TpPostWork(ptr)
 @ stdcall TpQueryPoolStackInformation(ptr ptr)
 @ stdcall TpReleaseCleanupGroup(ptr)
 @ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
+@ stdcall TpReleaseIoCompletion(ptr)
 @ stdcall TpReleasePool(ptr)
 @ stdcall TpReleaseTimer(ptr)
 @ stdcall TpReleaseWait(ptr)
@@ -1087,6 +1090,7 @@
 @ stdcall TpSetWait(ptr long ptr)
 @ stdcall TpSimpleTryPost(ptr ptr ptr)
 @ stdcall TpStartAsyncIoOperation(ptr)
+@ stdcall TpWaitForIoCompletion(ptr long)
 @ stdcall TpWaitForTimer(ptr long)
 @ stdcall TpWaitForWait(ptr long)
 @ stdcall TpWaitForWork(ptr long)
diff --git a/dlls/ntdll/ntdll_misc.h b/dlls/ntdll/ntdll_misc.h
index 1396e1c19c..b0a73fcbe9 100644
--- a/dlls/ntdll/ntdll_misc.h
+++ b/dlls/ntdll/ntdll_misc.h
@@ -30,6 +30,13 @@
 #include "wine/server.h"
 #include "wine/asm.h"
 
+#define DECLARE_CRITICAL_SECTION(cs) \
+    static RTL_CRITICAL_SECTION cs; \
+    static RTL_CRITICAL_SECTION_DEBUG cs##_debug = \
+    { 0, 0, &cs, { &cs##_debug.ProcessLocksList, &cs##_debug.ProcessLocksList }, \
+      0, 0, { (DWORD_PTR)(__FILE__ ": " # cs) }}; \
+    static RTL_CRITICAL_SECTION cs = { &cs##_debug, -1, 0, 0, 0, 0 };
+
 #define MAX_NT_PATH_LENGTH 277
 
 #define MAX_DOS_DRIVES 26
diff --git a/dlls/ntdll/tests/threadpool.c b/dlls/ntdll/tests/threadpool.c
index bf5493cac0..32d4c3eac2 100644
--- a/dlls/ntdll/tests/threadpool.c
+++ b/dlls/ntdll/tests/threadpool.c
@@ -18,22 +18,27 @@
  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
  */
 
+#define NONAMELESSSTRUCT
+#define NONAMELESSUNION
 #include "ntdll_test.h"
 
 static HMODULE hntdll = 0;
 static NTSTATUS (WINAPI *pTpAllocCleanupGroup)(TP_CLEANUP_GROUP **);
+static NTSTATUS (WINAPI *pTpAllocIoCompletion)(TP_IO **,HANDLE,PTP_IO_CALLBACK,void *,TP_CALLBACK_ENVIRON *);
 static NTSTATUS (WINAPI *pTpAllocPool)(TP_POOL **,PVOID);
 static NTSTATUS (WINAPI *pTpAllocTimer)(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
 static NTSTATUS (WINAPI *pTpAllocWait)(TP_WAIT **,PTP_WAIT_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
 static NTSTATUS (WINAPI *pTpAllocWork)(TP_WORK **,PTP_WORK_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
 static NTSTATUS (WINAPI *pTpCallbackMayRunLong)(TP_CALLBACK_INSTANCE *);
 static VOID     (WINAPI *pTpCallbackReleaseSemaphoreOnCompletion)(TP_CALLBACK_INSTANCE *,HANDLE,DWORD);
+static void     (WINAPI *pTpCancelAsyncIoOperation)(TP_IO *);
 static VOID     (WINAPI *pTpDisassociateCallback)(TP_CALLBACK_INSTANCE *);
 static BOOL     (WINAPI *pTpIsTimerSet)(TP_TIMER *);
 static VOID     (WINAPI *pTpReleaseWait)(TP_WAIT *);
 static VOID     (WINAPI *pTpPostWork)(TP_WORK *);
 static VOID     (WINAPI *pTpReleaseCleanupGroup)(TP_CLEANUP_GROUP *);
 static VOID     (WINAPI *pTpReleaseCleanupGroupMembers)(TP_CLEANUP_GROUP *,BOOL,PVOID);
+static void     (WINAPI *pTpReleaseIoCompletion)(TP_IO *);
 static VOID     (WINAPI *pTpReleasePool)(TP_POOL *);
 static VOID     (WINAPI *pTpReleaseTimer)(TP_TIMER *);
 static VOID     (WINAPI *pTpReleaseWork)(TP_WORK *);
@@ -41,6 +46,8 @@ static VOID     (WINAPI *pTpSetPoolMaxThreads)(TP_POOL *,DWORD);
 static VOID     (WINAPI *pTpSetTimer)(TP_TIMER *,LARGE_INTEGER *,LONG,LONG);
 static VOID     (WINAPI *pTpSetWait)(TP_WAIT *,HANDLE,LARGE_INTEGER *);
 static NTSTATUS (WINAPI *pTpSimpleTryPost)(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
+static void     (WINAPI *pTpStartAsyncIoOperation)(TP_IO *);
+static void     (WINAPI *pTpWaitForIoCompletion)(TP_IO *,BOOL);
 static VOID     (WINAPI *pTpWaitForTimer)(TP_TIMER *,BOOL);
 static VOID     (WINAPI *pTpWaitForWait)(TP_WAIT *,BOOL);
 static VOID     (WINAPI *pTpWaitForWork)(TP_WORK *,BOOL);
@@ -63,10 +70,12 @@ static BOOL init_threadpool(void)
     }
 
     NTDLL_GET_PROC(TpAllocCleanupGroup);
+    NTDLL_GET_PROC(TpAllocIoCompletion);
     NTDLL_GET_PROC(TpAllocPool);
     NTDLL_GET_PROC(TpAllocTimer);
     NTDLL_GET_PROC(TpAllocWait);
     NTDLL_GET_PROC(TpAllocWork);
+    NTDLL_GET_PROC(TpCancelAsyncIoOperation);
     NTDLL_GET_PROC(TpCallbackMayRunLong);
     NTDLL_GET_PROC(TpCallbackReleaseSemaphoreOnCompletion);
     NTDLL_GET_PROC(TpDisassociateCallback);
@@ -74,6 +83,7 @@ static BOOL init_threadpool(void)
     NTDLL_GET_PROC(TpPostWork);
     NTDLL_GET_PROC(TpReleaseCleanupGroup);
     NTDLL_GET_PROC(TpReleaseCleanupGroupMembers);
+    NTDLL_GET_PROC(TpReleaseIoCompletion);
     NTDLL_GET_PROC(TpReleasePool);
     NTDLL_GET_PROC(TpReleaseTimer);
     NTDLL_GET_PROC(TpReleaseWait);
@@ -82,6 +92,8 @@ static BOOL init_threadpool(void)
     NTDLL_GET_PROC(TpSetTimer);
     NTDLL_GET_PROC(TpSetWait);
     NTDLL_GET_PROC(TpSimpleTryPost);
+    NTDLL_GET_PROC(TpStartAsyncIoOperation);
+    NTDLL_GET_PROC(TpWaitForIoCompletion);
     NTDLL_GET_PROC(TpWaitForTimer);
     NTDLL_GET_PROC(TpWaitForWait);
     NTDLL_GET_PROC(TpWaitForWork);
@@ -1906,6 +1918,172 @@ static void test_tp_multi_wait(void)
     CloseHandle(semaphore);
 }
 
+struct io_cb_ctx
+{
+    unsigned int count;
+    void *ovl;
+    NTSTATUS ret;
+    ULONG_PTR length;
+    TP_IO *io;
+};
+
+static void CALLBACK io_cb(TP_CALLBACK_INSTANCE *instance, void *userdata,
+        void *cvalue, IO_STATUS_BLOCK *iosb, TP_IO *io)
+{
+    struct io_cb_ctx *ctx = userdata;
+    ++ctx->count;
+    ctx->ovl = cvalue;
+    ctx->ret = iosb->u.Status;
+    ctx->length = iosb->Information;
+    ctx->io = io;
+}
+
+static DWORD WINAPI io_wait_thread(void *arg)
+{
+    TP_IO *io = arg;
+    pTpWaitForIoCompletion(io, FALSE);
+    return 0;
+}
+
+static void test_tp_io(void)
+{
+    TP_CALLBACK_ENVIRON environment = {.Version = 1};
+    OVERLAPPED ovl = {}, ovl2 = {};
+    HANDLE client, server, thread;
+    struct io_cb_ctx userdata;
+    char in[1], in2[1];
+    const char out[1];
+    NTSTATUS status;
+    DWORD ret_size;
+    TP_POOL *pool;
+    TP_IO *io;
+    BOOL ret;
+
+    ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
+
+    status = pTpAllocPool(&pool, NULL);
+    ok(!status, "failed to allocate pool, status %#x\n", status);
+
+    server = CreateNamedPipeA("\\\\.\\pipe\\wine_tp_test",
+            PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0, 1, 1024, 1024, 0, NULL);
+    ok(server != INVALID_HANDLE_VALUE, "Failed to create server pipe, error %u.\n", GetLastError());
+    client = CreateFileA("\\\\.\\pipe\\wine_tp_test", GENERIC_READ | GENERIC_WRITE,
+            0, NULL, OPEN_EXISTING, 0, 0);
+    ok(client != INVALID_HANDLE_VALUE, "Failed to create client pipe, error %u.\n", GetLastError());
+
+    environment.Pool = pool;
+    io = NULL;
+    status = pTpAllocIoCompletion(&io, server, io_cb, &userdata, &environment);
+    ok(!status, "got %#x\n", status);
+    ok(!!io, "expected non-NULL TP_IO\n");
+
+    pTpWaitForIoCompletion(io, FALSE);
+
+    userdata.count = 0;
+    pTpStartAsyncIoOperation(io);
+
+    thread = CreateThread(NULL, 0, io_wait_thread, io, 0, NULL);
+    ok(WaitForSingleObject(thread, 100) == WAIT_TIMEOUT, "TpWaitForIoCompletion() should not return\n");
+
+    ret = ReadFile(server, in, sizeof(in), NULL, &ovl);
+    ok(!ret, "wrong ret %d\n", ret);
+    ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError());
+
+    ret = WriteFile(client, out, sizeof(out), &ret_size, NULL);
+    ok(ret, "WriteFile() failed, error %u\n", GetLastError());
+
+    pTpWaitForIoCompletion(io, FALSE);
+    ok(userdata.count == 1, "callback ran %u times\n", userdata.count);
+    ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl);
+    ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret);
+    ok(userdata.length == 1, "got length %lu\n", userdata.length);
+    ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io);
+
+    ok(!WaitForSingleObject(thread, 1000), "wait timed out\n");
+    CloseHandle(thread);
+
+    userdata.count = 0;
+    pTpStartAsyncIoOperation(io);
+    pTpStartAsyncIoOperation(io);
+
+    ret = ReadFile(server, in, sizeof(in), NULL, &ovl);
+    ok(!ret, "wrong ret %d\n", ret);
+    ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError());
+    ret = ReadFile(server, in2, sizeof(in2), NULL, &ovl2);
+    ok(!ret, "wrong ret %d\n", ret);
+    ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError());
+
+    ret = WriteFile(client, out, sizeof(out), &ret_size, NULL);
+    ok(ret, "WriteFile() failed, error %u\n", GetLastError());
+    ret = WriteFile(client, out, sizeof(out), &ret_size, NULL);
+    ok(ret, "WriteFile() failed, error %u\n", GetLastError());
+
+    pTpWaitForIoCompletion(io, FALSE);
+    ok(userdata.count == 2, "callback ran %u times\n", userdata.count);
+    ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret);
+    ok(userdata.length == 1, "got length %lu\n", userdata.length);
+    ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io);
+
+    /* The documentation is a bit unclear about passing TRUE to
+     * WaitForThreadpoolIoCallbacks()—"pending I/O requests are not canceled"
+     * [as with CancelIoEx()], but pending threadpool callbacks are, even those
+     * which have not yet reached the completion port [as with
+     * TpCancelAsyncIoOperation()]. */
+    userdata.count = 0;
+    pTpStartAsyncIoOperation(io);
+
+    pTpWaitForIoCompletion(io, TRUE);
+    ok(!userdata.count, "callback ran %u times\n", userdata.count);
+
+    pTpStartAsyncIoOperation(io);
+
+    ret = WriteFile(client, out, sizeof(out), &ret_size, NULL);
+    ok(ret, "WriteFile() failed, error %u\n", GetLastError());
+
+    ret = ReadFile(server, in, sizeof(in), NULL, &ovl);
+    ok(ret, "wrong ret %d\n", ret);
+
+    pTpWaitForIoCompletion(io, FALSE);
+    ok(userdata.count == 1, "callback ran %u times\n", userdata.count);
+    ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl);
+    ok(userdata.ret == STATUS_SUCCESS, "got status %#x\n", userdata.ret);
+    ok(userdata.length == 1, "got length %lu\n", userdata.length);
+    ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io);
+
+    userdata.count = 0;
+    pTpStartAsyncIoOperation(io);
+
+    ret = ReadFile(server, NULL, 1, NULL, &ovl);
+    ok(!ret, "wrong ret %d\n", ret);
+    ok(GetLastError() == ERROR_NOACCESS, "wrong error %u\n", GetLastError());
+
+    pTpCancelAsyncIoOperation(io);
+    pTpWaitForIoCompletion(io, FALSE);
+    ok(!userdata.count, "callback ran %u times\n", userdata.count);
+
+    userdata.count = 0;
+    pTpStartAsyncIoOperation(io);
+
+    ret = ReadFile(server, in, sizeof(in), NULL, &ovl);
+    ok(!ret, "wrong ret %d\n", ret);
+    ok(GetLastError() == ERROR_IO_PENDING, "wrong error %u\n", GetLastError());
+    ret = CancelIo(server);
+    ok(ret, "CancelIo() failed, error %u\n", GetLastError());
+
+    pTpWaitForIoCompletion(io, FALSE);
+    ok(userdata.count == 1, "callback ran %u times\n", userdata.count);
+    ok(userdata.ovl == &ovl, "expected %p, got %p\n", &ovl, userdata.ovl);
+    ok(userdata.ret == STATUS_CANCELLED, "got status %#x\n", userdata.ret);
+    ok(!userdata.length, "got length %lu\n", userdata.length);
+    ok(userdata.io == io, "expected %p, got %p\n", io, userdata.io);
+
+    CloseHandle(ovl.hEvent);
+    CloseHandle(client);
+    CloseHandle(server);
+    pTpReleaseIoCompletion(io);
+    pTpReleasePool(pool);
+}
+
 START_TEST(threadpool)
 {
     test_RtlQueueWorkItem();
@@ -1925,4 +2103,5 @@ START_TEST(threadpool)
     test_tp_window_length();
     test_tp_wait();
     test_tp_multi_wait();
+    test_tp_io();
 }
diff --git a/dlls/ntdll/threadpool.c b/dlls/ntdll/threadpool.c
index a6e4749e10..215a5e9c53 100644
--- a/dlls/ntdll/threadpool.c
+++ b/dlls/ntdll/threadpool.c
@@ -131,6 +131,7 @@ struct threadpool
     int                     min_workers;
     int                     num_workers;
     int                     num_busy_workers;
+    HANDLE                  compl_port;
     TP_POOL_STACK_INFORMATION stack_info;
 };
 
@@ -139,7 +140,14 @@ enum threadpool_objtype
     TP_OBJECT_TYPE_SIMPLE,
     TP_OBJECT_TYPE_WORK,
     TP_OBJECT_TYPE_TIMER,
-    TP_OBJECT_TYPE_WAIT
+    TP_OBJECT_TYPE_WAIT,
+    TP_OBJECT_TYPE_IO,
+};
+
+struct io_completion
+{
+    IO_STATUS_BLOCK iosb;
+    ULONG_PTR cvalue;
 };
 
 /* internal threadpool object representation */
@@ -201,6 +209,13 @@ struct threadpool_object
             ULONGLONG       timeout;
             HANDLE          handle;
         } wait;
+        struct
+        {
+            PTP_IO_CALLBACK callback;
+            /* locked via .pool->cs */
+            unsigned int    pending_count, completion_count, completion_max;
+            struct io_completion *completions;
+        } io;
     } u;
 };
 
@@ -291,6 +306,29 @@ struct waitqueue_bucket
     HANDLE                  update_event;
 };
 
+/* global I/O completion queue object */
+static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
+
+static struct
+{
+    CRITICAL_SECTION        cs;
+    LONG                    objcount;
+    BOOL                    thread_running;
+    HANDLE                  port;
+    RTL_CONDITION_VARIABLE  update_event;
+}
+ioqueue =
+{
+    .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
+};
+
+static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
+{
+    0, 0, &ioqueue.cs,
+    { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
+      0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
+};
+
 static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
 {
     return (struct threadpool *)pool;
@@ -317,6 +355,13 @@ static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
     return object;
 }
 
+static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
+{
+    struct threadpool_object *object = (struct threadpool_object *)io;
+    assert( object->type == TP_OBJECT_TYPE_IO );
+    return object;
+}
+
 static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP *group )
 {
     return (struct threadpool_group *)group;
@@ -343,6 +388,33 @@ static inline LONG interlocked_dec( PLONG dest )
     return interlocked_xchg_add( dest, -1 ) - 1;
 }
 
+static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count, unsigned int size)
+{
+    unsigned int new_capacity, max_capacity;
+    void *new_elements;
+
+    if (count <= *capacity)
+        return TRUE;
+
+    max_capacity = ~(SIZE_T)0 / size;
+    if (count > max_capacity)
+        return FALSE;
+
+    new_capacity = max(4, *capacity);
+    while (new_capacity < count && new_capacity <= max_capacity / 2)
+        new_capacity *= 2;
+    if (new_capacity < count)
+        new_capacity = max_capacity;
+
+    if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity * size )))
+        return FALSE;
+
+    *elements = new_elements;
+    *capacity = new_capacity;
+
+    return TRUE;
+}
+
 static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void *userdata )
 {
     struct rtl_work_item *item = userdata;
@@ -1642,6 +1714,127 @@ static void tp_waitqueue_unlock( struct threadpool_object *wait )
     RtlLeaveCriticalSection( &waitqueue.cs );
 }
 
+static void CALLBACK ioqueue_thread_proc( void *param )
+{
+    struct io_completion *completion;
+    struct threadpool_object *io;
+    IO_STATUS_BLOCK iosb;
+    ULONG_PTR key, value;
+    NTSTATUS status;
+
+    TRACE( "starting I/O completion thread\n" );
+
+    RtlEnterCriticalSection( &ioqueue.cs );
+
+    for (;;)
+    {
+        RtlLeaveCriticalSection( &ioqueue.cs );
+        if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value, &iosb, NULL )))
+            ERR("NtRemoveIoCompletion failed, status %#x.\n", status);
+        RtlEnterCriticalSection( &ioqueue.cs );
+
+        if (key)
+        {
+            io = (struct threadpool_object *)key;
+
+            RtlEnterCriticalSection( &io->pool->cs );
+
+            if (!array_reserve((void **)&io->u.io.completions, &io->u.io.completion_max,
+                    io->u.io.completion_count + 1, sizeof(*io->u.io.completions)))
+            {
+                ERR("Failed to allocate memory.\n");
+                RtlLeaveCriticalSection( &io->pool->cs );
+                continue;
+            }
+
+            completion = &io->u.io.completions[io->u.io.completion_count++];
+            completion->iosb = iosb;
+            completion->cvalue = value;
+
+            tp_object_submit( io, FALSE );
+
+            RtlLeaveCriticalSection( &io->pool->cs );
+        }
+
+        if (!ioqueue.objcount)
+        {
+            /* All I/O objects have been destroyed; if no new objects are
+             * created within some amount of time, then we can shutdown this
+             * thread. */
+            LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000};
+            if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
+                    &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
+                break;
+        }
+    }
+
+    RtlLeaveCriticalSection( &ioqueue.cs );
+
+    TRACE( "terminating I/O completion thread\n" );
+
+    RtlExitUserThread( 0 );
+}
+
+static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
+{
+    NTSTATUS status = STATUS_SUCCESS;
+
+    assert( io->type == TP_OBJECT_TYPE_IO );
+
+    RtlEnterCriticalSection( &ioqueue.cs );
+
+    if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
+            IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
+    {
+        RtlLeaveCriticalSection( &ioqueue.cs );
+        return status;
+    }
+
+    if (!ioqueue.thread_running)
+    {
+        HANDLE thread;
+
+        if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
+                NULL, 0, 0, ioqueue_thread_proc, NULL, &thread, NULL )))
+        {
+            ioqueue.thread_running = TRUE;
+            NtClose( thread );
+        }
+    }
+
+    if (status == STATUS_SUCCESS)
+    {
+        FILE_COMPLETION_INFORMATION info;
+        IO_STATUS_BLOCK iosb;
+
+        info.CompletionPort = ioqueue.port;
+        info.CompletionKey = (ULONG_PTR)io;
+
+        status = NtSetInformationFile( file, &iosb, &info, sizeof(info), FileCompletionInformation );
+    }
+
+    if (status == STATUS_SUCCESS)
+    {
+        if (!ioqueue.objcount++)
+            RtlWakeConditionVariable( &ioqueue.update_event );
+    }
+
+    RtlLeaveCriticalSection( &ioqueue.cs );
+    return status;
+}
+
+static void tp_ioqueue_unlock( struct threadpool_object *io )
+{
+    assert( io->type == TP_OBJECT_TYPE_IO );
+
+    RtlEnterCriticalSection( &ioqueue.cs );
+
+    if (!--ioqueue.objcount)
+        NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
+
+    RtlLeaveCriticalSection( &ioqueue.cs );
+}
+
 /***********************************************************************
  *           tp_threadpool_alloc    (internal)
  *
@@ -2017,6 +2210,8 @@ static void tp_object_cancel( struct threadpool_object *object )
         if (object->type == TP_OBJECT_TYPE_WAIT)
             object->u.wait.signaled = 0;
     }
+    if (object->type == TP_OBJECT_TYPE_IO)
+        object->u.io.pending_count = 0;
     RtlLeaveCriticalSection( &pool->cs );
 
     while (pending_callbacks--)
@@ -2027,6 +2222,8 @@ static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
 {
     if (object->num_pending_callbacks)
         return FALSE;
+    if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
+        return FALSE;
 
     if (group)
         return !object->num_running_callbacks;
@@ -2066,6 +2263,8 @@ static void tp_object_prepare_shutdown( struct threadpool_object *object )
         tp_timerqueue_unlock( object );
     else if (object->type == TP_OBJECT_TYPE_WAIT)
         tp_waitqueue_unlock( object );
+    else if (object->type == TP_OBJECT_TYPE_IO)
+        tp_ioqueue_unlock( object );
 }
 
 /***********************************************************************
@@ -2131,6 +2330,7 @@ static void CALLBACK threadpool_worker_proc( void *param )
 {
     TP_CALLBACK_INSTANCE *callback_instance;
     struct threadpool_instance instance;
+    struct io_completion completion;
     struct threadpool *pool = param;
     TP_WAIT_RESULT wait_result = 0;
     LARGE_INTEGER timeout;
@@ -2160,6 +2360,12 @@ static void CALLBACK threadpool_worker_proc( void *param )
                 wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
                 if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
             }
+            else if (object->type == TP_OBJECT_TYPE_IO)
+            {
+                assert( object->u.io.completion_count );
+                completion = object->u.io.completions[--object->u.io.completion_count];
+                object->u.io.pending_count--;
+            }
 
             /* Leave critical section and do the actual callback. */
             object->num_associated_callbacks++;
@@ -2218,6 +2424,17 @@ static void CALLBACK threadpool_worker_proc( void *param )
                     break;
                 }
 
+                case TP_OBJECT_TYPE_IO:
+                {
+                    TRACE( "executing I/O callback %p(%p, %p, %#lx, %p, %p)\n",
+                            object->u.io.callback, callback_instance, object->userdata,
+                            completion.cvalue, &completion.iosb, (TP_IO *)object );
+                    object->u.io.callback( callback_instance, object->userdata,
+                            (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
+                    TRACE( "callback %p returned\n", object->u.io.callback );
+                    break;
+                }
+
                 default:
                     assert(0);
                     break;
@@ -2317,6 +2534,50 @@ NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
     return tp_group_alloc( (struct threadpool_group **)out );
 }
 
+/***********************************************************************
+ *           TpAllocIoCompletion    (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
+                                     void *userdata, TP_CALLBACK_ENVIRON *environment )
+{
+    struct threadpool_object *object;
+    struct threadpool *pool;
+    NTSTATUS status;
+
+    TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
+
+    if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object) )))
+        return STATUS_NO_MEMORY;
+
+    if ((status = tp_threadpool_lock( &pool, environment )))
+    {
+        RtlFreeHeap( GetProcessHeap(), 0, object );
+        return status;
+    }
+
+    object->type = TP_OBJECT_TYPE_IO;
+    object->u.io.callback = callback;
+    if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 * sizeof(*object->u.io.completions) )))
+    {
+        tp_threadpool_unlock( pool );
+        RtlFreeHeap( GetProcessHeap(), 0, object );
+        return status;
+    }
+
+    if ((status = tp_ioqueue_lock( object, file )))
+    {
+        tp_threadpool_unlock( pool );
+        RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
+        RtlFreeHeap( GetProcessHeap(), 0, object );
+        return status;
+    }
+
+    tp_object_initialize( object, pool, userdata, environment );
+
+    *out = (TP_IO *)object;
+    return STATUS_SUCCESS;
+}
+
 /***********************************************************************
  *           TpAllocPool    (NTDLL.@)
  */
@@ -2441,6 +2702,26 @@ NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID us
     return STATUS_SUCCESS;
 }
 
+/***********************************************************************
+ *           TpCancelAsyncIoOperation    (NTDLL.@)
+ */
+void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
+{
+    struct threadpool_object *this = impl_from_TP_IO( io );
+
+    TRACE( "%p\n", io );
+
+    RtlEnterCriticalSection( &this->pool->cs );
+
+    this->u.io.pending_count--;
+    if (object_is_finished( this, TRUE ))
+        RtlWakeAllConditionVariable( &this->group_finished_event );
+    if (object_is_finished( this, FALSE ))
+        RtlWakeAllConditionVariable( &this->finished_event );
+
+    RtlLeaveCriticalSection( &this->pool->cs );
+}
+
 /***********************************************************************
  *           TpCallbackLeaveCriticalSectionOnCompletion    (NTDLL.@)
  */
@@ -2692,6 +2973,20 @@ VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_p
     }
 }
 
+/***********************************************************************
+ *           TpReleaseIoCompletion    (NTDLL.@)
+ */
+void WINAPI TpReleaseIoCompletion( TP_IO *io )
+{
+    struct threadpool_object *this = impl_from_TP_IO( io );
+
+    TRACE( "%p\n", io );
+
+    tp_object_prepare_shutdown( this );
+    this->shutdown = TRUE;
+    tp_object_release( this );
+}
+
 /***********************************************************************
  *           TpReleasePool    (NTDLL.@)
  */
@@ -2960,6 +3255,36 @@ NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
     return STATUS_SUCCESS;
 }
 
+/***********************************************************************
+ *           TpStartAsyncIoOperation    (NTDLL.@)
+ */
+void WINAPI TpStartAsyncIoOperation( TP_IO *io )
+{
+    struct threadpool_object *this = impl_from_TP_IO( io );
+
+    TRACE( "%p\n", io );
+
+    RtlEnterCriticalSection( &this->pool->cs );
+
+    this->u.io.pending_count++;
+
+    RtlLeaveCriticalSection( &this->pool->cs );
+}
+
+/***********************************************************************
+ *           TpWaitForIoCompletion    (NTDLL.@)
+ */
+void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
+{
+    struct threadpool_object *this = impl_from_TP_IO( io );
+
+    TRACE( "%p %d\n", io, cancel_pending );
+
+    if (cancel_pending)
+        tp_object_cancel( this );
+    tp_object_wait( this, FALSE );
+}
+
 /***********************************************************************
  *           TpWaitForTimer    (NTDLL.@)
  */
@@ -3039,11 +3364,3 @@ NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORM
 
     return STATUS_SUCCESS;
 }
-
-/***********************************************************************
- *           TpStartAsyncIoOperation    (NTDLL.@)
- */
-void WINAPI TpStartAsyncIoOperation( TP_IO *io )
-{
-    FIXME( "%p\n", io );
-}
diff --git a/include/winternl.h b/include/winternl.h
index e4c611e4af..ac045c1709 100644
--- a/include/winternl.h
+++ b/include/winternl.h
@@ -2351,6 +2351,8 @@ typedef struct _SYSTEM_MODULE_INFORMATION
 
 typedef LONG (CALLBACK *PRTL_EXCEPTION_FILTER)(PEXCEPTION_POINTERS);
 
+typedef void (CALLBACK *PTP_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,void*,void*,IO_STATUS_BLOCK*,PTP_IO);
+
 /***********************************************************************
  * Function declarations
  */
@@ -3013,6 +3015,7 @@ NTSYSAPI NTSTATUS  WINAPI RtlLargeIntegerToChar(const ULONGLONG *,ULONG,ULONG,PC
 /* Threadpool functions */
 
 NTSYSAPI NTSTATUS  WINAPI TpAllocCleanupGroup(TP_CLEANUP_GROUP **);
+NTSYSAPI NTSTATUS  WINAPI TpAllocIoCompletion(TP_IO **,HANDLE,PTP_IO_CALLBACK,void *,TP_CALLBACK_ENVIRON *);
 NTSYSAPI NTSTATUS  WINAPI TpAllocPool(TP_POOL **,PVOID);
 NTSYSAPI NTSTATUS  WINAPI TpAllocTimer(TP_TIMER **,PTP_TIMER_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
 NTSYSAPI NTSTATUS  WINAPI TpAllocWait(TP_WAIT **,PTP_WAIT_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
@@ -3023,23 +3026,26 @@ NTSYSAPI void      WINAPI TpCallbackReleaseMutexOnCompletion(TP_CALLBACK_INSTANC
 NTSYSAPI void      WINAPI TpCallbackReleaseSemaphoreOnCompletion(TP_CALLBACK_INSTANCE *,HANDLE,DWORD);
 NTSYSAPI void      WINAPI TpCallbackSetEventOnCompletion(TP_CALLBACK_INSTANCE *,HANDLE);
 NTSYSAPI void      WINAPI TpCallbackUnloadDllOnCompletion(TP_CALLBACK_INSTANCE *,HMODULE);
+NTSYSAPI void      WINAPI TpCancelAsyncIoOperation(TP_IO *);
 NTSYSAPI void      WINAPI TpDisassociateCallback(TP_CALLBACK_INSTANCE *);
 NTSYSAPI BOOL      WINAPI TpIsTimerSet(TP_TIMER *);
 NTSYSAPI void      WINAPI TpPostWork(TP_WORK *);
 NTSYSAPI NTSTATUS  WINAPI TpQueryPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info);
 NTSYSAPI void      WINAPI TpReleaseCleanupGroup(TP_CLEANUP_GROUP *);
 NTSYSAPI void      WINAPI TpReleaseCleanupGroupMembers(TP_CLEANUP_GROUP *,BOOL,PVOID);
+NTSYSAPI void      WINAPI TpReleaseIoCompletion(TP_IO *);
 NTSYSAPI void      WINAPI TpReleasePool(TP_POOL *);
 NTSYSAPI void      WINAPI TpReleaseTimer(TP_TIMER *);
 NTSYSAPI void      WINAPI TpReleaseWait(TP_WAIT *);
 NTSYSAPI void      WINAPI TpReleaseWork(TP_WORK *);
-NTSYSAPI void      WINAPI TpStartAsyncIoOperation(TP_IO *);
 NTSYSAPI void      WINAPI TpSetPoolMaxThreads(TP_POOL *,DWORD);
 NTSYSAPI BOOL      WINAPI TpSetPoolMinThreads(TP_POOL *,DWORD);
 NTSYSAPI NTSTATUS  WINAPI TpSetPoolStackInformation(TP_POOL *, TP_POOL_STACK_INFORMATION *stack_info);
 NTSYSAPI void      WINAPI TpSetTimer(TP_TIMER *, LARGE_INTEGER *,LONG,LONG);
 NTSYSAPI void      WINAPI TpSetWait(TP_WAIT *,HANDLE,LARGE_INTEGER *);
 NTSYSAPI NTSTATUS  WINAPI TpSimpleTryPost(PTP_SIMPLE_CALLBACK,PVOID,TP_CALLBACK_ENVIRON *);
+NTSYSAPI void      WINAPI TpStartAsyncIoOperation(TP_IO *);
+NTSYSAPI void      WINAPI TpWaitForIoCompletion(TP_IO *,BOOL);
 NTSYSAPI void      WINAPI TpWaitForTimer(TP_TIMER *,BOOL);
 NTSYSAPI void      WINAPI TpWaitForWait(TP_WAIT *,BOOL);
 NTSYSAPI void      WINAPI TpWaitForWork(TP_WORK *,BOOL);
-- 
2.26.0




More information about the wine-devel mailing list