Bug 521929, part 1: Add a "seqno" field to synchronous messages. r=bent

This commit is contained in:
Chris Jones 2010-01-21 20:04:09 -06:00
parent c68bd0290e
commit 5f451f4f10
5 changed files with 40 additions and 9 deletions

View File

@ -38,6 +38,7 @@ Message::Message(int32 routing_id, uint16 type, PriorityValue priority)
#if defined(CHROMIUM_MOZILLA_BUILD)
header()->rpc_remote_stack_depth_guess = static_cast<size_t>(-1);
header()->rpc_local_stack_depth = static_cast<size_t>(-1);
header()->seqno = 0;
#endif
InitLoggingVariables();
}

View File

@ -142,7 +142,7 @@ class Message : public Pickle {
#if defined(CHROMIUM_MOZILLA_BUILD)
size_t rpc_remote_stack_depth_guess() const {
return header()->rpc_remote_stack_depth_guess;
return header()->rpc_remote_stack_depth_guess;
}
void set_rpc_remote_stack_depth_guess(size_t depth) {
@ -151,13 +151,21 @@ class Message : public Pickle {
}
size_t rpc_local_stack_depth() const {
return header()->rpc_local_stack_depth;
return header()->rpc_local_stack_depth;
}
void set_rpc_local_stack_depth(size_t depth) {
DCHECK(is_rpc());
header()->rpc_local_stack_depth = depth;
}
int32 seqno() const {
return header()->seqno;
}
void set_seqno(int32 seqno) {
header()->seqno = seqno;
}
#endif
template<class T>
@ -275,6 +283,8 @@ class Message : public Pickle {
size_t rpc_remote_stack_depth_guess;
// The actual local stack depth.
size_t rpc_local_stack_depth;
// Sequence number
int32 seqno;
#endif
};
#pragma pack(pop)

View File

@ -96,9 +96,10 @@ RPCChannel::Call(Message* msg, Message* reply)
return false;
}
mStack.push(*msg);
msg->set_seqno(NextSeqno());
msg->set_rpc_remote_stack_depth_guess(mRemoteStackDepthGuess);
msg->set_rpc_local_stack_depth(StackDepth());
msg->set_rpc_local_stack_depth(1 + StackDepth());
mStack.push(*msg);
mIOLoop->PostTask(
FROM_HERE,
@ -137,7 +138,7 @@ RPCChannel::Call(Message* msg, Message* reply)
continue;
}
NS_ABORT_IF_FALSE(recvd.is_rpc(), "wtf???");
RPC_ASSERT(recvd.is_rpc(), "wtf???");
if (recvd.is_reply()) {
RPC_ASSERT(0 < mStack.size(), "invalid RPC stack");
@ -146,7 +147,9 @@ RPCChannel::Call(Message* msg, Message* reply)
// FIXME/cjones: handle error
RPC_ASSERT(
recvd.type() == (outcall.type()+1) || recvd.is_reply_error(),
recvd.is_reply_error() ||
(recvd.type() == (outcall.type()+1) &&
recvd.seqno() == outcall.seqno()),
"somebody's misbehavin'", "rpc", true);
// we received a reply to our most recent outstanding
@ -355,6 +358,8 @@ RPCChannel::DispatchIncall(const Message& call)
reply->set_reply_error();
}
reply->set_seqno(call.seqno());
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &RPCChannel::OnSend, reply));

View File

@ -58,7 +58,8 @@ namespace ipc {
SyncChannel::SyncChannel(SyncListener* aListener)
: AsyncChannel(aListener),
mPendingReply(0),
mProcessingSyncMessage(false)
mProcessingSyncMessage(false),
mNextSeqno(0)
{
MOZ_COUNT_CTOR(SyncChannel);
}
@ -81,6 +82,8 @@ SyncChannel::Send(Message* msg, Message* reply)
"violation of sync handler invariant");
NS_ABORT_IF_FALSE(msg->is_sync(), "can only Send() sync messages here");
msg->set_seqno(NextSeqno());
MutexAutoLock lock(mMutex);
if (!Connected()) {
@ -89,6 +92,7 @@ SyncChannel::Send(Message* msg, Message* reply)
}
mPendingReply = msg->type() + 1;
int32 msgSeqno = msg->seqno();
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnSend, msg));
@ -117,8 +121,9 @@ SyncChannel::Send(Message* msg, Message* reply)
// FIXME/cjones: real error handling
NS_ABORT_IF_FALSE(mRecvd.is_sync() && mRecvd.is_reply() &&
(mPendingReply == mRecvd.type() ||
mRecvd.is_reply_error()),
(mRecvd.is_reply_error() ||
(mPendingReply == mRecvd.type() &&
msgSeqno == mRecvd.seqno())),
"unexpected sync message");
mPendingReply = 0;
@ -151,6 +156,8 @@ SyncChannel::OnDispatchMessage(const Message& msg)
reply->set_reply_error();
}
reply->set_seqno(msg.seqno());
mIOLoop->PostTask(
FROM_HERE,
NewRunnableMethod(this, &SyncChannel::OnSend, reply));

View File

@ -105,9 +105,17 @@ protected:
return mPendingReply != 0;
}
int32 NextSeqno() {
AssertWorkerThread();
return mChild ? --mNextSeqno : ++mNextSeqno;
}
MessageId mPendingReply;
bool mProcessingSyncMessage;
Message mRecvd;
// This is only accessed from the worker thread; seqno's are
// completely opaque to the IO thread.
int32 mNextSeqno;
static bool sIsPumpingMessages;
};