add racy RPC resolution. also add better debugging info and fix two shared-memory-race bugs.

This commit is contained in:
Chris Jones 2009-10-08 16:44:43 -05:00
parent 970c751691
commit 6c79e4bbd2
6 changed files with 173 additions and 43 deletions

View File

@ -141,14 +141,23 @@ class Message : public Pickle {
}
#if defined(CHROMIUM_MOZILLA_BUILD)
size_t rpc_remote_stack_depth() const {
size_t rpc_remote_stack_depth_guess() const {
return header()->rpc_remote_stack_depth_guess;
}
void set_rpc_remote_stack_depth(size_t depth) {
void set_rpc_remote_stack_depth_guess(size_t depth) {
DCHECK(is_rpc());
header()->rpc_remote_stack_depth_guess = depth;
}
size_t rpc_local_stack_depth() const {
return header()->rpc_local_stack_depth;
}
void set_rpc_local_stack_depth(size_t depth) {
DCHECK(is_rpc());
header()->rpc_local_stack_depth = depth;
}
#endif
template<class T>
@ -262,9 +271,10 @@ class Message : public Pickle {
uint32 num_fds; // the number of descriptors included with this message
#endif
#if defined(CHROMIUM_MOZILLA_BUILD)
// For RPC messages, what *this* side of the channel thinks the
// *other* side's RPC stack depth is.
// For RPC messages, a guess at what the *other* side's stack depth is.
size_t rpc_remote_stack_depth_guess;
// The actual local stack depth.
size_t rpc_local_stack_depth;
#endif
};
#pragma pack(pop)

View File

@ -79,6 +79,8 @@ AsyncChannel::Open(Transport* aTransport, MessageLoop* aIOLoop)
mChannelState = ChannelConnected;
}
mChild = needOpen;
mIOLoop = aIOLoop;
mWorkerLoop = MessageLoop::current();

View File

@ -158,6 +158,7 @@ protected:
CondVar mCvar;
MessageLoop* mIOLoop; // thread where IO happens
MessageLoop* mWorkerLoop; // thread where work is done
bool mChild; // am I the child or parent?
};

View File

@ -64,14 +64,15 @@ RPCChannel::Call(Message* msg, Message* reply)
NS_ABORT_IF_FALSE(msg->is_rpc(),
"can only Call() RPC messages here");
MutexAutoLock lock(mMutex);
if (!Connected())
// trying to Send() to a closed or error'd channel
return false;
MutexAutoLock lock(mMutex);
msg->set_rpc_remote_stack_depth(mRemoteStackDepth);
mStack.push(*msg);
msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
msg->set_rpc_local_stack_depth(StackDepth());
// bypass |SyncChannel::Send| b/c RPCChannel implements its own
// waiting semantics
@ -103,8 +104,8 @@ RPCChannel::Call(Message* msg, Message* reply)
// (it may be an invalid message, but the sync handler will
// check that).
if (recvd.is_sync()) {
NS_ABORT_IF_FALSE(mPending.empty(),
"other side is malfunctioning");
if (!mPending.empty())
RPC_DEBUGABORT("other side is malfunctioning");
MutexAutoUnlock unlock(mMutex);
SyncChannel::OnDispatchMessage(recvd);
@ -116,13 +117,14 @@ RPCChannel::Call(Message* msg, Message* reply)
// reply message
if (recvd.is_reply()) {
NS_ABORT_IF_FALSE(0 < mStack.size(), "invalid RPC stack");
if (0 == mStack.size())
RPC_DEBUGABORT("invalid RPC stack");
const Message& outcall = mStack.top();
if (recvd.type() != (outcall.type()+1) && !recvd.is_reply_error()) {
// FIXME/cjones: handle error
NS_ABORT_IF_FALSE(0, "somebody's misbehavin'");
RPC_DEBUGABORT("somebody's misbehavin'", "rpc", true);
}
// we received a reply to our most recent outstanding
@ -147,8 +149,9 @@ RPCChannel::Call(Message* msg, Message* reply)
mPending.pop();
if (m.is_sync()) {
NS_ABORT_IF_FALSE(!seenBlocker,
"other side is malfunctioning");
if (seenBlocker)
RPC_DEBUGABORT("other side is malfunctioning",
"sync", m.is_reply());
seenBlocker = true;
MessageLoop::current()->PostTask(
@ -157,8 +160,9 @@ RPCChannel::Call(Message* msg, Message* reply)
&RPCChannel::OnDelegate, m));
}
else if (m.is_rpc()) {
NS_ABORT_IF_FALSE(!seenBlocker,
"other side is malfunctioning");
if (seenBlocker)
RPC_DEBUGABORT("other side is malfunctioning",
"rpc", m.is_reply());
seenBlocker = true;
MessageLoop::current()->PostTask(
@ -179,18 +183,25 @@ RPCChannel::Call(Message* msg, Message* reply)
// shouldn't have queued any more messages, since
// the other side is now supposed to be blocked on a
// reply from us!
if (mPending.size() > 0) {
NS_RUNTIMEABORT("other side should have been blocked");
}
if (!mPending.empty())
RPC_DEBUGABORT("other side should have been blocked");
}
// unlocks mMutex
return !isError;
}
// in-call. process in a new stack frame
NS_ABORT_IF_FALSE(mPending.empty(),
"other side is malfunctioning");
// in-call. process in a new stack frame. the other side
// should be blocked on us, hence an empty queue, except for
// the case where this side "won" an RPC race and the other
// side already replied back
if (!(mPending.empty()
|| (1 == mPending.size()
&& mPending.front().is_rpc()
&& mPending.front().is_reply()
&& 1 == StackDepth())))
RPC_DEBUGABORT("other side is malfunctioning", "rpc");
// "snapshot" the current stack depth while we own the Mutex
size_t stackDepth = StackDepth();
@ -213,7 +224,7 @@ RPCChannel::OnDelegate(const Message& msg)
return SyncChannel::OnDispatchMessage(msg);
else if (!msg.is_rpc())
return AsyncChannel::OnDispatchMessage(msg);
NS_RUNTIMEABORT("fatal logic error");
RPC_DEBUGABORT("fatal logic error");
}
void
@ -227,13 +238,17 @@ RPCChannel::OnMaybeDequeueOne()
if (mPending.empty())
return;
NS_ABORT_IF_FALSE(mPending.size() == 1, "should only have one msg");
NS_ABORT_IF_FALSE(mPending.front().is_sync(), "msg should be sync");
if (mPending.size() != 1)
RPC_DEBUGABORT("should only have one msg");
if (!(mPending.front().is_rpc() || mPending.front().is_sync()))
RPC_DEBUGABORT("msg should be RPC or sync", "async");
recvd = mPending.front();
mPending.pop();
}
return SyncChannel::OnDispatchMessage(recvd);
return recvd.is_sync() ?
SyncChannel::OnDispatchMessage(recvd)
: RPCChannel::OnIncall(recvd);
}
void
@ -254,17 +269,65 @@ RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
NS_ABORT_IF_FALSE(call.is_rpc(),
"should have been handled by SyncChannel");
// Race detection: see the long comment near mRemoteStackDepth
// in RPCChannel.h
NS_ASSERTION(stackDepth == call.rpc_remote_stack_depth(),
"RPC in-calls have raced!");
// Race detection: see the long comment near
// mRemoteStackDepthGuess in RPCChannel.h. "Remote" stack depth
// means our side, and "local" means other side.
if (call.rpc_remote_stack_depth_guess() != stackDepth) {
NS_WARNING("RPC in-calls have raced!");
// assumption: at this point, we have one call on our stack,
// as does the other side. But both we and the other side
// think that the opposite side *doesn't* have any calls on
// its stack. We need to verify this because race resolution
// depends on this fact.
if (!((1 == stackDepth && 0 == mRemoteStackDepthGuess)
&& (1 == call.rpc_local_stack_depth()
&& 0 == call.rpc_remote_stack_depth_guess())))
// TODO this /could/ be construed as evidence of a
// misbehaving process, so should probably go through
// regular error-handling channels
RPC_DEBUGABORT("fatal logic error");
// the "winner", if there is one, gets to defer processing of
// the other side's in-call
bool defer;
const char* winner;
switch (mRacePolicy) {
case RRPChildWins:
winner = "child";
defer = mChild;
break;
case RRPParentWins:
winner = "parent";
defer = !mChild;
break;
case RRPError:
NS_RUNTIMEABORT("NYI: 'Error' RPC race policy");
return;
default:
NS_RUNTIMEABORT("not reached");
return;
}
printf(" (%s won, so we're%sdeferring)\n",
winner, defer ? " " : " not ");
if (defer) {
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnIncall, call));
return;
}
// we "lost" and need to process the other side's in-call
}
Message* reply = nsnull;
++mRemoteStackDepth;
++mRemoteStackDepthGuess;
Result rv =
static_cast<RPCListener*>(mListener)->OnCallReceived(call, reply);
--mRemoteStackDepth;
--mRemoteStackDepthGuess;
switch (rv) {
case MsgProcessed:
@ -358,19 +421,31 @@ RPCChannel::OnMessageReceived(const Message& msg)
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnMaybeDequeueOne));
NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
return;
}
// OK: the RPC channel is idle, and we received an in-call.
// wake up the worker thread
// wake up the worker thread.
//
// Because of RPC race resolution, there's another sublety
// here. We can't enqueue an OnInCall() event directly,
// because while this code is executing, the worker thread
// concurrently might be making (or preparing to make) an
// out-call. If so, and race resolution is ParentWins or
// ChildWins, and this side is the "losing" side, then this
// side needs to "unblock" and process the new in-call. If
// the in-call were to go into the main event queue, then it
// would be lost. So it needs to go into mPending queue along
// with a OnMaybeDequeueOne() event. The OnMaybeDequeueOne()
// event handles the non-racy case.
NS_ABORT_IF_FALSE(msg.is_rpc(), "should be RPC");
mWorkerLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnIncall, msg));
mPending.push(msg);
mWorkerLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnMaybeDequeueOne));
}
else {
// we're waiting on an RPC reply
@ -403,7 +478,8 @@ RPCChannel::OnMessageReceived(const Message& msg)
// side is malfunctioning (compromised?).
if (AwaitingSyncReply() /* msg.is_rpc() */) {
// FIXME other error handling?
NS_RUNTIMEABORT("the other side is malfunctioning");
RPC_DEBUGABORT("the other side is malfunctioning",
"rpc", msg.is_reply());
return; // not reached
}

View File

@ -64,10 +64,18 @@ public:
Message*& aReply) = 0;
};
RPCChannel(RPCListener* aListener) :
// What happens if RPC calls race?
enum RacyRPCPolicy {
RRPError,
RRPChildWins,
RRPParentWins
};
RPCChannel(RPCListener* aListener, RacyRPCPolicy aPolicy=RRPChildWins) :
SyncChannel(aListener),
mPending(),
mRemoteStackDepth(0)
mRemoteStackDepthGuess(0),
mRacePolicy(aPolicy)
{
}
@ -105,6 +113,37 @@ private:
return mStack.size();
}
#define RPC_DEBUGABORT(...) \
DebugAbort(__FILE__, __LINE__,## __VA_ARGS__)
void DebugAbort(const char* file, int line,
const char* why,
const char* type="rpc", bool reply=false)
{
fprintf(stderr,
"[RPCChannel][%s][%s:%d] Aborting: %s (triggered by %s%s)\n",
mChild ? "Child" : "Parent",
file, line,
why,
type, reply ? "reply" : "");
// technically we need the mutex for this, but we're dying anyway
fprintf(stderr, " local RPC stack size: %zu\n",
mStack.size());
fprintf(stderr, " remote RPC stack guess: %zd\n",
mRemoteStackDepthGuess);
fprintf(stderr, " Pending queue size: %zu, front to back:\n",
mPending.size());
while (!mPending.empty()) {
fprintf(stderr, " [ %s%s ]\n",
mPending.front().is_rpc() ? "rpc" :
(mPending.front().is_sync() ? "sync" : "async"),
mPending.front().is_reply() ? "reply" : "");
mPending.pop();
}
NS_RUNTIMEABORT(why);
}
//
// Stack of all the RPC out-calls on which this RPCChannel is
// awaiting a response.
@ -187,7 +226,9 @@ private:
//
// TODO: and when we detect a race, what should we actually *do* ... ?
//
size_t mRemoteStackDepth;
size_t mRemoteStackDepthGuess;
RacyRPCPolicy mRacePolicy;
};

View File

@ -62,12 +62,12 @@ SyncChannel::Send(Message* msg, Message* reply)
"violation of sync handler invariant");
NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
MutexAutoLock lock(mMutex);
if (!Connected())
// trying to Send() to a closed or error'd channel
return false;
MutexAutoLock lock(mMutex);
mPendingReply = msg->type() + 1;
if (!AsyncChannel::Send(msg))
// FIXME more sophisticated error handling