Named pipe patch 5 (supercedes 4)

Mike McCormack mike at codeweavers.com
Wed May 14 04:02:18 CDT 2003


Hi All,

This patch gets even further along in Dan's named pipe test at:

http://www.kegel.com/pipe.c

Mike


ChangeLog:
* rewrite of the named pipe code
* allow NtFileFlushBuffers to wait
* allow DisconnectNamedPipe to invalidate client cached fd
* fix the winehq pipe test now that one extra test passes

-------------- next part --------------
Index: server/protocol.def
===================================================================
RCS file: /home/wine/wine/server/protocol.def,v
retrieving revision 1.65
diff -u -r1.65 protocol.def
--- server/protocol.def	4 Apr 2003 22:26:34 -0000	1.65
+++ server/protocol.def	14 May 2003 08:56:24 -0000
@@ -665,6 +665,8 @@
 /* Flush a file buffers */
 @REQ(flush_file)
     obj_handle_t handle;        /* handle to the file */
+ at REPLY
+    obj_handle_t event;         /* event set when finished */
 @END
 
 
@@ -1739,6 +1741,8 @@
 /* Disconnect a named pipe */
 @REQ(disconnect_named_pipe)
     obj_handle_t   handle;
+ at REPLY
+    int            fd;           /* associated fd to close */
 @END
 
 
Index: server/fd.c
===================================================================
RCS file: /home/wine/wine/server/fd.c,v
retrieving revision 1.10
diff -u -r1.10 fd.c
--- server/fd.c	11 May 2003 02:45:33 -0000	1.10
+++ server/fd.c	14 May 2003 08:56:25 -0000
@@ -963,7 +963,7 @@
 }
 
 /* default flush() routine */
-int no_flush( struct fd *fd )
+int no_flush( struct fd *fd, struct event **event )
 {
     set_error( STATUS_OBJECT_TYPE_MISMATCH );
     return 0;
@@ -1002,10 +1002,15 @@
 DECL_HANDLER(flush_file)
 {
     struct fd *fd = get_handle_fd_obj( current->process, req->handle, 0 );
+    struct event * event = NULL;
 
     if (fd)
     {
-        fd->fd_ops->flush( fd );
+        fd->fd_ops->flush( fd, &event );
+        if( event )
+        {
+            reply->event = alloc_handle( current->process, event, SYNCHRONIZE, 0 );
+        }
         release_object( fd );
     }
 }
Index: server/file.c
===================================================================
RCS file: /home/wine/wine/server/file.c,v
retrieving revision 1.67
diff -u -r1.67 file.c
--- server/file.c	26 Mar 2003 23:41:43 -0000	1.67
+++ server/file.c	14 May 2003 08:56:25 -0000
@@ -72,7 +72,7 @@
 
 static int file_get_poll_events( struct fd *fd );
 static void file_poll_event( struct fd *fd, int event );
-static int file_flush( struct fd *fd );
+static int file_flush( struct fd *fd, struct event **event );
 static int file_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
 static void file_queue_async( struct fd *fd, void *ptr, unsigned int status, int type, int count );
 
@@ -301,7 +301,7 @@
 }
 
 
-static int file_flush( struct fd *fd )
+static int file_flush( struct fd *fd, struct event **event )
 {
     int ret = (fsync( get_unix_fd(fd) ) != -1);
     if (!ret) file_set_error();
Index: server/file.h
===================================================================
RCS file: /home/wine/wine/server/file.h,v
retrieving revision 1.7
diff -u -r1.7 file.h
--- server/file.h	26 Mar 2003 23:41:43 -0000	1.7
+++ server/file.h	14 May 2003 08:56:25 -0000
@@ -35,7 +35,7 @@
     /* a poll() event occured */
     void (*poll_event)(struct fd *,int event);
     /* flush the object buffers */
-    int  (*flush)(struct fd *);
+    int  (*flush)(struct fd *, struct event **);
     /* get file information */
     int  (*get_file_info)(struct fd *,struct get_file_info_reply *, int *flags);
     /* queue an async operation - see register_async handler in async.c*/
@@ -55,12 +55,13 @@
 extern void set_fd_events( struct fd *fd, int events );
 extern obj_handle_t lock_fd( struct fd *fd, file_pos_t offset, file_pos_t count, int shared, int wait );
 extern void unlock_fd( struct fd *fd, file_pos_t offset, file_pos_t count );
+extern int flush_cached_fd( struct process *process, obj_handle_t handle, int *fd );
 
 extern int default_fd_add_queue( struct object *obj, struct wait_queue_entry *entry );
 extern void default_fd_remove_queue( struct object *obj, struct wait_queue_entry *entry );
 extern int default_fd_signaled( struct object *obj, struct thread *thread );
 extern void default_poll_event( struct fd *fd, int event );
-extern int no_flush( struct fd *fd );
+extern int no_flush( struct fd *fd, struct event **event );
 extern int no_get_file_info( struct fd *fd, struct get_file_info_reply *info, int *flags );
 extern void no_queue_async( struct fd *fd, void* ptr, unsigned int status, int type, int count );
 extern void main_loop(void);
Index: server/handle.c
===================================================================
RCS file: /home/wine/wine/server/handle.c,v
retrieving revision 1.26
diff -u -r1.26 handle.c
--- server/handle.c	19 Feb 2003 00:33:33 -0000	1.26
+++ server/handle.c	14 May 2003 08:56:25 -0000
@@ -397,6 +397,17 @@
     return entry->fd;
 }
 
+int flush_cached_fd( struct process *process, obj_handle_t handle, int *fd )
+{
+    struct handle_entry *entry = get_handle( process, handle );
+
+    if( !entry )
+        return -1;
+    *fd = entry->fd;
+    entry->fd = -1;
+    return 0;
+}
+
 /* find the first inherited handle of the given type */
 /* this is needed for window stations and desktops (don't ask...) */
 obj_handle_t find_inherited_handle( struct process *process, const struct object_ops *ops )
Index: server/named_pipe.c
===================================================================
RCS file: /home/wine/wine/server/named_pipe.c,v
retrieving revision 1.23
diff -u -r1.23 named_pipe.c
--- server/named_pipe.c	17 Apr 2003 02:14:04 -0000	1.23
+++ server/named_pipe.c	14 May 2003 08:56:26 -0000
@@ -19,7 +19,7 @@
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
  * TODO:
- *   improve error handling
+ *   message mode
  */
 
 #include "config.h"
@@ -50,26 +50,47 @@
     ps_none,
     ps_idle_server,
     ps_wait_open,
-    ps_wait_connect,
     ps_connected_server,
-    ps_connected_client,
-    ps_disconnected
+    ps_wait_disconnect,
+    ps_disconnected_server,
+    ps_wait_connect
+};
+
+struct wait_info 
+{
+    struct thread       *thread;
+    void                *func;
+    void                *overlapped;
 };
 
 struct named_pipe;
 
-struct pipe_user
+struct pipe_server
+{
+    struct object        obj;
+    struct fd           *fd;
+    enum pipe_state      state;
+    struct pipe_client  *client;
+    struct named_pipe   *pipe;
+    struct pipe_server  *next;
+    struct pipe_server  *prev;
+    struct timeout_user *flush_poll;
+    struct event        *event;
+    struct wait_info     wait;
+};
+
+struct pipe_client
 {
-    struct object       obj;
-    struct fd          *fd;
-    enum pipe_state     state;
-    struct pipe_user   *other;
-    struct named_pipe  *pipe;
-    struct pipe_user   *next;
-    struct pipe_user   *prev;
-    struct thread      *thread;
-    void               *func;
-    void               *overlapped;
+    struct object        obj;
+    struct fd           *fd;
+    struct pipe_server  *server;
+    struct wait_info     wait;
+};
+
+struct connect_wait
+{
+    struct wait_info     wait;
+    struct connect_wait *next;
 };
 
 struct named_pipe
@@ -80,11 +101,13 @@
     unsigned int        outsize;
     unsigned int        insize;
     unsigned int        timeout;
-    struct pipe_user   *users;
+    unsigned int        instances;
+    struct pipe_server *servers;
+    struct connect_wait *connect_waiters;
 };
 
 static void named_pipe_dump( struct object *obj, int verbose );
-static void named_pipe_destroy( struct object *obj);
+static void named_pipe_destroy( struct object *obj );
 
 static const struct object_ops named_pipe_ops =
 {
@@ -98,120 +121,359 @@
     named_pipe_destroy            /* destroy */
 };
 
-static void pipe_user_dump( struct object *obj, int verbose );
-static struct fd *pipe_user_get_fd( struct object *obj );
-static void pipe_user_destroy( struct object *obj);
+/* common to clients and servers */
+static int pipe_end_get_poll_events( struct fd *fd );
+static int pipe_end_get_info( struct fd *fd, 
+                  struct get_file_info_reply *reply, int *flags );
+
+/* server end functions */
+static void pipe_server_dump( struct object *obj, int verbose );
+static struct fd *pipe_server_get_fd( struct object *obj );
+static void pipe_server_destroy( struct object *obj);
+static int pipe_server_flush( struct fd *fd, struct event **event );
+
+static const struct object_ops pipe_server_ops =
+{
+    sizeof(struct pipe_server),   /* size */
+    pipe_server_dump,             /* dump */
+    default_fd_add_queue,         /* add_queue */
+    default_fd_remove_queue,      /* remove_queue */
+    default_fd_signaled,          /* signaled */
+    no_satisfied,                 /* satisfied */
+    pipe_server_get_fd,           /* get_fd */
+    pipe_server_destroy           /* destroy */
+};
+
+static const struct fd_ops pipe_server_fd_ops =
+{
+    pipe_end_get_poll_events,     /* get_poll_events */
+    default_poll_event,           /* poll_event */
+    pipe_server_flush,            /* flush */
+    pipe_end_get_info,            /* get_file_info */
+    no_queue_async                /* queue_async */
+};
 
-static int pipe_user_get_poll_events( struct fd *fd );
-static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
+/* client end functions */
+static void pipe_client_dump( struct object *obj, int verbose );
+static struct fd *pipe_client_get_fd( struct object *obj );
+static void pipe_client_destroy( struct object *obj );
+static int pipe_client_flush( struct fd *fd, struct event **event );
 
-static const struct object_ops pipe_user_ops =
+static const struct object_ops pipe_client_ops =
 {
-    sizeof(struct pipe_user),     /* size */
-    pipe_user_dump,               /* dump */
+    sizeof(struct pipe_client),   /* size */
+    pipe_client_dump,             /* dump */
     default_fd_add_queue,         /* add_queue */
     default_fd_remove_queue,      /* remove_queue */
     default_fd_signaled,          /* signaled */
     no_satisfied,                 /* satisfied */
-    pipe_user_get_fd,             /* get_fd */
-    pipe_user_destroy             /* destroy */
+    pipe_client_get_fd,           /* get_fd */
+    pipe_client_destroy           /* destroy */
 };
 
-static const struct fd_ops pipe_user_fd_ops =
+static const struct fd_ops pipe_client_fd_ops =
 {
-    pipe_user_get_poll_events,    /* get_poll_events */
+    pipe_end_get_poll_events,     /* get_poll_events */
     default_poll_event,           /* poll_event */
-    no_flush,                     /* flush */
-    pipe_user_get_info,           /* get_file_info */
+    pipe_client_flush,            /* flush */
+    pipe_end_get_info,            /* get_file_info */
     no_queue_async                /* queue_async */
 };
 
 static void named_pipe_dump( struct object *obj, int verbose )
 {
-    struct named_pipe *pipe = (struct named_pipe *)obj;
+    struct named_pipe *pipe = (struct named_pipe *) obj;
     assert( obj->ops == &named_pipe_ops );
     fprintf( stderr, "named pipe %p\n" ,pipe);
 }
 
-static void pipe_user_dump( struct object *obj, int verbose )
+static void pipe_server_dump( struct object *obj, int verbose )
+{
+    struct pipe_server *server = (struct pipe_server *) obj;
+    assert( obj->ops == &pipe_server_ops );
+    fprintf( stderr, "named pipe server %p (state %d)\n",
+             server, server->state );
+}
+
+static void pipe_client_dump( struct object *obj, int verbose )
 {
-    struct pipe_user *user = (struct pipe_user *)obj;
-    assert( obj->ops == &pipe_user_ops );
-    fprintf( stderr, "named pipe user %p (state %d)\n", user, user->state );
+    struct pipe_client *client = (struct pipe_client *) obj;
+    assert( obj->ops == &pipe_server_ops );
+    fprintf( stderr, "named pipe client %p (server state %d)\n",
+             client, client->server->state );
 }
 
 static void named_pipe_destroy( struct object *obj)
 {
-    struct named_pipe *pipe = (struct named_pipe *)obj;
-    assert( !pipe->users );
+    struct named_pipe *pipe = (struct named_pipe *) obj;
+    assert( !pipe->servers );
+    assert( !pipe->instances );
 }
 
-static void notify_waiter( struct pipe_user *user, unsigned int status)
+static void notify_waiter( struct wait_info *wait,
+                                  unsigned int status )
 {
-    if(user->thread && user->func && user->overlapped)
+    if( wait->thread && wait->func && wait->overlapped )
     {
         /* queue a system APC, to notify a waiting thread */
-        thread_queue_apc(user->thread, NULL, user->func, APC_ASYNC, 1,
-                         user->overlapped, (void *)status, NULL);
+        thread_queue_apc( wait->thread, NULL, wait->func, APC_ASYNC,
+                          1, wait->overlapped, (void *)status, NULL );
+    }
+    if( wait->thread ) release_object( wait->thread );
+    memset( wait, 0, sizeof (struct wait_info) );
+}
+
+static void set_waiter( struct wait_info *wait, void *func, void *ov )
+{
+    wait->thread = (struct thread *) grab_object( current );
+    wait->func = func;
+    wait->overlapped = ov;
+}
+
+static void notify_connect_waiters( struct named_pipe *pipe )
+{
+    struct connect_wait *cw, **x = &pipe->connect_waiters;
+
+    while( *x )
+    {
+        cw = *x;
+        notify_waiter( &cw->wait, STATUS_SUCCESS );
+        release_object( pipe );
+        *x = cw->next;
+        free( cw );
+    }
+}
+
+static void queue_connect_waiter( struct named_pipe *pipe, 
+                                  void *func, void *overlapped )
+{
+    struct connect_wait *waiter;
+
+    waiter = malloc( sizeof *waiter );
+    if( waiter )
+    {
+        set_waiter( &waiter->wait, func, overlapped );
+        waiter->next = pipe->connect_waiters;
+        pipe->connect_waiters = waiter;
+        grab_object( pipe );
     }
-    if (user->thread) release_object(user->thread);
-    user->thread = NULL;
-    user->func = NULL;
-    user->overlapped=NULL;
+    else
+        set_error( STATUS_NO_MEMORY );
 }
 
-static struct fd *pipe_user_get_fd( struct object *obj )
+static struct fd *pipe_client_get_fd( struct object *obj )
 {
-    struct pipe_user *user = (struct pipe_user *)obj;
-    if (user->fd) return (struct fd *)grab_object( user->fd );
+    struct pipe_client *client = (struct pipe_client *) obj;
+    if( client->fd )
+        return (struct fd *) grab_object( client->fd );
     set_error( STATUS_PIPE_DISCONNECTED );
     return NULL;
 }
 
-static void pipe_user_destroy( struct object *obj)
+static struct fd *pipe_server_get_fd( struct object *obj )
+{
+    struct pipe_server *server = (struct pipe_server *) obj;
+
+    switch(server->state)
+    {
+    case ps_connected_server:
+    case ps_wait_disconnect:
+        assert( server->fd );
+        return (struct fd *) grab_object( server->fd );
+
+    case ps_wait_open:
+    case ps_idle_server:
+        set_error( STATUS_PIPE_LISTENING );
+        break;
+
+    case ps_disconnected_server:
+    case ps_wait_connect:
+        set_error( STATUS_PIPE_DISCONNECTED );
+        break;
+
+    default:
+        assert( 0 );
+    }
+    return NULL;
+}
+
+
+static void notify_empty( struct pipe_server *server )
+{
+    if( !server->flush_poll )
+        return;
+    assert( server->state == ps_connected_server );
+    assert( server->event );
+    remove_timeout_user( server->flush_poll );
+    server->flush_poll = NULL;
+    set_event( server->event );
+    release_object( server->event );
+    server->event = NULL;
+}
+
+static void do_disconnect( struct pipe_server *server )
 {
-    struct pipe_user *user = (struct pipe_user *)obj;
+    /* we may only have a server fd, if the client disconnected */
+    if( server->client )
+    {
+        assert( server->client->server == server );
+        assert( server->client->fd );
+        release_object( server->client->fd );
+        server->client->fd = NULL;
+    }
+    assert( server->fd );
+    release_object( server->fd );
+    server->fd = NULL;
+}
+
+static void pipe_server_destroy( struct object *obj)
+{
+    struct pipe_server *server = (struct pipe_server *)obj;
 
-    assert( obj->ops == &pipe_user_ops );
+    assert( obj->ops == &pipe_server_ops );
 
-    if(user->overlapped)
-        notify_waiter(user,STATUS_HANDLES_CLOSED);
+    if( server->fd )
+    {
+        notify_empty( server );
+        do_disconnect( server );
+    }
 
-    if(user->other)
+    if( server->client )
     {
-        release_object( user->other->fd );
-        user->other->fd = NULL;
-        switch(user->other->state)
+        server->client->server = NULL;
+        server->client = NULL;
+    }
+
+    notify_waiter( &server->wait, STATUS_HANDLES_CLOSED );
+
+    assert( server->pipe->instances );
+    server->pipe->instances--;
+
+    /* remove server from pipe's server list */
+    if( server->next ) server->next->prev = server->prev;
+    if( server->prev ) server->prev->next = server->next;
+    else server->pipe->servers = server->next;
+    release_object( server->pipe );
+}
+
+static void pipe_client_destroy( struct object *obj)
+{
+    struct pipe_client *client = (struct pipe_client *)obj;
+    struct pipe_server *server = client->server;
+
+    assert( obj->ops == &pipe_client_ops );
+
+    notify_waiter( &client->wait, STATUS_HANDLES_CLOSED );
+
+    if( server )
+    {
+        notify_empty( server );
+
+        switch( server->state )
         {
         case ps_connected_server:
-            user->other->state = ps_idle_server;
+            /* Don't destroy the server's fd here as we can't
+               do a successful flush without it. */
+            server->state = ps_wait_disconnect;
+            release_object( client->fd );
+            client->fd = NULL;
             break;
-        case ps_connected_client:
-            user->other->state = ps_disconnected;
+        case ps_disconnected_server:
+            server->state = ps_wait_connect;
             break;
         default:
-            fprintf(stderr,"connected pipe has strange state %d!\n",
-                            user->other->state);
+            assert( 0 );
         }
-        user->other->other=NULL;
-        user->other = NULL;
+        assert( server->client );
+        server->client = NULL;
+        client->server = NULL;
     }
-
-    /* remove user from pipe's user list */
-    if (user->next) user->next->prev = user->prev;
-    if (user->prev) user->prev->next = user->next;
-    else user->pipe->users = user->next;
-    if (user->thread) release_object(user->thread);
-    release_object(user->pipe);
-    if (user->fd) release_object( user->fd );
+    assert( !client->fd );
 }
 
-static int pipe_user_get_poll_events( struct fd *fd )
+static int pipe_end_get_poll_events( struct fd *fd )
 {
     return POLLIN | POLLOUT;  /* FIXME */
 }
 
-static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags )
+static int pipe_data_remaining( struct pipe_server *server )
+{
+    struct pollfd pfd;
+    int fd;
+
+    assert( server->client );
+
+    fd = get_unix_fd( server->client->fd );
+    if( fd < 0 )
+        return 0;
+    pfd.fd = fd;
+    pfd.events = POLLIN;
+    pfd.revents = 0;
+
+    if( 0 > poll( &pfd, 1, 0 ) )
+        return 0;
+ 
+    return pfd.revents&POLLIN;
+}
+
+static void check_flushed( void *arg )
+{
+    struct pipe_server *server = (struct pipe_server*) arg;
+
+    assert( server->event );
+    if( pipe_data_remaining( server ) )
+    {
+        struct timeval tv;
+
+        gettimeofday( &tv, 0 );
+        add_timeout( &tv, 100 );
+        server->flush_poll = add_timeout_user( &tv, check_flushed, server );
+    }
+    else
+        notify_empty( server );
+}
+
+static int pipe_server_flush( struct fd *fd, struct event **event )
+{
+    struct pipe_server *server = get_fd_user( fd );
+
+    if( !server )
+        return 0;
+
+    if( server->state != ps_connected_server )
+        return 0;
+
+    /* FIXME: if multiple threads flush the same pipe,
+              maybe should create a list of processes to notify */
+    if( server->flush_poll )
+        return 0;
+
+    if( pipe_data_remaining( server ) )
+    {
+        struct timeval tv;
+
+        /* this kind of sux - 
+           there's no unix way to be alerted when a pipe becomes empty */
+        server->event = create_event( NULL, 0, 0, 0 );
+        if( !server->event )
+            return 0;
+        gettimeofday( &tv, 0 );
+        add_timeout( &tv, 100 );
+        server->flush_poll = add_timeout_user( &tv, check_flushed, server );
+        *event = server->event;
+    }
+
+    return 0; 
+}
+
+static int pipe_client_flush( struct fd *fd, struct event **event )
+{
+    /* FIXME: what do we have to do for this? */
+    return 0;
+}
+
+static int pipe_end_get_info( struct fd *fd, 
+                        struct get_file_info_reply *reply, int *flags )
 {
     if (reply)
     {
@@ -234,12 +496,15 @@
 {
     struct named_pipe *pipe;
 
-    if ((pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len )))
+    pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len );
+    if( pipe )
     {
-        if (get_error() != STATUS_OBJECT_NAME_COLLISION)
+        if( get_error() != STATUS_OBJECT_NAME_COLLISION )
         {
             /* initialize it if it didn't already exist */
-            pipe->users = 0;
+            pipe->servers = 0;
+            pipe->instances = 0;
+            pipe->connect_waiters = NULL;
         }
     }
     return pipe;
@@ -260,65 +525,80 @@
     return NULL;
 }
 
-static struct pipe_user *get_pipe_user_obj( struct process *process, obj_handle_t handle,
-                                            unsigned int access )
+static struct pipe_server *get_pipe_server_obj( struct process *process,
+                                obj_handle_t handle, unsigned int access )
 {
-    return (struct pipe_user *)get_handle_obj( process, handle, access, &pipe_user_ops );
+    struct object *obj;
+    obj = get_handle_obj( process, handle, access, &pipe_server_ops );
+    return (struct pipe_server *) obj;
 }
 
-static struct pipe_user *create_pipe_user( struct named_pipe *pipe )
+static struct pipe_server *create_pipe_server( struct named_pipe *pipe )
 {
-    struct pipe_user *user;
+    struct pipe_server *server;
 
-    user = alloc_object( &pipe_user_ops );
-    if(!user)
+    server = alloc_object( &pipe_server_ops );
+    if( !server )
         return NULL;
 
-    user->fd = NULL;
-    user->pipe = pipe;
-    user->state = ps_none;
-    user->other = NULL;
-    user->thread = NULL;
-    user->func = NULL;
-    user->overlapped = NULL;
+    server->fd = NULL;
+    server->pipe = pipe;
+    server->state = ps_none;
+    server->client = NULL;
+    server->flush_poll = NULL;
+    memset( &server->wait, 0, sizeof (struct wait_info) );
 
-    /* add to list of pipe users */
-    if ((user->next = pipe->users)) user->next->prev = user;
-    user->prev = NULL;
-    pipe->users = user;
+    /* add to list of pipe servers */
+    if ((server->next = pipe->servers)) server->next->prev = server;
+    server->prev = NULL;
+    pipe->servers = server;
 
-    grab_object(pipe);
+    grab_object( pipe );
 
-    return user;
+    return server;
 }
 
-static struct pipe_user *find_partner(struct named_pipe *pipe, enum pipe_state state)
+static struct pipe_client *create_pipe_client( struct pipe_server *server )
 {
-    struct pipe_user *x;
+    struct pipe_client *client;
 
-    for(x = pipe->users; x; x=x->next)
-    {
-        if(x->state==state)
-        break;
-    }
+    client = alloc_object( &pipe_client_ops );
+    if( !client )
+        return NULL;
+
+    client->fd = NULL;
+    client->server = server;
+    memset( &client->wait, 0, sizeof (struct wait_info) );
+
+    return client;
+}
+
+static struct pipe_server *find_server( struct named_pipe *pipe,
+                                        enum pipe_state state )
+{
+    struct pipe_server *x;
+
+    for( x = pipe->servers; x; x = x->next )
+        if( x->state == state )
+            break;
 
-    if(!x)
+    if( !x )
         return NULL;
 
-    return (struct pipe_user *)grab_object( x );
+    return (struct pipe_server *) grab_object( x );
 }
 
 DECL_HANDLER(create_named_pipe)
 {
     struct named_pipe *pipe;
-    struct pipe_user *user;
+    struct pipe_server *server;
 
     reply->handle = 0;
     pipe = create_named_pipe( get_req_data(), get_req_data_size() );
-    if(!pipe)
+    if( !pipe )
         return;
 
-    if (get_error() != STATUS_OBJECT_NAME_COLLISION)
+    if( get_error() != STATUS_OBJECT_NAME_COLLISION )
     {
         pipe->insize = req->insize;
         pipe->outsize = req->outsize;
@@ -326,14 +606,33 @@
         pipe->timeout = req->timeout;
         pipe->pipemode = req->pipemode;
     }
+    else
+    {
+        set_error( 0 );  /* clear the name collision */
+        if( pipe->maxinstances <= pipe->instances )
+        {
+            set_error( STATUS_PIPE_BUSY );
+            release_object( pipe );
+            return;
+        }
+        if( ( pipe->maxinstances != req->maxinstances ) ||
+            ( pipe->timeout != req->timeout ) ||
+            ( pipe->pipemode != req->pipemode ) )
+        {
+            set_error( STATUS_ACCESS_DENIED );
+            release_object( pipe );
+            return;
+        }
+    }
 
-    user = create_pipe_user( pipe );
-
-    if(user)
+    server = create_pipe_server( pipe );
+    if(server)
     {
-        user->state = ps_idle_server;
-        reply->handle = alloc_handle( current->process, user, GENERIC_READ|GENERIC_WRITE, 0 );
-        release_object( user );
+        server->state = ps_idle_server;
+        reply->handle = alloc_handle( current->process, server,
+                                      GENERIC_READ|GENERIC_WRITE, 0 );
+        server->pipe->instances++;
+        release_object( server );
     }
 
     release_object( pipe );
@@ -341,147 +640,176 @@
 
 DECL_HANDLER(open_named_pipe)
 {
-    struct pipe_user *user, *partner;
+    struct pipe_server *server;
+    struct pipe_client *client;
     struct named_pipe *pipe;
+    int fds[2];
 
     reply->handle = 0;
 
-    if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
+    pipe = open_named_pipe( get_req_data(), get_req_data_size() );
+    if ( !pipe )
     {
         set_error( STATUS_NO_SUCH_FILE );
         return;
     }
-    if (!(partner = find_partner(pipe, ps_wait_open)))
+
+    for( server = pipe->servers; server; server = server->next )
+        if( ( server->state==ps_idle_server ) ||
+            ( server->state==ps_wait_open ) )
+            break;
+    release_object( pipe );
+
+    if ( !server )
     {
-        release_object(pipe);
         set_error( STATUS_PIPE_NOT_AVAILABLE );
         return;
     }
-    if ((user = create_pipe_user( pipe )))
-    {
-        int fds[2];
 
-        if(!socketpair(PF_UNIX, SOCK_STREAM, 0, fds))
+    client = create_pipe_client( server );
+    if( client )
+    {
+        if( !socketpair( PF_UNIX, SOCK_STREAM, 0, fds ) )
         {
-            user->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[1], &user->obj );
-            partner->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[0], &partner->obj );
-            if (user->fd && partner->fd)
+            fcntl( fds[0], F_SETFL, 0 );
+            fcntl( fds[1], F_SETFL, 0 );
+            assert( !client->fd );
+            assert( !server->fd );
+            client->fd = create_anonymous_fd( &pipe_server_fd_ops, 
+                                            fds[1], &client->obj );
+            server->fd = create_anonymous_fd( &pipe_server_fd_ops,
+                                            fds[0], &server->obj );
+            if (client->fd && server->fd)
             {
-                notify_waiter(partner,STATUS_SUCCESS);
-                partner->state = ps_connected_server;
-                partner->other = user;
-                user->state = ps_connected_client;
-                user->other = partner;
-                reply->handle = alloc_handle( current->process, user, req->access, 0 );
+                if( server->state == ps_wait_open )
+                    notify_waiter( &server->wait, STATUS_SUCCESS );
+                assert( !server->wait.thread );
+                server->state = ps_connected_server;
+                server->client = client;
+                client->server = server;
+                reply->handle = alloc_handle( current->process, client,
+                                              req->access, 0 );
             }
         }
-        else file_set_error();
+        else
+            file_set_error();
 
-        release_object( user );
+        release_object( client );
     }
-    release_object( partner );
-    release_object( pipe );
 }
 
 DECL_HANDLER(connect_named_pipe)
 {
-    struct pipe_user *user, *partner;
+    struct pipe_server *server;
 
-    user = get_pipe_user_obj(current->process, req->handle, 0);
-    if(!user)
+    server = get_pipe_server_obj(current->process, req->handle, 0);
+    if(!server)
         return;
 
-    if( user->state != ps_idle_server )
-    {
-        set_error(STATUS_PORT_ALREADY_SET);
-    }
-    else
+    switch( server->state )
     {
-        user->state = ps_wait_open;
-        user->thread = (struct thread *)grab_object(current);
-        user->func = req->func;
-        user->overlapped = req->overlapped;
-
-        /* notify all waiters that a pipe just became available */
-        while( (partner = find_partner(user->pipe,ps_wait_connect)) )
-        {
-            notify_waiter(partner,STATUS_SUCCESS);
-            release_object(partner);
-        }
+    case ps_idle_server:
+    case ps_wait_connect:
+        assert( !server->fd );
+        server->state = ps_wait_open;
+        set_waiter( &server->wait, req->func, req->overlapped );
+        notify_connect_waiters( server->pipe );
+        break;
+    case ps_connected_server:
+        assert( server->fd );
+        set_error( STATUS_PIPE_CONNECTED );
+        break;
+    case ps_disconnected_server:
+        set_error( STATUS_PIPE_BUSY );
+        break;
+    case ps_wait_disconnect:
+        set_error( STATUS_NO_DATA_DETECTED );
+        break;
+    default:
+        set_error( STATUS_INVALID_HANDLE );
+        break;
     }
 
-    release_object(user);
+    release_object(server);
 }
 
 DECL_HANDLER(wait_named_pipe)
 {
     struct named_pipe *pipe;
-    struct pipe_user *partner;
+    struct pipe_server *server;
 
-    if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
+    pipe = open_named_pipe( get_req_data(), get_req_data_size() );
+    if ( pipe )
     {
         set_error( STATUS_PIPE_NOT_AVAILABLE );
         return;
     }
-    if( (partner = find_partner(pipe,ps_wait_open)) )
+    server = find_server( pipe, ps_wait_open );
+    if( server )
     {
-        /* this should use notify_waiter,
-           but no pipe_user object exists now... */
-        thread_queue_apc(current,NULL,req->func,
-                         APC_ASYNC, 1, req->overlapped, STATUS_SUCCESS, NULL);
-        release_object(partner);
+        /* there's already a server waiting for a client to connect */
+        struct wait_info wait;
+        set_waiter( &wait, req->func, req->overlapped );
+        notify_waiter( &wait, STATUS_SUCCESS );
+        release_object( server );
     }
     else
-    {
-        struct pipe_user *user;
+        queue_connect_waiter( pipe, req->func, req->overlapped );
 
-        if( (user = create_pipe_user( pipe )) )
-        {
-            user->state = ps_wait_connect;
-            user->thread = (struct thread *)grab_object(current);
-            user->func = req->func;
-            user->overlapped = req->overlapped;
-            /* don't release it */
-        }
-    }
-    release_object(pipe);
+    release_object( pipe );
 }
 
 DECL_HANDLER(disconnect_named_pipe)
 {
-    struct pipe_user *user;
+    struct pipe_server *server;
 
-    user = get_pipe_user_obj(current->process, req->handle, 0);
-    if(!user)
+    reply->fd = -1;
+    server = get_pipe_server_obj( current->process, req->handle, 0 );
+    if( !server )
         return;
-    if( (user->state == ps_connected_server) &&
-        (user->other->state == ps_connected_client) )
+    switch( server->state )
     {
-        release_object( user->other->fd );
-        user->other->fd = NULL;
-        user->other->state = ps_disconnected;
-        user->other->other = NULL;
-
-        release_object( user->fd );
-        user->fd = NULL;
-        user->state = ps_idle_server;
-        user->other = NULL;
+    case ps_connected_server:
+        assert( server->fd );
+        assert( server->client );
+        assert( server->client->fd );
+
+        notify_empty( server );
+        notify_waiter( &server->client->wait, STATUS_PIPE_DISCONNECTED );
+
+        /* Dump the client and server fds, but keep the pointers
+           around - client loses all waiting data */
+        server->state = ps_disconnected_server;
+        do_disconnect( server );
+        flush_cached_fd( current->process, req->handle, &reply->fd );
+        break;
+
+    case ps_wait_disconnect:
+        assert( !server->client );
+        assert( server->fd );
+        do_disconnect( server );
+        server->state = ps_wait_connect;
+        flush_cached_fd( current->process, req->handle, &reply->fd );
+        break;
+
+    default:
+        set_error( STATUS_PIPE_DISCONNECTED );
     }
-    release_object(user);
+    release_object( server );
 }
 
 DECL_HANDLER(get_named_pipe_info)
 {
-    struct pipe_user *user;
+    struct pipe_server *server;
 
-    user = get_pipe_user_obj(current->process, req->handle, 0);
-    if(!user)
+    server = get_pipe_server_obj( current->process, req->handle, 0 );
+    if(!server)
         return;
 
-    reply->flags        = user->pipe->pipemode;
-    reply->maxinstances = user->pipe->maxinstances;
-    reply->insize       = user->pipe->insize;
-    reply->outsize      = user->pipe->outsize;
+    reply->flags        = server->pipe->pipemode;
+    reply->maxinstances = server->pipe->maxinstances;
+    reply->insize       = server->pipe->insize;
+    reply->outsize      = server->pipe->outsize;
 
-    release_object(user);
+    release_object(server);
 }
Index: server/serial.c
===================================================================
RCS file: /home/wine/wine/server/serial.c,v
retrieving revision 1.29
diff -u -r1.29 serial.c
--- server/serial.c	12 Mar 2003 22:38:14 -0000	1.29
+++ server/serial.c	14 May 2003 08:56:26 -0000
@@ -58,7 +58,7 @@
 static int serial_get_poll_events( struct fd *fd );
 static void serial_poll_event( struct fd *fd, int event );
 static int serial_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
-static int serial_flush( struct fd *fd );
+static int serial_flush( struct fd *fd, struct event **event );
 static void serial_queue_async(struct fd *fd, void *ptr, unsigned int status, int type, int count);
 
 struct serial
@@ -329,7 +329,7 @@
     set_fd_events ( fd, serial_get_poll_events( fd ));
 }
 
-static int serial_flush( struct fd *fd )
+static int serial_flush( struct fd *fd, struct event **event )
 {
     /* MSDN says: If hFile is a handle to a communications device,
      * the function only flushes the transmit buffer.
Index: dlls/kernel/sync.c
===================================================================
RCS file: /home/wine/wine/dlls/kernel/sync.c,v
retrieving revision 1.29
diff -u -r1.29 sync.c
--- dlls/kernel/sync.c	12 May 2003 03:28:26 -0000	1.29
+++ dlls/kernel/sync.c	14 May 2003 08:56:26 -0000
@@ -754,6 +754,8 @@
     {
         req->handle = hPipe;
         ret = !wine_server_call_err( req );
+        if ( ret )
+            close( reply->fd );
     }
     SERVER_END_REQ;
 
Index: dlls/ntdll/file.c
===================================================================
RCS file: /home/wine/wine/dlls/ntdll/file.c,v
retrieving revision 1.21
diff -u -r1.21 file.c
--- dlls/ntdll/file.c	22 Apr 2003 04:04:17 -0000	1.21
+++ dlls/ntdll/file.c	14 May 2003 08:56:27 -0000
@@ -498,11 +498,19 @@
 NTSTATUS WINAPI NtFlushBuffersFile( HANDLE hFile, IO_STATUS_BLOCK* IoStatusBlock )
 {
     NTSTATUS ret;
+    HANDLE hEvent = NULL;
+
     SERVER_START_REQ( flush_file )
     {
         req->handle = hFile;
         ret = wine_server_call( req );
+        hEvent = reply->event;
     }
     SERVER_END_REQ;
+    if( !ret && hEvent )
+    {
+        ret = NtWaitForSingleObject( hEvent, FALSE, NULL );
+        NtClose( hEvent );
+    }
     return ret;
 }
-------------- next part --------------
Index: dlls/kernel/tests/pipe.c
===================================================================
RCS file: /home/wine/wine/dlls/kernel/tests/pipe.c,v
retrieving revision 1.3
diff -u -r1.3 pipe.c
--- dlls/kernel/tests/pipe.c	13 May 2003 04:48:23 -0000	1.3
+++ dlls/kernel/tests/pipe.c	14 May 2003 08:10:17 -0000
@@ -90,10 +90,7 @@
 
     hFile = CreateFileA(PIPENAME, GENERIC_READ|GENERIC_WRITE, 0, 
             NULL, OPEN_EXISTING, 0, 0);
-    todo_wine
-    {
-        ok(hFile != INVALID_HANDLE_VALUE, "CreateFile failed");
-    }
+    ok(hFile != INVALID_HANDLE_VALUE, "CreateFile failed");
 
     /* don't try to do i/o if one side couldn't be opened, as it hangs */
     if (hFile != INVALID_HANDLE_VALUE) {


More information about the wine-patches mailing list