wild pointers in current named pipe implementation?

Gregory M. Turner gmturner007 at ameritech.net
Tue Apr 15 13:48:03 CDT 2003


On Tuesday 15 April 2003 12:56 pm, Bill Medland wrote:
> On April 15, 2003 10:12 am, Dan Kegel wrote:
> > Mike isn't doing flushing code for this version of the named
> > pipe implementation, as it's not clear it's possible without a rewrite.
> > He's replacing the whole thing.
> > - Dan
>
> Oh.
>
> Well in that case I'll just forget it and leave the patch in place on my
> machine and hope that the whole thing is fixed before our company really
> needs it.

No cause for despair!  IIRC the patch was just rejected because it used
wineserver calls for the actual pipe i/o, which was considered too
inefficient, even for wine, by Alexandre ;)  Alexandre suggested using
connectionless sockets (I'm not sure if he means unix domain or inet sockets?).
Far be it from me to have an opinion on things I do not understand, but if I
/were/ to do so, I'd say that I don't see why connectionful unix domain sockets
ought to be insufficient ;)

Anyhow, I vaguely recall that Mike replied that it would take some time, but
that perhaps Alexandre's idea was workable... am I getting this right?  In case
it will help illuminate some idea-emotocon's above brighter heads than mine,
here is my shot at converting Mike's patch (which was posted in an almost
unmentionable format) into a unified diff against CVS wine, courtesy of
winetreediff:
--
diff -ur --minimal --exclude-from=/home/greg/bin/winetreediff_excl ../wine/files/file.c ./files/file.c
--- ../wine/files/file.c	2003-03-17 23:04:34.000000000 -0600
+++ ./files/file.c	2003-04-09 19:49:12.000000000 -0500
@@ -318,6 +318,27 @@
 
 
 /***********************************************************************
+ *           FILE_GetTypeFlags
+ */
+BOOL WINAPI FILE_GetTypeFlags( HANDLE hFile, enum fd_type *type, int* flags )
+{
+    NTSTATUS ret;
+    SERVER_START_REQ( get_file_info )
+    {
+        req->handle = hFile;
+        ret = wine_server_call_err( req );
+        if( !ret )
+        {
+            if(type) *type = reply->fd_type;
+            if(flags) *flags = reply->fd_flags;
+        }
+    }
+    SERVER_END_REQ;
+    return !ret;
+}
+
+
+/***********************************************************************
  *           FILE_GetUnixHandleType
  *
  * Retrieve the Unix handle corresponding to a file handle.
@@ -1645,10 +1666,52 @@
 }
 
 /***********************************************************************
+ *             FILE_NamedPipeReadService      (INTERNAL)
+ */
+static void FILE_NamedPipeReadService(async_private *ovp)
+{
+    async_fileio *fileio = (async_fileio*) ovp;
+    LPOVERLAPPED lpOverlapped = fileio->lpOverlapped;
+    int result, r;
+    int already = lpOverlapped->InternalHigh;
+    LPVOID p = &fileio->buffer[already];
+
+    TRACE("%p %p\n", lpOverlapped, fileio->buffer );
+
+    /* read pipe data from server */
+    SERVER_START_REQ( read_named_pipe )
+    {
+        req->handle = ovp->handle;
+        req->count = fileio->count - already;
+        req->id = (ovp->fd == -1) ? 0 : ovp->fd;
+        wine_server_set_reply( req, p, fileio->count - already);
+        r = wine_server_call_err( req );
+        if( !r )
+        {
+            result = wine_server_reply_size( reply );
+            ovp->fd = reply->id;
+        }
+    }
+    SERVER_END_REQ;
+
+    if( r )
+        goto end;
+
+    lpOverlapped->InternalHigh += result;
+    TRACE("read %d more bytes %ld/%d so far\n",result,
+        lpOverlapped->InternalHigh,fileio->count);
+
+    if(lpOverlapped->InternalHigh >= fileio->count )
+        r = STATUS_SUCCESS;
+    else
+        r = STATUS_PENDING;
+
+end:
+    lpOverlapped->Internal = r;
+}
+
+/***********************************************************************
  *             FILE_AsyncReadService      (INTERNAL)
- *
- *  This function is called while the client is waiting on the
- *  server, so we can't make any server calls here.
  */
 static void FILE_AsyncReadService(async_private *ovp)
 {
@@ -1661,6 +1724,8 @@
 
     /* check to see if the data is ready (non-blocking) */
 
+    if ( ovp->fd < 0 )
+        ovp->fd = FILE_GetUnixHandle ( fileio->async.handle, GENERIC_READ );
     if ( fileio->fd_type == FD_TYPE_SOCKET )
         result = read (ovp->fd, &fileio->buffer[already], fileio->count - already);
     else
@@ -1708,12 +1773,9 @@
 static BOOL FILE_ReadFileEx(HANDLE hFile, LPVOID buffer, DWORD bytesToRead,
 			 LPOVERLAPPED overlapped,
 			 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
-                         HANDLE hEvent)
+                         HANDLE hEvent, enum fd_type type)
 {
     async_fileio *ovp;
-    int fd;
-    int flags;
-    enum fd_type type;
 
     TRACE("file %p to buf %p num %ld %p func %p\n",
 	  hFile, buffer, bytesToRead, overlapped, lpCompletionRoutine);
@@ -1725,26 +1787,22 @@
         return FALSE;
     }
 
-    fd = FILE_GetUnixHandleType ( hFile, GENERIC_READ, &type, &flags);
-    if ( fd < 0 )
-    {
-        WARN ( "Couldn't get FD\n" );
-        return FALSE;
-    }
-
     ovp = (async_fileio*) HeapAlloc(GetProcessHeap(), 0, sizeof (async_fileio));
     if(!ovp)
     {
         TRACE("HeapAlloc Failed\n");
         SetLastError(ERROR_NOT_ENOUGH_MEMORY);
-        goto error;
+        return FALSE;
     }
 
     ovp->async.ops = ( lpCompletionRoutine ? &fileio_async_ops : &fileio_nocomp_async_ops );
     ovp->async.handle = hFile;
-    ovp->async.fd = fd;
+    ovp->async.fd = -1;     /* get it later */
     ovp->async.type = ASYNC_TYPE_READ;
-    ovp->async.func = FILE_AsyncReadService;
+    if( type == FD_TYPE_NAMED_PIPE )
+        ovp->async.func = FILE_NamedPipeReadService;
+    else
+        ovp->async.func = FILE_AsyncReadService;
     ovp->async.event = hEvent;
     ovp->lpOverlapped = overlapped;
     ovp->count = bytesToRead;
@@ -1753,11 +1811,6 @@
     ovp->fd_type = type;
 
     return !register_new_async (&ovp->async);
-
-error:
-    close (fd);
-    return FALSE;
-
 }
 
 /***********************************************************************
@@ -1767,21 +1820,28 @@
 			 LPOVERLAPPED overlapped,
 			 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine)
 {
+    enum fd_type type;
+
+    if( !FILE_GetTypeFlags( hFile, &type, NULL ))
+        return FALSE;
     overlapped->InternalHigh = 0;
-    return FILE_ReadFileEx(hFile,buffer,bytesToRead,overlapped,lpCompletionRoutine, INVALID_HANDLE_VALUE);
+    return FILE_ReadFileEx(hFile,buffer,bytesToRead,overlapped,lpCompletionRoutine, INVALID_HANDLE_VALUE, type);
 }
 
 static BOOL FILE_TimeoutRead(HANDLE hFile, LPVOID buffer, DWORD bytesToRead, LPDWORD bytesRead)
 {
     OVERLAPPED ov;
     BOOL r = FALSE;
+    enum fd_type type;
 
     TRACE("%p %p %ld %p\n", hFile, buffer, bytesToRead, bytesRead );
 
+    if( !FILE_GetTypeFlags( hFile, &type, NULL ))
+        return FALSE;
     ZeroMemory(&ov, sizeof (OVERLAPPED));
     if(STATUS_SUCCESS==NtCreateEvent(&ov.hEvent, SYNCHRONIZE, NULL, 0, 0))
     {
-        if(FILE_ReadFileEx(hFile, buffer, bytesToRead, &ov, NULL, ov.hEvent))
+        if(FILE_ReadFileEx(hFile, buffer, bytesToRead, &ov, NULL, ov.hEvent, type))
         {
             r = GetOverlappedResult(hFile, &ov, bytesRead, TRUE);
         }
@@ -1796,7 +1856,7 @@
 BOOL WINAPI ReadFile( HANDLE hFile, LPVOID buffer, DWORD bytesToRead,
                         LPDWORD bytesRead, LPOVERLAPPED overlapped )
 {
-    int unix_handle, result, flags;
+    int unix_handle, result, flags = 0;
     enum fd_type type;
 
     TRACE("%p %p %ld %p %p\n", hFile, buffer, bytesToRead,
@@ -1805,23 +1865,20 @@
     if (bytesRead) *bytesRead = 0;  /* Do this before anything else */
     if (!bytesToRead) return TRUE;
 
-    unix_handle = FILE_GetUnixHandleType( hFile, GENERIC_READ, &type, &flags );
+    if( !FILE_GetTypeFlags( hFile, &type, &flags ))
+        return FALSE;
 
     if (flags & FD_FLAG_OVERLAPPED)
     {
-	if (unix_handle == -1) return FALSE;
         if ( (overlapped==NULL) || NtResetEvent( overlapped->hEvent, NULL ) )
         {
             TRACE("Overlapped not specified or invalid event flag\n");
-	    close(unix_handle);
             SetLastError(ERROR_INVALID_PARAMETER);
             return FALSE;
         }
 
-        close(unix_handle);
         overlapped->InternalHigh = 0;
-
-        if(!FILE_ReadFileEx(hFile, buffer, bytesToRead, overlapped, NULL, overlapped->hEvent))
+        if(!FILE_ReadFileEx(hFile, buffer, bytesToRead, overlapped, NULL, overlapped->hEvent, type))
             return FALSE;
 
         if ( !GetOverlappedResult (hFile, overlapped, bytesRead, FALSE) )
@@ -1834,10 +1891,8 @@
         return TRUE;
     }
     if (flags & FD_FLAG_TIMEOUT)
-    {
-        close(unix_handle);
         return FILE_TimeoutRead(hFile, buffer, bytesToRead, bytesRead);
-    }
+
     switch(type)
     {
     case FD_TYPE_SMB:
@@ -1848,7 +1903,6 @@
 
     case FD_TYPE_DEFAULT:
         /* normal unix files */
-        if (unix_handle == -1) return FALSE;
         if (overlapped)
         {
             DWORD highOffset = overlapped->OffsetHigh;
@@ -1856,17 +1910,19 @@
                                                              &highOffset, FILE_BEGIN)) &&
                  (GetLastError() != NO_ERROR) )
             {
-              close(unix_handle);
               return FALSE;
             }
         }
         break;
 
     default:
-	if (unix_handle == -1)
-	    return FALSE;
+        break;
     }
 
+    unix_handle = FILE_GetUnixHandle ( hFile, GENERIC_READ );
+    if (unix_handle == -1)
+        return FALSE;
+
     if(overlapped)
     {
 	off_t offset = OVERLAPPED_OFFSET(overlapped);
@@ -1894,6 +1950,54 @@
 
 
 /***********************************************************************
+ *             FILE_NamedPipeWrite      (INTERNAL)
+ *
+ * hack: use fd member to store write ID.
+ */
+static void FILE_NamedPipeWrite(async_private *ovp)
+{
+    async_fileio *fileio = (async_fileio *) ovp;
+    LPOVERLAPPED lpOverlapped = fileio->lpOverlapped;
+    int count, already = lpOverlapped->InternalHigh;
+    LPVOID p = &fileio->buffer[already];
+    NTSTATUS r;
+
+    /* write some data (non-blocking) */
+    count = fileio->count - already;
+
+    TRACE("ovp %p writing %d bytes of %p\n",lpOverlapped,count,fileio->buffer );
+
+    SERVER_START_REQ( write_named_pipe )
+    {
+        req->handle = ovp->handle;
+        req->count = count;
+        req->id = (ovp->fd == -1) ? 0 : ovp->fd;
+        wine_server_add_data( req, p, count);
+        r = wine_server_call_err( req );
+        if( !r )
+            ovp->fd = reply->id;
+    }
+    SERVER_END_REQ;
+
+    TRACE("r = %08lx\n",r);
+
+    if( r )
+        goto end;
+
+    lpOverlapped->InternalHigh += count;
+
+    TRACE("wrote %d more bytes %ld/%d so far\n",count,lpOverlapped->InternalHigh,fileio->count);
+
+    if(lpOverlapped->InternalHigh < fileio->count)
+        r = STATUS_PENDING;
+    else
+        r = STATUS_SUCCESS;
+
+end:
+    lpOverlapped->Internal = r;
+}
+
+/***********************************************************************
  *             FILE_AsyncWriteService      (INTERNAL)
  *
  *  This function is called while the client is waiting on the
@@ -1910,6 +2014,8 @@
 
     /* write some data (non-blocking) */
 
+    if ( ovp->fd < 0 )
+        ovp->fd = FILE_GetUnixHandle ( fileio->async.handle, GENERIC_WRITE );
     if ( fileio->fd_type == FD_TYPE_SOCKET )
         result = write(ovp->fd, &fileio->buffer[already], fileio->count - already);
     else
@@ -1952,12 +2058,9 @@
 static BOOL FILE_WriteFileEx(HANDLE hFile, LPCVOID buffer, DWORD bytesToWrite,
                              LPOVERLAPPED overlapped,
                              LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine,
-                             HANDLE hEvent)
+                             HANDLE hEvent, enum fd_type type)
 {
     async_fileio *ovp;
-    int fd;
-    int flags;
-    enum fd_type type;
 
     TRACE("file %p to buf %p num %ld %p func %p handle %p\n",
 	  hFile, buffer, bytesToWrite, overlapped, lpCompletionRoutine, hEvent);
@@ -1968,26 +2071,22 @@
         return FALSE;
     }
 
-    fd = FILE_GetUnixHandleType ( hFile, GENERIC_WRITE, &type, &flags );
-    if ( fd < 0 )
-    {
-        TRACE( "Couldn't get FD\n" );
-        return FALSE;
-    }
-
     ovp = (async_fileio*) HeapAlloc(GetProcessHeap(), 0, sizeof (async_fileio));
     if(!ovp)
     {
         TRACE("HeapAlloc Failed\n");
         SetLastError(ERROR_NOT_ENOUGH_MEMORY);
-        goto error;
+        return FALSE;
     }
 
     ovp->async.ops = ( lpCompletionRoutine ? &fileio_async_ops : &fileio_nocomp_async_ops );
     ovp->async.handle = hFile;
-    ovp->async.fd = fd;
+    ovp->async.fd = -1;  /* get it later */
     ovp->async.type = ASYNC_TYPE_WRITE;
-    ovp->async.func = FILE_AsyncWriteService;
+    if( type == FD_TYPE_NAMED_PIPE )
+        ovp->async.func = FILE_NamedPipeWrite;
+    else
+        ovp->async.func = FILE_AsyncWriteService;
     ovp->lpOverlapped = overlapped;
     ovp->async.event = hEvent;
     ovp->buffer = (LPVOID) buffer;
@@ -1996,10 +2095,6 @@
     ovp->fd_type = type;
 
     return !register_new_async (&ovp->async);
-
-error:
-    close (fd);
-    return FALSE;
 }
 
 /***********************************************************************
@@ -2009,9 +2104,35 @@
 			 LPOVERLAPPED overlapped,
 			 LPOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine)
 {
+    enum fd_type type;
+
+    if( !FILE_GetTypeFlags( hFile, &type, NULL ))
+         return FALSE;
     overlapped->InternalHigh = 0;
 
-    return FILE_WriteFileEx(hFile, buffer, bytesToWrite, overlapped, lpCompletionRoutine, INVALID_HANDLE_VALUE);
+    return FILE_WriteFileEx(hFile, buffer, bytesToWrite, overlapped, lpCompletionRoutine, INVALID_HANDLE_VALUE, type);
+}
+
+static BOOL FILE_TimeoutWrite(HANDLE hFile, LPCVOID buffer, DWORD bytesToWrite, LPDWORD bytesWritten)
+{
+    OVERLAPPED ov;
+    BOOL r = FALSE;
+    enum fd_type type;
+
+    TRACE("%p %p %ld %p\n", hFile, buffer, bytesToWrite, bytesWritten );
+
+    if( !FILE_GetTypeFlags( hFile, &type, NULL ))
+        return FALSE;
+    ZeroMemory(&ov, sizeof (OVERLAPPED));
+    if(STATUS_SUCCESS==NtCreateEvent(&ov.hEvent, SYNCHRONIZE, NULL, 0, 0))
+    {
+        if(FILE_WriteFileEx(hFile, buffer, bytesToWrite, &ov, NULL, ov.hEvent, type))
+        {
+            r = GetOverlappedResult(hFile, &ov, bytesWritten, TRUE);
+        }
+    }
+    CloseHandle(ov.hEvent);
+    return r;
 }
 
 /***********************************************************************
@@ -2029,23 +2150,21 @@
     if (bytesWritten) *bytesWritten = 0;  /* Do this before anything else */
     if (!bytesToWrite) return TRUE;
 
-    unix_handle = FILE_GetUnixHandleType( hFile, GENERIC_WRITE, &type, &flags );
+    if( !FILE_GetTypeFlags( hFile, &type, &flags ))
+         return FALSE;
 
     if (flags & FD_FLAG_OVERLAPPED)
     {
-	if (unix_handle == -1) return FALSE;
         if ( (overlapped==NULL) || NtResetEvent( overlapped->hEvent, NULL ) )
         {
             TRACE("Overlapped not specified or invalid event flag\n");
-	    close(unix_handle);
             SetLastError(ERROR_INVALID_PARAMETER);
             return FALSE;
         }
 
-        close(unix_handle);
         overlapped->InternalHigh = 0;
 
-        if(!FILE_WriteFileEx(hFile, buffer, bytesToWrite, overlapped, NULL, overlapped->hEvent))
+        if(!FILE_WriteFileEx(hFile, buffer, bytesToWrite, overlapped, NULL, overlapped->hEvent, type))
             return FALSE;
 
         if ( !GetOverlappedResult (hFile, overlapped, bytesWritten, FALSE) )
@@ -2058,6 +2177,9 @@
         return TRUE;
     }
 
+    if (flags & FD_FLAG_TIMEOUT)
+        return FILE_TimeoutWrite(hFile, buffer, bytesToWrite, bytesWritten);
+
     switch(type)
     {
     case FD_TYPE_CONSOLE:
@@ -2066,33 +2188,29 @@
 	return FILE_WriteConsole(hFile, buffer, bytesToWrite, bytesWritten, NULL);
 
     case FD_TYPE_DEFAULT:
-        if (unix_handle == -1) return FALSE;
-
         if(overlapped)
         {
             DWORD highOffset = overlapped->OffsetHigh;
             if ( (INVALID_SET_FILE_POINTER == SetFilePointer(hFile, overlapped->Offset,
                                                              &highOffset, FILE_BEGIN)) &&
                  (GetLastError() != NO_ERROR) )
-            {
-              close(unix_handle);
               return FALSE;
-            }
         }
         break;
 
     default:
-        if (unix_handle == -1)
-            return FALSE;
         if (overlapped)
         {
-            close(unix_handle);
             SetLastError(ERROR_INVALID_PARAMETER);
             return FALSE;
         }
         break;
     }
 
+    unix_handle = FILE_GetUnixHandle ( hFile, GENERIC_WRITE );
+    if (unix_handle == -1)
+        return FALSE;
+
     if(overlapped)
     {
 	off_t offset = OVERLAPPED_OFFSET(overlapped);
diff -ur --minimal --exclude-from=/home/greg/bin/winetreediff_excl ../wine/include/wine/server_protocol.h ./include/wine/server_protocol.h
--- ../wine/include/wine/server_protocol.h	2003-04-05 22:13:00.000000000 -0600
+++ ./include/wine/server_protocol.h	2003-04-09 18:57:15.000000000 -0500
@@ -817,12 +817,14 @@
     FD_TYPE_DEFAULT,
     FD_TYPE_CONSOLE,
     FD_TYPE_SOCKET,
-    FD_TYPE_SMB
+    FD_TYPE_SMB,
+    FD_TYPE_NAMED_PIPE
 };
 #define FD_FLAG_OVERLAPPED         0x01
 #define FD_FLAG_TIMEOUT            0x02
 #define FD_FLAG_RECV_SHUTDOWN      0x04
 #define FD_FLAG_SEND_SHUTDOWN      0x08
+#define FD_FLAG_NO_FD              0x10
 
 
 struct set_file_pointer_request
@@ -898,6 +900,8 @@
     int          index_high;
     int          index_low;
     unsigned int serial;
+    int          fd_type;
+    int          fd_flags;
 };
 
 
@@ -2441,6 +2445,33 @@
 
 
 
+struct read_named_pipe_request
+{
+    struct request_header __header;
+    obj_handle_t  handle;
+    unsigned int  id;
+    int           count;
+};
+struct read_named_pipe_reply
+{
+    struct reply_header __header;
+    unsigned int  id;
+};
+
+
+struct write_named_pipe_request
+{
+    struct request_header __header;
+    obj_handle_t  handle;
+    unsigned int  id;
+    int           count;
+};
+struct write_named_pipe_reply
+{
+    struct reply_header __header;
+    unsigned int  id;
+};
+
 struct wait_named_pipe_request
 {
     struct request_header __header;
@@ -3178,6 +3209,8 @@
     REQ_create_named_pipe,
     REQ_open_named_pipe,
     REQ_connect_named_pipe,
+    REQ_read_named_pipe,
+    REQ_write_named_pipe,
     REQ_wait_named_pipe,
     REQ_disconnect_named_pipe,
     REQ_get_named_pipe_info,
@@ -3360,6 +3393,8 @@
     struct create_named_pipe_request create_named_pipe_request;
     struct open_named_pipe_request open_named_pipe_request;
     struct connect_named_pipe_request connect_named_pipe_request;
+    struct read_named_pipe_request read_named_pipe_request;
+    struct write_named_pipe_request write_named_pipe_request;
     struct wait_named_pipe_request wait_named_pipe_request;
     struct disconnect_named_pipe_request disconnect_named_pipe_request;
     struct get_named_pipe_info_request get_named_pipe_info_request;
@@ -3540,6 +3575,8 @@
     struct create_named_pipe_reply create_named_pipe_reply;
     struct open_named_pipe_reply open_named_pipe_reply;
     struct connect_named_pipe_reply connect_named_pipe_reply;
+    struct read_named_pipe_reply read_named_pipe_reply;
+    struct write_named_pipe_reply write_named_pipe_reply;
     struct wait_named_pipe_reply wait_named_pipe_reply;
     struct disconnect_named_pipe_reply disconnect_named_pipe_reply;
     struct get_named_pipe_info_reply get_named_pipe_info_reply;
@@ -3581,6 +3618,6 @@
     struct get_next_hook_reply get_next_hook_reply;
 };
 
-#define SERVER_PROTOCOL_VERSION 105
+#define SERVER_PROTOCOL_VERSION 106
 
 #endif /* __WINE_WINE_SERVER_PROTOCOL_H */
diff -ur --minimal --exclude-from=/home/greg/bin/winetreediff_excl ../wine/server/fd.c ./server/fd.c
--- ../wine/server/fd.c	2003-03-25 19:32:18.000000000 -0600
+++ ./server/fd.c	2003-04-09 17:26:22.000000000 -0500
@@ -1044,8 +1044,7 @@
 
     if (fd)
     {
-        int flags;
-        fd->fd_ops->get_file_info( fd, reply, &flags );
+        reply->fd_type = fd->fd_ops->get_file_info( fd, reply, &reply->fd_flags );
         release_object( fd );
     }
 }
diff -ur --minimal --exclude-from=/home/greg/bin/winetreediff_excl ../wine/server/named_pipe.c ./server/named_pipe.c
--- ../wine/server/named_pipe.c	2003-04-05 22:13:01.000000000 -0600
+++ ./server/named_pipe.c	2003-04-10 18:13:06.000000000 -0500
@@ -2,7 +2,7 @@
  * Server-side pipe management
  *
  * Copyright (C) 1998 Alexandre Julliard
- * Copyright (C) 2001 Mike McCormack
+ * Copyright (C) 2001, 2003 Mike McCormack
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -18,6 +18,24 @@
  * License along with this library; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
+ * This implemention now handles reading writing and buffering of pipe data
+ *  internally. This is necessary for two reasons:
+ *  - named pipes have a message mode which cannot be implemented on top
+ *    of unix pipes without race conditions or threads locking the server.
+ *  - named pipes can work across a network via the SMB protocol
+ *
+ * The implementation works by allocating a message of a fixed size, then
+ *  allowing a thread to write into it. When all writing for a message is
+ *  complete, the message is sent to its destination.  This means that:
+ *  - only complete messages are read from a message queue
+ *  - if one thread has problems during a write, other threads can continue
+ *    to write to a pipe
+ *  - writing a single message can be done in multiple server calls if
+ *    the message does not completely fit into the server buffer
+ *  - a queue for incomplete outgoing messages (user->in) is kept
+ *
+ * 23 Feb 2003 mjm
+ *
  * TODO:
  *   improve error handling
  */
@@ -27,7 +45,7 @@
 
 #include <assert.h>
 #include <fcntl.h>
-#include <string.h>
+//#include <string.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/time.h>
@@ -39,16 +57,21 @@
 #include <unistd.h>
 
 #include "winbase.h"
+#include "ntstatus.h"
 
 #include "file.h"
 #include "handle.h"
 #include "thread.h"
 #include "request.h"
+#include "async.h"
+
+#define FILE_FLAG_FIRST_PIPE_INSTANCE 0x00080000
 
 enum pipe_state
 {
     ps_none,
     ps_idle_server,
+    ps_wait_only,
     ps_wait_open,
     ps_wait_connect,
     ps_connected_server,
@@ -56,30 +79,61 @@
     ps_disconnected
 };
 
+#define PIPE_MIN_BUFFER 0x100
+#define PIPE_MAX_BUFFER 0x10000
+
 struct named_pipe;
 
+/* each message is a size followed by data */
+struct pipe_msg
+{
+    unsigned int     length;
+    unsigned int     offset;
+    struct pipe_msg *next;
+    struct pipe_msg *prev;
+    unsigned char    data[1];
+};
+
+struct msg_queue
+{
+    int length;   /* total size of all messages in the queue */
+    int maxsize;    /* try not to add more data than this */
+    struct pipe_msg    *first;  /* first message the queue */
+    struct pipe_msg    *last;   /* last message in the queue */
+};
+
 struct pipe_user
 {
     struct object       obj;
     struct fd          *fd;
     enum pipe_state     state;
-    struct pipe_user   *other;
-    struct named_pipe  *pipe;
-    struct pipe_user   *next;
-    struct pipe_user   *prev;
-    struct thread      *thread;
+    struct pipe_user   *other;      /* the other end */
+    struct named_pipe  *pipe;       /* description of this pipe end */
+
+    struct async_queue  read_q;     /* queues of threads waiting for us */
+    struct async_queue  write_q;
+    /* struct async_queue  wait_q; */
+    
+    struct thread      *thread;     /* the thread that owns this pipe end */
     void               *func;
     void               *overlapped;
+    struct msg_queue in;      /* messages that are being written */
+    struct msg_queue out;     /* messages waitiing to be read */
+
+    struct pipe_user   *next;       /* doubly linked list of pipe users */
+    struct pipe_user   *prev;
 };
 
 struct named_pipe
 {
     struct object       obj;         /* object header */
+    unsigned int        openmode;
     unsigned int        pipemode;
     unsigned int        maxinstances;
     unsigned int        outsize;
     unsigned int        insize;
     unsigned int        timeout;
+    unsigned int        instances;
     struct pipe_user   *users;
 };
 
@@ -99,33 +153,47 @@
 };
 
 static void pipe_user_dump( struct object *obj, int verbose );
-static struct fd *pipe_user_get_fd( struct object *obj );
 static void pipe_user_destroy( struct object *obj);
-
-static int pipe_user_get_poll_events( struct fd *fd );
-static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
+static struct fd *pipe_user_get_fd( struct object *obj );
 
 static const struct object_ops pipe_user_ops =
 {
-    sizeof(struct pipe_user),     /* size */
-    pipe_user_dump,               /* dump */
-    default_fd_add_queue,         /* add_queue */
-    default_fd_remove_queue,      /* remove_queue */
-    default_fd_signaled,          /* signaled */
-    no_satisfied,                 /* satisfied */
-    pipe_user_get_fd,             /* get_fd */
-    pipe_user_destroy             /* destroy */
+    sizeof(struct pipe_user),   /* size */
+    pipe_user_dump,             /* dump */
+    add_queue,                  /* add_queue */
+    remove_queue,               /* remove_queue */
+    NULL,                       /* signaled */
+    NULL,                       /* satisfied */
+    pipe_user_get_fd,           /* get_fd */
+    pipe_user_destroy           /* destroy */
 };
 
+static int pipe_user_flush( struct fd *fd );
+static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
+static void pipe_user_queue_async( struct fd *fd, void *ptr, unsigned int status, int type, int count );
+
 static const struct fd_ops pipe_user_fd_ops =
 {
-    pipe_user_get_poll_events,    /* get_poll_events */
-    default_poll_event,           /* poll_event */
-    no_flush,                     /* flush */
-    pipe_user_get_info,           /* get_file_info */
-    no_queue_async                /* queue_async */
+    NULL,                  /* get_poll_events */
+    NULL,                  /* poll_event */
+    pipe_user_flush,       /* flush */
+    pipe_user_get_info,    /* get_file_info */
+    pipe_user_queue_async  /* queue_async */
 };
 
+static struct fd *pipe_user_get_fd( struct object *obj )
+{
+    struct pipe_user *pipe_user = (struct pipe_user *)obj;
+    assert( obj->ops == &pipe_user_ops );
+    return (struct fd *)grab_object( pipe_user->fd );
+}
+
+static int pipe_user_flush( struct fd *fd )
+{
+    /* need to sleep until write all the pending data is written */
+    return 0;
+}
+
 static void named_pipe_dump( struct object *obj, int verbose )
 {
     struct named_pipe *pipe = (struct named_pipe *)obj;
@@ -160,10 +228,220 @@
     user->overlapped=NULL;
 }
 
-static struct fd *pipe_user_get_fd( struct object *obj )
+/* the following functions implement a message FIFO */
+
+static void add_message( struct msg_queue *q, struct pipe_msg *msg)
 {
-    struct pipe_user *user = (struct pipe_user *)obj;
-    return (struct fd *)grab_object( user->fd );
+    /* add a message at the end of the queue */
+    q->length += msg->length;
+    msg->prev = q->last;
+    if( q->last )
+        q->last->next = msg;
+    else
+        q->first = msg;
+    q->last = msg;
+}
+
+/* remove a message from a queue and optionally free it */
+static void remove_message( struct msg_queue *q, struct pipe_msg *msg, int del)
+{
+    q->length -= msg->length;
+    if( msg->next )
+        msg->next->prev = msg->prev;
+    else
+        q->last = msg->prev;
+    if( msg->prev )
+        msg->prev->next = msg->next;
+    else
+        q->first = msg->next;
+    msg->next = msg->prev = NULL;
+
+    /* if there's no messages, the length should be zero */
+    assert( q->first || !q->length );
+
+    if( del )
+        free( msg );
+}
+
+static void init_message_queue( struct msg_queue *q, int max )
+{
+    q->first = q->last = NULL;
+    q->length = 0;
+    q->maxsize = max;
+}
+
+static void delete_message_queue( struct msg_queue *q )
+{
+    while( q->first )
+    {
+        struct pipe_msg * t = q->first;
+        remove_message( q, t, 1 );
+    }
+    assert( q->length == 0);
+    q->maxsize = 0;
+}
+
+static struct pipe_msg *create_new_message( struct msg_queue *q, int size )
+{
+    struct pipe_msg *msg;
+ 
+    msg = malloc( sizeof *msg + size );
+    msg->length = size;
+    msg->offset = 0;
+    add_message( q, msg );
+    return msg;
+}
+
+static struct pipe_msg *find_message( struct msg_queue *q, int id)
+{
+    struct pipe_msg *msg;
+
+    for( msg = q->first; msg; msg = msg->next )
+         if( id == (int) msg )
+              return msg;
+    set_error( STATUS_INVALID_PARAMETER );
+    return NULL;
+}
+
+static struct pipe_msg *find_new_message( struct msg_queue *q )
+{
+    struct pipe_msg *msg;
+
+    for( msg = q->first; msg; msg = msg->next )
+         if( msg->offset == 0 )
+              return msg;
+    return NULL;
+}
+
+/* This function collects data sent from one thread into a message. 
+   When the message is complete, the function returns non-zero. */
+static int pipe_build_msg( struct pipe_msg *msg )
+{
+    const void *data = get_req_data();
+    int count = get_req_data_size();
+
+    /* trying to write more data than we were told about -
+       this shouldn't happen... if it does somebody stuffed up, so
+       to make sure we don't get in trouble, throw away the extra data */
+    if( count > (msg->length - msg->offset) )
+    {
+        fprintf(stderr,"Throwing away extra in pipe write!\n");
+        count = (msg->length - msg->offset);
+    }
+ 
+    /* copy the data to the message */
+    memcpy( &msg->data[msg->offset], data, count );
+    msg->offset += count;
+
+    /* check if this message is complete */
+    if( msg->offset != msg->length )
+        return 0;
+
+    msg->offset = 0;
+    return 1;
+}
+
+static int pipe_get_message( struct pipe_msg *msg, int count )
+{
+    void *out;
+
+    if( count > (msg->length - msg->offset) )
+        count = (msg->length - msg->offset) ;
+
+    if( count > get_reply_max_size() )
+        count = get_reply_max_size();
+
+    out = set_reply_data_size( count );
+    memcpy( out, &msg->data[msg->offset], count);
+    msg->offset += count;
+
+    return (msg->offset == msg->length);
+}
+
+static int pipe_user_check_actions( struct pipe_user *user )
+{
+    int done = 0;
+
+    if(IS_READY(user->read_q))
+    {
+        if( find_new_message( &user->out ) )
+        {
+            async_notify(user->read_q.head,STATUS_ALERTED);
+            done = 1;
+        }
+        if( ( user->state != ps_connected_server ) && 
+            ( user->state != ps_connected_client) )
+        {
+            async_notify(user->read_q.head,STATUS_PIPE_CLOSING);
+            done = 1;
+        }
+    }
+
+    if(IS_READY(user->write_q) && ( user->in.length < user->in.maxsize ) )
+    {
+        async_notify(user->write_q.head,STATUS_ALERTED);
+        done = 1;
+    }
+
+    /*if(IS_READY(user->wait_q) && (POLLIN & event) )
+    {
+        async_notify(user->wait_q.head,STATUS_ALERTED);
+        done = 1;
+    } */
+
+    return done;
+}
+
+static void pipe_user_queue_async(struct fd *fd, void *ptr, unsigned int status, int type, int count)
+{
+    struct pipe_user *user = get_fd_user( fd );
+    struct async_queue *q;
+    struct async *async;
+    int timeout;
+
+    assert(user->obj.ops == &pipe_user_ops);
+
+    switch(type)
+    {
+    case ASYNC_TYPE_READ:
+        q = &user->read_q;
+        timeout = user->pipe->timeout;
+        break;
+    /* case ASYNC_TYPE_WAIT:
+        q = &user->wait_q;
+        timeout = 0;
+        break; */
+    case ASYNC_TYPE_WRITE:
+        q = &user->write_q;
+        timeout = user->pipe->timeout;
+        break;
+    default:
+        set_error(STATUS_INVALID_PARAMETER);
+        return;
+    }
+
+    async = find_async ( q, current, ptr );
+
+    if ( status == STATUS_PENDING )
+    {
+        if ( !async )
+            async = create_async ( &user->obj, current, ptr );
+        if ( !async )
+            return;
+
+        async->status = STATUS_PENDING;
+        if(!async->q)
+        {
+            async_add_timeout(async,timeout);
+            async_insert(q, async);
+        }
+
+        /* Check if the new pending request can be served immediately */
+        if( pipe_user_check_actions( user ) )
+             return;
+    }
+    else if ( async ) destroy_async ( async );
+    else set_error ( STATUS_INVALID_PARAMETER );
 }
 
 static void pipe_user_destroy( struct object *obj)
@@ -177,8 +455,6 @@
 
     if(user->other)
     {
-        release_object( user->other->fd );
-        user->other->fd = NULL;
         switch(user->other->state)
         {
         case ps_connected_server:
@@ -195,22 +471,25 @@
         user->other = NULL;
     }
 
+    delete_message_queue( &user->in );
+    delete_message_queue( &user->out );
+
+    destroy_async_queue(&user->read_q);
+    destroy_async_queue(&user->write_q);
+    /* destroy_async_queue(&user->wait_q); */
+    if (user->fd) release_object( user->fd );
+
     /* remove user from pipe's user list */
     if (user->next) user->next->prev = user->prev;
     if (user->prev) user->prev->next = user->next;
     else user->pipe->users = user->next;
     if (user->thread) release_object(user->thread);
     release_object(user->pipe);
-    if (user->fd) release_object( user->fd );
-}
-
-static int pipe_user_get_poll_events( struct fd *fd )
-{
-    return POLLIN | POLLOUT;  /* FIXME */
 }
 
 static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags )
 {
+    /* struct pipe_user * user = get_fd_user( fd );*/
     if (reply)
     {
         reply->type        = FILE_TYPE_PIPE;
@@ -224,8 +503,8 @@
         reply->index_low   = 0;
         reply->serial      = 0;
     }
-    *flags = 0;
-    return FD_TYPE_DEFAULT;
+    *flags = FD_FLAG_TIMEOUT;
+    return FD_TYPE_NAMED_PIPE;
 }
 
 static struct named_pipe *create_named_pipe( const WCHAR *name, size_t len )
@@ -243,28 +522,13 @@
     return pipe;
 }
 
-static struct named_pipe *open_named_pipe( const WCHAR *name, size_t len )
-{
-    struct object *obj;
-
-    if ((obj = find_object( sync_namespace, name, len )))
-    {
-        if (obj->ops == &named_pipe_ops) return (struct named_pipe *)obj;
-        release_object( obj );
-        set_error( STATUS_OBJECT_TYPE_MISMATCH );
-    }
-    else set_error( STATUS_OBJECT_NAME_NOT_FOUND );
-
-    return NULL;
-}
-
 static struct pipe_user *get_pipe_user_obj( struct process *process, obj_handle_t handle,
                                             unsigned int access )
 {
     return (struct pipe_user *)get_handle_obj( process, handle, access, &pipe_user_ops );
 }
 
-static struct pipe_user *create_pipe_user( struct named_pipe *pipe )
+static struct pipe_user *create_pipe_user( struct named_pipe *pipe, int fd )
 {
     struct pipe_user *user;
 
@@ -272,7 +536,13 @@
     if(!user)
         return NULL;
 
-    user->fd = NULL;
+    user->fd = alloc_fd( &pipe_user_fd_ops, &user->obj );
+    if (!user->fd)
+    {
+        release_object( user );
+        return NULL;
+    }
+
     user->pipe = pipe;
     user->state = ps_none;
     user->other = NULL;
@@ -285,6 +555,13 @@
     user->prev = NULL;
     pipe->users = user;
 
+    init_message_queue( &user->in, pipe->insize );
+    init_message_queue( &user->out, pipe->outsize );
+
+    init_async_queue( &user->read_q );
+    init_async_queue( &user->write_q );
+    /* init_async_queue( &user->wait_q ); */
+
     grab_object(pipe);
 
     return user;
@@ -306,12 +583,48 @@
     return (struct pipe_user *)grab_object( x );
 }
 
+static void do_connect_pipeends( struct pipe_user *client, struct pipe_user *server )
+{
+    struct pipe_msg **pmsg;
+
+    server->state = ps_connected_server;
+    server->other = client;
+    client->state = ps_connected_client;
+    client->other = server;
+    notify_waiter( server,STATUS_SUCCESS );
+    
+    /* send any waiting messages to the server */
+    pmsg = &client->in.first;
+    while( *pmsg )
+    {
+        struct pipe_msg *msg = *pmsg;
+        if( msg->offset == 0 )
+        {
+            remove_message( &client->in, msg, 0 );
+            add_message( &server->out, msg );
+        }
+        else
+            pmsg = &msg->next;
+    }
+
+    /* the server may want to read, the client may want to write */
+    pipe_user_check_actions( server );
+    pipe_user_check_actions( client );
+}
+ 
 DECL_HANDLER(create_named_pipe)
 {
     struct named_pipe *pipe;
     struct pipe_user *user;
 
     reply->handle = 0;
+
+    if( req->maxinstances > PIPE_UNLIMITED_INSTANCES )
+    {
+        set_error( STATUS_INVALID_PARAMETER );
+        return;
+    }
+
     pipe = create_named_pipe( get_req_data(), get_req_data_size() );
     if(!pipe)
         return;
@@ -322,11 +635,26 @@
         pipe->outsize = req->outsize;
         pipe->maxinstances = req->maxinstances;
         pipe->timeout = req->timeout;
+        pipe->openmode = req->openmode;
         pipe->pipemode = req->pipemode;
+        pipe->instances = 0;
+    }
+    else if( ( req->openmode & FILE_FLAG_FIRST_PIPE_INSTANCE ) ||
+             ( req->openmode != pipe->openmode ) ||
+             ( req->timeout != pipe->timeout ) ||
+             ( ( req->pipemode & PIPE_READMODE_MESSAGE ) & 
+               !( req->openmode & PIPE_TYPE_MESSAGE ) ) ||
+             ( ( pipe->maxinstances != PIPE_UNLIMITED_INSTANCES) && 
+               ( pipe->instances >= pipe->maxinstances ) )
+           )
+    {
+        set_error( STATUS_ACCESS_DENIED );
+        goto end;
     }
 
-    user = create_pipe_user( pipe );
+    pipe->instances ++;
 
+    user = create_pipe_user (pipe, -1);
     if(user)
     {
         user->state = ps_idle_server;
@@ -334,51 +662,49 @@
         release_object( user );
     }
 
+end:
     release_object( pipe );
 }
 
 DECL_HANDLER(open_named_pipe)
 {
-    struct pipe_user *user, *partner;
     struct named_pipe *pipe;
 
     reply->handle = 0;
-
-    if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
-    {
-        set_error( STATUS_NO_SUCH_FILE );
-        return;
-    }
-    if (!(partner = find_partner(pipe, ps_wait_open)))
-    {
-        release_object(pipe);
-        set_error( STATUS_PIPE_NOT_AVAILABLE );
+    pipe = create_named_pipe( get_req_data(), get_req_data_size() );
+    if(!pipe)
         return;
-    }
-    if ((user = create_pipe_user( pipe )))
-    {
-        int fds[2];
 
-        if(!socketpair(PF_UNIX, SOCK_STREAM, 0, fds))
+    if (get_error() == STATUS_OBJECT_NAME_COLLISION)
+    {
+        struct pipe_user *user = create_pipe_user (pipe, -1);
+        if( user )
         {
-            user->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[1], &user->obj );
-            partner->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[0], &partner->obj );
-            if (user->fd && partner->fd)
+	    struct pipe_user *partner;
+	    reply->handle = alloc_handle( current->process, user, req->access, 0);
+            //user->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[1], &user->obj );
+            //partner->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[0], &partner->obj );
+            //if (user->fd && partner->fd
+	    if ((partner = find_partner(pipe, ps_wait_open)))
             {
-                notify_waiter(partner,STATUS_SUCCESS);
-                partner->state = ps_connected_server;
-                partner->other = user;
-                user->state = ps_connected_client;
-                user->other = partner;
-                reply->handle = alloc_handle( current->process, user, req->access, 0 );
+                do_connect_pipeends( user, partner );
+                release_object( partner );
             }
+            else
+                user->state = ps_wait_connect;
+            release_object(user);
+        }
+        else
+        {
+            set_error(STATUS_NO_MEMORY);
         }
-        else file_set_error();
-
-        release_object( user );
     }
-    release_object( partner );
-    release_object( pipe );
+    else
+    {
+        set_error(STATUS_NO_SUCH_FILE);
+    }
+
+    release_object(pipe);
 }
 
 DECL_HANDLER(connect_named_pipe)
@@ -401,10 +727,18 @@
         user->overlapped = req->overlapped;
 
         /* notify all waiters that a pipe just became available */
-        while( (partner = find_partner(user->pipe,ps_wait_connect)) )
+        while( (partner = find_partner(user->pipe,ps_wait_only)) )
         {
             notify_waiter(partner,STATUS_SUCCESS);
             release_object(partner);
+            release_object(partner);
+        }
+
+        /* connect any CreateFile calls waiting for a pipe */
+        while( (partner = find_partner(user->pipe,ps_wait_connect)) )
+        {
+            do_connect_pipeends( partner, user );
+            release_object(partner);
         }
     }
 
@@ -414,35 +748,58 @@
 DECL_HANDLER(wait_named_pipe)
 {
     struct named_pipe *pipe;
-    struct pipe_user *partner;
 
-    if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
-    {
-        set_error( STATUS_PIPE_NOT_AVAILABLE );
-        return;
-    }
-    if( (partner = find_partner(pipe,ps_wait_open)) )
-    {
-        /* this should use notify_waiter,
-           but no pipe_user object exists now... */
-        thread_queue_apc(current,NULL,req->func,
-                         APC_ASYNC, 1, req->overlapped, STATUS_SUCCESS, NULL);
-        release_object(partner);
-    }
-    else
+    // if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
+    // {
+    //    set_error( STATUS_PIPE_NOT_AVAILABLE );
+    //    return;
+    // }
+    // if( (partner = find_partner(pipe,ps_wait_open)) )
+    // {
+    //    /* this should use notify_waiter,
+    //       but no pipe_user object exists now... */
+    //    thread_queue_apc(current,NULL,req->func,
+    //                     APC_ASYNC, 1, req->overlapped, STATUS_SUCCESS, NULL);
+    //    release_object(partner);
+    // }
+    // else
+    pipe = create_named_pipe( get_req_data(), get_req_data_size() );
+    if( pipe )
     {
-        struct pipe_user *user;
+        /* only wait if the pipe already exists */
+        if(get_error() == STATUS_OBJECT_NAME_COLLISION)
+        {
+            struct pipe_user *partner;
 
-        if( (user = create_pipe_user( pipe )) )
+            set_error(STATUS_SUCCESS);
+            if( (partner = find_partner(pipe,ps_wait_open)) )
+            {
+                /* this should use notify_waiter,
+                   but no pipe_user object exists now... */
+                thread_queue_apc(current,NULL,req->func,
+                    APC_ASYNC,1,2,req->overlapped,STATUS_SUCCESS);
+                release_object(partner);
+            }
+            else
+            {
+                struct pipe_user *user;
+
+                if( (user = create_pipe_user (pipe, -1)) )
+                {
+                    user->state = ps_wait_only;
+                    user->thread = (struct thread *)grab_object(current);
+                    user->func = req->func;
+                    user->overlapped = req->overlapped;
+                    /* don't release it */
+                }
+            }
+        }
+        else
         {
-            user->state = ps_wait_connect;
-            user->thread = (struct thread *)grab_object(current);
-            user->func = req->func;
-            user->overlapped = req->overlapped;
-            /* don't release it */
+            set_error(STATUS_PIPE_NOT_AVAILABLE);
         }
+        release_object(pipe);
     }
-    release_object(pipe);
 }
 
 DECL_HANDLER(disconnect_named_pipe)
@@ -455,15 +812,17 @@
     if( (user->state == ps_connected_server) &&
         (user->other->state == ps_connected_client) )
     {
-        release_object( user->other->fd );
-        user->other->fd = NULL;
         user->other->state = ps_disconnected;
-        user->other->other = NULL;
-
-        release_object( user->fd );
-        user->fd = NULL;
         user->state = ps_idle_server;
+
+        pipe_user_check_actions( user );
+        pipe_user_check_actions( user->other );
+
+        user->other->other = NULL;
         user->other = NULL;
+
+        user->pipe->instances --;
+        assert( user->pipe->instances >= 0 );
     }
     release_object(user);
 }
@@ -483,3 +842,109 @@
 
     release_object(user);
 }
+
+DECL_HANDLER(read_named_pipe)
+{
+    struct pipe_user *user;
+    struct pipe_msg *msg;
+
+    reply->id = 0;
+
+    user = get_pipe_user_obj(current->process, req->handle, 0);
+    if(!user)
+        return;
+
+    /* make sure we're connected */
+    switch( user->state )
+    {
+    case ps_connected_server:
+    case ps_connected_client:
+    case ps_wait_connect:
+        break;
+    case ps_disconnected:
+        set_error( STATUS_PIPE_DISCONNECTED );
+        goto end;
+    case ps_wait_open:
+        set_error( STATUS_PIPE_LISTENING );
+        goto end;
+    default:
+        set_error( STATUS_PIPE_NOT_AVAILABLE );
+        goto end;
+    }
+
+    /* find the message identified by id or a new one */
+    if( req->id )
+        msg = find_message( &user->out, req->id );
+    else
+        msg = find_new_message( &user->out );
+
+    /* send a message, when finished, delete it */
+    if( msg )
+    {
+        if( pipe_get_message( msg, req->count ) )
+        {
+            remove_message( &user->out, msg, 1 );
+            pipe_user_check_actions( user );
+        }
+        else
+            reply->id = (int) msg;
+    }
+
+end:
+    release_object(user);
+}
+
+DECL_HANDLER(write_named_pipe)
+{
+    struct pipe_user *user;
+    struct pipe_msg *msg;
+
+    reply->id = 0;
+
+    user = get_pipe_user_obj(current->process, req->handle, 0);
+    if(!user)
+        return;
+
+    /* make sure we're connected first */
+    switch( user->state )
+    {
+    case ps_connected_server:
+    case ps_connected_client:
+    case ps_wait_connect:
+        break;
+    case ps_disconnected:
+        set_error( STATUS_PIPE_DISCONNECTED );
+        goto end;
+    case ps_wait_open:
+        set_error( STATUS_PIPE_LISTENING );
+        goto end;
+    default:
+        set_error( STATUS_PIPE_NOT_AVAILABLE );
+        goto end;
+    }
+
+    /* find the message identified by id or a new one */
+    if( req->id )
+        msg = find_message( &user->in, req->id );
+    else
+        msg = create_new_message( &user->in, req->count );
+
+    /* Cook a message. If it is ready, send it */
+    if( pipe_build_msg( msg ) )
+    {
+        if( user->other )
+        {
+            remove_message( &user->in, msg, 0 );
+            add_message( &user->other->out, msg );
+            /* server should read, another thread may want to write */
+            pipe_user_check_actions( user->other );
+            pipe_user_check_actions( user );
+        }
+    }
+    else
+        reply->id = (int) msg;
+
+end:
+    release_object(user);
+}
+
diff -ur --minimal --exclude-from=/home/greg/bin/winetreediff_excl ../wine/server/request.h ./server/request.h
--- ../wine/server/request.h	2003-03-26 17:41:43.000000000 -0600
+++ ./server/request.h	2003-04-09 18:57:15.000000000 -0500
@@ -239,6 +239,8 @@
 DECL_HANDLER(create_named_pipe);
 DECL_HANDLER(open_named_pipe);
 DECL_HANDLER(connect_named_pipe);
+DECL_HANDLER(read_named_pipe);
+DECL_HANDLER(write_named_pipe);
 DECL_HANDLER(wait_named_pipe);
 DECL_HANDLER(disconnect_named_pipe);
 DECL_HANDLER(get_named_pipe_info);
@@ -420,6 +422,8 @@
     (req_handler)req_create_named_pipe,
     (req_handler)req_open_named_pipe,
     (req_handler)req_connect_named_pipe,
+    (req_handler)req_read_named_pipe,
+    (req_handler)req_write_named_pipe,
     (req_handler)req_wait_named_pipe,
     (req_handler)req_disconnect_named_pipe,
     (req_handler)req_get_named_pipe_info,
diff -ur --minimal --exclude-from=/home/greg/bin/winetreediff_excl ../wine/server/trace.c ./server/trace.c
--- ../wine/server/trace.c	2003-04-05 22:13:01.000000000 -0600
+++ ./server/trace.c	2003-04-09 18:57:15.000000000 -0500
@@ -893,7 +893,9 @@
     fprintf( stderr, " links=%d,", req->links );
     fprintf( stderr, " index_high=%d,", req->index_high );
     fprintf( stderr, " index_low=%d,", req->index_low );
-    fprintf( stderr, " serial=%08x", req->serial );
+    fprintf( stderr, " serial=%08x,", req->serial );
+    fprintf( stderr, " fd_type=%d,", req->fd_type );
+    fprintf( stderr, " fd_flags=%d", req->fd_flags );
 }
 
 static void dump_lock_file_request( const struct lock_file_request *req )
@@ -1979,6 +1981,30 @@
     fprintf( stderr, " func=%p", req->func );
 }
 
+static void dump_read_named_pipe_request( const struct read_named_pipe_request *req )
+{
+    fprintf( stderr, " handle=%p,", req->handle );
+    fprintf( stderr, " id=%08x,", req->id );
+    fprintf( stderr, " count=%d", req->count );
+}
+
+static void dump_read_named_pipe_reply( const struct read_named_pipe_reply *req )
+{
+    fprintf( stderr, " id=%08x", req->id );
+}
+
+static void dump_write_named_pipe_request( const struct write_named_pipe_request *req )
+{
+    fprintf( stderr, " handle=%p,", req->handle );
+    fprintf( stderr, " id=%08x,", req->id );
+    fprintf( stderr, " count=%d", req->count );
+}
+
+static void dump_write_named_pipe_reply( const struct write_named_pipe_reply *req )
+{
+    fprintf( stderr, " id=%08x", req->id );
+}
+
 static void dump_wait_named_pipe_request( const struct wait_named_pipe_request *req )
 {
     fprintf( stderr, " timeout=%08x,", req->timeout );
@@ -2576,6 +2602,8 @@
     (dump_func)dump_create_named_pipe_request,
     (dump_func)dump_open_named_pipe_request,
     (dump_func)dump_connect_named_pipe_request,
+    (dump_func)dump_read_named_pipe_request,
+    (dump_func)dump_write_named_pipe_request,
     (dump_func)dump_wait_named_pipe_request,
     (dump_func)dump_disconnect_named_pipe_request,
     (dump_func)dump_get_named_pipe_info_request,
@@ -2754,6 +2782,8 @@
     (dump_func)dump_create_named_pipe_reply,
     (dump_func)dump_open_named_pipe_reply,
     (dump_func)0,
+    (dump_func)dump_read_named_pipe_reply,
+    (dump_func)dump_write_named_pipe_reply,
     (dump_func)0,
     (dump_func)0,
     (dump_func)dump_get_named_pipe_info_reply,
@@ -2932,6 +2962,8 @@
     "create_named_pipe",
     "open_named_pipe",
     "connect_named_pipe",
+    "read_named_pipe",
+    "write_named_pipe",
     "wait_named_pipe",
     "disconnect_named_pipe",
     "get_named_pipe_info",
-- 
gmt

"Democracy is the theory that holds that the common people know
what they want, and deserve to get it good and hard." HL Mencken



More information about the wine-devel mailing list