1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/data_pipe.h"
12 #include "base/compiler_specific.h"
13 #include "base/logging.h"
14 #include "mojo/system/constants.h"
15 #include "mojo/system/memory.h"
16 #include "mojo/system/options_validation.h"
17 #include "mojo/system/waiter_list.h"
23 MojoResult
DataPipe::ValidateCreateOptions(
24 const MojoCreateDataPipeOptions
* in_options
,
25 MojoCreateDataPipeOptions
* out_options
) {
26 const MojoCreateDataPipeOptionsFlags kKnownFlags
=
27 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD
;
28 static const MojoCreateDataPipeOptions kDefaultOptions
= {
29 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions
)),
30 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE
,
32 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes
)
35 *out_options
= kDefaultOptions
;
37 return MOJO_RESULT_OK
;
40 ValidateOptionsStructPointerSizeAndFlags
<MojoCreateDataPipeOptions
>(
41 in_options
, kKnownFlags
, out_options
);
42 if (result
!= MOJO_RESULT_OK
)
45 // Checks for fields beyond |flags|:
47 if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions
, element_num_bytes
,
49 return MOJO_RESULT_OK
;
50 if (in_options
->element_num_bytes
== 0)
51 return MOJO_RESULT_INVALID_ARGUMENT
;
52 out_options
->element_num_bytes
= in_options
->element_num_bytes
;
54 if (!HAS_OPTIONS_STRUCT_MEMBER(MojoCreateDataPipeOptions
, capacity_num_bytes
,
56 in_options
->capacity_num_bytes
== 0) {
57 // Round the default capacity down to a multiple of the element size (but at
58 // least one element).
59 out_options
->capacity_num_bytes
= std::max(
60 static_cast<uint32_t>(kDefaultDataPipeCapacityBytes
-
61 (kDefaultDataPipeCapacityBytes
% out_options
->element_num_bytes
)),
62 out_options
->element_num_bytes
);
63 return MOJO_RESULT_OK
;
65 if (in_options
->capacity_num_bytes
% out_options
->element_num_bytes
!= 0)
66 return MOJO_RESULT_INVALID_ARGUMENT
;
67 if (in_options
->capacity_num_bytes
> kMaxDataPipeCapacityBytes
)
68 return MOJO_RESULT_RESOURCE_EXHAUSTED
;
69 out_options
->capacity_num_bytes
= in_options
->capacity_num_bytes
;
71 return MOJO_RESULT_OK
;
74 void DataPipe::ProducerCancelAllWaiters() {
75 base::AutoLock
locker(lock_
);
76 DCHECK(has_local_producer_no_lock());
77 producer_waiter_list_
->CancelAllWaiters();
80 void DataPipe::ProducerClose() {
81 base::AutoLock
locker(lock_
);
82 DCHECK(producer_open_
);
83 producer_open_
= false;
84 DCHECK(has_local_producer_no_lock());
85 producer_waiter_list_
.reset();
86 // Not a bug, except possibly in "user" code.
87 DVLOG_IF(2, producer_in_two_phase_write_no_lock())
88 << "Producer closed with active two-phase write";
89 producer_two_phase_max_num_bytes_written_
= 0;
90 ProducerCloseImplNoLock();
91 AwakeConsumerWaitersForStateChangeNoLock();
94 MojoResult
DataPipe::ProducerWriteData(const void* elements
,
97 base::AutoLock
locker(lock_
);
98 DCHECK(has_local_producer_no_lock());
100 if (producer_in_two_phase_write_no_lock())
101 return MOJO_RESULT_BUSY
;
102 if (!consumer_open_no_lock())
103 return MOJO_RESULT_FAILED_PRECONDITION
;
105 // Returning "busy" takes priority over "invalid argument".
106 if (*num_bytes
% element_num_bytes_
!= 0)
107 return MOJO_RESULT_INVALID_ARGUMENT
;
110 return MOJO_RESULT_OK
; // Nothing to do.
112 MojoWaitFlags old_consumer_satisfied_flags
= ConsumerSatisfiedFlagsNoLock();
113 MojoResult rv
= ProducerWriteDataImplNoLock(elements
, num_bytes
, all_or_none
);
114 if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags
)
115 AwakeConsumerWaitersForStateChangeNoLock();
119 MojoResult
DataPipe::ProducerBeginWriteData(void** buffer
,
120 uint32_t* buffer_num_bytes
,
122 base::AutoLock
locker(lock_
);
123 DCHECK(has_local_producer_no_lock());
125 if (producer_in_two_phase_write_no_lock())
126 return MOJO_RESULT_BUSY
;
127 if (!consumer_open_no_lock())
128 return MOJO_RESULT_FAILED_PRECONDITION
;
130 if (all_or_none
&& *buffer_num_bytes
% element_num_bytes_
!= 0)
131 return MOJO_RESULT_INVALID_ARGUMENT
;
133 MojoResult rv
= ProducerBeginWriteDataImplNoLock(buffer
, buffer_num_bytes
,
135 if (rv
!= MOJO_RESULT_OK
)
137 // Note: No need to awake producer waiters, even though we're going from
138 // writable to non-writable (since you can't wait on non-writability).
139 // Similarly, though this may have discarded data (in "may discard" mode),
140 // making it non-readable, there's still no need to awake consumer waiters.
141 DCHECK(producer_in_two_phase_write_no_lock());
142 return MOJO_RESULT_OK
;
145 MojoResult
DataPipe::ProducerEndWriteData(uint32_t num_bytes_written
) {
146 base::AutoLock
locker(lock_
);
147 DCHECK(has_local_producer_no_lock());
149 if (!producer_in_two_phase_write_no_lock())
150 return MOJO_RESULT_FAILED_PRECONDITION
;
151 // Note: Allow successful completion of the two-phase write even if the
152 // consumer has been closed.
154 MojoWaitFlags old_consumer_satisfied_flags
= ConsumerSatisfiedFlagsNoLock();
156 if (num_bytes_written
> producer_two_phase_max_num_bytes_written_
||
157 num_bytes_written
% element_num_bytes_
!= 0) {
158 rv
= MOJO_RESULT_INVALID_ARGUMENT
;
159 producer_two_phase_max_num_bytes_written_
= 0;
161 rv
= ProducerEndWriteDataImplNoLock(num_bytes_written
);
163 // Two-phase write ended even on failure.
164 DCHECK(!producer_in_two_phase_write_no_lock());
165 // If we're now writable, we *became* writable (since we weren't writable
166 // during the two-phase write), so awake producer waiters.
167 if ((ProducerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_WRITABLE
))
168 AwakeProducerWaitersForStateChangeNoLock();
169 if (ConsumerSatisfiedFlagsNoLock() != old_consumer_satisfied_flags
)
170 AwakeConsumerWaitersForStateChangeNoLock();
174 MojoResult
DataPipe::ProducerAddWaiter(Waiter
* waiter
,
176 MojoResult wake_result
) {
177 base::AutoLock
locker(lock_
);
178 DCHECK(has_local_producer_no_lock());
180 if ((flags
& ProducerSatisfiedFlagsNoLock()))
181 return MOJO_RESULT_ALREADY_EXISTS
;
182 if (!(flags
& ProducerSatisfiableFlagsNoLock()))
183 return MOJO_RESULT_FAILED_PRECONDITION
;
185 producer_waiter_list_
->AddWaiter(waiter
, flags
, wake_result
);
186 return MOJO_RESULT_OK
;
189 void DataPipe::ProducerRemoveWaiter(Waiter
* waiter
) {
190 base::AutoLock
locker(lock_
);
191 DCHECK(has_local_producer_no_lock());
192 producer_waiter_list_
->RemoveWaiter(waiter
);
195 bool DataPipe::ProducerIsBusy() const {
196 base::AutoLock
locker(lock_
);
197 return producer_in_two_phase_write_no_lock();
200 void DataPipe::ConsumerCancelAllWaiters() {
201 base::AutoLock
locker(lock_
);
202 DCHECK(has_local_consumer_no_lock());
203 consumer_waiter_list_
->CancelAllWaiters();
206 void DataPipe::ConsumerClose() {
207 base::AutoLock
locker(lock_
);
208 DCHECK(consumer_open_
);
209 consumer_open_
= false;
210 DCHECK(has_local_consumer_no_lock());
211 consumer_waiter_list_
.reset();
212 // Not a bug, except possibly in "user" code.
213 DVLOG_IF(2, consumer_in_two_phase_read_no_lock())
214 << "Consumer closed with active two-phase read";
215 consumer_two_phase_max_num_bytes_read_
= 0;
216 ConsumerCloseImplNoLock();
217 AwakeProducerWaitersForStateChangeNoLock();
220 MojoResult
DataPipe::ConsumerReadData(void* elements
,
223 base::AutoLock
locker(lock_
);
224 DCHECK(has_local_consumer_no_lock());
226 if (consumer_in_two_phase_read_no_lock())
227 return MOJO_RESULT_BUSY
;
229 if (*num_bytes
% element_num_bytes_
!= 0)
230 return MOJO_RESULT_INVALID_ARGUMENT
;
233 return MOJO_RESULT_OK
; // Nothing to do.
235 MojoWaitFlags old_producer_satisfied_flags
= ProducerSatisfiedFlagsNoLock();
236 MojoResult rv
= ConsumerReadDataImplNoLock(elements
, num_bytes
, all_or_none
);
237 if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags
)
238 AwakeProducerWaitersForStateChangeNoLock();
242 MojoResult
DataPipe::ConsumerDiscardData(uint32_t* num_bytes
,
244 base::AutoLock
locker(lock_
);
245 DCHECK(has_local_consumer_no_lock());
247 if (consumer_in_two_phase_read_no_lock())
248 return MOJO_RESULT_BUSY
;
250 if (*num_bytes
% element_num_bytes_
!= 0)
251 return MOJO_RESULT_INVALID_ARGUMENT
;
254 return MOJO_RESULT_OK
; // Nothing to do.
256 MojoWaitFlags old_producer_satisfied_flags
= ProducerSatisfiedFlagsNoLock();
257 MojoResult rv
= ConsumerDiscardDataImplNoLock(num_bytes
, all_or_none
);
258 if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags
)
259 AwakeProducerWaitersForStateChangeNoLock();
263 MojoResult
DataPipe::ConsumerQueryData(uint32_t* num_bytes
) {
264 base::AutoLock
locker(lock_
);
265 DCHECK(has_local_consumer_no_lock());
267 if (consumer_in_two_phase_read_no_lock())
268 return MOJO_RESULT_BUSY
;
270 // Note: Don't need to validate |*num_bytes| for query.
271 return ConsumerQueryDataImplNoLock(num_bytes
);
274 MojoResult
DataPipe::ConsumerBeginReadData(const void** buffer
,
275 uint32_t* buffer_num_bytes
,
277 base::AutoLock
locker(lock_
);
278 DCHECK(has_local_consumer_no_lock());
280 if (consumer_in_two_phase_read_no_lock())
281 return MOJO_RESULT_BUSY
;
283 if (all_or_none
&& *buffer_num_bytes
% element_num_bytes_
!= 0)
284 return MOJO_RESULT_INVALID_ARGUMENT
;
286 MojoResult rv
= ConsumerBeginReadDataImplNoLock(buffer
, buffer_num_bytes
,
288 if (rv
!= MOJO_RESULT_OK
)
290 DCHECK(consumer_in_two_phase_read_no_lock());
291 return MOJO_RESULT_OK
;
294 MojoResult
DataPipe::ConsumerEndReadData(uint32_t num_bytes_read
) {
295 base::AutoLock
locker(lock_
);
296 DCHECK(has_local_consumer_no_lock());
298 if (!consumer_in_two_phase_read_no_lock())
299 return MOJO_RESULT_FAILED_PRECONDITION
;
301 MojoWaitFlags old_producer_satisfied_flags
= ProducerSatisfiedFlagsNoLock();
303 if (num_bytes_read
> consumer_two_phase_max_num_bytes_read_
||
304 num_bytes_read
% element_num_bytes_
!= 0) {
305 rv
= MOJO_RESULT_INVALID_ARGUMENT
;
306 consumer_two_phase_max_num_bytes_read_
= 0;
308 rv
= ConsumerEndReadDataImplNoLock(num_bytes_read
);
310 // Two-phase read ended even on failure.
311 DCHECK(!consumer_in_two_phase_read_no_lock());
312 // If we're now readable, we *became* readable (since we weren't readable
313 // during the two-phase read), so awake consumer waiters.
314 if ((ConsumerSatisfiedFlagsNoLock() & MOJO_WAIT_FLAG_READABLE
))
315 AwakeConsumerWaitersForStateChangeNoLock();
316 if (ProducerSatisfiedFlagsNoLock() != old_producer_satisfied_flags
)
317 AwakeProducerWaitersForStateChangeNoLock();
321 MojoResult
DataPipe::ConsumerAddWaiter(Waiter
* waiter
,
323 MojoResult wake_result
) {
324 base::AutoLock
locker(lock_
);
325 DCHECK(has_local_consumer_no_lock());
327 if ((flags
& ConsumerSatisfiedFlagsNoLock()))
328 return MOJO_RESULT_ALREADY_EXISTS
;
329 if (!(flags
& ConsumerSatisfiableFlagsNoLock()))
330 return MOJO_RESULT_FAILED_PRECONDITION
;
332 consumer_waiter_list_
->AddWaiter(waiter
, flags
, wake_result
);
333 return MOJO_RESULT_OK
;
336 void DataPipe::ConsumerRemoveWaiter(Waiter
* waiter
) {
337 base::AutoLock
locker(lock_
);
338 DCHECK(has_local_consumer_no_lock());
339 consumer_waiter_list_
->RemoveWaiter(waiter
);
342 bool DataPipe::ConsumerIsBusy() const {
343 base::AutoLock
locker(lock_
);
344 return consumer_in_two_phase_read_no_lock();
348 DataPipe::DataPipe(bool has_local_producer
,
349 bool has_local_consumer
,
350 const MojoCreateDataPipeOptions
& validated_options
)
351 : may_discard_((validated_options
.flags
&
352 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_MAY_DISCARD
)),
353 element_num_bytes_(validated_options
.element_num_bytes
),
354 capacity_num_bytes_(validated_options
.capacity_num_bytes
),
355 producer_open_(true),
356 consumer_open_(true),
357 producer_waiter_list_(has_local_producer
? new WaiterList() : NULL
),
358 consumer_waiter_list_(has_local_consumer
? new WaiterList() : NULL
),
359 producer_two_phase_max_num_bytes_written_(0),
360 consumer_two_phase_max_num_bytes_read_(0) {
361 // Check that the passed in options actually are validated.
362 MojoCreateDataPipeOptions unused ALLOW_UNUSED
= { 0 };
363 DCHECK_EQ(ValidateCreateOptions(&validated_options
, &unused
), MOJO_RESULT_OK
);
366 DataPipe::~DataPipe() {
367 DCHECK(!producer_open_
);
368 DCHECK(!consumer_open_
);
369 DCHECK(!producer_waiter_list_
);
370 DCHECK(!consumer_waiter_list_
);
373 void DataPipe::AwakeProducerWaitersForStateChangeNoLock() {
374 lock_
.AssertAcquired();
375 if (!has_local_producer_no_lock())
377 producer_waiter_list_
->AwakeWaitersForStateChange(
378 ProducerSatisfiedFlagsNoLock(), ProducerSatisfiableFlagsNoLock());
381 void DataPipe::AwakeConsumerWaitersForStateChangeNoLock() {
382 lock_
.AssertAcquired();
383 if (!has_local_consumer_no_lock())
385 consumer_waiter_list_
->AwakeWaitersForStateChange(
386 ConsumerSatisfiedFlagsNoLock(), ConsumerSatisfiableFlagsNoLock());
389 } // namespace system