PageLanguageDetectionTest has the failure rate of 5 - 6% on XP/Vista. Mark it
[chromium-blink-merge.git] / base / message_pump_libevent.cc
blobc2390b46c8b7cbed638a561136ba37032bf813e0
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "base/message_pump_libevent.h"
7 #include <errno.h>
8 #include <fcntl.h>
10 #include "base/auto_reset.h"
11 #include "base/eintr_wrapper.h"
12 #include "base/logging.h"
13 #include "base/observer_list.h"
14 #include "base/scoped_nsautorelease_pool.h"
15 #include "base/scoped_ptr.h"
16 #include "base/time.h"
17 #if defined(USE_SYSTEM_LIBEVENT)
18 #include <event.h>
19 #else
20 #include "third_party/libevent/event.h"
21 #endif
23 // Lifecycle of struct event
24 // Libevent uses two main data structures:
25 // struct event_base (of which there is one per message pump), and
26 // struct event (of which there is roughly one per socket).
27 // The socket's struct event is created in
28 // MessagePumpLibevent::WatchFileDescriptor(),
29 // is owned by the FileDescriptorWatcher, and is destroyed in
30 // StopWatchingFileDescriptor().
31 // It is moved into and out of lists in struct event_base by
32 // the libevent functions event_add() and event_del().
34 // TODO(dkegel):
35 // At the moment bad things happen if a FileDescriptorWatcher
36 // is active after its MessagePumpLibevent has been destroyed.
37 // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
38 // Not clear yet whether that situation occurs in practice,
39 // but if it does, we need to fix it.
41 namespace base {
43 // Return 0 on success
44 // Too small a function to bother putting in a library?
45 static int SetNonBlocking(int fd) {
46 int flags = fcntl(fd, F_GETFL, 0);
47 if (flags == -1)
48 flags = 0;
49 return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
52 MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
53 : is_persistent_(false),
54 event_(NULL),
55 pump_(NULL),
56 watcher_(NULL) {
59 MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
60 if (event_) {
61 StopWatchingFileDescriptor();
65 void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
66 bool is_persistent) {
67 DCHECK(e);
68 DCHECK(event_ == NULL);
70 is_persistent_ = is_persistent;
71 event_ = e;
74 event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
75 struct event *e = event_;
76 event_ = NULL;
77 return e;
80 bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
81 event* e = ReleaseEvent();
82 if (e == NULL)
83 return true;
85 // event_del() is a no-op if the event isn't active.
86 int rv = event_del(e);
87 delete e;
88 pump_ = NULL;
89 watcher_ = NULL;
90 return (rv == 0);
93 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
94 int fd, MessagePumpLibevent* pump) {
95 pump->WillProcessIOEvent();
96 watcher_->OnFileCanReadWithoutBlocking(fd);
97 pump->DidProcessIOEvent();
100 void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
101 int fd, MessagePumpLibevent* pump) {
102 pump->WillProcessIOEvent();
103 watcher_->OnFileCanWriteWithoutBlocking(fd);
104 pump->DidProcessIOEvent();
107 // Called if a byte is received on the wakeup pipe.
108 void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
109 base::MessagePumpLibevent* that =
110 static_cast<base::MessagePumpLibevent*>(context);
111 DCHECK(that->wakeup_pipe_out_ == socket);
113 // Remove and discard the wakeup byte.
114 char buf;
115 int nread = HANDLE_EINTR(read(socket, &buf, 1));
116 DCHECK_EQ(nread, 1);
117 // Tell libevent to break out of inner loop.
118 event_base_loopbreak(that->event_base_);
121 MessagePumpLibevent::MessagePumpLibevent()
122 : keep_running_(true),
123 in_run_(false),
124 event_base_(event_base_new()),
125 wakeup_pipe_in_(-1),
126 wakeup_pipe_out_(-1) {
127 if (!Init())
128 NOTREACHED();
131 bool MessagePumpLibevent::Init() {
132 int fds[2];
133 if (pipe(fds)) {
134 DLOG(ERROR) << "pipe() failed, errno: " << errno;
135 return false;
137 if (SetNonBlocking(fds[0])) {
138 DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
139 return false;
141 if (SetNonBlocking(fds[1])) {
142 DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
143 return false;
145 wakeup_pipe_out_ = fds[0];
146 wakeup_pipe_in_ = fds[1];
148 wakeup_event_ = new event;
149 event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
150 OnWakeup, this);
151 event_base_set(event_base_, wakeup_event_);
153 if (event_add(wakeup_event_, 0))
154 return false;
155 return true;
158 MessagePumpLibevent::~MessagePumpLibevent() {
159 DCHECK(wakeup_event_);
160 DCHECK(event_base_);
161 event_del(wakeup_event_);
162 delete wakeup_event_;
163 if (wakeup_pipe_in_ >= 0)
164 HANDLE_EINTR(close(wakeup_pipe_in_));
165 if (wakeup_pipe_out_ >= 0)
166 HANDLE_EINTR(close(wakeup_pipe_out_));
167 event_base_free(event_base_);
170 bool MessagePumpLibevent::WatchFileDescriptor(int fd,
171 bool persistent,
172 Mode mode,
173 FileDescriptorWatcher *controller,
174 Watcher *delegate) {
175 DCHECK_GE(fd, 0);
176 DCHECK(controller);
177 DCHECK(delegate);
178 DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
180 int event_mask = persistent ? EV_PERSIST : 0;
181 if ((mode & WATCH_READ) != 0) {
182 event_mask |= EV_READ;
184 if ((mode & WATCH_WRITE) != 0) {
185 event_mask |= EV_WRITE;
188 scoped_ptr<event> evt(controller->ReleaseEvent());
189 if (evt.get() == NULL) {
190 // Ownership is transferred to the controller.
191 evt.reset(new event);
192 } else {
193 // Make sure we don't pick up any funky internal libevent masks.
194 int old_interest_mask = evt.get()->ev_events &
195 (EV_READ | EV_WRITE | EV_PERSIST);
197 // Combine old/new event masks.
198 event_mask |= old_interest_mask;
200 // Must disarm the event before we can reuse it.
201 event_del(evt.get());
203 // It's illegal to use this function to listen on 2 separate fds with the
204 // same |controller|.
205 if (EVENT_FD(evt.get()) != fd) {
206 NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
207 return false;
211 // Set current interest mask and message pump for this event.
212 event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
214 // Tell libevent which message pump this socket will belong to when we add it.
215 if (event_base_set(event_base_, evt.get()) != 0) {
216 return false;
219 // Add this socket to the list of monitored sockets.
220 if (event_add(evt.get(), NULL) != 0) {
221 return false;
224 // Transfer ownership of evt to controller.
225 controller->Init(evt.release(), persistent);
227 controller->set_watcher(delegate);
228 controller->set_pump(this);
230 return true;
233 void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
234 void* context) {
235 FileDescriptorWatcher* controller =
236 static_cast<FileDescriptorWatcher*>(context);
238 MessagePumpLibevent* pump = controller->pump();
240 if (flags & EV_WRITE) {
241 controller->OnFileCanWriteWithoutBlocking(fd, pump);
243 if (flags & EV_READ) {
244 controller->OnFileCanReadWithoutBlocking(fd, pump);
248 // Tell libevent to break out of inner loop.
249 static void timer_callback(int fd, short events, void *context)
251 event_base_loopbreak((struct event_base *)context);
254 // Reentrant!
255 void MessagePumpLibevent::Run(Delegate* delegate) {
256 DCHECK(keep_running_) << "Quit must have been called outside of Run!";
257 AutoReset<bool> auto_reset_in_run(&in_run_, true);
259 // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641.
260 // Instead, make our own timer and reuse it on each call to event_base_loop().
261 scoped_ptr<event> timer_event(new event);
263 for (;;) {
264 ScopedNSAutoreleasePool autorelease_pool;
266 bool did_work = delegate->DoWork();
267 if (!keep_running_)
268 break;
270 did_work |= delegate->DoDelayedWork(&delayed_work_time_);
271 if (!keep_running_)
272 break;
274 if (did_work)
275 continue;
277 did_work = delegate->DoIdleWork();
278 if (!keep_running_)
279 break;
281 if (did_work)
282 continue;
284 // EVLOOP_ONCE tells libevent to only block once,
285 // but to service all pending events when it wakes up.
286 if (delayed_work_time_.is_null()) {
287 event_base_loop(event_base_, EVLOOP_ONCE);
288 } else {
289 TimeDelta delay = delayed_work_time_ - Time::Now();
290 if (delay > TimeDelta()) {
291 struct timeval poll_tv;
292 poll_tv.tv_sec = delay.InSeconds();
293 poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
294 event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
295 event_base_set(event_base_, timer_event.get());
296 event_add(timer_event.get(), &poll_tv);
297 event_base_loop(event_base_, EVLOOP_ONCE);
298 event_del(timer_event.get());
299 } else {
300 // It looks like delayed_work_time_ indicates a time in the past, so we
301 // need to call DoDelayedWork now.
302 delayed_work_time_ = Time();
307 keep_running_ = true;
310 void MessagePumpLibevent::Quit() {
311 DCHECK(in_run_);
312 // Tell both libevent and Run that they should break out of their loops.
313 keep_running_ = false;
314 ScheduleWork();
317 void MessagePumpLibevent::ScheduleWork() {
318 // Tell libevent (in a threadsafe way) that it should break out of its loop.
319 char buf = 0;
320 int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
321 DCHECK(nwrite == 1 || errno == EAGAIN)
322 << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
325 void MessagePumpLibevent::ScheduleDelayedWork(const Time& delayed_work_time) {
326 // We know that we can't be blocked on Wait right now since this method can
327 // only be called on the same thread as Run, so we only need to update our
328 // record of how long to sleep when we do sleep.
329 delayed_work_time_ = delayed_work_time;
332 void MessagePumpLibevent::AddIOObserver(IOObserver *obs) {
333 io_observers_.AddObserver(obs);
336 void MessagePumpLibevent::RemoveIOObserver(IOObserver *obs) {
337 io_observers_.RemoveObserver(obs);
340 void MessagePumpLibevent::WillProcessIOEvent() {
341 FOR_EACH_OBSERVER(IOObserver, io_observers_, WillProcessIOEvent());
344 void MessagePumpLibevent::DidProcessIOEvent() {
345 FOR_EACH_OBSERVER(IOObserver, io_observers_, DidProcessIOEvent());
348 } // namespace base