Bug 1171994: Forward received RIL socket I/O via |WorkerCrossThreadDispatcher|, r=htsai

With this patch, |RilSocket| and it's helpers forward received data
via a WCTD. This will hand over the worker's JS context to the RIL
consumer.

In a later patch, the RIL consumer will be moved onto the RIL worker
thread and call the JS ril-worker code directly.
This commit is contained in:
Thomas Zimmermann 2015-07-14 16:57:00 +02:00
parent 9fd703e9c9
commit 5606499d67
3 changed files with 63 additions and 24 deletions

View File

@ -6,6 +6,7 @@
#include "RilSocket.h"
#include <fcntl.h>
#include "mozilla/dom/workers/Workers.h"
#include "mozilla/ipc/UnixSocketConnector.h"
#include "mozilla/RefPtr.h"
#include "nsXULAppAPI.h"
@ -16,6 +17,8 @@ static const size_t MAX_READ_SIZE = 1 << 16;
namespace mozilla {
namespace ipc {
USING_WORKERS_NAMESPACE
//
// RilSocketIO
//
@ -27,7 +30,8 @@ public:
class DelayedConnectTask;
class ReceiveTask;
RilSocketIO(MessageLoop* aConsumerLoop,
RilSocketIO(WorkerCrossThreadDispatcher* aDispatcher,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
RilSocket* aRilSocket,
UnixSocketConnector* aConnector);
@ -62,6 +66,11 @@ public:
void ShutdownOnIOThread() override;
private:
/**
* Cross-thread dispatcher for the RIL worker
*/
nsRefPtr<WorkerCrossThreadDispatcher> mDispatcher;
/**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from consumer thread. All non-consumer-thread accesses should
@ -86,15 +95,18 @@ private:
nsAutoPtr<UnixSocketRawData> mBuffer;
};
RilSocketIO::RilSocketIO(MessageLoop* aConsumerLoop,
RilSocketIO::RilSocketIO(WorkerCrossThreadDispatcher* aDispatcher,
MessageLoop* aConsumerLoop,
MessageLoop* aIOLoop,
RilSocket* aRilSocket,
UnixSocketConnector* aConnector)
: ConnectionOrientedSocketIO(aConsumerLoop, aIOLoop, aConnector)
, mDispatcher(aDispatcher)
, mRilSocket(aRilSocket)
, mShuttingDownOnIOThread(false)
, mDelayedConnectTask(nullptr)
{
MOZ_ASSERT(mDispatcher);
MOZ_ASSERT(mRilSocket);
}
@ -164,41 +176,45 @@ RilSocketIO::QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer)
* |ReceiveTask| transfers data received on the I/O thread
* to an instance of |RilSocket| on the consumer thread.
*/
class RilSocketIO::ReceiveTask final : public SocketTask<RilSocketIO>
class RilSocketIO::ReceiveTask final : public WorkerTask
{
public:
ReceiveTask(RilSocketIO* aIO, UnixSocketBuffer* aBuffer)
: SocketTask<RilSocketIO>(aIO)
: mIO(aIO)
, mBuffer(aBuffer)
{ }
void Run() override
{
RilSocketIO* io = SocketTask<RilSocketIO>::GetIO();
MOZ_ASSERT(mIO);
}
MOZ_ASSERT(io->IsConsumerThread());
bool RunTask(JSContext* aCx) override
{
// Dispatched via WCTD, but still needs to run on the consumer thread
MOZ_ASSERT(mIO->IsConsumerThread());
if (NS_WARN_IF(io->IsShutdownOnConsumerThread())) {
if (NS_WARN_IF(mIO->IsShutdownOnConsumerThread())) {
// Since we've already explicitly closed and the close
// happened before this, this isn't really an error.
return;
return true;
}
RilSocket* rilSocket = io->GetRilSocket();
RilSocket* rilSocket = mIO->GetRilSocket();
MOZ_ASSERT(rilSocket);
rilSocket->ReceiveSocketData(mBuffer);
rilSocket->ReceiveSocketData(aCx, mBuffer);
return true;
}
private:
RilSocketIO* mIO;
nsAutoPtr<UnixSocketBuffer> mBuffer;
};
void
RilSocketIO::ConsumeBuffer()
{
GetConsumerThread()->PostTask(FROM_HERE,
new ReceiveTask(this, mBuffer.forget()));
nsRefPtr<ReceiveTask> task = new ReceiveTask(this, mBuffer.forget());
NS_WARN_IF(!mDispatcher->PostTask(task));
}
void
@ -299,11 +315,14 @@ public:
// RilSocket
//
RilSocket::RilSocket(RilSocketConsumer* aConsumer, int aIndex)
RilSocket::RilSocket(WorkerCrossThreadDispatcher* aDispatcher,
RilSocketConsumer* aConsumer, int aIndex)
: mIO(nullptr)
, mDispatcher(aDispatcher)
, mConsumer(aConsumer)
, mIndex(aIndex)
{
MOZ_ASSERT(mDispatcher);
MOZ_ASSERT(mConsumer);
}
@ -313,9 +332,10 @@ RilSocket::~RilSocket()
}
void
RilSocket::ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer)
RilSocket::ReceiveSocketData(JSContext* aCx,
nsAutoPtr<UnixSocketBuffer>& aBuffer)
{
mConsumer->ReceiveSocketData(mIndex, aBuffer);
mConsumer->ReceiveSocketData(aCx, mIndex, aBuffer);
}
nsresult
@ -324,7 +344,7 @@ RilSocket::Connect(UnixSocketConnector* aConnector, int aDelayMs,
{
MOZ_ASSERT(!mIO);
mIO = new RilSocketIO(aConsumerLoop, aIOLoop, this, aConnector);
mIO = new RilSocketIO(mDispatcher, aConsumerLoop, aIOLoop, this, aConnector);
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {

View File

@ -9,8 +9,19 @@
#include "mozilla/ipc/ConnectionOrientedSocket.h"
class JSContext;
class MessageLoop;
namespace mozilla {
namespace dom {
namespace workers {
class WorkerCrossThreadDispatcher;
} // namespace workers
} // namespace dom
} // namespace mozilla
namespace mozilla {
namespace ipc {
@ -24,17 +35,20 @@ public:
/**
* Constructs an instance of |RilSocket|.
*
* @param aDispatcher The dispatcher class for the received messages.
* @param aConsumer The consumer for the socket.
* @param aIndex An arbitrary index.
*/
RilSocket(RilSocketConsumer* aConsumer, int aIndex);
RilSocket(mozilla::dom::workers::WorkerCrossThreadDispatcher* aDispatcher,
RilSocketConsumer* aConsumer, int aIndex);
/**
* Method to be called whenever data is received. Consumer-thread only.
* Method to be called whenever data is received. RIL-worker only.
*
* @param aCx The RIL worker's JS context.
* @param aBuffer Data received from the socket.
*/
void ReceiveSocketData(nsAutoPtr<UnixSocketBuffer>& aBuffer);
void ReceiveSocketData(JSContext* aCx, nsAutoPtr<UnixSocketBuffer>& aBuffer);
/**
* Starts a task on the socket that will try to connect to a socket in a
@ -85,6 +99,7 @@ protected:
private:
RilSocketIO* mIO;
nsRefPtr<mozilla::dom::workers::WorkerCrossThreadDispatcher> mDispatcher;
RilSocketConsumer* mConsumer;
int mIndex;
};

View File

@ -9,6 +9,8 @@
#include "nsAutoPtr.h"
class JSContext;
namespace mozilla {
namespace ipc {
@ -21,12 +23,14 @@ class RilSocketConsumer
{
public:
/**
* Method to be called whenever data is received. Consumer-thread only.
* Method to be called whenever data is received. RIL-worker only.
*
* @param aCx The RIL worker's JS context.
* @param aIndex The index that has been given to the stream socket.
* @param aBuffer Data received from the socket.
*/
virtual void ReceiveSocketData(int aIndex,
virtual void ReceiveSocketData(JSContext* aCx,
int aIndex,
nsAutoPtr<UnixSocketBuffer>& aBuffer) = 0;
/**