diff --git a/dom/bluetooth/bluedroid/BluetoothSocket.cpp b/dom/bluetooth/bluedroid/BluetoothSocket.cpp index 2c00feea66d..74e654b6f4a 100644 --- a/dom/bluetooth/bluedroid/BluetoothSocket.cpp +++ b/dom/bluetooth/bluedroid/BluetoothSocket.cpp @@ -74,8 +74,11 @@ public: SOCKET_IS_CONNECTED }; - DroidSocketImpl(MessageLoop* aIOLoop, BluetoothSocket* aConsumer) + DroidSocketImpl(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, + BluetoothSocket* aConsumer) : ipc::UnixFdWatcher(aIOLoop) + , DataSocketIO(aConsumerThread) , mConsumer(aConsumer) , mShuttingDownOnIOThread(false) , mConnectionStatus(SOCKET_IS_DISCONNECTED) @@ -83,7 +86,7 @@ public: ~DroidSocketImpl() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); } void Send(UnixSocketIOBuffer* aBuffer) @@ -142,7 +145,8 @@ public: bool IsShutdownOnMainThread() const override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); + return mConsumer == nullptr; } @@ -153,14 +157,14 @@ public: void ShutdownOnMainThread() override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mConsumer = nullptr; } void ShutdownOnIOThread() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop @@ -214,7 +218,7 @@ public: void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); MOZ_ASSERT(!IsCanceled()); GetIO()->Connect(mFd); @@ -234,7 +238,7 @@ public: void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); if (!IsCanceled()) { GetIO()->Listen(mFd); @@ -254,7 +258,7 @@ class SocketConnectClientFdTask final void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); GetIO()->ConnectClientFd(); } @@ -314,8 +318,9 @@ DroidSocketImpl::Accept(int aFd) SetFd(aFd); mConnectionStatus = SOCKET_IS_CONNECTED; - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { @@ -338,7 +343,7 @@ DroidSocketImpl::OnFileCanReadWithoutBlocking(int aFd) void DroidSocketImpl::OnSocketCanReceiveWithoutBlocking(int aFd) { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); ssize_t res = ReceiveData(aFd); @@ -361,7 +366,7 @@ public: void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); MOZ_ASSERT(!IsCanceled()); GetIO()->Accept(mFd); @@ -383,7 +388,7 @@ public: void Accept(int aFd, const nsAString& aBdAddress, int aConnectionStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); mozilla::ScopedClose fd(aFd); // Close received socket fd on error @@ -404,7 +409,7 @@ public: void OnError(BluetoothStatus aStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); BT_LOGR("BluetoothSocketInterface::Accept failed: %d", (int)aStatus); if (!mImpl->IsShutdownOnMainThread()) { @@ -430,7 +435,7 @@ public: NS_IMETHOD Run() override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(GetIO()->IsConsumerThread()); MOZ_ASSERT(sBluetoothSocketInterface); BluetoothSocketResultHandler* res = new AcceptResultHandler(GetIO()); @@ -448,7 +453,7 @@ private: void DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd) { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); /* When a listening socket is ready for receiving data, @@ -456,8 +461,8 @@ DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd) */ RemoveWatchers(READ_WATCHER); - nsRefPtr t = new AcceptRunnable(this, aFd); - NS_DispatchToMainThread(t); + GetConsumerThread()->Dispatch(new AcceptRunnable(this, aFd), + NS_DISPATCH_NORMAL); } void @@ -475,7 +480,7 @@ DroidSocketImpl::OnFileCanWriteWithoutBlocking(int aFd) void DroidSocketImpl::OnSocketCanSendWithoutBlocking(int aFd) { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); MOZ_ASSERT(aFd >= 0); @@ -492,7 +497,7 @@ DroidSocketImpl::OnSocketCanSendWithoutBlocking(int aFd) void DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd) { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); /* We follow Posix behaviour here: Connect operations are @@ -501,8 +506,9 @@ DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd) mConnectionStatus = SOCKET_IS_CONNECTED; - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { @@ -541,10 +547,10 @@ public: NS_IMETHOD Run() override { - MOZ_ASSERT(NS_IsMainThread()); - DroidSocketImpl* io = SocketIORunnable::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); + if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. @@ -566,7 +572,8 @@ private: void DroidSocketImpl::ConsumeBuffer() { - NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget())); + GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()), + NS_DISPATCH_NORMAL); } void @@ -602,7 +609,7 @@ public: void Connect(int aFd, const nsAString& aBdAddress, int aConnectionStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); if (mImpl->IsShutdownOnMainThread()) { BT_LOGD("mConsumer is null, aborting send!"); @@ -621,7 +628,7 @@ public: void OnError(BluetoothStatus aStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); BT_WARNING("Connect failed: %d", (int)aStatus); if (!mImpl->IsShutdownOnMainThread()) { @@ -643,14 +650,14 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt, + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mImpl); SetConnectionStatus(SOCKET_CONNECTING); - mImpl = new DroidSocketImpl(aIOLoop, this); + mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this); BluetoothSocketResultHandler* res = new ConnectSocketResultHandler(mImpl); SetCurrentResultHandler(res); @@ -670,8 +677,14 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress, int aChannel, bool aAuth, bool aEncrypt) { + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + return Connect(aDeviceAddress, aServiceUuid, aType, aChannel, aAuth, - aEncrypt, XRE_GetIOMessageLoop()); + aEncrypt, consumerThread, XRE_GetIOMessageLoop()); } class ListenResultHandler final : public BluetoothSocketResultHandler @@ -685,14 +698,14 @@ public: void Listen(int aFd) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); mImpl->GetIOLoop()->PostTask(FROM_HERE, new SocketListenTask(mImpl, aFd)); } void OnError(BluetoothStatus aStatus) override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(mImpl->IsConsumerThread()); BT_WARNING("Listen failed: %d", (int)aStatus); } @@ -707,14 +720,14 @@ BluetoothSocket::Listen(const nsAString& aServiceName, BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt, + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mImpl); SetConnectionStatus(SOCKET_LISTENING); - mImpl = new DroidSocketImpl(aIOLoop, this); + mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this); BluetoothSocketResultHandler* res = new ListenResultHandler(mImpl); SetCurrentResultHandler(res); @@ -734,14 +747,19 @@ BluetoothSocket::Listen(const nsAString& aServiceName, int aChannel, bool aAuth, bool aEncrypt) { + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + return Listen(aServiceName, aServiceUuid, aType, aChannel, aAuth, aEncrypt, - XRE_GetIOMessageLoop()); + consumerThread, XRE_GetIOMessageLoop()); } void BluetoothSocket::ReceiveSocketData(nsAutoPtr& aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->ReceiveSocketData(this, aBuffer); @@ -752,8 +770,8 @@ BluetoothSocket::ReceiveSocketData(nsAutoPtr& aBuffer) void BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mImpl); + MOZ_ASSERT(mImpl->IsConsumerThread()); MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); mImpl->GetIOLoop()->PostTask( @@ -766,12 +784,14 @@ BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) void BluetoothSocket::Close() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(sBluetoothSocketInterface); + if (!mImpl) { return; } + MOZ_ASSERT(mImpl->IsConsumerThread()); + // Stop any watching |SocketMessageWatcher| if (mCurrentRes) { sBluetoothSocketInterface->Close(mCurrentRes); @@ -790,7 +810,6 @@ BluetoothSocket::Close() void BluetoothSocket::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); SetCurrentResultHandler(nullptr); @@ -800,7 +819,6 @@ BluetoothSocket::OnConnectSuccess() void BluetoothSocket::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); SetCurrentResultHandler(nullptr); @@ -810,7 +828,6 @@ BluetoothSocket::OnConnectError() void BluetoothSocket::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->OnSocketDisconnect(this); } diff --git a/dom/bluetooth/bluedroid/BluetoothSocket.h b/dom/bluetooth/bluedroid/BluetoothSocket.h index d34c9252add..228f13597ea 100644 --- a/dom/bluetooth/bluedroid/BluetoothSocket.h +++ b/dom/bluetooth/bluedroid/BluetoothSocket.h @@ -11,6 +11,7 @@ #include "mozilla/ipc/DataSocket.h" class MessageLoop; +class nsIThread; BEGIN_BLUETOOTH_NAMESPACE @@ -28,6 +29,7 @@ public: BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt, + nsIThread* aConsumerThread, MessageLoop* aIOLoop); nsresult Connect(const nsAString& aDeviceAddress, @@ -41,6 +43,7 @@ public: BluetoothSocketType aType, int aChannel, bool aAuth, bool aEncrypt, + nsIThread* aConsumerThread, MessageLoop* aIOLoop); nsresult Listen(const nsAString& aServiceName, diff --git a/dom/bluetooth/bluez/BluetoothSocket.cpp b/dom/bluetooth/bluez/BluetoothSocket.cpp index 28aae48f814..22e9a1360d4 100644 --- a/dom/bluetooth/bluez/BluetoothSocket.cpp +++ b/dom/bluetooth/bluez/BluetoothSocket.cpp @@ -28,7 +28,8 @@ class BluetoothSocket::BluetoothSocketIO final , public DataSocketIO { public: - BluetoothSocketIO(MessageLoop* mIOLoop, + BluetoothSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, BluetoothSocket* aConsumer, UnixSocketConnector* aConnector); ~BluetoothSocketIO(); @@ -132,10 +133,12 @@ private: }; BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO( - MessageLoop* mIOLoop, + nsIThread* aConsumerThread, + MessageLoop* aIOLoop, BluetoothSocket* aConsumer, UnixSocketConnector* aConnector) - : UnixSocketWatcher(mIOLoop) + : UnixSocketWatcher(aIOLoop) + , DataSocketIO(aConsumerThread) , mConsumer(aConsumer) , mConnector(aConnector) , mShuttingDownOnIOThread(false) @@ -148,7 +151,7 @@ BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO( BluetoothSocket::BluetoothSocketIO::~BluetoothSocketIO() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(IsShutdownOnMainThread()); } @@ -188,7 +191,7 @@ BluetoothSocket::BluetoothSocketIO::GetDataSocket() void BluetoothSocket::BluetoothSocketIO::SetDelayedConnectTask(CancelableTask* aTask) { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); mDelayedConnectTask = aTask; } @@ -196,7 +199,7 @@ BluetoothSocket::BluetoothSocketIO::SetDelayedConnectTask(CancelableTask* aTask) void BluetoothSocket::BluetoothSocketIO::ClearDelayedConnectTask() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); mDelayedConnectTask = nullptr; } @@ -204,7 +207,7 @@ BluetoothSocket::BluetoothSocketIO::ClearDelayedConnectTask() void BluetoothSocket::BluetoothSocketIO::CancelDelayedConnectTask() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); if (!mDelayedConnectTask) { return; @@ -277,8 +280,9 @@ BluetoothSocket::BluetoothSocketIO::OnConnected() MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { @@ -326,8 +330,9 @@ BluetoothSocket::BluetoothSocketIO::OnSocketCanAcceptWithoutBlocking() Close(); SetSocket(fd, SOCKET_IS_CONNECTED); - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { @@ -376,8 +381,9 @@ BluetoothSocket::BluetoothSocketIO::FireSocketError() Close(); // Tell the main thread we've errored - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR), + NS_DISPATCH_NORMAL); } @@ -412,10 +418,10 @@ public: NS_IMETHOD Run() override { - MOZ_ASSERT(NS_IsMainThread()); - BluetoothSocketIO* io = SocketIORunnable::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); + if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. @@ -437,7 +443,8 @@ private: void BluetoothSocket::BluetoothSocketIO::ConsumeBuffer() { - NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget())); + GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()), + NS_DISPATCH_NORMAL); } void @@ -457,7 +464,7 @@ BluetoothSocket::BluetoothSocketIO::GetSocketBase() bool BluetoothSocket::BluetoothSocketIO::IsShutdownOnMainThread() const { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); return mConsumer == nullptr; } @@ -465,7 +472,7 @@ BluetoothSocket::BluetoothSocketIO::IsShutdownOnMainThread() const void BluetoothSocket::BluetoothSocketIO::ShutdownOnMainThread() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mConsumer = nullptr; @@ -480,7 +487,7 @@ BluetoothSocket::BluetoothSocketIO::IsShutdownOnIOThread() const void BluetoothSocket::BluetoothSocketIO::ShutdownOnIOThread() { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop @@ -502,7 +509,7 @@ public: void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); if (!IsCanceled()) { GetIO()->Listen(); @@ -520,7 +527,7 @@ public: void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); MOZ_ASSERT(!IsCanceled()); GetIO()->Connect(); @@ -537,7 +544,7 @@ public: void Run() override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(GetIO()->IsConsumerThread()); if (IsCanceled()) { return; @@ -576,7 +583,6 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress, int aChannel, bool aAuth, bool aEncrypt) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!aDeviceAddress.IsEmpty()); nsAutoPtr connector( @@ -603,8 +609,6 @@ BluetoothSocket::Listen(const nsAString& aServiceName, int aChannel, bool aAuth, bool aEncrypt) { - MOZ_ASSERT(NS_IsMainThread()); - nsAutoPtr connector( new BluetoothUnixSocketConnector(NS_LITERAL_CSTRING(BLUETOOTH_ADDRESS_NONE), aType, aChannel, aAuth, aEncrypt)); @@ -625,7 +629,6 @@ BluetoothSocket::Listen(const nsAString& aServiceName, void BluetoothSocket::ReceiveSocketData(nsAutoPtr& aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->ReceiveSocketData(this, aBuffer); @@ -645,14 +648,15 @@ BluetoothSocket::SendSocketData(const nsACString& aStr) nsresult BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector, - int aDelayMs, MessageLoop* aIOLoop) + int aDelayMs, + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(aConnector); + MOZ_ASSERT(aConsumerThread); MOZ_ASSERT(aIOLoop); MOZ_ASSERT(!mIO); - mIO = new BluetoothSocketIO(aIOLoop, this, aConnector); + mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector); SetConnectionStatus(SOCKET_CONNECTING); if (aDelayMs > 0) { @@ -670,19 +674,25 @@ nsresult BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs) { - return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop()); + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + + return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop()); } nsresult BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector, - MessageLoop* aIOLoop) + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(aConnector); + MOZ_ASSERT(aConsumerThread); MOZ_ASSERT(aIOLoop); MOZ_ASSERT(!mIO); - mIO = new BluetoothSocketIO(aIOLoop, this, aConnector); + mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector); SetConnectionStatus(SOCKET_LISTENING); aIOLoop->PostTask(FROM_HERE, new ListenTask(mIO)); @@ -693,7 +703,13 @@ BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector, nsresult BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector) { - return Listen(aConnector, XRE_GetIOMessageLoop()); + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + + return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop()); } void @@ -712,8 +728,8 @@ BluetoothSocket::GetAddress(nsAString& aAddrStr) void BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); MOZ_ASSERT(!mIO->IsShutdownOnMainThread()); mIO->GetIOLoop()->PostTask( @@ -726,11 +742,12 @@ BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) void BluetoothSocket::Close() { - MOZ_ASSERT(NS_IsMainThread()); if (!mIO) { return; } + MOZ_ASSERT(mIO->IsConsumerThread()); + mIO->CancelDelayedConnectTask(); // From this point on, we consider mIO as being deleted. @@ -746,7 +763,6 @@ BluetoothSocket::Close() void BluetoothSocket::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->OnSocketConnectSuccess(this); } @@ -754,7 +770,6 @@ BluetoothSocket::OnConnectSuccess() void BluetoothSocket::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->OnSocketConnectError(this); } @@ -762,7 +777,6 @@ BluetoothSocket::OnConnectError() void BluetoothSocket::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mObserver); mObserver->OnSocketDisconnect(this); } diff --git a/dom/bluetooth/bluez/BluetoothSocket.h b/dom/bluetooth/bluez/BluetoothSocket.h index 5732c32a09e..89ab902bf4f 100644 --- a/dom/bluetooth/bluez/BluetoothSocket.h +++ b/dom/bluetooth/bluez/BluetoothSocket.h @@ -66,11 +66,12 @@ public: * * @param aConnector Connector object for socket type specific functions * @param aDelayMs Time delay in milli-seconds. + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @return NS_OK on success, or an XPCOM error code otherwise. */ - nsresult Connect(BluetoothUnixSocketConnector* aConnector, - int aDelayMs, MessageLoop* aIOLoop); + nsresult Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs, + nsIThread* aConsumerThread, MessageLoop* aIOLoop); /** * Starts a task on the socket that will try to connect to a socket in a @@ -88,11 +89,12 @@ public: * non-blocking manner. * * @param aConnector Connector object for socket type specific functions + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @return NS_OK on success, or an XPCOM error code otherwise. */ nsresult Listen(BluetoothUnixSocketConnector* aConnector, - MessageLoop* aIOLoop); + nsIThread* aConsumerThread, MessageLoop* aIOLoop); /** * Starts a task on the socket that will try to accept a new connection in a diff --git a/ipc/bluetooth/BluetoothDaemonConnection.cpp b/ipc/bluetooth/BluetoothDaemonConnection.cpp index bbc5969f7c8..412801f0510 100644 --- a/ipc/bluetooth/BluetoothDaemonConnection.cpp +++ b/ipc/bluetooth/BluetoothDaemonConnection.cpp @@ -206,8 +206,9 @@ class BluetoothDaemonConnectionIO final , public ConnectionOrientedSocketIO { public: - BluetoothDaemonConnectionIO(MessageLoop* aIOLoop, int aFd, - ConnectionStatus aConnectionStatus, + BluetoothDaemonConnectionIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, BluetoothDaemonConnection* aConnection, BluetoothDaemonPDUConsumer* aConsumer); @@ -255,14 +256,17 @@ private: }; BluetoothDaemonConnectionIO::BluetoothDaemonConnectionIO( - MessageLoop* aIOLoop, int aFd, + nsIThread* aConsumerThread, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, BluetoothDaemonConnection* aConnection, BluetoothDaemonPDUConsumer* aConsumer) -: UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus) -, mConnection(aConnection) -, mConsumer(aConsumer) -, mShuttingDownOnIOThread(false) + : UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus) + , ConnectionOrientedSocketIO(aConsumerThread) + , mConnection(aConnection) + , mConsumer(aConsumer) + , mShuttingDownOnIOThread(false) { MOZ_ASSERT(mConnection); MOZ_ASSERT(mConsumer); @@ -308,8 +312,9 @@ BluetoothDaemonConnectionIO::OnConnected() MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { @@ -328,8 +333,9 @@ BluetoothDaemonConnectionIO::OnError(const char* aFunction, int aErrno) Close(); // Tell the main thread we've errored - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR), + NS_DISPATCH_NORMAL); } // |ConnectionOrientedSocketIO| @@ -399,7 +405,7 @@ BluetoothDaemonConnectionIO::GetSocketBase() bool BluetoothDaemonConnectionIO::IsShutdownOnMainThread() const { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); return mConnection == nullptr; } @@ -413,7 +419,7 @@ BluetoothDaemonConnectionIO::IsShutdownOnIOThread() const void BluetoothDaemonConnectionIO::ShutdownOnMainThread() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mConnection = nullptr; @@ -422,7 +428,7 @@ BluetoothDaemonConnectionIO::ShutdownOnMainThread() void BluetoothDaemonConnectionIO::ShutdownOnIOThread() { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop @@ -452,10 +458,10 @@ BluetoothDaemonConnection::~BluetoothDaemonConnection() nsresult BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mIO); // |BluetoothDaemonConnection| now owns the connector, but doesn't @@ -466,7 +472,7 @@ BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector, SetConnectionStatus(SOCKET_CONNECTING); mIO = new BluetoothDaemonConnectionIO( - aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING, + aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING, this, mPDUConsumer); aIO = mIO; @@ -478,8 +484,8 @@ BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector, void BluetoothDaemonConnection::SendSocketData(UnixSocketIOBuffer* aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); mIO->GetIOLoop()->PostTask( FROM_HERE, @@ -492,13 +498,14 @@ BluetoothDaemonConnection::SendSocketData(UnixSocketIOBuffer* aBuffer) void BluetoothDaemonConnection::Close() { - MOZ_ASSERT(NS_IsMainThread()); - if (!mIO) { CHROMIUM_LOG("Bluetooth daemon already disconnected!"); return; } + MOZ_ASSERT(mIO->IsConsumerThread()); + + mIO->ShutdownOnMainThread(); mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO)); mIO = nullptr; @@ -508,24 +515,18 @@ BluetoothDaemonConnection::Close() void BluetoothDaemonConnection::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectSuccess(mIndex); } void BluetoothDaemonConnection::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectError(mIndex); } void BluetoothDaemonConnection::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnDisconnect(mIndex); } diff --git a/ipc/bluetooth/BluetoothDaemonConnection.h b/ipc/bluetooth/BluetoothDaemonConnection.h index 76f195beb20..dd26fb32a51 100644 --- a/ipc/bluetooth/BluetoothDaemonConnection.h +++ b/ipc/bluetooth/BluetoothDaemonConnection.h @@ -125,6 +125,7 @@ public: // nsresult PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) override; diff --git a/ipc/unixsocket/ConnectionOrientedSocket.cpp b/ipc/unixsocket/ConnectionOrientedSocket.cpp index 915a1f1f38f..b6a28e33b73 100644 --- a/ipc/unixsocket/ConnectionOrientedSocket.cpp +++ b/ipc/unixsocket/ConnectionOrientedSocket.cpp @@ -9,9 +9,22 @@ namespace mozilla { namespace ipc { +// +// ConnectionOrientedSocketIO +// + +ConnectionOrientedSocketIO::ConnectionOrientedSocketIO( + nsIThread* aConsumerThread) + : DataSocketIO(aConsumerThread) +{ } + ConnectionOrientedSocketIO::~ConnectionOrientedSocketIO() { } +// +// ConnectionOrientedSocket +// + ConnectionOrientedSocket::~ConnectionOrientedSocket() { } diff --git a/ipc/unixsocket/ConnectionOrientedSocket.h b/ipc/unixsocket/ConnectionOrientedSocket.h index c4ea1b2d054..c2674ee14ed 100644 --- a/ipc/unixsocket/ConnectionOrientedSocket.h +++ b/ipc/unixsocket/ConnectionOrientedSocket.h @@ -26,6 +26,7 @@ class UnixSocketConnector; class ConnectionOrientedSocketIO : public DataSocketIO { public: + ConnectionOrientedSocketIO(nsIThread* aConsumerThread); virtual ~ConnectionOrientedSocketIO(); virtual nsresult Accept(int aFd, @@ -42,11 +43,13 @@ public: * * @param aConnector The new connector object, owned by the * connection-oriented socket. + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|. * @return NS_OK on success, or an XPCOM error code otherwise. */ virtual nsresult PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) = 0; diff --git a/ipc/unixsocket/DataSocket.cpp b/ipc/unixsocket/DataSocket.cpp index c9a4d95d2f6..c0686b079e3 100644 --- a/ipc/unixsocket/DataSocket.cpp +++ b/ipc/unixsocket/DataSocket.cpp @@ -50,7 +50,8 @@ DataSocketIO::ReceiveData(int aFd) nsresult rv = QueryReceiveBuffer(&incoming); if (NS_FAILED(rv)) { /* an error occured */ - NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this)); + GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this), + NS_DISPATCH_NORMAL); return -1; } @@ -58,12 +59,14 @@ DataSocketIO::ReceiveData(int aFd) if (res < 0) { /* an I/O error occured */ DiscardBuffer(); - NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this)); + GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this), + NS_DISPATCH_NORMAL); return -1; } else if (!res) { /* EOF or peer shut down sending */ DiscardBuffer(); - NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this)); + GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this), + NS_DISPATCH_NORMAL); return 0; } @@ -90,7 +93,8 @@ DataSocketIO::SendPendingData(int aFd) ssize_t res = outgoing->Send(aFd); if (res < 0) { /* an I/O error occured */ - NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this)); + GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this), + NS_DISPATCH_NORMAL); return NS_ERROR_FAILURE; } else if (!res && outgoing->GetSize()) { /* I/O is currently blocked; try again later */ @@ -105,7 +109,8 @@ DataSocketIO::SendPendingData(int aFd) return NS_OK; } -DataSocketIO::DataSocketIO() +DataSocketIO::DataSocketIO(nsIThread* aConsumerThread) + : SocketIOBase(aConsumerThread) { } // diff --git a/ipc/unixsocket/DataSocket.h b/ipc/unixsocket/DataSocket.h index db9ac506f5d..3ca4761beb0 100644 --- a/ipc/unixsocket/DataSocket.h +++ b/ipc/unixsocket/DataSocket.h @@ -90,7 +90,7 @@ public: nsresult SendPendingData(int aFd); protected: - DataSocketIO(); + DataSocketIO(nsIThread* aConsumerThread); private: /** @@ -120,10 +120,10 @@ public: void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); MOZ_ASSERT(!SocketIOTask::IsCanceled()); Tio* io = SocketIOTask::GetIO(); + MOZ_ASSERT(!io->IsConsumerThread()); MOZ_ASSERT(!io->IsShutdownOnIOThread()); io->Send(mData); diff --git a/ipc/unixsocket/ListenSocket.cpp b/ipc/unixsocket/ListenSocket.cpp index 4ebcc9d9d6e..ff6feaf9232 100644 --- a/ipc/unixsocket/ListenSocket.cpp +++ b/ipc/unixsocket/ListenSocket.cpp @@ -27,7 +27,8 @@ class ListenSocketIO final public: class ListenTask; - ListenSocketIO(MessageLoop* mIOLoop, + ListenSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, ListenSocket* aListenSocket, UnixSocketConnector* aConnector); ~ListenSocketIO(); @@ -94,11 +95,12 @@ private: ConnectionOrientedSocketIO* mCOSocketIO; }; -ListenSocketIO::ListenSocketIO(MessageLoop* mIOLoop, +ListenSocketIO::ListenSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, ListenSocket* aListenSocket, UnixSocketConnector* aConnector) - : UnixSocketWatcher(mIOLoop) - , SocketIOBase() + : UnixSocketWatcher(aIOLoop) + , SocketIOBase(aConsumerThread) , mListenSocket(aListenSocket) , mConnector(aConnector) , mShuttingDownOnIOThread(false) @@ -111,7 +113,7 @@ ListenSocketIO::ListenSocketIO(MessageLoop* mIOLoop, ListenSocketIO::~ListenSocketIO() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(IsShutdownOnMainThread()); } @@ -166,8 +168,9 @@ ListenSocketIO::OnListening() AddWatchers(READ_WATCHER, true); /* We signal a successful 'connection' to a local address for listening. */ - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); } void @@ -188,8 +191,9 @@ ListenSocketIO::FireSocketError() Close(); // Tell the main thread we've errored - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR), + NS_DISPATCH_NORMAL); } void @@ -230,7 +234,7 @@ ListenSocketIO::GetSocketBase() bool ListenSocketIO::IsShutdownOnMainThread() const { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); return mListenSocket == nullptr; } @@ -244,7 +248,7 @@ ListenSocketIO::IsShutdownOnIOThread() const void ListenSocketIO::ShutdownOnMainThread() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mListenSocket = nullptr; @@ -253,7 +257,7 @@ ListenSocketIO::ShutdownOnMainThread() void ListenSocketIO::ShutdownOnIOThread() { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop @@ -277,7 +281,7 @@ public: void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); if (!IsCanceled()) { GetIO()->Listen(mCOSocketIO); @@ -307,13 +311,13 @@ ListenSocket::~ListenSocket() nsresult ListenSocket::Listen(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocket* aCOSocket) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mIO); - mIO = new ListenSocketIO(aIOLoop, this, aConnector); + mIO = new ListenSocketIO(aConsumerThread, aIOLoop, this, aConnector); // Prepared I/O object, now start listening. nsresult rv = Listen(aCOSocket); @@ -330,13 +334,18 @@ nsresult ListenSocket::Listen(UnixSocketConnector* aConnector, ConnectionOrientedSocket* aCOSocket) { - return Listen(aConnector, XRE_GetIOMessageLoop(), aCOSocket); + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + + return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop(), aCOSocket); } nsresult ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(aCOSocket); MOZ_ASSERT(mIO); @@ -350,7 +359,8 @@ ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket) } nsAutoPtr io; - rv = aCOSocket->PrepareAccept(connector, mIO->GetIOLoop(), + rv = aCOSocket->PrepareAccept(connector, + mIO->GetConsumerThread(), mIO->GetIOLoop(), *io.StartAssignment()); if (NS_FAILED(rv)) { return rv; @@ -372,12 +382,12 @@ ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket) void ListenSocket::Close() { - MOZ_ASSERT(NS_IsMainThread()); - if (!mIO) { return; } + MOZ_ASSERT(mIO->IsConsumerThread()); + // From this point on, we consider mIO as being deleted. We sever // the relationship here so any future calls to listen or connect // will create a new implementation. @@ -391,24 +401,18 @@ ListenSocket::Close() void ListenSocket::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectSuccess(mIndex); } void ListenSocket::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectError(mIndex); } void ListenSocket::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnDisconnect(mIndex); } diff --git a/ipc/unixsocket/ListenSocket.h b/ipc/unixsocket/ListenSocket.h index 0ab73c91bf2..55a75a4598a 100644 --- a/ipc/unixsocket/ListenSocket.h +++ b/ipc/unixsocket/ListenSocket.h @@ -11,6 +11,7 @@ #include "mozilla/ipc/SocketBase.h" class MessageLoop; +class nsIThread; namespace mozilla { namespace ipc { @@ -36,12 +37,14 @@ public: * in a non-blocking manner. * * @param aConnector Connector object for socket-type-specific functions + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @param aCOSocket The connection-oriented socket for handling the * accepted connection. * @return NS_OK on success, or an XPCOM error code otherwise. */ nsresult Listen(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocket* aCOSocket); diff --git a/ipc/unixsocket/SocketBase.cpp b/ipc/unixsocket/SocketBase.cpp index 491c4638504..b8a7defcaba 100644 --- a/ipc/unixsocket/SocketBase.cpp +++ b/ipc/unixsocket/SocketBase.cpp @@ -186,24 +186,18 @@ UnixSocketRawData::Send(int aFd) SocketConnectionStatus SocketBase::GetConnectionStatus() const { - MOZ_ASSERT(NS_IsMainThread()); - return mConnectionStatus; } int SocketBase::GetSuggestedConnectDelayMs() const { - MOZ_ASSERT(NS_IsMainThread()); - return mConnectDelayMs; } void SocketBase::NotifySuccess() { - MOZ_ASSERT(NS_IsMainThread()); - mConnectionStatus = SOCKET_CONNECTED; mConnectTimestamp = PR_IntervalNow(); OnConnectSuccess(); @@ -212,8 +206,6 @@ SocketBase::NotifySuccess() void SocketBase::NotifyError() { - MOZ_ASSERT(NS_IsMainThread()); - mConnectionStatus = SOCKET_DISCONNECTED; mConnectDelayMs = CalculateConnectDelayMs(); mConnectTimestamp = 0; @@ -223,8 +215,6 @@ SocketBase::NotifyError() void SocketBase::NotifyDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); - mConnectionStatus = SOCKET_DISCONNECTED; mConnectDelayMs = CalculateConnectDelayMs(); mConnectTimestamp = 0; @@ -234,8 +224,6 @@ SocketBase::NotifyDisconnect() uint32_t SocketBase::CalculateConnectDelayMs() const { - MOZ_ASSERT(NS_IsMainThread()); - uint32_t connectDelayMs = mConnectDelayMs; if (mConnectTimestamp && (PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) { @@ -272,12 +260,31 @@ SocketBase::SetConnectionStatus(SocketConnectionStatus aConnectionStatus) // SocketIOBase // -SocketIOBase::SocketIOBase() -{ } +SocketIOBase::SocketIOBase(nsIThread* aConsumerThread) + : mConsumerThread(aConsumerThread) +{ + MOZ_ASSERT(mConsumerThread); +} SocketIOBase::~SocketIOBase() { } +nsIThread* +SocketIOBase::GetConsumerThread() const +{ + return mConsumerThread; +} + +bool +SocketIOBase::IsConsumerThread() const +{ + nsIThread* thread = nullptr; + if (NS_FAILED(NS_GetCurrentThread(&thread))) { + return false; + } + return thread == GetConsumerThread(); +} + // // SocketIOEventRunnable // @@ -291,10 +298,10 @@ SocketIOEventRunnable::SocketIOEventRunnable(SocketIOBase* aIO, NS_METHOD SocketIOEventRunnable::Run() { - MOZ_ASSERT(NS_IsMainThread()); - SocketIOBase* io = SocketIORunnable::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); + if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. @@ -327,10 +334,10 @@ SocketIORequestClosingRunnable::SocketIORequestClosingRunnable( NS_METHOD SocketIORequestClosingRunnable::Run() { - MOZ_ASSERT(NS_IsMainThread()); - SocketIOBase* io = SocketIORunnable::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); + if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. @@ -373,19 +380,19 @@ SocketIOShutdownTask::SocketIOShutdownTask(SocketIOBase* aIO) void SocketIOShutdownTask::Run() { - MOZ_ASSERT(!NS_IsMainThread()); - SocketIOBase* io = SocketIOTask::GetIO(); + MOZ_ASSERT(!io->IsConsumerThread()); + MOZ_ASSERT(!io->IsShutdownOnIOThread()); + // At this point, there should be no new events on the I/O thread // after this one with the possible exception of an accept task, // which ShutdownOnIOThread will cancel for us. We are now fully // shut down, so we can send a message to the main thread to delete // |io| safely knowing that it's not reference any longer. - MOZ_ASSERT(!io->IsShutdownOnIOThread()); io->ShutdownOnIOThread(); - - NS_DispatchToMainThread(new SocketIODeleteInstanceRunnable(io)); + io->GetConsumerThread()->Dispatch(new SocketIODeleteInstanceRunnable(io), + NS_DISPATCH_NORMAL); } } diff --git a/ipc/unixsocket/SocketBase.h b/ipc/unixsocket/SocketBase.h index 3c55f426ab3..6e1290285b0 100644 --- a/ipc/unixsocket/SocketBase.h +++ b/ipc/unixsocket/SocketBase.h @@ -353,8 +353,24 @@ public: */ virtual void ShutdownOnMainThread() = 0; + /** + * Returns the consumer thread. + * + * @return A pointer to the consumer thread. + */ + nsIThread* GetConsumerThread() const; + + /** + * @return True if the current thread is thre consumer thread, or false + * otherwise. + */ + bool IsConsumerThread() const; + protected: - SocketIOBase(); + SocketIOBase(nsIThread* nsConsumerThread); + +private: + nsCOMPtr mConsumerThread; }; // diff --git a/ipc/unixsocket/StreamSocket.cpp b/ipc/unixsocket/StreamSocket.cpp index 024e05de29b..fc55e47827e 100644 --- a/ipc/unixsocket/StreamSocket.cpp +++ b/ipc/unixsocket/StreamSocket.cpp @@ -29,10 +29,12 @@ public: class DelayedConnectTask; class ReceiveRunnable; - StreamSocketIO(MessageLoop* mIOLoop, + StreamSocketIO(nsIThread* aConsumerThread, + MessageLoop* mIOLoop, StreamSocket* aStreamSocket, UnixSocketConnector* aConnector); - StreamSocketIO(MessageLoop* mIOLoop, int aFd, + StreamSocketIO(nsIThread* aConsumerThread, + MessageLoop* mIOLoop, int aFd, ConnectionStatus aConnectionStatus, StreamSocket* aStreamSocket, UnixSocketConnector* aConnector); @@ -133,10 +135,12 @@ private: nsAutoPtr mBuffer; }; -StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, +StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, StreamSocket* aStreamSocket, UnixSocketConnector* aConnector) - : UnixSocketWatcher(mIOLoop) + : UnixSocketWatcher(aIOLoop) + , ConnectionOrientedSocketIO(aConsumerThread) , mStreamSocket(aStreamSocket) , mConnector(aConnector) , mShuttingDownOnIOThread(false) @@ -147,11 +151,13 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, MOZ_ASSERT(mConnector); } -StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd, - ConnectionStatus aConnectionStatus, +StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread, + MessageLoop* aIOLoop, + int aFd, ConnectionStatus aConnectionStatus, StreamSocket* aStreamSocket, UnixSocketConnector* aConnector) - : UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus) + : UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus) + , ConnectionOrientedSocketIO(aConsumerThread) , mStreamSocket(aStreamSocket) , mConnector(aConnector) , mShuttingDownOnIOThread(false) @@ -164,7 +170,7 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd, StreamSocketIO::~StreamSocketIO() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(IsShutdownOnMainThread()); } @@ -183,7 +189,7 @@ StreamSocketIO::GetDataSocket() void StreamSocketIO::SetDelayedConnectTask(CancelableTask* aTask) { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); mDelayedConnectTask = aTask; } @@ -191,7 +197,7 @@ StreamSocketIO::SetDelayedConnectTask(CancelableTask* aTask) void StreamSocketIO::ClearDelayedConnectTask() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); mDelayedConnectTask = nullptr; } @@ -199,7 +205,7 @@ StreamSocketIO::ClearDelayedConnectTask() void StreamSocketIO::CancelDelayedConnectTask() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); if (!mDelayedConnectTask) { return; @@ -246,8 +252,9 @@ StreamSocketIO::OnConnected() MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { @@ -313,8 +320,9 @@ StreamSocketIO::FireSocketError() Close(); // Tell the main thread we've errored - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR), + NS_DISPATCH_NORMAL); } // |ConnectionOrientedSocketIO| @@ -334,8 +342,9 @@ StreamSocketIO::Accept(int aFd, memcpy(&mAddress, aAddress, mAddressLength); // Signal success - NS_DispatchToMainThread( - new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS)); + GetConsumerThread()->Dispatch( + new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS), + NS_DISPATCH_NORMAL); AddWatchers(READ_WATCHER, true); if (HasPendingData()) { @@ -375,10 +384,10 @@ public: NS_IMETHOD Run() override { - MOZ_ASSERT(NS_IsMainThread()); - StreamSocketIO* io = SocketIORunnable::GetIO(); + MOZ_ASSERT(io->IsConsumerThread()); + if (NS_WARN_IF(io->IsShutdownOnMainThread())) { // Since we've already explicitly closed and the close // happened before this, this isn't really an error. @@ -400,7 +409,8 @@ private: void StreamSocketIO::ConsumeBuffer() { - NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget())); + GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()), + NS_DISPATCH_NORMAL); } void @@ -420,7 +430,7 @@ StreamSocketIO::GetSocketBase() bool StreamSocketIO::IsShutdownOnMainThread() const { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); return mStreamSocket == nullptr; } @@ -434,7 +444,7 @@ StreamSocketIO::IsShutdownOnIOThread() const void StreamSocketIO::ShutdownOnMainThread() { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsConsumerThread()); MOZ_ASSERT(!IsShutdownOnMainThread()); mStreamSocket = nullptr; @@ -443,7 +453,7 @@ StreamSocketIO::ShutdownOnMainThread() void StreamSocketIO::ShutdownOnIOThread() { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsConsumerThread()); MOZ_ASSERT(!mShuttingDownOnIOThread); Close(); // will also remove fd from I/O loop @@ -464,7 +474,7 @@ public: void Run() override { - MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!GetIO()->IsConsumerThread()); MOZ_ASSERT(!IsCanceled()); GetIO()->Connect(); @@ -481,7 +491,7 @@ public: void Run() override { - MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(GetIO()->IsConsumerThread()); if (IsCanceled()) { return; @@ -517,19 +527,16 @@ StreamSocket::~StreamSocket() void StreamSocket::ReceiveSocketData(nsAutoPtr& aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->ReceiveSocketData(mIndex, aBuffer); } nsresult StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs, - MessageLoop* aIOLoop) + nsIThread* aConsumerThread, MessageLoop* aIOLoop) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mIO); - mIO = new StreamSocketIO(aIOLoop, this, aConnector); + mIO = new StreamSocketIO(aConsumerThread, aIOLoop, this, aConnector); SetConnectionStatus(SOCKET_CONNECTING); if (aDelayMs > 0) { @@ -540,29 +547,36 @@ StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs, } else { aIOLoop->PostTask(FROM_HERE, new StreamSocketIO::ConnectTask(mIO)); } + return NS_OK; } nsresult StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs) { - return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop()); + nsIThread* consumerThread = nullptr; + nsresult rv = NS_GetCurrentThread(&consumerThread); + if (NS_FAILED(rv)) { + return rv; + } + + return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop()); } // |ConnectionOrientedSocket| nsresult StreamSocket::PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(!mIO); MOZ_ASSERT(aConnector); SetConnectionStatus(SOCKET_CONNECTING); - mIO = new StreamSocketIO(aIOLoop, + mIO = new StreamSocketIO(aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING, this, aConnector); aIO = mIO; @@ -575,10 +589,10 @@ StreamSocket::PrepareAccept(UnixSocketConnector* aConnector, void StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mIO); - + MOZ_ASSERT(mIO->IsConsumerThread()); MOZ_ASSERT(!mIO->IsShutdownOnMainThread()); + mIO->GetIOLoop()->PostTask( FROM_HERE, new SocketIOSendTask(mIO, aBuffer)); @@ -589,8 +603,8 @@ StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer) void StreamSocket::Close() { - MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(mIO); + MOZ_ASSERT(mIO->IsConsumerThread()); mIO->CancelDelayedConnectTask(); @@ -607,24 +621,18 @@ StreamSocket::Close() void StreamSocket::OnConnectSuccess() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectSuccess(mIndex); } void StreamSocket::OnConnectError() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnConnectError(mIndex); } void StreamSocket::OnDisconnect() { - MOZ_ASSERT(NS_IsMainThread()); - mConsumer->OnDisconnect(mIndex); } diff --git a/ipc/unixsocket/StreamSocket.h b/ipc/unixsocket/StreamSocket.h index 2a7478fba89..c83e750d5e2 100644 --- a/ipc/unixsocket/StreamSocket.h +++ b/ipc/unixsocket/StreamSocket.h @@ -42,11 +42,12 @@ public: * * @param aConnector Connector object for socket type specific functions * @param aDelayMs Time delay in milliseconds. + * @param aConsumerThread The socket's consumer thread. * @param aIOLoop The socket's I/O thread. * @return NS_OK on success, or an XPCOM error code otherwise. */ nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs, - MessageLoop* aIOLoop); + nsIThread* aConsumerThread, MessageLoop* aIOLoop); /** * Starts a task on the socket that will try to connect to a socket in a @@ -62,6 +63,7 @@ public: // nsresult PrepareAccept(UnixSocketConnector* aConnector, + nsIThread* aConsumerThread, MessageLoop* aIOLoop, ConnectionOrientedSocketIO*& aIO) override;