1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "network/public/cpp/web_socket_read_queue.h"
8 #include "base/logging.h"
12 struct WebSocketReadQueue::Operation
{
14 base::Callback
<void(const char*)> callback_
;
17 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle
)
18 : handle_(handle
), is_busy_(false), weak_factory_(this) {
21 WebSocketReadQueue::~WebSocketReadQueue() {
24 void WebSocketReadQueue::Read(uint32_t num_bytes
,
25 base::Callback
<void(const char*)> callback
) {
26 Operation
* op
= new Operation
;
27 op
->num_bytes_
= num_bytes
;
28 op
->callback_
= callback
;
38 void WebSocketReadQueue::TryToRead() {
40 DCHECK(!queue_
.empty());
42 Operation
* op
= queue_
[0];
43 const void* buffer
= NULL
;
44 uint32_t bytes_read
= op
->num_bytes_
;
45 MojoResult result
= BeginReadDataRaw(
46 handle_
, &buffer
, &bytes_read
, MOJO_READ_DATA_FLAG_ALL_OR_NONE
);
47 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
52 // Ensure |op| is deleted, whether or not |this| goes away.
53 scoped_ptr
<Operation
> op_deleter(op
);
54 queue_
.weak_erase(queue_
.begin());
56 // http://crbug.com/490193 This should run callback as well. May need to
57 // change the callback signature.
58 if (result
!= MOJO_RESULT_OK
)
61 uint32_t num_bytes
= op_deleter
->num_bytes_
;
62 DCHECK_LE(num_bytes
, bytes_read
);
63 DataPipeConsumerHandle handle
= handle_
;
65 base::WeakPtr
<WebSocketReadQueue
> self(weak_factory_
.GetWeakPtr());
67 // This call may delete |this|. In that case, |self| will be invalidated.
68 // It may re-enter Read() too. Because |is_busy_| is true during the whole
69 // process, TryToRead() won't be re-entered.
70 op
->callback_
.Run(static_cast<const char*>(buffer
));
72 EndReadDataRaw(handle
, num_bytes
);
76 } while (!queue_
.empty());
80 void WebSocketReadQueue::Wait() {
82 handle_watcher_
.Start(
84 MOJO_HANDLE_SIGNAL_READABLE
,
85 MOJO_DEADLINE_INDEFINITE
,
86 base::Bind(&WebSocketReadQueue::OnHandleReady
, base::Unretained(this)));
89 void WebSocketReadQueue::OnHandleReady(MojoResult result
) {