1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/thread_wrapper.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h"
11 #include "third_party/webrtc/base/nullsocketserver.h"
13 namespace jingle_glue
{
15 struct JingleThreadWrapper::PendingSend
{
16 PendingSend(const rtc::Message
& message_value
)
17 : sending_thread(JingleThreadWrapper::current()),
18 message(message_value
),
19 done_event(true, false) {
20 DCHECK(sending_thread
);
23 JingleThreadWrapper
* sending_thread
;
25 base::WaitableEvent done_event
;
28 base::LazyInstance
<base::ThreadLocalPointer
<JingleThreadWrapper
> >
29 g_jingle_thread_wrapper
= LAZY_INSTANCE_INITIALIZER
;
32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33 if (JingleThreadWrapper::current() == NULL
) {
34 base::MessageLoop
* message_loop
= base::MessageLoop::current();
35 g_jingle_thread_wrapper
.Get()
36 .Set(new JingleThreadWrapper(message_loop
->message_loop_proxy()));
37 message_loop
->AddDestructionObserver(current());
40 DCHECK_EQ(rtc::Thread::Current(), current());
44 JingleThreadWrapper
* JingleThreadWrapper::current() {
45 return g_jingle_thread_wrapper
.Get().Get();
48 JingleThreadWrapper::JingleThreadWrapper(
49 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
)
50 : rtc::Thread(new rtc::NullSocketServer()),
51 task_runner_(task_runner
),
54 pending_send_event_(true, false),
55 weak_ptr_factory_(this) {
56 DCHECK(task_runner
->BelongsToCurrentThread());
57 DCHECK(!rtc::Thread::Current());
58 weak_ptr_
= weak_ptr_factory_
.GetWeakPtr();
59 rtc::MessageQueueManager::Add(this);
63 JingleThreadWrapper::~JingleThreadWrapper() {
64 Clear(NULL
, rtc::MQID_ANY
, NULL
);
67 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
68 DCHECK_EQ(rtc::Thread::Current(), current());
70 g_jingle_thread_wrapper
.Get().Set(NULL
);
71 rtc::ThreadManager::Instance()->SetCurrentThread(NULL
);
72 rtc::MessageQueueManager::Remove(this);
73 rtc::SocketServer
* ss
= socketserver();
78 void JingleThreadWrapper::Post(
79 rtc::MessageHandler
* handler
, uint32 message_id
,
80 rtc::MessageData
* data
, bool time_sensitive
) {
81 PostTaskInternal(0, handler
, message_id
, data
);
84 void JingleThreadWrapper::PostDelayed(
85 int delay_ms
, rtc::MessageHandler
* handler
,
86 uint32 message_id
, rtc::MessageData
* data
) {
87 PostTaskInternal(delay_ms
, handler
, message_id
, data
);
90 void JingleThreadWrapper::Clear(rtc::MessageHandler
* handler
, uint32 id
,
91 rtc::MessageList
* removed
) {
92 base::AutoLock
auto_lock(lock_
);
94 for (MessagesQueue::iterator it
= messages_
.begin();
95 it
!= messages_
.end();) {
96 MessagesQueue::iterator next
= it
;
99 if (it
->second
.Match(handler
, id
)) {
101 removed
->push_back(it
->second
);
103 delete it
->second
.pdata
;
111 for (std::list
<PendingSend
*>::iterator it
= pending_send_messages_
.begin();
112 it
!= pending_send_messages_
.end();) {
113 std::list
<PendingSend
*>::iterator next
= it
;
116 if ((*it
)->message
.Match(handler
, id
)) {
118 removed
->push_back((*it
)->message
);
120 delete (*it
)->message
.pdata
;
122 (*it
)->done_event
.Signal();
123 pending_send_messages_
.erase(it
);
130 void JingleThreadWrapper::Send(rtc::MessageHandler
*handler
, uint32 id
,
131 rtc::MessageData
*data
) {
135 JingleThreadWrapper
* current_thread
= JingleThreadWrapper::current();
136 DCHECK(current_thread
!= NULL
) << "Send() can be called only from a "
137 "thread that has JingleThreadWrapper.";
139 rtc::Message message
;
140 message
.phandler
= handler
;
141 message
.message_id
= id
;
142 message
.pdata
= data
;
144 if (current_thread
== this) {
145 handler
->OnMessage(&message
);
149 // Send message from a thread different than |this|.
151 // Allow inter-thread send only from threads that have
152 // |send_allowed_| flag set.
153 DCHECK(current_thread
->send_allowed_
) << "Send()'ing synchronous "
154 "messages is not allowed from the current thread.";
156 PendingSend
pending_send(message
);
158 base::AutoLock
auto_lock(lock_
);
159 pending_send_messages_
.push_back(&pending_send
);
162 // Need to signal |pending_send_event_| here in case the thread is
163 // sending message to another thread.
164 pending_send_event_
.Signal();
165 task_runner_
->PostTask(FROM_HERE
,
166 base::Bind(&JingleThreadWrapper::ProcessPendingSends
,
170 while (!pending_send
.done_event
.IsSignaled()) {
171 base::WaitableEvent
* events
[] = {&pending_send
.done_event
,
172 ¤t_thread
->pending_send_event_
};
173 size_t event
= base::WaitableEvent::WaitMany(events
, arraysize(events
));
174 DCHECK(event
== 0 || event
== 1);
177 current_thread
->ProcessPendingSends();
181 void JingleThreadWrapper::ProcessPendingSends() {
183 PendingSend
* pending_send
= NULL
;
185 base::AutoLock
auto_lock(lock_
);
186 if (!pending_send_messages_
.empty()) {
187 pending_send
= pending_send_messages_
.front();
188 pending_send_messages_
.pop_front();
190 // Reset the event while |lock_| is still locked.
191 pending_send_event_
.Reset();
196 pending_send
->message
.phandler
->OnMessage(&pending_send
->message
);
197 pending_send
->done_event
.Signal();
202 void JingleThreadWrapper::PostTaskInternal(
203 int delay_ms
, rtc::MessageHandler
* handler
,
204 uint32 message_id
, rtc::MessageData
* data
) {
206 rtc::Message message
;
207 message
.phandler
= handler
;
208 message
.message_id
= message_id
;
209 message
.pdata
= data
;
211 base::AutoLock
auto_lock(lock_
);
212 task_id
= ++last_task_id_
;
213 messages_
.insert(std::pair
<int, rtc::Message
>(task_id
, message
));
217 task_runner_
->PostTask(FROM_HERE
,
218 base::Bind(&JingleThreadWrapper::RunTask
,
219 weak_ptr_
, task_id
));
221 task_runner_
->PostDelayedTask(FROM_HERE
,
222 base::Bind(&JingleThreadWrapper::RunTask
,
224 base::TimeDelta::FromMilliseconds(delay_ms
));
228 void JingleThreadWrapper::RunTask(int task_id
) {
229 bool have_message
= false;
230 rtc::Message message
;
232 base::AutoLock
auto_lock(lock_
);
233 MessagesQueue::iterator it
= messages_
.find(task_id
);
234 if (it
!= messages_
.end()) {
236 message
= it
->second
;
242 if (message
.message_id
== rtc::MQID_DISPOSE
) {
243 DCHECK(message
.phandler
== NULL
);
244 delete message
.pdata
;
246 message
.phandler
->OnMessage(&message
);
251 // All methods below are marked as not reached. See comments in the
252 // header for more details.
253 void JingleThreadWrapper::Quit() {
257 bool JingleThreadWrapper::IsQuitting() {
262 void JingleThreadWrapper::Restart() {
266 bool JingleThreadWrapper::Get(rtc::Message
*, int, bool) {
271 bool JingleThreadWrapper::Peek(rtc::Message
*, int) {
276 void JingleThreadWrapper::PostAt(uint32
, rtc::MessageHandler
*,
277 uint32
, rtc::MessageData
*) {
281 void JingleThreadWrapper::Dispatch(rtc::Message
* message
) {
285 void JingleThreadWrapper::ReceiveSends() {
289 int JingleThreadWrapper::GetDelay() {
294 void JingleThreadWrapper::Stop() {
298 void JingleThreadWrapper::Run() {
302 } // namespace jingle_glue