Fix "#if defined(DEBUG)" statements
[chromium-blink-merge.git] / net / tools / flip_server / sm_connection.cc
blob4acdea44adf10c2365b287125e94bb77f8ce6ed2
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/tools/flip_server/sm_connection.h"
7 #include <errno.h>
8 #include <netinet/tcp.h>
9 #include <sys/socket.h>
10 #include <unistd.h>
12 #include <algorithm>
13 #include <list>
14 #include <string>
16 #include "net/tools/flip_server/constants.h"
17 #include "net/tools/flip_server/flip_config.h"
18 #include "net/tools/flip_server/http_interface.h"
19 #include "net/tools/flip_server/spdy_interface.h"
20 #include "net/tools/flip_server/spdy_ssl.h"
21 #include "net/tools/flip_server/streamer_interface.h"
23 namespace net {
25 // static
26 bool SMConnection::force_spdy_ = false;
28 DataFrame::~DataFrame() {
29 if (delete_when_done)
30 delete[] data;
33 SMConnection::SMConnection(EpollServer* epoll_server,
34 SSLState* ssl_state,
35 MemoryCache* memory_cache,
36 FlipAcceptor* acceptor,
37 std::string log_prefix)
38 : last_read_time_(0),
39 fd_(-1),
40 events_(0),
41 registered_in_epoll_server_(false),
42 initialized_(false),
43 protocol_detected_(false),
44 connection_complete_(false),
45 connection_pool_(NULL),
46 epoll_server_(epoll_server),
47 ssl_state_(ssl_state),
48 memory_cache_(memory_cache),
49 acceptor_(acceptor),
50 read_buffer_(kSpdySegmentSize * 40),
51 sm_spdy_interface_(NULL),
52 sm_http_interface_(NULL),
53 sm_streamer_interface_(NULL),
54 sm_interface_(NULL),
55 log_prefix_(log_prefix),
56 max_bytes_sent_per_dowrite_(4096),
57 ssl_(NULL) {}
59 SMConnection::~SMConnection() {
60 if (initialized())
61 Reset();
64 EpollServer* SMConnection::epoll_server() { return epoll_server_; }
66 void SMConnection::ReadyToSend() {
67 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
68 << "Setting ready to send: EPOLLIN | EPOLLOUT";
69 epoll_server_->SetFDReady(fd_, EPOLLIN | EPOLLOUT);
72 void SMConnection::EnqueueDataFrame(DataFrame* df) {
73 output_list_.push_back(df);
74 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "EnqueueDataFrame: "
75 << "size = " << df->size << ": Setting FD ready.";
76 ReadyToSend();
79 void SMConnection::InitSMConnection(SMConnectionPoolInterface* connection_pool,
80 SMInterface* sm_interface,
81 EpollServer* epoll_server,
82 int fd,
83 std::string server_ip,
84 std::string server_port,
85 std::string remote_ip,
86 bool use_ssl) {
87 if (initialized_) {
88 LOG(FATAL) << "Attempted to initialize already initialized server";
89 return;
92 client_ip_ = remote_ip;
94 if (fd == -1) {
95 // If fd == -1, then we are initializing a new connection that will
96 // connect to the backend.
98 // ret: -1 == error
99 // 0 == connection in progress
100 // 1 == connection complete
101 // TODO(kelindsay): is_numeric_host_address value needs to be detected
102 server_ip_ = server_ip;
103 server_port_ = server_port;
104 int ret = CreateConnectedSocket(
105 &fd_, server_ip, server_port, true, acceptor_->disable_nagle_);
107 if (ret < 0) {
108 LOG(ERROR) << "-1 Could not create connected socket";
109 return;
110 } else if (ret == 1) {
111 DCHECK_NE(-1, fd_);
112 connection_complete_ = true;
113 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
114 << "Connection complete to: " << server_ip_ << ":" << server_port_
115 << " ";
117 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
118 << "Connecting to server: " << server_ip_ << ":" << server_port_
119 << " ";
120 } else {
121 // If fd != -1 then we are initializing a connection that has just been
122 // accepted from the listen socket.
123 connection_complete_ = true;
124 if (epoll_server_ && registered_in_epoll_server_ && fd_ != -1) {
125 epoll_server_->UnregisterFD(fd_);
127 if (fd_ != -1) {
128 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
129 << "Closing pre-existing fd";
130 close(fd_);
131 fd_ = -1;
134 fd_ = fd;
137 registered_in_epoll_server_ = false;
138 // Set the last read time here as the idle checker will start from
139 // now.
140 last_read_time_ = time(NULL);
141 initialized_ = true;
143 connection_pool_ = connection_pool;
144 epoll_server_ = epoll_server;
146 if (sm_interface) {
147 sm_interface_ = sm_interface;
148 protocol_detected_ = true;
151 read_buffer_.Clear();
153 epoll_server_->RegisterFD(fd_, this, EPOLLIN | EPOLLOUT | EPOLLET);
155 if (use_ssl) {
156 ssl_ = CreateSSLContext(ssl_state_->ssl_ctx);
157 SSL_set_fd(ssl_, fd_);
158 PrintSslError();
162 void SMConnection::CorkSocket() {
163 int state = 1;
164 int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state));
165 if (rv < 0)
166 VLOG(1) << "setsockopt(CORK): " << errno;
169 void SMConnection::UncorkSocket() {
170 int state = 0;
171 int rv = setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &state, sizeof(state));
172 if (rv < 0)
173 VLOG(1) << "setsockopt(CORK): " << errno;
176 int SMConnection::Send(const char* data, int len, int flags) {
177 int rv = 0;
178 CorkSocket();
179 if (ssl_) {
180 ssize_t bytes_written = 0;
181 // Write smallish chunks to SSL so that we don't have large
182 // multi-packet TLS records to receive before being able to handle
183 // the data. We don't have to be too careful here, because our data
184 // frames are already getting chunked appropriately, and those are
185 // the most likely "big" frames.
186 while (len > 0) {
187 const int kMaxTLSRecordSize = 1500;
188 const char* ptr = &(data[bytes_written]);
189 int chunksize = std::min(len, kMaxTLSRecordSize);
190 rv = SSL_write(ssl_, ptr, chunksize);
191 VLOG(2) << "SSLWrite(" << chunksize << " bytes): " << rv;
192 if (rv <= 0) {
193 switch (SSL_get_error(ssl_, rv)) {
194 case SSL_ERROR_WANT_READ:
195 case SSL_ERROR_WANT_WRITE:
196 case SSL_ERROR_WANT_ACCEPT:
197 case SSL_ERROR_WANT_CONNECT:
198 rv = -2;
199 break;
200 default:
201 PrintSslError();
202 break;
204 break;
206 bytes_written += rv;
207 len -= rv;
208 if (rv != chunksize)
209 break; // If we couldn't write everything, we're implicitly stalled
211 // If we wrote some data, return that count. Otherwise
212 // return the stall error.
213 if (bytes_written > 0)
214 rv = bytes_written;
215 } else {
216 rv = send(fd_, data, len, flags);
218 if (!(flags & MSG_MORE))
219 UncorkSocket();
220 return rv;
223 void SMConnection::OnRegistration(EpollServer* eps, int fd, int event_mask) {
224 registered_in_epoll_server_ = true;
227 void SMConnection::OnEvent(int fd, EpollEvent* event) {
228 events_ |= event->in_events;
229 HandleEvents();
230 if (events_) {
231 event->out_ready_mask = events_;
232 events_ = 0;
236 void SMConnection::OnUnregistration(int fd, bool replaced) {
237 registered_in_epoll_server_ = false;
240 void SMConnection::OnShutdown(EpollServer* eps, int fd) {
241 Cleanup("OnShutdown");
242 return;
245 void SMConnection::Cleanup(const char* cleanup) {
246 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Cleanup: " << cleanup;
247 if (!initialized_)
248 return;
249 Reset();
250 if (connection_pool_)
251 connection_pool_->SMConnectionDone(this);
252 if (sm_interface_)
253 sm_interface_->ResetForNewConnection();
254 last_read_time_ = 0;
257 void SMConnection::HandleEvents() {
258 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
259 << "Received: " << EpollServer::EventMaskToString(events_).c_str();
261 if (events_ & EPOLLIN) {
262 if (!DoRead())
263 goto handle_close_or_error;
266 if (events_ & EPOLLOUT) {
267 // Check if we have connected or not
268 if (connection_complete_ == false) {
269 int sock_error;
270 socklen_t sock_error_len = sizeof(sock_error);
271 int ret =
272 getsockopt(fd_, SOL_SOCKET, SO_ERROR, &sock_error, &sock_error_len);
273 if (ret != 0) {
274 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
275 << "getsockopt error: " << errno << ": " << strerror(errno);
276 goto handle_close_or_error;
278 if (sock_error == 0) {
279 connection_complete_ = true;
280 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
281 << "Connection complete to " << server_ip_ << ":"
282 << server_port_ << " ";
283 } else if (sock_error == EINPROGRESS) {
284 return;
285 } else {
286 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
287 << "error connecting to server";
288 goto handle_close_or_error;
291 if (!DoWrite())
292 goto handle_close_or_error;
295 if (events_ & (EPOLLHUP | EPOLLERR)) {
296 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "!!! Got HUP or ERR";
297 goto handle_close_or_error;
299 return;
301 handle_close_or_error:
302 Cleanup("HandleEvents");
305 // Decide if SPDY was negotiated.
306 bool SMConnection::WasSpdyNegotiated(SpdyMajorVersion* version_negotiated) {
307 *version_negotiated = SPDY3;
308 if (force_spdy())
309 return true;
311 // If this is an SSL connection, check if NPN specifies SPDY.
312 if (ssl_) {
313 const unsigned char* npn_proto;
314 unsigned int npn_proto_len;
315 SSL_get0_next_proto_negotiated(ssl_, &npn_proto, &npn_proto_len);
316 if (npn_proto_len > 0) {
317 std::string npn_proto_str((const char*)npn_proto, npn_proto_len);
318 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
319 << "NPN protocol detected: " << npn_proto_str;
320 if (!strncmp(reinterpret_cast<const char*>(npn_proto),
321 "spdy/2",
322 npn_proto_len)) {
323 *version_negotiated = SPDY2;
324 return true;
326 if (!strncmp(reinterpret_cast<const char*>(npn_proto),
327 "spdy/3",
328 npn_proto_len)) {
329 *version_negotiated = SPDY3;
330 return true;
332 if (!strncmp(reinterpret_cast<const char*>(npn_proto),
333 "spdy/4a2",
334 npn_proto_len)) {
335 *version_negotiated = SPDY4;
336 return true;
341 return false;
344 bool SMConnection::SetupProtocolInterfaces() {
345 DCHECK(!protocol_detected_);
346 protocol_detected_ = true;
348 SpdyMajorVersion version;
349 bool spdy_negotiated = WasSpdyNegotiated(&version);
350 bool using_ssl = ssl_ != NULL;
352 if (using_ssl)
353 VLOG(1) << (SSL_session_reused(ssl_) ? "Resumed" : "Renegotiated")
354 << " SSL Session.";
356 if (acceptor_->spdy_only_ && !spdy_negotiated) {
357 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
358 << "SPDY proxy only, closing HTTPS connection.";
359 return false;
362 switch (acceptor_->flip_handler_type_) {
363 case FLIP_HANDLER_HTTP_SERVER: {
364 DCHECK(!spdy_negotiated);
365 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
366 << (sm_http_interface_ ? "Creating" : "Reusing")
367 << " HTTP interface.";
368 if (!sm_http_interface_)
369 sm_http_interface_ = new HttpSM(this, NULL, memory_cache_, acceptor_);
370 sm_interface_ = sm_http_interface_;
371 break;
373 case FLIP_HANDLER_PROXY: {
374 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
375 << (sm_streamer_interface_ ? "Creating" : "Reusing")
376 << " PROXY Streamer interface.";
377 if (!sm_streamer_interface_) {
378 sm_streamer_interface_ =
379 new StreamerSM(this, NULL, epoll_server_, acceptor_);
380 sm_streamer_interface_->set_is_request();
382 sm_interface_ = sm_streamer_interface_;
383 // If spdy is not negotiated, the streamer interface will proxy all
384 // data to the origin server.
385 if (!spdy_negotiated)
386 break;
388 // Otherwise fall through into the case below.
389 case FLIP_HANDLER_SPDY_SERVER: {
390 DCHECK(spdy_negotiated);
391 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
392 << (sm_spdy_interface_ ? "Creating" : "Reusing")
393 << " SPDY interface.";
394 if (sm_spdy_interface_)
395 sm_spdy_interface_->CreateFramer(version);
396 else
397 sm_spdy_interface_ = new SpdySM(
398 this, NULL, epoll_server_, memory_cache_, acceptor_, version);
399 sm_interface_ = sm_spdy_interface_;
400 break;
404 CorkSocket();
405 if (!sm_interface_->PostAcceptHook())
406 return false;
408 return true;
411 bool SMConnection::DoRead() {
412 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead()";
413 while (!read_buffer_.Full()) {
414 char* bytes;
415 int size;
416 if (fd_ == -1) {
417 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
418 << "DoRead(): fd_ == -1. Invalid FD. Returning false";
419 return false;
421 read_buffer_.GetWritablePtr(&bytes, &size);
422 ssize_t bytes_read = 0;
423 if (ssl_) {
424 bytes_read = SSL_read(ssl_, bytes, size);
425 if (bytes_read < 0) {
426 int err = SSL_get_error(ssl_, bytes_read);
427 switch (err) {
428 case SSL_ERROR_WANT_READ:
429 case SSL_ERROR_WANT_WRITE:
430 case SSL_ERROR_WANT_ACCEPT:
431 case SSL_ERROR_WANT_CONNECT:
432 events_ &= ~EPOLLIN;
433 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
434 << "DoRead: SSL WANT_XXX: " << err;
435 goto done;
436 default:
437 PrintSslError();
438 goto error_or_close;
441 } else {
442 bytes_read = recv(fd_, bytes, size, MSG_DONTWAIT);
444 int stored_errno = errno;
445 if (bytes_read == -1) {
446 switch (stored_errno) {
447 case EAGAIN:
448 events_ &= ~EPOLLIN;
449 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
450 << "Got EAGAIN while reading";
451 goto done;
452 case EINTR:
453 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
454 << "Got EINTR while reading";
455 continue;
456 default:
457 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
458 << "While calling recv, got error: "
459 << (ssl_ ? "(ssl error)" : strerror(stored_errno));
460 goto error_or_close;
462 } else if (bytes_read > 0) {
463 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "read " << bytes_read
464 << " bytes";
465 last_read_time_ = time(NULL);
466 // If the protocol hasn't been detected yet, set up the handlers
467 // we'll need.
468 if (!protocol_detected_) {
469 if (!SetupProtocolInterfaces()) {
470 LOG(ERROR) << "Error setting up protocol interfaces.";
471 goto error_or_close;
474 read_buffer_.AdvanceWritablePtr(bytes_read);
475 if (!DoConsumeReadData())
476 goto error_or_close;
477 continue;
478 } else { // bytes_read == 0
479 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
480 << "0 bytes read with recv call.";
482 goto error_or_close;
484 done:
485 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "DoRead done!";
486 return true;
488 error_or_close:
489 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
490 << "DoRead(): error_or_close. "
491 << "Cleaning up, then returning false";
492 Cleanup("DoRead");
493 return false;
496 bool SMConnection::DoConsumeReadData() {
497 char* bytes;
498 int size;
499 read_buffer_.GetReadablePtr(&bytes, &size);
500 while (size != 0) {
501 size_t bytes_consumed = sm_interface_->ProcessReadInput(bytes, size);
502 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "consumed "
503 << bytes_consumed << " bytes";
504 if (bytes_consumed == 0) {
505 break;
507 read_buffer_.AdvanceReadablePtr(bytes_consumed);
508 if (sm_interface_->MessageFullyRead()) {
509 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
510 << "HandleRequestFullyRead: Setting EPOLLOUT";
511 HandleResponseFullyRead();
512 events_ |= EPOLLOUT;
513 } else if (sm_interface_->Error()) {
514 LOG(ERROR) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
515 << "Framer error detected: Setting EPOLLOUT: "
516 << sm_interface_->ErrorAsString();
517 // this causes everything to be closed/cleaned up.
518 events_ |= EPOLLOUT;
519 return false;
521 read_buffer_.GetReadablePtr(&bytes, &size);
523 return true;
526 void SMConnection::HandleResponseFullyRead() { sm_interface_->Cleanup(); }
528 bool SMConnection::DoWrite() {
529 size_t bytes_sent = 0;
530 int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
531 if (fd_ == -1) {
532 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
533 << "DoWrite: fd == -1. Returning false.";
534 return false;
536 if (output_list_.empty()) {
537 VLOG(2) << log_prefix_ << "DoWrite: Output list empty.";
538 if (sm_interface_) {
539 sm_interface_->GetOutput();
541 if (output_list_.empty()) {
542 events_ &= ~EPOLLOUT;
545 while (!output_list_.empty()) {
546 VLOG(2) << log_prefix_
547 << "DoWrite: Items in output list: " << output_list_.size();
548 if (bytes_sent >= max_bytes_sent_per_dowrite_) {
549 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
550 << " byte sent >= max bytes sent per write: Setting EPOLLOUT: "
551 << bytes_sent;
552 events_ |= EPOLLOUT;
553 break;
555 if (sm_interface_ && output_list_.size() < 2) {
556 sm_interface_->GetOutput();
558 DataFrame* data_frame = output_list_.front();
559 const char* bytes = data_frame->data;
560 int size = data_frame->size;
561 bytes += data_frame->index;
562 size -= data_frame->index;
563 DCHECK_GE(size, 0);
564 if (size <= 0) {
565 output_list_.pop_front();
566 delete data_frame;
567 continue;
570 flags = MSG_NOSIGNAL | MSG_DONTWAIT;
571 // Look for a queue size > 1 because |this| frame is remains on the list
572 // until it has finished sending.
573 if (output_list_.size() > 1) {
574 VLOG(2) << log_prefix_ << "Outlist size: " << output_list_.size()
575 << ": Adding MSG_MORE flag";
576 flags |= MSG_MORE;
578 VLOG(2) << log_prefix_ << "Attempting to send " << size << " bytes.";
579 ssize_t bytes_written = Send(bytes, size, flags);
580 int stored_errno = errno;
581 if (bytes_written == -1) {
582 switch (stored_errno) {
583 case EAGAIN:
584 events_ &= ~EPOLLOUT;
585 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
586 << "Got EAGAIN while writing";
587 goto done;
588 case EINTR:
589 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
590 << "Got EINTR while writing";
591 continue;
592 default:
593 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
594 << "While calling send, got error: " << stored_errno << ": "
595 << (ssl_ ? "" : strerror(stored_errno));
596 goto error_or_close;
598 } else if (bytes_written > 0) {
599 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
600 << "Wrote: " << bytes_written << " bytes";
601 data_frame->index += bytes_written;
602 bytes_sent += bytes_written;
603 continue;
604 } else if (bytes_written == -2) {
605 // -2 handles SSL_ERROR_WANT_* errors
606 events_ &= ~EPOLLOUT;
607 goto done;
609 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
610 << "0 bytes written with send call.";
611 goto error_or_close;
613 done:
614 UncorkSocket();
615 return true;
617 error_or_close:
618 VLOG(1) << log_prefix_ << ACCEPTOR_CLIENT_IDENT
619 << "DoWrite: error_or_close. Returning false "
620 << "after cleaning up";
621 Cleanup("DoWrite");
622 UncorkSocket();
623 return false;
626 void SMConnection::Reset() {
627 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Resetting";
628 if (ssl_) {
629 SSL_shutdown(ssl_);
630 PrintSslError();
631 SSL_free(ssl_);
632 PrintSslError();
633 ssl_ = NULL;
635 if (registered_in_epoll_server_) {
636 epoll_server_->UnregisterFD(fd_);
637 registered_in_epoll_server_ = false;
639 if (fd_ >= 0) {
640 VLOG(2) << log_prefix_ << ACCEPTOR_CLIENT_IDENT << "Closing connection";
641 close(fd_);
642 fd_ = -1;
644 read_buffer_.Clear();
645 initialized_ = false;
646 protocol_detected_ = false;
647 events_ = 0;
648 for (std::list<DataFrame*>::iterator i = output_list_.begin();
649 i != output_list_.end();
650 ++i) {
651 delete *i;
653 output_list_.clear();
656 // static
657 SMConnection* SMConnection::NewSMConnection(EpollServer* epoll_server,
658 SSLState* ssl_state,
659 MemoryCache* memory_cache,
660 FlipAcceptor* acceptor,
661 std::string log_prefix) {
662 return new SMConnection(
663 epoll_server, ssl_state, memory_cache, acceptor, log_prefix);
666 } // namespace net