1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/protocol/quic_channel_factory.h"
10 #include "base/location.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/stl_util.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "net/base/io_buffer.h"
15 #include "net/base/net_errors.h"
16 #include "net/quic/crypto/crypto_framer.h"
17 #include "net/quic/crypto/crypto_handshake_message.h"
18 #include "net/quic/crypto/crypto_protocol.h"
19 #include "net/quic/crypto/quic_random.h"
20 #include "net/quic/p2p/quic_p2p_crypto_config.h"
21 #include "net/quic/p2p/quic_p2p_session.h"
22 #include "net/quic/p2p/quic_p2p_stream.h"
23 #include "net/quic/quic_clock.h"
24 #include "net/quic/quic_connection_helper.h"
25 #include "net/quic/quic_default_packet_writer.h"
26 #include "net/quic/quic_protocol.h"
27 #include "net/socket/stream_socket.h"
28 #include "remoting/base/constants.h"
29 #include "remoting/protocol/datagram_channel_factory.h"
30 #include "remoting/protocol/p2p_datagram_socket.h"
31 #include "remoting/protocol/quic_channel.h"
// Upper bounds for the receive windows of a QUIC session and of each QUIC
// stream. The values match the ones used in chrome.
const int kQuicSessionMaxRecvWindowSize = 15 << 20;  // 15 MB
const int kQuicStreamMaxRecvWindowSize = 6 << 20;    // 6 MB
43 class P2PQuicPacketWriter
: public net::QuicPacketWriter
{
45 P2PQuicPacketWriter(net::QuicConnection
* connection
,
46 P2PDatagramSocket
* socket
)
47 : connection_(connection
), socket_(socket
), weak_factory_(this) {}
48 ~P2PQuicPacketWriter() override
{}
50 // QuicPacketWriter interface.
51 net::WriteResult
WritePacket(const char* buffer
,
53 const net::IPAddressNumber
& self_address
,
54 const net::IPEndPoint
& peer_address
) override
{
55 DCHECK(!write_blocked_
);
57 scoped_refptr
<net::StringIOBuffer
> buf(
58 new net::StringIOBuffer(std::string(buffer
, buf_len
)));
59 int result
= socket_
->Send(buf
, buf_len
,
60 base::Bind(&P2PQuicPacketWriter::OnSendComplete
,
61 weak_factory_
.GetWeakPtr()));
62 net::WriteStatus status
= net::WRITE_STATUS_OK
;
64 if (result
== net::ERR_IO_PENDING
) {
65 status
= net::WRITE_STATUS_BLOCKED
;
66 write_blocked_
= true;
68 status
= net::WRITE_STATUS_ERROR
;
72 return net::WriteResult(status
, result
);
74 bool IsWriteBlockedDataBuffered() const override
{
75 // P2PDatagramSocket::Send() method buffer the data until the Send is
79 bool IsWriteBlocked() const override
{ return write_blocked_
; }
80 void SetWritable() override
{ write_blocked_
= false; }
81 net::QuicByteCount
GetMaxPacketSize(const net::IPEndPoint
& peer_address
) const
83 return net::kMaxPacketSize
;
87 void OnSendComplete(int result
){
88 DCHECK_NE(result
, net::ERR_IO_PENDING
);
89 write_blocked_
= false;
91 connection_
->OnWriteError(result
);
93 connection_
->OnCanWrite();
96 net::QuicConnection
* connection_
;
97 P2PDatagramSocket
* socket_
;
99 // Whether a write is currently in flight.
100 bool write_blocked_
= false;
102 base::WeakPtrFactory
<P2PQuicPacketWriter
> weak_factory_
;
104 DISALLOW_COPY_AND_ASSIGN(P2PQuicPacketWriter
);
107 class QuicPacketWriterFactory
108 : public net::QuicConnection::PacketWriterFactory
{
110 explicit QuicPacketWriterFactory(P2PDatagramSocket
* socket
)
112 ~QuicPacketWriterFactory() override
{}
114 net::QuicPacketWriter
* Create(
115 net::QuicConnection
* connection
) const override
{
116 return new P2PQuicPacketWriter(connection
, socket_
);
120 P2PDatagramSocket
* socket_
;
123 class P2PDatagramSocketAdapter
: public net::Socket
{
125 explicit P2PDatagramSocketAdapter(scoped_ptr
<P2PDatagramSocket
> socket
)
126 : socket_(socket
.Pass()) {}
127 ~P2PDatagramSocketAdapter() override
{}
129 int Read(net::IOBuffer
* buf
, int buf_len
,
130 const net::CompletionCallback
& callback
) override
{
131 return socket_
->Recv(buf
, buf_len
, callback
);
133 int Write(net::IOBuffer
* buf
, int buf_len
,
134 const net::CompletionCallback
& callback
) override
{
135 return socket_
->Send(buf
, buf_len
, callback
);
138 int SetReceiveBufferSize(int32_t size
) override
{
140 return net::ERR_FAILED
;
143 int SetSendBufferSize(int32_t size
) override
{
145 return net::ERR_FAILED
;
149 scoped_ptr
<P2PDatagramSocket
> socket_
;
154 class QuicChannelFactory::Core
: public net::QuicP2PSession::Delegate
{
156 Core(const std::string
& session_id
, bool is_server
);
159 // Called from ~QuicChannelFactory() to synchronously release underlying
160 // socket. Core is destroyed later asynchronously.
163 // Implementation of all all methods for QuicChannelFactory.
164 const std::string
& CreateSessionInitiateConfigMessage();
165 bool ProcessSessionAcceptConfigMessage(const std::string
& message
);
167 bool ProcessSessionInitiateConfigMessage(const std::string
& message
);
168 const std::string
& CreateSessionAcceptConfigMessage();
170 void Start(DatagramChannelFactory
* factory
, const std::string
& shared_secret
);
172 void CreateChannel(const std::string
& name
,
173 const ChannelCreatedCallback
& callback
);
174 void CancelChannelCreation(const std::string
& name
);
177 friend class QuicChannelFactory
;
179 struct PendingChannel
{
180 PendingChannel(const std::string
& name
,
181 const ChannelCreatedCallback
& callback
)
182 : name(name
), callback(callback
) {}
185 ChannelCreatedCallback callback
;
188 // QuicP2PSession::Delegate interface.
189 void OnIncomingStream(net::QuicP2PStream
* stream
) override
;
190 void OnConnectionClosed(net::QuicErrorCode error
) override
;
192 void OnBaseChannelReady(scoped_ptr
<P2PDatagramSocket
> socket
);
194 void OnNameReceived(QuicChannel
* channel
);
196 void OnChannelDestroyed(int stream_id
);
198 std::string session_id_
;
200 DatagramChannelFactory
* base_channel_factory_
= nullptr;
202 net::QuicConfig quic_config_
;
203 std::string shared_secret_
;
204 std::string session_initiate_quic_config_message_
;
205 std::string session_accept_quic_config_message_
;
207 net::QuicClock quic_clock_
;
208 net::QuicConnectionHelper quic_helper_
;
209 scoped_ptr
<net::QuicP2PSession
> quic_session_
;
210 bool connected_
= false;
212 std::vector
<PendingChannel
*> pending_channels_
;
213 std::vector
<QuicChannel
*> unnamed_incoming_channels_
;
215 base::WeakPtrFactory
<Core
> weak_factory_
;
217 DISALLOW_COPY_AND_ASSIGN(Core
);
220 QuicChannelFactory::Core::Core(const std::string
& session_id
, bool is_server
)
221 : session_id_(session_id
),
222 is_server_(is_server
),
223 quic_helper_(base::ThreadTaskRunnerHandle::Get().get(),
225 net::QuicRandom::GetInstance()),
226 weak_factory_(this) {
227 quic_config_
.SetInitialSessionFlowControlWindowToSend(
228 kQuicSessionMaxRecvWindowSize
);
229 quic_config_
.SetInitialStreamFlowControlWindowToSend(
230 kQuicStreamMaxRecvWindowSize
);
233 QuicChannelFactory::Core::~Core() {}
235 void QuicChannelFactory::Core::Close() {
236 DCHECK(pending_channels_
.empty());
238 // Cancel creation of the base channel if it hasn't finished.
239 if (base_channel_factory_
)
240 base_channel_factory_
->CancelChannelCreation(kQuicChannelName
);
242 if (quic_session_
&& quic_session_
->connection()->connected())
243 quic_session_
->connection()->CloseConnection(net::QUIC_NO_ERROR
, false);
245 DCHECK(unnamed_incoming_channels_
.empty());
248 void QuicChannelFactory::Core::Start(DatagramChannelFactory
* factory
,
249 const std::string
& shared_secret
) {
250 base_channel_factory_
= factory
;
251 shared_secret_
= shared_secret
;
253 base_channel_factory_
->CreateChannel(
255 base::Bind(&Core::OnBaseChannelReady
, weak_factory_
.GetWeakPtr()));
259 QuicChannelFactory::Core::CreateSessionInitiateConfigMessage() {
262 net::CryptoHandshakeMessage handshake_message
;
263 handshake_message
.set_tag(net::kCHLO
);
264 quic_config_
.ToHandshakeMessage(&handshake_message
);
266 session_initiate_quic_config_message_
=
267 handshake_message
.GetSerialized().AsStringPiece().as_string();
268 return session_initiate_quic_config_message_
;
271 bool QuicChannelFactory::Core::ProcessSessionAcceptConfigMessage(
272 const std::string
& message
) {
275 session_accept_quic_config_message_
= message
;
277 scoped_ptr
<net::CryptoHandshakeMessage
> parsed_message(
278 net::CryptoFramer::ParseMessage(message
));
279 if (!parsed_message
) {
280 LOG(ERROR
) << "Received invalid QUIC config.";
284 if (parsed_message
->tag() != net::kSHLO
) {
285 LOG(ERROR
) << "Received QUIC handshake message with unexpected tag "
286 << parsed_message
->tag();
290 std::string error_message
;
291 net::QuicErrorCode error
= quic_config_
.ProcessPeerHello(
292 *parsed_message
, net::SERVER
, &error_message
);
293 if (error
!= net::QUIC_NO_ERROR
) {
294 LOG(ERROR
) << "Failed to process QUIC handshake message: "
302 bool QuicChannelFactory::Core::ProcessSessionInitiateConfigMessage(
303 const std::string
& message
) {
306 session_initiate_quic_config_message_
= message
;
308 scoped_ptr
<net::CryptoHandshakeMessage
> parsed_message(
309 net::CryptoFramer::ParseMessage(message
));
310 if (!parsed_message
) {
311 LOG(ERROR
) << "Received invalid QUIC config.";
315 if (parsed_message
->tag() != net::kCHLO
) {
316 LOG(ERROR
) << "Received QUIC handshake message with unexpected tag "
317 << parsed_message
->tag();
321 std::string error_message
;
322 net::QuicErrorCode error
= quic_config_
.ProcessPeerHello(
323 *parsed_message
, net::CLIENT
, &error_message
);
324 if (error
!= net::QUIC_NO_ERROR
) {
325 LOG(ERROR
) << "Failed to process QUIC handshake message: "
334 QuicChannelFactory::Core::CreateSessionAcceptConfigMessage() {
337 if (session_initiate_quic_config_message_
.empty()) {
338 // Don't send quic-config to the client if the client didn't include the
339 // config in the session-initiate message.
340 DCHECK(session_accept_quic_config_message_
.empty());
341 return session_accept_quic_config_message_
;
344 net::CryptoHandshakeMessage handshake_message
;
345 handshake_message
.set_tag(net::kSHLO
);
346 quic_config_
.ToHandshakeMessage(&handshake_message
);
348 session_accept_quic_config_message_
=
349 handshake_message
.GetSerialized().AsStringPiece().as_string();
350 return session_accept_quic_config_message_
;
353 // StreamChannelFactory interface.
354 void QuicChannelFactory::Core::CreateChannel(
355 const std::string
& name
,
356 const ChannelCreatedCallback
& callback
) {
357 if (quic_session_
&& quic_session_
->connection()->connected()) {
359 net::QuicP2PStream
* stream
= quic_session_
->CreateOutgoingDynamicStream();
360 scoped_ptr
<QuicChannel
> channel(new QuicClientChannel(
361 stream
, base::Bind(&Core::OnChannelDestroyed
, base::Unretained(this),
364 callback
.Run(channel
.Pass());
366 // On the server side wait for the client to create a QUIC stream and
367 // send the name. The channel will be connected in OnNameReceived().
368 pending_channels_
.push_back(new PendingChannel(name
, callback
));
370 } else if (!base_channel_factory_
) {
371 // Fail synchronously if we failed to connect transport.
372 callback
.Run(nullptr);
374 // Still waiting for the transport.
375 pending_channels_
.push_back(new PendingChannel(name
, callback
));
379 void QuicChannelFactory::Core::CancelChannelCreation(const std::string
& name
) {
380 for (auto it
= pending_channels_
.begin(); it
!= pending_channels_
.end();
382 if ((*it
)->name
== name
) {
384 pending_channels_
.erase(it
);
390 void QuicChannelFactory::Core::OnBaseChannelReady(
391 scoped_ptr
<P2PDatagramSocket
> socket
) {
392 base_channel_factory_
= nullptr;
394 // Failed to connect underlying transport connection. Fail all pending
397 while (!pending_channels_
.empty()) {
398 scoped_ptr
<PendingChannel
> pending_channel(pending_channels_
.front());
399 pending_channels_
.erase(pending_channels_
.begin());
400 pending_channel
->callback
.Run(nullptr);
405 QuicPacketWriterFactory
writer_factory(socket
.get());
406 net::IPAddressNumber
ip(net::kIPv4AddressSize
, 0);
407 scoped_ptr
<net::QuicConnection
> quic_connection(new net::QuicConnection(
408 0, net::IPEndPoint(ip
, 0), &quic_helper_
, writer_factory
,
409 true /* owns_writer */,
410 is_server_
? net::Perspective::IS_SERVER
: net::Perspective::IS_CLIENT
,
411 true /* is_secure */, net::QuicSupportedVersions()));
413 net::QuicP2PCryptoConfig
quic_crypto_config(shared_secret_
);
414 quic_crypto_config
.set_hkdf_input_suffix(
415 session_id_
+ '\0' + kQuicChannelName
+ '\0' +
416 session_initiate_quic_config_message_
+
417 session_accept_quic_config_message_
);
419 quic_session_
.reset(new net::QuicP2PSession(
420 quic_config_
, quic_crypto_config
, quic_connection
.Pass(),
421 make_scoped_ptr(new P2PDatagramSocketAdapter(socket
.Pass()))));
422 quic_session_
->SetDelegate(this);
423 quic_session_
->Initialize();
426 // On the client create streams for all pending channels and send a name for
428 while (!pending_channels_
.empty()) {
429 scoped_ptr
<PendingChannel
> pending_channel(pending_channels_
.front());
430 pending_channels_
.erase(pending_channels_
.begin());
432 net::QuicP2PStream
* stream
= quic_session_
->CreateOutgoingDynamicStream();
433 scoped_ptr
<QuicChannel
> channel(new QuicClientChannel(
434 stream
, base::Bind(&Core::OnChannelDestroyed
, base::Unretained(this),
436 pending_channel
->name
));
437 pending_channel
->callback
.Run(channel
.Pass());
442 void QuicChannelFactory::Core::OnIncomingStream(net::QuicP2PStream
* stream
) {
443 QuicServerChannel
* channel
= new QuicServerChannel(
444 stream
, base::Bind(&Core::OnChannelDestroyed
, base::Unretained(this),
446 unnamed_incoming_channels_
.push_back(channel
);
447 channel
->ReceiveName(
448 base::Bind(&Core::OnNameReceived
, base::Unretained(this), channel
));
451 void QuicChannelFactory::Core::OnConnectionClosed(net::QuicErrorCode error
) {
452 if (error
!= net::QUIC_NO_ERROR
)
453 LOG(ERROR
) << "QUIC connection was closed, error_code=" << error
;
455 while (!pending_channels_
.empty()) {
456 scoped_ptr
<PendingChannel
> pending_channel(pending_channels_
.front());
457 pending_channels_
.erase(pending_channels_
.begin());
458 pending_channel
->callback
.Run(nullptr);
462 void QuicChannelFactory::Core::OnNameReceived(QuicChannel
* channel
) {
465 scoped_ptr
<QuicChannel
> owned_channel(channel
);
467 auto it
= std::find(unnamed_incoming_channels_
.begin(),
468 unnamed_incoming_channels_
.end(), channel
);
469 DCHECK(it
!= unnamed_incoming_channels_
.end());
470 unnamed_incoming_channels_
.erase(it
);
472 if (channel
->name().empty()) {
473 // Failed to read a name for incoming channel.
477 for (auto it
= pending_channels_
.begin();
478 it
!= pending_channels_
.end(); ++it
) {
479 if ((*it
)->name
== channel
->name()) {
480 scoped_ptr
<PendingChannel
> pending_channel(*it
);
481 pending_channels_
.erase(it
);
482 pending_channel
->callback
.Run(owned_channel
.Pass());
487 LOG(ERROR
) << "Unexpected incoming channel: " << channel
->name();
490 void QuicChannelFactory::Core::OnChannelDestroyed(int stream_id
) {
492 quic_session_
->CloseStream(stream_id
);
495 QuicChannelFactory::QuicChannelFactory(const std::string
& session_id
,
497 : core_(new Core(session_id
, is_server
)) {}
499 QuicChannelFactory::~QuicChannelFactory() {
501 base::ThreadTaskRunnerHandle::Get()->DeleteSoon(FROM_HERE
, core_
.release());
504 const std::string
& QuicChannelFactory::CreateSessionInitiateConfigMessage() {
505 return core_
->CreateSessionInitiateConfigMessage();
508 bool QuicChannelFactory::ProcessSessionAcceptConfigMessage(
509 const std::string
& message
) {
510 return core_
->ProcessSessionAcceptConfigMessage(message
);
513 bool QuicChannelFactory::ProcessSessionInitiateConfigMessage(
514 const std::string
& message
) {
515 return core_
->ProcessSessionInitiateConfigMessage(message
);
518 const std::string
& QuicChannelFactory::CreateSessionAcceptConfigMessage() {
519 return core_
->CreateSessionAcceptConfigMessage();
522 void QuicChannelFactory::Start(DatagramChannelFactory
* factory
,
523 const std::string
& shared_secret
) {
524 core_
->Start(factory
, shared_secret
);
527 void QuicChannelFactory::CreateChannel(const std::string
& name
,
528 const ChannelCreatedCallback
& callback
) {
529 core_
->CreateChannel(name
, callback
);
532 void QuicChannelFactory::CancelChannelCreation(const std::string
& name
) {
533 core_
->CancelChannelCreation(name
);
536 net::QuicP2PSession
* QuicChannelFactory::GetP2PSessionForTests() {
537 return core_
->quic_session_
.get();
540 } // namespace protocol
541 } // namespace remoting