rpcrt4: Open the endpoint from the caller of RpcServerUseProtseq*
instead of the protseq server thread.
Robert Shearman
rob at codeweavers.com
Wed Nov 8 14:45:19 CST 2006
This allows errors to be returned to the caller and to create more than
one connection for an endpoint.
---
dlls/rpcrt4/rpc_binding.c | 2
dlls/rpcrt4/rpc_binding.h | 4 -
dlls/rpcrt4/rpc_server.c | 3
dlls/rpcrt4/rpc_server.h | 2
dlls/rpcrt4/rpc_transport.c | 302
+++++++++++++++++++++++++++++++++----------
5 files changed, 236 insertions(+), 77 deletions(-)
-------------- next part --------------
diff --git a/dlls/rpcrt4/rpc_binding.c b/dlls/rpcrt4/rpc_binding.c
index eda2322..d233f03 100644
--- a/dlls/rpcrt4/rpc_binding.c
+++ b/dlls/rpcrt4/rpc_binding.c
@@ -258,7 +258,7 @@ RPC_STATUS RPCRT4_OpenBinding(RpcBinding
RPCRT4_CreateConnection(&NewConnection, Binding->server, Binding->Protseq,
Binding->NetworkAddr, Binding->Endpoint, NULL,
Binding->AuthInfo, Binding);
- status = RPCRT4_OpenConnection(NewConnection);
+ status = RPCRT4_OpenClientConnection(NewConnection);
if (status != RPC_S_OK)
{
RPCRT4_DestroyConnection(NewConnection);
diff --git a/dlls/rpcrt4/rpc_binding.h b/dlls/rpcrt4/rpc_binding.h
index e6b097c..4df3d73 100644
--- a/dlls/rpcrt4/rpc_binding.h
+++ b/dlls/rpcrt4/rpc_binding.h
@@ -65,7 +65,7 @@ struct connection_ops {
const char *name;
unsigned char epm_protocols[2]; /* only floors 3 and 4. see http://www.opengroup.org/onlinepubs/9629399/apdxl.htm */
RpcConnection *(*alloc)(void);
- RPC_STATUS (*open_connection)(RpcConnection *conn);
+ RPC_STATUS (*open_connection_client)(RpcConnection *conn);
RPC_STATUS (*handoff)(RpcConnection *old_conn, RpcConnection *new_conn);
int (*read)(RpcConnection *conn, void *buffer, unsigned int len);
int (*write)(RpcConnection *conn, const void *buffer, unsigned int len);
@@ -108,7 +108,7 @@ RpcConnection *RPCRT4_GetIdleConnection(
void RPCRT4_ReleaseIdleConnection(RpcConnection *Connection);
RPC_STATUS RPCRT4_CreateConnection(RpcConnection** Connection, BOOL server, LPCSTR Protseq, LPCSTR NetworkAddr, LPCSTR Endpoint, LPCSTR NetworkOptions, RpcAuthInfo* AuthInfo, RpcBinding* Binding);
RPC_STATUS RPCRT4_DestroyConnection(RpcConnection* Connection);
-RPC_STATUS RPCRT4_OpenConnection(RpcConnection* Connection);
+RPC_STATUS RPCRT4_OpenClientConnection(RpcConnection* Connection);
RPC_STATUS RPCRT4_CloseConnection(RpcConnection* Connection);
RPC_STATUS RPCRT4_SpawnConnection(RpcConnection** Connection, RpcConnection* OldConnection);
diff --git a/dlls/rpcrt4/rpc_server.c b/dlls/rpcrt4/rpc_server.c
index 1c9dcc0..e716b53 100644
--- a/dlls/rpcrt4/rpc_server.c
+++ b/dlls/rpcrt4/rpc_server.c
@@ -496,8 +496,7 @@ static RPC_STATUS RPCRT4_use_protseq(Rpc
{
RPC_STATUS status;
- status = RPCRT4_CreateConnection(&ps->conn, TRUE, ps->Protseq, NULL,
- ps->Endpoint, NULL, NULL, NULL);
+ status = ps->ops->open_endpoint(ps, ps->Endpoint);
if (status != RPC_S_OK)
return status;
diff --git a/dlls/rpcrt4/rpc_server.h b/dlls/rpcrt4/rpc_server.h
index 77dae4e..c385bed 100644
--- a/dlls/rpcrt4/rpc_server.h
+++ b/dlls/rpcrt4/rpc_server.h
@@ -56,6 +56,8 @@ struct protseq_ops
/* returns -1 for failure, 0 for server state changed and 1 to indicate a
* new connection was established */
int (*wait_for_new_connection)(RpcServerProtseq *protseq, unsigned int count, void *wait_array);
+ /* opens the endpoint and optionally begins listening */
+ RPC_STATUS (*open_endpoint)(RpcServerProtseq *protseq, LPSTR endpoint);
};
typedef struct _RpcServerInterface
diff --git a/dlls/rpcrt4/rpc_transport.c b/dlls/rpcrt4/rpc_transport.c
index a264b9b..4679e71 100644
--- a/dlls/rpcrt4/rpc_transport.c
+++ b/dlls/rpcrt4/rpc_transport.c
@@ -89,8 +89,9 @@ static struct list connection_pool = LIS
typedef struct _RpcConnection_np
{
RpcConnection common;
- HANDLE pipe, thread;
+ HANDLE pipe;
OVERLAPPED ovl;
+ BOOL listening;
} RpcConnection_np;
static RpcConnection *rpcrt4_conn_np_alloc(void)
@@ -99,13 +100,35 @@ static RpcConnection *rpcrt4_conn_np_all
if (npc)
{
npc->pipe = NULL;
- npc->thread = NULL;
memset(&npc->ovl, 0, sizeof(npc->ovl));
+ npc->listening = FALSE;
}
return &npc->common;
}
-static RPC_STATUS rpcrt4_connect_pipe(RpcConnection *Connection, LPCSTR pname)
+static RPC_STATUS rpcrt4_conn_listen_pipe(RpcConnection_np *npc)
+{
+ if (npc->listening)
+ return RPC_S_OK;
+
+ npc->listening = TRUE;
+ if (ConnectNamedPipe(npc->pipe, &npc->ovl))
+ return RPC_S_OK;
+
+ WARN("Couldn't ConnectNamedPipe (error was %ld)\n", GetLastError());
+ if (GetLastError() == ERROR_PIPE_CONNECTED) {
+ SetEvent(npc->ovl.hEvent);
+ return RPC_S_OK;
+ }
+ if (GetLastError() == ERROR_IO_PENDING) {
+ /* FIXME: looks like we need to GetOverlappedResult here? */
+ return RPC_S_OK;
+ }
+ npc->listening = FALSE;
+ return RPC_S_SERVER_UNAVAILABLE;
+}
+
+static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pname)
{
RpcConnection_np *npc = (RpcConnection_np *) Connection;
TRACE("listening on %s\n", pname);
@@ -121,22 +144,13 @@ static RPC_STATUS rpcrt4_connect_pipe(Rp
memset(&npc->ovl, 0, sizeof(npc->ovl));
npc->ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
- if (ConnectNamedPipe(npc->pipe, &npc->ovl))
- return RPC_S_OK;
- WARN("Couldn't ConnectNamedPipe (error was %ld)\n", GetLastError());
- if (GetLastError() == ERROR_PIPE_CONNECTED) {
- SetEvent(npc->ovl.hEvent);
- return RPC_S_OK;
- }
- if (GetLastError() == ERROR_IO_PENDING) {
- /* FIXME: looks like we need to GetOverlappedResult here? */
- return RPC_S_OK;
- }
- return RPC_S_SERVER_UNAVAILABLE;
+ /* Note: we don't call ConnectNamedPipe here because it must be done in the
+ * server thread as the thread must be alertable */
+ return RPC_S_OK;
}
-static RPC_STATUS rpcrt4_open_pipe(RpcConnection *Connection, LPCSTR pname, BOOL wait)
+static RPC_STATUS rpcrt4_conn_open_pipe(RpcConnection *Connection, LPCSTR pname, BOOL wait)
{
RpcConnection_np *npc = (RpcConnection_np *) Connection;
HANDLE pipe;
@@ -188,13 +202,36 @@ static RPC_STATUS rpcrt4_ncalrpc_open(Rp
* but we'll implement it with named pipes for now */
pname = I_RpcAllocate(strlen(prefix) + strlen(Connection->Endpoint) + 1);
strcat(strcpy(pname, prefix), Connection->Endpoint);
+ r = rpcrt4_conn_open_pipe(Connection, pname, TRUE);
+ I_RpcFree(pname);
- if (Connection->server)
- r = rpcrt4_connect_pipe(Connection, pname);
- else
- r = rpcrt4_open_pipe(Connection, pname, TRUE);
+ return r;
+}
+
+static RPC_STATUS rpcrt4_protseq_ncalrpc_open_endpoint(RpcServerProtseq* protseq, LPSTR endpoint)
+{
+ static LPCSTR prefix = "\\\\.\\pipe\\lrpc\\";
+ RPC_STATUS r;
+ LPSTR pname;
+ RpcConnection *Connection;
+
+ r = RPCRT4_CreateConnection(&Connection, TRUE, protseq->Protseq, NULL,
+ endpoint, NULL, NULL, NULL);
+ if (r != RPC_S_OK)
+ return r;
+
+ /* protseq=ncalrpc: supposed to use NT LPC ports,
+ * but we'll implement it with named pipes for now */
+ pname = I_RpcAllocate(strlen(prefix) + strlen(Connection->Endpoint) + 1);
+ strcat(strcpy(pname, prefix), Connection->Endpoint);
+ r = rpcrt4_conn_create_pipe(Connection, pname);
I_RpcFree(pname);
+ EnterCriticalSection(&protseq->cs);
+ Connection->Next = protseq->conn;
+ protseq->conn = Connection;
+ LeaveCriticalSection(&protseq->cs);
+
return r;
}
@@ -212,19 +249,35 @@ static RPC_STATUS rpcrt4_ncacn_np_open(R
/* protseq=ncacn_np: named pipes */
pname = I_RpcAllocate(strlen(prefix) + strlen(Connection->Endpoint) + 1);
strcat(strcpy(pname, prefix), Connection->Endpoint);
- if (Connection->server)
- r = rpcrt4_connect_pipe(Connection, pname);
- else
- r = rpcrt4_open_pipe(Connection, pname, FALSE);
+ r = rpcrt4_conn_open_pipe(Connection, pname, FALSE);
I_RpcFree(pname);
return r;
}
-static RPC_STATUS rpcrt4_conn_np_handoff(RpcConnection *old_conn, RpcConnection *new_conn)
+static RPC_STATUS rpcrt4_protseq_ncacn_np_open_endpoint(RpcServerProtseq *protseq, LPSTR endpoint)
{
- RpcConnection_np *old_npc = (RpcConnection_np *) old_conn;
- RpcConnection_np *new_npc = (RpcConnection_np *) new_conn;
+ static LPCSTR prefix = "\\\\.";
+ RPC_STATUS r;
+ LPSTR pname;
+ RpcConnection *Connection;
+
+ r = RPCRT4_CreateConnection(&Connection, TRUE, protseq->Protseq, NULL,
+ endpoint, NULL, NULL, NULL);
+ if (r != RPC_S_OK)
+ return r;
+
+ /* protseq=ncacn_np: named pipes */
+ pname = I_RpcAllocate(strlen(prefix) + strlen(Connection->Endpoint) + 1);
+ strcat(strcpy(pname, prefix), Connection->Endpoint);
+ r = rpcrt4_conn_create_pipe(Connection, pname);
+ I_RpcFree(pname);
+
+ return r;
+}
+
+static void rpcrt4_conn_np_handoff(RpcConnection_np *old_npc, RpcConnection_np *new_npc)
+{
/* because of the way named pipes work, we'll transfer the connected pipe
* to the child, then reopen the server binding to continue listening */
@@ -232,7 +285,41 @@ static RPC_STATUS rpcrt4_conn_np_handoff
new_npc->ovl = old_npc->ovl;
old_npc->pipe = 0;
memset(&old_npc->ovl, 0, sizeof(old_npc->ovl));
- return RPCRT4_OpenConnection(old_conn);
+ old_npc->listening = FALSE;
+}
+
+static RPC_STATUS rpcrt4_ncacn_np_handoff(RpcConnection *old_conn, RpcConnection *new_conn)
+{
+ RPC_STATUS status;
+ LPSTR pname;
+ static LPCSTR prefix = "\\\\.";
+
+ rpcrt4_conn_np_handoff((RpcConnection_np *)old_conn, (RpcConnection_np *)new_conn);
+
+ pname = I_RpcAllocate(strlen(prefix) + strlen(old_conn->Endpoint) + 1);
+ strcat(strcpy(pname, prefix), old_conn->Endpoint);
+ status = rpcrt4_conn_create_pipe(old_conn, pname);
+ I_RpcFree(pname);
+
+ return status;
+}
+
+static RPC_STATUS rpcrt4_ncalrpc_handoff(RpcConnection *old_conn, RpcConnection *new_conn)
+{
+ RPC_STATUS status;
+ LPSTR pname;
+ static LPCSTR prefix = "\\\\.\\pipe\\lrpc\\";
+
+ TRACE("%s\n", old_conn->Endpoint);
+
+ rpcrt4_conn_np_handoff((RpcConnection_np *)old_conn, (RpcConnection_np *)new_conn);
+
+ pname = I_RpcAllocate(strlen(prefix) + strlen(old_conn->Endpoint) + 1);
+ strcat(strcpy(pname, prefix), old_conn->Endpoint);
+ status = rpcrt4_conn_create_pipe(old_conn, pname);
+ I_RpcFree(pname);
+
+ return status;
}
static int rpcrt4_conn_np_read(RpcConnection *Connection,
@@ -409,7 +496,7 @@ static void *rpcrt4_protseq_np_get_wait_
*count = 1;
conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common);
while (conn) {
- RPCRT4_OpenConnection(&conn->common);
+ rpcrt4_conn_listen_pipe(conn);
if (conn->ovl.hEvent)
(*count)++;
conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common);
@@ -584,7 +671,7 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_op
if (tcpc->sock != -1)
return RPC_S_OK;
- hints.ai_flags = Connection->server ? AI_PASSIVE : 0;
+ hints.ai_flags = 0;
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
@@ -620,45 +707,13 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_op
continue;
}
- if (Connection->server)
- {
- ret = bind(sock, ai_cur->ai_addr, ai_cur->ai_addrlen);
- if (ret < 0)
- {
- WARN("bind failed: %s\n", strerror(errno));
- close(sock);
- continue;
- }
- ret = listen(sock, 10);
- if (ret < 0)
- {
- WARN("listen failed: %s\n", strerror(errno));
- close(sock);
- continue;
- }
- /* need a non-blocking socket, otherwise accept() has a potential
- * race-condition (poll() says it is readable, connection drops,
- * and accept() blocks until the next connection comes...)
- */
- ret = fcntl(sock, F_SETFL, O_NONBLOCK);
- if (ret < 0)
- {
- WARN("couldn't make socket non-blocking, error %d\n", ret);
- close(sock);
- continue;
- }
- tcpc->sock = sock;
- }
- else /* it's a client */
+ if (0>connect(sock, ai_cur->ai_addr, ai_cur->ai_addrlen))
{
- if (0>connect(sock, ai_cur->ai_addr, ai_cur->ai_addrlen))
- {
- WARN("connect() failed: %s\n", strerror(errno));
- close(sock);
- continue;
- }
- tcpc->sock = sock;
+ WARN("connect() failed: %s\n", strerror(errno));
+ close(sock);
+ continue;
}
+ tcpc->sock = sock;
freeaddrinfo(ai);
TRACE("connected\n");
@@ -670,6 +725,106 @@ static RPC_STATUS rpcrt4_ncacn_ip_tcp_op
return RPC_S_SERVER_UNAVAILABLE;
}
+static RPC_STATUS rpcrt4_protseq_ncacn_ip_tcp_open_endpoint(RpcServerProtseq *protseq, LPSTR endpoint)
+{
+ RPC_STATUS status;
+ int sock;
+ int ret;
+ struct addrinfo *ai;
+ struct addrinfo *ai_cur;
+ struct addrinfo hints;
+
+ TRACE("(%p, %s)\n", protseq, endpoint);
+
+ hints.ai_flags = AI_PASSIVE /* for non-localhost addresses */;
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = IPPROTO_TCP;
+ hints.ai_addrlen = 0;
+ hints.ai_addr = NULL;
+ hints.ai_canonname = NULL;
+ hints.ai_next = NULL;
+
+ ret = getaddrinfo(NULL, endpoint, &hints, &ai);
+ if (ret)
+ {
+ ERR("getaddrinfo for port %s failed: %s\n", endpoint,
+ gai_strerror(ret));
+ return RPC_S_SERVER_UNAVAILABLE;
+ }
+
+ for (ai_cur = ai; ai_cur; ai_cur = ai_cur->ai_next)
+ {
+ RpcConnection_tcp *tcpc;
+ if (TRACE_ON(rpc))
+ {
+ char host[256];
+ char service[256];
+ getnameinfo(ai_cur->ai_addr, ai_cur->ai_addrlen,
+ host, sizeof(host), service, sizeof(service),
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ TRACE("trying %s:%s\n", host, service);
+ }
+
+ sock = socket(ai_cur->ai_family, ai_cur->ai_socktype, ai_cur->ai_protocol);
+ if (sock < 0)
+ {
+ WARN("socket() failed: %s\n", strerror(errno));
+ continue;
+ }
+
+ ret = bind(sock, ai_cur->ai_addr, ai_cur->ai_addrlen);
+ if (ret < 0)
+ {
+ WARN("bind failed: %s\n", strerror(errno));
+ close(sock);
+ continue;
+ }
+ status = RPCRT4_CreateConnection((RpcConnection **)&tcpc, TRUE,
+ protseq->Protseq, NULL, endpoint, NULL,
+ NULL, NULL);
+ if (status != RPC_S_OK)
+ {
+ close(sock);
+ continue;
+ }
+
+ ret = listen(sock, 10);
+ if (ret < 0)
+ {
+ WARN("listen failed: %s\n", strerror(errno));
+ close(sock);
+ continue;
+ }
+ /* need a non-blocking socket, otherwise accept() has a potential
+ * race-condition (poll() says it is readable, connection drops,
+ * and accept() blocks until the next connection comes...)
+ */
+ ret = fcntl(sock, F_SETFL, O_NONBLOCK);
+ if (ret < 0)
+ {
+ WARN("couldn't make socket non-blocking, error %d\n", ret);
+ close(sock);
+ continue;
+ }
+ tcpc->sock = sock;
+
+ freeaddrinfo(ai);
+
+ EnterCriticalSection(&protseq->cs);
+ tcpc->common.Next = protseq->conn;
+ protseq->conn = &tcpc->common;
+ LeaveCriticalSection(&protseq->cs);
+
+ TRACE("listening on %s\n", endpoint);
+ return RPC_S_OK;
+ }
+
+ freeaddrinfo(ai);
+ ERR("couldn't listen on port %s\n", endpoint);
+ return RPC_S_SERVER_UNAVAILABLE;
+}
+
static RPC_STATUS rpcrt4_conn_tcp_handoff(RpcConnection *old_conn, RpcConnection *new_conn)
{
int ret;
@@ -906,7 +1061,6 @@ static void *rpcrt4_protseq_sock_get_wai
*count = 1;
conn = (RpcConnection_tcp *)protseq->conn;
while (conn) {
- RPCRT4_OpenConnection(&conn->common);
if (conn->sock != -1)
(*count)++;
conn = (RpcConnection_tcp *)conn->common.Next;
@@ -1001,7 +1155,7 @@ static const struct connection_ops conn_
{ EPM_PROTOCOL_NCACN, EPM_PROTOCOL_SMB },
rpcrt4_conn_np_alloc,
rpcrt4_ncacn_np_open,
- rpcrt4_conn_np_handoff,
+ rpcrt4_ncacn_np_handoff,
rpcrt4_conn_np_read,
rpcrt4_conn_np_write,
rpcrt4_conn_np_close,
@@ -1012,7 +1166,7 @@ static const struct connection_ops conn_
{ EPM_PROTOCOL_NCALRPC, EPM_PROTOCOL_PIPE },
rpcrt4_conn_np_alloc,
rpcrt4_ncalrpc_open,
- rpcrt4_conn_np_handoff,
+ rpcrt4_ncalrpc_handoff,
rpcrt4_conn_np_read,
rpcrt4_conn_np_write,
rpcrt4_conn_np_close,
@@ -1042,6 +1196,7 @@ static const struct protseq_ops protseq_
rpcrt4_protseq_np_get_wait_array,
rpcrt4_protseq_np_free_wait_array,
rpcrt4_protseq_np_wait_for_new_connection,
+ rpcrt4_protseq_ncacn_np_open_endpoint,
},
{
"ncalrpc",
@@ -1050,6 +1205,7 @@ static const struct protseq_ops protseq_
rpcrt4_protseq_np_get_wait_array,
rpcrt4_protseq_np_free_wait_array,
rpcrt4_protseq_np_wait_for_new_connection,
+ rpcrt4_protseq_ncalrpc_open_endpoint,
},
{
"ncacn_ip_tcp",
@@ -1058,6 +1214,7 @@ static const struct protseq_ops protseq_
rpcrt4_protseq_sock_get_wait_array,
rpcrt4_protseq_sock_free_wait_array,
rpcrt4_protseq_sock_wait_for_new_connection,
+ rpcrt4_protseq_ncacn_ip_tcp_open_endpoint,
},
};
@@ -1083,11 +1240,12 @@ static const struct connection_ops *rpcr
/**** interface to rest of code ****/
-RPC_STATUS RPCRT4_OpenConnection(RpcConnection* Connection)
+RPC_STATUS RPCRT4_OpenClientConnection(RpcConnection* Connection)
{
TRACE("(Connection == ^%p)\n", Connection);
- return Connection->ops->open_connection(Connection);
+ assert(!Connection->server);
+ return Connection->ops->open_connection_client(Connection);
}
RPC_STATUS RPCRT4_CloseConnection(RpcConnection* Connection)
More information about the wine-patches
mailing list