1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/data_pipe.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "mojo/system/constants.h"
15 #include "mojo/system/memory.h"
16 #include "mojo/system/options_validation.h"
17 #include "mojo/system/waiter_list.h"
23 const MojoCreateDataPipeOptions
DataPipe::kDefaultCreateOptions
= {
24 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions
)),
25 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE
, 1u,
26 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes
)};
29 MojoResult
DataPipe::ValidateCreateOptions(
30 UserPointer
<const MojoCreateDataPipeOptions
> in_options
,
31 MojoCreateDataPipeOptions
* out_options
) {
32 const MojoCreateDataPipeOptionsFlags kKnownFlags
=
33 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD
;
35 *out_options
= kDefaultCreateOptions
;
36 if (in_options
.IsNull())
37 return MOJO_RESULT_OK
;
39 UserOptionsReader
<MojoCreateDataPipeOptions
> reader(in_options
);
40 if (!reader
.is_valid())
41 return MOJO_RESULT_INVALID_ARGUMENT
;
43 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateDataPipeOptions
, flags
, reader
))
44 return MOJO_RESULT_OK
;
45 if ((reader
.options().flags
& ~kKnownFlags
))
46 return MOJO_RESULT_UNIMPLEMENTED
;
47 out_options
->flags
= reader
.options().flags
;
49 // Checks for fields beyond |flags|:
51 if (!OPTIONS_STRUCT_HAS_MEMBER(
52 MojoCreateDataPipeOptions
, element_num_bytes
, reader
))
53 return MOJO_RESULT_OK
;
54 if (reader
.options().element_num_bytes
== 0)
55 return MOJO_RESULT_INVALID_ARGUMENT
;
56 out_options
->element_num_bytes
= reader
.options().element_num_bytes
;
58 if (!OPTIONS_STRUCT_HAS_MEMBER(
59 MojoCreateDataPipeOptions
, capacity_num_bytes
, reader
) ||
60 reader
.options().capacity_num_bytes
== 0) {
61 // Round the default capacity down to a multiple of the element size (but at
62 // least one element).
63 out_options
->capacity_num_bytes
=
64 std::max(static_cast<uint32_t>(kDefaultDataPipeCapacityBytes
-
65 (kDefaultDataPipeCapacityBytes
%
66 out_options
->element_num_bytes
)),
67 out_options
->element_num_bytes
);
68 return MOJO_RESULT_OK
;
70 if (reader
.options().capacity_num_bytes
% out_options
->element_num_bytes
!= 0)
71 return MOJO_RESULT_INVALID_ARGUMENT
;
72 if (reader
.options().capacity_num_bytes
> kMaxDataPipeCapacityBytes
)
73 return MOJO_RESULT_RESOURCE_EXHAUSTED
;
74 out_options
->capacity_num_bytes
= reader
.options().capacity_num_bytes
;
76 return MOJO_RESULT_OK
;
79 void DataPipe::ProducerCancelAllWaiters() {
80 base::AutoLock
locker(lock_
);
81 DCHECK(has_local_producer_no_lock());
82 producer_waiter_list_
->CancelAllWaiters();
85 void DataPipe::ProducerClose() {
86 base::AutoLock
locker(lock_
);
87 DCHECK(producer_open_
);
88 producer_open_
= false;
89 DCHECK(has_local_producer_no_lock());
90 producer_waiter_list_
.reset();
91 // Not a bug, except possibly in "user" code.
92 DVLOG_IF(2, producer_in_two_phase_write_no_lock())
93 << "Producer closed with active two-phase write";
94 producer_two_phase_max_num_bytes_written_
= 0;
95 ProducerCloseImplNoLock();
96 AwakeConsumerWaitersForStateChangeNoLock(
97 ConsumerGetHandleSignalsStateImplNoLock());
100 MojoResult
DataPipe::ProducerWriteData(UserPointer
<const void> elements
,
101 UserPointer
<uint32_t> num_bytes
,
103 base::AutoLock
locker(lock_
);
104 DCHECK(has_local_producer_no_lock());
106 if (producer_in_two_phase_write_no_lock())
107 return MOJO_RESULT_BUSY
;
108 if (!consumer_open_no_lock())
109 return MOJO_RESULT_FAILED_PRECONDITION
;
111 // Returning "busy" takes priority over "invalid argument".
112 uint32_t max_num_bytes_to_write
= num_bytes
.Get();
113 if (max_num_bytes_to_write
% element_num_bytes_
!= 0)
114 return MOJO_RESULT_INVALID_ARGUMENT
;
116 if (max_num_bytes_to_write
== 0)
117 return MOJO_RESULT_OK
; // Nothing to do.
119 uint32_t min_num_bytes_to_write
= all_or_none
? max_num_bytes_to_write
: 0;
121 HandleSignalsState old_consumer_state
=
122 ConsumerGetHandleSignalsStateImplNoLock();
123 MojoResult rv
= ProducerWriteDataImplNoLock(
124 elements
, num_bytes
, max_num_bytes_to_write
, min_num_bytes_to_write
);
125 HandleSignalsState new_consumer_state
=
126 ConsumerGetHandleSignalsStateImplNoLock();
127 if (!new_consumer_state
.equals(old_consumer_state
))
128 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state
);
132 MojoResult
DataPipe::ProducerBeginWriteData(
133 UserPointer
<void*> buffer
,
134 UserPointer
<uint32_t> buffer_num_bytes
,
136 base::AutoLock
locker(lock_
);
137 DCHECK(has_local_producer_no_lock());
139 if (producer_in_two_phase_write_no_lock())
140 return MOJO_RESULT_BUSY
;
141 if (!consumer_open_no_lock())
142 return MOJO_RESULT_FAILED_PRECONDITION
;
144 uint32_t min_num_bytes_to_write
= 0;
146 min_num_bytes_to_write
= buffer_num_bytes
.Get();
147 if (min_num_bytes_to_write
% element_num_bytes_
!= 0)
148 return MOJO_RESULT_INVALID_ARGUMENT
;
151 MojoResult rv
= ProducerBeginWriteDataImplNoLock(
152 buffer
, buffer_num_bytes
, min_num_bytes_to_write
);
153 if (rv
!= MOJO_RESULT_OK
)
155 // Note: No need to awake producer waiters, even though we're going from
156 // writable to non-writable (since you can't wait on non-writability).
157 // Similarly, though this may have discarded data (in "may discard" mode),
158 // making it non-readable, there's still no need to awake consumer waiters.
159 DCHECK(producer_in_two_phase_write_no_lock());
160 return MOJO_RESULT_OK
;
163 MojoResult
DataPipe::ProducerEndWriteData(uint32_t num_bytes_written
) {
164 base::AutoLock
locker(lock_
);
165 DCHECK(has_local_producer_no_lock());
167 if (!producer_in_two_phase_write_no_lock())
168 return MOJO_RESULT_FAILED_PRECONDITION
;
169 // Note: Allow successful completion of the two-phase write even if the
170 // consumer has been closed.
172 HandleSignalsState old_consumer_state
=
173 ConsumerGetHandleSignalsStateImplNoLock();
175 if (num_bytes_written
> producer_two_phase_max_num_bytes_written_
||
176 num_bytes_written
% element_num_bytes_
!= 0) {
177 rv
= MOJO_RESULT_INVALID_ARGUMENT
;
178 producer_two_phase_max_num_bytes_written_
= 0;
180 rv
= ProducerEndWriteDataImplNoLock(num_bytes_written
);
182 // Two-phase write ended even on failure.
183 DCHECK(!producer_in_two_phase_write_no_lock());
184 // If we're now writable, we *became* writable (since we weren't writable
185 // during the two-phase write), so awake producer waiters.
186 HandleSignalsState new_producer_state
=
187 ProducerGetHandleSignalsStateImplNoLock();
188 if (new_producer_state
.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE
))
189 AwakeProducerWaitersForStateChangeNoLock(new_producer_state
);
190 HandleSignalsState new_consumer_state
=
191 ConsumerGetHandleSignalsStateImplNoLock();
192 if (!new_consumer_state
.equals(old_consumer_state
))
193 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state
);
197 HandleSignalsState
DataPipe::ProducerGetHandleSignalsState() {
198 base::AutoLock
locker(lock_
);
199 DCHECK(has_local_producer_no_lock());
200 return ProducerGetHandleSignalsStateImplNoLock();
203 MojoResult
DataPipe::ProducerAddWaiter(Waiter
* waiter
,
204 MojoHandleSignals signals
,
206 HandleSignalsState
* signals_state
) {
207 base::AutoLock
locker(lock_
);
208 DCHECK(has_local_producer_no_lock());
210 HandleSignalsState producer_state
= ProducerGetHandleSignalsStateImplNoLock();
211 if (producer_state
.satisfies(signals
)) {
213 *signals_state
= producer_state
;
214 return MOJO_RESULT_ALREADY_EXISTS
;
216 if (!producer_state
.can_satisfy(signals
)) {
218 *signals_state
= producer_state
;
219 return MOJO_RESULT_FAILED_PRECONDITION
;
222 producer_waiter_list_
->AddWaiter(waiter
, signals
, context
);
223 return MOJO_RESULT_OK
;
226 void DataPipe::ProducerRemoveWaiter(Waiter
* waiter
,
227 HandleSignalsState
* signals_state
) {
228 base::AutoLock
locker(lock_
);
229 DCHECK(has_local_producer_no_lock());
230 producer_waiter_list_
->RemoveWaiter(waiter
);
232 *signals_state
= ProducerGetHandleSignalsStateImplNoLock();
235 bool DataPipe::ProducerIsBusy() const {
236 base::AutoLock
locker(lock_
);
237 return producer_in_two_phase_write_no_lock();
240 void DataPipe::ConsumerCancelAllWaiters() {
241 base::AutoLock
locker(lock_
);
242 DCHECK(has_local_consumer_no_lock());
243 consumer_waiter_list_
->CancelAllWaiters();
246 void DataPipe::ConsumerClose() {
247 base::AutoLock
locker(lock_
);
248 DCHECK(consumer_open_
);
249 consumer_open_
= false;
250 DCHECK(has_local_consumer_no_lock());
251 consumer_waiter_list_
.reset();
252 // Not a bug, except possibly in "user" code.
253 DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
254 << "Consumer closed with active two-phase read";
255 consumer_two_phase_max_num_bytes_read_
= 0;
256 ConsumerCloseImplNoLock();
257 AwakeProducerWaitersForStateChangeNoLock(
258 ProducerGetHandleSignalsStateImplNoLock());
261 MojoResult
DataPipe::ConsumerReadData(UserPointer
<void> elements
,
262 UserPointer
<uint32_t> num_bytes
,
264 base::AutoLock
locker(lock_
);
265 DCHECK(has_local_consumer_no_lock());
267 if (consumer_in_two_phase_read_no_lock())
268 return MOJO_RESULT_BUSY
;
270 uint32_t max_num_bytes_to_read
= num_bytes
.Get();
271 if (max_num_bytes_to_read
% element_num_bytes_
!= 0)
272 return MOJO_RESULT_INVALID_ARGUMENT
;
274 if (max_num_bytes_to_read
== 0)
275 return MOJO_RESULT_OK
; // Nothing to do.
277 uint32_t min_num_bytes_to_read
= all_or_none
? max_num_bytes_to_read
: 0;
279 HandleSignalsState old_producer_state
=
280 ProducerGetHandleSignalsStateImplNoLock();
281 MojoResult rv
= ConsumerReadDataImplNoLock(
282 elements
, num_bytes
, max_num_bytes_to_read
, min_num_bytes_to_read
);
283 HandleSignalsState new_producer_state
=
284 ProducerGetHandleSignalsStateImplNoLock();
285 if (!new_producer_state
.equals(old_producer_state
))
286 AwakeProducerWaitersForStateChangeNoLock(new_producer_state
);
290 MojoResult
DataPipe::ConsumerDiscardData(UserPointer
<uint32_t> num_bytes
,
292 base::AutoLock
locker(lock_
);
293 DCHECK(has_local_consumer_no_lock());
295 if (consumer_in_two_phase_read_no_lock())
296 return MOJO_RESULT_BUSY
;
298 uint32_t max_num_bytes_to_discard
= num_bytes
.Get();
299 if (max_num_bytes_to_discard
% element_num_bytes_
!= 0)
300 return MOJO_RESULT_INVALID_ARGUMENT
;
302 if (max_num_bytes_to_discard
== 0)
303 return MOJO_RESULT_OK
; // Nothing to do.
305 uint32_t min_num_bytes_to_discard
=
306 all_or_none
? max_num_bytes_to_discard
: 0;
308 HandleSignalsState old_producer_state
=
309 ProducerGetHandleSignalsStateImplNoLock();
310 MojoResult rv
= ConsumerDiscardDataImplNoLock(
311 num_bytes
, max_num_bytes_to_discard
, min_num_bytes_to_discard
);
312 HandleSignalsState new_producer_state
=
313 ProducerGetHandleSignalsStateImplNoLock();
314 if (!new_producer_state
.equals(old_producer_state
))
315 AwakeProducerWaitersForStateChangeNoLock(new_producer_state
);
319 MojoResult
DataPipe::ConsumerQueryData(UserPointer
<uint32_t> num_bytes
) {
320 base::AutoLock
locker(lock_
);
321 DCHECK(has_local_consumer_no_lock());
323 if (consumer_in_two_phase_read_no_lock())
324 return MOJO_RESULT_BUSY
;
326 // Note: Don't need to validate |*num_bytes| for query.
327 return ConsumerQueryDataImplNoLock(num_bytes
);
330 MojoResult
DataPipe::ConsumerBeginReadData(
331 UserPointer
<const void*> buffer
,
332 UserPointer
<uint32_t> buffer_num_bytes
,
334 base::AutoLock
locker(lock_
);
335 DCHECK(has_local_consumer_no_lock());
337 if (consumer_in_two_phase_read_no_lock())
338 return MOJO_RESULT_BUSY
;
340 uint32_t min_num_bytes_to_read
= 0;
342 min_num_bytes_to_read
= buffer_num_bytes
.Get();
343 if (min_num_bytes_to_read
% element_num_bytes_
!= 0)
344 return MOJO_RESULT_INVALID_ARGUMENT
;
347 MojoResult rv
= ConsumerBeginReadDataImplNoLock(
348 buffer
, buffer_num_bytes
, min_num_bytes_to_read
);
349 if (rv
!= MOJO_RESULT_OK
)
351 DCHECK(consumer_in_two_phase_read_no_lock());
352 return MOJO_RESULT_OK
;
355 MojoResult
DataPipe::ConsumerEndReadData(uint32_t num_bytes_read
) {
356 base::AutoLock
locker(lock_
);
357 DCHECK(has_local_consumer_no_lock());
359 if (!consumer_in_two_phase_read_no_lock())
360 return MOJO_RESULT_FAILED_PRECONDITION
;
362 HandleSignalsState old_producer_state
=
363 ProducerGetHandleSignalsStateImplNoLock();
365 if (num_bytes_read
> consumer_two_phase_max_num_bytes_read_
||
366 num_bytes_read
% element_num_bytes_
!= 0) {
367 rv
= MOJO_RESULT_INVALID_ARGUMENT
;
368 consumer_two_phase_max_num_bytes_read_
= 0;
370 rv
= ConsumerEndReadDataImplNoLock(num_bytes_read
);
372 // Two-phase read ended even on failure.
373 DCHECK(!consumer_in_two_phase_read_no_lock());
374 // If we're now readable, we *became* readable (since we weren't readable
375 // during the two-phase read), so awake consumer waiters.
376 HandleSignalsState new_consumer_state
=
377 ConsumerGetHandleSignalsStateImplNoLock();
378 if (new_consumer_state
.satisfies(MOJO_HANDLE_SIGNAL_READABLE
))
379 AwakeConsumerWaitersForStateChangeNoLock(new_consumer_state
);
380 HandleSignalsState new_producer_state
=
381 ProducerGetHandleSignalsStateImplNoLock();
382 if (!new_producer_state
.equals(old_producer_state
))
383 AwakeProducerWaitersForStateChangeNoLock(new_producer_state
);
387 HandleSignalsState
DataPipe::ConsumerGetHandleSignalsState() {
388 base::AutoLock
locker(lock_
);
389 DCHECK(has_local_consumer_no_lock());
390 return ConsumerGetHandleSignalsStateImplNoLock();
393 MojoResult
DataPipe::ConsumerAddWaiter(Waiter
* waiter
,
394 MojoHandleSignals signals
,
396 HandleSignalsState
* signals_state
) {
397 base::AutoLock
locker(lock_
);
398 DCHECK(has_local_consumer_no_lock());
400 HandleSignalsState consumer_state
= ConsumerGetHandleSignalsStateImplNoLock();
401 if (consumer_state
.satisfies(signals
)) {
403 *signals_state
= consumer_state
;
404 return MOJO_RESULT_ALREADY_EXISTS
;
406 if (!consumer_state
.can_satisfy(signals
)) {
408 *signals_state
= consumer_state
;
409 return MOJO_RESULT_FAILED_PRECONDITION
;
412 consumer_waiter_list_
->AddWaiter(waiter
, signals
, context
);
413 return MOJO_RESULT_OK
;
416 void DataPipe::ConsumerRemoveWaiter(Waiter
* waiter
,
417 HandleSignalsState
* signals_state
) {
418 base::AutoLock
locker(lock_
);
419 DCHECK(has_local_consumer_no_lock());
420 consumer_waiter_list_
->RemoveWaiter(waiter
);
422 *signals_state
= ConsumerGetHandleSignalsStateImplNoLock();
425 bool DataPipe::ConsumerIsBusy() const {
426 base::AutoLock
locker(lock_
);
427 return consumer_in_two_phase_read_no_lock();
430 DataPipe::DataPipe(bool has_local_producer
,
431 bool has_local_consumer
,
432 const MojoCreateDataPipeOptions
& validated_options
)
433 : may_discard_((validated_options
.flags
&
434 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD
)),
435 element_num_bytes_(validated_options
.element_num_bytes
),
436 capacity_num_bytes_(validated_options
.capacity_num_bytes
),
437 producer_open_(true),
438 consumer_open_(true),
439 producer_waiter_list_(has_local_producer
? new WaiterList() : NULL
),
440 consumer_waiter_list_(has_local_consumer
? new WaiterList() : NULL
),
441 producer_two_phase_max_num_bytes_written_(0),
442 consumer_two_phase_max_num_bytes_read_(0) {
443 // Check that the passed in options actually are validated.
444 MojoCreateDataPipeOptions unused ALLOW_UNUSED
= {0};
445 DCHECK_EQ(ValidateCreateOptions(MakeUserPointer(&validated_options
), &unused
),
449 DataPipe::~DataPipe() {
450 DCHECK(!producer_open_
);
451 DCHECK(!consumer_open_
);
452 DCHECK(!producer_waiter_list_
);
453 DCHECK(!consumer_waiter_list_
);
456 void DataPipe::AwakeProducerWaitersForStateChangeNoLock(
457 const HandleSignalsState
& new_producer_state
) {
458 lock_
.AssertAcquired();
459 if (!has_local_producer_no_lock())
461 producer_waiter_list_
->AwakeWaitersForStateChange(new_producer_state
);
464 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock(
465 const HandleSignalsState
& new_consumer_state
) {
466 lock_
.AssertAcquired();
467 if (!has_local_consumer_no_lock())
469 consumer_waiter_list_
->AwakeWaitersForStateChange(new_consumer_state
);
472 } // namespace system