By moving the call to Load() up in SearchProvider::Start(), we are giving a chance...
[chromium-blink-merge.git] / net / tools / flip_server / sm_connection.cc
blob f47b502d0d7e2bfc0bec889b5737f59dd5962416
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/tools/flip_server/sm_connection.h"
7 #include <errno.h>
8 #include <netinet/tcp.h>
9 #include <sys/socket.h>
11 #include <list>
12 #include <string>
14 #include "net/tools/flip_server/constants.h"
15 #include "net/tools/flip_server/flip_config.h"
16 #include "net/tools/flip_server/http_interface.h"
17 #include "net/tools/flip_server/spdy_interface.h"
18 #include "net/tools/flip_server/spdy_ssl.h"
19 #include "net/tools/flip_server/streamer_interface.h"
21 namespace net {
23 // static
// Defaults to off. NOTE(review): presumably the value returned by
// force_spdy(), which makes WasSpdyNegotiated() report SPDY unconditionally
// -- confirm against the header.
24 bool SMConnection::force_spdy_ = false;
26 DataFrame::~DataFrame() {
27 if (delete_when_done)
28 delete[] data;
31 SMConnection::SMConnection(EpollServer* epoll_server,
32 SSLState* ssl_state,
33 MemoryCache* memory_cache,
34 FlipAcceptor* acceptor,
35 std::string log_prefix)
36 : last_read_time_(0),
37 fd_(-1),
38 events_(0),
39 registered_in_epoll_server_(false),
40 initialized_(false),
41 protocol_detected_(false),
42 connection_complete_(false),
43 connection_pool_(NULL),
44 epoll_server_(epoll_server),
45 ssl_state_(ssl_state),
46 memory_cache_(memory_cache),
47 acceptor_(acceptor),
48 read_buffer_(kSpdySegmentSize * 40),
49 sm_spdy_interface_(NULL),
50 sm_http_interface_(NULL),
51 sm_streamer_interface_(NULL),
52 sm_interface_(NULL),
53 log_prefix_(log_prefix),
54 max_bytes_sent_per_dowrite_(4096),
55 ssl_(NULL) {
58 SMConnection::~SMConnection() {
59 if (initialized())
60 Reset();
63 EpollServer* SMConnection::epoll_server() {
64 return epoll_server_;
67 void SMConnection::ReadyToSend() {
68 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
69 << "Setting ready to send: EPOLLIN | EPOLLOUT";
70 epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT);
73 void SMConnection::EnqueueDataFrame(DataFrame* df) {
74 output_list_.push_back(df);
75 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: "
76 << "size = " << df->size << ": Setting FD ready.";
77 ReadyToSend();
80 void SMConnection::InitSMConnection(SMConnectionPoolInterface* connection_pool,
81 SMInterface* sm_interface,
82 EpollServer* epoll_server,
83 int fd,
84 std::string server_ip,
85 std::string server_port,
86 std::string remote_ip,
87 bool use_ssl) {
88 if (initialized_) {
89 LOG(FATAL) << "Attempted to initialize already initialized server";
90 return;
93 client_ip_ = remote_ip;
95 if (fd == -1) {
96 // If fd == -1, then we are initializing a new connection that will
97 // connect to the backend.
99 // ret: -1 == error
100 // 0 == connection in progress
101 // 1 == connection complete
102 // TODO(kelindsay): is_numeric_host_address value needs to be detected
103 server_ip_ = server_ip;
104 server_port_ = server_port;
105 int ret = CreateConnectedSocket(&fd_,
106 server_ip,
107 server_port,
108 true,
109 acceptor_->disable_nagle_);
111 if (ret < 0) {
112 LOG(ERROR) << "-1 Could not create connected socket";
113 return;
114 } else if (ret == 1) {
115 DCHECK_NE(-1, fd_);
116 connection_complete_ = true;
117 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
118 << "Connection complete to: " << server_ip_ << ":"
119 << server_port_ << " ";
121 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
122 << "Connecting to server: " << server_ip_ << ":"
123 << server_port_ << " ";
124 } else {
125 // If fd != -1 then we are initializing a connection that has just been
126 // accepted from the listen socket.
127 connection_complete_ = true;
128 if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) {
129 epoll_server_->UnregisterFD(fd_);
131 if (fd_ != -1) {
132 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
133 << "Closing pre-existing fd";
134 close(fd_);
135 fd_ = -1;
138 fd_ = fd;
141 registered_in_epoll_server_ = false;
142 // Set the last read time here as the idle checker will start from
143 // now.
144 last_read_time_ = time(NULL);
145 initialized_ = true;
147 connection_pool_ = connection_pool;
148 epoll_server_ = epoll_server;
150 if (sm_interface) {
151 sm_interface_ = sm_interface;
152 protocol_detected_ = true;
155 read_buffer_.Clear();
157 epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET);
159 if (use_ssl) {
160 ssl_ = CreateSSLContext(ssl_state_->ssl_ctx);
161 SSL_set_fd(ssl_, fd_);
162 PrintSslError();
166 void SMConnection::CorkSocket() {
167 int state = 1;
168 int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state));
169 if (rv < 0)
170 VLOG(1) << "setsockopt(CORK): " << errno;
173 void SMConnection::UncorkSocket() {
174 int state = 0;
175 int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state));
176 if (rv < 0)
177 VLOG(1) << "setsockopt(CORK): " << errno;
180 int SMConnection::Send(const char* data, int len, int flags) {
181 int rv = 0;
182 CorkSocket();
183 if (ssl_) {
184 ssize_t bytes_written = 0;
185 // Write smallish chunks to SSL so that we don't have large
186 // multi-packet TLS records to receive before being able to handle
187 // the data. We don't have to be too careful here, because our data
188 // frames are already getting chunked appropriately, and those are
189 // the most likely "big" frames.
190 while (len > 0) {
191 const int kMaxTLSRecordSize = 1500;
192 const char* ptr = &(data[bytes_written]);
193 int chunksize = std::min(len, kMaxTLSRecordSize);
194 rv = SSL_write(ssl_, ptr, chunksize);
195 VLOG(2) << "SSLWrite(" << chunksize << " bytes): " << rv;
196 if (rv <= 0) {
197 switch (SSL_get_error(ssl_, rv)) {
198 case SSL_ERROR_WANT_READ:
199 case SSL_ERROR_WANT_WRITE:
200 case SSL_ERROR_WANT_ACCEPT:
201 case SSL_ERROR_WANT_CONNECT:
202 rv = -2;
203 break;
204 default:
205 PrintSslError();
206 break;
208 break;
210 bytes_written += rv;
211 len -= rv;
212 if (rv != chunksize)
213 break; // If we couldn't write everything, we're implicitly stalled
215 // If we wrote some data, return that count. Otherwise
216 // return the stall error.
217 if (bytes_written > 0)
218 rv = bytes_written;
219 } else {
220 rv = send(fd_, data, len, flags);
222 if (!(flags & MSG_MORE))
223 UncorkSocket();
224 return rv;
227 void SMConnection::OnRegistration(EpollServer* eps, int fd, int event_mask) {
228 registered_in_epoll_server_ = true;
231 void SMConnection::OnEvent(int fd, EpollEvent* event) {
232 events_ |= event->in_events;
233 HandleEvents();
234 if (events_) {
235 event->out_ready_mask = events_;
236 events_ = 0;
240 void SMConnection::OnUnregistration(int fd, bool replaced) {
241 registered_in_epoll_server_ = false;
244 void SMConnection::OnShutdown(EpollServer* eps, int fd) {
245 Cleanup("OnShutdown");
246 return;
249 void SMConnection::Cleanup(const char* cleanup) {
250 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup: " << cleanup;
251 if (!initialized_)
252 return;
253 Reset();
254 if (connection_pool_)
255 connection_pool_->SMConnectionDone(this);
256 if (sm_interface_)
257 sm_interface_->ResetForNewConnection();
258 last_read_time_ = 0;
261 void SMConnection::HandleEvents() {
262 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Received: "
263 << EpollServer::EventMaskToString(events_).c_str();
265 if (events_ & EPOLLIN) {
266 if (!DoRead())
267 goto handle_close_or_error;
270 if (events_ & EPOLLOUT) {
271 // Check if we have connected or not
272 if (connection_complete_ == false) {
273 int sock_error;
274 socklen_t sock_error_len = sizeof(sock_error);
275 int ret = getsockopt(fd_, SOL_SOCKET, SO_ERROR, &sock_error,
276 &sock_error_len);
277 if (ret != 0) {
278 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
279 << "getsockopt error: " << errno << ": " << strerror(errno);
280 goto handle_close_or_error;
282 if (sock_error == 0) {
283 connection_complete_ = true;
284 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
285 << "Connection complete to " << server_ip_ << ":"
286 << server_port_ << " ";
287 } else if (sock_error == EINPROGRESS) {
288 return;
289 } else {
290 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
291 << "error connecting to server";
292 goto handle_close_or_error;
295 if (!DoWrite())
296 goto handle_close_or_error;
299 if (events_ & (EPOLLHUP | EPOLLERR)) {
300 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "!!! Got HUP or ERR";
301 goto handle_close_or_error;
303 return;
305 handle_close_or_error:
306 Cleanup("HandleEvents");
309 // Decide if SPDY was negotiated.
310 bool SMConnection::WasSpdyNegotiated() {
311 if (force_spdy())
312 return true;
314 // If this is an SSL connection, check if NPN specifies SPDY.
315 if (ssl_) {
316 const unsigned char *npn_proto;
317 unsigned int npn_proto_len;
318 SSL_get0_next_proto_negotiated(ssl_, &npn_proto, &npn_proto_len);
319 if (npn_proto_len > 0) {
320 std::string npn_proto_str((const char *)npn_proto, npn_proto_len);
321 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
322 << "NPN protocol detected: " << npn_proto_str;
323 if (!strncmp(reinterpret_cast<const char*>(npn_proto),
324 "spdy/2", npn_proto_len))
325 return true;
329 return false;
332 bool SMConnection::SetupProtocolInterfaces() {
333 DCHECK(!protocol_detected_);
334 protocol_detected_ = true;
336 bool spdy_negotiated = WasSpdyNegotiated();
337 bool using_ssl = ssl_ != NULL;
339 if (using_ssl)
340 VLOG(1) << (SSL_session_reused(ssl_) ? "Resumed" : "Renegotiated")
341 << " SSL Session.";
343 if (acceptor_->spdy_only_ && !spdy_negotiated) {
344 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
345 << "SPDY proxy only, closing HTTPS connection.";
346 return false;
349 switch (acceptor_->flip_handler_type_) {
350 case FLIP_HANDLER_HTTP_SERVER:
352 DCHECK(!spdy_negotiated);
353 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
354 << (sm_http_interface_ ? "Creating" : "Reusing")
355 << " HTTP interface.";
356 if (!sm_http_interface_)
357 sm_http_interface_ = new HttpSM(this,
358 NULL,
359 epoll_server_,
360 memory_cache_,
361 acceptor_);
362 sm_interface_ = sm_http_interface_;
364 break;
365 case FLIP_HANDLER_PROXY:
367 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
368 << (sm_streamer_interface_ ? "Creating" : "Reusing")
369 << " PROXY Streamer interface.";
370 if (!sm_streamer_interface_) {
371 sm_streamer_interface_ = new StreamerSM(this,
372 NULL,
373 epoll_server_,
374 acceptor_);
375 sm_streamer_interface_->set_is_request();
377 sm_interface_ = sm_streamer_interface_;
378 // If spdy is not negotiated, the streamer interface will proxy all
379 // data to the origin server.
380 if (!spdy_negotiated)
381 break;
383 // Otherwise fall through into the case below.
384 case FLIP_HANDLER_SPDY_SERVER:
386 DCHECK(spdy_negotiated);
387 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
388 << (sm_spdy_interface_ ? "Creating" : "Reusing")
389 << " SPDY interface.";
390 if (!sm_spdy_interface_)
391 sm_spdy_interface_ = new SpdySM(this,
392 NULL,
393 epoll_server_,
394 memory_cache_,
395 acceptor_);
396 sm_interface_ = sm_spdy_interface_;
398 break;
401 CorkSocket();
402 if (!sm_interface_->PostAcceptHook())
403 return false;
405 return true;
408 bool SMConnection::DoRead() {
409 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead()";
410 while (!read_buffer_.Full()) {
411 char* bytes;
412 int size;
413 if (fd_ == -1) {
414 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
415 << "DoRead(): fd_ == -1. Invalid FD. Returning false";
416 return false;
418 read_buffer_.GetWritablePtr(&bytes, &size);
419 ssize_t bytes_read = 0;
420 if (ssl_) {
421 bytes_read = SSL_read(ssl_, bytes, size);
422 if (bytes_read < 0) {
423 int err = SSL_get_error(ssl_, bytes_read);
424 switch (err) {
425 case SSL_ERROR_WANT_READ:
426 case SSL_ERROR_WANT_WRITE:
427 case SSL_ERROR_WANT_ACCEPT:
428 case SSL_ERROR_WANT_CONNECT:
429 events_ &= ~EPOLLIN;
430 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
431 << "DoRead: SSL WANT_XXX: " << err;
432 goto done;
433 default:
434 PrintSslError();
435 goto error_or_close;
438 } else {
439 bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT);
441 int stored_errno = errno;
442 if (bytes_read == -1) {
443 switch (stored_errno) {
444 case EAGAIN:
445 events_ &= ~EPOLLIN;
446 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
447 << "Got EAGAIN while reading";
448 goto done;
449 case EINTR:
450 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
451 << "Got EINTR while reading";
452 continue;
453 default:
454 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
455 << "While calling recv, got error: "
456 << (ssl_?"(ssl error)":strerror(stored_errno));
457 goto error_or_close;
459 } else if (bytes_read > 0) {
460 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read
461 << " bytes";
462 last_read_time_ = time(NULL);
463 // If the protocol hasn't been detected yet, set up the handlers
464 // we'll need.
465 if (!protocol_detected_) {
466 if (!SetupProtocolInterfaces()) {
467 LOG(ERROR) << "Error setting up protocol interfaces.";
468 goto error_or_close;
471 read_buffer_.AdvanceWritablePtr(bytes_read);
472 if (!DoConsumeReadData())
473 goto error_or_close;
474 continue;
475 } else { // bytes_read == 0
476 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
477 << "0 bytes read with recv call.";
479 goto error_or_close;
481 done:
482 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead done!";
483 return true;
485 error_or_close:
486 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
487 << "DoRead(): error_or_close. "
488 << "Cleaning up, then returning false";
489 Cleanup("DoRead");
490 return false;
493 bool SMConnection::DoConsumeReadData() {
494 char* bytes;
495 int size;
496 read_buffer_.GetReadablePtr(&bytes, &size);
497 while (size != 0) {
498 size_t bytes_consumed = sm_interface_->ProcessReadInput(bytes, size);
499 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "consumed "
500 << bytes_consumed << " bytes";
501 if (bytes_consumed == 0) {
502 break;
504 read_buffer_.AdvanceReadablePtr(bytes_consumed);
505 if (sm_interface_->MessageFullyRead()) {
506 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
507 << "HandleRequestFullyRead: Setting EPOLLOUT";
508 HandleResponseFullyRead();
509 events_ |= EPOLLOUT;
510 } else if (sm_interface_->Error()) {
511 LOG(ERROR) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
512 << "Framer error detected: Setting EPOLLOUT: "
513 << sm_interface_->ErrorAsString();
514 // this causes everything to be closed/cleaned up.
515 events_ |= EPOLLOUT;
516 return false;
518 read_buffer_.GetReadablePtr(&bytes, &size);
520 return true;
523 void SMConnection::HandleResponseFullyRead() {
524 sm_interface_->Cleanup();
527 bool SMConnection::DoWrite() {
528 size_t bytes_sent = 0;
529 int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
530 if (fd_ == -1) {
531 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
532 << "DoWrite: fd == -1. Returning false.";
533 return false;
535 if (output_list_.empty()) {
536 VLOG(2) << log_prefix_ << "DoWrite: Output list empty.";
537 if (sm_interface_) {
538 sm_interface_->GetOutput();
540 if (output_list_.empty()) {
541 events_ &= ~EPOLLOUT;
544 while (!output_list_.empty()) {
545 VLOG(2) << log_prefix_ << "DoWrite: Items in output list: "
546 << output_list_.size();
547 if (bytes_sent >= max_bytes_sent_per_dowrite_) {
548 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
549 << " byte sent >= max bytes sent per write: Setting EPOLLOUT: "
550 << bytes_sent;
551 events_ |= EPOLLOUT;
552 break;
554 if (sm_interface_ && output_list_.size() < 2) {
555 sm_interface_->GetOutput();
557 DataFrame* data_frame = output_list_.front();
558 const char* bytes = data_frame->data;
559 int size = data_frame->size;
560 bytes += data_frame->index;
561 size -= data_frame->index;
562 DCHECK_GE(size, 0);
563 if (size <= 0) {
564 output_list_.pop_front();
565 delete data_frame;
566 continue;
569 flags = MSG_NOSIGNAL | MSG_DONTWAIT;
570 // Look for a queue size > 1 because |this| frame is remains on the list
571 // until it has finished sending.
572 if (output_list_.size() > 1) {
573 VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size()
574 << ": Adding MSG_MORE flag";
575 flags |= MSG_MORE;
577 VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes.";
578 ssize_t bytes_written = Send(bytes, size, flags);
579 int stored_errno = errno;
580 if (bytes_written == -1) {
581 switch (stored_errno) {
582 case EAGAIN:
583 events_ &= ~EPOLLOUT;
584 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
585 << "Got EAGAIN while writing";
586 goto done;
587 case EINTR:
588 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
589 << "Got EINTR while writing";
590 continue;
591 default:
592 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
593 << "While calling send, got error: " << stored_errno
594 << ": " << (ssl_?"":strerror(stored_errno));
595 goto error_or_close;
597 } else if (bytes_written > 0) {
598 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Wrote: "
599 << bytes_written << " bytes";
600 data_frame->index += bytes_written;
601 bytes_sent += bytes_written;
602 continue;
603 } else if (bytes_written == -2) {
604 // -2 handles SSL_ERROR_WANT_* errors
605 events_ &= ~EPOLLOUT;
606 goto done;
608 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
609 << "0 bytes written with send call.";
610 goto error_or_close;
612 done:
613 UncorkSocket();
614 return true;
616 error_or_close:
617 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
618 << "DoWrite: error_or_close. Returning false "
619 << "after cleaning up";
620 Cleanup("DoWrite");
621 UncorkSocket();
622 return false;
625 void SMConnection::Reset() {
626 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting";
627 if (ssl_) {
628 SSL_shutdown(ssl_);
629 PrintSslError();
630 SSL_free(ssl_);
631 PrintSslError();
632 ssl_ = NULL;
634 if (registered_in_epoll_server_) {
635 epoll_server_->UnregisterFD(fd_);
636 registered_in_epoll_server_ = false;
638 if (fd_ >= 0) {
639 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection";
640 close(fd_);
641 fd_ = -1;
643 read_buffer_.Clear();
644 initialized_ = false;
645 protocol_detected_ = false;
646 events_ = 0;
647 for (std::list<DataFrame*>::iterator i =
648 output_list_.begin();
649 i != output_list_.end();
650 ++i) {
651 delete *i;
653 output_list_.clear();
656 // static
657 SMConnection* SMConnection::NewSMConnection(EpollServer* epoll_server,
658 SSLState *ssl_state,
659 MemoryCache* memory_cache,
660 FlipAcceptor *acceptor,
661 std::string log_prefix) {
662 return new SMConnection(epoll_server, ssl_state, memory_cache,
663 acceptor, log_prefix);
666 } // namespace net