make RPC unblock for async and sync, as it supposed to have

This commit is contained in:
Chris Jones 2009-09-10 18:54:37 -05:00
parent c6ad60ed4d
commit 88d2f89607
3 changed files with 91 additions and 23 deletions

View File

@ -43,6 +43,7 @@
#include "nsDebug.h"
using mozilla::MutexAutoLock;
using mozilla::MutexAutoUnlock;
template<>
struct RunnableMethodTraits<mozilla::ipc::RPCChannel>
@ -74,28 +75,46 @@ RPCChannel::Call(Message* msg, Message* reply)
AsyncChannel::Send(msg);
while (1) {
// here we're waiting for something to happen. it may either:
// (1) a reply to an outstanding message
// (2) a recursive call from the other side
// or
// (3) any other message
// here we're waiting for something to happen. it may legally
// be either:
// (1) async msg
// (2) reply to an outstanding message (sync or rpc)
// (3) recursive call from the other side
mCvar.Wait();
Message recvd = mPending.top();
mPending.pop();
NS_ABORT_IF_FALSE(recvd.is_rpc(),
"should have been delegated to SyncChannel");
// async, take care of it and move on
if (!recvd.is_sync() && !recvd.is_rpc()) {
MutexAutoUnlock unlock(mMutex);
// RPC reply message
AsyncChannel::OnDispatchMessage(recvd);
continue;
}
// something sync. Let the sync dispatcher take care of it
// (it may be an invalid message, but the sync handler will
// check that).
if (recvd.is_sync()) {
MutexAutoUnlock unlock(mMutex);
SyncChannel::OnDispatchMessage(recvd);
continue;
}
// from here on, we know that recvd.is_rpc()
NS_ABORT_IF_FALSE(recvd.is_rpc(), "wtf???");
// reply message
if (recvd.is_reply()) {
NS_ASSERTION(0 < mPending.size(), "invalid RPC stack");
NS_ABORT_IF_FALSE(0 < mPending.size(), "invalid RPC stack");
const Message& pending = mPending.top();
if (recvd.type() != (pending.type()+1) && !recvd.is_reply_error()) {
// FIXME/cjones: handle error
NS_ASSERTION(0, "somebody's misbehavin'");
NS_ABORT_IF_FALSE(0, "somebody's misbehavin'");
}
// we received a reply to our most recent outstanding
@ -110,17 +129,15 @@ RPCChannel::Call(Message* msg, Message* reply)
mMutex.Unlock();
return !isError;
}
// RPC in-call
// in-call
else {
// "snapshot" the current stack depth while we own the Mutex
size_t stackDepth = StackDepth();
mMutex.Unlock();
MutexAutoUnlock unlock(mMutex);
// someone called in to us from the other side. handle the call
ProcessIncall(recvd, stackDepth);
// FIXME/cjones: error handling
mMutex.Lock();
}
}
@ -194,13 +211,21 @@ RPCChannel::ProcessIncall(const Message& call, size_t stackDepth)
void
RPCChannel::OnMessageReceived(const Message& msg)
{
if (!msg.is_rpc()) {
return SyncChannel::OnMessageReceived(msg);
}
MutexAutoLock lock(mMutex);
if (0 == StackDepth()) {
// we're idle wrt to the RPC layer, and this message could be
// async, sync, or rpc.
//
// if it's *not* an RPC message, we delegate processing to the
// SyncChannel. it knows how to properly dispatch sync and
// async messages, and the sync channel also will do error
// checking wrt to its invariants
if (!msg.is_rpc()) {
// unlocks mMutex
return SyncChannel::OnMessageReceived(msg);
}
// wake up the worker, there's a new in-call to process
// NB: the interaction between this and SyncChannel is rather
@ -216,13 +241,52 @@ RPCChannel::OnMessageReceived(const Message& msg)
// It's not possible for us to otherwise receive an RPC
// in-call while awaiting a sync response in any case where
// both us and the other side are behaving legally. Is it
// worth trying to detect this oddball case?
// worth trying to detect this case? (It's kinda hard.)
mWorkerLoop->PostTask(FROM_HERE,
NewRunnableMethod(this,
&RPCChannel::OnIncall, msg));
}
else {
// let the worker know something new has happened
// we're waiting on an RPC reply
// NB some logic here is duplicated with SyncChannel. this is
// to allow more local reasoning
// if we're waiting on a sync reply, and this message is sync,
// dispatch it to the sync message handler. It will check that
// it's a reply, and the right kind of reply, then do its
// thing.
if (AwaitingSyncReply()
&& msg.is_sync()) {
// wake up worker thread (at SyncChannel::Send) awaiting
// this reply
mRecvd = msg;
mCvar.Notify();
return;
}
// got an async message while waiting on a sync reply. allowed,
// but we defer processing until the sync reply comes back.
if (AwaitingSyncReply()
&& !msg.is_sync() && !msg.is_rpc()) {
// releases mMutex
return AsyncChannel::OnMessageReceived(msg);
}
// if this side and the other were functioning correctly, we'd
// never reach this case. RPCChannel::Call explicitly checks
// for and disallows this case. so if we reach here, the other
// side is malfunctioning (compromised?).
if (AwaitingSyncReply() /* msg.is_rpc() */) {
// FIXME other error handling?
NS_RUNTIMEABORT("the other side is malfunctioning");
return; // not reached
}
// otherwise, we (legally) either got (i) async msg; (ii) sync
// in-msg; (iii) re-entrant rpc in-call; (iv) rpc reply we
// were awaiting. Dispatch to the worker, where invariants
// are checked and the message processed.
mPending.push(msg);
mCvar.Notify();
}

View File

@ -102,12 +102,14 @@ private:
// The stack is also used by the IO thread to transfer received
// messages to the worker thread, only when the worker thread is
// awaiting an RPC response. Until the worker pops the top of the
// stack, it may (legally) contain either of
// stack, it may (legally) contain one of
//
// - async msg
// - sync in-msg (msg.is_sync() && !msg.is_reply())
// - RPC in-call (msg.is_rpc() && !msg.is_reply())
// - RPC reply (msg.is_rpc() && msg.is_reply())
//
// In both cases, the worker will pop the message off the stack
// In any cases, the worker will pop the message off the stack
// and process it ASAP, returning |mPending| to a quiescent state.
//
std::stack<Message> mPending;

View File

@ -78,7 +78,8 @@ SyncChannel::Send(Message* msg, Message* reply)
// (NB: IPDL prevents the latter from occuring in actor code)
// FIXME/cjones: real error handling
NS_ABORT_IF_FALSE(mRecvd.is_reply() && mPendingReply == mRecvd.type(),
NS_ABORT_IF_FALSE(mRecvd.is_sync()
&& mRecvd.is_reply() && mPendingReply == mRecvd.type(),
"unexpected sync message");
mPendingReply = 0;
@ -91,6 +92,7 @@ void
SyncChannel::OnDispatchMessage(const Message& msg)
{
NS_ABORT_IF_FALSE(msg.is_sync(), "only sync messages here");
NS_ABORT_IF_FALSE(!msg.is_reply(), "wasn't awaiting reply");
Message* reply;