mirror of
https://gitlab.winehq.org/wine/wine-gecko.git
synced 2024-09-13 09:24:08 -07:00
Bug 1172479: Replace |nsIThread| by |MessageLoop| in socket I/O code, r=kmachulis
Dispatching events via |nsIThread| doesn't work with worker threads. This patch replaces all uses of |nsIThread| in the socket code by equivalent uses of |MessageLoop|.
This commit is contained in:
parent
44224b6697
commit
0976600ce3
@ -5,18 +5,14 @@
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
#include "BluetoothSocket.h"
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include "base/message_loop.h"
|
||||
#include "BluetoothSocketObserver.h"
|
||||
#include "BluetoothInterface.h"
|
||||
#include "BluetoothUtils.h"
|
||||
#include "mozilla/ipc/UnixSocketWatcher.h"
|
||||
#include "mozilla/FileUtils.h"
|
||||
#include "mozilla/RefPtr.h"
|
||||
#include "nsThreadUtils.h"
|
||||
#include "nsXULAppAPI.h"
|
||||
|
||||
using namespace mozilla::ipc;
|
||||
@ -74,11 +70,11 @@ public:
|
||||
SOCKET_IS_CONNECTED
|
||||
};
|
||||
|
||||
DroidSocketImpl(nsIThread* aConsumerThread,
|
||||
DroidSocketImpl(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
BluetoothSocket* aConsumer)
|
||||
: ipc::UnixFdWatcher(aIOLoop)
|
||||
, DataSocketIO(aConsumerThread)
|
||||
, DataSocketIO(aConsumerLoop)
|
||||
, mConsumer(aConsumer)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
, mConnectionStatus(SOCKET_IS_DISCONNECTED)
|
||||
@ -173,7 +169,7 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
class ReceiveRunnable;
|
||||
class ReceiveTask;
|
||||
|
||||
/**
|
||||
* libevent triggered functions that reads data from socket when available and
|
||||
@ -319,9 +315,8 @@ DroidSocketImpl::Accept(int aFd)
|
||||
SetFd(aFd);
|
||||
mConnectionStatus = SOCKET_IS_CONNECTED;
|
||||
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
|
||||
|
||||
AddWatchers(READ_WATCHER, true);
|
||||
if (HasPendingData()) {
|
||||
@ -426,15 +421,15 @@ private:
|
||||
DroidSocketImpl* mImpl;
|
||||
};
|
||||
|
||||
class AcceptRunnable final : public SocketIORunnable<DroidSocketImpl>
|
||||
class InvokeAcceptTask final : public SocketTask<DroidSocketImpl>
|
||||
{
|
||||
public:
|
||||
AcceptRunnable(DroidSocketImpl* aImpl, int aFd)
|
||||
: SocketIORunnable<DroidSocketImpl>(aImpl)
|
||||
, mFd(aFd)
|
||||
InvokeAcceptTask(DroidSocketImpl* aImpl, int aFd)
|
||||
: SocketTask<DroidSocketImpl>(aImpl)
|
||||
, mFd(aFd)
|
||||
{ }
|
||||
|
||||
NS_IMETHOD Run() override
|
||||
void Run() override
|
||||
{
|
||||
MOZ_ASSERT(GetIO()->IsConsumerThread());
|
||||
MOZ_ASSERT(sBluetoothSocketInterface);
|
||||
@ -443,8 +438,6 @@ public:
|
||||
GetIO()->mConsumer->SetCurrentResultHandler(res);
|
||||
|
||||
sBluetoothSocketInterface->Accept(mFd, res);
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -462,8 +455,7 @@ DroidSocketImpl::OnSocketCanAcceptWithoutBlocking(int aFd)
|
||||
*/
|
||||
|
||||
RemoveWatchers(READ_WATCHER);
|
||||
GetConsumerThread()->Dispatch(new AcceptRunnable(this, aFd),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(FROM_HERE, new InvokeAcceptTask(this, aFd));
|
||||
}
|
||||
|
||||
void
|
||||
@ -507,9 +499,8 @@ DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd)
|
||||
|
||||
mConnectionStatus = SOCKET_IS_CONNECTED;
|
||||
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
|
||||
|
||||
AddWatchers(READ_WATCHER, true);
|
||||
if (HasPendingData()) {
|
||||
@ -537,33 +528,30 @@ DroidSocketImpl::QueryReceiveBuffer(
|
||||
* |ReceiveRunnable| transfers data received on the I/O thread
|
||||
* to an instance of |BluetoothSocket| on the consumer thread.
|
||||
*/
|
||||
class DroidSocketImpl::ReceiveRunnable final
|
||||
: public SocketIORunnable<DroidSocketImpl>
|
||||
class DroidSocketImpl::ReceiveTask final : public SocketTask<DroidSocketImpl>
|
||||
{
|
||||
public:
|
||||
ReceiveRunnable(DroidSocketImpl* aIO, UnixSocketBuffer* aBuffer)
|
||||
: SocketIORunnable<DroidSocketImpl>(aIO)
|
||||
ReceiveTask(DroidSocketImpl* aIO, UnixSocketBuffer* aBuffer)
|
||||
: SocketTask<DroidSocketImpl>(aIO)
|
||||
, mBuffer(aBuffer)
|
||||
{ }
|
||||
|
||||
NS_IMETHOD Run() override
|
||||
void Run() override
|
||||
{
|
||||
DroidSocketImpl* io = SocketIORunnable<DroidSocketImpl>::GetIO();
|
||||
DroidSocketImpl* io = SocketTask<DroidSocketImpl>::GetIO();
|
||||
|
||||
MOZ_ASSERT(io->IsConsumerThread());
|
||||
|
||||
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
|
||||
// Since we've already explicitly closed and the close
|
||||
// happened before this, this isn't really an error.
|
||||
return NS_OK;
|
||||
return;
|
||||
}
|
||||
|
||||
BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket();
|
||||
MOZ_ASSERT(bluetoothSocket);
|
||||
|
||||
bluetoothSocket->ReceiveSocketData(mBuffer);
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -573,8 +561,8 @@ private:
|
||||
void
|
||||
DroidSocketImpl::ConsumeBuffer()
|
||||
{
|
||||
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(FROM_HERE,
|
||||
new ReceiveTask(this, mBuffer.forget()));
|
||||
}
|
||||
|
||||
void
|
||||
@ -651,14 +639,14 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress,
|
||||
BluetoothSocketType aType,
|
||||
int aChannel,
|
||||
bool aAuth, bool aEncrypt,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop)
|
||||
{
|
||||
MOZ_ASSERT(!mImpl);
|
||||
|
||||
SetConnectionStatus(SOCKET_CONNECTING);
|
||||
|
||||
mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
|
||||
mImpl = new DroidSocketImpl(aConsumerLoop, aIOLoop, this);
|
||||
|
||||
BluetoothSocketResultHandler* res = new ConnectSocketResultHandler(mImpl);
|
||||
SetCurrentResultHandler(res);
|
||||
@ -678,14 +666,8 @@ BluetoothSocket::Connect(const nsAString& aDeviceAddress,
|
||||
int aChannel,
|
||||
bool aAuth, bool aEncrypt)
|
||||
{
|
||||
nsIThread* consumerThread = nullptr;
|
||||
nsresult rv = NS_GetCurrentThread(&consumerThread);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
return Connect(aDeviceAddress, aServiceUuid, aType, aChannel, aAuth,
|
||||
aEncrypt, consumerThread, XRE_GetIOMessageLoop());
|
||||
aEncrypt, MessageLoop::current(), XRE_GetIOMessageLoop());
|
||||
}
|
||||
|
||||
class ListenResultHandler final : public BluetoothSocketResultHandler
|
||||
@ -721,14 +703,14 @@ BluetoothSocket::Listen(const nsAString& aServiceName,
|
||||
BluetoothSocketType aType,
|
||||
int aChannel,
|
||||
bool aAuth, bool aEncrypt,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop)
|
||||
{
|
||||
MOZ_ASSERT(!mImpl);
|
||||
|
||||
SetConnectionStatus(SOCKET_LISTENING);
|
||||
|
||||
mImpl = new DroidSocketImpl(aConsumerThread, aIOLoop, this);
|
||||
mImpl = new DroidSocketImpl(aConsumerLoop, aIOLoop, this);
|
||||
|
||||
BluetoothSocketResultHandler* res = new ListenResultHandler(mImpl);
|
||||
SetCurrentResultHandler(res);
|
||||
@ -748,14 +730,8 @@ BluetoothSocket::Listen(const nsAString& aServiceName,
|
||||
int aChannel,
|
||||
bool aAuth, bool aEncrypt)
|
||||
{
|
||||
nsIThread* consumerThread = nullptr;
|
||||
nsresult rv = NS_GetCurrentThread(&consumerThread);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
return Listen(aServiceName, aServiceUuid, aType, aChannel, aAuth, aEncrypt,
|
||||
consumerThread, XRE_GetIOMessageLoop());
|
||||
MessageLoop::current(), XRE_GetIOMessageLoop());
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -11,7 +11,6 @@
|
||||
#include "mozilla/ipc/DataSocket.h"
|
||||
|
||||
class MessageLoop;
|
||||
class nsIThread;
|
||||
|
||||
BEGIN_BLUETOOTH_NAMESPACE
|
||||
|
||||
@ -29,7 +28,7 @@ public:
|
||||
BluetoothSocketType aType,
|
||||
int aChannel,
|
||||
bool aAuth, bool aEncrypt,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop);
|
||||
|
||||
nsresult Connect(const nsAString& aDeviceAddress,
|
||||
@ -43,7 +42,7 @@ public:
|
||||
BluetoothSocketType aType,
|
||||
int aChannel,
|
||||
bool aAuth, bool aEncrypt,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop);
|
||||
|
||||
nsresult Listen(const nsAString& aServiceName,
|
||||
|
@ -8,9 +8,7 @@
|
||||
#include <fcntl.h>
|
||||
#include "BluetoothSocketObserver.h"
|
||||
#include "BluetoothUnixSocketConnector.h"
|
||||
#include "mozilla/unused.h"
|
||||
#include "nsTArray.h"
|
||||
#include "nsThreadUtils.h"
|
||||
#include "mozilla/RefPtr.h"
|
||||
#include "nsXULAppAPI.h"
|
||||
|
||||
using namespace mozilla::ipc;
|
||||
@ -28,7 +26,7 @@ class BluetoothSocket::BluetoothSocketIO final
|
||||
, public DataSocketIO
|
||||
{
|
||||
public:
|
||||
BluetoothSocketIO(nsIThread* aConsumerThread,
|
||||
BluetoothSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
BluetoothSocket* aConsumer,
|
||||
UnixSocketConnector* aConnector);
|
||||
@ -90,7 +88,7 @@ public:
|
||||
void ShutdownOnIOThread() override;
|
||||
|
||||
private:
|
||||
class ReceiveRunnable;
|
||||
class ReceiveTask;
|
||||
|
||||
void FireSocketError();
|
||||
|
||||
@ -134,12 +132,12 @@ private:
|
||||
};
|
||||
|
||||
BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
BluetoothSocket* aConsumer,
|
||||
UnixSocketConnector* aConnector)
|
||||
: UnixSocketWatcher(aIOLoop)
|
||||
, DataSocketIO(aConsumerThread)
|
||||
, DataSocketIO(aConsumerLoop)
|
||||
, mConsumer(aConsumer)
|
||||
, mConnector(aConnector)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
@ -281,9 +279,8 @@ BluetoothSocket::BluetoothSocketIO::OnConnected()
|
||||
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
|
||||
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
|
||||
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
|
||||
|
||||
AddWatchers(READ_WATCHER, true);
|
||||
if (HasPendingData()) {
|
||||
@ -331,9 +328,8 @@ BluetoothSocket::BluetoothSocketIO::OnSocketCanAcceptWithoutBlocking()
|
||||
Close();
|
||||
SetSocket(fd, SOCKET_IS_CONNECTED);
|
||||
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
|
||||
|
||||
AddWatchers(READ_WATCHER, true);
|
||||
if (HasPendingData()) {
|
||||
@ -382,10 +378,8 @@ BluetoothSocket::BluetoothSocketIO::FireSocketError()
|
||||
Close();
|
||||
|
||||
// Tell the consumer thread we've errored
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
|
||||
NS_DISPATCH_NORMAL);
|
||||
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
|
||||
}
|
||||
|
||||
// |DataSocketIO|
|
||||
@ -405,36 +399,34 @@ BluetoothSocket::BluetoothSocketIO::QueryReceiveBuffer(
|
||||
}
|
||||
|
||||
/**
|
||||
* |ReceiveRunnable| transfers data received on the I/O thread
|
||||
* |ReceiveTask| transfers data received on the I/O thread
|
||||
* to an instance of |BluetoothSocket| on the consumer thread.
|
||||
*/
|
||||
class BluetoothSocket::BluetoothSocketIO::ReceiveRunnable final
|
||||
: public SocketIORunnable<BluetoothSocketIO>
|
||||
class BluetoothSocket::BluetoothSocketIO::ReceiveTask final
|
||||
: public SocketTask<BluetoothSocketIO>
|
||||
{
|
||||
public:
|
||||
ReceiveRunnable(BluetoothSocketIO* aIO, UnixSocketBuffer* aBuffer)
|
||||
: SocketIORunnable<BluetoothSocketIO>(aIO)
|
||||
ReceiveTask(BluetoothSocketIO* aIO, UnixSocketBuffer* aBuffer)
|
||||
: SocketTask<BluetoothSocketIO>(aIO)
|
||||
, mBuffer(aBuffer)
|
||||
{ }
|
||||
|
||||
NS_IMETHOD Run() override
|
||||
void Run() override
|
||||
{
|
||||
BluetoothSocketIO* io = SocketIORunnable<BluetoothSocketIO>::GetIO();
|
||||
BluetoothSocketIO* io = SocketTask<BluetoothSocketIO>::GetIO();
|
||||
|
||||
MOZ_ASSERT(io->IsConsumerThread());
|
||||
|
||||
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
|
||||
// Since we've already explicitly closed and the close
|
||||
// happened before this, this isn't really an error.
|
||||
return NS_OK;
|
||||
return;
|
||||
}
|
||||
|
||||
BluetoothSocket* bluetoothSocket = io->GetBluetoothSocket();
|
||||
MOZ_ASSERT(bluetoothSocket);
|
||||
|
||||
bluetoothSocket->ReceiveSocketData(mBuffer);
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -444,8 +436,8 @@ private:
|
||||
void
|
||||
BluetoothSocket::BluetoothSocketIO::ConsumeBuffer()
|
||||
{
|
||||
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(FROM_HERE,
|
||||
new ReceiveTask(this, mBuffer.forget()));
|
||||
}
|
||||
|
||||
void
|
||||
@ -650,14 +642,14 @@ BluetoothSocket::SendSocketData(const nsACString& aStr)
|
||||
nsresult
|
||||
BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
|
||||
int aDelayMs,
|
||||
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
|
||||
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
|
||||
{
|
||||
MOZ_ASSERT(aConnector);
|
||||
MOZ_ASSERT(aConsumerThread);
|
||||
MOZ_ASSERT(aConsumerLoop);
|
||||
MOZ_ASSERT(aIOLoop);
|
||||
MOZ_ASSERT(!mIO);
|
||||
|
||||
mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
|
||||
mIO = new BluetoothSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
|
||||
SetConnectionStatus(SOCKET_CONNECTING);
|
||||
|
||||
if (aDelayMs > 0) {
|
||||
@ -675,25 +667,20 @@ nsresult
|
||||
BluetoothSocket::Connect(BluetoothUnixSocketConnector* aConnector,
|
||||
int aDelayMs)
|
||||
{
|
||||
nsIThread* consumerThread = nullptr;
|
||||
nsresult rv = NS_GetCurrentThread(&consumerThread);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
|
||||
return Connect(aConnector, aDelayMs, MessageLoop::current(),
|
||||
XRE_GetIOMessageLoop());
|
||||
}
|
||||
|
||||
nsresult
|
||||
BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
|
||||
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
|
||||
{
|
||||
MOZ_ASSERT(aConnector);
|
||||
MOZ_ASSERT(aConsumerThread);
|
||||
MOZ_ASSERT(aConsumerLoop);
|
||||
MOZ_ASSERT(aIOLoop);
|
||||
MOZ_ASSERT(!mIO);
|
||||
|
||||
mIO = new BluetoothSocketIO(aConsumerThread, aIOLoop, this, aConnector);
|
||||
mIO = new BluetoothSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
|
||||
SetConnectionStatus(SOCKET_LISTENING);
|
||||
|
||||
aIOLoop->PostTask(FROM_HERE, new ListenTask(mIO));
|
||||
@ -704,13 +691,7 @@ BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector,
|
||||
nsresult
|
||||
BluetoothSocket::Listen(BluetoothUnixSocketConnector* aConnector)
|
||||
{
|
||||
nsIThread* consumerThread = nullptr;
|
||||
nsresult rv = NS_GetCurrentThread(&consumerThread);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop());
|
||||
return Listen(aConnector, MessageLoop::current(), XRE_GetIOMessageLoop());
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -8,13 +8,10 @@
|
||||
#define mozilla_dom_bluetooth_BluetoothSocket_h
|
||||
|
||||
#include "BluetoothCommon.h"
|
||||
#include <stdlib.h>
|
||||
#include "mozilla/ipc/DataSocket.h"
|
||||
#include "mozilla/ipc/UnixSocketWatcher.h"
|
||||
#include "mozilla/RefPtr.h"
|
||||
#include "nsAutoPtr.h"
|
||||
#include "nsString.h"
|
||||
#include "nsThreadUtils.h"
|
||||
|
||||
class MessageLoop;
|
||||
|
||||
@ -66,12 +63,12 @@ public:
|
||||
*
|
||||
* @param aConnector Connector object for socket type specific functions
|
||||
* @param aDelayMs Time delay in milli-seconds.
|
||||
* @param aConsumerThread The socket's consumer thread.
|
||||
* @param aConsumerLoop The socket's consumer thread.
|
||||
* @param aIOLoop The socket's I/O thread.
|
||||
* @return NS_OK on success, or an XPCOM error code otherwise.
|
||||
*/
|
||||
nsresult Connect(BluetoothUnixSocketConnector* aConnector, int aDelayMs,
|
||||
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
|
||||
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
|
||||
|
||||
/**
|
||||
* Starts a task on the socket that will try to connect to a socket in a
|
||||
@ -89,12 +86,12 @@ public:
|
||||
* non-blocking manner.
|
||||
*
|
||||
* @param aConnector Connector object for socket type specific functions
|
||||
* @param aConsumerThread The socket's consumer thread.
|
||||
* @param aConsumerLoop The socket's consumer thread.
|
||||
* @param aIOLoop The socket's I/O thread.
|
||||
* @return NS_OK on success, or an XPCOM error code otherwise.
|
||||
*/
|
||||
nsresult Listen(BluetoothUnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
|
||||
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
|
||||
|
||||
/**
|
||||
* Starts a task on the socket that will try to accept a new connection in a
|
||||
|
@ -214,7 +214,7 @@ BluetoothDaemonPDUConsumer::~BluetoothDaemonPDUConsumer()
|
||||
class BluetoothDaemonConnectionIO final : public ConnectionOrientedSocketIO
|
||||
{
|
||||
public:
|
||||
BluetoothDaemonConnectionIO(nsIThread* aConsumerThread,
|
||||
BluetoothDaemonConnectionIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
int aFd, ConnectionStatus aConnectionStatus,
|
||||
UnixSocketConnector* aConnector,
|
||||
@ -247,14 +247,14 @@ private:
|
||||
};
|
||||
|
||||
BluetoothDaemonConnectionIO::BluetoothDaemonConnectionIO(
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
int aFd,
|
||||
ConnectionStatus aConnectionStatus,
|
||||
UnixSocketConnector* aConnector,
|
||||
BluetoothDaemonConnection* aConnection,
|
||||
BluetoothDaemonPDUConsumer* aConsumer)
|
||||
: ConnectionOrientedSocketIO(aConsumerThread,
|
||||
: ConnectionOrientedSocketIO(aConsumerLoop,
|
||||
aIOLoop,
|
||||
aFd,
|
||||
aConnectionStatus,
|
||||
@ -361,7 +361,7 @@ BluetoothDaemonConnection::~BluetoothDaemonConnection()
|
||||
|
||||
nsresult
|
||||
BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ConnectionOrientedSocketIO*& aIO)
|
||||
{
|
||||
@ -370,7 +370,7 @@ BluetoothDaemonConnection::PrepareAccept(UnixSocketConnector* aConnector,
|
||||
SetConnectionStatus(SOCKET_CONNECTING);
|
||||
|
||||
mIO = new BluetoothDaemonConnectionIO(
|
||||
aConsumerThread, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
|
||||
aConsumerLoop, aIOLoop, -1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
|
||||
aConnector, this, mPDUConsumer);
|
||||
aIO = mIO;
|
||||
|
||||
|
@ -126,7 +126,7 @@ public:
|
||||
//
|
||||
|
||||
nsresult PrepareAccept(UnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ConnectionOrientedSocketIO*& aIO) override;
|
||||
|
||||
|
@ -15,12 +15,11 @@ namespace ipc {
|
||||
//
|
||||
|
||||
ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
int aFd,
|
||||
ConnectionStatus aConnectionStatus,
|
||||
int aFd, ConnectionStatus aConnectionStatus,
|
||||
UnixSocketConnector* aConnector)
|
||||
: DataSocketIO(aConsumerThread)
|
||||
: DataSocketIO(aConsumerLoop)
|
||||
, UnixSocketWatcher(aIOLoop, aFd, aConnectionStatus)
|
||||
, mConnector(aConnector)
|
||||
, mPeerAddressLength(0)
|
||||
@ -29,10 +28,10 @@ ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
|
||||
}
|
||||
|
||||
ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
UnixSocketConnector* aConnector)
|
||||
: DataSocketIO(aConsumerThread)
|
||||
: DataSocketIO(aConsumerLoop)
|
||||
, UnixSocketWatcher(aIOLoop)
|
||||
, mConnector(aConnector)
|
||||
, mPeerAddressLength(0)
|
||||
@ -79,9 +78,8 @@ ConnectionOrientedSocketIO::Connect()
|
||||
fd);
|
||||
if (NS_FAILED(rv)) {
|
||||
// Tell the consumer thread we've errored
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
|
||||
return NS_ERROR_FAILURE;
|
||||
}
|
||||
|
||||
@ -147,9 +145,8 @@ ConnectionOrientedSocketIO::OnConnected()
|
||||
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
|
||||
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
|
||||
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
|
||||
|
||||
AddWatchers(READ_WATCHER, true);
|
||||
if (HasPendingData()) {
|
||||
@ -176,9 +173,8 @@ ConnectionOrientedSocketIO::OnError(const char* aFunction, int aErrno)
|
||||
Close();
|
||||
|
||||
// Tell the consumer thread we've errored
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
|
||||
}
|
||||
|
||||
//
|
||||
|
@ -53,13 +53,13 @@ protected:
|
||||
/**
|
||||
* Constructs an instance of |ConnectionOrientedSocketIO|
|
||||
*
|
||||
* @param aConsumerThread The socket's consumer thread.
|
||||
* @param aConsumerLoop The socket's consumer thread.
|
||||
* @param aIOLoop The socket's I/O loop.
|
||||
* @param aFd The socket file descriptor.
|
||||
* @param aConnectionStatus The connection status for |aFd|.
|
||||
* @param aConnector Connector object for socket-type-specific methods.
|
||||
*/
|
||||
ConnectionOrientedSocketIO(nsIThread* aConsumerThread,
|
||||
ConnectionOrientedSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
int aFd, ConnectionStatus aConnectionStatus,
|
||||
UnixSocketConnector* aConnector);
|
||||
@ -67,11 +67,11 @@ protected:
|
||||
/**
|
||||
* Constructs an instance of |ConnectionOrientedSocketIO|
|
||||
*
|
||||
* @param aConsumerThread The socket's consumer thread.
|
||||
* @param aConsumerLoop The socket's consumer thread.
|
||||
* @param aIOLoop The socket's I/O loop.
|
||||
* @param aConnector Connector object for socket-type-specific methods.
|
||||
*/
|
||||
ConnectionOrientedSocketIO(nsIThread* aConsumerThread,
|
||||
ConnectionOrientedSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
UnixSocketConnector* aConnector);
|
||||
|
||||
@ -101,13 +101,13 @@ public:
|
||||
*
|
||||
* @param aConnector The new connector object, owned by the
|
||||
* connection-oriented socket.
|
||||
* @param aConsumerThread The socket's consumer thread.
|
||||
* @param aConsumerLoop The socket's consumer thread.
|
||||
* @param aIOLoop The socket's I/O thread.
|
||||
* @param[out] aIO, Returns an instance of |ConnectionOrientedSocketIO|.
|
||||
* @return NS_OK on success, or an XPCOM error code otherwise.
|
||||
*/
|
||||
virtual nsresult PrepareAccept(UnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ConnectionOrientedSocketIO*& aIO) = 0;
|
||||
|
||||
|
@ -50,8 +50,8 @@ DataSocketIO::ReceiveData(int aFd)
|
||||
nsresult rv = QueryReceiveBuffer(&incoming);
|
||||
if (NS_FAILED(rv)) {
|
||||
/* an error occured */
|
||||
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(FROM_HERE,
|
||||
new SocketRequestClosingTask(this));
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -59,14 +59,14 @@ DataSocketIO::ReceiveData(int aFd)
|
||||
if (res < 0) {
|
||||
/* an I/O error occured */
|
||||
DiscardBuffer();
|
||||
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(FROM_HERE,
|
||||
new SocketRequestClosingTask(this));
|
||||
return -1;
|
||||
} else if (!res) {
|
||||
/* EOF or peer shut down sending */
|
||||
DiscardBuffer();
|
||||
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(FROM_HERE,
|
||||
new SocketRequestClosingTask(this));
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -93,8 +93,8 @@ DataSocketIO::SendPendingData(int aFd)
|
||||
ssize_t res = outgoing->Send(aFd);
|
||||
if (res < 0) {
|
||||
/* an I/O error occured */
|
||||
GetConsumerThread()->Dispatch(new SocketIORequestClosingRunnable(this),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(FROM_HERE,
|
||||
new SocketRequestClosingTask(this));
|
||||
return NS_ERROR_FAILURE;
|
||||
} else if (!res && outgoing->GetSize()) {
|
||||
/* I/O is currently blocked; try again later */
|
||||
@ -109,8 +109,8 @@ DataSocketIO::SendPendingData(int aFd)
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
DataSocketIO::DataSocketIO(nsIThread* aConsumerThread)
|
||||
: SocketIOBase(aConsumerThread)
|
||||
DataSocketIO::DataSocketIO(MessageLoop* aConsumerLoop)
|
||||
: SocketIOBase(aConsumerLoop)
|
||||
{ }
|
||||
|
||||
//
|
||||
|
@ -10,6 +10,7 @@
|
||||
#define mozilla_ipc_datasocket_h
|
||||
|
||||
#include "mozilla/ipc/SocketBase.h"
|
||||
#include "nsTArray.h"
|
||||
|
||||
namespace mozilla {
|
||||
namespace ipc {
|
||||
@ -90,7 +91,7 @@ public:
|
||||
nsresult SendPendingData(int aFd);
|
||||
|
||||
protected:
|
||||
DataSocketIO(nsIThread* aConsumerThread);
|
||||
DataSocketIO(MessageLoop* aConsumerLoop);
|
||||
|
||||
private:
|
||||
/**
|
||||
|
@ -27,7 +27,7 @@ class ListenSocketIO final
|
||||
public:
|
||||
class ListenTask;
|
||||
|
||||
ListenSocketIO(nsIThread* aConsumerThread,
|
||||
ListenSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ListenSocket* aListenSocket,
|
||||
UnixSocketConnector* aConnector);
|
||||
@ -95,12 +95,12 @@ private:
|
||||
ConnectionOrientedSocketIO* mCOSocketIO;
|
||||
};
|
||||
|
||||
ListenSocketIO::ListenSocketIO(nsIThread* aConsumerThread,
|
||||
ListenSocketIO::ListenSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ListenSocket* aListenSocket,
|
||||
UnixSocketConnector* aConnector)
|
||||
: UnixSocketWatcher(aIOLoop)
|
||||
, SocketIOBase(aConsumerThread)
|
||||
, SocketIOBase(aConsumerLoop)
|
||||
, mListenSocket(aListenSocket)
|
||||
, mConnector(aConnector)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
@ -168,9 +168,8 @@ ListenSocketIO::OnListening()
|
||||
AddWatchers(READ_WATCHER, true);
|
||||
|
||||
/* We signal a successful 'connection' to a local address for listening. */
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_SUCCESS));
|
||||
}
|
||||
|
||||
void
|
||||
@ -191,9 +190,8 @@ ListenSocketIO::FireSocketError()
|
||||
Close();
|
||||
|
||||
// Tell the consumer thread we've errored
|
||||
GetConsumerThread()->Dispatch(
|
||||
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(
|
||||
FROM_HERE, new SocketEventTask(this, SocketEventTask::CONNECT_ERROR));
|
||||
}
|
||||
|
||||
void
|
||||
@ -311,13 +309,13 @@ ListenSocket::~ListenSocket()
|
||||
|
||||
nsresult
|
||||
ListenSocket::Listen(UnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ConnectionOrientedSocket* aCOSocket)
|
||||
{
|
||||
MOZ_ASSERT(!mIO);
|
||||
|
||||
mIO = new ListenSocketIO(aConsumerThread, aIOLoop, this, aConnector);
|
||||
mIO = new ListenSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
|
||||
|
||||
// Prepared I/O object, now start listening.
|
||||
nsresult rv = Listen(aCOSocket);
|
||||
@ -334,13 +332,8 @@ nsresult
|
||||
ListenSocket::Listen(UnixSocketConnector* aConnector,
|
||||
ConnectionOrientedSocket* aCOSocket)
|
||||
{
|
||||
nsIThread* consumerThread = nullptr;
|
||||
nsresult rv = NS_GetCurrentThread(&consumerThread);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
return Listen(aConnector, consumerThread, XRE_GetIOMessageLoop(), aCOSocket);
|
||||
return Listen(aConnector, MessageLoop::current(), XRE_GetIOMessageLoop(),
|
||||
aCOSocket);
|
||||
}
|
||||
|
||||
nsresult
|
||||
|
@ -11,7 +11,6 @@
|
||||
#include "mozilla/ipc/SocketBase.h"
|
||||
|
||||
class MessageLoop;
|
||||
class nsIThread;
|
||||
|
||||
namespace mozilla {
|
||||
namespace ipc {
|
||||
@ -37,14 +36,14 @@ public:
|
||||
* in a non-blocking manner.
|
||||
*
|
||||
* @param aConnector Connector object for socket-type-specific functions
|
||||
* @param aConsumerThread The socket's consumer thread.
|
||||
* @param aConsumerLoop The socket's consumer thread.
|
||||
* @param aIOLoop The socket's I/O thread.
|
||||
* @param aCOSocket The connection-oriented socket for handling the
|
||||
* accepted connection.
|
||||
* @return NS_OK on success, or an XPCOM error code otherwise.
|
||||
*/
|
||||
nsresult Listen(UnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ConnectionOrientedSocket* aCOSocket);
|
||||
|
||||
|
@ -254,52 +254,47 @@ SocketBase::SetConnectionStatus(SocketConnectionStatus aConnectionStatus)
|
||||
// SocketIOBase
|
||||
//
|
||||
|
||||
SocketIOBase::SocketIOBase(nsIThread* aConsumerThread)
|
||||
: mConsumerThread(aConsumerThread)
|
||||
SocketIOBase::SocketIOBase(MessageLoop* aConsumerLoop)
|
||||
: mConsumerLoop(aConsumerLoop)
|
||||
{
|
||||
MOZ_ASSERT(mConsumerThread);
|
||||
MOZ_ASSERT(mConsumerLoop);
|
||||
}
|
||||
|
||||
SocketIOBase::~SocketIOBase()
|
||||
{ }
|
||||
|
||||
nsIThread*
|
||||
MessageLoop*
|
||||
SocketIOBase::GetConsumerThread() const
|
||||
{
|
||||
return mConsumerThread;
|
||||
return mConsumerLoop;
|
||||
}
|
||||
|
||||
bool
|
||||
SocketIOBase::IsConsumerThread() const
|
||||
{
|
||||
nsIThread* thread = nullptr;
|
||||
if (NS_FAILED(NS_GetCurrentThread(&thread))) {
|
||||
return false;
|
||||
}
|
||||
return thread == GetConsumerThread();
|
||||
return GetConsumerThread() == MessageLoop::current();
|
||||
}
|
||||
|
||||
//
|
||||
// SocketIOEventRunnable
|
||||
// SocketEventTask
|
||||
//
|
||||
|
||||
SocketIOEventRunnable::SocketIOEventRunnable(SocketIOBase* aIO,
|
||||
SocketEvent aEvent)
|
||||
: SocketIORunnable<SocketIOBase>(aIO)
|
||||
SocketEventTask::SocketEventTask(SocketIOBase* aIO, SocketEvent aEvent)
|
||||
: SocketTask<SocketIOBase>(aIO)
|
||||
, mEvent(aEvent)
|
||||
{ }
|
||||
|
||||
NS_METHOD
|
||||
SocketIOEventRunnable::Run()
|
||||
void
|
||||
SocketEventTask::Run()
|
||||
{
|
||||
SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
|
||||
SocketIOBase* io = SocketTask<SocketIOBase>::GetIO();
|
||||
|
||||
MOZ_ASSERT(io->IsConsumerThread());
|
||||
|
||||
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
|
||||
// Since we've already explicitly closed and the close
|
||||
// happened before this, this isn't really an error.
|
||||
return NS_OK;
|
||||
return;
|
||||
}
|
||||
|
||||
SocketBase* socketBase = io->GetSocketBase();
|
||||
@ -312,55 +307,49 @@ SocketIOEventRunnable::Run()
|
||||
} else if (mEvent == DISCONNECT) {
|
||||
socketBase->NotifyDisconnect();
|
||||
}
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
//
|
||||
// SocketIORequestClosingRunnable
|
||||
// SocketRequestClosingTask
|
||||
//
|
||||
|
||||
SocketIORequestClosingRunnable::SocketIORequestClosingRunnable(
|
||||
SocketRequestClosingTask::SocketRequestClosingTask(
|
||||
SocketIOBase* aIO)
|
||||
: SocketIORunnable<SocketIOBase>(aIO)
|
||||
: SocketTask<SocketIOBase>(aIO)
|
||||
{ }
|
||||
|
||||
NS_METHOD
|
||||
SocketIORequestClosingRunnable::Run()
|
||||
void
|
||||
SocketRequestClosingTask::Run()
|
||||
{
|
||||
SocketIOBase* io = SocketIORunnable<SocketIOBase>::GetIO();
|
||||
SocketIOBase* io = SocketTask<SocketIOBase>::GetIO();
|
||||
|
||||
MOZ_ASSERT(io->IsConsumerThread());
|
||||
|
||||
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
|
||||
// Since we've already explicitly closed and the close
|
||||
// happened before this, this isn't really an error.
|
||||
return NS_OK;
|
||||
return;
|
||||
}
|
||||
|
||||
SocketBase* socketBase = io->GetSocketBase();
|
||||
MOZ_ASSERT(socketBase);
|
||||
|
||||
socketBase->Close();
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
//
|
||||
// SocketIODeleteInstanceRunnable
|
||||
// SocketDeleteInstanceTask
|
||||
//
|
||||
|
||||
SocketIODeleteInstanceRunnable::SocketIODeleteInstanceRunnable(
|
||||
SocketDeleteInstanceTask::SocketDeleteInstanceTask(
|
||||
SocketIOBase* aIO)
|
||||
: mIO(aIO)
|
||||
{ }
|
||||
|
||||
NS_METHOD
|
||||
SocketIODeleteInstanceRunnable::Run()
|
||||
void
|
||||
SocketDeleteInstanceTask::Run()
|
||||
{
|
||||
mIO = nullptr; // delete instance
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
//
|
||||
@ -385,8 +374,8 @@ SocketIOShutdownTask::Run()
|
||||
// shut down, so we can send a message to the consumer thread to
|
||||
// delete |io| safely knowing that it's not reference any longer.
|
||||
io->ShutdownOnIOThread();
|
||||
io->GetConsumerThread()->Dispatch(new SocketIODeleteInstanceRunnable(io),
|
||||
NS_DISPATCH_NORMAL);
|
||||
io->GetConsumerThread()->PostTask(FROM_HERE,
|
||||
new SocketDeleteInstanceTask(io));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -11,8 +11,6 @@
|
||||
|
||||
#include "base/message_loop.h"
|
||||
#include "nsAutoPtr.h"
|
||||
#include "nsTArray.h"
|
||||
#include "nsThreadUtils.h"
|
||||
|
||||
namespace mozilla {
|
||||
namespace ipc {
|
||||
@ -382,33 +380,33 @@ public:
|
||||
*
|
||||
* @return A pointer to the consumer thread.
|
||||
*/
|
||||
nsIThread* GetConsumerThread() const;
|
||||
MessageLoop* GetConsumerThread() const;
|
||||
|
||||
/**
|
||||
* @return True if the current thread is thre consumer thread, or false
|
||||
* @return True if the current thread is the consumer thread, or false
|
||||
* otherwise.
|
||||
*/
|
||||
bool IsConsumerThread() const;
|
||||
|
||||
protected:
|
||||
SocketIOBase(nsIThread* nsConsumerThread);
|
||||
SocketIOBase(MessageLoop* aConsumerLoop);
|
||||
|
||||
private:
|
||||
nsCOMPtr<nsIThread> mConsumerThread;
|
||||
MessageLoop* mConsumerLoop;
|
||||
};
|
||||
|
||||
//
|
||||
// Socket I/O runnables
|
||||
// Socket tasks
|
||||
//
|
||||
|
||||
/* |SocketIORunnable| is a runnable for sending a message from
|
||||
/* |SocketTask| is a task for sending a message from
|
||||
* the I/O thread to the consumer thread.
|
||||
*/
|
||||
template <typename T>
|
||||
class SocketIORunnable : public nsRunnable
|
||||
class SocketTask : public Task
|
||||
{
|
||||
public:
|
||||
virtual ~SocketIORunnable()
|
||||
virtual ~SocketTask()
|
||||
{ }
|
||||
|
||||
T* GetIO() const
|
||||
@ -417,8 +415,8 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
SocketIORunnable(T* aIO)
|
||||
: mIO(aIO)
|
||||
SocketTask(T* aIO)
|
||||
: mIO(aIO)
|
||||
{
|
||||
MOZ_ASSERT(aIO);
|
||||
}
|
||||
@ -428,10 +426,10 @@ private:
|
||||
};
|
||||
|
||||
/**
|
||||
* |SocketIOEventRunnable| reports the connection state on the
|
||||
* |SocketEventTask| reports the connection state on the
|
||||
* I/O thread back to the consumer thread.
|
||||
*/
|
||||
class SocketIOEventRunnable final : public SocketIORunnable<SocketIOBase>
|
||||
class SocketEventTask final : public SocketTask<SocketIOBase>
|
||||
{
|
||||
public:
|
||||
enum SocketEvent {
|
||||
@ -440,36 +438,35 @@ public:
|
||||
DISCONNECT
|
||||
};
|
||||
|
||||
SocketIOEventRunnable(SocketIOBase* aIO, SocketEvent aEvent);
|
||||
SocketEventTask(SocketIOBase* aIO, SocketEvent aEvent);
|
||||
|
||||
NS_IMETHOD Run() override;
|
||||
void Run() override;
|
||||
|
||||
private:
|
||||
SocketEvent mEvent;
|
||||
};
|
||||
|
||||
/**
|
||||
* |SocketIORequestClosingRunnable| closes an instance of |SocketBase|
|
||||
* to the consumer thread.
|
||||
* |SocketRequestClosingTask| closes an instance of |SocketBase|
|
||||
* on the consumer thread.
|
||||
*/
|
||||
class SocketIORequestClosingRunnable final
|
||||
: public SocketIORunnable<SocketIOBase>
|
||||
class SocketRequestClosingTask final : public SocketTask<SocketIOBase>
|
||||
{
|
||||
public:
|
||||
SocketIORequestClosingRunnable(SocketIOBase* aIO);
|
||||
SocketRequestClosingTask(SocketIOBase* aIO);
|
||||
|
||||
NS_IMETHOD Run() override;
|
||||
void Run() override;
|
||||
};
|
||||
|
||||
/**
|
||||
* |SocketIODeleteInstanceRunnable| deletes an object on the consumer thread.
|
||||
* |SocketDeleteInstanceTask| deletes an object on the consumer thread.
|
||||
*/
|
||||
class SocketIODeleteInstanceRunnable final : public nsRunnable
|
||||
class SocketDeleteInstanceTask final : public Task
|
||||
{
|
||||
public:
|
||||
SocketIODeleteInstanceRunnable(SocketIOBase* aIO);
|
||||
SocketDeleteInstanceTask(SocketIOBase* aIO);
|
||||
|
||||
NS_IMETHOD Run() override;
|
||||
void Run() override;
|
||||
|
||||
private:
|
||||
nsAutoPtr<SocketIOBase> mIO;
|
||||
|
@ -25,15 +25,15 @@ class StreamSocketIO final : public ConnectionOrientedSocketIO
|
||||
public:
|
||||
class ConnectTask;
|
||||
class DelayedConnectTask;
|
||||
class ReceiveRunnable;
|
||||
class ReceiveTask;
|
||||
|
||||
StreamSocketIO(nsIThread* aConsumerThread,
|
||||
MessageLoop* mIOLoop,
|
||||
StreamSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
StreamSocket* aStreamSocket,
|
||||
UnixSocketConnector* aConnector);
|
||||
StreamSocketIO(nsIThread* aConsumerThread,
|
||||
MessageLoop* mIOLoop, int aFd,
|
||||
ConnectionStatus aConnectionStatus,
|
||||
StreamSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
int aFd, ConnectionStatus aConnectionStatus,
|
||||
StreamSocket* aStreamSocket,
|
||||
UnixSocketConnector* aConnector);
|
||||
~StreamSocketIO();
|
||||
@ -91,11 +91,11 @@ private:
|
||||
nsAutoPtr<UnixSocketRawData> mBuffer;
|
||||
};
|
||||
|
||||
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
|
||||
StreamSocketIO::StreamSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
StreamSocket* aStreamSocket,
|
||||
UnixSocketConnector* aConnector)
|
||||
: ConnectionOrientedSocketIO(aConsumerThread, aIOLoop, aConnector)
|
||||
: ConnectionOrientedSocketIO(aConsumerLoop, aIOLoop, aConnector)
|
||||
, mStreamSocket(aStreamSocket)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
, mDelayedConnectTask(nullptr)
|
||||
@ -103,12 +103,12 @@ StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
|
||||
MOZ_ASSERT(mStreamSocket);
|
||||
}
|
||||
|
||||
StreamSocketIO::StreamSocketIO(nsIThread* aConsumerThread,
|
||||
StreamSocketIO::StreamSocketIO(MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
int aFd, ConnectionStatus aConnectionStatus,
|
||||
StreamSocket* aStreamSocket,
|
||||
UnixSocketConnector* aConnector)
|
||||
: ConnectionOrientedSocketIO(aConsumerThread,
|
||||
: ConnectionOrientedSocketIO(aConsumerLoop,
|
||||
aIOLoop,
|
||||
aFd,
|
||||
aConnectionStatus,
|
||||
@ -183,36 +183,33 @@ StreamSocketIO::QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer)
|
||||
}
|
||||
|
||||
/**
|
||||
* |ReceiveRunnable| transfers data received on the I/O thread
|
||||
* |ReceiveTask| transfers data received on the I/O thread
|
||||
* to an instance of |StreamSocket| on the consumer thread.
|
||||
*/
|
||||
class StreamSocketIO::ReceiveRunnable final
|
||||
: public SocketIORunnable<StreamSocketIO>
|
||||
class StreamSocketIO::ReceiveTask final : public SocketTask<StreamSocketIO>
|
||||
{
|
||||
public:
|
||||
ReceiveRunnable(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer)
|
||||
: SocketIORunnable<StreamSocketIO>(aIO)
|
||||
ReceiveTask(StreamSocketIO* aIO, UnixSocketBuffer* aBuffer)
|
||||
: SocketTask<StreamSocketIO>(aIO)
|
||||
, mBuffer(aBuffer)
|
||||
{ }
|
||||
|
||||
NS_IMETHOD Run() override
|
||||
void Run() override
|
||||
{
|
||||
StreamSocketIO* io = SocketIORunnable<StreamSocketIO>::GetIO();
|
||||
StreamSocketIO* io = SocketTask<StreamSocketIO>::GetIO();
|
||||
|
||||
MOZ_ASSERT(io->IsConsumerThread());
|
||||
|
||||
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
|
||||
// Since we've already explicitly closed and the close
|
||||
// happened before this, this isn't really an error.
|
||||
return NS_OK;
|
||||
return;
|
||||
}
|
||||
|
||||
StreamSocket* streamSocket = io->GetStreamSocket();
|
||||
MOZ_ASSERT(streamSocket);
|
||||
|
||||
streamSocket->ReceiveSocketData(mBuffer);
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -222,8 +219,8 @@ private:
|
||||
void
|
||||
StreamSocketIO::ConsumeBuffer()
|
||||
{
|
||||
GetConsumerThread()->Dispatch(new ReceiveRunnable(this, mBuffer.forget()),
|
||||
NS_DISPATCH_NORMAL);
|
||||
GetConsumerThread()->PostTask(FROM_HERE,
|
||||
new ReceiveTask(this, mBuffer.forget()));
|
||||
}
|
||||
|
||||
void
|
||||
@ -345,11 +342,11 @@ StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
|
||||
|
||||
nsresult
|
||||
StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
|
||||
nsIThread* aConsumerThread, MessageLoop* aIOLoop)
|
||||
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop)
|
||||
{
|
||||
MOZ_ASSERT(!mIO);
|
||||
|
||||
mIO = new StreamSocketIO(aConsumerThread, aIOLoop, this, aConnector);
|
||||
mIO = new StreamSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
|
||||
SetConnectionStatus(SOCKET_CONNECTING);
|
||||
|
||||
if (aDelayMs > 0) {
|
||||
@ -367,20 +364,15 @@ StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
|
||||
nsresult
|
||||
StreamSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs)
|
||||
{
|
||||
nsIThread* consumerThread = nullptr;
|
||||
nsresult rv = NS_GetCurrentThread(&consumerThread);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
return Connect(aConnector, aDelayMs, consumerThread, XRE_GetIOMessageLoop());
|
||||
return Connect(aConnector, aDelayMs,
|
||||
MessageLoop::current(), XRE_GetIOMessageLoop());
|
||||
}
|
||||
|
||||
// |ConnectionOrientedSocket|
|
||||
|
||||
nsresult
|
||||
StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ConnectionOrientedSocketIO*& aIO)
|
||||
{
|
||||
@ -389,7 +381,7 @@ StreamSocket::PrepareAccept(UnixSocketConnector* aConnector,
|
||||
|
||||
SetConnectionStatus(SOCKET_CONNECTING);
|
||||
|
||||
mIO = new StreamSocketIO(aConsumerThread, aIOLoop,
|
||||
mIO = new StreamSocketIO(aConsumerLoop, aIOLoop,
|
||||
-1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
|
||||
this, aConnector);
|
||||
aIO = mIO;
|
||||
|
@ -42,12 +42,12 @@ public:
|
||||
*
|
||||
* @param aConnector Connector object for socket type specific functions
|
||||
* @param aDelayMs Time delay in milliseconds.
|
||||
* @param aConsumerThread The socket's consumer thread.
|
||||
* @param aConsumerLoop The socket's consumer thread.
|
||||
* @param aIOLoop The socket's I/O thread.
|
||||
* @return NS_OK on success, or an XPCOM error code otherwise.
|
||||
*/
|
||||
nsresult Connect(UnixSocketConnector* aConnector, int aDelayMs,
|
||||
nsIThread* aConsumerThread, MessageLoop* aIOLoop);
|
||||
MessageLoop* aConsumerLoop, MessageLoop* aIOLoop);
|
||||
|
||||
/**
|
||||
* Starts a task on the socket that will try to connect to a socket in a
|
||||
@ -63,7 +63,7 @@ public:
|
||||
//
|
||||
|
||||
nsresult PrepareAccept(UnixSocketConnector* aConnector,
|
||||
nsIThread* aConsumerThread,
|
||||
MessageLoop* aConsumerLoop,
|
||||
MessageLoop* aIOLoop,
|
||||
ConnectionOrientedSocketIO*& aIO) override;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user