1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/p2p/quic_p2p_session.h"
7 #include "base/callback_helpers.h"
8 #include "base/location.h"
9 #include "base/memory/weak_ptr.h"
10 #include "base/run_loop.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/quic/crypto/quic_random.h"
16 #include "net/quic/p2p/quic_p2p_crypto_config.h"
17 #include "net/quic/p2p/quic_p2p_stream.h"
18 #include "net/quic/quic_connection_helper.h"
19 #include "net/quic/quic_default_packet_writer.h"
20 #include "net/socket/socket.h"
21 #include "testing/gtest/include/gtest/gtest.h"
27 const char kTestSharedKey
[] = "Shared key exchanged out of bound.";
29 class FakeP2PDatagramSocket
: public Socket
{
31 FakeP2PDatagramSocket() : weak_factory_(this) {}
32 ~FakeP2PDatagramSocket() override
{}
34 base::WeakPtr
<FakeP2PDatagramSocket
> GetWeakPtr() {
35 return weak_factory_
.GetWeakPtr();
38 void ConnectWith(FakeP2PDatagramSocket
* peer_socket
) {
39 peer_socket_
= peer_socket
->GetWeakPtr();
40 peer_socket
->peer_socket_
= GetWeakPtr();
43 void SetReadError(int error
) {
45 if (!read_callback_
.is_null()) {
46 base::ResetAndReturn(&read_callback_
).Run(error
);
50 void SetWriteError(int error
) { write_error_
= error
; }
52 void AppendInputPacket(const std::vector
<char>& data
) {
53 if (!read_callback_
.is_null()) {
54 int size
= std::min(read_buffer_size_
, static_cast<int>(data
.size()));
55 memcpy(read_buffer_
->data(), &data
[0], data
.size());
56 read_buffer_
= nullptr;
57 base::ResetAndReturn(&read_callback_
).Run(size
);
59 incoming_packets_
.push_back(data
);
64 int Read(IOBuffer
* buf
,
66 const CompletionCallback
& callback
) override
{
67 DCHECK(read_callback_
.is_null());
69 if (read_error_
!= OK
) {
73 if (!incoming_packets_
.empty()) {
74 scoped_refptr
<IOBuffer
> buffer(buf
);
76 std::min(static_cast<int>(incoming_packets_
.front().size()), buf_len
);
77 memcpy(buffer
->data(), &*incoming_packets_
.front().begin(), size
);
78 incoming_packets_
.pop_front();
81 read_callback_
= callback
;
83 read_buffer_size_
= buf_len
;
84 return ERR_IO_PENDING
;
88 int Write(IOBuffer
* buf
,
90 const CompletionCallback
& callback
) override
{
91 if (write_error_
!= OK
) {
96 base::ThreadTaskRunnerHandle::Get()->PostTask(
98 base::Bind(&FakeP2PDatagramSocket::AppendInputPacket
, peer_socket_
,
99 std::vector
<char>(buf
->data(), buf
->data() + buf_len
)));
105 int SetReceiveBufferSize(int32 size
) override
{
107 return ERR_NOT_IMPLEMENTED
;
109 int SetSendBufferSize(int32 size
) override
{
111 return ERR_NOT_IMPLEMENTED
;
115 int read_error_
= OK
;
116 int write_error_
= OK
;
118 scoped_refptr
<IOBuffer
> read_buffer_
;
119 int read_buffer_size_
;
120 CompletionCallback read_callback_
;
122 std::deque
<std::vector
<char>> incoming_packets_
;
124 base::WeakPtr
<FakeP2PDatagramSocket
> peer_socket_
;
126 base::WeakPtrFactory
<FakeP2PDatagramSocket
> weak_factory_
;
129 class DefaultPacketWriterFactory
: public QuicConnection::PacketWriterFactory
{
131 explicit DefaultPacketWriterFactory(Socket
* socket
) : socket_(socket
) {}
132 ~DefaultPacketWriterFactory() override
{}
134 QuicPacketWriter
* Create(QuicConnection
* connection
) const override
{
135 scoped_ptr
<net::QuicDefaultPacketWriter
> writer(
136 new net::QuicDefaultPacketWriter(socket_
));
137 writer
->SetConnection(connection
);
138 return writer
.release();
145 class TestP2PStreamDelegate
: public QuicP2PStream::Delegate
{
147 TestP2PStreamDelegate() {}
148 ~TestP2PStreamDelegate() override
{}
150 void OnDataReceived(const char* data
, int length
) override
{
151 received_data_
.append(data
, length
);
153 void OnClose(QuicErrorCode error
) override
{
158 const std::string
& received_data() { return received_data_
; }
159 bool is_closed() { return is_closed_
; }
160 QuicErrorCode
error() { return error_
; }
163 std::string received_data_
;
164 bool is_closed_
= false;
165 QuicErrorCode error_
= QUIC_NO_ERROR
;
167 DISALLOW_COPY_AND_ASSIGN(TestP2PStreamDelegate
);
170 class TestP2PSessionDelegate
: public QuicP2PSession::Delegate
{
172 void OnIncomingStream(QuicP2PStream
* stream
) override
{
173 last_incoming_stream_
= stream
;
174 stream
->SetDelegate(next_incoming_stream_delegate_
);
175 next_incoming_stream_delegate_
= nullptr;
176 if (!on_incoming_stream_callback_
.is_null())
177 base::ResetAndReturn(&on_incoming_stream_callback_
).Run();
180 void OnConnectionClosed(QuicErrorCode error
) override
{
185 void set_next_incoming_stream_delegate(QuicP2PStream::Delegate
* delegate
) {
186 next_incoming_stream_delegate_
= delegate
;
188 void set_on_incoming_stream_callback(const base::Closure
& callback
) {
189 on_incoming_stream_callback_
= callback
;
191 QuicP2PStream
* last_incoming_stream() { return last_incoming_stream_
; }
192 bool is_closed() { return is_closed_
; }
193 QuicErrorCode
error() { return error_
; }
196 QuicP2PStream::Delegate
* next_incoming_stream_delegate_
= nullptr;
197 base::Closure on_incoming_stream_callback_
;
198 QuicP2PStream
* last_incoming_stream_
= nullptr;
199 bool is_closed_
= false;
200 QuicErrorCode error_
= QUIC_NO_ERROR
;
205 class QuicP2PSessionTest
: public ::testing::Test
{
207 void OnWriteResult(int result
);
211 : quic_helper_(base::ThreadTaskRunnerHandle::Get().get(),
213 net::QuicRandom::GetInstance()) {
214 // Simulate out-of-bound config handshake.
215 CryptoHandshakeMessage hello_message
;
216 config_
.ToHandshakeMessage(&hello_message
);
217 std::string error_detail
;
218 EXPECT_EQ(QUIC_NO_ERROR
,
219 config_
.ProcessPeerHello(hello_message
, CLIENT
, &error_detail
));
222 void CreateSessions() {
223 scoped_ptr
<FakeP2PDatagramSocket
> socket1(new FakeP2PDatagramSocket());
224 scoped_ptr
<FakeP2PDatagramSocket
> socket2(new FakeP2PDatagramSocket());
225 socket1
->ConnectWith(socket2
.get());
227 socket1_
= socket1
->GetWeakPtr();
228 socket2_
= socket2
->GetWeakPtr();
230 QuicP2PCryptoConfig
crypto_config(kTestSharedKey
);
233 CreateP2PSession(socket1
.Pass(), crypto_config
, Perspective::IS_SERVER
);
235 CreateP2PSession(socket2
.Pass(), crypto_config
, Perspective::IS_CLIENT
);
238 scoped_ptr
<QuicP2PSession
> CreateP2PSession(scoped_ptr
<Socket
> socket
,
239 QuicP2PCryptoConfig crypto_config
,
240 Perspective perspective
) {
241 DefaultPacketWriterFactory
writer_factory(socket
.get());
242 net::IPAddressNumber
ip(net::kIPv4AddressSize
, 0);
243 scoped_ptr
<QuicConnection
> quic_connection1(
244 new QuicConnection(0, net::IPEndPoint(ip
, 0), &quic_helper_
,
245 writer_factory
, true /* owns_writer */, perspective
,
246 true /* is_secuire */, QuicSupportedVersions()));
248 scoped_ptr
<QuicP2PSession
> result(new QuicP2PSession(
249 config_
, crypto_config
, quic_connection1
.Pass(), socket
.Pass()));
250 result
->Initialize();
251 return result
.Pass();
254 void TestStreamConnection(QuicP2PSession
* from_session
,
255 QuicP2PSession
* to_session
,
256 QuicStreamId expected_stream_id
);
258 QuicClock quic_clock_
;
259 QuicConnectionHelper quic_helper_
;
262 base::WeakPtr
<FakeP2PDatagramSocket
> socket1_
;
263 scoped_ptr
<QuicP2PSession
> session1_
;
265 base::WeakPtr
<FakeP2PDatagramSocket
> socket2_
;
266 scoped_ptr
<QuicP2PSession
> session2_
;
269 void QuicP2PSessionTest::OnWriteResult(int result
) {
270 EXPECT_EQ(OK
, result
);
273 void QuicP2PSessionTest::TestStreamConnection(QuicP2PSession
* from_session
,
274 QuicP2PSession
* to_session
,
275 QuicStreamId expected_stream_id
) {
276 QuicP2PStream
* outgoing_stream
= from_session
->CreateOutgoingDynamicStream();
277 EXPECT_TRUE(outgoing_stream
);
278 TestP2PStreamDelegate outgoing_stream_delegate
;
279 outgoing_stream
->SetDelegate(&outgoing_stream_delegate
);
280 EXPECT_EQ(expected_stream_id
, outgoing_stream
->id());
282 // Send a test message to the client.
283 const char kTestMessage
[] = "Hi";
284 const char kTestResponse
[] = "Response";
285 outgoing_stream
->Write(
286 std::string(kTestMessage
),
287 base::Bind(&QuicP2PSessionTest::OnWriteResult
, base::Unretained(this)));
289 // Wait for the incoming stream to be created.
290 TestP2PStreamDelegate incoming_stream_delegate
;
291 base::RunLoop run_loop
;
292 TestP2PSessionDelegate session_delegate
;
293 session_delegate
.set_next_incoming_stream_delegate(&incoming_stream_delegate
);
294 session_delegate
.set_on_incoming_stream_callback(
295 base::Bind(&base::RunLoop::Quit
, base::Unretained(&run_loop
)));
296 to_session
->SetDelegate(&session_delegate
);
298 to_session
->SetDelegate(nullptr);
300 QuicP2PStream
* incoming_stream
= session_delegate
.last_incoming_stream();
301 ASSERT_TRUE(incoming_stream
);
302 EXPECT_EQ(expected_stream_id
, incoming_stream
->id());
303 EXPECT_EQ(kTestMessage
, incoming_stream_delegate
.received_data());
305 incoming_stream
->Write(
306 std::string(kTestResponse
),
307 base::Bind(&QuicP2PSessionTest::OnWriteResult
, base::Unretained(this)));
308 base::RunLoop().RunUntilIdle();
309 EXPECT_EQ(kTestResponse
, outgoing_stream_delegate
.received_data());
311 from_session
->CloseStream(outgoing_stream
->id());
312 base::RunLoop().RunUntilIdle();
314 EXPECT_TRUE(outgoing_stream_delegate
.is_closed());
315 EXPECT_TRUE(incoming_stream_delegate
.is_closed());
318 TEST_F(QuicP2PSessionTest
, ClientToServer
) {
320 TestStreamConnection(session2_
.get(), session1_
.get(), 3);
323 TEST_F(QuicP2PSessionTest
, ServerToClient
) {
325 TestStreamConnection(session1_
.get(), session2_
.get(), 2);
328 TEST_F(QuicP2PSessionTest
, DestroySocketWhenClosed
) {
331 // The socket must be destroyed when connection is closed.
332 EXPECT_TRUE(socket1_
);
333 session1_
->connection()->CloseConnection(net::QUIC_NO_ERROR
, false);
334 EXPECT_FALSE(socket1_
);
337 TEST_F(QuicP2PSessionTest
, TransportWriteError
) {
340 TestP2PSessionDelegate session_delegate
;
341 session1_
->SetDelegate(&session_delegate
);
343 QuicP2PStream
* stream
= session1_
->CreateOutgoingDynamicStream();
345 TestP2PStreamDelegate stream_delegate
;
346 stream
->SetDelegate(&stream_delegate
);
347 EXPECT_EQ(2U, stream
->id());
349 socket1_
->SetWriteError(ERR_INTERNET_DISCONNECTED
);
351 const char kTestMessage
[] = "Hi";
353 std::string(kTestMessage
),
354 base::Bind(&QuicP2PSessionTest::OnWriteResult
, base::Unretained(this)));
356 base::RunLoop().RunUntilIdle();
358 EXPECT_TRUE(stream_delegate
.is_closed());
359 EXPECT_EQ(QUIC_PACKET_WRITE_ERROR
, stream_delegate
.error());
360 EXPECT_TRUE(session_delegate
.is_closed());
361 EXPECT_EQ(QUIC_PACKET_WRITE_ERROR
, session_delegate
.error());
363 // Verify that the socket was destroyed.
364 EXPECT_FALSE(socket1_
);
367 TEST_F(QuicP2PSessionTest
, TransportReceiveError
) {
370 TestP2PSessionDelegate session_delegate
;
371 session1_
->SetDelegate(&session_delegate
);
373 QuicP2PStream
* stream
= session1_
->CreateOutgoingDynamicStream();
375 TestP2PStreamDelegate stream_delegate
;
376 stream
->SetDelegate(&stream_delegate
);
377 EXPECT_EQ(2U, stream
->id());
379 socket1_
->SetReadError(ERR_INTERNET_DISCONNECTED
);
381 base::RunLoop().RunUntilIdle();
383 EXPECT_TRUE(stream_delegate
.is_closed());
384 EXPECT_EQ(QUIC_PACKET_READ_ERROR
, stream_delegate
.error());
385 EXPECT_TRUE(session_delegate
.is_closed());
386 EXPECT_EQ(QUIC_PACKET_READ_ERROR
, session_delegate
.error());
388 // Verify that the socket was destroyed.
389 EXPECT_FALSE(socket1_
);