Fix typo in 9b54bd30006c008b4a951331b273613d5bac3abf
[pm.git] / ipc / glue / MessageChannel.cpp
blob43233cab4d61029df05e6b51cac2f462f011e23e
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 * vim: sw=4 ts=4 et :
3 */
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #include "mozilla/ipc/MessageChannel.h"
9 #include "mozilla/ipc/ProtocolUtils.h"
11 #include "mozilla/dom/ScriptSettings.h"
13 #include "mozilla/Assertions.h"
14 #include "mozilla/DebugOnly.h"
15 #include "mozilla/Move.h"
16 #include "mozilla/SizePrintfMacros.h"
17 #include "nsDebug.h"
18 #include "nsISupportsImpl.h"
19 #include "nsContentUtils.h"
21 #include "prprf.h"
23 // Undo the damage done by mozzconf.h
24 #undef compress
27 * IPC design:
29 * There are three kinds of messages: async, sync, and intr. Sync and intr
30 * messages are blocking. Only intr and high-priority sync messages can nest.
32 * Terminology: To dispatch a message Foo is to run the RecvFoo code for
33 * it. This is also called "handling" the message.
35 * Sync messages have priorities while async and intr messages always have
36 * normal priority. The three possible priorities are normal, high, and urgent.
37 * The intended uses of these priorities are:
38 * NORMAL - most messages.
39 * HIGH - CPOW-related messages, which can go in either direction.
40 * URGENT - messages where we don't want to dispatch
41 * incoming CPOWs while waiting for the response.
43 * To avoid jank, the parent process is not allowed to send sync messages of
44 * normal priority. The parent also is not allowed to send urgent messages at
45 * all. When a process is waiting for a response to a sync message M0, it will
46 * dispatch an incoming message M if:
47 * 1. M has a higher priority than M0, or
48 * 2. if M has the same priority as M0 and we're in the child, or
49 * 3. if M has the same priority as M0 and it was sent by the other side
50 while dispatching M0 (nesting).
51 * The idea is that higher priority messages should take precedence, and we
52 * also want to allow nesting. The purpose of rule 2 is to handle a race where
53 * both processes send to each other simultaneously. In this case, we resolve
54 * the race in favor of the parent (so the child dispatches first).
56 * Sync messages satisfy the following properties:
57 * A. When waiting for a response to a sync message, we won't dispatch any
58 * messages of lower priority.
59 * B. Sync messages of the same priority will be dispatched roughly in the
60 * order they were sent. The exception is when the parent and child send
61 * sync messages to each other simultaneously. In this case, the parent's
62 * message is dispatched first. While it is dispatched, the child may send
63 * further nested messages, and these messages may be dispatched before the
64 * child's original message. We can consider ordering to be preserved here
65 * because we pretend that the child's original message wasn't sent until
66 * after the parent's message is finished being dispatched.
68 * Intr messages are blocking but not prioritized. While waiting for an intr
69 * response, all incoming messages are dispatched until a response is
70 * received. Intr messages also can be nested. When two intr messages race with
71 * each other, a similar scheme is used to ensure that one side wins. The
72 * winning side is chosen based on the message type.
74 * Intr messages differ from sync messages in that, while sending an intr
75 * message, we may dispatch an async message. This causes some additional
76 * complexity. One issue is that replies can be received out of order. It's also
77 * more difficult to determine whether one message is nested inside
78 * another. Consequently, intr handling uses mOutOfTurnReplies and
79 * mRemoteStackDepthGuess, which are not needed for sync messages.
82 using namespace mozilla;
83 using namespace std;
85 using mozilla::dom::AutoNoJSAPI;
86 using mozilla::dom::ScriptSettingsInitialized;
87 using mozilla::MonitorAutoLock;
88 using mozilla::MonitorAutoUnlock;
90 template<>
91 struct RunnableMethodTraits<mozilla::ipc::MessageChannel>
93 static void RetainCallee(mozilla::ipc::MessageChannel* obj) { }
94 static void ReleaseCallee(mozilla::ipc::MessageChannel* obj) { }
// Channel assertion helper. When _cond evaluates to false, calls
// DebugAbort() with the current file, the current line, the stringified
// condition, and any extra arguments supplied by the caller.
// The do { ... } while (0) wrapper makes the expansion a single statement.
// NOTE(review): DebugAbort is not defined in this part of the file; it is
// resolved at the expansion site.
97 #define IPC_ASSERT(_cond, ...) \
98 do { \
99 if (!(_cond)) \
100 DebugAbort(__FILE__, __LINE__, #_cond,## __VA_ARGS__); \
101 } while (0)
// File-local flag.
// NOTE(review): nothing in this part of the file reads or writes it.
103 static bool gParentIsBlocked;
105 namespace mozilla {
106 namespace ipc {
// Sentinel timeout value (INT32_MIN); the MessageChannel constructor
// initializes mTimeoutMs to it.
108 const int32_t MessageChannel::kNoTimeout = INT32_MIN;
110 // static
// Definition of the static flag, initially false.
111 bool MessageChannel::sIsPumpingMessages = false;
// Direction of a message relative to this endpoint: IN_MESSAGE for a message
// being dispatched here, OUT_MESSAGE for a message being sent from here.
enum Direction
{
    IN_MESSAGE = 0,
    OUT_MESSAGE = 1
};
119 class MessageChannel::InterruptFrame
121 private:
122 enum Semantics
124 INTR_SEMS,
125 SYNC_SEMS,
126 ASYNC_SEMS
129 public:
130 InterruptFrame(Direction direction, const Message* msg)
131 : mMessageName(strdup(msg->name())),
132 mMessageRoutingId(msg->routing_id()),
133 mMesageSemantics(msg->is_interrupt() ? INTR_SEMS :
134 msg->is_sync() ? SYNC_SEMS :
135 ASYNC_SEMS),
136 mDirection(direction),
137 mMoved(false)
139 MOZ_ASSERT(mMessageName);
142 InterruptFrame(InterruptFrame&& aOther)
144 MOZ_ASSERT(aOther.mMessageName);
145 mMessageName = aOther.mMessageName;
146 aOther.mMessageName = nullptr;
147 aOther.mMoved = true;
149 mMessageRoutingId = aOther.mMessageRoutingId;
150 mMesageSemantics = aOther.mMesageSemantics;
151 mDirection = aOther.mDirection;
154 ~InterruptFrame()
156 MOZ_ASSERT_IF(!mMessageName, mMoved);
158 if (mMessageName)
159 free(const_cast<char*>(mMessageName));
162 InterruptFrame& operator=(InterruptFrame&& aOther)
164 MOZ_ASSERT(&aOther != this);
165 this->~InterruptFrame();
166 new (this) InterruptFrame(mozilla::Move(aOther));
167 return *this;
170 bool IsInterruptIncall() const
172 return INTR_SEMS == mMesageSemantics && IN_MESSAGE == mDirection;
175 bool IsInterruptOutcall() const
177 return INTR_SEMS == mMesageSemantics && OUT_MESSAGE == mDirection;
180 bool IsOutgoingSync() const {
181 return (mMesageSemantics == INTR_SEMS || mMesageSemantics == SYNC_SEMS) &&
182 mDirection == OUT_MESSAGE;
185 void Describe(int32_t* id, const char** dir, const char** sems,
186 const char** name) const
188 *id = mMessageRoutingId;
189 *dir = (IN_MESSAGE == mDirection) ? "in" : "out";
190 *sems = (INTR_SEMS == mMesageSemantics) ? "intr" :
191 (SYNC_SEMS == mMesageSemantics) ? "sync" :
192 "async";
193 *name = mMessageName;
196 private:
197 const char* mMessageName;
198 int32_t mMessageRoutingId;
199 Semantics mMesageSemantics;
200 Direction mDirection;
201 DebugOnly<bool> mMoved;
203 // Disable harmful methods.
204 InterruptFrame(const InterruptFrame& aOther) = delete;
205 InterruptFrame& operator=(const InterruptFrame&) = delete;
208 class MOZ_STACK_CLASS MessageChannel::CxxStackFrame
210 public:
211 CxxStackFrame(MessageChannel& that, Direction direction, const Message* msg)
212 : mThat(that)
214 mThat.AssertWorkerThread();
216 if (mThat.mCxxStackFrames.empty())
217 mThat.EnteredCxxStack();
219 mThat.mCxxStackFrames.append(InterruptFrame(direction, msg));
221 const InterruptFrame& frame = mThat.mCxxStackFrames.back();
223 if (frame.IsInterruptIncall())
224 mThat.EnteredCall();
226 if (frame.IsOutgoingSync())
227 mThat.EnteredSyncSend();
229 mThat.mSawInterruptOutMsg |= frame.IsInterruptOutcall();
232 ~CxxStackFrame() {
233 mThat.AssertWorkerThread();
235 MOZ_ASSERT(!mThat.mCxxStackFrames.empty());
237 const InterruptFrame& frame = mThat.mCxxStackFrames.back();
238 bool exitingSync = frame.IsOutgoingSync();
239 bool exitingCall = frame.IsInterruptIncall();
240 mThat.mCxxStackFrames.shrinkBy(1);
242 bool exitingStack = mThat.mCxxStackFrames.empty();
244 // mListener could have gone away if Close() was called while
245 // MessageChannel code was still on the stack
246 if (!mThat.mListener)
247 return;
249 if (exitingCall)
250 mThat.ExitedCall();
252 if (exitingSync)
253 mThat.ExitedSyncSend();
255 if (exitingStack)
256 mThat.ExitedCxxStack();
258 private:
259 MessageChannel& mThat;
261 // Disable harmful methods.
262 CxxStackFrame() = delete;
263 CxxStackFrame(const CxxStackFrame&) = delete;
264 CxxStackFrame& operator=(const CxxStackFrame&) = delete;
267 namespace {
269 class MOZ_STACK_CLASS MaybeScriptBlocker {
270 public:
271 explicit MaybeScriptBlocker(MessageChannel *aChannel, bool aBlock
272 MOZ_GUARD_OBJECT_NOTIFIER_PARAM)
273 : mBlocked(aChannel->ShouldBlockScripts() && aBlock)
275 MOZ_GUARD_OBJECT_NOTIFIER_INIT;
276 if (mBlocked) {
277 nsContentUtils::AddScriptBlocker();
280 ~MaybeScriptBlocker() {
281 if (mBlocked) {
282 nsContentUtils::RemoveScriptBlocker();
285 private:
286 MOZ_DECL_USE_GUARD_OBJECT_NOTIFIER
287 bool mBlocked;
290 } /* namespace {} */
292 MessageChannel::MessageChannel(MessageListener *aListener)
293 : mListener(aListener),
294 mChannelState(ChannelClosed),
295 mSide(UnknownSide),
296 mLink(nullptr),
297 mWorkerLoop(nullptr),
298 mChannelErrorTask(nullptr),
299 mWorkerLoopID(-1),
300 mTimeoutMs(kNoTimeout),
301 mInTimeoutSecondHalf(false),
302 mNextSeqno(0),
303 mAwaitingSyncReply(false),
304 mAwaitingSyncReplyPriority(0),
305 mDispatchingSyncMessage(false),
306 mDispatchingSyncMessagePriority(0),
307 mDispatchingAsyncMessage(false),
308 mDispatchingAsyncMessagePriority(0),
309 mCurrentTransaction(0),
310 mTimedOutMessageSeqno(0),
311 mRecvdErrors(0),
312 mRemoteStackDepthGuess(false),
313 mSawInterruptOutMsg(false),
314 mIsWaitingForIncoming(false),
315 mAbortOnError(false),
316 mBlockScripts(false),
317 mFlags(REQUIRE_DEFAULT),
318 mPeerPidSet(false),
319 mPeerPid(-1)
321 MOZ_COUNT_CTOR(ipc::MessageChannel);
323 #ifdef OS_WIN
324 mTopFrame = nullptr;
325 mIsSyncWaitingOnNonMainThread = false;
326 #endif
328 mDequeueOneTask = new RefCountedTask(NewRunnableMethod(
329 this,
330 &MessageChannel::OnMaybeDequeueOne));
332 mOnChannelConnectedTask = new RefCountedTask(NewRunnableMethod(
333 this,
334 &MessageChannel::DispatchOnChannelConnected));
336 #ifdef OS_WIN
337 mEvent = CreateEventW(nullptr, TRUE, FALSE, nullptr);
338 NS_ASSERTION(mEvent, "CreateEvent failed! Nothing is going to work!");
339 #endif
342 MessageChannel::~MessageChannel()
344 MOZ_COUNT_DTOR(ipc::MessageChannel);
345 IPC_ASSERT(mCxxStackFrames.empty(), "mismatched CxxStackFrame ctor/dtors");
346 #ifdef OS_WIN
347 DebugOnly<BOOL> ok = CloseHandle(mEvent);
348 MOZ_ASSERT(ok);
349 #endif
350 Clear();
353 static void
354 PrintErrorMessage(Side side, const char* channelName, const char* msg)
356 const char *from = (side == ChildSide)
357 ? "Child"
358 : ((side == ParentSide) ? "Parent" : "Unknown");
359 printf_stderr("\n###!!! [%s][%s] Error: %s\n\n", from, channelName, msg);
362 bool
363 MessageChannel::Connected() const
365 mMonitor->AssertCurrentThreadOwns();
367 // The transport layer allows us to send messages before
368 // receiving the "connected" ack from the remote side.
369 return (ChannelOpening == mChannelState || ChannelConnected == mChannelState);
372 bool
373 MessageChannel::CanSend() const
375 if (!mMonitor) {
376 return false;
378 MonitorAutoLock lock(*mMonitor);
379 return Connected();
382 void
383 MessageChannel::Clear()
385 // Don't clear mWorkerLoopID; we use it in AssertLinkThread() and
386 // AssertWorkerThread().
388 // Also don't clear mListener. If we clear it, then sending a message
389 // through this channel after it's Clear()'ed can cause this process to
390 // crash.
392 // In practice, mListener owns the channel, so the channel gets deleted
393 // before mListener. But just to be safe, mListener is a weak pointer.
395 mDequeueOneTask->Cancel();
397 mWorkerLoop = nullptr;
398 delete mLink;
399 mLink = nullptr;
401 mOnChannelConnectedTask->Cancel();
403 if (mChannelErrorTask) {
404 mChannelErrorTask->Cancel();
405 mChannelErrorTask = nullptr;
408 // Free up any memory used by pending messages.
409 mPending.clear();
410 mRecvd = nullptr;
411 mOutOfTurnReplies.clear();
412 while (!mDeferred.empty()) {
413 mDeferred.pop();
417 bool
418 MessageChannel::Open(Transport* aTransport, MessageLoop* aIOLoop, Side aSide)
420 NS_PRECONDITION(!mLink, "Open() called > once");
422 mMonitor = new RefCountedMonitor();
423 mWorkerLoop = MessageLoop::current();
424 mWorkerLoopID = mWorkerLoop->id();
426 ProcessLink *link = new ProcessLink(this);
427 link->Open(aTransport, aIOLoop, aSide); // :TODO: n.b.: sets mChild
428 mLink = link;
429 return true;
432 bool
433 MessageChannel::Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide)
435 // Opens a connection to another thread in the same process.
437 // This handshake proceeds as follows:
438 // - Let A be the thread initiating the process (either child or parent)
439 // and B be the other thread.
440 // - A spawns thread for B, obtaining B's message loop
441 // - A creates ProtocolChild and ProtocolParent instances.
442 // Let PA be the one appropriate to A and PB the side for B.
443 // - A invokes PA->Open(PB, ...):
444 // - set state to mChannelOpening
445 // - this will place a work item in B's worker loop (see next bullet)
446 // and then spins until PB->mChannelState becomes mChannelConnected
447 // - meanwhile, on PB's worker loop, the work item is removed and:
448 // - invokes PB->SlaveOpen(PA, ...):
449 // - sets its state and that of PA to Connected
450 NS_PRECONDITION(aTargetChan, "Need a target channel");
451 NS_PRECONDITION(ChannelClosed == mChannelState, "Not currently closed");
453 CommonThreadOpenInit(aTargetChan, aSide);
455 Side oppSide = UnknownSide;
456 switch(aSide) {
457 case ChildSide: oppSide = ParentSide; break;
458 case ParentSide: oppSide = ChildSide; break;
459 case UnknownSide: break;
462 mMonitor = new RefCountedMonitor();
464 MonitorAutoLock lock(*mMonitor);
465 mChannelState = ChannelOpening;
466 aTargetLoop->PostTask(
467 FROM_HERE,
468 NewRunnableMethod(aTargetChan, &MessageChannel::OnOpenAsSlave, this, oppSide));
470 while (ChannelOpening == mChannelState)
471 mMonitor->Wait();
472 NS_ASSERTION(ChannelConnected == mChannelState, "not connected when awoken");
473 return (ChannelConnected == mChannelState);
476 void
477 MessageChannel::OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide)
479 // Invoked when the other side has begun the open.
480 NS_PRECONDITION(ChannelClosed == mChannelState,
481 "Not currently closed");
482 NS_PRECONDITION(ChannelOpening == aTargetChan->mChannelState,
483 "Target channel not in the process of opening");
485 CommonThreadOpenInit(aTargetChan, aSide);
486 mMonitor = aTargetChan->mMonitor;
488 MonitorAutoLock lock(*mMonitor);
489 NS_ASSERTION(ChannelOpening == aTargetChan->mChannelState,
490 "Target channel not in the process of opening");
491 mChannelState = ChannelConnected;
492 aTargetChan->mChannelState = ChannelConnected;
493 aTargetChan->mMonitor->Notify();
496 void
497 MessageChannel::CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide)
499 mWorkerLoop = MessageLoop::current();
500 mWorkerLoopID = mWorkerLoop->id();
501 mLink = new ThreadLink(this, aTargetChan);
502 mSide = aSide;
505 bool
506 MessageChannel::Echo(Message* aMsg)
508 nsAutoPtr<Message> msg(aMsg);
509 AssertWorkerThread();
510 mMonitor->AssertNotCurrentThreadOwns();
511 if (MSG_ROUTING_NONE == msg->routing_id()) {
512 ReportMessageRouteError("MessageChannel::Echo");
513 return false;
516 MonitorAutoLock lock(*mMonitor);
518 if (!Connected()) {
519 ReportConnectionError("MessageChannel");
520 return false;
523 mLink->EchoMessage(msg.forget());
524 return true;
527 bool
528 MessageChannel::Send(Message* aMsg)
530 CxxStackFrame frame(*this, OUT_MESSAGE, aMsg);
532 nsAutoPtr<Message> msg(aMsg);
533 AssertWorkerThread();
534 mMonitor->AssertNotCurrentThreadOwns();
535 if (MSG_ROUTING_NONE == msg->routing_id()) {
536 ReportMessageRouteError("MessageChannel::Send");
537 return false;
540 MonitorAutoLock lock(*mMonitor);
541 if (!Connected()) {
542 ReportConnectionError("MessageChannel");
543 return false;
545 mLink->SendMessage(msg.forget());
546 return true;
549 bool
550 MessageChannel::MaybeInterceptSpecialIOMessage(const Message& aMsg)
552 AssertLinkThread();
553 mMonitor->AssertCurrentThreadOwns();
555 if (MSG_ROUTING_NONE == aMsg.routing_id() &&
556 GOODBYE_MESSAGE_TYPE == aMsg.type())
558 // :TODO: Sort out Close() on this side racing with Close() on the
559 // other side
560 mChannelState = ChannelClosing;
561 if (LoggingEnabled()) {
562 printf("NOTE: %s process received `Goodbye', closing down\n",
563 (mSide == ChildSide) ? "child" : "parent");
565 return true;
567 return false;
570 bool
571 MessageChannel::ShouldDeferMessage(const Message& aMsg)
573 // Never defer messages that have the highest priority, even async
574 // ones. This is safe because only the child can send these messages, so
575 // they can never nest.
576 if (aMsg.priority() == IPC::Message::PRIORITY_URGENT)
577 return false;
579 // Unless they're urgent, we always defer async messages.
580 if (!aMsg.is_sync()) {
581 MOZ_ASSERT(aMsg.priority() == IPC::Message::PRIORITY_NORMAL);
582 return true;
585 int msgPrio = aMsg.priority();
586 int waitingPrio = AwaitingSyncReplyPriority();
588 // Always defer if the priority of the incoming message is less than the
589 // priority of the message we're awaiting.
590 if (msgPrio < waitingPrio)
591 return true;
593 // Never defer if the message has strictly greater priority.
594 if (msgPrio > waitingPrio)
595 return false;
597 // When both sides send sync messages of the same priority, we resolve the
598 // race by dispatching in the child and deferring the incoming message in
599 // the parent. However, the parent still needs to dispatch nested sync
600 // messages.
602 // Deferring in the parent only sort of breaks message ordering. When the
603 // child's message comes in, we can pretend the child hasn't quite
604 // finished sending it yet. Since the message is sync, we know that the
605 // child hasn't moved on yet.
606 return mSide == ParentSide && aMsg.transaction_id() != mCurrentTransaction;
// Entry point for every message arriving from the link. Per the assertions
// below it runs on the link thread with the monitor held.
//
//  - Special I/O messages (Goodbye) are consumed by
//    MaybeInterceptSpecialIOMessage() and go no further.
//  - A sync reply is stored in mRecvd (or counted in mRecvdErrors if it is
//    an error reply) and the worker thread is notified. A reply whose seqno
//    equals mTimedOutMessageSeqno is dropped instead.
//  - Any other message is appended to mPending, replacing the message at the
//    back of the queue when compression applies. The worker thread is then
//    either notified or a dequeue task is posted to mWorkerLoop.
609 void
610 MessageChannel::OnMessageReceivedFromLink(const Message& aMsg)
612 AssertLinkThread();
613 mMonitor->AssertCurrentThreadOwns();
615 if (MaybeInterceptSpecialIOMessage(aMsg))
616 return;
618 // Regardless of the Interrupt stack, if we're awaiting a sync reply,
619 // we know that it needs to be immediately handled to unblock us.
620 if (aMsg.is_sync() && aMsg.is_reply()) {
621 if (aMsg.seqno() == mTimedOutMessageSeqno) {
622 // Drop the message, but allow future sync messages to be sent.
623 mTimedOutMessageSeqno = 0;
624 return;
627 MOZ_ASSERT(AwaitingSyncReply());
628 MOZ_ASSERT(!mRecvd);
630 // Rather than storing errors in mRecvd, we mark them in
631 // mRecvdErrors. We need a counter because multiple replies can arrive
632 // when a timeout happens, as in the following example. Imagine the
633 // child is running slowly. The parent sends a sync message P1. It times
634 // out. The child eventually sends a sync message C1. While waiting for
635 // the C1 response, the child dispatches P1. In doing so, it sends sync
636 // message C2. At that point, it's valid for the parent to send error
637 // responses for both C1 and C2.
638 if (aMsg.is_reply_error()) {
639 mRecvdErrors++;
640 NotifyWorkerThread();
641 return;
// Successful reply: keep a copy for the blocked sync Send() to pick up.
644 mRecvd = new Message(aMsg);
645 NotifyWorkerThread();
646 return;
649 // Prioritized messages cannot be compressed.
650 MOZ_ASSERT(!aMsg.compress() || aMsg.priority() == IPC::Message::PRIORITY_NORMAL);
652 bool compress = (aMsg.compress() && !mPending.empty() &&
653 mPending.back().type() == aMsg.type() &&
654 mPending.back().routing_id() == aMsg.routing_id());
655 if (compress) {
656 // This message type has compression enabled, and the back of the
657 // queue was the same message type and routed to the same destination.
658 // Replace it with the newer message.
659 MOZ_ASSERT(mPending.back().compress());
660 mPending.pop_back();
// Wake the worker if it is blocked in an Interrupt call, blocked in a sync
// send that must not defer this message, or waiting for incoming messages.
663 bool shouldWakeUp = AwaitingInterruptReply() ||
664 (AwaitingSyncReply() && !ShouldDeferMessage(aMsg)) ||
665 AwaitingIncomingMessage();
667 // There are three cases we're concerned about, relating to the state of the
668 // main thread:
670 // (1) We are waiting on a sync reply - main thread is blocked on the
671 // IPC monitor.
672 // - If the message is high priority, we wake up the main thread to
673 // deliver the message depending on ShouldDeferMessage. Otherwise, we
674 // leave it in the mPending queue, posting a task to the main event
675 // loop, where it will be processed once the synchronous reply has been
676 // received.
678 // (2) We are waiting on an Interrupt reply - main thread is blocked on the
679 // IPC monitor.
680 // - Always notify and wake up the main thread.
682 // (3) We are not waiting on a reply.
683 // - We post a task to the main event loop.
685 // Note that, we may notify the main thread even though the monitor is not
686 // blocked. This is okay, since we always check for pending events before
687 // blocking again.
689 mPending.push_back(aMsg);
691 if (shouldWakeUp) {
692 NotifyWorkerThread();
693 } else {
694 // Worker thread is either not blocked on a reply, or this is an
695 // incoming Interrupt that raced with outgoing sync, and needs to be
696 // deferred to a later event-loop iteration.
697 if (!compress) {
698 // If we compressed away the previous message, we'll re-use
699 // its pending task.
700 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
705 bool
706 MessageChannel::Send(Message* aMsg, Message* aReply)
708 // See comment in DispatchSyncMessage.
709 MaybeScriptBlocker scriptBlocker(this, true);
711 // Sanity checks.
712 AssertWorkerThread();
713 mMonitor->AssertNotCurrentThreadOwns();
715 if (mCurrentTransaction == 0)
716 mListener->OnBeginSyncTransaction();
718 #ifdef OS_WIN
719 SyncStackFrame frame(this, false);
720 #endif
722 CxxStackFrame f(*this, OUT_MESSAGE, aMsg);
724 MonitorAutoLock lock(*mMonitor);
726 if (mTimedOutMessageSeqno) {
727 // Don't bother sending another sync message if a previous one timed out
728 // and we haven't received a reply for it. Once the original timed-out
729 // message receives a reply, we'll be able to send more sync messages
730 // again.
731 return false;
734 IPC_ASSERT(aMsg->is_sync(), "can only Send() sync messages here");
735 IPC_ASSERT(aMsg->priority() >= DispatchingSyncMessagePriority(),
736 "can't send sync message of a lesser priority than what's being dispatched");
737 IPC_ASSERT(mAwaitingSyncReplyPriority <= aMsg->priority(),
738 "nested sync message sends must be of increasing priority");
740 IPC_ASSERT(DispatchingSyncMessagePriority() != IPC::Message::PRIORITY_URGENT,
741 "not allowed to send messages while dispatching urgent messages");
742 IPC_ASSERT(DispatchingAsyncMessagePriority() != IPC::Message::PRIORITY_URGENT,
743 "not allowed to send messages while dispatching urgent messages");
745 nsAutoPtr<Message> msg(aMsg);
747 if (!Connected()) {
748 ReportConnectionError("MessageChannel::SendAndWait");
749 return false;
752 msg->set_seqno(NextSeqno());
754 int32_t seqno = msg->seqno();
755 DebugOnly<msgid_t> replyType = msg->type() + 1;
757 AutoSetValue<bool> replies(mAwaitingSyncReply, true);
758 AutoSetValue<int> prio(mAwaitingSyncReplyPriority, msg->priority());
759 AutoEnterTransaction transact(this, seqno);
761 int32_t transaction = mCurrentTransaction;
762 msg->set_transaction_id(transaction);
764 mLink->SendMessage(msg.forget());
766 while (true) {
767 // Loop until there aren't any more priority messages to process.
768 for (;;) {
769 mozilla::Vector<Message> toProcess;
771 for (MessageQueue::iterator it = mPending.begin(); it != mPending.end(); ) {
772 Message &msg = *it;
773 if (!ShouldDeferMessage(msg)) {
774 toProcess.append(Move(msg));
775 it = mPending.erase(it);
776 continue;
778 it++;
781 if (toProcess.empty())
782 break;
784 // Processing these messages could result in more messages, so we
785 // loop around to check for more afterwards.
786 for (auto it = toProcess.begin(); it != toProcess.end(); it++)
787 ProcessPendingRequest(*it);
790 // See if we've received a reply.
791 if (mRecvdErrors) {
792 mRecvdErrors--;
793 return false;
796 if (mRecvd) {
797 MOZ_ASSERT(mRecvd->is_reply(), "expected reply");
798 MOZ_ASSERT(!mRecvd->is_reply_error());
799 MOZ_ASSERT(mRecvd->type() == replyType, "wrong reply type");
800 MOZ_ASSERT(mRecvd->seqno() == seqno);
801 MOZ_ASSERT(mRecvd->is_sync());
803 *aReply = Move(*mRecvd);
804 mRecvd = nullptr;
805 return true;
808 MOZ_ASSERT(!mTimedOutMessageSeqno);
810 bool maybeTimedOut = !WaitForSyncNotify();
812 if (!Connected()) {
813 ReportConnectionError("MessageChannel::SendAndWait");
814 return false;
817 // We only time out a message if it initiated a new transaction (i.e.,
818 // if neither side has any other message Sends on the stack).
819 bool canTimeOut = transaction == seqno;
820 if (maybeTimedOut && canTimeOut && !ShouldContinueFromTimeout()) {
821 mTimedOutMessageSeqno = seqno;
822 return false;
826 return true;
829 bool
830 MessageChannel::Call(Message* aMsg, Message* aReply)
832 AssertWorkerThread();
833 mMonitor->AssertNotCurrentThreadOwns();
835 #ifdef OS_WIN
836 SyncStackFrame frame(this, true);
837 #endif
839 // This must come before MonitorAutoLock, as its destructor acquires the
840 // monitor lock.
841 CxxStackFrame cxxframe(*this, OUT_MESSAGE, aMsg);
843 MonitorAutoLock lock(*mMonitor);
844 if (!Connected()) {
845 ReportConnectionError("MessageChannel::Call");
846 return false;
849 // Sanity checks.
850 IPC_ASSERT(!AwaitingSyncReply(),
851 "cannot issue Interrupt call while blocked on sync request");
852 IPC_ASSERT(!DispatchingSyncMessage(),
853 "violation of sync handler invariant");
854 IPC_ASSERT(aMsg->is_interrupt(), "can only Call() Interrupt messages here");
856 nsAutoPtr<Message> msg(aMsg);
858 msg->set_seqno(NextSeqno());
859 msg->set_interrupt_remote_stack_depth_guess(mRemoteStackDepthGuess);
860 msg->set_interrupt_local_stack_depth(1 + InterruptStackDepth());
861 mInterruptStack.push(*msg);
862 mLink->SendMessage(msg.forget());
864 while (true) {
865 // if a handler invoked by *Dispatch*() spun a nested event
866 // loop, and the connection was broken during that loop, we
867 // might have already processed the OnError event. if so,
868 // trying another loop iteration will be futile because
869 // channel state will have been cleared
870 if (!Connected()) {
871 ReportConnectionError("MessageChannel::Call");
872 return false;
875 // Now might be the time to process a message deferred because of race
876 // resolution.
877 MaybeUndeferIncall();
879 // Wait for an event to occur.
880 while (!InterruptEventOccurred()) {
881 bool maybeTimedOut = !WaitForInterruptNotify();
883 // We might have received a "subtly deferred" message in a nested
884 // loop that it's now time to process.
885 if (InterruptEventOccurred() ||
886 (!maybeTimedOut && (!mDeferred.empty() || !mOutOfTurnReplies.empty())))
888 break;
891 if (maybeTimedOut && !ShouldContinueFromTimeout())
892 return false;
895 Message recvd;
896 MessageMap::iterator it;
898 if ((it = mOutOfTurnReplies.find(mInterruptStack.top().seqno()))
899 != mOutOfTurnReplies.end())
901 recvd = Move(it->second);
902 mOutOfTurnReplies.erase(it);
903 } else if (!mPending.empty()) {
904 recvd = Move(mPending.front());
905 mPending.pop_front();
906 } else {
907 // because of subtleties with nested event loops, it's possible
908 // that we got here and nothing happened. or, we might have a
909 // deferred in-call that needs to be processed. either way, we
910 // won't break the inner while loop again until something new
911 // happens.
912 continue;
915 // If the message is not Interrupt, we can dispatch it as normal.
916 if (!recvd.is_interrupt()) {
918 AutoEnterTransaction transaction(this, recvd);
919 MonitorAutoUnlock unlock(*mMonitor);
920 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
921 DispatchMessage(recvd);
923 if (!Connected()) {
924 ReportConnectionError("MessageChannel::DispatchMessage");
925 return false;
927 continue;
930 // If the message is an Interrupt reply, either process it as a reply to our
931 // call, or add it to the list of out-of-turn replies we've received.
932 if (recvd.is_reply()) {
933 IPC_ASSERT(!mInterruptStack.empty(), "invalid Interrupt stack");
935 // If this is not a reply the call we've initiated, add it to our
936 // out-of-turn replies and keep polling for events.
938 const Message &outcall = mInterruptStack.top();
940 // Note, In the parent, sequence numbers increase from 0, and
941 // in the child, they decrease from 0.
942 if ((mSide == ChildSide && recvd.seqno() > outcall.seqno()) ||
943 (mSide != ChildSide && recvd.seqno() < outcall.seqno()))
945 mOutOfTurnReplies[recvd.seqno()] = Move(recvd);
946 continue;
949 IPC_ASSERT(recvd.is_reply_error() ||
950 (recvd.type() == (outcall.type() + 1) &&
951 recvd.seqno() == outcall.seqno()),
952 "somebody's misbehavin'", true);
955 // We received a reply to our most recent outstanding call. Pop
956 // this frame and return the reply.
957 mInterruptStack.pop();
959 bool is_reply_error = recvd.is_reply_error();
960 if (!is_reply_error) {
961 *aReply = Move(recvd);
964 // If we have no more pending out calls waiting on replies, then
965 // the reply queue should be empty.
966 IPC_ASSERT(!mInterruptStack.empty() || mOutOfTurnReplies.empty(),
967 "still have pending replies with no pending out-calls",
968 true);
970 return !is_reply_error;
973 // Dispatch an Interrupt in-call. Snapshot the current stack depth while we
974 // own the monitor.
975 size_t stackDepth = InterruptStackDepth();
977 MonitorAutoUnlock unlock(*mMonitor);
979 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
980 DispatchInterruptMessage(recvd, stackDepth);
982 if (!Connected()) {
983 ReportConnectionError("MessageChannel::DispatchInterruptMessage");
984 return false;
988 return true;
991 bool
992 MessageChannel::WaitForIncomingMessage()
994 #ifdef OS_WIN
995 SyncStackFrame frame(this, true);
996 #endif
998 { // Scope for lock
999 MonitorAutoLock lock(*mMonitor);
1000 AutoEnterWaitForIncoming waitingForIncoming(*this);
1001 if (mChannelState != ChannelConnected) {
1002 return false;
1004 if (!HasPendingEvents()) {
1005 return WaitForInterruptNotify();
1009 return OnMaybeDequeueOne();
1012 bool
1013 MessageChannel::HasPendingEvents()
1015 AssertWorkerThread();
1016 mMonitor->AssertCurrentThreadOwns();
1017 return Connected() && !mPending.empty();
1020 bool
1021 MessageChannel::InterruptEventOccurred()
1023 AssertWorkerThread();
1024 mMonitor->AssertCurrentThreadOwns();
1025 IPC_ASSERT(InterruptStackDepth() > 0, "not in wait loop");
1027 return (!Connected() ||
1028 !mPending.empty() ||
1029 (!mOutOfTurnReplies.empty() &&
1030 mOutOfTurnReplies.find(mInterruptStack.top().seqno()) !=
1031 mOutOfTurnReplies.end()));
1034 bool
1035 MessageChannel::ProcessPendingRequest(const Message &aUrgent)
1037 AssertWorkerThread();
1038 mMonitor->AssertCurrentThreadOwns();
1040 // Note that it is possible we could have sent a sync message at
1041 // the same time the parent process sent an urgent message, and
1042 // therefore mPendingUrgentRequest is set *and* mRecvd is set as
1043 // well, because the link thread received both before the worker
1044 // thread woke up.
1046 // In this case, we process the urgent message first, but we need
1047 // to save the reply.
1048 nsAutoPtr<Message> savedReply(mRecvd.forget());
1051 // In order to send the parent RPC messages and guarantee it will
1052 // wake up, we must re-use its transaction.
1053 AutoEnterTransaction transaction(this, aUrgent);
1055 MonitorAutoUnlock unlock(*mMonitor);
1056 DispatchMessage(aUrgent);
1058 if (!Connected()) {
1059 ReportConnectionError("MessageChannel::ProcessPendingRequest");
1060 return false;
1063 // In between having dispatched our reply to the parent process, and
1064 // re-acquiring the monitor, the parent process could have already
1065 // processed that reply and sent the reply to our sync message. If so,
1066 // our saved reply should be empty.
1067 IPC_ASSERT(!mRecvd || !savedReply, "unknown reply");
1068 if (!mRecvd)
1069 mRecvd = savedReply.forget();
1070 return true;
1073 bool
1074 MessageChannel::DequeueOne(Message *recvd)
1076 AssertWorkerThread();
1077 mMonitor->AssertCurrentThreadOwns();
1079 if (!Connected()) {
1080 ReportConnectionError("OnMaybeDequeueOne");
1081 return false;
1084 if (!mDeferred.empty())
1085 MaybeUndeferIncall();
1087 if (mPending.empty())
1088 return false;
1090 *recvd = Move(mPending.front());
1091 mPending.pop_front();
1092 return true;
1095 bool
1096 MessageChannel::OnMaybeDequeueOne()
1098 AssertWorkerThread();
1099 mMonitor->AssertNotCurrentThreadOwns();
1101 Message recvd;
1103 MonitorAutoLock lock(*mMonitor);
1104 if (!DequeueOne(&recvd))
1105 return false;
1107 if (IsOnCxxStack() && recvd.is_interrupt() && recvd.is_reply()) {
1108 // We probably just received a reply in a nested loop for an
1109 // Interrupt call sent before entering that loop.
1110 mOutOfTurnReplies[recvd.seqno()] = Move(recvd);
1111 return false;
1115 // We should not be in a transaction yet if we're not blocked.
1116 MOZ_ASSERT(mCurrentTransaction == 0);
1117 AutoEnterTransaction transaction(this, recvd);
1119 MonitorAutoUnlock unlock(*mMonitor);
1121 CxxStackFrame frame(*this, IN_MESSAGE, &recvd);
1122 DispatchMessage(recvd);
1124 return true;
1127 void
1128 MessageChannel::DispatchMessage(const Message &aMsg)
1130 Maybe<AutoNoJSAPI> nojsapi;
1131 if (ScriptSettingsInitialized() && NS_IsMainThread())
1132 nojsapi.emplace();
1133 if (aMsg.is_sync())
1134 DispatchSyncMessage(aMsg);
1135 else if (aMsg.is_interrupt())
1136 DispatchInterruptMessage(aMsg, 0);
1137 else
1138 DispatchAsyncMessage(aMsg);
1141 void
1142 MessageChannel::DispatchSyncMessage(const Message& aMsg)
1144 AssertWorkerThread();
1146 nsAutoPtr<Message> reply;
1148 int prio = aMsg.priority();
1150 // We don't want to run any code that might run a nested event loop here, so
1151 // we avoid running event handlers. Once we've sent the response to the
1152 // urgent message, it's okay to run event handlers again since the parent is
1153 // no longer blocked.
1154 MOZ_ASSERT_IF(prio > IPC::Message::PRIORITY_NORMAL, NS_IsMainThread());
1155 MaybeScriptBlocker scriptBlocker(this, prio > IPC::Message::PRIORITY_NORMAL);
1157 IPC_ASSERT(prio >= mDispatchingSyncMessagePriority,
1158 "priority inversion while dispatching sync message");
1159 IPC_ASSERT(prio >= mAwaitingSyncReplyPriority,
1160 "dispatching a message of lower priority while waiting for a response");
1162 bool dummy;
1163 bool& blockingVar = ShouldBlockScripts() ? gParentIsBlocked : dummy;
1165 Result rv;
1166 if (mTimedOutMessageSeqno) {
1167 // If the other side sends a message in response to one of our messages
1168 // that we've timed out, then we reply with an error.
1170 // We even reject messages that were sent before the other side even got
1171 // to our timed out message.
1172 rv = MsgNotAllowed;
1173 } else {
1174 AutoSetValue<bool> blocked(blockingVar, true);
1175 AutoSetValue<bool> sync(mDispatchingSyncMessage, true);
1176 AutoSetValue<int> prioSet(mDispatchingSyncMessagePriority, prio);
1177 rv = mListener->OnMessageReceived(aMsg, *getter_Transfers(reply));
1180 if (!MaybeHandleError(rv, aMsg, "DispatchSyncMessage")) {
1181 reply = new Message();
1182 reply->set_sync();
1183 reply->set_priority(aMsg.priority());
1184 reply->set_reply();
1185 reply->set_reply_error();
1187 reply->set_seqno(aMsg.seqno());
1188 reply->set_transaction_id(aMsg.transaction_id());
1190 MonitorAutoLock lock(*mMonitor);
1191 if (ChannelConnected == mChannelState) {
1192 mLink->SendMessage(reply.forget());
1196 void
1197 MessageChannel::DispatchAsyncMessage(const Message& aMsg)
1199 AssertWorkerThread();
1200 MOZ_ASSERT(!aMsg.is_interrupt() && !aMsg.is_sync());
1202 if (aMsg.routing_id() == MSG_ROUTING_NONE) {
1203 NS_RUNTIMEABORT("unhandled special message!");
1206 Result rv;
1208 int prio = aMsg.priority();
1209 AutoSetValue<bool> async(mDispatchingAsyncMessage, true);
1210 AutoSetValue<int> prioSet(mDispatchingAsyncMessagePriority, prio);
1211 rv = mListener->OnMessageReceived(aMsg);
1213 MaybeHandleError(rv, aMsg, "DispatchAsyncMessage");
1216 void
1217 MessageChannel::DispatchInterruptMessage(const Message& aMsg, size_t stackDepth)
1219 AssertWorkerThread();
1220 mMonitor->AssertNotCurrentThreadOwns();
1222 IPC_ASSERT(aMsg.is_interrupt() && !aMsg.is_reply(), "wrong message type");
1224 // Race detection: see the long comment near mRemoteStackDepthGuess in
1225 // MessageChannel.h. "Remote" stack depth means our side, and "local" means
1226 // the other side.
1227 if (aMsg.interrupt_remote_stack_depth_guess() != RemoteViewOfStackDepth(stackDepth)) {
1228 // Interrupt in-calls have raced. The winner, if there is one, gets to defer
1229 // processing of the other side's in-call.
1230 bool defer;
1231 const char* winner;
1232 switch (mListener->MediateInterruptRace((mSide == ChildSide) ? aMsg : mInterruptStack.top(),
1233 (mSide != ChildSide) ? mInterruptStack.top() : aMsg))
1235 case RIPChildWins:
1236 winner = "child";
1237 defer = (mSide == ChildSide);
1238 break;
1239 case RIPParentWins:
1240 winner = "parent";
1241 defer = (mSide != ChildSide);
1242 break;
1243 case RIPError:
1244 NS_RUNTIMEABORT("NYI: 'Error' Interrupt race policy");
1245 return;
1246 default:
1247 NS_RUNTIMEABORT("not reached");
1248 return;
1251 if (LoggingEnabled()) {
1252 printf_stderr(" (%s: %s won, so we're%sdeferring)\n",
1253 (mSide == ChildSide) ? "child" : "parent",
1254 winner,
1255 defer ? " " : " not ");
1258 if (defer) {
1259 // We now know the other side's stack has one more frame
1260 // than we thought.
1261 ++mRemoteStackDepthGuess; // decremented in MaybeProcessDeferred()
1262 mDeferred.push(aMsg);
1263 return;
1266 // We "lost" and need to process the other side's in-call. Don't need
1267 // to fix up the mRemoteStackDepthGuess here, because we're just about
1268 // to increment it in DispatchCall(), which will make it correct again.
1271 #ifdef OS_WIN
1272 SyncStackFrame frame(this, true);
1273 #endif
1275 nsAutoPtr<Message> reply;
1277 ++mRemoteStackDepthGuess;
1278 Result rv = mListener->OnCallReceived(aMsg, *getter_Transfers(reply));
1279 --mRemoteStackDepthGuess;
1281 if (!MaybeHandleError(rv, aMsg, "DispatchInterruptMessage")) {
1282 reply = new Message();
1283 reply->set_interrupt();
1284 reply->set_reply();
1285 reply->set_reply_error();
1287 reply->set_seqno(aMsg.seqno());
1289 MonitorAutoLock lock(*mMonitor);
1290 if (ChannelConnected == mChannelState) {
1291 mLink->SendMessage(reply.forget());
1295 void
1296 MessageChannel::MaybeUndeferIncall()
1298 AssertWorkerThread();
1299 mMonitor->AssertCurrentThreadOwns();
1301 if (mDeferred.empty())
1302 return;
1304 size_t stackDepth = InterruptStackDepth();
1306 // the other side can only *under*-estimate our actual stack depth
1307 IPC_ASSERT(mDeferred.top().interrupt_remote_stack_depth_guess() <= stackDepth,
1308 "fatal logic error");
1310 if (mDeferred.top().interrupt_remote_stack_depth_guess() < RemoteViewOfStackDepth(stackDepth))
1311 return;
1313 // maybe time to process this message
1314 Message call = mDeferred.top();
1315 mDeferred.pop();
1317 // fix up fudge factor we added to account for race
1318 IPC_ASSERT(0 < mRemoteStackDepthGuess, "fatal logic error");
1319 --mRemoteStackDepthGuess;
1321 MOZ_ASSERT(call.priority() == IPC::Message::PRIORITY_NORMAL);
1322 mPending.push_back(call);
1325 void
1326 MessageChannel::FlushPendingInterruptQueue()
1328 AssertWorkerThread();
1329 mMonitor->AssertNotCurrentThreadOwns();
1332 MonitorAutoLock lock(*mMonitor);
1334 if (mDeferred.empty()) {
1335 if (mPending.empty())
1336 return;
1338 const Message& last = mPending.back();
1339 if (!last.is_interrupt() || last.is_reply())
1340 return;
1344 while (OnMaybeDequeueOne());
1347 void
1348 MessageChannel::ExitedCxxStack()
1350 mListener->OnExitedCxxStack();
1351 if (mSawInterruptOutMsg) {
1352 MonitorAutoLock lock(*mMonitor);
1353 // see long comment in OnMaybeDequeueOne()
1354 EnqueuePendingMessages();
1355 mSawInterruptOutMsg = false;
1359 void
1360 MessageChannel::EnqueuePendingMessages()
1362 AssertWorkerThread();
1363 mMonitor->AssertCurrentThreadOwns();
1365 MaybeUndeferIncall();
1367 for (size_t i = 0; i < mDeferred.size(); ++i) {
1368 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
1371 // XXX performance tuning knob: could process all or k pending
1372 // messages here, rather than enqueuing for later processing
1374 for (size_t i = 0; i < mPending.size(); ++i) {
1375 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mDequeueOneTask));
1379 static inline bool
1380 IsTimeoutExpired(PRIntervalTime aStart, PRIntervalTime aTimeout)
1382 return (aTimeout != PR_INTERVAL_NO_TIMEOUT) &&
1383 (aTimeout <= (PR_IntervalNow() - aStart));
1386 bool
1387 MessageChannel::WaitResponse(bool aWaitTimedOut)
1389 if (aWaitTimedOut) {
1390 if (mInTimeoutSecondHalf) {
1391 // We've really timed out this time.
1392 return false;
1394 // Try a second time.
1395 mInTimeoutSecondHalf = true;
1396 } else {
1397 mInTimeoutSecondHalf = false;
1399 return true;
1402 #ifndef OS_WIN
1403 bool
1404 MessageChannel::WaitForSyncNotify()
1406 PRIntervalTime timeout = (kNoTimeout == mTimeoutMs) ?
1407 PR_INTERVAL_NO_TIMEOUT :
1408 PR_MillisecondsToInterval(mTimeoutMs);
1409 // XXX could optimize away this syscall for "no timeout" case if desired
1410 PRIntervalTime waitStart = PR_IntervalNow();
1412 mMonitor->Wait(timeout);
1414 // If the timeout didn't expire, we know we received an event. The
1415 // converse is not true.
1416 return WaitResponse(IsTimeoutExpired(waitStart, timeout));
1419 bool
1420 MessageChannel::WaitForInterruptNotify()
1422 return WaitForSyncNotify();
1425 void
1426 MessageChannel::NotifyWorkerThread()
1428 mMonitor->Notify();
1430 #endif
1432 bool
1433 MessageChannel::ShouldContinueFromTimeout()
1435 AssertWorkerThread();
1436 mMonitor->AssertCurrentThreadOwns();
1438 bool cont;
1440 MonitorAutoUnlock unlock(*mMonitor);
1441 cont = mListener->OnReplyTimeout();
1444 static enum { UNKNOWN, NOT_DEBUGGING, DEBUGGING } sDebuggingChildren = UNKNOWN;
1446 if (sDebuggingChildren == UNKNOWN) {
1447 sDebuggingChildren = getenv("MOZ_DEBUG_CHILD_PROCESS") ? DEBUGGING : NOT_DEBUGGING;
1449 if (sDebuggingChildren == DEBUGGING) {
1450 return true;
1453 return cont;
1456 void
1457 MessageChannel::SetReplyTimeoutMs(int32_t aTimeoutMs)
1459 // Set channel timeout value. Since this is broken up into
1460 // two period, the minimum timeout value is 2ms.
1461 AssertWorkerThread();
1462 mTimeoutMs = (aTimeoutMs <= 0)
1463 ? kNoTimeout
1464 : (int32_t)ceil((double)aTimeoutMs / 2.0);
1467 void
1468 MessageChannel::OnChannelConnected(int32_t peer_id)
1470 MOZ_ASSERT(!mPeerPidSet);
1471 mPeerPidSet = true;
1472 mPeerPid = peer_id;
1473 mWorkerLoop->PostTask(FROM_HERE, new DequeueTask(mOnChannelConnectedTask));
1476 void
1477 MessageChannel::DispatchOnChannelConnected()
1479 AssertWorkerThread();
1480 MOZ_ASSERT(mPeerPidSet);
1481 if (mListener)
1482 mListener->OnChannelConnected(mPeerPid);
1485 void
1486 MessageChannel::ReportMessageRouteError(const char* channelName) const
1488 PrintErrorMessage(mSide, channelName, "Need a route");
1489 mListener->OnProcessingError(MsgRouteError, "MsgRouteError");
1492 void
1493 MessageChannel::ReportConnectionError(const char* aChannelName) const
1495 AssertWorkerThread();
1496 mMonitor->AssertCurrentThreadOwns();
1498 const char* errorMsg = nullptr;
1499 switch (mChannelState) {
1500 case ChannelClosed:
1501 errorMsg = "Closed channel: cannot send/recv";
1502 break;
1503 case ChannelOpening:
1504 errorMsg = "Opening channel: not yet ready for send/recv";
1505 break;
1506 case ChannelTimeout:
1507 errorMsg = "Channel timeout: cannot send/recv";
1508 break;
1509 case ChannelClosing:
1510 errorMsg = "Channel closing: too late to send/recv, messages will be lost";
1511 break;
1512 case ChannelError:
1513 errorMsg = "Channel error: cannot send/recv";
1514 break;
1516 default:
1517 NS_RUNTIMEABORT("unreached");
1520 PrintErrorMessage(mSide, aChannelName, errorMsg);
1522 MonitorAutoUnlock unlock(*mMonitor);
1523 mListener->OnProcessingError(MsgDropped, errorMsg);
1526 bool
1527 MessageChannel::MaybeHandleError(Result code, const Message& aMsg, const char* channelName)
1529 if (MsgProcessed == code)
1530 return true;
1532 const char* errorMsg = nullptr;
1533 switch (code) {
1534 case MsgNotKnown:
1535 errorMsg = "Unknown message: not processed";
1536 break;
1537 case MsgNotAllowed:
1538 errorMsg = "Message not allowed: cannot be sent/recvd in this state";
1539 break;
1540 case MsgPayloadError:
1541 errorMsg = "Payload error: message could not be deserialized";
1542 break;
1543 case MsgProcessingError:
1544 errorMsg = "Processing error: message was deserialized, but the handler returned false (indicating failure)";
1545 break;
1546 case MsgRouteError:
1547 errorMsg = "Route error: message sent to unknown actor ID";
1548 break;
1549 case MsgValueError:
1550 errorMsg = "Value error: message was deserialized, but contained an illegal value";
1551 break;
1553 default:
1554 NS_RUNTIMEABORT("unknown Result code");
1555 return false;
1558 char reason[512];
1559 PR_snprintf(reason, sizeof(reason),
1560 "(msgtype=0x%lX,name=%s) %s",
1561 aMsg.type(), aMsg.name(), errorMsg);
1563 PrintErrorMessage(mSide, channelName, reason);
1565 mListener->OnProcessingError(code, reason);
1567 return false;
1570 void
1571 MessageChannel::OnChannelErrorFromLink()
1573 AssertLinkThread();
1574 mMonitor->AssertCurrentThreadOwns();
1576 if (InterruptStackDepth() > 0)
1577 NotifyWorkerThread();
1579 if (AwaitingSyncReply() || AwaitingIncomingMessage())
1580 NotifyWorkerThread();
1582 if (ChannelClosing != mChannelState) {
1583 if (mAbortOnError) {
1584 NS_RUNTIMEABORT("Aborting on channel error.");
1586 mChannelState = ChannelError;
1587 mMonitor->Notify();
1590 PostErrorNotifyTask();
1593 void
1594 MessageChannel::NotifyMaybeChannelError()
1596 mMonitor->AssertNotCurrentThreadOwns();
1598 // TODO sort out Close() on this side racing with Close() on the other side
1599 if (ChannelClosing == mChannelState) {
1600 // the channel closed, but we received a "Goodbye" message warning us
1601 // about it. no worries
1602 mChannelState = ChannelClosed;
1603 NotifyChannelClosed();
1604 return;
1607 // Oops, error! Let the listener know about it.
1608 mChannelState = ChannelError;
1609 mListener->OnChannelError();
1610 Clear();
1613 void
1614 MessageChannel::OnNotifyMaybeChannelError()
1616 AssertWorkerThread();
1617 mMonitor->AssertNotCurrentThreadOwns();
1619 mChannelErrorTask = nullptr;
1621 // OnChannelError holds mMonitor when it posts this task and this
1622 // task cannot be allowed to run until OnChannelError has
1623 // exited. We enforce that order by grabbing the mutex here which
1624 // should only continue once OnChannelError has completed.
1626 MonitorAutoLock lock(*mMonitor);
1627 // nothing to do here
1630 if (IsOnCxxStack()) {
1631 mChannelErrorTask =
1632 NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
1633 // 10 ms delay is completely arbitrary
1634 mWorkerLoop->PostDelayedTask(FROM_HERE, mChannelErrorTask, 10);
1635 return;
1638 NotifyMaybeChannelError();
1641 void
1642 MessageChannel::PostErrorNotifyTask()
1644 mMonitor->AssertCurrentThreadOwns();
1646 if (mChannelErrorTask)
1647 return;
1649 // This must be the last code that runs on this thread!
1650 mChannelErrorTask =
1651 NewRunnableMethod(this, &MessageChannel::OnNotifyMaybeChannelError);
1652 mWorkerLoop->PostTask(FROM_HERE, mChannelErrorTask);
1655 // Special async message.
1656 class GoodbyeMessage : public IPC::Message
1658 public:
1659 GoodbyeMessage() :
1660 IPC::Message(MSG_ROUTING_NONE, GOODBYE_MESSAGE_TYPE, PRIORITY_NORMAL)
1663 static bool Read(const Message* msg) {
1664 return true;
1666 void Log(const std::string& aPrefix, FILE* aOutf) const {
1667 fputs("(special `Goodbye' message)", aOutf);
1671 void
1672 MessageChannel::SynchronouslyClose()
1674 AssertWorkerThread();
1675 mMonitor->AssertCurrentThreadOwns();
1676 mLink->SendClose();
1677 while (ChannelClosed != mChannelState)
1678 mMonitor->Wait();
1681 void
1682 MessageChannel::CloseWithError()
1684 AssertWorkerThread();
1686 MonitorAutoLock lock(*mMonitor);
1687 if (ChannelConnected != mChannelState) {
1688 return;
1690 SynchronouslyClose();
1691 mChannelState = ChannelError;
1692 PostErrorNotifyTask();
1695 void
1696 MessageChannel::CloseWithTimeout()
1698 AssertWorkerThread();
1700 MonitorAutoLock lock(*mMonitor);
1701 if (ChannelConnected != mChannelState) {
1702 return;
1704 SynchronouslyClose();
1705 mChannelState = ChannelTimeout;
1708 void
1709 MessageChannel::BlockScripts()
1711 MOZ_ASSERT(NS_IsMainThread());
1712 mBlockScripts = true;
1715 void
1716 MessageChannel::Close()
1718 AssertWorkerThread();
1721 MonitorAutoLock lock(*mMonitor);
1723 if (ChannelError == mChannelState || ChannelTimeout == mChannelState) {
1724 // See bug 538586: if the listener gets deleted while the
1725 // IO thread's NotifyChannelError event is still enqueued
1726 // and subsequently deletes us, then the error event will
1727 // also be deleted and the listener will never be notified
1728 // of the channel error.
1729 if (mListener) {
1730 MonitorAutoUnlock unlock(*mMonitor);
1731 NotifyMaybeChannelError();
1733 return;
1736 if (ChannelOpening == mChannelState) {
1737 // SynchronouslyClose() waits for an ack from the other side, so
1738 // the opening sequence should complete before this returns.
1739 SynchronouslyClose();
1740 mChannelState = ChannelError;
1741 NotifyMaybeChannelError();
1742 return;
1745 if (ChannelConnected != mChannelState) {
1746 // XXX be strict about this until there's a compelling reason
1747 // to relax
1748 NS_RUNTIMEABORT("Close() called on closed channel!");
1751 // notify the other side that we're about to close our socket
1752 mLink->SendMessage(new GoodbyeMessage());
1753 SynchronouslyClose();
1756 NotifyChannelClosed();
1759 void
1760 MessageChannel::NotifyChannelClosed()
1762 mMonitor->AssertNotCurrentThreadOwns();
1764 if (ChannelClosed != mChannelState)
1765 NS_RUNTIMEABORT("channel should have been closed!");
1767 // OK, the IO thread just closed the channel normally. Let the
1768 // listener know about it.
1769 mListener->OnChannelClose();
1771 Clear();
1774 void
1775 MessageChannel::DebugAbort(const char* file, int line, const char* cond,
1776 const char* why,
1777 bool reply) const
1779 printf_stderr("###!!! [MessageChannel][%s][%s:%d] "
1780 "Assertion (%s) failed. %s %s\n",
1781 mSide == ChildSide ? "Child" : "Parent",
1782 file, line, cond,
1783 why,
1784 reply ? "(reply)" : "");
1785 // technically we need the mutex for this, but we're dying anyway
1786 DumpInterruptStack(" ");
1787 printf_stderr(" remote Interrupt stack guess: %" PRIuSIZE "\n",
1788 mRemoteStackDepthGuess);
1789 printf_stderr(" deferred stack size: %" PRIuSIZE "\n",
1790 mDeferred.size());
1791 printf_stderr(" out-of-turn Interrupt replies stack size: %" PRIuSIZE "\n",
1792 mOutOfTurnReplies.size());
1793 printf_stderr(" Pending queue size: %" PRIuSIZE ", front to back:\n",
1794 mPending.size());
1796 MessageQueue pending = mPending;
1797 while (!pending.empty()) {
1798 printf_stderr(" [ %s%s ]\n",
1799 pending.front().is_interrupt() ? "intr" :
1800 (pending.front().is_sync() ? "sync" : "async"),
1801 pending.front().is_reply() ? "reply" : "");
1802 pending.pop_front();
1805 NS_RUNTIMEABORT(why);
1808 void
1809 MessageChannel::DumpInterruptStack(const char* const pfx) const
1811 NS_WARN_IF_FALSE(MessageLoop::current() != mWorkerLoop,
1812 "The worker thread had better be paused in a debugger!");
1814 printf_stderr("%sMessageChannel 'backtrace':\n", pfx);
1816 // print a python-style backtrace, first frame to last
1817 for (uint32_t i = 0; i < mCxxStackFrames.length(); ++i) {
1818 int32_t id;
1819 const char* dir;
1820 const char* sems;
1821 const char* name;
1822 mCxxStackFrames[i].Describe(&id, &dir, &sems, &name);
1824 printf_stderr("%s[(%u) %s %s %s(actor=%d) ]\n", pfx,
1825 i, dir, sems, name, id);
1829 bool
1830 ParentProcessIsBlocked()
1832 return gParentIsBlocked;
1835 } // ipc
1836 } // mozilla