Named Pipe patch

Mike McCormack mike at codeweavers.com
Tue Mar 4 09:59:30 CST 2003


Hi,

This patch probably shouldn't be applied... it's just to let everybody 
know where I'm at with the named pipes code.

The patch moves away from using unix pipes to implement named pipes, and 
instead implements pipes internally. This is necessary to implement 
message mode properly (without race conditions).

I don't have time to work on it this week, so don't expect anything new 
before the weekend. If anybody wants to play with it or comment, feel 
welcome.

I've started trying to pass some of Dan's tests, but it still fails... 
Dan, your test's coverage is great!

Mike
-------------- next part --------------
Index: files/file.c
===================================================================
RCS file: /home/wine/wine/files/file.c,v
retrieving revision 1.177
diff -r1.177 file.c
320a321,341
>  *           FILE_GetTypeFlags
>  */
> BOOL WINAPI FILE_GetTypeFlags( HANDLE hFile, enum fd_type *type, int* flags )
> {
>     NTSTATUS ret;
>     SERVER_START_REQ( get_file_info )
>     {
>         req->handle = hFile;
>         ret = wine_server_call_err( req );
>         if( !ret )
>         {
>             if(type) *type = reply->fd_type;
>             if(flags) *flags = reply->fd_flags;
>         }
>     }
>     SERVER_END_REQ;
>     return !ret;
> }
> 
> 
> /***********************************************************************
1647a1669,1713
>  *             FILE_NamedPipeReadService      (INTERNAL)
>  */
> static void FILE_NamedPipeReadService(async_private *ovp)
> {
>     async_fileio *fileio = (async_fileio*) ovp;
>     LPOVERLAPPED lpOverlapped = fileio->lpOverlapped;
>     int result, r;
>     int already = lpOverlapped->InternalHigh;
>     LPVOID p = &fileio->buffer[already];
> 
>     TRACE("%p %p\n", lpOverlapped, fileio->buffer );
> 
>     /* read pipe data from server */
>     SERVER_START_REQ( read_named_pipe )
>     {
>         req->handle = ovp->handle;
>         req->count = fileio->count - already;
>         req->id = (ovp->fd == -1) ? 0 : ovp->fd;
>         wine_server_set_reply( req, p, fileio->count - already);
>         r = wine_server_call_err( req );
>         if( !r )
>         {
>             result = wine_server_reply_size( reply );
>             ovp->fd = reply->id;
>         }
>     }
>     SERVER_END_REQ;
> 
>     if( r )
>         goto end;
> 
>     lpOverlapped->InternalHigh += result;
>     TRACE("read %d more bytes %ld/%d so far\n",result,
>         lpOverlapped->InternalHigh,fileio->count);
> 
>     if(lpOverlapped->InternalHigh >= fileio->count )
>         r = STATUS_SUCCESS;
>     else
>         r = STATUS_PENDING;
> 
> end:
>     lpOverlapped->Internal = r;
> }
> 
> /***********************************************************************
1649,1651d1714
<  *
<  *  This function is called while the client is waiting on the
<  *  server, so we can't make any server calls here.
1663a1727,1728
>     if ( ovp->fd < 0 )
>         ovp->fd = FILE_GetUnixHandle ( fileio->async.handle, GENERIC_READ );
1711c1776
<                          HANDLE hEvent)
---
>                          HANDLE hEvent, enum fd_type type)
1714,1716d1778
<     int fd;
<     int flags;
<     enum fd_type type;
1728,1734d1789
<     fd = FILE_GetUnixHandleType ( hFile, GENERIC_READ, &type, &flags);
<     if ( fd < 0 )
<     {
<         WARN ( "Couldn't get FD\n" );
<         return FALSE;
<     }
< 
1740c1795
<         goto error;
---
>         return FALSE;
1745c1800
<     ovp->async.fd = fd;
---
>     ovp->async.fd = -1;     /* get it later */
1747c1802,1805
<     ovp->async.func = FILE_AsyncReadService;
---
>     if( type == FD_TYPE_NAMED_PIPE )
>         ovp->async.func = FILE_NamedPipeReadService;
>     else
>         ovp->async.func = FILE_AsyncReadService;
1756,1760d1813
< 
< error:
<     close (fd);
<     return FALSE;
< 
1769a1823,1826
>     enum fd_type type;
> 
>     if( !FILE_GetTypeFlags( hFile, &type, NULL ))
>         return FALSE;
1771c1828
<     return FILE_ReadFileEx(hFile,buffer,bytesToRead,overlapped,lpCompletionRoutine, INVALID_HANDLE_VALUE);
---
>     return FILE_ReadFileEx(hFile,buffer,bytesToRead,overlapped,lpCompletionRoutine, INVALID_HANDLE_VALUE, type);
1777a1835
>     enum fd_type type;
1780a1839,1840
>     if( !FILE_GetTypeFlags( hFile, &type, NULL ))
>         return FALSE;
1784c1844
<         if(FILE_ReadFileEx(hFile, buffer, bytesToRead, &ov, NULL, ov.hEvent))
---
>         if(FILE_ReadFileEx(hFile, buffer, bytesToRead, &ov, NULL, ov.hEvent, type))
1799c1859
<     int unix_handle, result, flags;
---
>     int unix_handle, result, flags = 0;
1808c1868,1869
<     unix_handle = FILE_GetUnixHandleType( hFile, GENERIC_READ, &type, &flags );
---
>     if( !FILE_GetTypeFlags( hFile, &type, &flags ))
>         return FALSE;
1812d1872
< 	if (unix_handle == -1) return FALSE;
1816d1875
< 	    close(unix_handle);
1821d1879
<         close(unix_handle);
1823,1824c1881
< 
<         if(!FILE_ReadFileEx(hFile, buffer, bytesToRead, overlapped, NULL, overlapped->hEvent))
---
>         if(!FILE_ReadFileEx(hFile, buffer, bytesToRead, overlapped, NULL, overlapped->hEvent, type))
1837,1838d1893
<     {
<         close(unix_handle);
1840c1895
<     }
---
> 
1851d1905
<         if (unix_handle == -1) return FALSE;
1859d1912
<               close(unix_handle);
1866,1867c1919
< 	if (unix_handle == -1)
< 	    return FALSE;
---
>         break;
1869a1922,1925
>     unix_handle = FILE_GetUnixHandle ( hFile, GENERIC_READ );
>     if (unix_handle == -1)
>         return FALSE;
> 
1896a1953,2000
>  *             FILE_NamedPipeWrite      (INTERNAL)
>  *
>  * hack: use fd member to store write ID.
>  */
> static void FILE_NamedPipeWrite(async_private *ovp)
> {
>     async_fileio *fileio = (async_fileio *) ovp;
>     LPOVERLAPPED lpOverlapped = fileio->lpOverlapped;
>     int count, already = lpOverlapped->InternalHigh;
>     LPVOID p = &fileio->buffer[already];
>     NTSTATUS r;
> 
>     /* write some data (non-blocking) */
>     count = fileio->count - already;
> 
>     TRACE("ovp %p writing %d bytes of %p\n",lpOverlapped,count,fileio->buffer );
> 
>     SERVER_START_REQ( write_named_pipe )
>     {
>         req->handle = ovp->handle;
>         req->count = count;
>         req->id = (ovp->fd == -1) ? 0 : ovp->fd;
>         wine_server_add_data( req, p, count);
>         r = wine_server_call_err( req );
>         if( !r )
>             ovp->fd = reply->id;
>     }
>     SERVER_END_REQ;
> 
>     TRACE("r = %08x\n",r);
> 
>     if( r )
>         goto end;
> 
>     lpOverlapped->InternalHigh += count;
> 
>     TRACE("wrote %d more bytes %ld/%d so far\n",count,lpOverlapped->InternalHigh,fileio->count);
> 
>     if(lpOverlapped->InternalHigh < fileio->count)
>         r = STATUS_PENDING;
>     else
>         r = STATUS_SUCCESS;
> 
> end:
>     lpOverlapped->Internal = r;
> }
> 
> /***********************************************************************
1912a2017,2018
>     if ( ovp->fd < 0 )
>         ovp->fd = FILE_GetUnixHandle ( fileio->async.handle, GENERIC_WRITE );
1955c2061
<                              HANDLE hEvent)
---
>                              HANDLE hEvent, enum fd_type type)
1958,1960d2063
<     int fd;
<     int flags;
<     enum fd_type type;
1971,1977d2073
<     fd = FILE_GetUnixHandleType ( hFile, GENERIC_WRITE, &type, &flags );
<     if ( fd < 0 )
<     {
<         TRACE( "Couldn't get FD\n" );
<         return FALSE;
<     }
< 
1983c2079
<         goto error;
---
>         return FALSE;
1988c2084
<     ovp->async.fd = fd;
---
>     ovp->async.fd = -1;  /* get it later */
1990c2086,2089
<     ovp->async.func = FILE_AsyncWriteService;
---
>     if( type == FD_TYPE_NAMED_PIPE )
>         ovp->async.func = FILE_NamedPipeWrite;
>     else
>         ovp->async.func = FILE_AsyncWriteService;
1999,2002d2097
< 
< error:
<     close (fd);
<     return FALSE;
2011a2107,2110
>     enum fd_type type;
> 
>     if( !FILE_GetTypeFlags( hFile, &type, NULL ))
>          return FALSE;
2014c2113,2135
<     return FILE_WriteFileEx(hFile, buffer, bytesToWrite, overlapped, lpCompletionRoutine, INVALID_HANDLE_VALUE);
---
>     return FILE_WriteFileEx(hFile, buffer, bytesToWrite, overlapped, lpCompletionRoutine, INVALID_HANDLE_VALUE, type);
> }
> 
> static BOOL FILE_TimeoutWrite(HANDLE hFile, LPCVOID buffer, DWORD bytesToWrite, LPDWORD bytesWritten)
> {
>     OVERLAPPED ov;
>     BOOL r = FALSE;
>     enum fd_type type;
> 
>     TRACE("%p %p %ld %p\n", hFile, buffer, bytesToWrite, bytesWritten );
> 
>     if( !FILE_GetTypeFlags( hFile, &type, NULL ))
>         return FALSE;
>     ZeroMemory(&ov, sizeof (OVERLAPPED));
>     if(STATUS_SUCCESS==NtCreateEvent(&ov.hEvent, SYNCHRONIZE, NULL, 0, 0))
>     {
>         if(FILE_WriteFileEx(hFile, buffer, bytesToWrite, &ov, NULL, ov.hEvent, type))
>         {
>             r = GetOverlappedResult(hFile, &ov, bytesWritten, TRUE);
>         }
>     }
>     CloseHandle(ov.hEvent);
>     return r;
2032c2153,2154
<     unix_handle = FILE_GetUnixHandleType( hFile, GENERIC_WRITE, &type, &flags );
---
>     if( !FILE_GetTypeFlags( hFile, &type, &flags ))
>          return FALSE;
2036d2157
< 	if (unix_handle == -1) return FALSE;
2040d2160
< 	    close(unix_handle);
2045d2164
<         close(unix_handle);
2048c2167
<         if(!FILE_WriteFileEx(hFile, buffer, bytesToWrite, overlapped, NULL, overlapped->hEvent))
---
>         if(!FILE_WriteFileEx(hFile, buffer, bytesToWrite, overlapped, NULL, overlapped->hEvent, type))
2060a2180,2182
>     if (flags & FD_FLAG_TIMEOUT)
>         return FILE_TimeoutWrite(hFile, buffer, bytesToWrite, bytesWritten);
> 
2069,2070d2190
<         if (unix_handle == -1) return FALSE;
< 
2077,2078d2196
<             {
<               close(unix_handle);
2080d2197
<             }
2085,2086d2201
<         if (unix_handle == -1)
<             return FALSE;
2089d2203
<             close(unix_handle);
2094a2209,2212
> 
>     unix_handle = FILE_GetUnixHandle ( hFile, GENERIC_WRITE );
>     if (unix_handle == -1)
>         return FALSE;
Index: include/wine/server_protocol.h
===================================================================
RCS file: /home/wine/wine/include/wine/server_protocol.h,v
retrieving revision 1.57
diff -r1.57 server_protocol.h
809c809,810
<     FD_TYPE_SMB
---
>     FD_TYPE_SMB,
>     FD_TYPE_NAMED_PIPE
814a816
> #define FD_FLAG_NO_FD              0x10
889a892,893
>     int          fd_type;
>     int          fd_flags;
2442a2447,2474
> struct read_named_pipe_request
> {
>     struct request_header __header;
>     obj_handle_t  handle;
>     unsigned int  id;
>     int           count;
> };
> struct read_named_pipe_reply
> {
>     struct reply_header __header;
>     unsigned int  id;
> };
> 
> 
> struct write_named_pipe_request
> {
>     struct request_header __header;
>     obj_handle_t  handle;
>     unsigned int  id;
>     int           count;
> };
> struct write_named_pipe_reply
> {
>     struct reply_header __header;
>     unsigned int  id;
> };
> 
> 
3154a3187,3188
>     REQ_read_named_pipe,
>     REQ_write_named_pipe,
3335a3370,3371
>     struct read_named_pipe_request read_named_pipe_request;
>     struct write_named_pipe_request write_named_pipe_request;
3514a3551,3552
>     struct read_named_pipe_reply read_named_pipe_reply;
>     struct write_named_pipe_reply write_named_pipe_reply;
3554c3592
< #define SERVER_PROTOCOL_VERSION 97
---
> #define SERVER_PROTOCOL_VERSION 103
Index: server/fd.c
===================================================================
RCS file: /home/wine/wine/server/fd.c,v
retrieving revision 1.3
diff -r1.3 fd.c
540,541c540
<         int flags;
<         fd->fd_ops->get_file_info( fd, reply, &flags );
---
>         reply->fd_type = fd->fd_ops->get_file_info( fd, reply, &reply->fd_flags );
Index: server/named_pipe.c
===================================================================
RCS file: /home/wine/wine/server/named_pipe.c,v
retrieving revision 1.20
diff -r1.20 named_pipe.c
5c5
<  * Copyright (C) 2001 Mike McCormack
---
>  * Copyright (C) 2001, 2003 Mike McCormack
20a21,38
>  * This implemention now handles reading writing and buffering of pipe data
>  *  internally. This is necessary for two reasons:
>  *  - named pipes have a message mode which cannot be implemented on top
>  *    of unix pipes without race conditions or threads locking the server.
>  *  - named pipes can work across a network via the SMB protocol
>  *
>  * The implementation works by allocating a message of a fixed size, then
>  *  allowing a thread to write into it. When all writing for a message is
>  *  complete, the message is sent to its destination.  This means that:
>  *  - only complete messages are read from a message queue
>  *  - if one thread has problems during a write, other threads can continue
>  *    to write to a pipe
>  *  - writing a single message can be done in multiple server calls if
>  *    the message does not completely fit into the server buffer
>  *  - a queue for incomplete outgoing messages (user->in) is kept
>  *
>  * 23 Feb 2003 mjm
>  *
30c48
< #include <string.h>
---
> //#include <string.h>
41a60
> #include "ntstatus.h"
46a66,68
> #include "async.h"
> 
> #define FILE_FLAG_FIRST_PIPE_INSTANCE 0x00080000
51a74
>     ps_wait_only,
58a82,84
> #define PIPE_MIN_BUFFER 0x100
> #define PIPE_MAX_BUFFER 0x10000
> 
60a87,104
> /* each message is a size followed by data */
> struct pipe_msg
> {
>     unsigned int     length;
>     unsigned int     offset;
>     struct pipe_msg *next;
>     struct pipe_msg *prev;
>     unsigned char    data[1];
> };
> 
> struct msg_queue
> {
>     int length;   /* total size of all messages in the queue */
>     int maxsize;    /* try not to add more data than this */
>     struct pipe_msg    *first;  /* first message the queue */
>     struct pipe_msg    *last;   /* last message in the queue */
> };
> 
66,70c110,117
<     struct pipe_user   *other;
<     struct named_pipe  *pipe;
<     struct pipe_user   *next;
<     struct pipe_user   *prev;
<     struct thread      *thread;
---
>     struct pipe_user   *other;      /* the other end */
>     struct named_pipe  *pipe;       /* description of this pipe end */
> 
>     struct async_queue  read_q;     /* queues of threads waiting for us */
>     struct async_queue  write_q;
>     /* struct async_queue  wait_q; */
>     
>     struct thread      *thread;     /* the thread that owns this pipe end */
72a120,124
>     struct msg_queue in;      /* messages that are being written */
>     struct msg_queue out;     /* messages waitiing to be read */
> 
>     struct pipe_user   *next;       /* doubly linked list of pipe users */
>     struct pipe_user   *prev;
77a130
>     unsigned int        openmode;
82a136
>     unsigned int        instances;
102d155
< static struct fd *pipe_user_get_fd( struct object *obj );
104,106c157
< 
< static int pipe_user_get_poll_events( struct fd *fd );
< static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
---
> static struct fd *pipe_user_get_fd( struct object *obj );
110,117c161,168
<     sizeof(struct pipe_user),     /* size */
<     pipe_user_dump,               /* dump */
<     default_fd_add_queue,         /* add_queue */
<     default_fd_remove_queue,      /* remove_queue */
<     default_fd_signaled,          /* signaled */
<     no_satisfied,                 /* satisfied */
<     pipe_user_get_fd,             /* get_fd */
<     pipe_user_destroy             /* destroy */
---
>     sizeof(struct pipe_user),   /* size */
>     pipe_user_dump,             /* dump */
>     add_queue,                  /* add_queue */
>     remove_queue,               /* remove_queue */
>     NULL,                       /* signaled */
>     NULL,                       /* satisfied */
>     pipe_user_get_fd,           /* get_fd */
>     pipe_user_destroy           /* destroy */
119a171,174
> static int pipe_user_flush( struct fd *fd );
> static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
> static void pipe_user_queue_async( struct fd *fd, void *ptr, unsigned int status, int type, int count );
> 
122,126c177,181
<     pipe_user_get_poll_events,    /* get_poll_events */
<     default_poll_event,           /* poll_event */
<     no_flush,                     /* flush */
<     pipe_user_get_info,           /* get_file_info */
<     no_queue_async                /* queue_async */
---
>     NULL,                  /* get_poll_events */
>     NULL,                  /* poll_event */
>     pipe_user_flush,       /* flush */
>     pipe_user_get_info,    /* get_file_info */
>     pipe_user_queue_async  /* queue_async */
128a184,196
> static struct fd *pipe_user_get_fd( struct object *obj )
> {
>     struct pipe_user *pipe_user = (struct pipe_user *)obj;
>     assert( obj->ops == &pipe_user_ops );
>     return (struct fd *)grab_object( pipe_user->fd );
> }
> 
> static int pipe_user_flush( struct fd *fd )
> {
>     /* need to sleep until write all the pending data is written */
>     return 0;
> }
> 
163c231,233
< static struct fd *pipe_user_get_fd( struct object *obj )
---
> /* the following functions implement a message FIFO */
> 
> static void add_message( struct msg_queue *q, struct pipe_msg *msg)
165,166c235,444
<     struct pipe_user *user = (struct pipe_user *)obj;
<     return (struct fd *)grab_object( user->fd );
---
>     /* add a message at the end of the queue */
>     q->length += msg->length;
>     msg->prev = q->last;
>     if( q->last )
>         q->last->next = msg;
>     else
>         q->first = msg;
>     q->last = msg;
> }
> 
> /* remove a message from a queue and optionally free it */
> static void remove_message( struct msg_queue *q, struct pipe_msg *msg, int del)
> {
>     q->length -= msg->length;
>     if( msg->next )
>         msg->next->prev = msg->prev;
>     else
>         q->last = msg->prev;
>     if( msg->prev )
>         msg->prev->next = msg->next;
>     else
>         q->first = msg->next;
>     msg->next = msg->prev = NULL;
> 
>     /* if there's no messages, the length should be zero */
>     assert( q->first || !q->length );
> 
>     if( del )
>         free( msg );
> }
> 
> static void init_message_queue( struct msg_queue *q, int max )
> {
>     q->first = q->last = NULL;
>     q->length = 0;
>     q->maxsize = max;
> }
> 
> static void delete_message_queue( struct msg_queue *q )
> {
>     while( q->first )
>     {
>         struct pipe_msg * t = q->first;
>         remove_message( q, t, 1 );
>     }
>     assert( q->length == 0);
>     q->maxsize = 0;
> }
> 
> static struct pipe_msg *create_new_message( struct msg_queue *q, int size )
> {
>     struct pipe_msg *msg;
>  
>     msg = malloc( sizeof *msg + size );
>     msg->length = size;
>     msg->offset = 0;
>     add_message( q, msg );
>     return msg;
> }
> 
> static struct pipe_msg *find_message( struct msg_queue *q, int id)
> {
>     struct pipe_msg *msg;
> 
>     for( msg = q->first; msg; msg = msg->next )
>          if( id == (int) msg )
>               return msg;
>     set_error( STATUS_INVALID_PARAMETER );
>     return NULL;
> }
> 
> static struct pipe_msg *find_new_message( struct msg_queue *q )
> {
>     struct pipe_msg *msg;
> 
>     for( msg = q->first; msg; msg = msg->next )
>          if( msg->offset == 0 )
>               return msg;
>     return NULL;
> }
> 
> /* This function collects data sent from one thread into a message. 
>    When the message is complete, the function returns non-zero. */
> static int pipe_build_msg( struct pipe_msg *msg )
> {
>     const void *data = get_req_data();
>     int count = get_req_data_size();
> 
>     /* trying to write more data than we were told about -
>        this shouldn't happen... if it does somebody stuffed up, so
>        to make sure we don't get in trouble, throw away the extra data */
>     if( count > (msg->length - msg->offset) )
>     {
>         fprintf(stderr,"Throwing away extra in pipe write!\n");
>         count = (msg->length - msg->offset);
>     }
>  
>     /* copy the data to the message */
>     memcpy( &msg->data[msg->offset], data, count );
>     msg->offset += count;
> 
>     /* check if this message is complete */
>     if( msg->offset != msg->length )
>         return 0;
> 
>     msg->offset = 0;
>     return 1;
> }
> 
> static int pipe_get_message( struct pipe_msg *msg, int count )
> {
>     void *out;
> 
>     if( count > (msg->length - msg->offset) )
>         count = (msg->length - msg->offset) ;
> 
>     if( count > get_reply_max_size() )
>         count = get_reply_max_size();
> 
>     out = set_reply_data_size( count );
>     memcpy( out, &msg->data[msg->offset], count);
>     msg->offset += count;
> 
>     return (msg->offset == msg->length);
> }
> 
> static int pipe_user_check_actions( struct pipe_user *user )
> {
>     int done = 0;
> 
>     if(IS_READY(user->read_q))
>     {
>         if( find_new_message( &user->out ) )
>         {
>             async_notify(user->read_q.head,STATUS_ALERTED);
>             done = 1;
>         }
>         if( ( user->state != ps_connected_server ) && 
>             ( user->state != ps_connected_client) )
>         {
>             async_notify(user->read_q.head,STATUS_PIPE_CLOSING);
>             done = 1;
>         }
>     }
> 
>     if(IS_READY(user->write_q) && ( user->in.length < user->in.maxsize ) )
>     {
>         async_notify(user->write_q.head,STATUS_ALERTED);
>         done = 1;
>     }
> 
>     /*if(IS_READY(user->wait_q) && (POLLIN & event) )
>     {
>         async_notify(user->wait_q.head,STATUS_ALERTED);
>         done = 1;
>     } */
> 
>     return done;
> }
> 
> static void pipe_user_queue_async(struct fd *fd, void *ptr, unsigned int status, int type, int count)
> {
>     struct pipe_user *user = get_fd_user( fd );
>     struct async_queue *q;
>     struct async *async;
>     int timeout;
> 
>     assert(user->obj.ops == &pipe_user_ops);
> 
>     switch(type)
>     {
>     case ASYNC_TYPE_READ:
>         q = &user->read_q;
>         timeout = user->pipe->timeout;
>         break;
>     /* case ASYNC_TYPE_WAIT:
>         q = &user->wait_q;
>         timeout = 0;
>         break; */
>     case ASYNC_TYPE_WRITE:
>         q = &user->write_q;
>         timeout = user->pipe->timeout;
>         break;
>     default:
>         set_error(STATUS_INVALID_PARAMETER);
>         return;
>     }
> 
>     async = find_async ( q, current, ptr );
> 
>     if ( status == STATUS_PENDING )
>     {
>         if ( !async )
>             async = create_async ( &user->obj, current, ptr );
>         if ( !async )
>             return;
> 
>         async->status = STATUS_PENDING;
>         if(!async->q)
>         {
>             async_add_timeout(async,timeout);
>             async_insert(q, async);
>         }
> 
>         /* Check if the new pending request can be served immediately */
>         if( pipe_user_check_actions( user ) )
>              return;
>     }
>     else if ( async ) destroy_async ( async );
>     else set_error ( STATUS_INVALID_PARAMETER );
180,181d457
<         release_object( user->other->fd );
<         user->other->fd = NULL;
197a474,481
>     delete_message_queue( &user->in );
>     delete_message_queue( &user->out );
> 
>     destroy_async_queue(&user->read_q);
>     destroy_async_queue(&user->write_q);
>     /* destroy_async_queue(&user->wait_q); */
>     if (user->fd) release_object( user->fd );
> 
204,209d487
<     if (user->fd) release_object( user->fd );
< }
< 
< static int pipe_user_get_poll_events( struct fd *fd )
< {
<     return POLLIN | POLLOUT;  /* FIXME */
213a492
>     /* struct pipe_user * user = get_fd_user( fd );*/
227,228c506,507
<     *flags = 0;
<     return FD_TYPE_DEFAULT;
---
>     *flags = FD_FLAG_TIMEOUT;
>     return FD_TYPE_NAMED_PIPE;
246,260d524
< static struct named_pipe *open_named_pipe( const WCHAR *name, size_t len )
< {
<     struct object *obj;
< 
<     if ((obj = find_object( sync_namespace, name, len )))
<     {
<         if (obj->ops == &named_pipe_ops) return (struct named_pipe *)obj;
<         release_object( obj );
<         set_error( STATUS_OBJECT_TYPE_MISMATCH );
<     }
<     else set_error( STATUS_OBJECT_NAME_NOT_FOUND );
< 
<     return NULL;
< }
< 
267c531
< static struct pipe_user *create_pipe_user( struct named_pipe *pipe )
---
> static struct pipe_user *create_pipe_user( struct named_pipe *pipe, int fd )
275c539,545
<     user->fd = NULL;
---
>     user->fd = alloc_fd( &pipe_user_fd_ops, -1, &user->obj );
>     if (!user->fd)
>     {
>         release_object( user );
>         return NULL;
>     }
> 
287a558,564
>     init_message_queue( &user->in, pipe->insize );
>     init_message_queue( &user->out, pipe->outsize );
> 
>     init_async_queue( &user->read_q );
>     init_async_queue( &user->write_q );
>     /* init_async_queue( &user->wait_q ); */
> 
308a586,614
> static void do_connect_pipeends( struct pipe_user *client, struct pipe_user *server )
> {
>     struct pipe_msg **pmsg;
> 
>     server->state = ps_connected_server;
>     server->other = client;
>     client->state = ps_connected_client;
>     client->other = server;
>     notify_waiter( server,STATUS_SUCCESS );
>     
>     /* send any waiting messages to the server */
>     pmsg = &client->in.first;
>     while( *pmsg )
>     {
>         struct pipe_msg *msg = *pmsg;
>         if( msg->offset == 0 )
>         {
>             remove_message( &client->in, msg, 0 );
>             add_message( &server->out, msg );
>         }
>         else
>             pmsg = &msg->next;
>     }
> 
>     /* the server may want to read, the client may want to write */
>     pipe_user_check_actions( server );
>     pipe_user_check_actions( client );
> }
>  
314a621,627
> 
>     if( req->maxinstances > PIPE_UNLIMITED_INSTANCES )
>     {
>         set_error( STATUS_INVALID_PARAMETER );
>         return;
>     }
> 
324a638
>         pipe->openmode = req->openmode;
325a640,652
>         pipe->instances = 0;
>     }
>     else if( ( req->openmode & FILE_FLAG_FIRST_PIPE_INSTANCE ) ||
>              ( req->openmode != pipe->openmode ) ||
>              ( req->timeout != pipe->timeout ) ||
>              ( ( req->pipemode & PIPE_READMODE_MESSAGE ) & 
>                !( req->openmode & PIPE_TYPE_MESSAGE ) ) ||
>              ( ( pipe->maxinstances != PIPE_UNLIMITED_INSTANCES) && 
>                ( pipe->instances >= pipe->maxinstances ) )
>            )
>     {
>         set_error( STATUS_ACCESS_DENIED );
>         goto end;
328c655
<     user = create_pipe_user( pipe );
---
>     pipe->instances ++;
329a657
>     user = create_pipe_user (pipe, -1);
336a665
> end:
342d670
<     struct pipe_user *user, *partner;
346,355c674,675
< 
<     if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
<     {
<         set_error( STATUS_NO_SUCH_FILE );
<         return;
<     }
<     if (!(partner = find_partner(pipe, ps_wait_open)))
<     {
<         release_object(pipe);
<         set_error( STATUS_PIPE_NOT_AVAILABLE );
---
>     pipe = create_named_pipe( get_req_data(), get_req_data_size() );
>     if(!pipe)
357,360d676
<     }
<     if ((user = create_pipe_user( pipe )))
<     {
<         int fds[2];
362c678,681
<         if(!socketpair(PF_UNIX, SOCK_STREAM, 0, fds))
---
>     if (get_error() == STATUS_OBJECT_NAME_COLLISION)
>     {
>         struct pipe_user *user = create_pipe_user (pipe, -1);
>         if( user )
364,366c683,686
<             user->fd = alloc_fd( &pipe_user_fd_ops, fds[1], &user->obj );
<             partner->fd = alloc_fd( &pipe_user_fd_ops, fds[0], &partner->obj );
<             if (user->fd && partner->fd)
---
>             struct pipe_user *partner;
> 
>             reply->handle = alloc_handle( current->process, user, req->access, 0 );
>             if ((partner = find_partner(pipe, ps_wait_open)))
368,373c688,689
<                 notify_waiter(partner,STATUS_SUCCESS);
<                 partner->state = ps_connected_server;
<                 partner->other = user;
<                 user->state = ps_connected_client;
<                 user->other = partner;
<                 reply->handle = alloc_handle( current->process, user, req->access, 0 );
---
>                 do_connect_pipeends( user, partner );
>                 release_object( partner );
374a691,697
>             else
>                 user->state = ps_wait_connect;
>             release_object(user);
>         }
>         else
>         {
>             set_error(STATUS_NO_MEMORY);
376,378d698
<         else file_set_error();
< 
<         release_object( user );
380,381c700,705
<     release_object( partner );
<     release_object( pipe );
---
>     else
>     {
>         set_error(STATUS_NO_SUCH_FILE);
>     }
> 
>     release_object(pipe);
404c728
<         while( (partner = find_partner(user->pipe,ps_wait_connect)) )
---
>         while( (partner = find_partner(user->pipe,ps_wait_only)) )
407a732,739
>             release_object(partner);
>         }
> 
>         /* connect any CreateFile calls waiting for a pipe */
>         while( (partner = find_partner(user->pipe,ps_wait_connect)) )
>         {
>             do_connect_pipeends( partner, user );
>             release_object(partner);
417d748
<     struct pipe_user *partner;
419,432c750,751
<     if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
<     {
<         set_error( STATUS_PIPE_NOT_AVAILABLE );
<         return;
<     }
<     if( (partner = find_partner(pipe,ps_wait_open)) )
<     {
<         /* this should use notify_waiter,
<            but no pipe_user object exists now... */
<         thread_queue_apc(current,NULL,req->func,
<                          APC_ASYNC,1,2,req->overlapped,STATUS_SUCCESS);
<         release_object(partner);
<     }
<     else
---
>     pipe = create_named_pipe( get_req_data(), get_req_data_size() );
>     if( pipe )
434c753,756
<         struct pipe_user *user;
---
>         /* only wait if the pipe already exists */
>         if(get_error() == STATUS_OBJECT_NAME_COLLISION)
>         {
>             struct pipe_user *partner;
436c758,781
<         if( (user = create_pipe_user( pipe )) )
---
>             set_error(STATUS_SUCCESS);
>             if( (partner = find_partner(pipe,ps_wait_open)) )
>             {
>                 /* this should use notify_waiter,
>                    but no pipe_user object exists now... */
>                 thread_queue_apc(current,NULL,req->func,
>                     APC_ASYNC,1,2,req->overlapped,STATUS_SUCCESS);
>                 release_object(partner);
>             }
>             else
>             {
>                 struct pipe_user *user;
> 
>                 if( (user = create_pipe_user (pipe, -1)) )
>                 {
>                     user->state = ps_wait_only;
>                     user->thread = (struct thread *)grab_object(current);
>                     user->func = req->func;
>                     user->overlapped = req->overlapped;
>                     /* don't release it */
>                 }
>             }
>         }
>         else
438,442c783
<             user->state = ps_wait_connect;
<             user->thread = (struct thread *)grab_object(current);
<             user->func = req->func;
<             user->overlapped = req->overlapped;
<             /* don't release it */
---
>             set_error(STATUS_PIPE_NOT_AVAILABLE);
443a785
>         release_object(pipe);
445d786
<     release_object(pipe);
458,459d798
<         release_object( user->other->fd );
<         user->other->fd = NULL;
461,464d799
<         user->other->other = NULL;
< 
<         release_object( user->fd );
<         user->fd = NULL;
465a801,805
> 
>         pipe_user_check_actions( user );
>         pipe_user_check_actions( user->other );
> 
>         user->other->other = NULL;
466a807,809
> 
>         user->pipe->instances --;
>         assert( user->pipe->instances >= 0 );
485a829,934
> 
> DECL_HANDLER(read_named_pipe)
> {
>     struct pipe_user *user;
>     struct pipe_msg *msg;
> 
>     reply->id = 0;
> 
>     user = get_pipe_user_obj(current->process, req->handle, 0);
>     if(!user)
>         return;
> 
>     /* make sure we're connected */
>     switch( user->state )
>     {
>     case ps_connected_server:
>     case ps_connected_client:
>     case ps_wait_connect:
>         break;
>     case ps_disconnected:
>         set_error( STATUS_PIPE_DISCONNECTED );
>         goto end;
>     case ps_wait_open:
>         set_error( STATUS_PIPE_LISTENING );
>         goto end;
>     default:
>         set_error( STATUS_PIPE_NOT_AVAILABLE );
>         goto end;
>     }
> 
>     /* find the message identified by id or a new one */
>     if( req->id )
>         msg = find_message( &user->out, req->id );
>     else
>         msg = find_new_message( &user->out );
> 
>     /* send a message, when finished, delete it */
>     if( msg )
>     {
>         if( pipe_get_message( msg, req->count ) )
>         {
>             remove_message( &user->out, msg, 1 );
>             pipe_user_check_actions( user );
>         }
>         else
>             reply->id = (int) msg;
>     }
> 
> end:
>     release_object(user);
> }
> 
> DECL_HANDLER(write_named_pipe)
> {
>     struct pipe_user *user;
>     struct pipe_msg *msg;
> 
>     reply->id = 0;
> 
>     user = get_pipe_user_obj(current->process, req->handle, 0);
>     if(!user)
>         return;
> 
>     /* make sure we're connected first */
>     switch( user->state )
>     {
>     case ps_connected_server:
>     case ps_connected_client:
>     case ps_wait_connect:
>         break;
>     case ps_disconnected:
>         set_error( STATUS_PIPE_DISCONNECTED );
>         goto end;
>     case ps_wait_open:
>         set_error( STATUS_PIPE_LISTENING );
>         goto end;
>     default:
>         set_error( STATUS_PIPE_NOT_AVAILABLE );
>         goto end;
>     }
> 
>     /* find the message identified by id or a new one */
>     if( req->id )
>         msg = find_message( &user->in, req->id );
>     else
>         msg = create_new_message( &user->in, req->count );
> 
>     /* Cook a message. If it is ready, send it */
>     if( pipe_build_msg( msg ) )
>     {
>         if( user->other )
>         {
>             remove_message( &user->in, msg, 0 );
>             add_message( &user->other->out, msg );
>             /* server should read, another thread may want to write */
>             pipe_user_check_actions( user->other );
>             pipe_user_check_actions( user );
>         }
>     }
>     else
>         reply->id = (int) msg;
> 
> end:
>     release_object(user);
> }
> 
Index: server/protocol.def
===================================================================
RCS file: /home/wine/wine/server/protocol.def,v
retrieving revision 1.58
diff -r1.58 protocol.def
621c621,622
<     FD_TYPE_SMB
---
>     FD_TYPE_SMB,
>     FD_TYPE_NAMED_PIPE
626a628
> #define FD_FLAG_NO_FD              0x10
673a676,677
>     int          fd_type;       /* the type of the fd */
>     int          fd_flags;      /* flags for this fd */
1719a1724,1741
> @END
> 
> 
> @REQ(read_named_pipe)
>     obj_handle_t  handle;
>     unsigned int  id;
>     int           count;
> @REPLY
>     unsigned int  id;
> @END
> 
> 
> @REQ(write_named_pipe)
>     obj_handle_t  handle;
>     unsigned int  id;
>     int           count;
> @REPLY
>     unsigned int  id;
Index: server/request.h
===================================================================
RCS file: /home/wine/wine/server/request.h,v
retrieving revision 1.82
diff -r1.82 request.h
242a243,244
> DECL_HANDLER(read_named_pipe);
> DECL_HANDLER(write_named_pipe);
422a425,426
>     (req_handler)req_read_named_pipe,
>     (req_handler)req_write_named_pipe,
Index: server/trace.c
===================================================================
RCS file: /home/wine/wine/server/trace.c,v
retrieving revision 1.154
diff -r1.154 trace.c
872c872,874
<     fprintf( stderr, " serial=%08x", req->serial );
---
>     fprintf( stderr, " serial=%08x,", req->serial );
>     fprintf( stderr, " fd_type=%d,", req->fd_type );
>     fprintf( stderr, " fd_flags=%d", req->fd_flags );
1955a1958,1981
> static void dump_read_named_pipe_request( const struct read_named_pipe_request *req )
> {
>     fprintf( stderr, " handle=%p,", req->handle );
>     fprintf( stderr, " id=%08x,", req->id );
>     fprintf( stderr, " count=%d", req->count );
> }
> 
> static void dump_read_named_pipe_reply( const struct read_named_pipe_reply *req )
> {
>     fprintf( stderr, " id=%08x", req->id );
> }
> 
> static void dump_write_named_pipe_request( const struct write_named_pipe_request *req )
> {
>     fprintf( stderr, " handle=%p,", req->handle );
>     fprintf( stderr, " id=%08x,", req->id );
>     fprintf( stderr, " count=%d", req->count );
> }
> 
> static void dump_write_named_pipe_reply( const struct write_named_pipe_reply *req )
> {
>     fprintf( stderr, " id=%08x", req->id );
> }
> 
2539a2566,2567
>     (dump_func)dump_read_named_pipe_request,
>     (dump_func)dump_write_named_pipe_request,
2716a2745,2746
>     (dump_func)dump_read_named_pipe_reply,
>     (dump_func)dump_write_named_pipe_reply,
2893a2924,2925
>     "read_named_pipe",
>     "write_named_pipe",


More information about the wine-patches mailing list