[OLE #11] Propagate apartments through the intermediate threads, make listener thread apartment scoped

Mike Hearn mh at codeweavers.com
Thu Dec 23 07:40:30 CST 2004


This obsoletes OLE #10, and will partly fix InstallShield. The next
patch from Rob will complete the fix in a cleaner way than my original
patch.

This is the first patch I've produced using SVK. Please let me know if
it gives you problems.

Mike Hearn <mh at codeweavers.com>
Propagate apartments through the intermediate threads, make listener
thread apartment scoped


-------------- next part --------------
--- dlls/ole32/compobj.c  (revision 5)
+++ dlls/ole32/compobj.c  (local)
@@ -1105,7 +1105,7 @@ _LocalServerThread(LPVOID param) {
     ULARGE_INTEGER	newpos;
     ULONG		res;
 
-    TRACE("Starting threader for %s.\n",debugstr_guid(&newClass->classIdentifier));
+    TRACE("Starting classfactory server thread for %s.\n",debugstr_guid(&newClass->classIdentifier));
 
     strcpy(pipefn,PIPEPREF);
     WINE_StringFromCLSID(&newClass->classIdentifier,pipefn+strlen(PIPEPREF));
@@ -1262,7 +1262,7 @@ HRESULT WINAPI CoRegisterClassObject(
   if (dwClsContext & CLSCTX_LOCAL_SERVER) {
       DWORD tid;
 
-      start_listener_thread();
+      start_apartment_listener_thread();
       newClass->hThread = CreateThread(NULL,0,_LocalServerThread,newClass,0,&tid);
   }
   return S_OK;
--- dlls/ole32/compobj_private.h  (revision 5)
+++ dlls/ole32/compobj_private.h  (local)
@@ -105,6 +105,7 @@ typedef struct tagAPARTMENT {
   IOBJECT *proxies;        /* imported objects */
   LPUNKNOWN state;         /* state object (see Co[Get,Set]State) */
   LPVOID ErrorInfo;        /* thread error info */
+  DWORD listenertid;       /* id of apartment_listener_thread */
 } APARTMENT;
 
 extern APARTMENT MTA, *apts;
@@ -152,7 +153,7 @@ HRESULT MARSHAL_Disconnect_Proxies(void)
 
 HRESULT MARSHAL_GetStandardMarshalCF(LPVOID *ppv);
 
-void start_listener_thread(void);
+void start_apartment_listener_thread(void);
 
 extern HRESULT PIPE_GetNewPipeBuf(wine_marshal_id *mid, IRpcChannelBuffer **pipebuf);
 
--- dlls/ole32/marshal.c  (revision 5)
+++ dlls/ole32/marshal.c  (local)
@@ -176,7 +176,11 @@ MARSHAL_Register_Proxy(wine_marshal_id *
 
     for (i=0;i<nrofproxies;i++) {
 	if (MARSHAL_Compare_Mids(mid,&(proxies[i].mid))) {
-	    ERR("Already have mid?\n");
+            /* this will happen if the program attempts to marshal two
+             * objects that implement the same interface */
+            
+	    FIXME("need to use IPIDs, already have mid oxid=%s, oid=%s, iid=%s\n",
+                  wine_dbgstr_longlong(mid->oxid), wine_dbgstr_longlong(mid->oid), debugstr_guid(&mid->iid));
 	    return E_FAIL;
 	}
     }
@@ -262,7 +266,7 @@ StdMarshalImpl_MarshalInterface(
 
   TRACE("(...,%s,...)\n",debugstr_guid(riid));
 
-  start_listener_thread(); /* just to be sure we have one running. */
+  start_apartment_listener_thread(); /* just to be sure we have one running. */
 
   IUnknown_QueryInterface((LPUNKNOWN)pv,&IID_IUnknown,(LPVOID*)&pUnk);
   mid.oxid = COM_CurrentApt()->oxid;
--- dlls/ole32/rpc.c  (revision 5)
+++ dlls/ole32/rpc.c  (local)
@@ -91,15 +91,19 @@ static wine_rpc_request **reqs = NULL;
 static int nrofreqs = 0;
 
 /* This pipe is _thread_ based, each thread which talks to a remote
- * apartment (mid) has its own pipe */
+ * apartment (mid) has its own pipe. The same structure is used both
+ * for outgoing and incoming RPCs.
+ */
 typedef struct _wine_pipe {
     wine_marshal_id	mid;	/* target mid */
-    DWORD		tid;	/* thread which owns this outgoing pipe */
+    DWORD		tid;	/* thread which owns this pipe */
     HANDLE		hPipe;
 
     int			pending;
     HANDLE		hThread;
     CRITICAL_SECTION	crit;
+
+    APARTMENT          *apt;    /* apartment of the marshalling thread for the stub dispatch case */
 } wine_pipe;
 
 static wine_pipe *pipes = NULL;
@@ -167,7 +171,7 @@ write_pipe(HANDLE hf, LPVOID ptr, DWORD 
     return S_OK;
 }
 
-static DWORD WINAPI _StubReaderThread(LPVOID);
+static DWORD WINAPI stub_dispatch_thread(LPVOID);
 
 static HRESULT
 PIPE_RegisterPipe(wine_marshal_id *mid, HANDLE hPipe, BOOL startreader) {
@@ -187,10 +191,12 @@ PIPE_RegisterPipe(wine_marshal_id *mid, 
   sprintf(pipefn,OLESTUBMGR"_%08lx%08lx",(DWORD)(mid->oxid >> 32),(DWORD)mid->oxid);
   memcpy(&(pipes[nrofpipes].mid),mid,sizeof(*mid));
   pipes[nrofpipes].hPipe	= hPipe;
+  pipes[nrofpipes].apt          = COM_CurrentApt();
+  assert( pipes[nrofpipes].apt );
   InitializeCriticalSection(&(pipes[nrofpipes].crit));
   nrofpipes++;
   if (startreader) {
-      pipes[nrofpipes-1].hThread = CreateThread(NULL,0,_StubReaderThread,(LPVOID)(pipes+(nrofpipes-1)),0,&(pipes[nrofpipes-1].tid));
+      pipes[nrofpipes-1].hThread = CreateThread(NULL,0,stub_dispatch_thread,(LPVOID)(pipes+(nrofpipes-1)),0,&(pipes[nrofpipes-1].tid));
   } else {
       pipes[nrofpipes-1].tid	 = GetCurrentThreadId();
   }
@@ -579,6 +585,8 @@ HRESULT create_marshalled_proxy(REFCLSID
   WINE_StringFromCLSID(rclsid,pipefn+strlen(PIPEPREF));
 
   while (tries++<MAXTRIES) {
+      TRACE("waiting for %s\n", pipefn);
+      
       WaitNamedPipeA( pipefn, NMPWAIT_WAIT_FOREVER );
       hPipe	= CreateFileA(
 	      pipefn,
@@ -606,6 +614,7 @@ HRESULT create_marshalled_proxy(REFCLSID
 	  Sleep(1000);
 	  continue;
       }
+      TRACE("read marshal id from pipe\n");
       CloseHandle(hPipe);
       break;
   }
@@ -617,6 +626,7 @@ HRESULT create_marshalled_proxy(REFCLSID
   if (hres) goto out;
   seekto.u.LowPart = 0;seekto.u.HighPart = 0;
   hres = IStream_Seek(pStm,seekto,SEEK_SET,&newpos);
+  TRACE("unmarshalling classfactory\n");
   hres = CoUnmarshalInterface(pStm,&IID_IClassFactory,ppv);
 out:
   IStream_Release(pStm);
@@ -726,15 +736,21 @@ end:
     return hres;
 }
 
-static DWORD WINAPI
-_StubReaderThread(LPVOID param) {
+/* This thread listens on the given pipe for requests to a particular stub manager */
+static DWORD WINAPI stub_dispatch_thread(LPVOID param)
+{
     wine_pipe		*xpipe = (wine_pipe*)param;
     HANDLE		xhPipe = xpipe->hPipe;
     HRESULT		hres = S_OK;
 
-    TRACE("STUB reader thread %lx\n",GetCurrentProcessId());
+    TRACE("starting for apartment OXID %08lx%08lx\n", (DWORD)(xpipe->mid.oxid >> 32), (DWORD)(xpipe->mid.oxid));
+
+    /* join marshalling apartment. fixme: this stuff is all very wrong, threading needs to work like native */
+    NtCurrentTeb()->ReservedForOle = xpipe->apt;
+    
     while (!hres) {
 	int i;
+        
 	hres = COM_RpcReceive(xpipe);
 	if (hres) break;
 
@@ -746,21 +762,31 @@ _StubReaderThread(LPVOID param) {
 	    }
 	}
     }
-    FIXME("Failed with hres %lx\n",hres);
+
+    /* fixme: this thread never quits naturally */
+    WARN("exiting with hres %lx\n",hres);
     CloseHandle(xhPipe);
     return 0;
 }
 
-/* This thread listens on a named pipe for the entire process. It
- * deals with incoming connection requests to objects.
+/* This thread listens on a named pipe for each apartment that exports
+ * objects. It deals with incoming connection requests. Each time a
+ * client connects a separate thread is spawned for that particular
+ * connection.
+ *
+ * This architecture is different in native DCOM.
  */
-static DWORD WINAPI listener_thread(LPVOID param)
+static DWORD WINAPI apartment_listener_thread(LPVOID param)
 {
     char		pipefn[200];
     HANDLE		listenPipe;
+    APARTMENT          *apt = (APARTMENT *) param;
 
-    sprintf(pipefn,OLESTUBMGR"_%08lx",GetCurrentProcessId());
-    TRACE("Process listener thread starting on (%s)\n",pipefn);
+    /* we must join the marshalling thread's apartment */
+    NtCurrentTeb()->ReservedForOle = apt;
+
+    sprintf(pipefn,OLESTUBMGR"_%08lx%08lx", (DWORD)(apt->oxid >> 32), (DWORD)(apt->oxid));
+    TRACE("Apartment listener thread starting on (%s)\n",pipefn);
 
     while (1) {
 	listenPipe = CreateNamedPipeA(
@@ -774,7 +800,7 @@ static DWORD WINAPI listener_thread(LPVO
 	    NULL
 	);
 	if (listenPipe == INVALID_HANDLE_VALUE) {
-	    FIXME("pipe creation failed for %s, le is %lx\n",pipefn,GetLastError());
+	    FIXME("pipe creation failed for %s, error %lx\n",pipefn,GetLastError());
 	    return 1; /* permanent failure, so quit stubmgr thread */
 	}
 	if (!ConnectNamedPipe(listenPipe,NULL)) {
@@ -787,15 +813,24 @@ static DWORD WINAPI listener_thread(LPVO
     return 0;
 }
 
-void start_listener_thread()
+void start_apartment_listener_thread()
 {
-  static BOOL running = FALSE;
-  DWORD tid;
+    APARTMENT *apt = COM_CurrentApt();
 
-  if (!running)
-  {
-      running = TRUE;
-      CreateThread(NULL, 0, listener_thread, NULL, 0, &tid);
-      Sleep(2000); /* actually we just try opening the pipe until it succeeds */
-  }
+    assert( apt );
+    
+    TRACE("apt->listenertid=%ld\n", apt->listenertid);
+
+    /* apt->listenertid is a hack which needs to die at some point, as
+     * it leaks information into the apartment structure. In fact,
+     * this thread isn't quite correct anyway, as native RPC doesn't
+     * use a thread per apartment at all; instead, the dispatch thread
+     * either enters the apartment to perform the RPC (for MTAs, RTAs)
+     * or does a context switch into it for STAs.
+     */
+    
+    if (!apt->listenertid)
+    {
+        CreateThread(NULL, 0, apartment_listener_thread, apt, 0, &apt->listenertid);
+    }
 }


More information about the wine-patches mailing list