1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/p2p/quic_p2p_session.h"
7 #include "base/callback_helpers.h"
8 #include "base/location.h"
9 #include "base/memory/weak_ptr.h"
10 #include "base/run_loop.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/quic/crypto/quic_random.h"
16 #include "net/quic/p2p/quic_p2p_crypto_config.h"
17 #include "net/quic/p2p/quic_p2p_stream.h"
18 #include "net/quic/quic_connection_helper.h"
19 #include "net/quic/quic_default_packet_writer.h"
20 #include "net/socket/socket.h"
21 #include "testing/gtest/include/gtest/gtest.h"
// Key string the tests pass to QuicP2PCryptoConfig; CreateSessions() builds
// both sessions from a config constructed with this same value.
// NOTE(review): "out of bound" in the literal presumably means "out-of-band";
// the string is left untouched because it is runtime data.
20 const char kTestSharedKey
[] = "Shared key exchanged out of bound.";
// Fake Socket implementation used by the tests. Two instances are linked with
// ConnectWith(); Write() on one posts a task that calls AppendInputPacket()
// on its peer. Errors can be injected with SetReadError()/SetWriteError().
22 class FakeP2PDatagramSocket
: public Socket
{
// weak_factory_ is bound to |this| so ConnectWith() can hand out weak
// pointers to this socket.
31 FakeP2PDatagramSocket() : weak_factory_(this) {}
32 ~FakeP2PDatagramSocket() override
{}
34 void ConnectWith(FakeP2PDatagramSocket
* peer_socket
) {
35 peer_socket_
= peer_socket
->weak_factory_
.GetWeakPtr();
36 peer_socket
->peer_socket_
= weak_factory_
.GetWeakPtr();
// Injects a read failure. If a read callback is currently stored, it is reset
// and run with |error|.
// NOTE(review): the statement between the signature and the check below is
// not visible in this listing; presumably it stores |error| in read_error_
// (which Read() consults) -- confirm against the full file.
39 void SetReadError(int error
) {
41 if (!read_callback_
.is_null()) {
42 base::ResetAndReturn(&read_callback_
).Run(error
);
46 void SetWriteError(int error
) { write_error_
= error
; }
48 void AppendInputPacket(const std::vector
<char>& data
) {
49 if (!read_callback_
.is_null()) {
50 int size
= std::min(read_buffer_size_
, static_cast<int>(data
.size()));
51 memcpy(read_buffer_
->data(), &data
[0], data
.size());
52 read_buffer_
= nullptr;
53 base::ResetAndReturn(&read_callback_
).Run(size
);
55 incoming_packets_
.push_back(data
);
// Socket override. Visible behavior: DCHECKs that no read callback is already
// stored, checks read_error_ against OK, and if incoming_packets_ is not
// empty copies from the front packet into |buf| and pops it. Otherwise it
// stores |callback| and buf_len and returns ERR_IO_PENDING.
// NOTE(review): several lines of this function are not visible in this
// listing (the buf_len parameter, the body of the error branch, the
// declaration of |size|, and the return after pop_front()). Presumably |size|
// is the std::min(...) shown below and is also the return value -- confirm.
// NOTE(review): &*front().begin() dereferences begin() of the queued packet,
// which is not valid for an empty packet.
60 int Read(IOBuffer
* buf
,
62 const CompletionCallback
& callback
) override
{
63 DCHECK(read_callback_
.is_null());
65 if (read_error_
!= OK
) {
69 if (!incoming_packets_
.empty()) {
70 scoped_refptr
<IOBuffer
> buffer(buf
);
72 std::min(static_cast<int>(incoming_packets_
.front().size()), buf_len
);
73 memcpy(buffer
->data(), &*incoming_packets_
.front().begin(), size
);
74 incoming_packets_
.pop_front();
77 read_callback_
= callback
;
79 read_buffer_size_
= buf_len
;
80 return ERR_IO_PENDING
;
// Socket override. Visible behavior: checks write_error_ against OK, then
// posts a task to the current thread's task runner that calls
// AppendInputPacket() on peer_socket_ with a std::vector<char> copy of the
// first buf_len bytes of |buf|. Because the bytes are copied when the task is
// bound, the task does not refer to |buf| afterwards.
// NOTE(review): the buf_len parameter, the body of the error branch, any
// guard around the PostTask call and the final return value are not visible
// in this listing -- confirm against the full file.
84 int Write(IOBuffer
* buf
,
86 const CompletionCallback
& callback
) override
{
87 if (write_error_
!= OK
) {
92 base::ThreadTaskRunnerHandle::Get()->PostTask(
94 base::Bind(&FakeP2PDatagramSocket::AppendInputPacket
, peer_socket_
,
95 std::vector
<char>(buf
->data(), buf
->data() + buf_len
)));
// Socket override. Not supported by this fake: |size| is ignored by the
// visible code and ERR_NOT_IMPLEMENTED is returned.
101 int SetReceiveBufferSize(int32 size
) override
{
103 return ERR_NOT_IMPLEMENTED
;
// Socket override. Not supported by this fake: |size| is ignored by the
// visible code and ERR_NOT_IMPLEMENTED is returned.
105 int SetSendBufferSize(int32 size
) override
{
107 return ERR_NOT_IMPLEMENTED
;
// Error codes that Read() and Write() compare against OK; both start as OK.
111 int read_error_
= OK
;
112 int write_error_
= OK
;
// State of a pending read. AppendInputPacket() copies into read_buffer_ and
// clears it when it completes a read; Read() records buf_len in
// read_buffer_size_ and the caller's callback in read_callback_ when it
// returns ERR_IO_PENDING. read_buffer_size_ has no initializer; the visible
// code reads it only while read_callback_ is set.
114 scoped_refptr
<IOBuffer
> read_buffer_
;
115 int read_buffer_size_
;
116 CompletionCallback read_callback_
;
// Packets that arrived while no read was pending; consumed from the front by
// Read().
118 std::deque
<std::vector
<char>> incoming_packets_
;
// Weak pointer to the socket this one was linked to by ConnectWith().
120 base::WeakPtr
<FakeP2PDatagramSocket
> peer_socket_
;
// Source of the weak pointers handed out in ConnectWith().
122 base::WeakPtrFactory
<FakeP2PDatagramSocket
> weak_factory_
;
// QuicConnection::PacketWriterFactory whose Create() builds
// QuicDefaultPacketWriter objects on the Socket pointer given to the
// constructor.
125 class DefaultPacketWriterFactory
: public QuicConnection::PacketWriterFactory
{
// Stores |socket| in socket_; nothing visible here takes ownership of it.
127 explicit DefaultPacketWriterFactory(Socket
* socket
) : socket_(socket
) {}
128 ~DefaultPacketWriterFactory() override
{}
130 QuicPacketWriter
* Create(QuicConnection
* connection
) const override
{
131 scoped_ptr
<net::QuicDefaultPacketWriter
> writer(
132 new net::QuicDefaultPacketWriter(socket_
));
133 writer
->SetConnection(connection
);
134 return writer
.release();
// QuicP2PStream::Delegate used by the tests: accumulates received bytes in a
// string and exposes is_closed()/error() so tests can check how the stream
// ended.
141 class TestP2PStreamDelegate
: public QuicP2PStream::Delegate
{
143 TestP2PStreamDelegate() {}
144 ~TestP2PStreamDelegate() override
{}
146 void OnDataReceived(const char* data
, int length
) override
{
147 received_data_
.append(data
, length
);
// QuicP2PStream::Delegate override, called with the stream's close |error|.
// NOTE(review): the body is not visible in this listing; presumably it sets
// is_closed_ and stores |error| in error_ (the tests read both through
// is_closed()/error()) -- confirm against the full file.
149 void OnClose(QuicErrorCode error
) override
{
154 const std::string
& received_data() { return received_data_
; }
155 bool is_closed() { return is_closed_
; }
156 QuicErrorCode
error() { return error_
; }
// Bytes appended by OnDataReceived().
159 std::string received_data_
;
// Values returned by is_closed() and error(); they start as "not closed" and
// QUIC_NO_ERROR.
160 bool is_closed_
= false;
161 QuicErrorCode error_
= QUIC_NO_ERROR
;
163 DISALLOW_COPY_AND_ASSIGN(TestP2PStreamDelegate
);
// QuicP2PSession::Delegate used by the tests: remembers the last incoming
// stream, can attach a pre-selected delegate to it, and can run a one-shot
// callback when a stream arrives.
166 class TestP2PSessionDelegate
: public QuicP2PSession::Delegate
{
168 void OnIncomingStream(QuicP2PStream
* stream
) override
{
169 last_incoming_stream_
= stream
;
170 stream
->SetDelegate(next_incoming_stream_delegate_
);
171 next_incoming_stream_delegate_
= nullptr;
172 if (!on_incoming_stream_callback_
.is_null())
173 base::ResetAndReturn(&on_incoming_stream_callback_
).Run();
// QuicP2PSession::Delegate override, called with the connection's close
// |error|.
// NOTE(review): the body is not visible in this listing; presumably it sets
// is_closed_ and stores |error| in error_ (the tests read both through
// is_closed()/error()) -- confirm against the full file.
176 void OnConnectionClosed(QuicErrorCode error
) override
{
181 void set_next_incoming_stream_delegate(QuicP2PStream::Delegate
* delegate
) {
182 next_incoming_stream_delegate_
= delegate
;
184 void set_on_incoming_stream_callback(const base::Closure
& callback
) {
185 on_incoming_stream_callback_
= callback
;
187 QuicP2PStream
* last_incoming_stream() { return last_incoming_stream_
; }
188 bool is_closed() { return is_closed_
; }
189 QuicErrorCode
error() { return error_
; }
// Delegate to attach to the next incoming stream; cleared by
// OnIncomingStream() after use.
192 QuicP2PStream::Delegate
* next_incoming_stream_delegate_
= nullptr;
// Closure run (and reset) by OnIncomingStream() when it is set.
193 base::Closure on_incoming_stream_callback_
;
// Stream most recently passed to OnIncomingStream().
194 QuicP2PStream
* last_incoming_stream_
= nullptr;
// Values returned by is_closed() and error(); they start as "not closed" and
// QUIC_NO_ERROR.
195 bool is_closed_
= false;
196 QuicErrorCode error_
= QUIC_NO_ERROR
;
// Test fixture that builds two QuicP2PSession objects connected through a
// pair of FakeP2PDatagramSocket instances (see CreateSessions()).
201 class QuicP2PSessionTest
: public ::testing::Test
{
// Completion callback bound into QuicP2PStream::Write() calls by the tests.
203 void OnWriteResult(int result
);
// Constructor initializer list and body. The body serializes config_ into a
// hello message and feeds that same message back into config_ via
// ProcessPeerHello(), expecting QUIC_NO_ERROR.
// NOTE(review): the constructor's first line and one quic_helper_ argument
// are not visible in this listing.
207 : quic_helper_(base::ThreadTaskRunnerHandle::Get().get(),
209 net::QuicRandom::GetInstance()) {
210 // Simulate out-of-band config handshake.
211 CryptoHandshakeMessage hello_message
;
212 config_
.ToHandshakeMessage(&hello_message
);
213 std::string error_detail
;
214 EXPECT_EQ(QUIC_NO_ERROR
,
215 config_
.ProcessPeerHello(hello_message
, CLIENT
, &error_detail
));
218 void CreateSessions() {
219 socket1_
= new FakeP2PDatagramSocket();
220 socket2_
= new FakeP2PDatagramSocket();
221 socket1_
->ConnectWith(socket2_
);
223 QuicP2PCryptoConfig
crypto_config(kTestSharedKey
);
225 session1_
= CreateP2PSession(make_scoped_ptr(socket1_
), crypto_config
,
226 Perspective::IS_SERVER
);
227 session2_
= CreateP2PSession(make_scoped_ptr(socket2_
), crypto_config
,
228 Perspective::IS_CLIENT
);
231 scoped_ptr
<QuicP2PSession
> CreateP2PSession(scoped_ptr
<Socket
> socket
,
232 QuicP2PCryptoConfig crypto_config
,
233 Perspective perspective
) {
234 DefaultPacketWriterFactory
writer_factory(socket
.get());
235 net::IPAddressNumber
ip(net::kIPv4AddressSize
, 0);
236 scoped_ptr
<QuicConnection
> quic_connection1(
237 new QuicConnection(0, net::IPEndPoint(ip
, 0), &quic_helper_
,
238 writer_factory
, true /* owns_writer */, perspective
,
239 true /* is_secuire */, QuicSupportedVersions()));
241 scoped_ptr
<QuicP2PSession
> result(new QuicP2PSession(
242 config_
, crypto_config
, quic_connection1
.Pass(), socket
.Pass()));
243 result
->Initialize();
244 return result
.Pass();
// Shared body of the stream tests, defined after the class: opens a stream
// on |from_session| and checks it against |to_session| and
// |expected_stream_id|.
247 void TestStreamConnection(QuicP2PSession
* from_session
,
248 QuicP2PSession
* to_session
,
249 QuicStreamId expected_stream_id
);
// quic_helper_ is the helper whose address is passed to every QuicConnection
// created by CreateP2PSession().
251 QuicClock quic_clock_
;
252 QuicConnectionHelper quic_helper_
;
// NOTE(review): the declaration of config_ (used by the constructor and by
// CreateP2PSession()) is not visible in this listing.
// Raw pointers to the sockets created in CreateSessions(); the sockets are
// handed to the sessions as scoped_ptrs there, so these do not own them.
255 FakeP2PDatagramSocket
* socket1_
;
256 scoped_ptr
<QuicP2PSession
> session1_
;
258 FakeP2PDatagramSocket
* socket2_
;
259 scoped_ptr
<QuicP2PSession
> session2_
;
262 void QuicP2PSessionTest::OnWriteResult(int result
) {
263 EXPECT_EQ(OK
, result
);
// Exercises one stream end to end. Steps:
//   1. Create an outgoing stream on |from_session| and check its id.
//   2. Write "Hi" on it and check |to_session| reports an incoming stream
//      with the same id that received "Hi".
//   3. Write "Response" back on the incoming stream and check the outgoing
//      side received it.
//   4. Close the outgoing stream and check both stream delegates report
//      closed.
266 void QuicP2PSessionTest::TestStreamConnection(QuicP2PSession
* from_session
,
267 QuicP2PSession
* to_session
,
268 QuicStreamId expected_stream_id
) {
269 QuicP2PStream
* outgoing_stream
= from_session
->CreateOutgoingDynamicStream();
// NOTE(review): EXPECT_TRUE does not abort the test, so a null
// outgoing_stream would still be dereferenced below; ASSERT_TRUE (as used for
// incoming_stream further down) would stop here instead.
270 EXPECT_TRUE(outgoing_stream
);
271 TestP2PStreamDelegate outgoing_stream_delegate
;
272 outgoing_stream
->SetDelegate(&outgoing_stream_delegate
);
273 EXPECT_EQ(expected_stream_id
, outgoing_stream
->id());
275 // Send a test message to the peer session (client or server, depending on the caller).
276 const char kTestMessage
[] = "Hi";
277 const char kTestResponse
[] = "Response";
278 outgoing_stream
->Write(
279 std::string(kTestMessage
),
280 base::Bind(&QuicP2PSessionTest::OnWriteResult
, base::Unretained(this)));
282 // Wait for the incoming stream to be created.
283 TestP2PStreamDelegate incoming_stream_delegate
;
284 base::RunLoop run_loop
;
285 TestP2PSessionDelegate session_delegate
;
286 session_delegate
.set_next_incoming_stream_delegate(&incoming_stream_delegate
);
287 session_delegate
.set_on_incoming_stream_callback(
288 base::Bind(&base::RunLoop::Quit
, base::Unretained(&run_loop
)));
289 to_session
->SetDelegate(&session_delegate
);
// NOTE(review): one line between the two SetDelegate() calls is not visible
// in this listing; presumably it runs |run_loop| until the callback bound
// above quits it -- confirm against the full file.
// The delegate is detached again because session_delegate is a local.
291 to_session
->SetDelegate(nullptr);
293 QuicP2PStream
* incoming_stream
= session_delegate
.last_incoming_stream();
294 ASSERT_TRUE(incoming_stream
);
295 EXPECT_EQ(expected_stream_id
, incoming_stream
->id());
296 EXPECT_EQ(kTestMessage
, incoming_stream_delegate
.received_data());
// Reply in the opposite direction and let posted tasks run.
298 incoming_stream
->Write(
299 std::string(kTestResponse
),
300 base::Bind(&QuicP2PSessionTest::OnWriteResult
, base::Unretained(this)));
301 base::RunLoop().RunUntilIdle();
302 EXPECT_EQ(kTestResponse
, outgoing_stream_delegate
.received_data());
// Closing on the originating side is expected to close both ends.
304 from_session
->CloseStream(outgoing_stream
->id());
305 base::RunLoop().RunUntilIdle();
307 EXPECT_TRUE(outgoing_stream_delegate
.is_closed());
308 EXPECT_TRUE(incoming_stream_delegate
.is_closed());
// Runs TestStreamConnection() from session2_ (created with IS_CLIENT in
// CreateSessions()) to session1_ (IS_SERVER), expecting stream id 3.
// NOTE(review): the first statement of the test body is not visible in this
// listing; presumably it calls CreateSessions() -- confirm.
311 TEST_F(QuicP2PSessionTest
, ClientToServer
) {
313 TestStreamConnection(session2_
.get(), session1_
.get(), 3);
// Runs TestStreamConnection() from session1_ (created with IS_SERVER in
// CreateSessions()) to session2_ (IS_CLIENT), expecting stream id 2.
// NOTE(review): the first statement of the test body is not visible in this
// listing; presumably it calls CreateSessions() -- confirm.
316 TEST_F(QuicP2PSessionTest
, ServerToClient
) {
318 TestStreamConnection(session1_
.get(), session2_
.get(), 2);
// Injects ERR_INTERNET_DISCONNECTED as the write error on socket1_ and
// expects both the stream delegate and the session delegate of session1_ to
// report closed with QUIC_PACKET_WRITE_ERROR.
// NOTE(review): some lines of this test are not visible in this listing: the
// start of the body (presumably CreateSessions()), any check of |stream|
// before it is dereferenced, and the beginning of the call that takes
// std::string(kTestMessage) below (presumably stream->Write() -- confirm).
321 TEST_F(QuicP2PSessionTest
, TransportWriteError
) {
324 TestP2PSessionDelegate session_delegate
;
325 session1_
->SetDelegate(&session_delegate
);
327 QuicP2PStream
* stream
= session1_
->CreateOutgoingDynamicStream();
329 TestP2PStreamDelegate stream_delegate
;
330 stream
->SetDelegate(&stream_delegate
);
331 EXPECT_EQ(2U, stream
->id());
// From here on Write() on socket1_ sees a non-OK write_error_.
333 socket1_
->SetWriteError(ERR_INTERNET_DISCONNECTED
);
335 const char kTestMessage
[] = "Hi";
337 std::string(kTestMessage
),
338 base::Bind(&QuicP2PSessionTest::OnWriteResult
, base::Unretained(this)));
// Let posted tasks run before checking the delegates.
340 base::RunLoop().RunUntilIdle();
342 EXPECT_TRUE(stream_delegate
.is_closed());
343 EXPECT_EQ(QUIC_PACKET_WRITE_ERROR
, stream_delegate
.error());
344 EXPECT_TRUE(session_delegate
.is_closed());
345 EXPECT_EQ(QUIC_PACKET_WRITE_ERROR
, session_delegate
.error());
// Injects ERR_INTERNET_DISCONNECTED as a read error on socket1_ and expects
// both the stream delegate and the session delegate of session1_ to report
// closed with QUIC_PACKET_READ_ERROR.
// NOTE(review): the start of the body (presumably CreateSessions()) and any
// check of |stream| before it is dereferenced are not visible in this
// listing -- confirm against the full file.
348 TEST_F(QuicP2PSessionTest
, TransportReceiveError
) {
351 TestP2PSessionDelegate session_delegate
;
352 session1_
->SetDelegate(&session_delegate
);
354 QuicP2PStream
* stream
= session1_
->CreateOutgoingDynamicStream();
356 TestP2PStreamDelegate stream_delegate
;
357 stream
->SetDelegate(&stream_delegate
);
358 EXPECT_EQ(2U, stream
->id());
// SetReadError() runs any stored read callback on socket1_ with the error.
360 socket1_
->SetReadError(ERR_INTERNET_DISCONNECTED
);
// Let posted tasks run before checking the delegates.
362 base::RunLoop().RunUntilIdle();
364 EXPECT_TRUE(stream_delegate
.is_closed());
365 EXPECT_EQ(QUIC_PACKET_READ_ERROR
, stream_delegate
.error());
366 EXPECT_TRUE(session_delegate
.is_closed());
367 EXPECT_EQ(QUIC_PACKET_READ_ERROR
, session_delegate
.error());