Bug 540886, part 2: Offer a BlockChild() interface to RPC protocols that allows parents to prevent children from sending messages back of their own volition until the parent calls UnblockChild(). r=bent

--HG--
extra : transplant_source : %2A%A7%D7%2C%7B%90%1B%9BZS%E5%3E%E1%A1M%F2%A7P%99E
This commit is contained in:
Chris Jones 2010-01-27 00:41:32 -06:00
parent 5b711f24bf
commit eebed3acd2
5 changed files with 201 additions and 8 deletions

View File

@ -52,6 +52,8 @@
// enum in ipc_channel.h. They need to be kept in sync.
namespace {
enum {
UNBLOCK_CHILD_MESSAGE_TYPE = kuint16max - 4,
BLOCK_CHILD_MESSAGE_TYPE = kuint16max - 3,
SHMEM_CREATED_MESSAGE_TYPE = kuint16max - 2,
GOODBYE_MESSAGE_TYPE = kuint16max - 1,
};

View File

@ -38,6 +38,7 @@
* ***** END LICENSE BLOCK ***** */
#include "mozilla/ipc/RPCChannel.h"
#include "mozilla/ipc/ProtocolUtils.h"
#include "nsDebug.h"
#include "nsTraceRefcnt.h"
@ -58,6 +59,33 @@ struct RunnableMethodTraits<mozilla::ipc::RPCChannel>
static void ReleaseCallee(mozilla::ipc::RPCChannel* obj) { }
};
namespace
{
// Async (from the sending side's perspective)
class BlockChildMessage : public IPC::Message
{
public:
enum { ID = BLOCK_CHILD_MESSAGE_TYPE };
BlockChildMessage() :
Message(MSG_ROUTING_NONE, ID, IPC::Message::PRIORITY_NORMAL)
{ }
};
// Async
class UnblockChildMessage : public IPC::Message
{
public:
enum { ID = UNBLOCK_CHILD_MESSAGE_TYPE };
UnblockChildMessage() :
Message(MSG_ROUTING_NONE, ID, IPC::Message::PRIORITY_NORMAL)
{ }
};
} // namespace <anon>
namespace mozilla {
namespace ipc {
@ -69,7 +97,8 @@ RPCChannel::RPCChannel(RPCListener* aListener,
mOutOfTurnReplies(),
mDeferred(),
mRemoteStackDepthGuess(0),
mRacePolicy(aPolicy)
mRacePolicy(aPolicy),
mBlockedOnParent(false)
{
MOZ_COUNT_CTOR(RPCChannel);
}
@ -387,6 +416,111 @@ RPCChannel::DispatchIncall(const Message& call)
NewRunnableMethod(this, &RPCChannel::OnSend, reply));
}
bool
RPCChannel::BlockChild()
{
AssertWorkerThread();
if (mChild)
NS_RUNTIMEABORT("child tried to block parent");
SendSpecialMessage(new BlockChildMessage());
return true;
}
bool
RPCChannel::UnblockChild()
{
AssertWorkerThread();
if (mChild)
NS_RUNTIMEABORT("child tried to unblock parent");
SendSpecialMessage(new UnblockChildMessage());
return true;
}
bool
RPCChannel::OnSpecialMessage(uint16 id, const Message& msg)
{
AssertWorkerThread();
switch (id) {
case BLOCK_CHILD_MESSAGE_TYPE:
BlockOnParent();
return true;
case UNBLOCK_CHILD_MESSAGE_TYPE:
UnblockFromParent();
return true;
default:
return SyncChannel::OnSpecialMessage(id, msg);
}
}
void
RPCChannel::BlockOnParent()
{
AssertWorkerThread();
if (!mChild)
NS_RUNTIMEABORT("child tried to block parent");
MutexAutoLock lock(mMutex);
if (mBlockedOnParent || AwaitingSyncReply() || 0 < StackDepth())
NS_RUNTIMEABORT("attempt to block child when it's already blocked");
mBlockedOnParent = true;
while (1) {
// XXX this dispatch loop shares some similarities with the
// one in Call(), but the logic is simpler and different
// enough IMHO to warrant its own dispatch loop
while (Connected() && mPending.empty() && mBlockedOnParent) {
WaitForNotify();
}
if (!Connected()) {
mBlockedOnParent = false;
ReportConnectionError("RPCChannel");
break;
}
if (!mPending.empty()) {
Message recvd = mPending.front();
mPending.pop();
MutexAutoUnlock unlock(mMutex);
if (recvd.is_rpc()) {
// stack depth must be 0 here
Incall(recvd, 0);
}
else if (recvd.is_sync()) {
SyncChannel::OnDispatchMessage(recvd);
}
else {
AsyncChannel::OnDispatchMessage(recvd);
}
}
// the last message, if async, may have been the one that
// unblocks us
if (!mBlockedOnParent)
break;
}
EnqueuePendingMessages();
}
void
RPCChannel::UnblockFromParent()
{
AssertWorkerThread();
if (!mChild)
NS_RUNTIMEABORT("child tried to block parent");
MutexAutoLock lock(mMutex);
mBlockedOnParent = false;
}
void
RPCChannel::DebugAbort(const char* file, int line, const char* cond,
@ -444,7 +578,7 @@ RPCChannel::OnMessageReceived(const Message& msg)
mPending.push(msg);
if (0 == StackDepth())
if (0 == StackDepth() && !mBlockedOnParent)
// the worker thread might be idle, make sure it wakes up
mWorkerLoop->PostTask(
FROM_HERE,

View File

@ -81,10 +81,38 @@ public:
// Make an RPC to the other side of the channel
bool Call(Message* msg, Message* reply);
// Asynchronously, send the child a message that puts it in such a
// state that it can't send messages to the parent unless the
// parent sends a message to it first. The child stays in this
// state until the parent calls |UnblockChild()|.
//
// It is an error to
// - call this on the child side of the channel.
// - nest |BlockChild()| calls
// - call this when the child is already blocked on a sync or RPC
// in-/out- message/call
//
// Return true iff successful.
bool BlockChild();
// Asynchronously undo |BlockChild()|.
//
// It is an error to
// - call this on the child side of the channel
// - call this without a matching |BlockChild()|
//
// Return true iff successful.
bool UnblockChild();
NS_OVERRIDE
virtual bool OnSpecialMessage(uint16 id, const Message& msg);
// Override the SyncChannel handler so we can dispatch RPC
// messages. Called on the IO thread only.
NS_OVERRIDE virtual void OnMessageReceived(const Message& msg);
NS_OVERRIDE virtual void OnChannelError();
NS_OVERRIDE
virtual void OnMessageReceived(const Message& msg);
NS_OVERRIDE
virtual void OnChannelError();
private:
// Called on worker thread only
@ -96,6 +124,9 @@ private:
void Incall(const Message& call, size_t stackDepth);
void DispatchIncall(const Message& call);
void BlockOnParent();
void UnblockFromParent();
// Called from both threads
size_t StackDepth() {
mMutex.AssertCurrentThreadOwns();
@ -197,6 +228,9 @@ private:
//
size_t mRemoteStackDepthGuess;
RacyRPCPolicy mRacePolicy;
// True iff the parent has put us in a |BlockChild()| state.
bool mBlockedOnParent;
};

View File

@ -93,6 +93,14 @@ protected:
}
void OnDispatchMessage(const Message& aMsg);
NS_OVERRIDE
bool OnSpecialMessage(uint16 id, const Message& msg)
{
// SyncChannel doesn't care about any special messages yet
return AsyncChannel::OnSpecialMessage(id, msg);
}
void WaitForNotify();
// Executed on the IO thread.

View File

@ -2617,17 +2617,16 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
ret=Type.BOOL,
virtual=1, pure=1)))
# optional Shutdown() method; default is no-op
# optional ActorDestroy() method; default is no-op
self.cls.addstmts([
Whitespace.NL,
MethodDefn(MethodDecl(
_destroyMethod().name,
params=[ Decl(_DestroyReason.Type(), 'why') ],
virtual=1))
virtual=1)),
Whitespace.NL
])
self.cls.addstmt(Whitespace.NL)
self.cls.addstmts((
[ Label.PRIVATE ]
+ self.standardTypedefs()
@ -2879,6 +2878,22 @@ class _GenerateProtocolActorCode(ipdl.ast.Visitor):
self.cls.addstmts([ otherpid, Whitespace.NL,
getdump, Whitespace.NL ])
if (p.decl.type.isToplevel() and self.side is 'parent'
and p.decl.type.talksRpc()):
# offer BlockChild() and UnblockChild().
# See ipc/glue/RPCChannel.h
blockchild = MethodDefn(MethodDecl(
'BlockChild', ret=Type.BOOL))
blockchild.addstmt(StmtReturn(ExprCall(
ExprSelect(p.channelVar(), '.', 'BlockChild'))))
unblockchild = MethodDefn(MethodDecl(
'UnblockChild', ret=Type.BOOL))
unblockchild.addstmt(StmtReturn(ExprCall(
ExprSelect(p.channelVar(), '.', 'UnblockChild'))))
self.cls.addstmts([ blockchild, unblockchild, Whitespace.NL ])
## private methods
self.cls.addstmt(Label.PRIVATE)