Enable large file support
[chromium-blink-merge.git] / jingle / glue / thread_wrapper.cc
blobf74fbb378f4a7baa68349a180556d6501e0a67f9
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/thread_wrapper.h"
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h"
11 #include "third_party/webrtc/base/nullsocketserver.h"
13 namespace jingle_glue {
15 struct JingleThreadWrapper::PendingSend {
16 PendingSend(const rtc::Message& message_value)
17 : sending_thread(JingleThreadWrapper::current()),
18 message(message_value),
19 done_event(true, false) {
20 DCHECK(sending_thread);
23 JingleThreadWrapper* sending_thread;
24 rtc::Message message;
25 base::WaitableEvent done_event;
28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
29 g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
31 // static
32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33 if (JingleThreadWrapper::current() == nullptr) {
34 base::MessageLoop* message_loop = base::MessageLoop::current();
35 scoped_ptr<JingleThreadWrapper> wrapper =
36 JingleThreadWrapper::WrapTaskRunner(message_loop->task_runner());
37 message_loop->AddDestructionObserver(wrapper.release());
40 DCHECK_EQ(rtc::Thread::Current(), current());
43 scoped_ptr<JingleThreadWrapper> JingleThreadWrapper::WrapTaskRunner(
44 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
45 DCHECK(!JingleThreadWrapper::current());
46 DCHECK(task_runner->BelongsToCurrentThread());
48 scoped_ptr<JingleThreadWrapper> result(new JingleThreadWrapper(task_runner));
49 g_jingle_thread_wrapper.Get().Set(result.get());
50 return result.Pass();
53 // static
54 JingleThreadWrapper* JingleThreadWrapper::current() {
55 return g_jingle_thread_wrapper.Get().Get();
58 JingleThreadWrapper::JingleThreadWrapper(
59 scoped_refptr<base::SingleThreadTaskRunner> task_runner)
60 : task_runner_(task_runner),
61 send_allowed_(false),
62 last_task_id_(0),
63 pending_send_event_(true, false),
64 weak_ptr_factory_(this) {
65 DCHECK(task_runner->BelongsToCurrentThread());
66 DCHECK(!rtc::Thread::Current());
67 weak_ptr_ = weak_ptr_factory_.GetWeakPtr();
68 rtc::MessageQueueManager::Add(this);
69 SafeWrapCurrent();
72 JingleThreadWrapper::~JingleThreadWrapper() {
73 DCHECK_EQ(this, JingleThreadWrapper::current());
74 DCHECK_EQ(this, rtc::Thread::Current());
76 UnwrapCurrent();
77 rtc::ThreadManager::Instance()->SetCurrentThread(nullptr);
78 rtc::MessageQueueManager::Remove(this);
79 g_jingle_thread_wrapper.Get().Set(nullptr);
81 Clear(nullptr, rtc::MQID_ANY, nullptr);
84 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
85 delete this;
88 void JingleThreadWrapper::Post(
89 rtc::MessageHandler* handler, uint32 message_id,
90 rtc::MessageData* data, bool time_sensitive) {
91 PostTaskInternal(0, handler, message_id, data);
94 void JingleThreadWrapper::PostDelayed(
95 int delay_ms, rtc::MessageHandler* handler,
96 uint32 message_id, rtc::MessageData* data) {
97 PostTaskInternal(delay_ms, handler, message_id, data);
100 void JingleThreadWrapper::Clear(rtc::MessageHandler* handler, uint32 id,
101 rtc::MessageList* removed) {
102 base::AutoLock auto_lock(lock_);
104 for (MessagesQueue::iterator it = messages_.begin();
105 it != messages_.end();) {
106 MessagesQueue::iterator next = it;
107 ++next;
109 if (it->second.Match(handler, id)) {
110 if (removed) {
111 removed->push_back(it->second);
112 } else {
113 delete it->second.pdata;
115 messages_.erase(it);
118 it = next;
121 for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin();
122 it != pending_send_messages_.end();) {
123 std::list<PendingSend*>::iterator next = it;
124 ++next;
126 if ((*it)->message.Match(handler, id)) {
127 if (removed) {
128 removed ->push_back((*it)->message);
129 } else {
130 delete (*it)->message.pdata;
132 (*it)->done_event.Signal();
133 pending_send_messages_.erase(it);
136 it = next;
140 void JingleThreadWrapper::Send(rtc::MessageHandler *handler, uint32 id,
141 rtc::MessageData *data) {
142 if (fStop_)
143 return;
145 JingleThreadWrapper* current_thread = JingleThreadWrapper::current();
146 DCHECK(current_thread != nullptr) << "Send() can be called only from a "
147 "thread that has JingleThreadWrapper.";
149 rtc::Message message;
150 message.phandler = handler;
151 message.message_id = id;
152 message.pdata = data;
154 if (current_thread == this) {
155 handler->OnMessage(&message);
156 return;
159 // Send message from a thread different than |this|.
161 // Allow inter-thread send only from threads that have
162 // |send_allowed_| flag set.
163 DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous "
164 "messages is not allowed from the current thread.";
166 PendingSend pending_send(message);
168 base::AutoLock auto_lock(lock_);
169 pending_send_messages_.push_back(&pending_send);
172 // Need to signal |pending_send_event_| here in case the thread is
173 // sending message to another thread.
174 pending_send_event_.Signal();
175 task_runner_->PostTask(FROM_HERE,
176 base::Bind(&JingleThreadWrapper::ProcessPendingSends,
177 weak_ptr_));
180 while (!pending_send.done_event.IsSignaled()) {
181 base::WaitableEvent* events[] = {&pending_send.done_event,
182 &current_thread->pending_send_event_};
183 size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
184 DCHECK(event == 0 || event == 1);
186 if (event == 1)
187 current_thread->ProcessPendingSends();
191 void JingleThreadWrapper::ProcessPendingSends() {
192 while (true) {
193 PendingSend* pending_send = nullptr;
195 base::AutoLock auto_lock(lock_);
196 if (!pending_send_messages_.empty()) {
197 pending_send = pending_send_messages_.front();
198 pending_send_messages_.pop_front();
199 } else {
200 // Reset the event while |lock_| is still locked.
201 pending_send_event_.Reset();
202 break;
205 if (pending_send) {
206 pending_send->message.phandler->OnMessage(&pending_send->message);
207 pending_send->done_event.Signal();
212 void JingleThreadWrapper::PostTaskInternal(
213 int delay_ms, rtc::MessageHandler* handler,
214 uint32 message_id, rtc::MessageData* data) {
215 int task_id;
216 rtc::Message message;
217 message.phandler = handler;
218 message.message_id = message_id;
219 message.pdata = data;
221 base::AutoLock auto_lock(lock_);
222 task_id = ++last_task_id_;
223 messages_.insert(std::pair<int, rtc::Message>(task_id, message));
226 if (delay_ms <= 0) {
227 task_runner_->PostTask(FROM_HERE,
228 base::Bind(&JingleThreadWrapper::RunTask,
229 weak_ptr_, task_id));
230 } else {
231 task_runner_->PostDelayedTask(FROM_HERE,
232 base::Bind(&JingleThreadWrapper::RunTask,
233 weak_ptr_, task_id),
234 base::TimeDelta::FromMilliseconds(delay_ms));
238 void JingleThreadWrapper::RunTask(int task_id) {
239 bool have_message = false;
240 rtc::Message message;
242 base::AutoLock auto_lock(lock_);
243 MessagesQueue::iterator it = messages_.find(task_id);
244 if (it != messages_.end()) {
245 have_message = true;
246 message = it->second;
247 messages_.erase(it);
251 if (have_message) {
252 if (message.message_id == rtc::MQID_DISPOSE) {
253 DCHECK(message.phandler == nullptr);
254 delete message.pdata;
255 } else {
256 message.phandler->OnMessage(&message);
261 // All methods below are marked as not reached. See comments in the
262 // header for more details.
263 void JingleThreadWrapper::Quit() {
264 NOTREACHED();
267 bool JingleThreadWrapper::IsQuitting() {
268 NOTREACHED();
269 return false;
272 void JingleThreadWrapper::Restart() {
273 NOTREACHED();
276 bool JingleThreadWrapper::Get(rtc::Message*, int, bool) {
277 NOTREACHED();
278 return false;
281 bool JingleThreadWrapper::Peek(rtc::Message*, int) {
282 NOTREACHED();
283 return false;
286 void JingleThreadWrapper::PostAt(uint32, rtc::MessageHandler*,
287 uint32, rtc::MessageData*) {
288 NOTREACHED();
291 void JingleThreadWrapper::Dispatch(rtc::Message* message) {
292 NOTREACHED();
295 void JingleThreadWrapper::ReceiveSends() {
296 NOTREACHED();
299 int JingleThreadWrapper::GetDelay() {
300 NOTREACHED();
301 return 0;
304 void JingleThreadWrapper::Stop() {
305 NOTREACHED();
308 void JingleThreadWrapper::Run() {
309 NOTREACHED();
312 } // namespace jingle_glue