Bug 1172479: Replace |nsIThread| by |MessageLoop| in socket I/O code, r=kmachulis

Dispatching events via |nsIThread| doesn't work with worker threads. This
patch replaces all uses of |nsIThread| in the socket code by equivalent
uses of |MessageLoop|.
This commit is contained in:
Thomas Zimmermann 2015-06-09 09:50:10 +02:00
parent cf4af59c89
commit c44f1139e2
16 changed files with 188 additions and 268 deletions

View File

@ -5,18 +5,14 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "BluetoothSocket.h"
#include <fcntl.h>
#include <sys/socket.h>
#include "base/message_loop.h"
#include "BluetoothSocketObserver.h"
#include "BluetoothInterface.h"
#include "BluetoothUtils.h"
#include "mozilla/ipc/UnixSocketWatcher.h"
#include "mozilla/FileUtils.h"
#include "mozilla/RefPtr.h"
#include "nsThreadUtils.h"
#include "nsXULAppAPI.h"
using namespace mozilla::ipc;
@ -74,11 +70,11 @@ public:
SOCKET_IS_CONNECTED
};
DroidSocketImpl(nsIThread* aConsumerThread,
DroidSocketImpl(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
BluetoothSocket* aConsumer)
: ipc::UnixFdWatcher(aIOLoop)
, DataSocketIO(aConsumerThread)
, DataSocketIO(aConsumerLoop)
, mConsumer(aConsumer)
, mShuttingDownOnIOThread(false)
, mConnectionStatus(SOCKET_IS_DISCONNECTED)
@ -173,7 +169,7 @@ public:
}
private:
class ReceiveRunnable;
class ReceiveTask;
/**
* libevent triggered functions that reads data from socket when available and
@ -319,9 +315,8 @@ DroidSocketImpl::Accept(int aFd)
SetFd(aFd);
mConnectionStatus = SOCKET_IS_CONNECTED;
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -426,15 +421,15 @@ private:
DroidSocketImpl* mImpl;
};
class AcceptRunnable final : public SocketIORunnable<DroidSocketImpl>
class InvokeAcceptTask final : public SocketTask<DroidSocketImpl>
{
public:
AcceptRunnable(DroidSocketImpl* aImpl, int aFd)
: SocketIORunnable<DroidSocketImpl>(aImpl)
, mFd(aFd)
InvokeAcceptTask(DroidSocketImpl* aImpl, int aFd)
: SocketTask<DroidSocketImpl>(aImpl)
, mFd(aFd)
{ }
NS_IMETHOD Run() override
void Run() override
{
MOZ_ASSERT(GetIO()->IsConsumerThread());
MOZ_ASSERT(sBluetoothSocketInterface);
@ -443,8 +438,6 @@ public:
GetIO()->mConsumer->SetCurrentResultHandler(res);
sBluetoothSocketInterface->Accept(mFd, res);
return NS_OK;
}
private:
@ -462,8 +455,7 @@ DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd)
*/
RemoveWatchers(READ_WATCHER);
GetConsumerThread()->Dispatch(new AcceptRunnable(this, aFd),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(FROM_HERE, new InvokeAcceptTask(this, aFd));
}
void
@ -507,9 +499,8 @@ DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd)
mConnectionStatus = SOCKET_IS_CONNECTED;
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -537,33 +528,30 @@ DroidSocketImpl::QueryReceiveBuffer(
* |ReceiveRunnable| transfers data received on the I/O thread
* to an instance of |BluetoothSocket| on the consumer thread.
*/
class DroidSocketImpl::ReceiveRunnable final
: public SocketIORunnable<DroidSocketImpl>
class DroidSocketImpl::ReceiveTask final : public SocketTask<DroidSocketImpl>
{
public:
ReceiveRunnable(DroidSocketImpl* aIO, UnixSocketBuffer* aBuffer)
: SocketIORunnable<DroidSocketImpl>(aIO)
ReceiveTask(DroidSocketImpl* aIO, UnixSocketBuffer* aBuffer)
: SocketTask<DroidSocketImpl>(aIO)
, mBuffer(aBuffer)
{ }
NS_IMETHOD Run() override
void Run() override
{
DroidSocketImpl* io = SocketIORunnable<DroidSocketImpl>::GetIO();
DroidSocketImpl* io = SocketTask<DroidSocketImpl>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
return NS_OK;
return;
}
BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket();
MOZ_ASSERT(bluetoothSocket);
bluetoothSocket->ReceiveSocketData(mBuffer);
return NS_OK;
}
private:
@ -573,8 +561,8 @@ private:
void
DroidSocketImpl::ConsumeBuffer()
{
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(FROM_HERE,
new ReceiveTask(this, mBuffer.forget()));
}
void
@ -651,14 +639,14 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress,
BluetoothSocketType aType,
int aChannel,
bool aAuth, bool aEncrypt,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop)
{
MOZ_ASSERT(!mImpl);
SetConnectionStatus(SOCKET_CONNECTING);
mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
mImpl = new DroidSocketImpl(aConsumerLoop, aIOLoop, this);
BluetoothSocketResultHandler* res = new ConnectSocketResultHandler(mImpl);
SetCurrentResultHandler(res);
@ -678,14 +666,8 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress,
int aChannel,
bool aAuth, bool aEncrypt)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Connect(aDeviceAddress, aServiceUuid, aType, aChannel, aAuth,
aEncrypt, consumerThread, XRE_GetIOMessageLoop());
aEncrypt, MessageLoop::current(), XRE_GetIOMessageLoop());
}
class ListenResultHandler final : public BluetoothSocketResultHandler
@ -721,14 +703,14 @@ BluetoothSocket::Listen(const nsAString& aServiceName,
BluetoothSocketType aType,
int aChannel,
bool aAuth, bool aEncrypt,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop)
{
MOZ_ASSERT(!mImpl);
SetConnectionStatus(SOCKET_LISTENING);
mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
mImpl = new DroidSocketImpl(aConsumerLoop, aIOLoop, this);
BluetoothSocketResultHandler* res = new ListenResultHandler(mImpl);
SetCurrentResultHandler(res);
@ -748,14 +730,8 @@ BluetoothSocket::Listen(const nsAString& aServiceName,
int aChannel,
bool aAuth, bool aEncrypt)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Listen(aServiceName, aServiceUuid, aType, aChannel, aAuth, aEncrypt,
consumerThread, XRE_GetIOMessageLoop());
MessageLoop::current(), XRE_GetIOMessageLoop());
}
void

View File

@ -11,7 +11,6 @@
#include "mozilla/ipc/DataSocket.h"
class MessageLoop;
class nsIThread;
BEGIN_BLUETOOTH_NAMESPACE
@ -29,7 +28,7 @@ public:
BluetoothSocketType aType,
int aChannel,
bool aAuth, bool aEncrypt,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop);
nsresult Connect(const nsAString& aDeviceAddress,
@ -43,7 +42,7 @@ public:
BluetoothSocketType aType,
int aChannel,
bool aAuth, bool aEncrypt,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop);
nsresult Listen(const nsAString& aServiceName,

View File

@ -8,9 +8,7 @@
#include <fcntl.h>
#include "BluetoothSocketObserver.h"
#include "BluetoothUnixSocketConnector.h"
#include "mozilla/unused.h"
#include "nsTArray.h"
#include "nsThreadUtils.h"
#include "mozilla/RefPtr.h"
#include "nsXULAppAPI.h"
using namespace mozilla::ipc;
@ -28,7 +26,7 @@ class BluetoothSocket::BluetoothSocketIO final
, public DataSocketIO
{
public:
BluetoothSocketIO(nsIThread* aConsumerThread,
BluetoothSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
BluetoothSocket* aConsumer,
UnixSocketConnector* aConnector);
@ -90,7 +88,7 @@ public:
void ShutdownOnIOThread() override;
private:
class ReceiveRunnable;
class ReceiveTask;
void FireSocketError();
@ -134,12 +132,12 @@ private:
};
BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
BluetoothSocket* aConsumer,
UnixSocketConnector* aConnector)
: UnixSocketWatcher(aIOLoop)
, DataSocketIO(aConsumerThread)
, DataSocketIO(aConsumerLoop)
, mConsumer(aConsumer)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
@ -281,9 +279,8 @@ BluetoothSocket::BluetoothSocketIO::OnConnected()
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -331,9 +328,8 @@ BluetoothSocket::BluetoothSocketIO::OnSocketCanAcceptWithoutBlocking()
Close();
SetSocket(fd, SOCKET_IS_CONNECTED);
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -382,10 +378,8 @@ BluetoothSocket::BluetoothSocketIO::FireSocketError()
Close();
// Tell the consumer thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
}
// |DataSocketIO|
@ -405,36 +399,34 @@ BluetoothSocket::BluetoothSocketIO::QueryReceiveBuffer(
}
/**
* |ReceiveRunnable| transfers data received on the I/O thread
* |ReceiveTask| transfers data received on the I/O thread
* to an instance of |BluetoothSocket| on the consumer thread.
*/
class BluetoothSocket::BluetoothSocketIO::ReceiveRunnable final
: public SocketIORunnable<BluetoothSocketIO>
class BluetoothSocket::BluetoothSocketIO::ReceiveTask final
: public SocketTask<BluetoothSocketIO>
{
public:
ReceiveRunnable(BluetoothSocketIO* aIO, UnixSocketBuffer* aBuffer)
: SocketIORunnable<BluetoothSocketIO>(aIO)
ReceiveTask(BluetoothSocketIO* aIO, UnixSocketBuffer* aBuffer)
: SocketTask<BluetoothSocketIO>(aIO)
, mBuffer(aBuffer)
{ }
NS_IMETHOD Run() override
void Run() override
{
BluetoothSocketIO* io = SocketIORunnable<BluetoothSocketIO>::GetIO();
BluetoothSocketIO* io = SocketTask<BluetoothSocketIO>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
return NS_OK;
return;
}
BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket();
MOZ_ASSERT(bluetoothSocket);
bluetoothSocket->ReceiveSocketData(mBuffer);
return NS_OK;
}
private:
@ -444,8 +436,8 @@ private:
void
BluetoothSocket::BluetoothSocketIO::ConsumeBuffer()
{
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(FROM_HERE,
new ReceiveTask(this, mBuffer.forget()));
}
void
@ -650,14 +642,14 @@ BluetoothSocket::SendSocketData(const nsACString& aStr)
nsresult
BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
int aDelayMs,
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
{
MOZ_ASSERT(aConnector);
MOZ_ASSERT(aConsumerThread);
MOZ_ASSERT(aConsumerLoop);
MOZ_ASSERT(aIOLoop);
MOZ_ASSERT(!mIO);
mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
mIO = new BluetoothSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {
@ -675,25 +667,20 @@ nsresult
BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
int aDelayMs)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
return Connect(aConnector, aDelayMs, MessageLoop::current(),
XRE_GetIOMessageLoop());
}
nsresult
BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
{
MOZ_ASSERT(aConnector);
MOZ_ASSERT(aConsumerThread);
MOZ_ASSERT(aConsumerLoop);
MOZ_ASSERT(aIOLoop);
MOZ_ASSERT(!mIO);
mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
mIO = new BluetoothSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
SetConnectionStatus(SOCKET_LISTENING);
aIOLoop->PostTask(FROM_HERE, new ListenTask(mIO));
@ -704,13 +691,7 @@ BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
nsresult
BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop());
return Listen(aConnector, MessageLoop::current(), XRE_GetIOMessageLoop());
}
void

View File

@ -8,13 +8,10 @@
#define mozilla_dom_bluetooth_BluetoothSocket_h
#include "BluetoothCommon.h"
#include <stdlib.h>
#include "mozilla/ipc/DataSocket.h"
#include "mozilla/ipc/UnixSocketWatcher.h"
#include "mozilla/RefPtr.h"
#include "nsAutoPtr.h"
#include "nsString.h"
#include "nsThreadUtils.h"
class MessageLoop;
@ -66,12 +63,12 @@ public:
*
* @param aConnector Connector object for socket type specific functions
* @param aDelayMs Time delay in milli-seconds.
* @param aConsumerThread The socket's consumer thread.
* @param aConsumerLoop The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
nsresult Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs,
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
/**
* Starts a task on the socket that will try to connect to a socket in a
@ -89,12 +86,12 @@ public:
* non-blocking manner.
*
* @param aConnector Connector object for socket type specific functions
* @param aConsumerThread The socket's consumer thread.
* @param aConsumerLoop The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
nsresult Listen(BluetoothUnixSocketConnector* aConnector,
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
/**
* Starts a task on the socket that will try to accept a new connection in a

View File

@ -214,7 +214,7 @@ BluetoothDaemonPDUConsumer::~BluetoothDaemonPDUConsumer()
class BluetoothDaemonConnectionIO final : public ConnectionOrientedSocketIO
{
public:
BluetoothDaemonConnectionIO(nsIThread* aConsumerThread,
BluetoothDaemonConnectionIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
int aFd, ConnectionStatus aConnectionStatus,
UnixSocketConnector* aConnector,
@ -247,14 +247,14 @@ private:
};
BluetoothDaemonConnectionIO::BluetoothDaemonConnectionIO(
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
int aFd,
ConnectionStatus aConnectionStatus,
UnixSocketConnector* aConnector,
BluetoothDaemonConnection* aConnection,
BluetoothDaemonPDUConsumer* aConsumer)
: ConnectionOrientedSocketIO(aConsumerThread,
: ConnectionOrientedSocketIO(aConsumerLoop,
aIOLoop,
aFd,
aConnectionStatus,
@ -361,7 +361,7 @@ BluetoothDaemonConnection::~BluetoothDaemonConnection()
nsresult
BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO)
{
@ -370,7 +370,7 @@ BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
SetConnectionStatus(SOCKET_CONNECTING);
mIO = new BluetoothDaemonConnectionIO(
aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
aConsumerLoop, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
aConnector, this, mPDUConsumer);
aIO = mIO;

View File

@ -126,7 +126,7 @@ public:
//
nsresult PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO) override;

View File

@ -15,12 +15,11 @@ namespace ipc {
//
ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
int aFd,
ConnectionStatus aConnectionStatus,
int aFd, ConnectionStatus aConnectionStatus,
UnixSocketConnector* aConnector)
: DataSocketIO(aConsumerThread)
: DataSocketIO(aConsumerLoop)
, UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
, mConnector(aConnector)
, mPeerAddressLength(0)
@ -29,10 +28,10 @@ ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
}
ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
UnixSocketConnector* aConnector)
: DataSocketIO(aConsumerThread)
: DataSocketIO(aConsumerLoop)
, UnixSocketWatcher(aIOLoop)
, mConnector(aConnector)
, mPeerAddressLength(0)
@ -79,9 +78,8 @@ ConnectionOrientedSocketIO::Connect()
fd);
if (NS_FAILED(rv)) {
// Tell the consumer thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
return NS_ERROR_FAILURE;
}
@ -147,9 +145,8 @@ ConnectionOrientedSocketIO::OnConnected()
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
@ -176,9 +173,8 @@ ConnectionOrientedSocketIO::OnError(const char* aFunction, int aErrno)
Close();
// Tell the consumer thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
}
//

View File

@ -53,13 +53,13 @@ protected:
/**
* Constructs an instance of |ConnectionOrientedSocketIO|
*
* @param aConsumerThread The socket's consumer thread.
* @param aConsumerLoop The socket's consumer thread.
* @param aIOLoop The socket's I/O loop.
* @param aFd The socket file descriptor.
* @param aConnectionStatus The connection status for |aFd|.
* @param aConnector Connector object for socket-type-specific methods.
*/
ConnectionOrientedSocketIO(nsIThread* aConsumerThread,
ConnectionOrientedSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
int aFd, ConnectionStatus aConnectionStatus,
UnixSocketConnector* aConnector);
@ -67,11 +67,11 @@ protected:
/**
* Constructs an instance of |ConnectionOrientedSocketIO|
*
* @param aConsumerThread The socket's consumer thread.
* @param aConsumerLoop The socket's consumer thread.
* @param aIOLoop The socket's I/O loop.
* @param aConnector Connector object for socket-type-specific methods.
*/
ConnectionOrientedSocketIO(nsIThread* aConsumerThread,
ConnectionOrientedSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
UnixSocketConnector* aConnector);
@ -101,13 +101,13 @@ public:
*
* @param aConnector The new connector object, owned by the
* connection-oriented socket.
* @param aConsumerThread The socket's consumer thread.
* @param aConsumerLoop The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
virtual nsresult PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO) = 0;

View File

@ -50,8 +50,8 @@ DataSocketIO::ReceiveData(int aFd)
nsresult rv = QueryReceiveBuffer(&incoming);
if (NS_FAILED(rv)) {
/* an error occured */
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(FROM_HERE,
new SocketRequestClosingTask(this));
return -1;
}
@ -59,14 +59,14 @@ DataSocketIO::ReceiveData(int aFd)
if (res < 0) {
/* an I/O error occured */
DiscardBuffer();
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(FROM_HERE,
new SocketRequestClosingTask(this));
return -1;
} else if (!res) {
/* EOF or peer shut down sending */
DiscardBuffer();
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(FROM_HERE,
new SocketRequestClosingTask(this));
return 0;
}
@ -93,8 +93,8 @@ DataSocketIO::SendPendingData(int aFd)
ssize_t res = outgoing->Send(aFd);
if (res < 0) {
/* an I/O error occured */
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(FROM_HERE,
new SocketRequestClosingTask(this));
return NS_ERROR_FAILURE;
} else if (!res && outgoing->GetSize()) {
/* I/O is currently blocked; try again later */
@ -109,8 +109,8 @@ DataSocketIO::SendPendingData(int aFd)
return NS_OK;
}
DataSocketIO::DataSocketIO(nsIThread* aConsumerThread)
: SocketIOBase(aConsumerThread)
DataSocketIO::DataSocketIO(MessageLoop* aConsumerLoop)
: SocketIOBase(aConsumerLoop)
{ }
//

View File

@ -10,6 +10,7 @@
#define mozilla_ipc_datasocket_h
#include "mozilla/ipc/SocketBase.h"
#include "nsTArray.h"
namespace mozilla {
namespace ipc {
@ -90,7 +91,7 @@ public:
nsresult SendPendingData(int aFd);
protected:
DataSocketIO(nsIThread* aConsumerThread);
DataSocketIO(MessageLoop* aConsumerLoop);
private:
/**

View File

@ -27,7 +27,7 @@ class ListenSocketIO final
public:
class ListenTask;
ListenSocketIO(nsIThread* aConsumerThread,
ListenSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ListenSocket* aListenSocket,
UnixSocketConnector* aConnector);
@ -95,12 +95,12 @@ private:
ConnectionOrientedSocketIO* mCOSocketIO;
};
ListenSocketIO::ListenSocketIO(nsIThread* aConsumerThread,
ListenSocketIO::ListenSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ListenSocket* aListenSocket,
UnixSocketConnector* aConnector)
: UnixSocketWatcher(aIOLoop)
, SocketIOBase(aConsumerThread)
, SocketIOBase(aConsumerLoop)
, mListenSocket(aListenSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
@ -168,9 +168,8 @@ ListenSocketIO::OnListening()
AddWatchers(READ_WATCHER, true);
/* We signal a successful 'connection' to a local address for listening. */
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
}
void
@ -191,9 +190,8 @@ ListenSocketIO::FireSocketError()
Close();
// Tell the consumer thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
}
void
@ -311,13 +309,13 @@ ListenSocket::~ListenSocket()
nsresult
ListenSocket::Listen(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ConnectionOrientedSocket* aCOSocket)
{
MOZ_ASSERT(!mIO);
mIO = new ListenSocketIO(aConsumerThread, aIOLoop, this, aConnector);
mIO = new ListenSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
// Prepared I/O object, now start listening.
nsresult rv = Listen(aCOSocket);
@ -334,13 +332,8 @@ nsresult
ListenSocket::Listen(UnixSocketConnector* aConnector,
ConnectionOrientedSocket* aCOSocket)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop(), aCOSocket);
return Listen(aConnector, MessageLoop::current(), XRE_GetIOMessageLoop(),
aCOSocket);
}
nsresult

View File

@ -11,7 +11,6 @@
#include "mozilla/ipc/SocketBase.h"
class MessageLoop;
class nsIThread;
namespace mozilla {
namespace ipc {
@ -37,14 +36,14 @@ public:
* in a non-blocking manner.
*
* @param aConnector Connector object for socket-type-specific functions
* @param aConsumerThread The socket's consumer thread.
* @param aConsumerLoop The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @param aCOSocket The connection-oriented socket for handling the
* accepted connection.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
nsresult Listen(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ConnectionOrientedSocket* aCOSocket);

View File

@ -254,52 +254,47 @@ SocketBase::SetConnectionStatus(SocketConnectionStatus aConnectionStatus)
// SocketIOBase
//
SocketIOBase::SocketIOBase(nsIThread* aConsumerThread)
: mConsumerThread(aConsumerThread)
SocketIOBase::SocketIOBase(MessageLoop* aConsumerLoop)
: mConsumerLoop(aConsumerLoop)
{
MOZ_ASSERT(mConsumerThread);
MOZ_ASSERT(mConsumerLoop);
}
SocketIOBase::~SocketIOBase()
{ }
nsIThread*
MessageLoop*
SocketIOBase::GetConsumerThread() const
{
return mConsumerThread;
return mConsumerLoop;
}
bool
SocketIOBase::IsConsumerThread() const
{
nsIThread* thread = nullptr;
if (NS_FAILED(NS_GetCurrentThread(&thread))) {
return false;
}
return thread == GetConsumerThread();
return GetConsumerThread() == MessageLoop::current();
}
//
// SocketIOEventRunnable
// SocketEventTask
//
SocketIOEventRunnable::SocketIOEventRunnable(SocketIOBase* aIO,
SocketEvent aEvent)
: SocketIORunnable<SocketIOBase>(aIO)
SocketEventTask::SocketEventTask(SocketIOBase* aIO, SocketEvent aEvent)
: SocketTask<SocketIOBase>(aIO)
, mEvent(aEvent)
{ }
NS_METHOD
SocketIOEventRunnable::Run()
void
SocketEventTask::Run()
{
SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
SocketIOBase* io = SocketTask<SocketIOBase>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
return NS_OK;
return;
}
SocketBase* socketBase = io->GetSocketBase();
@ -312,55 +307,49 @@ SocketIOEventRunnable::Run()
} else if (mEvent == DISCONNECT) {
socketBase->NotifyDisconnect();
}
return NS_OK;
}
//
// SocketIORequestClosingRunnable
// SocketRequestClosingTask
//
SocketIORequestClosingRunnable::SocketIORequestClosingRunnable(
SocketRequestClosingTask::SocketRequestClosingTask(
SocketIOBase* aIO)
: SocketIORunnable<SocketIOBase>(aIO)
: SocketTask<SocketIOBase>(aIO)
{ }
NS_METHOD
SocketIORequestClosingRunnable::Run()
void
SocketRequestClosingTask::Run()
{
SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
SocketIOBase* io = SocketTask<SocketIOBase>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
return NS_OK;
return;
}
SocketBase* socketBase = io->GetSocketBase();
MOZ_ASSERT(socketBase);
socketBase->Close();
return NS_OK;
}
//
// SocketIODeleteInstanceRunnable
// SocketDeleteInstanceTask
//
SocketIODeleteInstanceRunnable::SocketIODeleteInstanceRunnable(
SocketDeleteInstanceTask::SocketDeleteInstanceTask(
SocketIOBase* aIO)
: mIO(aIO)
{ }
NS_METHOD
SocketIODeleteInstanceRunnable::Run()
void
SocketDeleteInstanceTask::Run()
{
mIO = nullptr; // delete instance
return NS_OK;
}
//
@ -385,8 +374,8 @@ SocketIOShutdownTask::Run()
// shut down, so we can send a message to the consumer thread to
// delete |io| safely knowing that it's not reference any longer.
io->ShutdownOnIOThread();
io->GetConsumerThread()->Dispatch(new SocketIODeleteInstanceRunnable(io),
NS_DISPATCH_NORMAL);
io->GetConsumerThread()->PostTask(FROM_HERE,
new SocketDeleteInstanceTask(io));
}
}

View File

@ -11,8 +11,6 @@
#include "base/message_loop.h"
#include "nsAutoPtr.h"
#include "nsTArray.h"
#include "nsThreadUtils.h"
namespace mozilla {
namespace ipc {
@ -382,33 +380,33 @@ public:
*
* @return A pointer to the consumer thread.
*/
nsIThread* GetConsumerThread() const;
MessageLoop* GetConsumerThread() const;
/**
* @return True if the current thread is thre consumer thread, or false
* @return True if the current thread is the consumer thread, or false
* otherwise.
*/
bool IsConsumerThread() const;
protected:
SocketIOBase(nsIThread* nsConsumerThread);
SocketIOBase(MessageLoop* aConsumerLoop);
private:
nsCOMPtr<nsIThread> mConsumerThread;
MessageLoop* mConsumerLoop;
};
//
// Socket I/O runnables
// Socket tasks
//
/* |SocketIORunnable| is a runnable for sending a message from
/* |SocketTask| is a task for sending a message from
* the I/O thread to the consumer thread.
*/
template <typename T>
class SocketIORunnable : public nsRunnable
class SocketTask : public Task
{
public:
virtual ~SocketIORunnable()
virtual ~SocketTask()
{ }
T* GetIO() const
@ -417,8 +415,8 @@ public:
}
protected:
SocketIORunnable(T* aIO)
: mIO(aIO)
SocketTask(T* aIO)
: mIO(aIO)
{
MOZ_ASSERT(aIO);
}
@ -428,10 +426,10 @@ private:
};
/**
* |SocketIOEventRunnable| reports the connection state on the
* |SocketEventTask| reports the connection state on the
* I/O thread back to the consumer thread.
*/
class SocketIOEventRunnable final : public SocketIORunnable<SocketIOBase>
class SocketEventTask final : public SocketTask<SocketIOBase>
{
public:
enum SocketEvent {
@ -440,36 +438,35 @@ public:
DISCONNECT
};
SocketIOEventRunnable(SocketIOBase* aIO, SocketEvent aEvent);
SocketEventTask(SocketIOBase* aIO, SocketEvent aEvent);
NS_IMETHOD Run() override;
void Run() override;
private:
SocketEvent mEvent;
};
/**
* |SocketIORequestClosingRunnable| closes an instance of |SocketBase|
* to the consumer thread.
* |SocketRequestClosingTask| closes an instance of |SocketBase|
* on the consumer thread.
*/
class SocketIORequestClosingRunnable final
: public SocketIORunnable<SocketIOBase>
class SocketRequestClosingTask final : public SocketTask<SocketIOBase>
{
public:
SocketIORequestClosingRunnable(SocketIOBase* aIO);
SocketRequestClosingTask(SocketIOBase* aIO);
NS_IMETHOD Run() override;
void Run() override;
};
/**
* |SocketIODeleteInstanceRunnable| deletes an object on the consumer thread.
* |SocketDeleteInstanceTask| deletes an object on the consumer thread.
*/
class SocketIODeleteInstanceRunnable final : public nsRunnable
class SocketDeleteInstanceTask final : public Task
{
public:
SocketIODeleteInstanceRunnable(SocketIOBase* aIO);
SocketDeleteInstanceTask(SocketIOBase* aIO);
NS_IMETHOD Run() override;
void Run() override;
private:
nsAutoPtr<SocketIOBase> mIO;

View File

@ -25,15 +25,15 @@ class StreamSocketIO final : public ConnectionOrientedSocketIO
public:
class ConnectTask;
class DelayedConnectTask;
class ReceiveRunnable;
class ReceiveTask;
StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* mIOLoop,
StreamSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector);
StreamSocketIO(nsIThread* aConsumerThread,
MessageLoop* mIOLoop, int aFd,
ConnectionStatus aConnectionStatus,
StreamSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
int aFd, ConnectionStatus aConnectionStatus,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector);
~StreamSocketIO();
@ -91,11 +91,11 @@ private:
nsAutoPtr<UnixSocketRawData> mBuffer;
};
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
StreamSocketIO::StreamSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector)
: ConnectionOrientedSocketIO(aConsumerThread, aIOLoop, aConnector)
: ConnectionOrientedSocketIO(aConsumerLoop, aIOLoop, aConnector)
, mStreamSocket(aStreamSocket)
, mShuttingDownOnIOThread(false)
, mDelayedConnectTask(nullptr)
@ -103,12 +103,12 @@ StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
MOZ_ASSERT(mStreamSocket);
}
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
StreamSocketIO::StreamSocketIO(MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
int aFd, ConnectionStatus aConnectionStatus,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector)
: ConnectionOrientedSocketIO(aConsumerThread,
: ConnectionOrientedSocketIO(aConsumerLoop,
aIOLoop,
aFd,
aConnectionStatus,
@ -183,36 +183,33 @@ StreamSocketIO::QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer)
}
/**
* |ReceiveRunnable| transfers data received on the I/O thread
* |ReceiveTask| transfers data received on the I/O thread
* to an instance of |StreamSocket| on the consumer thread.
*/
class StreamSocketIO::ReceiveRunnable final
: public SocketIORunnable<StreamSocketIO>
class StreamSocketIO::ReceiveTask final : public SocketTask<StreamSocketIO>
{
public:
ReceiveRunnable(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer)
: SocketIORunnable<StreamSocketIO>(aIO)
ReceiveTask(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer)
: SocketTask<StreamSocketIO>(aIO)
, mBuffer(aBuffer)
{ }
NS_IMETHOD Run() override
void Run() override
{
StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO();
StreamSocketIO* io = SocketTask<StreamSocketIO>::GetIO();
MOZ_ASSERT(io->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
return NS_OK;
return;
}
StreamSocket* streamSocket = io->GetStreamSocket();
MOZ_ASSERT(streamSocket);
streamSocket->ReceiveSocketData(mBuffer);
return NS_OK;
}
private:
@ -222,8 +219,8 @@ private:
void
StreamSocketIO::ConsumeBuffer()
{
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
NS_DISPATCH_NORMAL);
GetConsumerThread()->PostTask(FROM_HERE,
new ReceiveTask(this, mBuffer.forget()));
}
void
@ -345,11 +342,11 @@ StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
nsresult
StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
{
MOZ_ASSERT(!mIO);
mIO = new StreamSocketIO(aConsumerThread, aIOLoop, this, aConnector);
mIO = new StreamSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {
@ -367,20 +364,15 @@ StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
nsresult
StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs)
{
nsIThread* consumerThread = nullptr;
nsresult rv = NS_GetCurrentThread(&consumerThread);
if (NS_FAILED(rv)) {
return rv;
}
return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
return Connect(aConnector, aDelayMs,
MessageLoop::current(), XRE_GetIOMessageLoop());
}
// |ConnectionOrientedSocket|
nsresult
StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO)
{
@ -389,7 +381,7 @@ StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
SetConnectionStatus(SOCKET_CONNECTING);
mIO = new StreamSocketIO(aConsumerThread, aIOLoop,
mIO = new StreamSocketIO(aConsumerLoop, aIOLoop,
-1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
this, aConnector);
aIO = mIO;

View File

@ -42,12 +42,12 @@ public:
*
* @param aConnector Connector object for socket type specific functions
* @param aDelayMs Time delay in milliseconds.
* @param aConsumerThread The socket's consumer thread.
* @param aConsumerLoop The socket's consumer thread.
* @param aIOLoop The socket's I/O thread.
* @return NS_OK on success, or an XPCOM error code otherwise.
*/
nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs,
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
/**
* Starts a task on the socket that will try to connect to a socket in a
@ -63,7 +63,7 @@ public:
//
nsresult PrepareAccept(UnixSocketConnector* aConnector,
nsIThread* aConsumerThread,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
ConnectionOrientedSocketIO*& aIO) override;