Rob Shearman : rpcrt4: Eliminate the extra thread used for RPC over
TCP/ IP servers by adding socket-specific server protseq functions.
Alexandre Julliard
julliard at wine.codeweavers.com
Tue Oct 31 15:07:44 CST 2006
Module: wine
Branch: master
Commit: 7553a8990c1499a7ed5327a2a2f4ef81ce5affcd
URL: http://source.winehq.org/git/wine.git/?a=commit;h=7553a8990c1499a7ed5327a2a2f4ef81ce5affcd
Author: Rob Shearman <rob at codeweavers.com>
Date: Tue Oct 31 18:59:55 2006 +0000
rpcrt4: Eliminate the extra thread used for RPC over TCP/IP servers by adding socket-specific server protseq functions.
---
dlls/rpcrt4/rpc_transport.c | 250 ++++++++++++++++++++++++++-----------------
1 files changed, 150 insertions(+), 100 deletions(-)
diff --git a/dlls/rpcrt4/rpc_transport.c b/dlls/rpcrt4/rpc_transport.c
index 134ea4c..d3829c0 100644
--- a/dlls/rpcrt4/rpc_transport.c
+++ b/dlls/rpcrt4/rpc_transport.c
@@ -564,42 +564,8 @@ typedef struct _RpcConnection_tcp
{
RpcConnection common;
int sock;
- HANDLE onEventAvailable;
- HANDLE onEventHandled;
- BOOL quit;
} RpcConnection_tcp;
-static DWORD WINAPI rpcrt4_tcp_poll_thread(LPVOID arg)
-{
- RpcConnection_tcp *tcpc;
- int ret;
- struct pollfd pollInfo;
-
- tcpc = (RpcConnection_tcp*) arg;
- pollInfo.fd = tcpc->sock;
- pollInfo.events = POLLIN;
-
- while (!tcpc->quit)
- {
- ret = poll(&pollInfo, 1, 1000);
- if (ret < 0)
- ERR("poll failed with error %d\n", ret);
- else
- {
- if (pollInfo.revents & POLLIN)
- {
- SignalObjectAndWait(tcpc->onEventAvailable,
- tcpc->onEventHandled, INFINITE, FALSE);
- }
- }
- }
-
- /* This avoids the tcpc being destroyed before we are done with it */
- SetEvent(tcpc->onEventAvailable);
-
- return 0;
-}
-
static RpcConnection *rpcrt4_conn_tcp_alloc(void)
{
RpcConnection_tcp *tcpc;
@@ -607,9 +573,6 @@ static RpcConnection *rpcrt4_conn_tcp_al
if (tcpc == NULL)
return NULL;
tcpc->sock = -1;
- tcpc->onEventAvailable = NULL;
- tcpc->onEventHandled = NULL;
- tcpc->quit = FALSE;
return &tcpc->common;
}
@@ -664,18 +627,19 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_op
if (Connection->server)
{
- HANDLE thread = NULL;
ret = bind(sock, ai_cur->ai_addr, ai_cur->ai_addrlen);
if (ret < 0)
{
WARN("bind failed, error %d\n", ret);
- goto done;
+ close(sock);
+ continue;
}
ret = listen(sock, 10);
if (ret < 0)
{
WARN("listen failed, error %d\n", ret);
- goto done;
+ close(sock);
+ continue;
}
/* need a non-blocking socket, otherwise accept() has a potential
* race-condition (poll() says it is readable, connection drops,
@@ -685,47 +649,10 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_op
if (ret < 0)
{
WARN("couldn't make socket non-blocking, error %d\n", ret);
- goto done;
- }
- tcpc->onEventAvailable = CreateEventW(NULL, FALSE, FALSE, NULL);
- if (tcpc->onEventAvailable == NULL)
- {
- WARN("creating available event failed, error %lu\n", GetLastError());
- goto done;
- }
- tcpc->onEventHandled = CreateEventW(NULL, FALSE, FALSE, NULL);
- if (tcpc->onEventHandled == NULL)
- {
- WARN("creating handled event failed, error %lu\n", GetLastError());
- goto done;
- }
- tcpc->sock = sock;
- thread = CreateThread(NULL, 0, rpcrt4_tcp_poll_thread, tcpc, 0, NULL);
- if (thread == NULL)
- {
- WARN("creating server polling thread failed, error %lu\n",
- GetLastError());
- tcpc->sock = -1;
- goto done;
- }
- CloseHandle(thread);
-
- done:
- if (thread == NULL) /* ie. we failed somewhere */
- {
close(sock);
- if (tcpc->onEventAvailable != NULL)
- {
- CloseHandle(tcpc->onEventAvailable);
- tcpc->onEventAvailable = NULL;
- }
- if (tcpc->onEventHandled != NULL)
- {
- CloseHandle(tcpc->onEventHandled);
- tcpc->onEventHandled = NULL;
- }
continue;
}
+ tcpc->sock = sock;
}
else /* it's a client */
{
@@ -748,12 +675,6 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_op
return RPC_S_SERVER_UNAVAILABLE;
}
-static HANDLE rpcrt4_conn_tcp_get_wait_handle(RpcConnection *Connection)
-{
- RpcConnection_tcp *tcpc = (RpcConnection_tcp*) Connection;
- return tcpc->onEventAvailable;
-}
-
static RPC_STATUS rpcrt4_conn_tcp_handoff(RpcConnection *old_conn, RpcConnection *new_conn)
{
int ret;
@@ -764,7 +685,6 @@ static RPC_STATUS rpcrt4_conn_tcp_handof
addrsize = sizeof(address);
ret = accept(server->sock, (struct sockaddr*) &address, &addrsize);
- SetEvent(server->onEventHandled);
if (ret < 0)
{
ERR("Failed to accept a TCP connection: error %d\n", ret);
@@ -799,15 +719,6 @@ static int rpcrt4_conn_tcp_close(RpcConn
TRACE("%d\n", tcpc->sock);
- if (tcpc->onEventAvailable != NULL)
- {
- /* it's a server connection */
- tcpc->quit = TRUE;
- WaitForSingleObject(tcpc->onEventAvailable, INFINITE);
- CloseHandle(tcpc->onEventAvailable);
- CloseHandle(tcpc->onEventHandled);
- }
-
if (tcpc->sock != -1)
close(tcpc->sock);
tcpc->sock = -1;
@@ -949,6 +860,145 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_pa
return RPC_S_OK;
}
+typedef struct _RpcServerProtseq_sock
+{
+ RpcServerProtseq common;
+ int mgr_event_rcv;
+ int mgr_event_snd;
+} RpcServerProtseq_sock;
+
+static RpcServerProtseq *rpcrt4_protseq_sock_alloc(void)
+{
+ RpcServerProtseq_sock *ps = HeapAlloc(GetProcessHeap(), 0, sizeof(*ps));
+ if (ps)
+ {
+ int fds[2];
+ if (!socketpair(PF_UNIX, SOCK_DGRAM, 0, fds))
+ {
+ fcntl(fds[0], F_SETFL, O_NONBLOCK);
+ fcntl(fds[1], F_SETFL, O_NONBLOCK);
+ ps->mgr_event_rcv = fds[0];
+ ps->mgr_event_snd = fds[1];
+ }
+ else
+ {
+ ERR("socketpair failed with error %s\n", strerror(errno));
+ HeapFree(GetProcessHeap(), 0, ps);
+ return NULL;
+ }
+ }
+ return &ps->common;
+}
+
+static void rpcrt4_protseq_sock_signal_state_changed(RpcServerProtseq *protseq)
+{
+ RpcServerProtseq_sock *sockps = CONTAINING_RECORD(protseq, RpcServerProtseq_sock, common);
+ char dummy = 1;
+ write(sockps->mgr_event_snd, &dummy, sizeof(dummy));
+}
+
+static void *rpcrt4_protseq_sock_get_wait_array(RpcServerProtseq *protseq, void *prev_array, unsigned int *count)
+{
+ struct pollfd *poll_info = prev_array;
+ RpcConnection_tcp *conn;
+ RpcServerProtseq_sock *sockps = CONTAINING_RECORD(protseq, RpcServerProtseq_sock, common);
+
+ EnterCriticalSection(&protseq->cs);
+
+ /* open and count connections */
+ *count = 1;
+ conn = (RpcConnection_tcp *)protseq->conn;
+ while (conn) {
+ RPCRT4_OpenConnection(&conn->common);
+ if (conn->sock != -1)
+ (*count)++;
+ conn = (RpcConnection_tcp *)conn->common.Next;
+ }
+
+ /* make array of connections */
+ if (poll_info)
+ poll_info = HeapReAlloc(GetProcessHeap(), 0, poll_info, *count*sizeof(*poll_info));
+ else
+ poll_info = HeapAlloc(GetProcessHeap(), 0, *count*sizeof(*poll_info));
+ if (!poll_info)
+ {
+ ERR("couldn't allocate poll_info\n");
+ LeaveCriticalSection(&protseq->cs);
+ return NULL;
+ }
+
+ poll_info[0].fd = sockps->mgr_event_rcv;
+ poll_info[0].events = POLLIN;
+ *count = 1;
+ conn = CONTAINING_RECORD(protseq->conn, RpcConnection_tcp, common);
+ while (conn) {
+ if (conn->sock != -1)
+ {
+ poll_info[*count].fd = conn->sock;
+ poll_info[*count].events = POLLIN;
+ (*count)++;
+ }
+ conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_tcp, common);
+ }
+ LeaveCriticalSection(&protseq->cs);
+ return poll_info;
+}
+
+static void rpcrt4_protseq_sock_free_wait_array(RpcServerProtseq *protseq, void *array)
+{
+ HeapFree(GetProcessHeap(), 0, array);
+}
+
+static int rpcrt4_protseq_sock_wait_for_new_connection(RpcServerProtseq *protseq, unsigned int count, void *wait_array)
+{
+ struct pollfd *poll_info = wait_array;
+ int ret, i;
+ RpcConnection *cconn;
+ RpcConnection_tcp *conn;
+
+ if (!poll_info)
+ return -1;
+
+ ret = poll(poll_info, count, -1);
+ if (ret < 0)
+ {
+ ERR("poll failed with error %d\n", ret);
+ return -1;
+ }
+
+ for (i = 0; i < count; i++)
+ if (poll_info[i].revents & POLLIN)
+ {
+ /* RPC server event */
+ if (i == 0)
+ {
+ char dummy;
+ read(poll_info[0].fd, &dummy, sizeof(dummy));
+ return 0;
+ }
+
+ /* find which connection got a RPC */
+ EnterCriticalSection(&protseq->cs);
+ conn = CONTAINING_RECORD(protseq->conn, RpcConnection_tcp, common);
+ while (conn) {
+ if (poll_info[i].fd == conn->sock) break;
+ conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_tcp, common);
+ }
+ cconn = NULL;
+ if (conn)
+ RPCRT4_SpawnConnection(&cconn, &conn->common);
+ else
+ ERR("failed to locate connection for fd %d\n", poll_info[i].fd);
+ LeaveCriticalSection(&protseq->cs);
+ if (cconn)
+ RPCRT4_new_client(cconn);
+ else
+ return -1;
+ }
+
+ return 1;
+}
+
static const struct connection_ops conn_protseq_list[] = {
{ "ncacn_np",
{ EPM_PROTOCOL_NCACN, EPM_PROTOCOL_SMB },
@@ -978,7 +1028,7 @@ static const struct connection_ops conn_
{ EPM_PROTOCOL_NCACN, EPM_PROTOCOL_TCP },
rpcrt4_conn_tcp_alloc,
rpcrt4_ncacn_ip_tcp_open,
- rpcrt4_conn_tcp_get_wait_handle,
+ NULL,
rpcrt4_conn_tcp_handoff,
rpcrt4_conn_tcp_read,
rpcrt4_conn_tcp_write,
@@ -1009,11 +1059,11 @@ static const struct protseq_ops protseq_
},
{
"ncacn_ip_tcp",
- rpcrt4_protseq_np_alloc,
- rpcrt4_protseq_np_signal_state_changed,
- rpcrt4_protseq_np_get_wait_array,
- rpcrt4_protseq_np_free_wait_array,
- rpcrt4_protseq_np_wait_for_new_connection,
+ rpcrt4_protseq_sock_alloc,
+ rpcrt4_protseq_sock_signal_state_changed,
+ rpcrt4_protseq_sock_get_wait_array,
+ rpcrt4_protseq_sock_free_wait_array,
+ rpcrt4_protseq_sock_wait_for_new_connection,
},
};
More information about the wine-cvs
mailing list