Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / mojo / services / network / public / cpp / web_socket_read_queue.cc
blobc5923d3de1d12f9a5a24c6a9f1acaf22c4d933cf
// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "network/public/cpp/web_socket_read_queue.h"

#include <stdint.h>
#include <string.h>

#include <algorithm>

#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
11 namespace mojo {
13 struct WebSocketReadQueue::Operation {
14 Operation(uint32_t num_bytes,
15 const base::Callback<void(const char*)>& callback)
16 : num_bytes_(num_bytes), callback_(callback), current_num_bytes_(0) {}
18 uint32_t num_bytes_;
19 base::Callback<void(const char*)> callback_;
21 // If the initial read doesn't return enough data, this array is used to
22 // accumulate data from multiple reads.
23 scoped_ptr<char[]> data_buffer_;
24 // The number of bytes accumulated so far.
25 uint32_t current_num_bytes_;
28 WebSocketReadQueue::WebSocketReadQueue(DataPipeConsumerHandle handle)
29 : handle_(handle), is_busy_(false), weak_factory_(this) {
32 WebSocketReadQueue::~WebSocketReadQueue() {
35 void WebSocketReadQueue::Read(
36 uint32_t num_bytes,
37 const base::Callback<void(const char*)>& callback) {
38 Operation* op = new Operation(num_bytes, callback);
39 queue_.push_back(op);
41 if (is_busy_)
42 return;
44 is_busy_ = true;
45 TryToRead();
48 void WebSocketReadQueue::TryToRead() {
49 DCHECK(is_busy_);
50 DCHECK(!queue_.empty());
51 do {
52 Operation* op = queue_[0];
53 const void* buffer = nullptr;
54 uint32_t buffer_size = 0;
55 MojoResult result = BeginReadDataRaw(handle_, &buffer, &buffer_size,
56 MOJO_READ_DATA_FLAG_NONE);
57 if (result == MOJO_RESULT_SHOULD_WAIT) {
58 Wait();
59 return;
62 // http://crbug.com/490193 This should run callback as well. May need to
63 // change the callback signature.
64 if (result != MOJO_RESULT_OK)
65 return;
67 uint32_t bytes_read = buffer_size < op->num_bytes_ - op->current_num_bytes_
68 ? buffer_size
69 : op->num_bytes_ - op->current_num_bytes_;
71 // If this is not the initial read, or this is the initial read but doesn't
72 // return enough data, copy the data into |op->data_buffer_|.
73 if (op->data_buffer_ ||
74 bytes_read < op->num_bytes_ - op->current_num_bytes_) {
75 if (!op->data_buffer_) {
76 DCHECK_EQ(0u, op->current_num_bytes_);
77 op->data_buffer_.reset(new char[op->num_bytes_]);
80 memcpy(op->data_buffer_.get() + op->current_num_bytes_, buffer,
81 bytes_read);
83 op->current_num_bytes_ += bytes_read;
84 DataPipeConsumerHandle handle = handle_;
85 base::WeakPtr<WebSocketReadQueue> self(weak_factory_.GetWeakPtr());
87 if (op->current_num_bytes_ >= op->num_bytes_) {
88 DCHECK_EQ(op->current_num_bytes_, op->num_bytes_);
89 const char* returned_buffer = op->data_buffer_
90 ? op->data_buffer_.get()
91 : static_cast<const char*>(buffer);
93 // Ensure |op| is deleted, whether or not |this| goes away.
94 scoped_ptr<Operation> op_deleter(op);
95 queue_.weak_erase(queue_.begin());
97 // This call may delete |this|. In that case, |self| will be invalidated.
98 // It may re-enter Read() too. Because |is_busy_| is true during the whole
99 // process, TryToRead() won't be re-entered.
100 op->callback_.Run(returned_buffer);
103 EndReadDataRaw(handle, bytes_read);
105 if (!self)
106 return;
107 } while (!queue_.empty());
108 is_busy_ = false;
111 void WebSocketReadQueue::Wait() {
112 DCHECK(is_busy_);
113 handle_watcher_.Start(
114 handle_,
115 MOJO_HANDLE_SIGNAL_READABLE,
116 MOJO_DEADLINE_INDEFINITE,
117 base::Bind(&WebSocketReadQueue::OnHandleReady, base::Unretained(this)));
120 void WebSocketReadQueue::OnHandleReady(MojoResult result) {
121 DCHECK(is_busy_);
122 TryToRead();
125 } // namespace mojo