Bug 1168806: Configurable consumer thread for socket IPC classes, r=kmachulis

The consumer thread handles socket creation, destruction, and
data processing for socket IPC. Traditionally this has been
done on the main thread.

This patch extends the socket IPC classes to support arbitrary
consumer threads. The thread is configured when establishing a
connection, and performs all of the above operations until the
socket is closed.
This commit is contained in:
Thomas Zimmermann 2015-06-02 10:01:57 +02:00
parent d8e16014cd
commit fb090327db
16 changed files with 307 additions and 208 deletions

View File

@ -74,8 +74,11 @@ public:
SOCKET_IS_CONNECTED
};
DroidSocketImpl(MessageLoop* aIOLoop, BluetoothSocket* aConsumer)
DroidSocketImpl(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
BluetoothSocket* aConsumer)
: ipc::UnixFdWatcher(aIOLoop)
, DataSocketIO(aConsumerThread)
, mConsumer(aConsumer)
, mShuttingDownOnIOThread(false)
, mConnectionStatus(SOCKET_IS_DISCONNECTED)
@ -83,7 +86,7 @@ public:
~DroidSocketImpl()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
}
void Send(UnixSocketIOBuffer* aBuffer)
@ -142,7 +145,8 @@ public:
bool IsShutdownOnMainThread() const override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
return mConsumer == nullptr;
}
@ -153,14 +157,14 @@ public:
void ShutdownOnMainThread() override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(!IsShutdownOnMainThread());
mConsumer = nullptr;
}
void ShutdownOnIOThread() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
Close(); // will also remove fd from I/O loop
@ -214,7 +218,7 @@ public:
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!GetIO()->IsConsumerThread());
MOZ_ASSERT(!IsCanceled());
GetIO()->Connect(mFd);
@ -234,7 +238,7 @@ public:
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!GetIO()->IsConsumerThread());
if (!IsCanceled()) {
GetIO()->Listen(mFd);
@ -254,7 +258,7 @@ class SocketConnectClientFdTask final
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!GetIO()->IsConsumerThread());
GetIO()->ConnectClientFd();
}
@ -314,8 +318,9 @@ DroidSocketImpl::Accept(int aFd)
SetFd(aFd);
mConnectionStatus = SOCKET_IS_CONNECTED;
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -338,7 +343,7 @@ DroidSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
void
DroidSocketImpl::OnSocketCanReceiveWithoutBlocking(int aFd)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
ssize_t res = ReceiveData(aFd);
@ -361,7 +366,7 @@ public:
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!GetIO()->IsConsumerThread());
MOZ_ASSERT(!IsCanceled());
GetIO()->Accept(mFd);
@ -383,7 +388,7 @@ public:
void Accept(int aFd, const nsAString& aBdAddress,
int aConnectionStatus) override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mImpl->IsConsumerThread());
mozilla::ScopedClose fd(aFd); // Close received socket fd on error
@ -404,7 +409,7 @@ public:
void OnError(BluetoothStatus aStatus) override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mImpl->IsConsumerThread());
BT_LOGR("BluetoothSocketInterface::Accept failed: %d", (int)aStatus);
if (!mImpl->IsShutdownOnMainThread()) {
@ -430,7 +435,7 @@ public:
NS_IMETHOD Run() override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(GetIO()->IsConsumerThread());
MOZ_ASSERT(sBluetoothSocketInterface);
BluetoothSocketResultHandler* res = new AcceptResultHandler(GetIO());
@ -448,7 +453,7 @@ private:
void
DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
/* When a listening socket is ready for receiving data,
@ -456,8 +461,8 @@ DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd)
*/
RemoveWatchers(READ_WATCHER);
nsRefPtr<AcceptRunnable> t = new AcceptRunnable(this, aFd);
NS_DispatchToMainThread(t);
GetConsumerThread()->Dispatch(new AcceptRunnable(this, aFd),
NS_DISPATCH_NORMAL);
}
void
@ -475,7 +480,7 @@ DroidSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
void
DroidSocketImpl::OnSocketCanSendWithoutBlocking(int aFd)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
MOZ_ASSERT(aFd >= 0);
@ -492,7 +497,7 @@ DroidSocketImpl::OnSocketCanSendWithoutBlocking(int aFd)
void
DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
/* We follow Posix behaviour here: Connect operations are
@ -501,8 +506,9 @@ DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd)
mConnectionStatus = SOCKET_IS_CONNECTED;
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -541,10 +547,10 @@ public:
NS_IMETHOD Run() override
{
MOZ_ASSERT(NS_IsMainThread());
DroidSocketImpl* io = SocketIORunnable<DroidSocketImpl>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
@ -566,7 +572,8 @@ private:
void
DroidSocketImpl::ConsumeBuffer()
{
NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget()));
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
NS_DISPATCH_NORMAL);
}
void
@ -602,7 +609,7 @@ public:
void Connect(int aFd, const nsAString& aBdAddress,
int aConnectionStatus) override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mImpl->IsConsumerThread());
if (mImpl->IsShutdownOnMainThread()) {
BT_LOGD("mConsumer is null, aborting send!");
@ -621,7 +628,7 @@ public:
void OnError(BluetoothStatus aStatus) override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mImpl->IsConsumerThread());
BT_WARNING("Connect failed: %d", (int)aStatus);
if (!mImpl->IsShutdownOnMainThread()) {
@ -643,14 +650,14 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress,
BluetoothSocketType aType,
int aChannel,
bool aAuth, bool aEncrypt,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!mImpl);
SetConnectionStatus(SOCKET_CONNECTING);
mImpl = new DroidSocketImpl(aIOLoop, this);
mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
BluetoothSocketResultHandler* res = new ConnectSocketResultHandler(mImpl);
SetCurrentResultHandler(res);
@ -670,8 +677,14 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress,
int aChannel,
bool aAuth, bool aEncrypt)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Connect(aDeviceAddress, aServiceUuid, aType, aChannel, aAuth,
aEncrypt, XRE_GetIOMessageLoop());
aEncrypt, consumerThread, XRE_GetIOMessageLoop());
}
class ListenResultHandler final : public BluetoothSocketResultHandler
@ -685,14 +698,14 @@ public:
void Listen(int aFd) override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mImpl->IsConsumerThread());
mImpl->GetIOLoop()->PostTask(FROM_HERE, new SocketListenTask(mImpl, aFd));
}
void OnError(BluetoothStatus aStatus) override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mImpl->IsConsumerThread());
BT_WARNING("Listen failed: %d", (int)aStatus);
}
@ -707,14 +720,14 @@ BluetoothSocket::Listen(const nsAString& aServiceName,
BluetoothSocketType aType,
int aChannel,
bool aAuth, bool aEncrypt,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!mImpl);
SetConnectionStatus(SOCKET_LISTENING);
mImpl = new DroidSocketImpl(aIOLoop, this);
mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
BluetoothSocketResultHandler* res = new ListenResultHandler(mImpl);
SetCurrentResultHandler(res);
@ -734,14 +747,19 @@ BluetoothSocket::Listen(const nsAString& aServiceName,
int aChannel,
bool aAuth, bool aEncrypt)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Listen(aServiceName, aServiceUuid, aType, aChannel, aAuth, aEncrypt,
XRE_GetIOMessageLoop());
consumerThread, XRE_GetIOMessageLoop());
}
void
BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mObserver);
mObserver->ReceiveSocketData(this, aBuffer);
@ -752,8 +770,8 @@ BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
void
BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mImpl);
MOZ_ASSERT(mImpl->IsConsumerThread());
MOZ_ASSERT(!mImpl->IsShutdownOnMainThread());
mImpl->GetIOLoop()->PostTask(
@ -766,12 +784,14 @@ BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
void
BluetoothSocket::Close()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(sBluetoothSocketInterface);
if (!mImpl) {
return;
}
MOZ_ASSERT(mImpl->IsConsumerThread());
// Stop any watching |SocketMessageWatcher|
if (mCurrentRes) {
sBluetoothSocketInterface->Close(mCurrentRes);
@ -790,7 +810,6 @@ BluetoothSocket::Close()
void
BluetoothSocket::OnConnectSuccess()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mObserver);
SetCurrentResultHandler(nullptr);
@ -800,7 +819,6 @@ BluetoothSocket::OnConnectSuccess()
void
BluetoothSocket::OnConnectError()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mObserver);
SetCurrentResultHandler(nullptr);
@ -810,7 +828,6 @@ BluetoothSocket::OnConnectError()
void
BluetoothSocket::OnDisconnect()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mObserver);
mObserver->OnSocketDisconnect(this);
}

View File

@ -11,6 +11,7 @@
#include "mozilla/ipc/DataSocket.h"
class MessageLoop;
class nsIThread;
BEGIN_BLUETOOTH_NAMESPACE
@ -28,6 +29,7 @@ public:
BluetoothSocketType aType,
int aChannel,
bool aAuth, bool aEncrypt,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop);
nsresult Connect(const nsAString& aDeviceAddress,
@ -41,6 +43,7 @@ public:
BluetoothSocketType aType,
int aChannel,
bool aAuth, bool aEncrypt,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop);
nsresult Listen(const nsAString& aServiceName,

View File

@ -28,7 +28,8 @@ class BluetoothSocket::BluetoothSocketIO final
, public DataSocketIO
{
public:
BluetoothSocketIO(MessageLoop* mIOLoop,
BluetoothSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
BluetoothSocket* aConsumer,
UnixSocketConnector* aConnector);
~BluetoothSocketIO();
@ -132,10 +133,12 @@ private:
};
BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
MessageLoop* mIOLoop,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
BluetoothSocket* aConsumer,
UnixSocketConnector* aConnector)
: UnixSocketWatcher(mIOLoop)
: UnixSocketWatcher(aIOLoop)
, DataSocketIO(aConsumerThread)
, mConsumer(aConsumer)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
@ -148,7 +151,7 @@ BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
BluetoothSocket::BluetoothSocketIO::~BluetoothSocketIO()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(IsShutdownOnMainThread());
}
@ -188,7 +191,7 @@ BluetoothSocket::BluetoothSocketIO::GetDataSocket()
void
BluetoothSocket::BluetoothSocketIO::SetDelayedConnectTask(CancelableTask* aTask)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
mDelayedConnectTask = aTask;
}
@ -196,7 +199,7 @@ BluetoothSocket::BluetoothSocketIO::SetDelayedConnectTask(CancelableTask* aTask)
void
BluetoothSocket::BluetoothSocketIO::ClearDelayedConnectTask()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
mDelayedConnectTask = nullptr;
}
@ -204,7 +207,7 @@ BluetoothSocket::BluetoothSocketIO::ClearDelayedConnectTask()
void
BluetoothSocket::BluetoothSocketIO::CancelDelayedConnectTask()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
if (!mDelayedConnectTask) {
return;
@ -277,8 +280,9 @@ BluetoothSocket::BluetoothSocketIO::OnConnected()
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -326,8 +330,9 @@ BluetoothSocket::BluetoothSocketIO::OnSocketCanAcceptWithoutBlocking()
Close();
SetSocket(fd, SOCKET_IS_CONNECTED);
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -376,8 +381,9 @@ BluetoothSocket::BluetoothSocketIO::FireSocketError()
Close();
// Tell the main thread we've errored
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
}
@ -412,10 +418,10 @@ public:
NS_IMETHOD Run() override
{
MOZ_ASSERT(NS_IsMainThread());
BluetoothSocketIO* io = SocketIORunnable<BluetoothSocketIO>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
@ -437,7 +443,8 @@ private:
void
BluetoothSocket::BluetoothSocketIO::ConsumeBuffer()
{
NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget()));
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
NS_DISPATCH_NORMAL);
}
void
@ -457,7 +464,7 @@ BluetoothSocket::BluetoothSocketIO::GetSocketBase()
bool
BluetoothSocket::BluetoothSocketIO::IsShutdownOnMainThread() const
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
return mConsumer == nullptr;
}
@ -465,7 +472,7 @@ BluetoothSocket::BluetoothSocketIO::IsShutdownOnMainThread() const
void
BluetoothSocket::BluetoothSocketIO::ShutdownOnMainThread()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(!IsShutdownOnMainThread());
mConsumer = nullptr;
@ -480,7 +487,7 @@ BluetoothSocket::BluetoothSocketIO::IsShutdownOnIOThread() const
void
BluetoothSocket::BluetoothSocketIO::ShutdownOnIOThread()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
Close(); // will also remove fd from I/O loop
@ -502,7 +509,7 @@ public:
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!GetIO()->IsConsumerThread());
if (!IsCanceled()) {
GetIO()->Listen();
@ -520,7 +527,7 @@ public:
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!GetIO()->IsConsumerThread());
MOZ_ASSERT(!IsCanceled());
GetIO()->Connect();
@ -537,7 +544,7 @@ public:
void Run() override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(GetIO()->IsConsumerThread());
if (IsCanceled()) {
return;
@ -576,7 +583,6 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress,
int aChannel,
bool aAuth, bool aEncrypt)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!aDeviceAddress.IsEmpty());
nsAutoPtr<BluetoothUnixSocketConnector> connector(
@ -603,8 +609,6 @@ BluetoothSocket::Listen(const nsAString& aServiceName,
int aChannel,
bool aAuth, bool aEncrypt)
{
MOZ_ASSERT(NS_IsMainThread());
nsAutoPtr<BluetoothUnixSocketConnector> connector(
new BluetoothUnixSocketConnector(NS_LITERAL_CSTRING(BLUETOOTH_ADDRESS_NONE),
aType, aChannel, aAuth, aEncrypt));
@ -625,7 +629,6 @@ BluetoothSocket::Listen(const nsAString& aServiceName,
void
BluetoothSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mObserver);
mObserver->ReceiveSocketData(this, aBuffer);
@ -645,14 +648,15 @@ BluetoothSocket::SendSocketData(const nsACString& aStr)
nsresult
BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
int aDelayMs, MessageLoop* aIOLoop)
int aDelayMs,
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(aConnector);
MOZ_ASSERT(aConsumerThread);
MOZ_ASSERT(aIOLoop);
MOZ_ASSERT(!mIO);
mIO = new BluetoothSocketIO(aIOLoop, this, aConnector);
mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {
@ -670,19 +674,25 @@ nsresult
BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
int aDelayMs)
{
return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop());
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
}
nsresult
BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
MessageLoop* aIOLoop)
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(aConnector);
MOZ_ASSERT(aConsumerThread);
MOZ_ASSERT(aIOLoop);
MOZ_ASSERT(!mIO);
mIO = new BluetoothSocketIO(aIOLoop, this, aConnector);
mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
SetConnectionStatus(SOCKET_LISTENING);
aIOLoop->PostTask(FROM_HERE, new ListenTask(mIO));
@ -693,7 +703,13 @@ BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
nsresult
BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector)
{
return Listen(aConnector, XRE_GetIOMessageLoop());
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop());
}
void
@ -712,8 +728,8 @@ BluetoothSocket::GetAddress(nsAString& aAddrStr)
void
BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mIO);
MOZ_ASSERT(mIO->IsConsumerThread());
MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
mIO->GetIOLoop()->PostTask(
@ -726,11 +742,12 @@ BluetoothSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
void
BluetoothSocket::Close()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mIO) {
return;
}
MOZ_ASSERT(mIO->IsConsumerThread());
mIO->CancelDelayedConnectTask();
// From this point on, we consider mIO as being deleted.
@ -746,7 +763,6 @@ BluetoothSocket::Close()
void
BluetoothSocket::OnConnectSuccess()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mObserver);
mObserver->OnSocketConnectSuccess(this);
}
@ -754,7 +770,6 @@ BluetoothSocket::OnConnectSuccess()
void
BluetoothSocket::OnConnectError()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mObserver);
mObserver->OnSocketConnectError(this);
}
@ -762,7 +777,6 @@ BluetoothSocket::OnConnectError()
void
BluetoothSocket::OnDisconnect()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mObserver);
mObserver->OnSocketDisconnect(this);
}

View File

@ -66,11 +66,12 @@ public:
*
* @param aConnector Connector object for socket type specific functions
* @param aDelayMs Time delay in milli-seconds.
* @param aConsumerThread The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
nsresult Connect(BluetoothUnixSocketConnector* aConnector,
int aDelayMs, MessageLoop* aIOLoop);
nsresult Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs,
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
/**
* Starts a task on the socket that will try to connect to a socket in a
@ -88,11 +89,12 @@ public:
* non-blocking manner.
*
* @param aConnector Connector object for socket type specific functions
* @param aConsumerThread The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
nsresult Listen(BluetoothUnixSocketConnector* aConnector,
MessageLoop* aIOLoop);
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
/**
* Starts a task on the socket that will try to accept a new connection in a

View File

@ -206,8 +206,9 @@ class BluetoothDaemonConnectionIO final
, public ConnectionOrientedSocketIO
{
public:
BluetoothDaemonConnectionIO(MessageLoop* aIOLoop, int aFd,
ConnectionStatus aConnectionStatus,
BluetoothDaemonConnectionIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
int aFd, ConnectionStatus aConnectionStatus,
BluetoothDaemonConnection* aConnection,
BluetoothDaemonPDUConsumer* aConsumer);
@ -255,14 +256,17 @@ private:
};
BluetoothDaemonConnectionIO::BluetoothDaemonConnectionIO(
MessageLoop* aIOLoop, int aFd,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
int aFd,
ConnectionStatus aConnectionStatus,
BluetoothDaemonConnection* aConnection,
BluetoothDaemonPDUConsumer* aConsumer)
: UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
, mConnection(aConnection)
, mConsumer(aConsumer)
, mShuttingDownOnIOThread(false)
: UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
, ConnectionOrientedSocketIO(aConsumerThread)
, mConnection(aConnection)
, mConsumer(aConsumer)
, mShuttingDownOnIOThread(false)
{
MOZ_ASSERT(mConnection);
MOZ_ASSERT(mConsumer);
@ -308,8 +312,9 @@ BluetoothDaemonConnectionIO::OnConnected()
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -328,8 +333,9 @@ BluetoothDaemonConnectionIO::OnError(const char* aFunction, int aErrno)
Close();
// Tell the main thread we've errored
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
}
// |ConnectionOrientedSocketIO|
@ -399,7 +405,7 @@ BluetoothDaemonConnectionIO::GetSocketBase()
bool
BluetoothDaemonConnectionIO::IsShutdownOnMainThread() const
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
return mConnection == nullptr;
}
@ -413,7 +419,7 @@ BluetoothDaemonConnectionIO::IsShutdownOnIOThread() const
void
BluetoothDaemonConnectionIO::ShutdownOnMainThread()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(!IsShutdownOnMainThread());
mConnection = nullptr;
@ -422,7 +428,7 @@ BluetoothDaemonConnectionIO::ShutdownOnMainThread()
void
BluetoothDaemonConnectionIO::ShutdownOnIOThread()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
Close(); // will also remove fd from I/O loop
@ -452,10 +458,10 @@ BluetoothDaemonConnection::~BluetoothDaemonConnection()
nsresult
BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!mIO);
// |BluetoothDaemonConnection| now owns the connector, but doesn't
@ -466,7 +472,7 @@ BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
SetConnectionStatus(SOCKET_CONNECTING);
mIO = new BluetoothDaemonConnectionIO(
aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
this, mPDUConsumer);
aIO = mIO;
@ -478,8 +484,8 @@ BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
void
BluetoothDaemonConnection::SendSocketData(UnixSocketIOBuffer* aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mIO);
MOZ_ASSERT(mIO->IsConsumerThread());
mIO->GetIOLoop()->PostTask(
FROM_HERE,
@ -492,13 +498,14 @@ BluetoothDaemonConnection::SendSocketData(UnixSocketIOBuffer* aBuffer)
void
BluetoothDaemonConnection::Close()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mIO) {
CHROMIUM_LOG("Bluetooth daemon already disconnected!");
return;
}
MOZ_ASSERT(mIO->IsConsumerThread());
mIO->ShutdownOnMainThread();
mIO->GetIOLoop()->PostTask(FROM_HERE, new SocketIOShutdownTask(mIO));
mIO = nullptr;
@ -508,24 +515,18 @@ BluetoothDaemonConnection::Close()
void
BluetoothDaemonConnection::OnConnectSuccess()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnConnectSuccess(mIndex);
}
void
BluetoothDaemonConnection::OnConnectError()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnConnectError(mIndex);
}
void
BluetoothDaemonConnection::OnDisconnect()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnDisconnect(mIndex);
}

View File

@ -125,6 +125,7 @@ public:
//
nsresult PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO) override;

View File

@ -9,9 +9,22 @@
namespace mozilla {
namespace ipc {
//
// ConnectionOrientedSocketIO
//
ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
nsIThread* aConsumerThread)
: DataSocketIO(aConsumerThread)
{ }
ConnectionOrientedSocketIO::~ConnectionOrientedSocketIO()
{ }
//
// ConnectionOrientedSocket
//
ConnectionOrientedSocket::~ConnectionOrientedSocket()
{ }

View File

@ -26,6 +26,7 @@ class UnixSocketConnector;
class ConnectionOrientedSocketIO : public DataSocketIO
{
public:
ConnectionOrientedSocketIO(nsIThread* aConsumerThread);
virtual ~ConnectionOrientedSocketIO();
virtual nsresult Accept(int aFd,
@ -42,11 +43,13 @@ public:
*
* @param aConnector The new connector object, owned by the
* connection-oriented socket.
* @param aConsumerThread The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
virtual nsresult PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO) = 0;

View File

@ -50,7 +50,8 @@ DataSocketIO::ReceiveData(int aFd)
nsresult rv = QueryReceiveBuffer(&incoming);
if (NS_FAILED(rv)) {
/* an error occured */
NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this));
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
NS_DISPATCH_NORMAL);
return -1;
}
@ -58,12 +59,14 @@ DataSocketIO::ReceiveData(int aFd)
if (res < 0) {
/* an I/O error occured */
DiscardBuffer();
NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this));
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
NS_DISPATCH_NORMAL);
return -1;
} else if (!res) {
/* EOF or peer shut down sending */
DiscardBuffer();
NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this));
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
NS_DISPATCH_NORMAL);
return 0;
}
@ -90,7 +93,8 @@ DataSocketIO::SendPendingData(int aFd)
ssize_t res = outgoing->Send(aFd);
if (res < 0) {
/* an I/O error occured */
NS_DispatchToMainThread(new SocketIORequestClosingRunnable(this));
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
NS_DISPATCH_NORMAL);
return NS_ERROR_FAILURE;
} else if (!res && outgoing->GetSize()) {
/* I/O is currently blocked; try again later */
@ -105,7 +109,8 @@ DataSocketIO::SendPendingData(int aFd)
return NS_OK;
}
DataSocketIO::DataSocketIO()
DataSocketIO::DataSocketIO(nsIThread* aConsumerThread)
: SocketIOBase(aConsumerThread)
{ }
//

View File

@ -90,7 +90,7 @@ public:
nsresult SendPendingData(int aFd);
protected:
DataSocketIO();
DataSocketIO(nsIThread* aConsumerThread);
private:
/**
@ -120,10 +120,10 @@ public:
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!SocketIOTask<Tio>::IsCanceled());
Tio* io = SocketIOTask<Tio>::GetIO();
MOZ_ASSERT(!io->IsConsumerThread());
MOZ_ASSERT(!io->IsShutdownOnIOThread());
io->Send(mData);

View File

@ -27,7 +27,8 @@ class ListenSocketIO final
public:
class ListenTask;
ListenSocketIO(MessageLoop* mIOLoop,
ListenSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ListenSocket* aListenSocket,
UnixSocketConnector* aConnector);
~ListenSocketIO();
@ -94,11 +95,12 @@ private:
ConnectionOrientedSocketIO* mCOSocketIO;
};
ListenSocketIO::ListenSocketIO(MessageLoop* mIOLoop,
ListenSocketIO::ListenSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ListenSocket* aListenSocket,
UnixSocketConnector* aConnector)
: UnixSocketWatcher(mIOLoop)
, SocketIOBase()
: UnixSocketWatcher(aIOLoop)
, SocketIOBase(aConsumerThread)
, mListenSocket(aListenSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
@ -111,7 +113,7 @@ ListenSocketIO::ListenSocketIO(MessageLoop* mIOLoop,
ListenSocketIO::~ListenSocketIO()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(IsShutdownOnMainThread());
}
@ -166,8 +168,9 @@ ListenSocketIO::OnListening()
AddWatchers(READ_WATCHER, true);
/* We signal a successful 'connection' to a local address for listening. */
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
}
void
@ -188,8 +191,9 @@ ListenSocketIO::FireSocketError()
Close();
// Tell the main thread we've errored
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
}
void
@ -230,7 +234,7 @@ ListenSocketIO::GetSocketBase()
bool
ListenSocketIO::IsShutdownOnMainThread() const
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
return mListenSocket == nullptr;
}
@ -244,7 +248,7 @@ ListenSocketIO::IsShutdownOnIOThread() const
void
ListenSocketIO::ShutdownOnMainThread()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(!IsShutdownOnMainThread());
mListenSocket = nullptr;
@ -253,7 +257,7 @@ ListenSocketIO::ShutdownOnMainThread()
void
ListenSocketIO::ShutdownOnIOThread()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
Close(); // will also remove fd from I/O loop
@ -277,7 +281,7 @@ public:
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!GetIO()->IsConsumerThread());
if (!IsCanceled()) {
GetIO()->Listen(mCOSocketIO);
@ -307,13 +311,13 @@ ListenSocket::~ListenSocket()
nsresult
ListenSocket::Listen(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ConnectionOrientedSocket* aCOSocket)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!mIO);
mIO = new ListenSocketIO(aIOLoop, this, aConnector);
mIO = new ListenSocketIO(aConsumerThread, aIOLoop, this, aConnector);
// Prepared I/O object, now start listening.
nsresult rv = Listen(aCOSocket);
@ -330,13 +334,18 @@ nsresult
ListenSocket::Listen(UnixSocketConnector* aConnector,
ConnectionOrientedSocket* aCOSocket)
{
return Listen(aConnector, XRE_GetIOMessageLoop(), aCOSocket);
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop(), aCOSocket);
}
nsresult
ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(aCOSocket);
MOZ_ASSERT(mIO);
@ -350,7 +359,8 @@ ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket)
}
nsAutoPtr<ConnectionOrientedSocketIO> io;
rv = aCOSocket->PrepareAccept(connector, mIO->GetIOLoop(),
rv = aCOSocket->PrepareAccept(connector,
mIO->GetConsumerThread(), mIO->GetIOLoop(),
*io.StartAssignment());
if (NS_FAILED(rv)) {
return rv;
@ -372,12 +382,12 @@ ListenSocket::Listen(ConnectionOrientedSocket* aCOSocket)
void
ListenSocket::Close()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mIO) {
return;
}
MOZ_ASSERT(mIO->IsConsumerThread());
// From this point on, we consider mIO as being deleted. We sever
// the relationship here so any future calls to listen or connect
// will create a new implementation.
@ -391,24 +401,18 @@ ListenSocket::Close()
void
ListenSocket::OnConnectSuccess()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnConnectSuccess(mIndex);
}
void
ListenSocket::OnConnectError()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnConnectError(mIndex);
}
void
ListenSocket::OnDisconnect()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnDisconnect(mIndex);
}

View File

@ -11,6 +11,7 @@
#include "mozilla/ipc/SocketBase.h"
class MessageLoop;
class nsIThread;
namespace mozilla {
namespace ipc {
@ -36,12 +37,14 @@ public:
* in a non-blocking manner.
*
* @param aConnector Connector object for socket-type-specific functions
* @param aConsumerThread The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @param aCOSocket The connection-oriented socket for handling the
* accepted connection.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
nsresult Listen(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ConnectionOrientedSocket* aCOSocket);

View File

@ -186,24 +186,18 @@ UnixSocketRawData::Send(int aFd)
SocketConnectionStatus
SocketBase::GetConnectionStatus() const
{
MOZ_ASSERT(NS_IsMainThread());
return mConnectionStatus;
}
int
SocketBase::GetSuggestedConnectDelayMs() const
{
MOZ_ASSERT(NS_IsMainThread());
return mConnectDelayMs;
}
void
SocketBase::NotifySuccess()
{
MOZ_ASSERT(NS_IsMainThread());
mConnectionStatus = SOCKET_CONNECTED;
mConnectTimestamp = PR_IntervalNow();
OnConnectSuccess();
@ -212,8 +206,6 @@ SocketBase::NotifySuccess()
void
SocketBase::NotifyError()
{
MOZ_ASSERT(NS_IsMainThread());
mConnectionStatus = SOCKET_DISCONNECTED;
mConnectDelayMs = CalculateConnectDelayMs();
mConnectTimestamp = 0;
@ -223,8 +215,6 @@ SocketBase::NotifyError()
void
SocketBase::NotifyDisconnect()
{
MOZ_ASSERT(NS_IsMainThread());
mConnectionStatus = SOCKET_DISCONNECTED;
mConnectDelayMs = CalculateConnectDelayMs();
mConnectTimestamp = 0;
@ -234,8 +224,6 @@ SocketBase::NotifyDisconnect()
uint32_t
SocketBase::CalculateConnectDelayMs() const
{
MOZ_ASSERT(NS_IsMainThread());
uint32_t connectDelayMs = mConnectDelayMs;
if (mConnectTimestamp && (PR_IntervalNow()-mConnectTimestamp) > connectDelayMs) {
@ -272,12 +260,31 @@ SocketBase::SetConnectionStatus(SocketConnectionStatus aConnectionStatus)
// SocketIOBase
//
SocketIOBase::SocketIOBase()
{ }
SocketIOBase::SocketIOBase(nsIThread* aConsumerThread)
: mConsumerThread(aConsumerThread)
{
MOZ_ASSERT(mConsumerThread);
}
SocketIOBase::~SocketIOBase()
{ }
nsIThread*
SocketIOBase::GetConsumerThread() const
{
return mConsumerThread;
}
bool
SocketIOBase::IsConsumerThread() const
{
nsIThread* thread = nullptr;
if (NS_FAILED(NS_GetCurrentThread(&thread))) {
return false;
}
return thread == GetConsumerThread();
}
//
// SocketIOEventRunnable
//
@ -291,10 +298,10 @@ SocketIOEventRunnable::SocketIOEventRunnable(SocketIOBase* aIO,
NS_METHOD
SocketIOEventRunnable::Run()
{
MOZ_ASSERT(NS_IsMainThread());
SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
@ -327,10 +334,10 @@ SocketIORequestClosingRunnable::SocketIORequestClosingRunnable(
NS_METHOD
SocketIORequestClosingRunnable::Run()
{
MOZ_ASSERT(NS_IsMainThread());
SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
@ -373,19 +380,19 @@ SocketIOShutdownTask::SocketIOShutdownTask(SocketIOBase* aIO)
void
SocketIOShutdownTask::Run()
{
MOZ_ASSERT(!NS_IsMainThread());
SocketIOBase* io = SocketIOTask<SocketIOBase>::GetIO();
MOZ_ASSERT(!io->IsConsumerThread());
MOZ_ASSERT(!io->IsShutdownOnIOThread());
// At this point, there should be no new events on the I/O thread
// after this one with the possible exception of an accept task,
// which ShutdownOnIOThread will cancel for us. We are now fully
// shut down, so we can send a message to the main thread to delete
// |io| safely knowing that it's not reference any longer.
MOZ_ASSERT(!io->IsShutdownOnIOThread());
io->ShutdownOnIOThread();
NS_DispatchToMainThread(new SocketIODeleteInstanceRunnable(io));
io->GetConsumerThread()->Dispatch(new SocketIODeleteInstanceRunnable(io),
NS_DISPATCH_NORMAL);
}
}

View File

@ -353,8 +353,24 @@ public:
*/
virtual void ShutdownOnMainThread() = 0;
/**
* Returns the consumer thread.
*
* @return A pointer to the consumer thread.
*/
nsIThread* GetConsumerThread() const;
/**
* @return True if the current thread is thre consumer thread, or false
* otherwise.
*/
bool IsConsumerThread() const;
protected:
SocketIOBase();
SocketIOBase(nsIThread* nsConsumerThread);
private:
nsCOMPtr<nsIThread> mConsumerThread;
};
//

View File

@ -29,10 +29,12 @@ public:
class DelayedConnectTask;
class ReceiveRunnable;
StreamSocketIO(MessageLoop* mIOLoop,
StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* mIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector);
StreamSocketIO(MessageLoop* mIOLoop, int aFd,
StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* mIOLoop, int aFd,
ConnectionStatus aConnectionStatus,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector);
@ -133,10 +135,12 @@ private:
nsAutoPtr<UnixSocketRawData> mBuffer;
};
StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector)
: UnixSocketWatcher(mIOLoop)
: UnixSocketWatcher(aIOLoop)
, ConnectionOrientedSocketIO(aConsumerThread)
, mStreamSocket(aStreamSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
@ -147,11 +151,13 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
MOZ_ASSERT(mConnector);
}
StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd,
ConnectionStatus aConnectionStatus,
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
int aFd, ConnectionStatus aConnectionStatus,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector)
: UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus)
: UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
, ConnectionOrientedSocketIO(aConsumerThread)
, mStreamSocket(aStreamSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
@ -164,7 +170,7 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd,
StreamSocketIO::~StreamSocketIO()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(IsShutdownOnMainThread());
}
@ -183,7 +189,7 @@ StreamSocketIO::GetDataSocket()
void
StreamSocketIO::SetDelayedConnectTask(CancelableTask* aTask)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
mDelayedConnectTask = aTask;
}
@ -191,7 +197,7 @@ StreamSocketIO::SetDelayedConnectTask(CancelableTask* aTask)
void
StreamSocketIO::ClearDelayedConnectTask()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
mDelayedConnectTask = nullptr;
}
@ -199,7 +205,7 @@ StreamSocketIO::ClearDelayedConnectTask()
void
StreamSocketIO::CancelDelayedConnectTask()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
if (!mDelayedConnectTask) {
return;
@ -246,8 +252,9 @@ StreamSocketIO::OnConnected()
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -313,8 +320,9 @@ StreamSocketIO::FireSocketError()
Close();
// Tell the main thread we've errored
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
}
// |ConnectionOrientedSocketIO|
@ -334,8 +342,9 @@ StreamSocketIO::Accept(int aFd,
memcpy(&mAddress, aAddress, mAddressLength);
// Signal success
NS_DispatchToMainThread(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS));
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -375,10 +384,10 @@ public:
NS_IMETHOD Run() override
{
MOZ_ASSERT(NS_IsMainThread());
StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnMainThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
@ -400,7 +409,8 @@ private:
void
StreamSocketIO::ConsumeBuffer()
{
NS_DispatchToMainThread(new ReceiveRunnable(this, mBuffer.forget()));
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
NS_DISPATCH_NORMAL);
}
void
@ -420,7 +430,7 @@ StreamSocketIO::GetSocketBase()
bool
StreamSocketIO::IsShutdownOnMainThread() const
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
return mStreamSocket == nullptr;
}
@ -434,7 +444,7 @@ StreamSocketIO::IsShutdownOnIOThread() const
void
StreamSocketIO::ShutdownOnMainThread()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(IsConsumerThread());
MOZ_ASSERT(!IsShutdownOnMainThread());
mStreamSocket = nullptr;
@ -443,7 +453,7 @@ StreamSocketIO::ShutdownOnMainThread()
void
StreamSocketIO::ShutdownOnIOThread()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsConsumerThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
Close(); // will also remove fd from I/O loop
@ -464,7 +474,7 @@ public:
void Run() override
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!GetIO()->IsConsumerThread());
MOZ_ASSERT(!IsCanceled());
GetIO()->Connect();
@ -481,7 +491,7 @@ public:
void Run() override
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(GetIO()->IsConsumerThread());
if (IsCanceled()) {
return;
@ -517,19 +527,16 @@ StreamSocket::~StreamSocket()
void
StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->ReceiveSocketData(mIndex, aBuffer);
}
nsresult
StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
MessageLoop* aIOLoop)
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!mIO);
mIO = new StreamSocketIO(aIOLoop, this, aConnector);
mIO = new StreamSocketIO(aConsumerThread, aIOLoop, this, aConnector);
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {
@ -540,29 +547,36 @@ StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
} else {
aIOLoop->PostTask(FROM_HERE, new StreamSocketIO::ConnectTask(mIO));
}
return NS_OK;
}
nsresult
StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs)
{
return Connect(aConnector, aDelayMs, XRE_GetIOMessageLoop());
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
}
// |ConnectionOrientedSocket|
nsresult
StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(!mIO);
MOZ_ASSERT(aConnector);
SetConnectionStatus(SOCKET_CONNECTING);
mIO = new StreamSocketIO(aIOLoop,
mIO = new StreamSocketIO(aConsumerThread, aIOLoop,
-1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
this, aConnector);
aIO = mIO;
@ -575,10 +589,10 @@ StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
void
StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mIO);
MOZ_ASSERT(mIO->IsConsumerThread());
MOZ_ASSERT(!mIO->IsShutdownOnMainThread());
mIO->GetIOLoop()->PostTask(
FROM_HERE,
new SocketIOSendTask<StreamSocketIO, UnixSocketIOBuffer>(mIO, aBuffer));
@ -589,8 +603,8 @@ StreamSocket::SendSocketData(UnixSocketIOBuffer* aBuffer)
void
StreamSocket::Close()
{
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mIO);
MOZ_ASSERT(mIO->IsConsumerThread());
mIO->CancelDelayedConnectTask();
@ -607,24 +621,18 @@ StreamSocket::Close()
void
StreamSocket::OnConnectSuccess()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnConnectSuccess(mIndex);
}
void
StreamSocket::OnConnectError()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnConnectError(mIndex);
}
void
StreamSocket::OnDisconnect()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnDisconnect(mIndex);
}

View File

@ -42,11 +42,12 @@ public:
*
* @param aConnector Connector object for socket type specific functions
* @param aDelayMs Time delay in milliseconds.
* @param aConsumerThread The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs,
MessageLoop* aIOLoop);
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
/**
* Starts a task on the socket that will try to connect to a socket in a
@ -62,6 +63,7 @@ public:
//
nsresult PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO) override;