[Sync] Don't synchronously stop datatypes that encounter errors
[chromium-blink-merge.git] / mojo / system / local_message_pipe_endpoint.cc
blob16f58bcc744c1942d0ca4e063a3765289762d949
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/local_message_pipe_endpoint.h"
7 #include <string.h>
9 #include "base/logging.h"
10 #include "mojo/system/dispatcher.h"
11 #include "mojo/system/message_in_transit.h"
13 namespace mojo {
14 namespace system {
16 LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
17 : is_open_(true), is_peer_open_(true) {
20 LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
21 DCHECK(!is_open_);
22 DCHECK(message_queue_.IsEmpty()); // Should be implied by not being open.
25 MessagePipeEndpoint::Type LocalMessagePipeEndpoint::GetType() const {
26 return kTypeLocal;
29 bool LocalMessagePipeEndpoint::OnPeerClose() {
30 DCHECK(is_open_);
31 DCHECK(is_peer_open_);
33 HandleSignalsState old_state = GetHandleSignalsState();
34 is_peer_open_ = false;
35 HandleSignalsState new_state = GetHandleSignalsState();
37 if (!new_state.equals(old_state))
38 waiter_list_.AwakeWaitersForStateChange(new_state);
40 return true;
43 void LocalMessagePipeEndpoint::EnqueueMessage(
44 scoped_ptr<MessageInTransit> message) {
45 DCHECK(is_open_);
46 DCHECK(is_peer_open_);
48 bool was_empty = message_queue_.IsEmpty();
49 message_queue_.AddMessage(message.Pass());
50 if (was_empty)
51 waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
54 void LocalMessagePipeEndpoint::Close() {
55 DCHECK(is_open_);
56 is_open_ = false;
57 message_queue_.Clear();
60 void LocalMessagePipeEndpoint::CancelAllWaiters() {
61 DCHECK(is_open_);
62 waiter_list_.CancelAllWaiters();
65 MojoResult LocalMessagePipeEndpoint::ReadMessage(
66 UserPointer<void> bytes,
67 UserPointer<uint32_t> num_bytes,
68 DispatcherVector* dispatchers,
69 uint32_t* num_dispatchers,
70 MojoReadMessageFlags flags) {
71 DCHECK(is_open_);
72 DCHECK(!dispatchers || dispatchers->empty());
74 const uint32_t max_bytes = num_bytes.IsNull() ? 0 : num_bytes.Get();
75 const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;
77 if (message_queue_.IsEmpty()) {
78 return is_peer_open_ ? MOJO_RESULT_SHOULD_WAIT
79 : MOJO_RESULT_FAILED_PRECONDITION;
82 // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
83 // and release the lock immediately.
84 bool enough_space = true;
85 MessageInTransit* message = message_queue_.PeekMessage();
86 if (!num_bytes.IsNull())
87 num_bytes.Put(message->num_bytes());
88 if (message->num_bytes() <= max_bytes)
89 bytes.PutArray(message->bytes(), message->num_bytes());
90 else
91 enough_space = false;
93 if (DispatcherVector* queued_dispatchers = message->dispatchers()) {
94 if (num_dispatchers)
95 *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
96 if (enough_space) {
97 if (queued_dispatchers->empty()) {
98 // Nothing to do.
99 } else if (queued_dispatchers->size() <= max_num_dispatchers) {
100 DCHECK(dispatchers);
101 dispatchers->swap(*queued_dispatchers);
102 } else {
103 enough_space = false;
106 } else {
107 if (num_dispatchers)
108 *num_dispatchers = 0;
111 message = NULL;
113 if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
114 message_queue_.DiscardMessage();
116 // Now it's empty, thus no longer readable.
117 if (message_queue_.IsEmpty()) {
118 // It's currently not possible to wait for non-readability, but we should
119 // do the state change anyway.
120 waiter_list_.AwakeWaitersForStateChange(GetHandleSignalsState());
124 if (!enough_space)
125 return MOJO_RESULT_RESOURCE_EXHAUSTED;
127 return MOJO_RESULT_OK;
130 HandleSignalsState LocalMessagePipeEndpoint::GetHandleSignalsState() const {
131 HandleSignalsState rv;
132 if (!message_queue_.IsEmpty()) {
133 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
134 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
136 if (is_peer_open_) {
137 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
138 rv.satisfiable_signals |=
139 MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE;
141 return rv;
144 MojoResult LocalMessagePipeEndpoint::AddWaiter(
145 Waiter* waiter,
146 MojoHandleSignals signals,
147 uint32_t context,
148 HandleSignalsState* signals_state) {
149 DCHECK(is_open_);
151 HandleSignalsState state = GetHandleSignalsState();
152 if (state.satisfies(signals)) {
153 if (signals_state)
154 *signals_state = state;
155 return MOJO_RESULT_ALREADY_EXISTS;
157 if (!state.can_satisfy(signals)) {
158 if (signals_state)
159 *signals_state = state;
160 return MOJO_RESULT_FAILED_PRECONDITION;
163 waiter_list_.AddWaiter(waiter, signals, context);
164 return MOJO_RESULT_OK;
167 void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter,
168 HandleSignalsState* signals_state) {
169 DCHECK(is_open_);
170 waiter_list_.RemoveWaiter(waiter);
171 if (signals_state)
172 *signals_state = GetHandleSignalsState();
175 } // namespace system
176 } // namespace mojo