Bug 1166638: Use |StreamSocketConsumer| in |StreamSocket|, r=kmachulis, dlee, chucklee, htsai

This patch converts |StreamSocket| to forward events and data to an
instance of |StreamSocketConsumer|. All users are converted and the
related listener and consumer classes are removed.
This commit is contained in:
Thomas Zimmermann 2015-05-26 13:24:20 +02:00
parent 6dc6f61bb6
commit f07de924ac
10 changed files with 246 additions and 229 deletions

View File

@ -27,6 +27,36 @@ using namespace android;
using namespace mozilla::dom;
using namespace mozilla::ipc;
namespace {
class SendNfcSocketDataTask final : public nsRunnable
{
public:
SendNfcSocketDataTask(StreamSocket* aSocket, UnixSocketRawData* aRawData)
: mSocket(aSocket)
, mRawData(aRawData)
{ }
NS_IMETHOD Run()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mSocket || mSocket->GetConnectionStatus() != SOCKET_CONNECTED) {
// Probably shutting down.
return NS_OK;
}
mSocket->SendSocketData(mRawData.forget());
return NS_OK;
}
private:
nsRefPtr<StreamSocket> mSocket;
nsAutoPtr<UnixSocketRawData> mRawData;
};
} // anonymous namespace
namespace mozilla {
static NfcService* gNfcService;
@ -41,11 +71,15 @@ assertIsNfcServiceThread()
}
// Runnable used to call Marshall on the NFC thread.
class NfcCommandRunnable : public nsRunnable
class NfcCommandRunnable final : public nsRunnable
{
public:
NfcCommandRunnable(NfcMessageHandler* aHandler, NfcConsumer* aConsumer, CommandOptions aOptions)
: mHandler(aHandler), mConsumer(aConsumer), mOptions(aOptions)
NfcCommandRunnable(NfcMessageHandler* aHandler,
NfcService* aService,
CommandOptions aOptions)
: mHandler(aHandler)
, mService(aService)
, mOptions(aOptions)
{
MOZ_ASSERT(NS_IsMainThread());
}
@ -60,13 +94,13 @@ public:
parcel.setDataPosition(0);
uint32_t sizeBE = htonl(parcel.dataSize() - sizeof(int));
parcel.writeInt32(sizeBE);
mConsumer->PostToNfcDaemon(parcel.data(), parcel.dataSize());
mService->PostToNfcDaemon(parcel.data(), parcel.dataSize());
return NS_OK;
}
private:
NfcMessageHandler* mHandler;
NfcConsumer* mConsumer;
NfcService* mService;
CommandOptions mOptions;
};
@ -307,15 +341,15 @@ NfcService::Start(nsINfcGonkEventListener* aListener)
mListener = aListener;
mHandler = new NfcMessageHandler();
mConsumer = new NfcConsumer(this);
mStreamSocket = new StreamSocket(this, NfcSocketListener::STREAM_SOCKET);
mListenSocketName = BASE_SOCKET_NAME;
mListenSocket = new NfcListenSocket(this);
nsresult rv = mListenSocket->Listen(new NfcConnector(mListenSocketName),
mConsumer);
mStreamSocket);
if (NS_FAILED(rv)) {
mConsumer = nullptr;
mStreamSocket = nullptr;
return rv;
}
@ -324,8 +358,8 @@ NfcService::Start(nsINfcGonkEventListener* aListener)
NS_WARNING("Can't create Nfc worker thread.");
mListenSocket->Close();
mListenSocket = nullptr;
mConsumer->Shutdown();
mConsumer = nullptr;
mStreamSocket->Close();
mStreamSocket = nullptr;
return NS_ERROR_FAILURE;
}
@ -344,13 +378,24 @@ NfcService::Shutdown()
mListenSocket->Close();
mListenSocket = nullptr;
mConsumer->Shutdown();
mConsumer = nullptr;
mStreamSocket->Close();
mStreamSocket = nullptr;
return NS_OK;
}
bool
NfcService::PostToNfcDaemon(const uint8_t* aData, size_t aSize)
{
MOZ_ASSERT(!NS_IsMainThread());
UnixSocketRawData* raw = new UnixSocketRawData(aData, aSize);
nsRefPtr<SendNfcSocketDataTask> task =
new SendNfcSocketDataTask(mStreamSocket, raw);
NS_DispatchToMainThread(task);
return true;
}
NS_IMETHODIMP
NfcService::SendCommand(JS::HandleValue aOptions, JSContext* aCx)
{
@ -365,7 +410,8 @@ NfcService::SendCommand(JS::HandleValue aOptions, JSContext* aCx)
// Dispatch the command to the NFC thread.
CommandOptions commandOptions(options);
nsCOMPtr<nsIRunnable> runnable = new NfcCommandRunnable(mHandler, mConsumer, commandOptions);
nsCOMPtr<nsIRunnable> runnable = new NfcCommandRunnable(mHandler, this,
commandOptions);
mThread->Dispatch(runnable, nsIEventTarget::DISPATCH_NORMAL);
return NS_OK;
}
@ -428,6 +474,33 @@ NfcService::OnDisconnect(enum SocketType aSocketType)
MOZ_ASSERT(NS_IsMainThread());
}
// |StreamSocketConsumer|
void
NfcService::ReceiveSocketData(
int aIndex, nsAutoPtr<mozilla::ipc::UnixSocketBuffer>& aBuffer)
{
ReceiveSocketData(aBuffer);
}
void
NfcService::OnConnectSuccess(int aIndex)
{
OnConnectSuccess(static_cast<enum SocketType>(aIndex));
}
void
NfcService::OnConnectError(int aIndex)
{
OnConnectSuccess(static_cast<enum SocketType>(aIndex));
}
void
NfcService::OnDisconnect(int aIndex)
{
OnConnectSuccess(static_cast<enum SocketType>(aIndex));
}
NS_GENERIC_FACTORY_SINGLETON_CONSTRUCTOR(NfcService,
NfcService::FactoryCreate)

View File

@ -9,6 +9,7 @@
#include "mozilla/ipc/Nfc.h"
#include "mozilla/ipc/SocketBase.h"
#include "mozilla/ipc/StreamSocketConsumer.h"
#include "nsCOMPtr.h"
#include "nsINfcService.h"
#include "NfcMessageHandler.h"
@ -20,8 +21,10 @@ namespace dom {
class NfcEventOptions;
} // namespace dom
class NfcService final : public nsINfcService,
public mozilla::ipc::NfcSocketListener
class NfcService final
: public nsINfcService
, public mozilla::ipc::StreamSocketConsumer
, public mozilla::ipc::NfcSocketListener
{
public:
NS_DECL_ISUPPORTS
@ -31,6 +34,8 @@ public:
void DispatchNfcEvent(const mozilla::dom::NfcEventOptions& aOptions);
bool PostToNfcDaemon(const uint8_t* aData, size_t aSize);
virtual void ReceiveSocketData(
nsAutoPtr<mozilla::ipc::UnixSocketBuffer>& aBuffer) override;
@ -42,6 +47,15 @@ public:
return mThread;
}
// Methods for |StreamSocketConsumer|
//
void ReceiveSocketData(
int aIndex, nsAutoPtr<mozilla::ipc::UnixSocketBuffer>& aBuffer) override;
void OnConnectSuccess(int aIndex) override;
void OnConnectError(int aIndex) override;
void OnDisconnect(int aIndex) override;
private:
NfcService();
~NfcService();
@ -49,7 +63,7 @@ private:
nsCOMPtr<nsIThread> mThread;
nsCOMPtr<nsINfcGonkEventListener> mListener;
nsRefPtr<mozilla::ipc::NfcListenSocket> mListenSocket;
nsRefPtr<mozilla::ipc::NfcConsumer> mConsumer;
nsRefPtr<mozilla::ipc::StreamSocket> mStreamSocket;
nsAutoPtr<NfcMessageHandler> mHandler;
nsCString mListenSocketName;
};

View File

@ -704,47 +704,6 @@ KeyStore::ListenSocket::OnDisconnect()
mKeyStore->OnDisconnect(LISTEN_SOCKET);
}
//
// KeyStore::StreamSocket
//
KeyStore::StreamSocket::StreamSocket(KeyStore* aKeyStore)
: mKeyStore(aKeyStore)
{
MOZ_ASSERT(mKeyStore);
MOZ_COUNT_CTOR(KeyStore::StreamSocket);
}
KeyStore::StreamSocket::~StreamSocket()
{
MOZ_COUNT_DTOR(KeyStore::StreamSocket);
}
void
KeyStore::StreamSocket::OnConnectSuccess()
{
mKeyStore->OnConnectSuccess(STREAM_SOCKET);
}
void
KeyStore::StreamSocket::OnConnectError()
{
mKeyStore->OnConnectError(STREAM_SOCKET);
}
void
KeyStore::StreamSocket::OnDisconnect()
{
mKeyStore->OnDisconnect(STREAM_SOCKET);
}
void
KeyStore::StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
mKeyStore->ReceiveSocketData(aBuffer);
}
//
// KeyStore
//
@ -795,7 +754,7 @@ KeyStore::Listen()
if (mStreamSocket) {
mStreamSocket->Close();
} else {
mStreamSocket = new StreamSocket(this);
mStreamSocket = new StreamSocket(this, STREAM_SOCKET);
}
if (!mListenSocket) {
@ -1053,5 +1012,31 @@ KeyStore::OnDisconnect(SocketType aSocketType)
}
}
// |StreamSocketConsumer|
void
KeyStore::ReceiveSocketData(int aIndex, nsAutoPtr<UnixSocketBuffer>& aMessage)
{
ReceiveSocketData(aMessage);
}
void
KeyStore::OnConnectSuccess(int aIndex)
{
OnConnectSuccess(static_cast<SocketType>(aIndex));
}
void
KeyStore::OnConnectError(int aIndex)
{
OnConnectError(static_cast<SocketType>(aIndex));
}
void
KeyStore::OnDisconnect(int aIndex)
{
OnDisconnect(static_cast<SocketType>(aIndex));
}
} // namespace ipc
} // namespace mozilla

View File

@ -12,6 +12,7 @@
#include "cert.h"
#include "mozilla/ipc/ListenSocket.h"
#include "mozilla/ipc/StreamSocket.h"
#include "mozilla/ipc/StreamSocketConsumer.h"
#include "nsNSSShutDown.h"
namespace mozilla {
@ -78,7 +79,9 @@ typedef enum {
STATE_PROCESSING
} ProtocolHandlerState;
class KeyStore final : public nsNSSShutDownObject
class KeyStore final
: public StreamSocketConsumer
, public nsNSSShutDownObject
{
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(KeyStore)
@ -113,25 +116,6 @@ private:
KeyStore* mKeyStore;
};
class StreamSocket final : public mozilla::ipc::StreamSocket
{
public:
StreamSocket(KeyStore* aKeyStore);
~StreamSocket();
// SocketConsumerBase
//
void OnConnectSuccess() override;
void OnConnectError() override;
void OnDisconnect() override;
void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer) override;
private:
KeyStore* mKeyStore;
};
~KeyStore();
void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aMessage);
@ -157,6 +141,15 @@ private:
void SendResponse(ResponseCode response);
void SendData(const uint8_t *data, int length);
// Methods for |StreamSocketConsumer|
//
void ReceiveSocketData(int aIndex,
nsAutoPtr<UnixSocketBuffer>& aMessage) override;
void OnConnectSuccess(int aIndex) override;
void OnConnectError(int aIndex) override;
void OnDisconnect(int aIndex) override;
bool mShutdown;
nsRefPtr<ListenSocket> mListenSocket;

View File

@ -27,37 +27,6 @@
using namespace mozilla::ipc;
namespace {
class SendNfcSocketDataTask final : public nsRunnable
{
public:
SendNfcSocketDataTask(NfcConsumer* aConsumer, UnixSocketRawData* aRawData)
: mConsumer(aConsumer)
, mRawData(aRawData)
{ }
NS_IMETHOD Run()
{
MOZ_ASSERT(NS_IsMainThread());
if (!mConsumer ||
mConsumer->GetConnectionStatus() != SOCKET_CONNECTED) {
// Probably shuting down.
return NS_OK;
}
mConsumer->SendSocketData(mRawData.forget());
return NS_OK;
}
private:
NfcConsumer* mConsumer;
nsAutoPtr<UnixSocketRawData> mRawData;
};
} // anonymous namespace
namespace mozilla {
namespace ipc {
@ -93,73 +62,5 @@ NfcListenSocket::OnDisconnect()
}
}
//
// NfcConsumer
//
NfcConsumer::NfcConsumer(NfcSocketListener* aListener)
: mListener(aListener)
{ }
void
NfcConsumer::Shutdown()
{
MOZ_ASSERT(NS_IsMainThread());
Close();
}
bool
NfcConsumer::PostToNfcDaemon(const uint8_t* aData, size_t aSize)
{
MOZ_ASSERT(!NS_IsMainThread());
UnixSocketRawData* raw = new UnixSocketRawData(aData, aSize);
nsRefPtr<SendNfcSocketDataTask> task = new SendNfcSocketDataTask(this, raw);
NS_DispatchToMainThread(task);
return true;
}
void
NfcConsumer::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
if (mListener) {
mListener->ReceiveSocketData(aBuffer);
}
}
void
NfcConsumer::OnConnectSuccess()
{
CHROMIUM_LOG("NFC: %s\n", __FUNCTION__);
if (mListener) {
mListener->OnConnectSuccess(NfcSocketListener::STREAM_SOCKET);
}
// Nothing to do here.
}
void
NfcConsumer::OnConnectError()
{
CHROMIUM_LOG("NFC: %s\n", __FUNCTION__);
if (mListener) {
mListener->OnConnectError(NfcSocketListener::STREAM_SOCKET);
}
}
void
NfcConsumer::OnDisconnect()
{
CHROMIUM_LOG("NFC: %s\n", __FUNCTION__);
if (mListener) {
mListener->OnDisconnect(NfcSocketListener::STREAM_SOCKET);
}
}
} // namespace ipc
} // namespace mozilla

View File

@ -43,25 +43,6 @@ private:
NfcSocketListener* mListener;
};
class NfcConsumer final : public mozilla::ipc::StreamSocket
{
public:
NfcConsumer(NfcSocketListener* aListener);
void Shutdown();
bool PostToNfcDaemon(const uint8_t* aData, size_t aSize);
private:
void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer) override;
void OnConnectSuccess() override;
void OnConnectError() override;
void OnDisconnect() override;
private:
NfcSocketListener* mListener;
};
} // namespace ipc
} // namepsace mozilla

View File

@ -32,7 +32,7 @@ namespace {
static const char RIL_SOCKET_NAME[] = "/dev/socket/rilproxy";
static nsTArray<nsRefPtr<mozilla::ipc::RilConsumer> > sRilConsumers;
static nsTArray<nsAutoPtr<mozilla::ipc::RilConsumer>> sRilConsumers;
class ConnectWorkerToRIL final : public WorkerTask
{
@ -53,15 +53,13 @@ public:
{
MOZ_ASSERT(NS_IsMainThread());
if (sRilConsumers.Length() <= mClientId ||
!sRilConsumers[mClientId] ||
sRilConsumers[mClientId]->GetConnectionStatus() != SOCKET_CONNECTED) {
// Probably shuting down.
if (sRilConsumers.Length() <= mClientId || !sRilConsumers[mClientId]) {
// Probably shutting down.
delete mRawData;
return NS_OK;
}
sRilConsumers[mClientId]->SendSocketData(mRawData);
sRilConsumers[mClientId]->Send(mRawData);
return NS_OK;
}
@ -203,7 +201,6 @@ namespace ipc {
RilConsumer::RilConsumer(unsigned long aClientId,
WorkerCrossThreadDispatcher* aDispatcher)
: mDispatcher(aDispatcher)
, mClientId(aClientId)
, mShutdown(false)
{
// Only append client id after RIL_SOCKET_NAME when it's not connected to
@ -217,7 +214,8 @@ RilConsumer::RilConsumer(unsigned long aClientId,
mAddress = addr_un.sun_path;
}
Connect(new RilConnector(mAddress, mClientId));
mSocket = new StreamSocket(this, aClientId);
mSocket->Connect(new RilConnector(mAddress, aClientId));
}
nsresult
@ -250,7 +248,7 @@ RilConsumer::Shutdown()
MOZ_ASSERT(NS_IsMainThread());
for (unsigned long i = 0; i < sRilConsumers.Length(); i++) {
nsRefPtr<RilConsumer>& instance = sRilConsumers[i];
nsAutoPtr<RilConsumer> instance(sRilConsumers[i]);
if (!instance) {
continue;
}
@ -262,36 +260,58 @@ RilConsumer::Shutdown()
}
void
RilConsumer::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
RilConsumer::Send(UnixSocketRawData* aRawData)
{
if (!mSocket || mSocket->GetConnectionStatus() != SOCKET_CONNECTED) {
// Probably shutting down.
delete aRawData;
return;
}
mSocket->SendSocketData(aRawData);
}
void
RilConsumer::Close()
{
mSocket->Close();
mSocket = nullptr;
}
// |StreamSocketConnector|
void
RilConsumer::ReceiveSocketData(int aIndex,
nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
nsRefPtr<DispatchRILEvent> dre(new DispatchRILEvent(mClientId, aBuffer.forget()));
nsRefPtr<DispatchRILEvent> dre(new DispatchRILEvent(aIndex, aBuffer.forget()));
mDispatcher->PostTask(dre);
}
void
RilConsumer::OnConnectSuccess()
RilConsumer::OnConnectSuccess(int aIndex)
{
// Nothing to do here.
CHROMIUM_LOG("RIL[%lu]: %s\n", mClientId, __FUNCTION__);
CHROMIUM_LOG("RIL[%d]: %s\n", aIndex, __FUNCTION__);
}
void
RilConsumer::OnConnectError()
RilConsumer::OnConnectError(int aIndex)
{
CHROMIUM_LOG("RIL[%lu]: %s\n", mClientId, __FUNCTION__);
CHROMIUM_LOG("RIL[%d]: %s\n", aIndex, __FUNCTION__);
Close();
}
void
RilConsumer::OnDisconnect()
RilConsumer::OnDisconnect(int aIndex)
{
CHROMIUM_LOG("RIL[%lu]: %s\n", mClientId, __FUNCTION__);
CHROMIUM_LOG("RIL[%d]: %s\n", aIndex, __FUNCTION__);
if (mShutdown) {
return;
}
Connect(new RilConnector(mAddress, mClientId), GetSuggestedConnectDelayMs());
mSocket->Connect(new RilConnector(mAddress, aIndex),
mSocket->GetSuggestedConnectDelayMs());
}
} // namespace ipc

View File

@ -9,11 +9,12 @@
#include <mozilla/dom/workers/Workers.h>
#include <mozilla/ipc/StreamSocket.h>
#include <mozilla/ipc/StreamSocketConsumer.h>
namespace mozilla {
namespace ipc {
class RilConsumer final : public mozilla::ipc::StreamSocket
class RilConsumer final : public StreamSocketConsumer
{
public:
static nsresult Register(
@ -21,18 +22,25 @@ public:
mozilla::dom::workers::WorkerCrossThreadDispatcher* aDispatcher);
static void Shutdown();
void Send(UnixSocketRawData* aRawData);
private:
RilConsumer(unsigned long aClientId,
mozilla::dom::workers::WorkerCrossThreadDispatcher* aDispatcher);
void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer) override;
void Close();
void OnConnectSuccess() override;
void OnConnectError() override;
void OnDisconnect() override;
// Methods for |StreamSocketConsumer|
//
void ReceiveSocketData(int aIndex,
nsAutoPtr<UnixSocketBuffer>& aBuffer) override;
void OnConnectSuccess(int aIndex) override;
void OnConnectError(int aIndex) override;
void OnDisconnect(int aIndex) override;
nsRefPtr<StreamSocket> mSocket;
nsRefPtr<mozilla::dom::workers::WorkerCrossThreadDispatcher> mDispatcher;
unsigned long mClientId;
nsCString mAddress;
bool mShutdown;
};

View File

@ -8,6 +8,7 @@
#include <fcntl.h>
#include "mozilla/RefPtr.h"
#include "nsXULAppAPI.h"
#include "StreamSocketConsumer.h"
#include "UnixSocketConnector.h"
static const size_t MAX_READ_SIZE = 1 << 16;
@ -499,15 +500,27 @@ public:
// StreamSocket
//
StreamSocket::StreamSocket()
: mIO(nullptr)
{ }
StreamSocket::StreamSocket(StreamSocketConsumer* aConsumer, int aIndex)
: mConsumer(aConsumer)
, mIndex(aIndex)
, mIO(nullptr)
{
MOZ_ASSERT(mConsumer);
}
StreamSocket::~StreamSocket()
{
MOZ_ASSERT(!mIO);
}
void
StreamSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->ReceiveSocketData(mIndex, aBuffer);
}
nsresult
StreamSocket::Connect(UnixSocketConnector* aConnector,
int aDelayMs)
@ -586,5 +599,29 @@ StreamSocket::Close()
NotifyDisconnect();
}
void
StreamSocket::OnConnectSuccess()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnConnectSuccess(mIndex);
}
void
StreamSocket::OnConnectError()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnConnectError(mIndex);
}
void
StreamSocket::OnDisconnect()
{
MOZ_ASSERT(NS_IsMainThread());
mConsumer->OnDisconnect(mIndex);
}
} // namespace ipc
} // namespace mozilla

View File

@ -12,21 +12,21 @@
namespace mozilla {
namespace ipc {
class StreamSocketConsumer;
class StreamSocketIO;
class UnixSocketConnector;
class StreamSocket : public ConnectionOrientedSocket
class StreamSocket final : public ConnectionOrientedSocket
{
public:
StreamSocket();
StreamSocket(StreamSocketConsumer* aConsumer, int aIndex);
/**
* Method to be called whenever data is received. This is only called on the
* main thread.
* Method to be called whenever data is received. Main-thread only.
*
* @param aBuffer Data received from the socket.
*/
virtual void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer) = 0;
void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer);
/**
* Starts a task on the socket that will try to connect to a socket in a
@ -53,11 +53,16 @@ public:
//
void Close() override;
void OnConnectSuccess() override;
void OnConnectError() override;
void OnDisconnect() override;
protected:
virtual ~StreamSocket();
private:
StreamSocketConsumer* mConsumer;
int mIndex;
StreamSocketIO* mIO;
};