1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "network/public/cpp/web_socket_read_queue.h"
8 #include "base/logging.h"
9 #include "base/memory/scoped_ptr.h"
// State for one read request created by Read(): the requested size, the
// completion callback, and the progress accumulated so far by TryToRead().
13 struct WebSocketReadQueue::Operation
{
// Records the requested byte count and the callback. The accumulated byte
// count starts at zero; |data_buffer_| is not set here.
14 Operation(uint32_t num_bytes
,
15 const base::Callback
<void(const char*)>& callback
)
16 : num_bytes_(num_bytes
), callback_(callback
), current_num_bytes_(0) {}
// Run by TryToRead() with a pointer to the completed data.
19 base::Callback
<void(const char*)> callback_
;
21 // If the initial read doesn't return enough data, this array is used to
22 // accumulate data from multiple reads.
// TryToRead() allocates it with room for |num_bytes_| bytes on first use.
23 scoped_ptr
<char[]> data_buffer_
;
24 // The number of bytes accumulated so far.
// The operation is treated as complete once this reaches |num_bytes_|.
25 uint32_t current_num_bytes_
;
// Stores |handle| as the data pipe consumer handle to read from. The queue
// starts out not busy, and |weak_factory_| is bound to this object so that
// TryToRead() can obtain weak pointers to it.
28 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle
)
29 : handle_(handle
), is_busy_(false), weak_factory_(this) {
// Destructor.
// NOTE(review): Read() allocates each Operation with |new| and TryToRead()
// only deletes the ones that complete — confirm that operations still queued
// at destruction are released (e.g. by |queue_| owning its elements).
32 WebSocketReadQueue::~WebSocketReadQueue() {
// Creates a heap-allocated Operation for a request of |num_bytes| bytes whose
// completion is reported through |callback| (called with a pointer to the
// data).
// NOTE(review): presumably the operation is then appended to |queue_| and
// TryToRead() is started unless |is_busy_| is set — confirm against the full
// definition.
35 void WebSocketReadQueue::Read(
37 const base::Callback
<void(const char*)>& callback
) {
38 Operation
* op
= new Operation(num_bytes
, callback
);
// Services queued operations front-to-back for as long as |queue_| is
// non-empty. Each pass begins a two-phase read on |handle_|, consumes at most
// the bytes the front operation still needs, and, once that operation has all
// of its bytes, removes it from the queue and runs its callback.
// Precondition (DCHECKed): |queue_| is not empty.
48 void WebSocketReadQueue::TryToRead() {
50 DCHECK(!queue_
.empty());
// Always work on the operation at the front of the queue.
52 Operation
* op
= queue_
[0];
// Filled in by BeginReadDataRaw() with the readable region and its size.
53 const void* buffer
= nullptr;
54 uint32_t buffer_size
= 0;
55 MojoResult result
= BeginReadDataRaw(handle_
, &buffer
, &buffer_size
,
56 MOJO_READ_DATA_FLAG_NONE
);
// MOJO_RESULT_SHOULD_WAIT is handled separately from other failures.
// NOTE(review): presumably this path defers to Wait() until the handle is
// readable — confirm.
57 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
62 // http://crbug.com/490193 This should run callback as well. May need to
63 // change the callback signature.
64 if (result
!= MOJO_RESULT_OK
)
// Number of bytes to consume from this read. When the pipe offers at least as
// much as |op| still needs, this is the outstanding amount
// (num_bytes_ - current_num_bytes_).
67 uint32_t bytes_read
= buffer_size
< op
->num_bytes_
- op
->current_num_bytes_
69 : op
->num_bytes_
- op
->current_num_bytes_
;
71 // If this is not the initial read, or this is the initial read but doesn't
72 // return enough data, copy the data into |op->data_buffer_|.
73 if (op
->data_buffer_
||
74 bytes_read
< op
->num_bytes_
- op
->current_num_bytes_
) {
// First partial read: allocate a buffer large enough for the whole request.
75 if (!op
->data_buffer_
) {
76 DCHECK_EQ(0u, op
->current_num_bytes_
);
77 op
->data_buffer_
.reset(new char[op
->num_bytes_
]);
// Append the new data after the bytes accumulated so far.
80 memcpy(op
->data_buffer_
.get() + op
->current_num_bytes_
, buffer
,
83 op
->current_num_bytes_
+= bytes_read
;
// Take a local copy of the handle and a weak pointer to this object before
// the callback below gets a chance to run; EndReadDataRaw() further down uses
// the local copy rather than the member.
84 DataPipeConsumerHandle handle
= handle_
;
85 base::WeakPtr
<WebSocketReadQueue
> self(weak_factory_
.GetWeakPtr());
// The operation is complete once every requested byte has been accumulated.
87 if (op
->current_num_bytes_
>= op
->num_bytes_
) {
88 DCHECK_EQ(op
->current_num_bytes_
, op
->num_bytes_
);
// Hand out the accumulation buffer if one was needed; otherwise point
// directly at the region returned by BeginReadDataRaw().
89 const char* returned_buffer
= op
->data_buffer_
90 ? op
->data_buffer_
.get()
91 : static_cast<const char*>(buffer
);
93 // Ensure |op| is deleted, whether or not |this| goes away.
94 scoped_ptr
<Operation
> op_deleter(op
);
// Remove the front entry from |queue_|; |op_deleter| above is what deletes
// |op|.
95 queue_
.weak_erase(queue_
.begin());
97 // This call may delete |this|. In that case, |self| will be invalidated.
98 // It may re-enter Read() too. Because |is_busy_| is true during the whole
99 // process, TryToRead() won't be re-entered.
100 op
->callback_
.Run(returned_buffer
);
// Finish the two-phase read, reporting how many bytes were consumed.
103 EndReadDataRaw(handle
, bytes_read
);
// Keep going while operations remain queued.
107 } while (!queue_
.empty());
// Starts |handle_watcher_| waiting for the readable signal with no deadline;
// OnHandleReady() is bound as the callback to run when the wait finishes.
111 void WebSocketReadQueue::Wait() {
113 handle_watcher_
.Start(
115 MOJO_HANDLE_SIGNAL_READABLE
,
116 MOJO_DEADLINE_INDEFINITE
,
// base::Unretained(this) binds a raw pointer with no lifetime tracking.
// NOTE(review): confirm the watcher cannot invoke the callback after this
// object has been destroyed.
117 base::Bind(&WebSocketReadQueue::OnHandleReady
, base::Unretained(this)));
120 void WebSocketReadQueue::OnHandleReady(MojoResult result
) {