Updating trunk VERSION from 2139.0 to 2140.0
[chromium-blink-merge.git] / mojo / system / data_pipe.cc
blobdfef4c4333112a279e52b04f0abd1932661c7681
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/data_pipe.h"
7 #include <string.h>
9 #include <algorithm>
10 #include <limits>
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "mojo/system/constants.h"
15 #include "mojo/system/memory.h"
16 #include "mojo/system/options_validation.h"
17 #include "mojo/system/waiter_list.h"
19 namespace mojo {
20 namespace system {
22 // static
23 const MojoCreateDataPipeOptions DataPipe::kDefaultCreateOptions = {
24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions)),
25 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1u,
26 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes)};
28 // static
29 MojoResult DataPipe::ValidateCreateOptions(
30 UserPointer<const MojoCreateDataPipeOptions> in_options,
31 MojoCreateDataPipeOptions* out_options) {
32 const MojoCreateDataPipeOptionsFlags kKnownFlags =
33 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD;
35 *out_options = kDefaultCreateOptions;
36 if (in_options.IsNull())
37 return MOJO_RESULT_OK;
39 UserOptionsReader<MojoCreateDataPipeOptions> reader(in_options);
40 if (!reader.is_valid())
41 return MOJO_RESULT_INVALID_ARGUMENT;
43 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions, flags, reader))
44 return MOJO_RESULT_OK;
45 if ((reader.options().flags & ~kKnownFlags))
46 return MOJO_RESULT_UNIMPLEMENTED;
47 out_options->flags = reader.options().flags;
49 // Checks for fields beyond |flags|:
51 if (!OPTIONS_STRUCT_HAS_MEMBER(
52 MojoCreateDataPipeOptions, element_num_bytes, reader))
53 return MOJO_RESULT_OK;
54 if (reader.options().element_num_bytes == 0)
55 return MOJO_RESULT_INVALID_ARGUMENT;
56 out_options->element_num_bytes = reader.options().element_num_bytes;
58 if (!OPTIONS_STRUCT_HAS_MEMBER(
59 MojoCreateDataPipeOptions, capacity_num_bytes, reader) ||
60 reader.options().capacity_num_bytes == 0) {
61 // Round the default capacity down to a multiple of the element size (but at
62 // least one element).
63 out_options->capacity_num_bytes =
64 std::max(static_cast<uint32_t>(kDefaultDataPipeCapacityBytes -
65 (kDefaultDataPipeCapacityBytes %
66 out_options->element_num_bytes)),
67 out_options->element_num_bytes);
68 return MOJO_RESULT_OK;
70 if (reader.options().capacity_num_bytes % out_options->element_num_bytes != 0)
71 return MOJO_RESULT_INVALID_ARGUMENT;
72 if (reader.options().capacity_num_bytes > kMaxDataPipeCapacityBytes)
73 return MOJO_RESULT_RESOURCE_EXHAUSTED;
74 out_options->capacity_num_bytes = reader.options().capacity_num_bytes;
76 return MOJO_RESULT_OK;
79 void DataPipe::ProducerCancelAllWaiters() {
80 base::AutoLock locker(lock_);
81 DCHECK(has_local_producer_no_lock());
82 producer_waiter_list_->CancelAllWaiters();
85 void DataPipe::ProducerClose() {
86 base::AutoLock locker(lock_);
87 DCHECK(producer_open_);
88 producer_open_ = false;
89 DCHECK(has_local_producer_no_lock());
90 producer_waiter_list_.reset();
91 // Not a bug, except possibly in "user" code.
92 DVLOG_IF(2, producer_in_two_phase_write_no_lock())
93 << "Producer closed with active two-phase write";
94 producer_two_phase_max_num_bytes_written_ = 0;
95 ProducerCloseImplNoLock();
96 AwakeConsumerWaitersForStateChangeNoLock(
97 ConsumerGetHandleSignalsStateImplNoLock());
100 MojoResult DataPipe::ProducerWriteData(UserPointer<const void> elements,
101 UserPointer<uint32_t> num_bytes,
102 bool all_or_none) {
103 base::AutoLock locker(lock_);
104 DCHECK(has_local_producer_no_lock());
106 if (producer_in_two_phase_write_no_lock())
107 return MOJO_RESULT_BUSY;
108 if (!consumer_open_no_lock())
109 return MOJO_RESULT_FAILED_PRECONDITION;
111 // Returning "busy" takes priority over "invalid argument".
112 uint32_t max_num_bytes_to_write = num_bytes.Get();
113 if (max_num_bytes_to_write % element_num_bytes_ != 0)
114 return MOJO_RESULT_INVALID_ARGUMENT;
116 if (max_num_bytes_to_write == 0)
117 return MOJO_RESULT_OK; // Nothing to do.
119 uint32_t min_num_bytes_to_write = all_or_none ? max_num_bytes_to_write : 0;
121 HandleSignalsState old_consumer_state =
122 ConsumerGetHandleSignalsStateImplNoLock();
123 MojoResult rv = ProducerWriteDataImplNoLock(
124 elements, num_bytes, max_num_bytes_to_write, min_num_bytes_to_write);
125 HandleSignalsState new_consumer_state =
126 ConsumerGetHandleSignalsStateImplNoLock();
127 if (!new_consumer_state.equals(old_consumer_state))
128 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
129 return rv;
132 MojoResult DataPipe::ProducerBeginWriteData(
133 UserPointer<void*> buffer,
134 UserPointer<uint32_t> buffer_num_bytes,
135 bool all_or_none) {
136 base::AutoLock locker(lock_);
137 DCHECK(has_local_producer_no_lock());
139 if (producer_in_two_phase_write_no_lock())
140 return MOJO_RESULT_BUSY;
141 if (!consumer_open_no_lock())
142 return MOJO_RESULT_FAILED_PRECONDITION;
144 uint32_t min_num_bytes_to_write = 0;
145 if (all_or_none) {
146 min_num_bytes_to_write = buffer_num_bytes.Get();
147 if (min_num_bytes_to_write % element_num_bytes_ != 0)
148 return MOJO_RESULT_INVALID_ARGUMENT;
151 MojoResult rv = ProducerBeginWriteDataImplNoLock(
152 buffer, buffer_num_bytes, min_num_bytes_to_write);
153 if (rv != MOJO_RESULT_OK)
154 return rv;
155 // Note: No need to awake producer waiters, even though we're going from
156 // writable to non-writable (since you can't wait on non-writability).
157 // Similarly, though this may have discarded data (in "may discard" mode),
158 // making it non-readable, there's still no need to awake consumer waiters.
159 DCHECK(producer_in_two_phase_write_no_lock());
160 return MOJO_RESULT_OK;
163 MojoResult DataPipe::ProducerEndWriteData(uint32_t num_bytes_written) {
164 base::AutoLock locker(lock_);
165 DCHECK(has_local_producer_no_lock());
167 if (!producer_in_two_phase_write_no_lock())
168 return MOJO_RESULT_FAILED_PRECONDITION;
169 // Note: Allow successful completion of the two-phase write even if the
170 // consumer has been closed.
172 HandleSignalsState old_consumer_state =
173 ConsumerGetHandleSignalsStateImplNoLock();
174 MojoResult rv;
175 if (num_bytes_written > producer_two_phase_max_num_bytes_written_ ||
176 num_bytes_written % element_num_bytes_ != 0) {
177 rv = MOJO_RESULT_INVALID_ARGUMENT;
178 producer_two_phase_max_num_bytes_written_ = 0;
179 } else {
180 rv = ProducerEndWriteDataImplNoLock(num_bytes_written);
182 // Two-phase write ended even on failure.
183 DCHECK(!producer_in_two_phase_write_no_lock());
184 // If we're now writable, we *became* writable (since we weren't writable
185 // during the two-phase write), so awake producer waiters.
186 HandleSignalsState new_producer_state =
187 ProducerGetHandleSignalsStateImplNoLock();
188 if (new_producer_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
189 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
190 HandleSignalsState new_consumer_state =
191 ConsumerGetHandleSignalsStateImplNoLock();
192 if (!new_consumer_state.equals(old_consumer_state))
193 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
194 return rv;
197 HandleSignalsState DataPipe::ProducerGetHandleSignalsState() {
198 base::AutoLock locker(lock_);
199 DCHECK(has_local_producer_no_lock());
200 return ProducerGetHandleSignalsStateImplNoLock();
203 MojoResult DataPipe::ProducerAddWaiter(Waiter* waiter,
204 MojoHandleSignals signals,
205 uint32_t context,
206 HandleSignalsState* signals_state) {
207 base::AutoLock locker(lock_);
208 DCHECK(has_local_producer_no_lock());
210 HandleSignalsState producer_state = ProducerGetHandleSignalsStateImplNoLock();
211 if (producer_state.satisfies(signals)) {
212 if (signals_state)
213 *signals_state = producer_state;
214 return MOJO_RESULT_ALREADY_EXISTS;
216 if (!producer_state.can_satisfy(signals)) {
217 if (signals_state)
218 *signals_state = producer_state;
219 return MOJO_RESULT_FAILED_PRECONDITION;
222 producer_waiter_list_->AddWaiter(waiter, signals, context);
223 return MOJO_RESULT_OK;
226 void DataPipe::ProducerRemoveWaiter(Waiter* waiter,
227 HandleSignalsState* signals_state) {
228 base::AutoLock locker(lock_);
229 DCHECK(has_local_producer_no_lock());
230 producer_waiter_list_->RemoveWaiter(waiter);
231 if (signals_state)
232 *signals_state = ProducerGetHandleSignalsStateImplNoLock();
235 bool DataPipe::ProducerIsBusy() const {
236 base::AutoLock locker(lock_);
237 return producer_in_two_phase_write_no_lock();
240 void DataPipe::ConsumerCancelAllWaiters() {
241 base::AutoLock locker(lock_);
242 DCHECK(has_local_consumer_no_lock());
243 consumer_waiter_list_->CancelAllWaiters();
246 void DataPipe::ConsumerClose() {
247 base::AutoLock locker(lock_);
248 DCHECK(consumer_open_);
249 consumer_open_ = false;
250 DCHECK(has_local_consumer_no_lock());
251 consumer_waiter_list_.reset();
252 // Not a bug, except possibly in "user" code.
253 DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
254 << "Consumer closed with active two-phase read";
255 consumer_two_phase_max_num_bytes_read_ = 0;
256 ConsumerCloseImplNoLock();
257 AwakeProducerWaitersForStateChangeNoLock(
258 ProducerGetHandleSignalsStateImplNoLock());
261 MojoResult DataPipe::ConsumerReadData(UserPointer<void> elements,
262 UserPointer<uint32_t> num_bytes,
263 bool all_or_none) {
264 base::AutoLock locker(lock_);
265 DCHECK(has_local_consumer_no_lock());
267 if (consumer_in_two_phase_read_no_lock())
268 return MOJO_RESULT_BUSY;
270 uint32_t max_num_bytes_to_read = num_bytes.Get();
271 if (max_num_bytes_to_read % element_num_bytes_ != 0)
272 return MOJO_RESULT_INVALID_ARGUMENT;
274 if (max_num_bytes_to_read == 0)
275 return MOJO_RESULT_OK; // Nothing to do.
277 uint32_t min_num_bytes_to_read = all_or_none ? max_num_bytes_to_read : 0;
279 HandleSignalsState old_producer_state =
280 ProducerGetHandleSignalsStateImplNoLock();
281 MojoResult rv = ConsumerReadDataImplNoLock(
282 elements, num_bytes, max_num_bytes_to_read, min_num_bytes_to_read);
283 HandleSignalsState new_producer_state =
284 ProducerGetHandleSignalsStateImplNoLock();
285 if (!new_producer_state.equals(old_producer_state))
286 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
287 return rv;
290 MojoResult DataPipe::ConsumerDiscardData(UserPointer<uint32_t> num_bytes,
291 bool all_or_none) {
292 base::AutoLock locker(lock_);
293 DCHECK(has_local_consumer_no_lock());
295 if (consumer_in_two_phase_read_no_lock())
296 return MOJO_RESULT_BUSY;
298 uint32_t max_num_bytes_to_discard = num_bytes.Get();
299 if (max_num_bytes_to_discard % element_num_bytes_ != 0)
300 return MOJO_RESULT_INVALID_ARGUMENT;
302 if (max_num_bytes_to_discard == 0)
303 return MOJO_RESULT_OK; // Nothing to do.
305 uint32_t min_num_bytes_to_discard =
306 all_or_none ? max_num_bytes_to_discard : 0;
308 HandleSignalsState old_producer_state =
309 ProducerGetHandleSignalsStateImplNoLock();
310 MojoResult rv = ConsumerDiscardDataImplNoLock(
311 num_bytes, max_num_bytes_to_discard, min_num_bytes_to_discard);
312 HandleSignalsState new_producer_state =
313 ProducerGetHandleSignalsStateImplNoLock();
314 if (!new_producer_state.equals(old_producer_state))
315 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
316 return rv;
319 MojoResult DataPipe::ConsumerQueryData(UserPointer<uint32_t> num_bytes) {
320 base::AutoLock locker(lock_);
321 DCHECK(has_local_consumer_no_lock());
323 if (consumer_in_two_phase_read_no_lock())
324 return MOJO_RESULT_BUSY;
326 // Note: Don't need to validate |*num_bytes| for query.
327 return ConsumerQueryDataImplNoLock(num_bytes);
330 MojoResult DataPipe::ConsumerBeginReadData(
331 UserPointer<const void*> buffer,
332 UserPointer<uint32_t> buffer_num_bytes,
333 bool all_or_none) {
334 base::AutoLock locker(lock_);
335 DCHECK(has_local_consumer_no_lock());
337 if (consumer_in_two_phase_read_no_lock())
338 return MOJO_RESULT_BUSY;
340 uint32_t min_num_bytes_to_read = 0;
341 if (all_or_none) {
342 min_num_bytes_to_read = buffer_num_bytes.Get();
343 if (min_num_bytes_to_read % element_num_bytes_ != 0)
344 return MOJO_RESULT_INVALID_ARGUMENT;
347 MojoResult rv = ConsumerBeginReadDataImplNoLock(
348 buffer, buffer_num_bytes, min_num_bytes_to_read);
349 if (rv != MOJO_RESULT_OK)
350 return rv;
351 DCHECK(consumer_in_two_phase_read_no_lock());
352 return MOJO_RESULT_OK;
355 MojoResult DataPipe::ConsumerEndReadData(uint32_t num_bytes_read) {
356 base::AutoLock locker(lock_);
357 DCHECK(has_local_consumer_no_lock());
359 if (!consumer_in_two_phase_read_no_lock())
360 return MOJO_RESULT_FAILED_PRECONDITION;
362 HandleSignalsState old_producer_state =
363 ProducerGetHandleSignalsStateImplNoLock();
364 MojoResult rv;
365 if (num_bytes_read > consumer_two_phase_max_num_bytes_read_ ||
366 num_bytes_read % element_num_bytes_ != 0) {
367 rv = MOJO_RESULT_INVALID_ARGUMENT;
368 consumer_two_phase_max_num_bytes_read_ = 0;
369 } else {
370 rv = ConsumerEndReadDataImplNoLock(num_bytes_read);
372 // Two-phase read ended even on failure.
373 DCHECK(!consumer_in_two_phase_read_no_lock());
374 // If we're now readable, we *became* readable (since we weren't readable
375 // during the two-phase read), so awake consumer waiters.
376 HandleSignalsState new_consumer_state =
377 ConsumerGetHandleSignalsStateImplNoLock();
378 if (new_consumer_state.satisfies(MOJO_HANDLE_SIGNAL_READABLE))
379 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state);
380 HandleSignalsState new_producer_state =
381 ProducerGetHandleSignalsStateImplNoLock();
382 if (!new_producer_state.equals(old_producer_state))
383 AwakeProducerWaitersForStateChangeNoLock(new_producer_state);
384 return rv;
387 HandleSignalsState DataPipe::ConsumerGetHandleSignalsState() {
388 base::AutoLock locker(lock_);
389 DCHECK(has_local_consumer_no_lock());
390 return ConsumerGetHandleSignalsStateImplNoLock();
393 MojoResult DataPipe::ConsumerAddWaiter(Waiter* waiter,
394 MojoHandleSignals signals,
395 uint32_t context,
396 HandleSignalsState* signals_state) {
397 base::AutoLock locker(lock_);
398 DCHECK(has_local_consumer_no_lock());
400 HandleSignalsState consumer_state = ConsumerGetHandleSignalsStateImplNoLock();
401 if (consumer_state.satisfies(signals)) {
402 if (signals_state)
403 *signals_state = consumer_state;
404 return MOJO_RESULT_ALREADY_EXISTS;
406 if (!consumer_state.can_satisfy(signals)) {
407 if (signals_state)
408 *signals_state = consumer_state;
409 return MOJO_RESULT_FAILED_PRECONDITION;
412 consumer_waiter_list_->AddWaiter(waiter, signals, context);
413 return MOJO_RESULT_OK;
416 void DataPipe::ConsumerRemoveWaiter(Waiter* waiter,
417 HandleSignalsState* signals_state) {
418 base::AutoLock locker(lock_);
419 DCHECK(has_local_consumer_no_lock());
420 consumer_waiter_list_->RemoveWaiter(waiter);
421 if (signals_state)
422 *signals_state = ConsumerGetHandleSignalsStateImplNoLock();
425 bool DataPipe::ConsumerIsBusy() const {
426 base::AutoLock locker(lock_);
427 return consumer_in_two_phase_read_no_lock();
430 DataPipe::DataPipe(bool has_local_producer,
431 bool has_local_consumer,
432 const MojoCreateDataPipeOptions& validated_options)
433 : may_discard_((validated_options.flags &
434 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD)),
435 element_num_bytes_(validated_options.element_num_bytes),
436 capacity_num_bytes_(validated_options.capacity_num_bytes),
437 producer_open_(true),
438 consumer_open_(true),
439 producer_waiter_list_(has_local_producer ? new WaiterList() : NULL),
440 consumer_waiter_list_(has_local_consumer ? new WaiterList() : NULL),
441 producer_two_phase_max_num_bytes_written_(0),
442 consumer_two_phase_max_num_bytes_read_(0) {
443 // Check that the passed in options actually are validated.
444 MojoCreateDataPipeOptions unused ALLOW_UNUSED = {0};
445 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options), &unused),
446 MOJO_RESULT_OK);
449 DataPipe::~DataPipe() {
450 DCHECK(!producer_open_);
451 DCHECK(!consumer_open_);
452 DCHECK(!producer_waiter_list_);
453 DCHECK(!consumer_waiter_list_);
456 void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
457 const HandleSignalsState& new_producer_state) {
458 lock_.AssertAcquired();
459 if (!has_local_producer_no_lock())
460 return;
461 producer_waiter_list_->AwakeWaitersForStateChange(new_producer_state);
464 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
465 const HandleSignalsState& new_consumer_state) {
466 lock_.AssertAcquired();
467 if (!has_local_consumer_no_lock())
468 return;
469 consumer_waiter_list_->AwakeWaitersForStateChange(new_consumer_state);
472 } // namespace system
473 } // namespace mojo