Fix some interleaving issues in EmbeddedWorkerInstance.
[chromium-blink-merge.git] / third_party / libevent / evbuffer.c
blobf2179a5044f4c2c0d49047704c14c916e7e62d14
1 /*
2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 #include <sys/types.h>
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
34 #ifdef HAVE_SYS_TIME_H
35 #include <sys/time.h>
36 #endif
38 #include <errno.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42 #ifdef HAVE_STDARG_H
43 #include <stdarg.h>
44 #endif
46 #ifdef WIN32
47 #include <winsock2.h>
48 #endif
50 #include "evutil.h"
51 #include "event.h"
53 /* prototypes */
55 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
/*
 * Schedule `ev` through event_add().
 *
 * A non-zero `timeout` is applied as a whole number of seconds; a zero
 * `timeout` schedules the event without any timeout.  The return value
 * is whatever event_add() reports.
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval tv;

	/* No timeout requested: hand event_add() a NULL timeval. */
	if (!timeout)
		return (event_add(ev, NULL));

	evutil_timerclear(&tv);
	tv.tv_sec = timeout;

	return (event_add(ev, &tv));
}
71 /*
72 * This callback is executed when the size of the input buffer changes.
73 * We use it to apply back pressure on the reading side.
76 void
77 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
78 void *arg) {
79 struct bufferevent *bufev = arg;
80 /*
81 * If we are below the watermark then reschedule reading if it's
82 * still enabled.
84 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
85 evbuffer_setcb(buf, NULL, NULL);
87 if (bufev->enabled & EV_READ)
88 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
92 static void
93 bufferevent_readcb(int fd, short event, void *arg)
95 struct bufferevent *bufev = arg;
96 int res = 0;
97 short what = EVBUFFER_READ;
98 size_t len;
99 int howmuch = -1;
101 if (event == EV_TIMEOUT) {
102 what |= EVBUFFER_TIMEOUT;
103 goto error;
107 * If we have a high watermark configured then we don't want to
108 * read more data than would make us reach the watermark.
110 if (bufev->wm_read.high != 0) {
111 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
112 /* we might have lowered the watermark, stop reading */
113 if (howmuch <= 0) {
114 struct evbuffer *buf = bufev->input;
115 event_del(&bufev->ev_read);
116 evbuffer_setcb(buf,
117 bufferevent_read_pressure_cb, bufev);
118 return;
122 res = evbuffer_read(bufev->input, fd, howmuch);
123 if (res == -1) {
124 if (errno == EAGAIN || errno == EINTR)
125 goto reschedule;
126 /* error case */
127 what |= EVBUFFER_ERROR;
128 } else if (res == 0) {
129 /* eof case */
130 what |= EVBUFFER_EOF;
133 if (res <= 0)
134 goto error;
136 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
138 /* See if this callbacks meets the water marks */
139 len = EVBUFFER_LENGTH(bufev->input);
140 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
141 return;
142 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
143 struct evbuffer *buf = bufev->input;
144 event_del(&bufev->ev_read);
146 /* Now schedule a callback for us when the buffer changes */
147 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
150 /* Invoke the user callback - must always be called last */
151 if (bufev->readcb != NULL)
152 (*bufev->readcb)(bufev, bufev->cbarg);
153 return;
155 reschedule:
156 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
157 return;
159 error:
160 (*bufev->errorcb)(bufev, what, bufev->cbarg);
163 static void
164 bufferevent_writecb(int fd, short event, void *arg)
166 struct bufferevent *bufev = arg;
167 int res = 0;
168 short what = EVBUFFER_WRITE;
170 if (event == EV_TIMEOUT) {
171 what |= EVBUFFER_TIMEOUT;
172 goto error;
175 if (EVBUFFER_LENGTH(bufev->output)) {
176 res = evbuffer_write(bufev->output, fd);
177 if (res == -1) {
178 #ifndef WIN32
179 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
180 *set errno. thus this error checking is not portable*/
181 if (errno == EAGAIN ||
182 errno == EINTR ||
183 errno == EINPROGRESS)
184 goto reschedule;
185 /* error case */
186 what |= EVBUFFER_ERROR;
188 #else
189 goto reschedule;
190 #endif
192 } else if (res == 0) {
193 /* eof case */
194 what |= EVBUFFER_EOF;
196 if (res <= 0)
197 goto error;
200 if (EVBUFFER_LENGTH(bufev->output) != 0)
201 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
204 * Invoke the user callback if our buffer is drained or below the
205 * low watermark.
207 if (bufev->writecb != NULL &&
208 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
209 (*bufev->writecb)(bufev, bufev->cbarg);
211 return;
213 reschedule:
214 if (EVBUFFER_LENGTH(bufev->output) != 0)
215 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
216 return;
218 error:
219 (*bufev->errorcb)(bufev, what, bufev->cbarg);
223 * Create a new buffered event object.
225 * The read callback is invoked whenever we read new data.
226 * The write callback is invoked whenever the output buffer is drained.
227 * The error callback is invoked on a write/read error or on EOF.
229 * Both read and write callbacks maybe NULL. The error callback is not
230 * allowed to be NULL and have to be provided always.
233 struct bufferevent *
234 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
235 everrorcb errorcb, void *cbarg)
237 struct bufferevent *bufev;
239 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
240 return (NULL);
242 if ((bufev->input = evbuffer_new()) == NULL) {
243 free(bufev);
244 return (NULL);
247 if ((bufev->output = evbuffer_new()) == NULL) {
248 evbuffer_free(bufev->input);
249 free(bufev);
250 return (NULL);
253 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
254 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
256 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
259 * Set to EV_WRITE so that using bufferevent_write is going to
260 * trigger a callback. Reading needs to be explicitly enabled
261 * because otherwise no data will be available.
263 bufev->enabled = EV_WRITE;
265 return (bufev);
268 void
269 bufferevent_setcb(struct bufferevent *bufev,
270 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
272 bufev->readcb = readcb;
273 bufev->writecb = writecb;
274 bufev->errorcb = errorcb;
276 bufev->cbarg = cbarg;
279 void
280 bufferevent_setfd(struct bufferevent *bufev, int fd)
282 event_del(&bufev->ev_read);
283 event_del(&bufev->ev_write);
285 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
286 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
287 if (bufev->ev_base != NULL) {
288 event_base_set(bufev->ev_base, &bufev->ev_read);
289 event_base_set(bufev->ev_base, &bufev->ev_write);
292 /* might have to manually trigger event registration */
296 bufferevent_priority_set(struct bufferevent *bufev, int priority)
298 if (event_priority_set(&bufev->ev_read, priority) == -1)
299 return (-1);
300 if (event_priority_set(&bufev->ev_write, priority) == -1)
301 return (-1);
303 return (0);
306 /* Closing the file descriptor is the responsibility of the caller */
308 void
309 bufferevent_free(struct bufferevent *bufev)
311 event_del(&bufev->ev_read);
312 event_del(&bufev->ev_write);
314 evbuffer_free(bufev->input);
315 evbuffer_free(bufev->output);
317 free(bufev);
321 * Returns 0 on success;
322 * -1 on failure.
326 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
328 int res;
330 res = evbuffer_add(bufev->output, data, size);
332 if (res == -1)
333 return (res);
335 /* If everything is okay, we need to schedule a write */
336 if (size > 0 && (bufev->enabled & EV_WRITE))
337 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
339 return (res);
343 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
345 int res;
347 res = bufferevent_write(bufev, buf->buffer, buf->off);
348 if (res != -1)
349 evbuffer_drain(buf, buf->off);
351 return (res);
354 size_t
355 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
357 struct evbuffer *buf = bufev->input;
359 if (buf->off < size)
360 size = buf->off;
362 /* Copy the available data to the user buffer */
363 memcpy(data, buf->buffer, size);
365 if (size)
366 evbuffer_drain(buf, size);
368 return (size);
372 bufferevent_enable(struct bufferevent *bufev, short event)
374 if (event & EV_READ) {
375 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
376 return (-1);
378 if (event & EV_WRITE) {
379 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
380 return (-1);
383 bufev->enabled |= event;
384 return (0);
388 bufferevent_disable(struct bufferevent *bufev, short event)
390 if (event & EV_READ) {
391 if (event_del(&bufev->ev_read) == -1)
392 return (-1);
394 if (event & EV_WRITE) {
395 if (event_del(&bufev->ev_write) == -1)
396 return (-1);
399 bufev->enabled &= ~event;
400 return (0);
404 * Sets the read and write timeout for a buffered event.
407 void
408 bufferevent_settimeout(struct bufferevent *bufev,
409 int timeout_read, int timeout_write) {
410 bufev->timeout_read = timeout_read;
411 bufev->timeout_write = timeout_write;
413 if (event_pending(&bufev->ev_read, EV_READ, NULL))
414 bufferevent_add(&bufev->ev_read, timeout_read);
415 if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
416 bufferevent_add(&bufev->ev_write, timeout_write);
420 * Sets the water marks
423 void
424 bufferevent_setwatermark(struct bufferevent *bufev, short events,
425 size_t lowmark, size_t highmark)
427 if (events & EV_READ) {
428 bufev->wm_read.low = lowmark;
429 bufev->wm_read.high = highmark;
432 if (events & EV_WRITE) {
433 bufev->wm_write.low = lowmark;
434 bufev->wm_write.high = highmark;
437 /* If the watermarks changed then see if we should call read again */
438 bufferevent_read_pressure_cb(bufev->input,
439 0, EVBUFFER_LENGTH(bufev->input), bufev);
443 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
445 int res;
447 bufev->ev_base = base;
449 res = event_base_set(base, &bufev->ev_read);
450 if (res == -1)
451 return (res);
453 res = event_base_set(base, &bufev->ev_write);
454 return (res);