1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "components/devtools_bridge/session_dependency_factory.h"
8 #include "base/location.h"
9 #include "base/task_runner.h"
10 #include "base/threading/thread.h"
11 #include "components/devtools_bridge/abstract_data_channel.h"
12 #include "components/devtools_bridge/abstract_peer_connection.h"
13 #include "components/devtools_bridge/rtc_configuration.h"
14 #include "third_party/libjingle/source/talk/app/webrtc/mediaconstraintsinterface.h"
15 #include "third_party/libjingle/source/talk/app/webrtc/peerconnectioninterface.h"
16 #include "third_party/webrtc/base/bind.h"
17 #include "third_party/webrtc/base/messagehandler.h"
18 #include "third_party/webrtc/base/messagequeue.h"
19 #include "third_party/webrtc/base/ssladapter.h"
20 #include "third_party/webrtc/base/thread.h"
22 namespace devtools_bridge
{
// RTCConfiguration implementation that also derives from
// webrtc::PeerConnectionInterface::RTCConfiguration.
24 class RTCConfiguration::Impl
25 : public RTCConfiguration
,
26 public webrtc::PeerConnectionInterface::RTCConfiguration
{
// Builds an IceServer from |username| and |credential| and appends it to
// |servers|.
// NOTE(review): no assignment from |uri| is visible in this body — confirm
// that the server's URI is populated.
29 virtual void AddIceServer(
30 const std::string
& uri
,
31 const std::string
& username
,
32 const std::string
& credential
) override
{
33 webrtc::PeerConnectionInterface::IceServer server
;
35 server
.username
= username
;
36 server
.password
= credential
;
37 servers
.push_back(server
);
// Accessor declared to return a const reference to this Impl type.
40 const Impl
& impl() const override
{
// NOTE(review): no use of |base_| is visible in this class — confirm it is
// still needed.
45 webrtc::PeerConnectionInterface::RTCConfiguration base_
;
// Calls release() on |*ptr|, then Release() on the object it returns, and
// CHECKs that Release() yields 0.
// NOTE(review): presumably this asserts that the last reference was dropped —
// confirm against rtc::scoped_refptr / RefCountInterface.
51 void CheckedRelease(rtc::scoped_refptr
<T
>* ptr
) {
52 CHECK_EQ(0, ptr
->release()->Release());
// webrtc::MediaConstraintsInterface implementation holding two Constraints
// lists: |mandatory_| and |optional_|.
55 class MediaConstraints
56 : public webrtc::MediaConstraintsInterface
{
58 virtual ~MediaConstraints() {}
// Accessor for the mandatory constraints (returns a const reference).
60 virtual const Constraints
& GetMandatory() const override
{
// Accessor for the optional constraints (returns a const reference).
64 virtual const Constraints
& GetOptional() const override
{
// Appends a Constraint built from |key| and |value| to |mandatory_|.
68 void AddMandatory(const std::string
& key
, const std::string
& value
) {
69 mandatory_
.push_back(Constraint(key
, value
));
73 Constraints mandatory_
;
74 Constraints optional_
;
78 * Posts tasks on signaling thread. If stopped (when SessionDependencyFactory
79 * is being destroyed) ignores posted tasks.
// base::TaskRunner that forwards closures to an rtc::Thread; it privately
// implements rtc::MessageHandler to receive them back in OnMessage().
81 class SignalingThreadTaskRunner
: public base::TaskRunner
,
82 private rtc::MessageHandler
{
// Stores |thread| as the target thread.
84 explicit SignalingThreadTaskRunner(rtc::Thread
* thread
) : thread_(thread
) {}
// Only zero delays are supported (DCHECK). Takes |critical_section_| and then
// hands |task| to |thread_| via Send(), wrapped in a heap-allocated Task.
// NOTE(review): no delete of the Task allocated here is visible in this
// class — confirm who owns the message data.
86 bool PostDelayedTask(const tracked_objects::Location
& from_here
,
87 const base::Closure
& task
,
88 base::TimeDelta delay
) override
{
89 DCHECK(delay
.ToInternalValue() == 0);
91 rtc::CritScope
scope(&critical_section_
);
94 thread_
->Send(this, 0, new Task(task
));
// Under |critical_section_|, reports whether |thread_| is non-NULL and is the
// current thread.
99 bool RunsTasksOnCurrentThread() const override
{
100 rtc::CritScope
scope(&critical_section_
);
102 return thread_
!= NULL
&& thread_
->IsCurrent();
106 rtc::CritScope
scope(&critical_section_
);
// Message payload type: a base::Closure carried as rtc message data.
111 typedef rtc::TypedMessageData
<base::Closure
> Task
;
113 ~SignalingThreadTaskRunner() override
{}
// Casts the message payload back to Task and runs the wrapped closure.
115 void OnMessage(rtc::Message
* msg
) override
{
116 static_cast<Task
*>(msg
->pdata
)->data().Run();
119 mutable rtc::CriticalSection critical_section_
;
120 rtc::Thread
* thread_
; // Guarded by |critical_section_|.
// webrtc::DataChannelObserver that relays channel events to an
// AbstractDataChannel::Observer and tracks the open/closed state in |open_|.
123 class DataChannelObserverImpl
: public webrtc::DataChannelObserver
{
// Stores the raw |data_channel| pointer and takes |observer| via Pass().
125 DataChannelObserverImpl(
126 webrtc::DataChannelInterface
* data_channel
,
127 scoped_ptr
<AbstractDataChannel::Observer
> observer
)
128 : data_channel_(data_channel
),
129 observer_(observer
.Pass()) {
// Records whether the channel is currently in the kOpen state.
133 open_
= data_channel_
->state() == webrtc::DataChannelInterface::kOpen
;
// Recomputes the open flag from the channel state and returns early when it
// matches |open_|; observer_->OnClose() is among the notifications issued.
136 virtual void OnStateChange() override
{
137 bool open
= data_channel_
->state() == webrtc::DataChannelInterface::kOpen
;
139 if (open
== open_
) return;
145 observer_
->OnClose();
// Forwards the payload pointer (buffer.data.data()) and buffer.size() to the
// observer.
149 virtual void OnMessage(const webrtc::DataBuffer
& buffer
) override
{
150 observer_
->OnMessage(buffer
.data
.data(), buffer
.size());
154 webrtc::DataChannelInterface
* const data_channel_
;
155 scoped_ptr
<AbstractDataChannel::Observer
> const observer_
;
160 * Thread-safe view on AbstractDataChannel.
// AbstractDataChannel::Proxy implementation: public calls are posted to the
// factory's signaling-thread task runner, where |data_channel_| is used.
162 class DataChannelProxyImpl
: public AbstractDataChannel::Proxy
{
// Keeps a reference to |data_channel| and caches the task runner obtained
// from factory->signaling_thread_task_runner().
164 DataChannelProxyImpl(
165 SessionDependencyFactory
* factory
,
166 rtc::scoped_refptr
<webrtc::DataChannelInterface
> data_channel
)
167 : data_channel_(data_channel
),
168 signaling_thread_task_runner_(
169 factory
->signaling_thread_task_runner()) {
// Clears |data_channel_|; the *OnSignalingThread methods below skip their
// work once it is NULL.
172 void StopOnSignalingThread() {
173 data_channel_
= NULL
;
// Fills a new binary DataBuffer (second constructor argument is true) with
// |data|/|length| via SetData() and posts SendMessageOnSignalingThread,
// handing the buffer over with base::Passed.
176 virtual void SendBinaryMessage(const void* data
, size_t length
) override
{
177 auto buffer
= make_scoped_ptr(new webrtc::DataBuffer(rtc::Buffer(), true));
178 buffer
->data
.SetData(data
, length
);
180 signaling_thread_task_runner_
->PostTask(
181 FROM_HERE
, base::Bind(
182 &DataChannelProxyImpl::SendMessageOnSignalingThread
,
184 base::Passed(&buffer
)));
// Posts CloseOnSignalingThread to the signaling-thread task runner.
187 virtual void Close() override
{
188 signaling_thread_task_runner_
->PostTask(
189 FROM_HERE
, base::Bind(&DataChannelProxyImpl::CloseOnSignalingThread
,
195 ~DataChannelProxyImpl() override
{}
// Sends |message| on the channel if |data_channel_| is still set.
197 void SendMessageOnSignalingThread(scoped_ptr
<webrtc::DataBuffer
> message
) {
198 if (data_channel_
!= NULL
)
199 data_channel_
->Send(*message
);
// Closes the channel if |data_channel_| is still set.
202 void CloseOnSignalingThread() {
203 if (data_channel_
!= NULL
)
204 data_channel_
->Close();
207 // Accessed on signaling thread.
208 rtc::scoped_refptr
<webrtc::DataChannelInterface
> data_channel_
;
210 const scoped_refptr
<base::TaskRunner
> signaling_thread_task_runner_
;
// AbstractDataChannel implementation wrapping a webrtc::DataChannelInterface
// (|impl_|) together with the signaling thread it is driven from.
213 class DataChannelImpl
: public AbstractDataChannel
{
216 SessionDependencyFactory
* factory
,
217 rtc::Thread
* const signaling_thread
,
218 rtc::scoped_refptr
<webrtc::DataChannelInterface
> impl
)
220 signaling_thread_(signaling_thread
),
// Runs DataChannelProxyImpl::StopOnSignalingThread on |proxy_| through
// signaling_thread_->Invoke.
226 signaling_thread_
->Invoke
<void>(rtc::Bind(
227 &DataChannelProxyImpl::StopOnSignalingThread
, proxy_
.get()));
// Wraps |observer| in a DataChannelObserverImpl bound to |impl_|, then runs
// RegisterObserverOnSignalingThread through signaling_thread_->Invoke.
231 virtual void RegisterObserver(scoped_ptr
<Observer
> observer
) override
{
232 observer_
.reset(new DataChannelObserverImpl(impl_
.get(), observer
.Pass()));
233 signaling_thread_
->Invoke
<void>(rtc::Bind(
234 &DataChannelImpl::RegisterObserverOnSignalingThread
, this));
// Requires a previously registered observer (DCHECK) and unregisters it from
// |impl_|.
237 virtual void UnregisterObserver() override
{
238 DCHECK(observer_
.get() != NULL
);
239 impl_
->UnregisterObserver();
// Sends |data| as a binary message (is_binary == true).
243 virtual void SendBinaryMessage(void* data
, size_t length
) override
{
244 SendMessage(data
, length
, true);
// Sends |data| as a text message (is_binary == false).
247 virtual void SendTextMessage(void* data
, size_t length
) override
{
248 SendMessage(data
, length
, false);
// Shared helper: builds a DataBuffer from an rtc::Buffer over |data|/|length|
// with the given |is_binary| flag and sends it on |impl_|.
251 void SendMessage(void* data
, size_t length
, bool is_binary
) {
252 impl_
->Send(webrtc::DataBuffer(rtc::Buffer(data
, length
), is_binary
));
255 void Close() override
{
// Hands out the proxy for this channel; |proxy_| is assigned a new
// DataChannelProxyImpl built from |factory_| and |impl_|.
259 scoped_refptr
<Proxy
> proxy() override
{
261 proxy_
= new DataChannelProxyImpl(factory_
, impl_
);
266 void RegisterObserverOnSignalingThread() {
267 // State initialization and observer registration happen atomically
268 // if done on the signaling thread (see rtc::Thread::Send).
269 observer_
->InitState();
270 impl_
->RegisterObserver(observer_
.get());
273 SessionDependencyFactory
* const factory_
;
274 scoped_refptr
<DataChannelProxyImpl
> proxy_
;
275 rtc::Thread
* const signaling_thread_
;
276 scoped_ptr
<DataChannelObserverImpl
> observer_
;
277 const rtc::scoped_refptr
<webrtc::DataChannelInterface
> impl_
;
// webrtc::PeerConnectionObserver that translates connection events into
// AbstractPeerConnection::Delegate calls.
280 class PeerConnectionObserverImpl
281 : public webrtc::PeerConnectionObserver
{
// Stores the raw |delegate| pointer.
283 PeerConnectionObserverImpl(AbstractPeerConnection::Delegate
* delegate
)
284 : delegate_(delegate
),
// Stream additions are ignored (empty body).
288 virtual void OnAddStream(webrtc::MediaStreamInterface
* stream
) override
{}
// Stream removals are ignored (empty body).
290 virtual void OnRemoveStream(webrtc::MediaStreamInterface
* stream
) override
{}
292 virtual void OnDataChannel(webrtc::DataChannelInterface
* data_channel
)
// Renegotiation requests are ignored (empty body).
295 virtual void OnRenegotiationNeeded() override
{}
297 virtual void OnSignalingChange(
298 webrtc::PeerConnectionInterface::SignalingState new_state
) override
{
// Treats kIceConnectionConnected and kIceConnectionCompleted as "connected".
// When that value differs from |connected_|, stores it and notifies the
// delegate.
301 virtual void OnIceConnectionChange(
302 webrtc::PeerConnectionInterface::IceConnectionState new_state
) override
{
304 new_state
== webrtc::PeerConnectionInterface::kIceConnectionConnected
||
305 new_state
== webrtc::PeerConnectionInterface::kIceConnectionCompleted
;
307 if (connected
!= connected_
) {
308 connected_
= connected
;
309 delegate_
->OnIceConnectionChange(connected_
);
// Serializes |candidate| into |sdp| with ToString() and forwards its sdp_mid,
// sdp_mline_index and the serialized text to the delegate.
313 virtual void OnIceCandidate(const webrtc::IceCandidateInterface
* candidate
)
316 candidate
->ToString(&sdp
);
318 delegate_
->OnIceCandidate(
319 candidate
->sdp_mid(), candidate
->sdp_mline_index(), sdp
);
323 AbstractPeerConnection::Delegate
* const delegate_
;
328 * Helper object which may outlive PeerConnectionImpl. Provides access
329 * to the connection and the delegate to operation callback objects
330 * in a safe way. Always accessible on the signaling thread.
// Ref-counted holder (rtc::RefCountInterface) for the signaling thread, the
// peer connection and the delegate, all kept as raw pointers.
332 class PeerConnectionHolder
: public rtc::RefCountInterface
{
// Stores the given raw pointers.
334 PeerConnectionHolder(
335 rtc::Thread
* signaling_thread
,
336 webrtc::PeerConnectionInterface
* connection
,
337 AbstractPeerConnection::Delegate
* delegate
)
338 : signaling_thread_(signaling_thread
),
339 connection_(connection
),
344 virtual ~PeerConnectionHolder() {
349 DCHECK(!IsDisposed());
// Accessor for the connection; DCHECKs that the holder is not disposed.
353 webrtc::PeerConnectionInterface
* connection() {
354 DCHECK(!IsDisposed());
// Accessor for the delegate; DCHECKs that the holder is not disposed.
358 AbstractPeerConnection::Delegate
* delegate() {
359 DCHECK(!IsDisposed());
364 DCHECK(signaling_thread_
->IsCurrent());
369 rtc::Thread
* const signaling_thread_
;
370 webrtc::PeerConnectionInterface
* const connection_
;
371 AbstractPeerConnection::Delegate
* const delegate_
;
// Observer for both halves of "create a local description, then set it":
// it implements CreateSessionDescriptionObserver and
// SetSessionDescriptionObserver and reports the outcome to the holder's
// delegate.
375 class CreateAndSetHandler
376 : public webrtc::CreateSessionDescriptionObserver
,
377 public webrtc::SetSessionDescriptionObserver
{
379 explicit CreateAndSetHandler(
380 rtc::scoped_refptr
<PeerConnectionHolder
> holder
)
// Creation succeeded. Does nothing if the holder is disposed. Otherwise
// records the description type, serializes |desc| into |description_| and
// sets it as the local description with this object as the observer; a
// serialization problem is reported through OnFailure().
384 virtual void OnSuccess(webrtc::SessionDescriptionInterface
* desc
) override
{
385 if (holder_
->IsDisposed()) return;
387 type_
= desc
->type();
388 if (desc
->ToString(&description_
)) {
389 holder_
->connection()->SetLocalDescription(this, desc
);
391 OnFailure("Can't serialize session description");
// Setting the local description succeeded. Does nothing if the holder is
// disposed. Notifies the delegate with the serialized description: the offer
// callback when |type_| is kOffer, the answer callback otherwise (DCHECKed
// to be kAnswer).
395 virtual void OnSuccess() override
{
396 if (holder_
->IsDisposed()) return;
398 if (type_
== webrtc::SessionDescriptionInterface::kOffer
) {
399 holder_
->delegate()->OnLocalOfferCreatedAndSetSet(description_
);
401 DCHECK_EQ(webrtc::SessionDescriptionInterface::kAnswer
, type_
);
403 holder_
->delegate()->OnLocalAnswerCreatedAndSetSet(description_
);
// Forwards |error| to the delegate unless the holder is disposed.
407 virtual void OnFailure(const std::string
& error
) override
{
408 if (holder_
->IsDisposed()) return;
410 holder_
->delegate()->OnFailure(error
);
414 const rtc::scoped_refptr
<PeerConnectionHolder
> holder_
;
416 std::string description_
;
// SetSessionDescriptionObserver used for remote descriptions; it reports the
// result to the holder's delegate.
419 class SetRemoteDescriptionHandler
420 : public webrtc::SetSessionDescriptionObserver
{
422 SetRemoteDescriptionHandler(
423 rtc::scoped_refptr
<PeerConnectionHolder
> holder
)
// Notifies the delegate that the remote description was set, unless the
// holder is disposed.
427 virtual void OnSuccess() override
{
428 if (holder_
->IsDisposed()) return;
430 holder_
->delegate()->OnRemoteDescriptionSet();
// Forwards |error| to the delegate unless the holder is disposed.
433 virtual void OnFailure(const std::string
& error
) override
{
434 if (holder_
->IsDisposed()) return;
436 holder_
->delegate()->OnFailure(error
);
440 const rtc::scoped_refptr
<PeerConnectionHolder
> holder_
;
// AbstractPeerConnection implementation on top of a
// webrtc::PeerConnectionInterface. It owns the observer and the delegate and
// shares a PeerConnectionHolder with the handlers it creates.
443 class PeerConnectionImpl
: public AbstractPeerConnection
{
446 SessionDependencyFactory
* const factory
,
447 rtc::Thread
* signaling_thread
,
448 rtc::scoped_refptr
<webrtc::PeerConnectionInterface
> connection
,
449 scoped_ptr
<PeerConnectionObserverImpl
> observer
,
450 scoped_ptr
<AbstractPeerConnection::Delegate
> delegate
)
// |holder_| is built from raw pointers (connection.get(), delegate.get());
// ownership of |observer| and |delegate| moves into this object via Pass().
452 holder_(new rtc::RefCountedObject
<PeerConnectionHolder
>(
453 signaling_thread
, connection
.get(), delegate
.get())),
454 signaling_thread_(signaling_thread
),
455 connection_(connection
),
456 observer_(observer
.Pass()),
457 delegate_(delegate
.Pass()) {
// Runs DisposeOnSignalingThread through signaling_thread_->Invoke.
460 virtual ~PeerConnectionImpl() {
461 signaling_thread_
->Invoke
<void>(rtc::Bind(
462 &PeerConnectionImpl::DisposeOnSignalingThread
, this));
// Asks the connection to create an offer; the result goes to a fresh
// CreateAndSetHandler. NULL is passed as the second argument.
465 virtual void CreateAndSetLocalOffer() override
{
466 connection_
->CreateOffer(MakeCreateAndSetHandler(), NULL
);
// Asks the connection to create an answer; the result goes to a fresh
// CreateAndSetHandler. NULL is passed as the second argument.
469 virtual void CreateAndSetLocalAnswer() override
{
470 connection_
->CreateAnswer(MakeCreateAndSetHandler(), NULL
);
// Applies |description| as a remote description of type kOffer.
473 virtual void SetRemoteOffer(const std::string
& description
) override
{
474 SetRemoteDescription(
475 webrtc::SessionDescriptionInterface::kOffer
, description
);
// Applies |description| as a remote description of type kAnswer.
478 virtual void SetRemoteAnswer(const std::string
& description
) override
{
479 SetRemoteDescription(
480 webrtc::SessionDescriptionInterface::kAnswer
, description
);
// Parses |description| as a session description of the given |type| and
// passes it to the connection with a new SetRemoteDescriptionHandler as the
// observer.
483 void SetRemoteDescription(
484 const std::string
& type
, const std::string
& description
) {
485 webrtc::SdpParseError error
;
486 scoped_ptr
<webrtc::SessionDescriptionInterface
> value(
487 webrtc::CreateSessionDescription(type
, description
, &error
));
492 // Takes ownership of |value|.
493 connection_
->SetRemoteDescription(
494 new rtc::RefCountedObject
<SetRemoteDescriptionHandler
>(holder_
),
// Parses an ICE candidate from |sdp_mid|, |sdp_mline_index| and |sdp| and
// adds it to the connection; a NULL parse result is handled separately.
// NOTE(review): the comment below says the connection does not take
// ownership, and no release of |candidate| is visible here — confirm it is
// freed.
498 virtual void AddIceCandidate(
499 const std::string
& sdp_mid
,
501 const std::string
& sdp
) override
{
502 webrtc::SdpParseError error
;
503 auto candidate
= webrtc::CreateIceCandidate(
504 sdp_mid
, sdp_mline_index
, sdp
, &error
);
505 if (candidate
== NULL
) {
509 // Doesn't take ownership.
510 connection_
->AddIceCandidate(candidate
);
// Creates a data channel with an empty label and init.negotiated set to
// true, and returns it wrapped in a DataChannelImpl.
514 virtual scoped_ptr
<AbstractDataChannel
> CreateDataChannel(
515 int channelId
) override
{
516 webrtc::DataChannelInit init
;
518 init
.negotiated
= true;
521 return make_scoped_ptr(new DataChannelImpl(
524 connection_
->CreateDataChannel("", &init
)));
// Allocates a ref-counted CreateAndSetHandler that shares |holder_|.
528 webrtc::CreateSessionDescriptionObserver
* MakeCreateAndSetHandler() {
529 return new rtc::RefCountedObject
<CreateAndSetHandler
>(holder_
);
// Must run on the signaling thread (DCHECK); drops |connection_| through
// CheckedRelease().
532 void DisposeOnSignalingThread() {
533 DCHECK(signaling_thread_
->IsCurrent());
535 CheckedRelease(&connection_
);
539 void OnParseError(const webrtc::SdpParseError
& error
) {
540 // TODO(serya): Send on signaling thread.
543 SessionDependencyFactory
* const factory_
;
544 const rtc::scoped_refptr
<PeerConnectionHolder
> holder_
;
545 rtc::Thread
* const signaling_thread_
;
546 rtc::scoped_refptr
<webrtc::PeerConnectionInterface
> connection_
;
547 const scoped_ptr
<PeerConnectionObserverImpl
> observer_
;
548 const scoped_ptr
<AbstractPeerConnection::Delegate
> delegate_
;
// SessionDependencyFactory implementation. It owns the signaling and worker
// rtc::Threads, the webrtc PeerConnectionFactory built on them, and two
// on-demand helpers: the signaling-thread task runner and the IO thread.
551 class SessionDependencyFactoryImpl
: public SessionDependencyFactory
{
// Stores the cleanup closure, names and starts the signaling and worker
// threads, then creates the PeerConnectionFactory on them.
553 SessionDependencyFactoryImpl(
554 const base::Closure
& cleanup_on_signaling_thread
)
555 : cleanup_on_signaling_thread_(cleanup_on_signaling_thread
) {
556 signaling_thread_
.SetName("signaling_thread", NULL
);
557 signaling_thread_
.Start();
558 worker_thread_
.SetName("worker_thread", NULL
);
559 worker_thread_
.Start();
561 factory_
= webrtc::CreatePeerConnectionFactory(
562 &worker_thread_
, &signaling_thread_
, NULL
, NULL
, NULL
);
// Stops the signaling-thread task runner if one was created, then runs
// DisposeOnSignalingThread through signaling_thread_.Invoke.
565 virtual ~SessionDependencyFactoryImpl() {
566 if (signaling_thread_task_runner_
.get())
567 signaling_thread_task_runner_
->Stop();
569 signaling_thread_
.Invoke
<void>(rtc::Bind(
570 &SessionDependencyFactoryImpl::DisposeOnSignalingThread
, this));
// Creates an observer bound to |delegate|, requests DTLS-SRTP through a
// mandatory media constraint, creates the webrtc connection from |config|,
// and wraps the result in a PeerConnectionImpl.
573 virtual scoped_ptr
<AbstractPeerConnection
> CreatePeerConnection(
574 scoped_ptr
<RTCConfiguration
> config
,
575 scoped_ptr
<AbstractPeerConnection::Delegate
> delegate
) override
{
576 auto observer
= make_scoped_ptr(
577 new PeerConnectionObserverImpl(delegate
.get()));
579 MediaConstraints constraints
;
580 constraints
.AddMandatory(
581 MediaConstraints::kEnableDtlsSrtp
, MediaConstraints::kValueTrue
);
583 auto connection
= factory_
->CreatePeerConnection(
584 config
->impl(), &constraints
, NULL
, NULL
, observer
.get());
586 return make_scoped_ptr(new PeerConnectionImpl(
587 this, &signaling_thread_
, connection
, observer
.Pass(),
// Returns the signaling-thread task runner, creating it on first use.
591 scoped_refptr
<base::TaskRunner
> signaling_thread_task_runner() override
{
592 if (!signaling_thread_task_runner_
.get()) {
593 signaling_thread_task_runner_
=
594 new SignalingThreadTaskRunner(&signaling_thread_
);
596 return signaling_thread_task_runner_
;
// Returns the IO thread's task runner. On first use the thread is created and
// started with a TYPE_IO message loop; a failed start is fatal (CHECK).
599 scoped_refptr
<base::TaskRunner
> io_thread_task_runner() override
{
600 if (!io_thread_
.get()) {
601 io_thread_
.reset(new base::Thread("devtools bridge IO thread"));
602 base::Thread::Options options
;
603 options
.message_loop_type
= base::MessageLoop::TYPE_IO
;
604 CHECK(io_thread_
->StartWithOptions(options
));
606 return io_thread_
->task_runner();
// Must run on the signaling thread (DCHECK). Drops |factory_| through
// CheckedRelease() and then runs the cleanup closure if one was supplied.
610 void DisposeOnSignalingThread() {
611 DCHECK(signaling_thread_
.IsCurrent());
612 CheckedRelease(&factory_
);
613 if (!cleanup_on_signaling_thread_
.is_null())
614 cleanup_on_signaling_thread_
.Run();
617 scoped_ptr
<base::Thread
> io_thread_
;
618 scoped_refptr
<SignalingThreadTaskRunner
> signaling_thread_task_runner_
;
619 base::Closure cleanup_on_signaling_thread_
;
620 rtc::Thread signaling_thread_
;
621 rtc::Thread worker_thread_
;
622 rtc::scoped_refptr
<webrtc::PeerConnectionFactoryInterface
> factory_
;
// Factory function: returns a newly allocated RTCConfiguration::Impl owned by
// a scoped_ptr.
630 scoped_ptr
<RTCConfiguration
> RTCConfiguration::CreateInstance() {
631 return make_scoped_ptr(new RTCConfiguration::Impl());
634 // SessionDependencyFactory
// Forwards to rtc::InitializeSSL() and returns its result.
637 bool SessionDependencyFactory::InitializeSSL() {
638 return rtc::InitializeSSL();
// Forwards to rtc::CleanupSSL() and returns its result.
642 bool SessionDependencyFactory::CleanupSSL() {
643 return rtc::CleanupSSL();
// Factory function: returns a new SessionDependencyFactoryImpl constructed
// with |cleanup_on_signaling_thread|, owned by a scoped_ptr.
647 scoped_ptr
<SessionDependencyFactory
> SessionDependencyFactory::CreateInstance(
648 const base::Closure
& cleanup_on_signaling_thread
) {
649 return make_scoped_ptr(new SessionDependencyFactoryImpl(
650 cleanup_on_signaling_thread
));
653 } // namespace devtools_bridge