/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- * vim: sw=4 ts=4 et : * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #ifndef ipc_glue_RPCChannel_h #define ipc_glue_RPCChannel_h 1 #include #include #include #include #include "base/basictypes.h" #include "nsAtomicRefcnt.h" #include "mozilla/ipc/SyncChannel.h" #include "nsAutoPtr.h" namespace mozilla { namespace ipc { //----------------------------------------------------------------------------- class RPCChannel : public SyncChannel { friend class CxxStackFrame; public: // What happens if RPC calls race? enum RacyRPCPolicy { RRPError, RRPChildWins, RRPParentWins }; class /*NS_INTERFACE_CLASS*/ RPCListener : public SyncChannel::SyncListener { public: virtual ~RPCListener() { } virtual void OnChannelClose() = 0; virtual void OnChannelError() = 0; virtual Result OnMessageReceived(const Message& aMessage) = 0; virtual void OnProcessingError(Result aError) = 0; virtual int32_t GetProtocolTypeId() = 0; virtual bool OnReplyTimeout() = 0; virtual Result OnMessageReceived(const Message& aMessage, Message*& aReply) = 0; virtual Result OnCallReceived(const Message& aMessage, Message*& aReply) = 0; virtual void OnChannelConnected(int32_t peer_pid) {} virtual void OnEnteredCxxStack() { NS_RUNTIMEABORT("default impl shouldn't be invoked"); } virtual void OnExitedCxxStack() { NS_RUNTIMEABORT("default impl shouldn't be invoked"); } virtual void OnEnteredCall() { NS_RUNTIMEABORT("default impl shouldn't be invoked"); } virtual void OnExitedCall() { NS_RUNTIMEABORT("default impl shouldn't be invoked"); } virtual RacyRPCPolicy MediateRPCRace(const Message& parent, const Message& child) { return RRPChildWins; } virtual void ProcessRemoteNativeEventsInRPCCall() {}; }; RPCChannel(RPCListener* aListener); virtual ~RPCChannel(); void Clear() MOZ_OVERRIDE; // Make an RPC to the other side of the channel bool Call(Message* msg, Message* reply); // RPCChannel overrides these so that the async and sync messages // can be counted against mStackFrames virtual bool Send(Message* msg) MOZ_OVERRIDE; virtual bool Send(Message* msg, Message* reply) MOZ_OVERRIDE; // Asynchronously, send the child a message that puts it in such a // state that it can't send messages to the parent unless the // parent sends a message to it first. The child stays in this // state until the parent calls |UnblockChild()|. // // It is an error to // - call this on the child side of the channel. // - nest |BlockChild()| calls // - call this when the child is already blocked on a sync or RPC // in-/out- message/call // // Return true iff successful. bool BlockChild(); // Asynchronously undo |BlockChild()|. // // It is an error to // - call this on the child side of the channel // - call this without a matching |BlockChild()| // // Return true iff successful. bool UnblockChild(); // Return true iff this has code on the C++ stack. bool IsOnCxxStack() const { return !mCxxStackFrames.empty(); } virtual bool OnSpecialMessage(uint16_t id, const Message& msg) MOZ_OVERRIDE; /** * If there is a pending RPC message, process all pending messages. * * @note This method is used on Windows when we detect that an outbound * OLE RPC call is being made to unblock the parent. */ void FlushPendingRPCQueue(); #ifdef OS_WIN void ProcessNativeEventsInRPCCall(); static void NotifyGeckoEventDispatch(); protected: bool WaitForNotify(); void SpinInternalEventLoop(); #endif protected: virtual void OnMessageReceivedFromLink(const Message& msg) MOZ_OVERRIDE; virtual void OnChannelErrorFromLink() MOZ_OVERRIDE; private: // Called on worker thread only RPCListener* Listener() const { return static_cast(mListener); } virtual bool ShouldDeferNotifyMaybeError() const MOZ_OVERRIDE { return IsOnCxxStack(); } bool EventOccurred() const; void MaybeUndeferIncall(); void EnqueuePendingMessages(); /** * Process one deferred or pending message. * @return true if a message was processed */ bool OnMaybeDequeueOne(); /** * The "remote view of stack depth" can be different than the * actual stack depth when there are out-of-turn replies. When we * receive one, our actual RPC stack depth doesn't decrease, but * the other side (that sent the reply) thinks it has. So, the * "view" returned here is |stackDepth| minus the number of * out-of-turn replies. * * Only called from the worker thread. */ size_t RemoteViewOfStackDepth(size_t stackDepth) const; void Incall(const Message& call, size_t stackDepth); void DispatchIncall(const Message& call); void BlockOnParent(); void UnblockFromParent(); // This helper class managed RPCChannel.mCxxStackDepth on behalf // of RPCChannel. When the stack depth is incremented from zero // to non-zero, it invokes an RPCChannel callback, and similarly // for when the depth goes from non-zero to zero; void EnteredCxxStack() { Listener()->OnEnteredCxxStack(); } void ExitedCxxStack(); void EnteredCall() { Listener()->OnEnteredCall(); } void ExitedCall() { Listener()->OnExitedCall(); } enum Direction { IN_MESSAGE, OUT_MESSAGE }; struct RPCFrame { RPCFrame(Direction direction, const Message* msg) : mDirection(direction), mMsg(msg) { } bool IsRPCIncall() const { return mMsg->is_rpc() && IN_MESSAGE == mDirection; } bool IsRPCOutcall() const { return mMsg->is_rpc() && OUT_MESSAGE == mDirection; } void Describe(int32_t* id, const char** dir, const char** sems, const char** name) const { *id = mMsg->routing_id(); *dir = (IN_MESSAGE == mDirection) ? "in" : "out"; *sems = mMsg->is_rpc() ? "rpc" : mMsg->is_sync() ? "sync" : "async"; *name = mMsg->name(); } Direction mDirection; const Message* mMsg; }; class NS_STACK_CLASS CxxStackFrame { public: CxxStackFrame(RPCChannel& that, Direction direction, const Message* msg) : mThat(that) { mThat.AssertWorkerThread(); if (mThat.mCxxStackFrames.empty()) mThat.EnteredCxxStack(); mThat.mCxxStackFrames.push_back(RPCFrame(direction, msg)); const RPCFrame& frame = mThat.mCxxStackFrames.back(); if (frame.IsRPCIncall()) mThat.EnteredCall(); mThat.mSawRPCOutMsg |= frame.IsRPCOutcall(); } ~CxxStackFrame() { bool exitingCall = mThat.mCxxStackFrames.back().IsRPCIncall(); mThat.mCxxStackFrames.pop_back(); bool exitingStack = mThat.mCxxStackFrames.empty(); // mListener could have gone away if Close() was called while // RPCChannel code was still on the stack if (!mThat.mListener) return; mThat.AssertWorkerThread(); if (exitingCall) mThat.ExitedCall(); if (exitingStack) mThat.ExitedCxxStack(); } private: RPCChannel& mThat; // disable harmful methods CxxStackFrame(); CxxStackFrame(const CxxStackFrame&); CxxStackFrame& operator=(const CxxStackFrame&); }; // Called from both threads size_t StackDepth() const { mMonitor->AssertCurrentThreadOwns(); return mStack.size(); } void DebugAbort(const char* file, int line, const char* cond, const char* why, const char* type="rpc", bool reply=false) const; // This method is only safe to call on the worker thread, or in a // debugger with all threads paused. void DumpRPCStack(const char* const pfx="") const; // // Queue of all incoming messages, except for replies to sync // messages, which are delivered directly to the SyncChannel // through its mRecvd member. // // If both this side and the other side are functioning correctly, // the queue can only be in certain configurations. Let // // |A<| be an async in-message, // |S<| be a sync in-message, // |C<| be an RPC in-call, // |R<| be an RPC reply. // // The queue can only match this configuration // // A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<))) // // The other side can send as many async messages |A<*| as it // wants before sending us a blocking message. // // The first case is |S<|, a sync in-msg. The other side must be // blocked, and thus can't send us any more messages until we // process the sync in-msg. // // The second case is |C<|, an RPC in-call; the other side must be // blocked. (There's a subtlety here: this in-call might have // raced with an out-call, but we detect that with the mechanism // below, |mRemoteStackDepth|, and races don't matter to the // queue.) // // Final case, the other side replied to our most recent out-call // |R<|. If that was the *only* out-call on our stack, // |?{mStack.size() == 1}|, then other side "finished with us," // and went back to its own business. That business might have // included sending any number of async message |A<*| until // sending a blocking message |(S< | C<)|. If we had more than // one RPC call on our stack, the other side *better* not have // sent us another blocking message, because it's blocked on a // reply from us. // typedef std::deque MessageQueue; MessageQueue mPending; // // Stack of all the RPC out-calls on which this RPCChannel is // awaiting a response. // std::stack mStack; // // Map of replies received "out of turn", because of RPC // in-calls racing with replies to outstanding in-calls. See // https://bugzilla.mozilla.org/show_bug.cgi?id=521929. // typedef std::map MessageMap; MessageMap mOutOfTurnReplies; // // Stack of RPC in-calls that were deferred because of race // conditions. // std::stack mDeferred; // // This is what we think the RPC stack depth is on the "other // side" of this RPC channel. We maintain this variable so that // we can detect racy RPC calls. With each RPC out-call sent, we // send along what *we* think the stack depth of the remote side // is *before* it will receive the RPC call. // // After sending the out-call, our stack depth is "incremented" // by pushing that pending message onto mPending. // // Then when processing an in-call |c|, it must be true that // // mStack.size() == c.remoteDepth // // i.e., my depth is actually the same as what the other side // thought it was when it sent in-call |c|. If this fails to // hold, we have detected racy RPC calls. // // We then increment mRemoteStackDepth *just before* processing // the in-call, since we know the other side is waiting on it, and // decrement it *just after* finishing processing that in-call, // since our response will pop the top of the other side's // |mPending|. // // One nice aspect of this race detection is that it is symmetric; // if one side detects a race, then the other side must also // detect the same race. // size_t mRemoteStackDepthGuess; // True iff the parent has put us in a |BlockChild()| state. bool mBlockedOnParent; // Approximation of Sync/RPCChannel-code frames on the C++ stack. // It can only be interpreted as the implication // // !mCxxStackFrames.empty() => RPCChannel code on C++ stack // // This member is only accessed on the worker thread, and so is // not protected by mMonitor. It is managed exclusively by the // helper |class CxxStackFrame|. std::vector mCxxStackFrames; // Did we process an RPC out-call during this stack? Only // meaningful in ExitedCxxStack(), from which this variable is // reset. bool mSawRPCOutMsg; private: // // All dequeuing tasks require a single point of cancellation, // which is handled via a reference-counted task. // class RefCountedTask { public: RefCountedTask(CancelableTask* aTask) : mTask(aTask) , mRefCnt(0) {} ~RefCountedTask() { delete mTask; } void Run() { mTask->Run(); } void Cancel() { mTask->Cancel(); } void AddRef() { NS_AtomicIncrementRefcnt(mRefCnt); } void Release() { if (NS_AtomicDecrementRefcnt(mRefCnt) == 0) delete this; } private: CancelableTask* mTask; nsrefcnt mRefCnt; }; // // Wrap an existing task which can be cancelled at any time // without the wrapper's knowledge. // class DequeueTask : public Task { public: DequeueTask(RefCountedTask* aTask) : mTask(aTask) {} void Run() { mTask->Run(); } private: nsRefPtr mTask; }; // A task encapsulating dequeuing one pending task nsRefPtr mDequeueOneTask; }; } // namespace ipc } // namespace mozilla #endif // ifndef ipc_glue_RPCChannel_h