Fix typo in 9b54bd30006c008b4a951331b273613d5bac3abf
[pm.git] / ipc / glue / MessageChannel.h
blobcc81e435ce6fed41e919c106ec535eb3b184fb3f
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 * vim: sw=4 ts=4 et :
3 */
4 /* This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
8 #ifndef ipc_glue_MessageChannel_h
9 #define ipc_glue_MessageChannel_h 1
11 #include "base/basictypes.h"
12 #include "base/message_loop.h"
14 #include "mozilla/DebugOnly.h"
15 #include "mozilla/Monitor.h"
16 #include "mozilla/Vector.h"
17 #include "mozilla/WeakPtr.h"
18 #include "mozilla/ipc/Transport.h"
19 #include "MessageLink.h"
20 #include "nsAutoPtr.h"
22 #include <deque>
23 #include <stack>
24 #include <math.h>
26 namespace mozilla {
27 namespace ipc {
29 class MessageChannel;
31 class RefCountedMonitor : public Monitor
33 public:
34 RefCountedMonitor()
35 : Monitor("mozilla.ipc.MessageChannel.mMonitor")
38 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedMonitor)
40 private:
41 ~RefCountedMonitor() {}
44 class MessageChannel : HasResultCodes
46 friend class ProcessLink;
47 friend class ThreadLink;
49 class CxxStackFrame;
50 class InterruptFrame;
52 typedef mozilla::Monitor Monitor;
54 public:
55 static const int32_t kNoTimeout;
57 typedef IPC::Message Message;
58 typedef mozilla::ipc::Transport Transport;
60 explicit MessageChannel(MessageListener *aListener);
61 ~MessageChannel();
63 // "Open" from the perspective of the transport layer; the underlying
64 // socketpair/pipe should already be created.
66 // Returns true if the transport layer was successfully connected,
67 // i.e., mChannelState == ChannelConnected.
68 bool Open(Transport* aTransport, MessageLoop* aIOLoop=0, Side aSide=UnknownSide);
70 // "Open" a connection to another thread in the same process.
72 // Returns true if the transport layer was successfully connected,
73 // i.e., mChannelState == ChannelConnected.
75 // For more details on the process of opening a channel between
76 // threads, see the extended comment on this function
77 // in MessageChannel.cpp.
78 bool Open(MessageChannel *aTargetChan, MessageLoop *aTargetLoop, Side aSide);
80 // Close the underlying transport channel.
81 void Close();
83 // Force the channel to behave as if a channel error occurred. Valid
84 // for process links only, not thread links.
85 void CloseWithError();
87 void CloseWithTimeout();
89 void SetAbortOnError(bool abort)
91 mAbortOnError = true;
94 // Misc. behavioral traits consumers can request for this channel
95 enum ChannelFlags {
96 REQUIRE_DEFAULT = 0,
97 // Windows: if this channel operates on the UI thread, indicates
98 // WindowsMessageLoop code should enable deferred native message
99 // handling to prevent deadlocks. Should only be used for protocols
100 // that manage child processes which might create native UI, like
101 // plugins.
102 REQUIRE_DEFERRED_MESSAGE_PROTECTION = 1 << 0
104 void SetChannelFlags(ChannelFlags aFlags) { mFlags = aFlags; }
105 ChannelFlags GetChannelFlags() { return mFlags; }
107 void BlockScripts();
109 bool ShouldBlockScripts() const
111 return mBlockScripts;
114 // Asynchronously send a message to the other side of the channel
115 bool Send(Message* aMsg);
117 // Asynchronously deliver a message back to this side of the
118 // channel
119 bool Echo(Message* aMsg);
121 // Synchronously send |msg| (i.e., wait for |reply|)
122 bool Send(Message* aMsg, Message* aReply);
124 // Make an Interrupt call to the other side of the channel
125 bool Call(Message* aMsg, Message* aReply);
127 // Wait until a message is received
128 bool WaitForIncomingMessage();
130 bool CanSend() const;
132 void SetReplyTimeoutMs(int32_t aTimeoutMs);
134 bool IsOnCxxStack() const {
135 return !mCxxStackFrames.empty();
138 void FlushPendingInterruptQueue();
140 // Unsound_IsClosed and Unsound_NumQueuedMessages are safe to call from any
141 // thread, but they make no guarantees about whether you'll get an
142 // up-to-date value; the values are written on one thread and read without
143 // locking, on potentially different threads. Thus you should only use
144 // them when you don't particularly care about getting a recent value (e.g.
145 // in a memory report).
146 bool Unsound_IsClosed() const {
147 return mLink ? mLink->Unsound_IsClosed() : true;
149 uint32_t Unsound_NumQueuedMessages() const {
150 return mLink ? mLink->Unsound_NumQueuedMessages() : 0;
153 static bool IsPumpingMessages() {
154 return sIsPumpingMessages;
156 static void SetIsPumpingMessages(bool aIsPumping) {
157 sIsPumpingMessages = aIsPumping;
160 #ifdef OS_WIN
161 struct MOZ_STACK_CLASS SyncStackFrame
163 SyncStackFrame(MessageChannel* channel, bool interrupt);
164 ~SyncStackFrame();
166 bool mInterrupt;
167 bool mSpinNestedEvents;
168 bool mListenerNotified;
169 MessageChannel* mChannel;
171 // The previous stack frame for this channel.
172 SyncStackFrame* mPrev;
174 // The previous stack frame on any channel.
175 SyncStackFrame* mStaticPrev;
177 friend struct MessageChannel::SyncStackFrame;
179 static bool IsSpinLoopActive() {
180 for (SyncStackFrame* frame = sStaticTopFrame; frame; frame = frame->mPrev) {
181 if (frame->mSpinNestedEvents)
182 return true;
184 return false;
187 protected:
188 // The deepest sync stack frame for this channel.
189 SyncStackFrame* mTopFrame;
191 bool mIsSyncWaitingOnNonMainThread;
193 // The deepest sync stack frame on any channel.
194 static SyncStackFrame* sStaticTopFrame;
196 public:
197 void ProcessNativeEventsInInterruptCall();
198 static void NotifyGoannaEventDispatch();
200 private:
201 void SpinInternalEventLoop();
202 #endif
204 private:
205 void CommonThreadOpenInit(MessageChannel *aTargetChan, Side aSide);
206 void OnOpenAsSlave(MessageChannel *aTargetChan, Side aSide);
208 void PostErrorNotifyTask();
209 void OnNotifyMaybeChannelError();
210 void ReportConnectionError(const char* aChannelName) const;
211 void ReportMessageRouteError(const char* channelName) const;
212 bool MaybeHandleError(Result code, const Message& aMsg, const char* channelName);
214 void Clear();
216 // Send OnChannelConnected notification to listeners.
217 void DispatchOnChannelConnected();
219 bool InterruptEventOccurred();
220 bool HasPendingEvents();
222 bool ProcessPendingRequest(const Message &aUrgent);
224 void MaybeUndeferIncall();
225 void EnqueuePendingMessages();
227 // Executed on the worker thread. Dequeues one pending message.
228 bool OnMaybeDequeueOne();
229 bool DequeueOne(Message *recvd);
231 // Dispatches an incoming message to its appropriate handler.
232 void DispatchMessage(const Message &aMsg);
234 // DispatchMessage will route to one of these functions depending on the
235 // protocol type of the message.
236 void DispatchSyncMessage(const Message &aMsg);
237 void DispatchUrgentMessage(const Message &aMsg);
238 void DispatchAsyncMessage(const Message &aMsg);
239 void DispatchRPCMessage(const Message &aMsg);
240 void DispatchInterruptMessage(const Message &aMsg, size_t aStackDepth);
242 // Return true if the wait ended because a notification was received.
244 // Return false if the time elapsed from when we started the process of
245 // waiting until afterwards exceeded the currently allotted timeout.
246 // That *DOES NOT* mean false => "no event" (== timeout); there are many
247 // circumstances that could cause the measured elapsed time to exceed the
248 // timeout EVEN WHEN we were notified.
250 // So in sum: true is a meaningful return value; false isn't,
251 // necessarily.
252 bool WaitForSyncNotify();
253 bool WaitForInterruptNotify();
255 bool WaitResponse(bool aWaitTimedOut);
257 bool ShouldContinueFromTimeout();
259 // The "remote view of stack depth" can be different than the
260 // actual stack depth when there are out-of-turn replies. When we
261 // receive one, our actual Interrupt stack depth doesn't decrease, but
262 // the other side (that sent the reply) thinks it has. So, the
263 // "view" returned here is |stackDepth| minus the number of
264 // out-of-turn replies.
266 // Only called from the worker thread.
267 size_t RemoteViewOfStackDepth(size_t stackDepth) const {
268 AssertWorkerThread();
269 return stackDepth - mOutOfTurnReplies.size();
272 int32_t NextSeqno() {
273 AssertWorkerThread();
274 return (mSide == ChildSide) ? --mNextSeqno : ++mNextSeqno;
277 // This helper class manages mCxxStackDepth on behalf of MessageChannel.
278 // When the stack depth is incremented from zero to non-zero, it invokes
279 // a callback, and similarly for when the depth goes from non-zero to zero.
280 void EnteredCxxStack() {
281 mListener->OnEnteredCxxStack();
284 void ExitedCxxStack();
286 void EnteredCall() {
287 mListener->OnEnteredCall();
290 void ExitedCall() {
291 mListener->OnExitedCall();
294 void EnteredSyncSend() {
295 mListener->OnEnteredSyncSend();
298 void ExitedSyncSend() {
299 mListener->OnExitedSyncSend();
302 MessageListener *Listener() const {
303 return mListener.get();
306 void DebugAbort(const char* file, int line, const char* cond,
307 const char* why,
308 bool reply=false) const;
310 // This method is only safe to call on the worker thread, or in a
311 // debugger with all threads paused.
312 void DumpInterruptStack(const char* const pfx="") const;
314 private:
315 // Called from both threads
316 size_t InterruptStackDepth() const {
317 mMonitor->AssertCurrentThreadOwns();
318 return mInterruptStack.size();
321 // Returns true if we're blocking waiting for a reply.
322 bool AwaitingSyncReply() const {
323 mMonitor->AssertCurrentThreadOwns();
324 return mAwaitingSyncReply;
326 int AwaitingSyncReplyPriority() const {
327 mMonitor->AssertCurrentThreadOwns();
328 return mAwaitingSyncReplyPriority;
330 bool AwaitingInterruptReply() const {
331 mMonitor->AssertCurrentThreadOwns();
332 return !mInterruptStack.empty();
334 bool AwaitingIncomingMessage() const {
335 mMonitor->AssertCurrentThreadOwns();
336 return mIsWaitingForIncoming;
339 class MOZ_STACK_CLASS AutoEnterWaitForIncoming
341 public:
342 explicit AutoEnterWaitForIncoming(MessageChannel& aChannel)
343 : mChannel(aChannel)
345 aChannel.mMonitor->AssertCurrentThreadOwns();
346 aChannel.mIsWaitingForIncoming = true;
349 ~AutoEnterWaitForIncoming()
351 mChannel.mIsWaitingForIncoming = false;
354 private:
355 MessageChannel& mChannel;
357 friend class AutoEnterWaitForIncoming;
359 // Returns true if we're dispatching a sync message's callback.
360 bool DispatchingSyncMessage() const {
361 AssertWorkerThread();
362 return mDispatchingSyncMessage;
365 int DispatchingSyncMessagePriority() const {
366 AssertWorkerThread();
367 return mDispatchingSyncMessagePriority;
370 bool DispatchingAsyncMessage() const {
371 AssertWorkerThread();
372 return mDispatchingAsyncMessage;
375 int DispatchingAsyncMessagePriority() const {
376 AssertWorkerThread();
377 return mDispatchingAsyncMessagePriority;
380 bool Connected() const;
382 private:
383 // Executed on the IO thread.
384 void NotifyWorkerThread();
386 // Return true if |aMsg| is a special message targeted at the IO
387 // thread, in which case it shouldn't be delivered to the worker.
388 bool MaybeInterceptSpecialIOMessage(const Message& aMsg);
390 void OnChannelConnected(int32_t peer_id);
392 // Tell the IO thread to close the channel and wait for it to ACK.
393 void SynchronouslyClose();
395 bool ShouldDeferMessage(const Message& aMsg);
396 void OnMessageReceivedFromLink(const Message& aMsg);
397 void OnChannelErrorFromLink();
399 private:
400 // Run on the not current thread.
401 void NotifyChannelClosed();
402 void NotifyMaybeChannelError();
404 private:
405 // Can be run on either thread
406 void AssertWorkerThread() const
408 MOZ_ASSERT(mWorkerLoopID == MessageLoop::current()->id(),
409 "not on worker thread!");
412 // The "link" thread is either the I/O thread (ProcessLink) or the
413 // other actor's work thread (ThreadLink). In either case, it is
414 // NOT our worker thread.
415 void AssertLinkThread() const
417 MOZ_ASSERT(mWorkerLoopID != MessageLoop::current()->id(),
418 "on worker thread but should not be!");
421 private:
422 typedef IPC::Message::msgid_t msgid_t;
423 typedef std::deque<Message> MessageQueue;
424 typedef std::map<size_t, Message> MessageMap;
426 // All dequeuing tasks require a single point of cancellation,
427 // which is handled via a reference-counted task.
428 class RefCountedTask
430 public:
431 explicit RefCountedTask(CancelableTask* aTask)
432 : mTask(aTask)
434 private:
435 ~RefCountedTask() { delete mTask; }
436 public:
437 void Run() { mTask->Run(); }
438 void Cancel() { mTask->Cancel(); }
440 NS_INLINE_DECL_THREADSAFE_REFCOUNTING(RefCountedTask)
442 private:
443 CancelableTask* mTask;
446 // Wrap an existing task which can be cancelled at any time
447 // without the wrapper's knowledge.
448 class DequeueTask : public Task
450 public:
451 explicit DequeueTask(RefCountedTask* aTask)
452 : mTask(aTask)
454 void Run() override { mTask->Run(); }
456 private:
457 nsRefPtr<RefCountedTask> mTask;
460 private:
461 mozilla::WeakPtr<MessageListener> mListener;
462 ChannelState mChannelState;
463 nsRefPtr<RefCountedMonitor> mMonitor;
464 Side mSide;
465 MessageLink* mLink;
466 MessageLoop* mWorkerLoop; // thread where work is done
467 CancelableTask* mChannelErrorTask; // NotifyMaybeChannelError runnable
469 // id() of mWorkerLoop. This persists even after mWorkerLoop is cleared
470 // during channel shutdown.
471 int mWorkerLoopID;
473 // A task encapsulating dequeuing one pending message.
474 nsRefPtr<RefCountedTask> mDequeueOneTask;
476 // Timeout periods are broken up in two to prevent system suspension from
477 // triggering an abort. This method (called by WaitForEvent with a 'did
478 // timeout' flag) decides if we should wait again for half of mTimeoutMs
479 // or give up.
480 int32_t mTimeoutMs;
481 bool mInTimeoutSecondHalf;
483 // Worker-thread only; sequence numbers for messages that require
484 // synchronous replies.
485 int32_t mNextSeqno;
487 static bool sIsPumpingMessages;
489 template<class T>
490 class AutoSetValue {
491 public:
492 explicit AutoSetValue(T &var, const T &newValue)
493 : mVar(var), mPrev(var)
495 mVar = newValue;
497 ~AutoSetValue() {
498 mVar = mPrev;
500 private:
501 T& mVar;
502 T mPrev;
505 // Worker thread only.
506 bool mAwaitingSyncReply;
507 int mAwaitingSyncReplyPriority;
509 // Set while we are dispatching a synchronous message. Only for use on the
510 // worker thread.
511 bool mDispatchingSyncMessage;
512 int mDispatchingSyncMessagePriority;
514 bool mDispatchingAsyncMessage;
515 int mDispatchingAsyncMessagePriority;
517 // When we send an urgent request from the parent process, we could race
518 // with an RPC message that was issued by the child beforehand. In this
519 // case, if the parent were to wake up while waiting for the urgent reply,
520 // and process the RPC, it could send an additional urgent message. The
521 // child would wake up to process the urgent message (as it always will),
522 // then send a reply, which could be received by the parent out-of-order
523 // with respect to the first urgent reply.
525 // To address this problem, urgent or RPC requests are associated with a
526 // "transaction". Whenever one side of the channel wishes to start a
527 // chain of RPC/urgent messages, it allocates a new transaction ID. Any
528 // messages the parent receives, not apart of this transaction, are
529 // deferred. When issuing RPC/urgent requests on top of a started
530 // transaction, the initiating transaction ID is used.
532 // To ensure IDs are unique, we use sequence numbers for transaction IDs,
533 // which grow in opposite directions from child to parent.
535 // The current transaction ID.
536 int32_t mCurrentTransaction;
538 class AutoEnterTransaction
540 public:
541 explicit AutoEnterTransaction(MessageChannel *aChan, int32_t aMsgSeqno)
542 : mChan(aChan),
543 mOldTransaction(mChan->mCurrentTransaction)
545 mChan->mMonitor->AssertCurrentThreadOwns();
546 if (mChan->mCurrentTransaction == 0)
547 mChan->mCurrentTransaction = aMsgSeqno;
549 explicit AutoEnterTransaction(MessageChannel *aChan, const Message &aMessage)
550 : mChan(aChan),
551 mOldTransaction(mChan->mCurrentTransaction)
553 mChan->mMonitor->AssertCurrentThreadOwns();
555 if (!aMessage.is_sync())
556 return;
558 MOZ_ASSERT_IF(mChan->mSide == ParentSide && mOldTransaction != aMessage.transaction_id(),
559 !mOldTransaction || aMessage.priority() > mChan->AwaitingSyncReplyPriority());
560 mChan->mCurrentTransaction = aMessage.transaction_id();
562 ~AutoEnterTransaction() {
563 mChan->mMonitor->AssertCurrentThreadOwns();
564 mChan->mCurrentTransaction = mOldTransaction;
567 private:
568 MessageChannel *mChan;
569 int32_t mOldTransaction;
572 // If a sync message times out, we store its sequence number here. Any
573 // future sync messages will fail immediately. Once the reply for original
574 // sync message is received, we allow sync messages again.
576 // When a message times out, nothing is done to inform the other side. The
577 // other side will eventually dispatch the message and send a reply. Our
578 // side is responsible for replying to all sync messages sent by the other
579 // side when it dispatches the timed out message. The response is always an
580 // error.
582 // A message is only timed out if it initiated a transaction. This avoids
583 // hitting a lot of corner cases with message nesting that we don't really
584 // care about.
585 int32_t mTimedOutMessageSeqno;
587 // If waiting for the reply to a sync out-message, it will be saved here
588 // on the I/O thread and then read and cleared by the worker thread.
589 nsAutoPtr<Message> mRecvd;
591 // If a sync message reply that is an error arrives, we increment this
592 // counter rather than storing it in mRecvd.
593 size_t mRecvdErrors;
595 // Queue of all incoming messages, except for replies to sync and urgent
596 // messages, which are delivered directly to mRecvd, and any pending urgent
597 // incall, which is stored in mPendingUrgentRequest.
599 // If both this side and the other side are functioning correctly, the queue
600 // can only be in certain configurations. Let
602 // |A<| be an async in-message,
603 // |S<| be a sync in-message,
604 // |C<| be an Interrupt in-call,
605 // |R<| be an Interrupt reply.
607 // The queue can only match this configuration
609 // A<* (S< | C< | R< (?{mStack.size() == 1} A<* (S< | C<)))
611 // The other side can send as many async messages |A<*| as it wants before
612 // sending us a blocking message.
614 // The first case is |S<|, a sync in-msg. The other side must be blocked,
615 // and thus can't send us any more messages until we process the sync
616 // in-msg.
618 // The second case is |C<|, an Interrupt in-call; the other side must be blocked.
619 // (There's a subtlety here: this in-call might have raced with an
620 // out-call, but we detect that with the mechanism below,
621 // |mRemoteStackDepth|, and races don't matter to the queue.)
623 // Final case, the other side replied to our most recent out-call |R<|.
624 // If that was the *only* out-call on our stack, |?{mStack.size() == 1}|,
625 // then other side "finished with us," and went back to its own business.
626 // That business might have included sending any number of async message
627 // |A<*| until sending a blocking message |(S< | C<)|. If we had more than
628 // one Interrupt call on our stack, the other side *better* not have sent us
629 // another blocking message, because it's blocked on a reply from us.
631 MessageQueue mPending;
633 // Stack of all the out-calls on which this channel is awaiting responses.
634 // Each stack refers to a different protocol and the stacks are mutually
635 // exclusive: multiple outcalls of the same kind cannot be initiated while
636 // another is active.
637 std::stack<Message> mInterruptStack;
639 // This is what we think the Interrupt stack depth is on the "other side" of this
640 // Interrupt channel. We maintain this variable so that we can detect racy Interrupt
641 // calls. With each Interrupt out-call sent, we send along what *we* think the
642 // stack depth of the remote side is *before* it will receive the Interrupt call.
644 // After sending the out-call, our stack depth is "incremented" by pushing
645 // that pending message onto mPending.
647 // Then when processing an in-call |c|, it must be true that
649 // mStack.size() == c.remoteDepth
651 // I.e., my depth is actually the same as what the other side thought it
652 // was when it sent in-call |c|. If this fails to hold, we have detected
653 // racy Interrupt calls.
655 // We then increment mRemoteStackDepth *just before* processing the
656 // in-call, since we know the other side is waiting on it, and decrement
657 // it *just after* finishing processing that in-call, since our response
658 // will pop the top of the other side's |mPending|.
660 // One nice aspect of this race detection is that it is symmetric; if one
661 // side detects a race, then the other side must also detect the same race.
662 size_t mRemoteStackDepthGuess;
664 // Approximation of code frames on the C++ stack. It can only be
665 // interpreted as the implication:
667 // !mCxxStackFrames.empty() => MessageChannel code on C++ stack
669 // This member is only accessed on the worker thread, and so is not
670 // protected by mMonitor. It is managed exclusively by the helper
671 // |class CxxStackFrame|.
672 mozilla::Vector<InterruptFrame> mCxxStackFrames;
674 // Did we process an Interrupt out-call during this stack? Only meaningful in
675 // ExitedCxxStack(), from which this variable is reset.
676 bool mSawInterruptOutMsg;
678 // Are we waiting on this channel for an incoming message? This is used
679 // to implement WaitForIncomingMessage(). Must only be accessed while owning
680 // mMonitor.
681 bool mIsWaitingForIncoming;
683 // Map of replies received "out of turn", because of Interrupt
684 // in-calls racing with replies to outstanding in-calls. See
685 // https://bugzilla.mozilla.org/show_bug.cgi?id=521929.
686 MessageMap mOutOfTurnReplies;
688 // Stack of Interrupt in-calls that were deferred because of race
689 // conditions.
690 std::stack<Message> mDeferred;
692 #ifdef OS_WIN
693 HANDLE mEvent;
694 #endif
696 // Should the channel abort the process from the I/O thread when
697 // a channel error occurs?
698 bool mAbortOnError;
700 // Should we prevent scripts from running while dispatching urgent messages?
701 bool mBlockScripts;
703 // See SetChannelFlags
704 ChannelFlags mFlags;
706 // Task and state used to asynchronously notify channel has been connected
707 // safely. This is necessary to be able to cancel notification if we are
708 // closed at the same time.
709 nsRefPtr<RefCountedTask> mOnChannelConnectedTask;
710 DebugOnly<bool> mPeerPidSet;
711 int32_t mPeerPid;
714 bool
715 ParentProcessIsBlocked();
717 } // namespace ipc
718 } // namespace mozilla
720 #endif // ifndef ipc_glue_MessageChannel_h