sock-server.diff (3 in series)

Martin Wilck Martin.Wilck at fujitsu-siemens.com
Tue Apr 23 13:20:44 CDT 2002


Patch: sock-server.diff

Implement server framework for asynchronous IO on sockets.

Patch against: CVS 2002/04/23, with previous patches in series applied.

The most essential part is the implementation of the queue_async()
object method for sockets.
Apart from that, a lot of fine tuning has been done to handle
shutdown() and close() correctly.

The changes from CVS 1.27 to 1.28 of sock.c removed the code that
activates socket events after the socket has been removed from the main
select loop (this removal happens when FD_CLOSE is received, to prevent
the server from busy-looping). This patch reintroduces that code in a
slightly different form (a new sock_try_event() routine, taking care to
prevent recursive calls to sock_poll_event()).

The sock_poll_event() routine is also changed with respect to the
handling of FD_CLOSE events, which come in two flavors: a full close
(POLLHUP) or a close of the incoming data flow only (empty read). The
old implementation would catch close events of the second type only if
the app had requested event notification for reading (FD_READ).

A lot of care has been taken to ensure that pending data will be
read before FD_CLOSE is delivered.

This is certainly the most "critical" part of this patch series because
correct handling of the socket events, particularly when overlapped IO
is also used, is really tricky.

Modified files:
        include:        server_protocol.h
        server:         protocol.def
                        sock.c

diff -ruNX ignore CVS/wine/include/wine/server_protocol.h TMP/wine/include/wine/server_protocol.h
--- CVS/wine/include/wine/server_protocol.h	Mon Apr 22 10:54:32 2002
+++ TMP/wine/include/wine/server_protocol.h	Mon Apr 22 10:54:15 2002
@@ -796,6 +796,8 @@
 #define FD_FLAG_OVERLAPPED 0x01
 #define FD_FLAG_TIMEOUT    0x02

+#define FD_FLAG_RECV_SHUTDOWN      0x04
+#define FD_FLAG_SEND_SHUTDOWN      0x08


 struct set_file_pointer_request
@@ -3196,6 +3198,6 @@
     struct get_window_properties_reply get_window_properties_reply;
 };

-#define SERVER_PROTOCOL_VERSION 79
+#define SERVER_PROTOCOL_VERSION 80

 #endif /* __WINE_WINE_SERVER_PROTOCOL_H */
diff -ruNX ignore CVS/wine/server/protocol.def TMP/wine/server/protocol.def
--- CVS/wine/server/protocol.def	Mon Apr 22 10:54:32 2002
+++ TMP/wine/server/protocol.def	Mon Apr 22 10:54:15 2002
@@ -612,7 +612,9 @@
 };
 #define FD_FLAG_OVERLAPPED 0x01
 #define FD_FLAG_TIMEOUT    0x02
-
+/* Socket flags: see server/sock.c */
+#define FD_FLAG_RECV_SHUTDOWN      0x04
+#define FD_FLAG_SEND_SHUTDOWN      0x08

 /* Set a file current position */
 @REQ(set_file_pointer)
diff -ruNX ignore CVS/wine/server/sock.c TMP/wine/server/sock.c
--- CVS/wine/server/sock.c	Tue Apr 23 18:22:36 2002
+++ TMP/wine/server/sock.c	Tue Apr 23 18:23:50 2002
@@ -86,6 +86,7 @@
 static void sock_destroy( struct object *obj );
 static int sock_get_error( int err );
 static void sock_set_error(void);
+static void sock_queue_async(struct object *obj, void *ptr, unsigned int status, int type, int count);

 static const struct object_ops sock_ops =
 {
@@ -100,7 +101,7 @@
     sock_get_fd,                  /* get_fd */
     no_flush,                     /* flush */
     sock_get_info,                /* get_file_info */
-    NULL,                         /* queue_async */
+    sock_queue_async,             /* queue_async */
     sock_destroy                  /* destroy */
 };

@@ -124,7 +125,7 @@
 };


-static void sock_reselect( struct sock *sock )
+static int sock_reselect( struct sock *sock )
 {
     int ev = sock_get_poll_events( &sock->obj );

@@ -133,21 +134,61 @@

     if (sock->obj.select == -1) {
         /* previously unconnected socket, is this reselect supposed to connect it? */
-        if (!(sock->state & ~FD_WINE_NONBLOCKING)) return;
+        if (!(sock->state & ~FD_WINE_NONBLOCKING)) return 0;
         /* ok, it is, attach it to the wineserver's main poll loop */
         add_select_user( &sock->obj );
     }
     /* update condition mask */
     set_select_events( &sock->obj, ev );
+    return ev;
+}
+
+/* After POLLHUP is received, the socket will no longer be in the main select loop.
+   This function is used to signal pending events nevertheless */
+static void sock_try_event ( struct sock *sock, int event )
+{
+    struct pollfd pfd;
+
+    pfd.fd = sock->obj.fd;
+    pfd.events = event;
+    pfd.revents = 0;
+    poll (&pfd, 1, 0);
+
+    if ( pfd.revents )
+    {
+        if ( debug_level ) fprintf ( stderr, "sock_try_event: %x\n", pfd.revents );
+        sock_poll_event ( &sock->obj, pfd.revents );
+    }
 }

 /* wake anybody waiting on the socket event or send the associated message */
-static void sock_wake_up( struct sock *sock )
+static void sock_wake_up( struct sock *sock, int pollev )
 {
     unsigned int events = sock->pmask & sock->mask;
     int i;
+    int async_active = 0;

-    if (!events) return;
+    if ( sock->flags & FD_FLAG_OVERLAPPED )
+    {
+        if( pollev & (POLLIN|POLLPRI) && IS_READY( sock->read_q ) )
+        {
+            if (debug_level) fprintf ( stderr, "activating read queue for socket %p\n", sock );
+            async_notify( sock->read_q.head, STATUS_ALERTED );
+            async_active = 1;
+        }
+        if( pollev & POLLOUT && IS_READY( sock->write_q ) )
+        {
+            if (debug_level) fprintf ( stderr, "activating write queue for socket %p\n", sock );
+            async_notify( sock->write_q.head, STATUS_ALERTED );
+            async_active = 1;
+        }
+    }
+
+    /* Do not signal events if there are still pending asynchronous IO requests */
+    /* We need this to delay FD_CLOSE events until all pending overlapped requests are processed */
+    if ( !events || async_active ) return;
+
+    if (events & FD_CLOSE) sock->hmask |= FD_CLOSE;

     if (sock->event)
     {
@@ -183,6 +224,7 @@
 static void sock_poll_event( struct object *obj, int event )
 {
     struct sock *sock = (struct sock *)obj;
+    int empty_recv = 0;

     assert( sock->obj.ops == &sock_ops );
     if (debug_level)
@@ -208,6 +250,7 @@
             sock->errors[FD_CONNECT_BIT] = sock_error( sock->obj.fd );
             if (debug_level)
                 fprintf(stderr, "socket %d connection failure\n", sock->obj.fd);
+            set_select_events( &sock->obj, -1 );
         }
     } else
     if (sock->state & FD_WINE_LISTENING)
@@ -226,6 +269,7 @@
             sock->pmask |= FD_ACCEPT;
             sock->errors[FD_ACCEPT_BIT] = sock_error( sock->obj.fd );
             sock->hmask |= FD_ACCEPT;
+            set_select_events( &sock->obj, -1 );
         }
     } else
     {
@@ -233,11 +277,12 @@
         if (event & POLLIN)
         {
             char dummy;
+            int nr;

             /* Linux 2.4 doesn't report POLLHUP if only one side of the socket
              * has been closed, so we need to check for it explicitly here */
-            if (!recv( sock->obj.fd, &dummy, 1, MSG_PEEK )) event = POLLHUP;
-            else
+            nr  = recv( sock->obj.fd, &dummy, 1, MSG_PEEK );
+            if ( nr > 0 )
             {
                 /* incoming data */
                 sock->pmask |= FD_READ;
@@ -246,6 +291,22 @@
                 if (debug_level)
                     fprintf(stderr, "socket %d is readable\n", sock->obj.fd );
             }
+            else if ( nr == 0 )
+                empty_recv = 1;
+            else
+            {
+                /* EAGAIN can happen if an async recv() falls between the server's poll()
+                   call and the invocation of this routine */
+                if ( errno == EAGAIN )
+                    event &= ~POLLIN;
+                else
+                {
+                    if ( debug_level )
+                        fprintf ( stderr, "recv error on socket %d: %d\n", sock->obj.fd, errno );
+                    event = POLLERR;
+                }
+            }
+
         }
         if (event & POLLOUT)
         {
@@ -263,25 +324,25 @@
             if (debug_level)
                 fprintf(stderr, "socket %d got OOB data\n", sock->obj.fd);
         }
-        if (((event & POLLERR) || ((event & (POLLIN|POLLHUP)) == POLLHUP))
-            && (sock->state & (FD_READ|FD_WRITE))) {
-            /* socket closing */
+        /* According to WS2 specs, FD_CLOSE is only delivered when there is
+           no more data to be read (i.e. empty_recv = 1) */
+        else if ( empty_recv && (sock->state & (FD_READ|FD_WRITE) ))
+        {
             sock->errors[FD_CLOSE_BIT] = sock_error( sock->obj.fd );
-            sock->state &= ~(FD_WINE_CONNECTED|FD_READ|FD_WRITE);
+            if ( event & ( POLLERR|POLLHUP ) )
+                 sock->state &= ~(FD_WINE_CONNECTED|FD_WRITE);
             sock->pmask |= FD_CLOSE;
             if (debug_level)
-                fprintf(stderr, "socket %d aborted by error %d\n",
-                        sock->obj.fd, sock->errors[FD_CLOSE_BIT]);
+                fprintf(stderr, "socket %d aborted by error %d, event: %x - removing from select loop\n",
+                        sock->obj.fd, sock->errors[FD_CLOSE_BIT], event);
+            set_select_events( &sock->obj, -1 );
         }
     }

-    if (event & (POLLERR|POLLHUP))
-        set_select_events( &sock->obj, -1 );
-    else
-        sock_reselect( sock );
+    sock_reselect( sock );

     /* wake up anyone waiting for whatever just happened */
-    if (sock->pmask & sock->mask) sock_wake_up( sock );
+    if ( sock->pmask & sock->mask || sock->flags & FD_FLAG_OVERLAPPED ) sock_wake_up( sock, event );

     /* if anyone is stupid enough to wait on the socket object itself,
      * maybe we should wake them up too, just in case? */
@@ -320,8 +381,14 @@
         /* listening, wait for readable */
         return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN;

-    if (mask & FD_READ)  ev |= POLLIN | POLLPRI;
-    if (mask & FD_WRITE) ev |= POLLOUT;
+    if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && IS_READY (sock->read_q)))
+        ev |= POLLIN | POLLPRI;
+    if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && IS_READY (sock->write_q)))
+        ev |= POLLOUT;
+    /* We use POLLIN with 0 bytes recv() as FD_CLOSE indication. */
+    if (sock->mask & ~sock->hmask & FD_CLOSE && !(sock->hmask & FD_READ) )
+        ev |= POLLIN;
+
     return ev;
 }

@@ -352,9 +419,68 @@
     }
     *flags = 0;
     if (sock->flags & WSA_FLAG_OVERLAPPED) *flags |= FD_FLAG_OVERLAPPED;
+    if ( !(sock->state & FD_READ ) )  *flags |= FD_FLAG_RECV_SHUTDOWN;
+    if ( !(sock->state & FD_WRITE ) ) *flags |= FD_FLAG_SEND_SHUTDOWN;
     return FD_TYPE_DEFAULT;
 }

+static void sock_queue_async(struct object *obj, void *ptr, unsigned int status, int type, int count)
+{
+    struct sock *sock = (struct sock *)obj;
+    struct async_queue *q;
+    struct async *async;
+    int pollev;
+
+    assert( obj->ops == &sock_ops );
+
+    if ( !(sock->flags & WSA_FLAG_OVERLAPPED) )
+    {
+        set_error ( STATUS_INVALID_HANDLE );
+        return;
+    }
+
+    switch( type )
+    {
+    case ASYNC_TYPE_READ:
+        q = &sock->read_q;
+        break;
+    case ASYNC_TYPE_WRITE:
+        q = &sock->write_q;
+        break;
+    default:
+        set_error( STATUS_INVALID_PARAMETER );
+        return;
+    }
+
+    async = find_async ( q, current, ptr );
+
+    if ( status == STATUS_PENDING )
+    {
+        if ( ( !( sock->state & FD_READ ) && type == ASYNC_TYPE_READ  ) ||
+             ( !( sock->state & FD_WRITE ) && type == ASYNC_TYPE_WRITE ) )
+        {
+            set_error ( STATUS_PIPE_DISCONNECTED );
+            if ( async ) destroy_async ( async );
+        }
+        else
+        {
+            if ( !async )
+                async = create_async ( obj, current, ptr );
+            if ( !async )
+                return;
+
+            async->status = STATUS_PENDING;
+            if ( !async->q )
+                async_insert ( q, async );
+        }
+    }
+    else if ( async ) destroy_async ( async );
+    else set_error ( STATUS_INVALID_PARAMETER );
+
+    pollev = sock_reselect ( sock );
+    if ( pollev ) sock_try_event ( sock, pollev );
+}
+
 static void sock_destroy( struct object *obj )
 {
     struct sock *sock = (struct sock *)obj;
@@ -578,6 +704,7 @@
 {
     struct sock *sock;
     struct event *old_event;
+    int pollev;

     if (!(sock = (struct sock*)get_handle_obj( current->process, req->handle,
                                                GENERIC_READ|GENERIC_WRITE|SYNCHRONIZE, &sock_ops)))
@@ -591,7 +718,10 @@
     if (req->event) sock->event = get_event_obj( current->process, req->event, EVENT_MODIFY_STATE );

     if (debug_level && sock->event) fprintf(stderr, "event ptr: %p\n", sock->event);
-    sock_reselect( sock );
+
+    pollev = sock_reselect( sock );
+    if ( pollev ) sock_try_event ( sock, pollev );
+
     if (sock->mask)
         sock->state |= FD_WINE_NONBLOCKING;

@@ -599,7 +729,7 @@
        it is possible that FD_CONNECT or FD_ACCEPT network events has happened
        before a WSAEventSelect() was done on it.
        (when dealing with Asynchronous socket)  */
-    if (sock->pmask & sock->mask) sock_wake_up( sock );
+    if (sock->pmask & sock->mask) sock_wake_up( sock, pollev );

     if (old_event) release_object( old_event ); /* we're through with it */
     release_object( &sock->obj );
@@ -646,6 +776,7 @@
 DECL_HANDLER(enable_socket_event)
 {
     struct sock *sock;
+    int pollev;

     if (!(sock = (struct sock*)get_handle_obj( current->process, req->handle,
                                                GENERIC_READ|GENERIC_WRITE|SYNCHRONIZE, &sock_ops)))
@@ -655,7 +786,10 @@
     sock->hmask &= ~req->mask;
     sock->state |= req->sstate;
     sock->state &= ~req->cstate;
-    sock_reselect( sock );
+
+    pollev = sock_reselect( sock );
+    if ( pollev ) sock_try_event ( sock, pollev );
+
     release_object( &sock->obj );
 }








More information about the wine-patches mailing list