1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/thread_wrapper.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h"
11 #include "third_party/webrtc/base/nullsocketserver.h"
13 namespace jingle_glue
{
15 struct JingleThreadWrapper::PendingSend
{
16 PendingSend(const rtc::Message
& message_value
)
17 : sending_thread(JingleThreadWrapper::current()),
18 message(message_value
),
19 done_event(true, false) {
20 DCHECK(sending_thread
);
23 JingleThreadWrapper
* sending_thread
;
25 base::WaitableEvent done_event
;
28 base::LazyInstance
<base::ThreadLocalPointer
<JingleThreadWrapper
> >
29 g_jingle_thread_wrapper
= LAZY_INSTANCE_INITIALIZER
;
32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33 if (JingleThreadWrapper::current() == nullptr) {
34 base::MessageLoop
* message_loop
= base::MessageLoop::current();
35 scoped_ptr
<JingleThreadWrapper
> wrapper
=
36 JingleThreadWrapper::WrapTaskRunner(message_loop
->task_runner());
37 message_loop
->AddDestructionObserver(wrapper
.release());
40 DCHECK_EQ(rtc::Thread::Current(), current());
43 scoped_ptr
<JingleThreadWrapper
> JingleThreadWrapper::WrapTaskRunner(
44 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
) {
45 DCHECK(!JingleThreadWrapper::current());
46 DCHECK(task_runner
->BelongsToCurrentThread());
48 scoped_ptr
<JingleThreadWrapper
> result(new JingleThreadWrapper(task_runner
));
49 g_jingle_thread_wrapper
.Get().Set(result
.get());
54 JingleThreadWrapper
* JingleThreadWrapper::current() {
55 return g_jingle_thread_wrapper
.Get().Get();
58 JingleThreadWrapper::JingleThreadWrapper(
59 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
)
60 : task_runner_(task_runner
),
63 pending_send_event_(true, false),
64 weak_ptr_factory_(this) {
65 DCHECK(task_runner
->BelongsToCurrentThread());
66 DCHECK(!rtc::Thread::Current());
67 weak_ptr_
= weak_ptr_factory_
.GetWeakPtr();
68 rtc::MessageQueueManager::Add(this);
72 JingleThreadWrapper::~JingleThreadWrapper() {
73 DCHECK_EQ(this, JingleThreadWrapper::current());
74 DCHECK_EQ(this, rtc::Thread::Current());
77 rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
78 rtc::MessageQueueManager::Remove(this);
79 g_jingle_thread_wrapper
.Get().Set(nullptr);
81 Clear(nullptr, rtc::MQID_ANY
, nullptr);
84 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
88 void JingleThreadWrapper::Post(
89 rtc::MessageHandler
* handler
, uint32 message_id
,
90 rtc::MessageData
* data
, bool time_sensitive
) {
91 PostTaskInternal(0, handler
, message_id
, data
);
94 void JingleThreadWrapper::PostDelayed(
95 int delay_ms
, rtc::MessageHandler
* handler
,
96 uint32 message_id
, rtc::MessageData
* data
) {
97 PostTaskInternal(delay_ms
, handler
, message_id
, data
);
100 void JingleThreadWrapper::Clear(rtc::MessageHandler
* handler
, uint32 id
,
101 rtc::MessageList
* removed
) {
102 base::AutoLock
auto_lock(lock_
);
104 for (MessagesQueue::iterator it
= messages_
.begin();
105 it
!= messages_
.end();) {
106 MessagesQueue::iterator next
= it
;
109 if (it
->second
.Match(handler
, id
)) {
111 removed
->push_back(it
->second
);
113 delete it
->second
.pdata
;
121 for (std::list
<PendingSend
*>::iterator it
= pending_send_messages_
.begin();
122 it
!= pending_send_messages_
.end();) {
123 std::list
<PendingSend
*>::iterator next
= it
;
126 if ((*it
)->message
.Match(handler
, id
)) {
128 removed
->push_back((*it
)->message
);
130 delete (*it
)->message
.pdata
;
132 (*it
)->done_event
.Signal();
133 pending_send_messages_
.erase(it
);
140 void JingleThreadWrapper::Send(rtc::MessageHandler
*handler
, uint32 id
,
141 rtc::MessageData
*data
) {
145 JingleThreadWrapper
* current_thread
= JingleThreadWrapper::current();
146 DCHECK(current_thread
!= nullptr) << "Send() can be called only from a "
147 "thread that has JingleThreadWrapper.";
149 rtc::Message message
;
150 message
.phandler
= handler
;
151 message
.message_id
= id
;
152 message
.pdata
= data
;
154 if (current_thread
== this) {
155 handler
->OnMessage(&message
);
159 // Send message from a thread different than |this|.
161 // Allow inter-thread send only from threads that have
162 // |send_allowed_| flag set.
163 DCHECK(current_thread
->send_allowed_
) << "Send()'ing synchronous "
164 "messages is not allowed from the current thread.";
166 PendingSend
pending_send(message
);
168 base::AutoLock
auto_lock(lock_
);
169 pending_send_messages_
.push_back(&pending_send
);
172 // Need to signal |pending_send_event_| here in case the thread is
173 // sending message to another thread.
174 pending_send_event_
.Signal();
175 task_runner_
->PostTask(FROM_HERE
,
176 base::Bind(&JingleThreadWrapper::ProcessPendingSends
,
180 while (!pending_send
.done_event
.IsSignaled()) {
181 base::WaitableEvent
* events
[] = {&pending_send
.done_event
,
182 ¤t_thread
->pending_send_event_
};
183 size_t event
= base::WaitableEvent::WaitMany(events
, arraysize(events
));
184 DCHECK(event
== 0 || event
== 1);
187 current_thread
->ProcessPendingSends();
191 void JingleThreadWrapper::ProcessPendingSends() {
193 PendingSend
* pending_send
= nullptr;
195 base::AutoLock
auto_lock(lock_
);
196 if (!pending_send_messages_
.empty()) {
197 pending_send
= pending_send_messages_
.front();
198 pending_send_messages_
.pop_front();
200 // Reset the event while |lock_| is still locked.
201 pending_send_event_
.Reset();
206 pending_send
->message
.phandler
->OnMessage(&pending_send
->message
);
207 pending_send
->done_event
.Signal();
212 void JingleThreadWrapper::PostTaskInternal(
213 int delay_ms
, rtc::MessageHandler
* handler
,
214 uint32 message_id
, rtc::MessageData
* data
) {
216 rtc::Message message
;
217 message
.phandler
= handler
;
218 message
.message_id
= message_id
;
219 message
.pdata
= data
;
221 base::AutoLock
auto_lock(lock_
);
222 task_id
= ++last_task_id_
;
223 messages_
.insert(std::pair
<int, rtc::Message
>(task_id
, message
));
227 task_runner_
->PostTask(FROM_HERE
,
228 base::Bind(&JingleThreadWrapper::RunTask
,
229 weak_ptr_
, task_id
));
231 task_runner_
->PostDelayedTask(FROM_HERE
,
232 base::Bind(&JingleThreadWrapper::RunTask
,
234 base::TimeDelta::FromMilliseconds(delay_ms
));
238 void JingleThreadWrapper::RunTask(int task_id
) {
239 bool have_message
= false;
240 rtc::Message message
;
242 base::AutoLock
auto_lock(lock_
);
243 MessagesQueue::iterator it
= messages_
.find(task_id
);
244 if (it
!= messages_
.end()) {
246 message
= it
->second
;
252 if (message
.message_id
== rtc::MQID_DISPOSE
) {
253 DCHECK(message
.phandler
== nullptr);
254 delete message
.pdata
;
256 message
.phandler
->OnMessage(&message
);
261 // All methods below are marked as not reached. See comments in the
262 // header for more details.
263 void JingleThreadWrapper::Quit() {
267 bool JingleThreadWrapper::IsQuitting() {
272 void JingleThreadWrapper::Restart() {
276 bool JingleThreadWrapper::Get(rtc::Message
*, int, bool) {
281 bool JingleThreadWrapper::Peek(rtc::Message
*, int) {
286 void JingleThreadWrapper::PostAt(uint32
, rtc::MessageHandler
*,
287 uint32
, rtc::MessageData
*) {
291 void JingleThreadWrapper::Dispatch(rtc::Message
* message
) {
295 void JingleThreadWrapper::ReceiveSends() {
299 int JingleThreadWrapper::GetDelay() {
304 void JingleThreadWrapper::Stop() {
308 void JingleThreadWrapper::Run() {
312 } // namespace jingle_glue