Roll leveldb 3f7758:803d69 (v1.17 -> v1.18)
[chromium-blink-merge.git] / jingle / glue / thread_wrapper.cc
blobd07b7468c798b6fc96a3ac386044d0c8802c3c37
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/thread_wrapper.h"
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h"
11 #include "third_party/webrtc/base/nullsocketserver.h"
13 namespace jingle_glue {
15 struct JingleThreadWrapper::PendingSend {
16 PendingSend(const rtc::Message& message_value)
17 : sending_thread(JingleThreadWrapper::current()),
18 message(message_value),
19 done_event(true, false) {
20 DCHECK(sending_thread);
23 JingleThreadWrapper* sending_thread;
24 rtc::Message message;
25 base::WaitableEvent done_event;
28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
29 g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
31 // static
32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33 if (JingleThreadWrapper::current() == NULL) {
34 base::MessageLoop* message_loop = base::MessageLoop::current();
35 g_jingle_thread_wrapper.Get()
36 .Set(new JingleThreadWrapper(message_loop->message_loop_proxy()));
37 message_loop->AddDestructionObserver(current());
40 DCHECK_EQ(rtc::Thread::Current(), current());
43 // static
44 JingleThreadWrapper* JingleThreadWrapper::current() {
45 return g_jingle_thread_wrapper.Get().Get();
48 JingleThreadWrapper::JingleThreadWrapper(
49 scoped_refptr<base::SingleThreadTaskRunner> task_runner)
50 : rtc::Thread(new rtc::NullSocketServer()),
51 task_runner_(task_runner),
52 send_allowed_(false),
53 last_task_id_(0),
54 pending_send_event_(true, false),
55 weak_ptr_factory_(this) {
56 DCHECK(task_runner->BelongsToCurrentThread());
57 DCHECK(!rtc::Thread::Current());
58 weak_ptr_ = weak_ptr_factory_.GetWeakPtr();
59 rtc::MessageQueueManager::Add(this);
60 SafeWrapCurrent();
63 JingleThreadWrapper::~JingleThreadWrapper() {
64 Clear(NULL, rtc::MQID_ANY, NULL);
67 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
68 DCHECK_EQ(rtc::Thread::Current(), current());
69 UnwrapCurrent();
70 g_jingle_thread_wrapper.Get().Set(NULL);
71 rtc::ThreadManager::Instance()->SetCurrentThread(NULL);
72 rtc::MessageQueueManager::Remove(this);
73 rtc::SocketServer* ss = socketserver();
74 delete this;
75 delete ss;
78 void JingleThreadWrapper::Post(
79 rtc::MessageHandler* handler, uint32 message_id,
80 rtc::MessageData* data, bool time_sensitive) {
81 PostTaskInternal(0, handler, message_id, data);
84 void JingleThreadWrapper::PostDelayed(
85 int delay_ms, rtc::MessageHandler* handler,
86 uint32 message_id, rtc::MessageData* data) {
87 PostTaskInternal(delay_ms, handler, message_id, data);
90 void JingleThreadWrapper::Clear(rtc::MessageHandler* handler, uint32 id,
91 rtc::MessageList* removed) {
92 base::AutoLock auto_lock(lock_);
94 for (MessagesQueue::iterator it = messages_.begin();
95 it != messages_.end();) {
96 MessagesQueue::iterator next = it;
97 ++next;
99 if (it->second.Match(handler, id)) {
100 if (removed) {
101 removed->push_back(it->second);
102 } else {
103 delete it->second.pdata;
105 messages_.erase(it);
108 it = next;
111 for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin();
112 it != pending_send_messages_.end();) {
113 std::list<PendingSend*>::iterator next = it;
114 ++next;
116 if ((*it)->message.Match(handler, id)) {
117 if (removed) {
118 removed ->push_back((*it)->message);
119 } else {
120 delete (*it)->message.pdata;
122 (*it)->done_event.Signal();
123 pending_send_messages_.erase(it);
126 it = next;
130 void JingleThreadWrapper::Send(rtc::MessageHandler *handler, uint32 id,
131 rtc::MessageData *data) {
132 if (fStop_)
133 return;
135 JingleThreadWrapper* current_thread = JingleThreadWrapper::current();
136 DCHECK(current_thread != NULL) << "Send() can be called only from a "
137 "thread that has JingleThreadWrapper.";
139 rtc::Message message;
140 message.phandler = handler;
141 message.message_id = id;
142 message.pdata = data;
144 if (current_thread == this) {
145 handler->OnMessage(&message);
146 return;
149 // Send message from a thread different than |this|.
151 // Allow inter-thread send only from threads that have
152 // |send_allowed_| flag set.
153 DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous "
154 "messages is not allowed from the current thread.";
156 PendingSend pending_send(message);
158 base::AutoLock auto_lock(lock_);
159 pending_send_messages_.push_back(&pending_send);
162 // Need to signal |pending_send_event_| here in case the thread is
163 // sending message to another thread.
164 pending_send_event_.Signal();
165 task_runner_->PostTask(FROM_HERE,
166 base::Bind(&JingleThreadWrapper::ProcessPendingSends,
167 weak_ptr_));
170 while (!pending_send.done_event.IsSignaled()) {
171 base::WaitableEvent* events[] = {&pending_send.done_event,
172 &current_thread->pending_send_event_};
173 size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
174 DCHECK(event == 0 || event == 1);
176 if (event == 1)
177 current_thread->ProcessPendingSends();
181 void JingleThreadWrapper::ProcessPendingSends() {
182 while (true) {
183 PendingSend* pending_send = NULL;
185 base::AutoLock auto_lock(lock_);
186 if (!pending_send_messages_.empty()) {
187 pending_send = pending_send_messages_.front();
188 pending_send_messages_.pop_front();
189 } else {
190 // Reset the event while |lock_| is still locked.
191 pending_send_event_.Reset();
192 break;
195 if (pending_send) {
196 pending_send->message.phandler->OnMessage(&pending_send->message);
197 pending_send->done_event.Signal();
202 void JingleThreadWrapper::PostTaskInternal(
203 int delay_ms, rtc::MessageHandler* handler,
204 uint32 message_id, rtc::MessageData* data) {
205 int task_id;
206 rtc::Message message;
207 message.phandler = handler;
208 message.message_id = message_id;
209 message.pdata = data;
211 base::AutoLock auto_lock(lock_);
212 task_id = ++last_task_id_;
213 messages_.insert(std::pair<int, rtc::Message>(task_id, message));
216 if (delay_ms <= 0) {
217 task_runner_->PostTask(FROM_HERE,
218 base::Bind(&JingleThreadWrapper::RunTask,
219 weak_ptr_, task_id));
220 } else {
221 task_runner_->PostDelayedTask(FROM_HERE,
222 base::Bind(&JingleThreadWrapper::RunTask,
223 weak_ptr_, task_id),
224 base::TimeDelta::FromMilliseconds(delay_ms));
228 void JingleThreadWrapper::RunTask(int task_id) {
229 bool have_message = false;
230 rtc::Message message;
232 base::AutoLock auto_lock(lock_);
233 MessagesQueue::iterator it = messages_.find(task_id);
234 if (it != messages_.end()) {
235 have_message = true;
236 message = it->second;
237 messages_.erase(it);
241 if (have_message) {
242 if (message.message_id == rtc::MQID_DISPOSE) {
243 DCHECK(message.phandler == NULL);
244 delete message.pdata;
245 } else {
246 message.phandler->OnMessage(&message);
251 // All methods below are marked as not reached. See comments in the
252 // header for more details.
253 void JingleThreadWrapper::Quit() {
254 NOTREACHED();
257 bool JingleThreadWrapper::IsQuitting() {
258 NOTREACHED();
259 return false;
262 void JingleThreadWrapper::Restart() {
263 NOTREACHED();
266 bool JingleThreadWrapper::Get(rtc::Message*, int, bool) {
267 NOTREACHED();
268 return false;
271 bool JingleThreadWrapper::Peek(rtc::Message*, int) {
272 NOTREACHED();
273 return false;
276 void JingleThreadWrapper::PostAt(uint32, rtc::MessageHandler*,
277 uint32, rtc::MessageData*) {
278 NOTREACHED();
281 void JingleThreadWrapper::Dispatch(rtc::Message* message) {
282 NOTREACHED();
285 void JingleThreadWrapper::ReceiveSends() {
286 NOTREACHED();
289 int JingleThreadWrapper::GetDelay() {
290 NOTREACHED();
291 return 0;
294 void JingleThreadWrapper::Stop() {
295 NOTREACHED();
298 void JingleThreadWrapper::Run() {
299 NOTREACHED();
302 } // namespace jingle_glue