3 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 #include <sys/types.h>
35 #ifdef HAVE_SYS_TIME_H
/*
 * Prototype for the input-buffer change callback that is defined further
 * down in this file.  It takes the evbuffer, two size_t values and an opaque
 * pointer argument.
 */
56 void bufferevent_read_pressure_cb(struct evbuffer
*, size_t, size_t, void *);
/*
 * Schedule `ev` with event_add() and return whatever event_add() returns.
 *
 * ev      - the event to add.
 * timeout - timeout value for the event.
 *           NOTE(review): the statements that consume `timeout` are not
 *           visible in this view; presumably a non-zero value is stored in
 *           `tv` and `ptv` is pointed at it -- confirm against the full file.
 *
 * `ptv` is initialised to NULL, so unless it is reassigned event_add() is
 * called with a NULL timeval pointer.
 */
59 bufferevent_add(struct event
*ev
, int timeout
)
61 struct timeval tv
, *ptv
= NULL
;
/* Reset tv before it is used. */
64 evutil_timerclear(&tv
);
69 return (event_add(ev
, ptv
));
73 * This callback is executed when the size of the input buffer changes.
74 * We use it to apply back pressure on the reading side.
/*
 * evbuffer change callback used to resume reading on a bufferevent.
 *
 * buf - the evbuffer the callback is attached to.
 * old - not used by the visible code (presumably the previous buffer size).
 * now - value compared against the read high watermark (presumably the
 *       current buffer size).
 * The final parameter is the owning struct bufferevent; it is assigned to
 * `bufev` from `arg`.
 *
 * When no read high watermark is configured (wm_read.high == 0) or `now` is
 * below it, the callback is cleared from `buf` with
 * evbuffer_setcb(buf, NULL, NULL) and, if EV_READ is set in bufev->enabled,
 * the read event is added again with bufev->timeout_read.
 */
78 bufferevent_read_pressure_cb(struct evbuffer
*buf
, size_t old
, size_t now
,
80 struct bufferevent
*bufev
= arg
;
82 * If we are below the watermark then reschedule reading if it's
85 if (bufev
->wm_read
.high
== 0 || now
< bufev
->wm_read
.high
) {
/* Detach this callback from the buffer. */
86 evbuffer_setcb(buf
, NULL
, NULL
);
/* Only re-arm the read event when reading is enabled. */
88 if (bufev
->enabled
& EV_READ
)
89 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
/*
 * Read-event handler installed on bufev->ev_read.
 *
 * fd    - descriptor handed to evbuffer_read().
 * event - event flags; EV_TIMEOUT adds EVBUFFER_TIMEOUT to `what`.
 * arg   - the struct bufferevent this handler belongs to.
 *
 * Visible steps:
 *  - `what` starts as EVBUFFER_READ and accumulates EVBUFFER_TIMEOUT,
 *    EVBUFFER_ERROR or EVBUFFER_EOF.
 *  - With a read high watermark set, `howmuch` becomes the distance between
 *    the watermark and the current input length; the read event can be
 *    deleted and bufferevent_read_pressure_cb referenced so that reading
 *    resumes later.
 *  - evbuffer_read(bufev->input, fd, howmuch) is called and its result is
 *    stored in `res`; errno is tested for EAGAIN / EINTR.
 *  - The read event is re-added, the input length is checked against the
 *    low and high read watermarks, and the user read callback is invoked
 *    when it is not NULL.
 *  - The error callback is called with `what`.
 *
 * NOTE(review): braces, labels and return statements between these steps are
 * not visible in this view; confirm the exact control flow against the full
 * file.
 */
94 bufferevent_readcb(int fd
, short event
, void *arg
)
96 struct bufferevent
*bufev
= arg
;
98 short what
= EVBUFFER_READ
;
102 if (event
== EV_TIMEOUT
) {
103 what
|= EVBUFFER_TIMEOUT
;
108 * If we have a high watermark configured then we don't want to
109 * read more data than would make us reach the watermark.
111 if (bufev
->wm_read
.high
!= 0) {
/* Room left below the high watermark. */
112 howmuch
= bufev
->wm_read
.high
- EVBUFFER_LENGTH(bufev
->input
);
113 /* we might have lowered the watermark, stop reading */
115 struct evbuffer
*buf
= bufev
->input
;
116 event_del(&bufev
->ev_read
);
118 bufferevent_read_pressure_cb
, bufev
);
123 res
= evbuffer_read(bufev
->input
, fd
, howmuch
);
125 if (errno
== EAGAIN
|| errno
== EINTR
)
128 what
|= EVBUFFER_ERROR
;
129 } else if (res
== 0) {
131 what
|= EVBUFFER_EOF
;
137 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
139 /* See if this callback meets the watermarks */
140 len
= EVBUFFER_LENGTH(bufev
->input
);
141 if (bufev
->wm_read
.low
!= 0 && len
< bufev
->wm_read
.low
)
143 if (bufev
->wm_read
.high
!= 0 && len
>= bufev
->wm_read
.high
) {
144 struct evbuffer
*buf
= bufev
->input
;
/* At or above the high watermark: stop listening for reads. */
145 event_del(&bufev
->ev_read
);
147 /* Now schedule a callback for us when the buffer changes */
148 evbuffer_setcb(buf
, bufferevent_read_pressure_cb
, bufev
);
151 /* Invoke the user callback - must always be called last */
152 if (bufev
->readcb
!= NULL
)
153 (*bufev
->readcb
)(bufev
, bufev
->cbarg
);
157 bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
);
/* Report the accumulated condition flags to the user error callback. */
161 (*bufev
->errorcb
)(bufev
, what
, bufev
->cbarg
);
/*
 * Write-event handler installed on bufev->ev_write.
 *
 * fd    - descriptor handed to evbuffer_write().
 * event - event flags; EV_TIMEOUT adds EVBUFFER_TIMEOUT to `what`.
 * arg   - the struct bufferevent this handler belongs to.
 *
 * Visible steps:
 *  - `what` starts as EVBUFFER_WRITE and accumulates EVBUFFER_TIMEOUT,
 *    EVBUFFER_ERROR or EVBUFFER_EOF.
 *  - When the output buffer is non-empty, evbuffer_write(bufev->output, fd)
 *    is called and its result stored in `res`; errno is tested for EAGAIN
 *    and EINPROGRESS (other tested values may exist on lines not shown).
 *  - While output remains, the write event is added again with
 *    bufev->timeout_write.
 *  - The user write callback is invoked when it is not NULL and the output
 *    length is at or below wm_write.low.
 *  - The error callback is called with `what`.
 *
 * NOTE(review): braces, labels and return statements between these steps are
 * not visible in this view; confirm the exact control flow against the full
 * file.
 */
165 bufferevent_writecb(int fd
, short event
, void *arg
)
167 struct bufferevent
*bufev
= arg
;
169 short what
= EVBUFFER_WRITE
;
171 if (event
== EV_TIMEOUT
) {
172 what
|= EVBUFFER_TIMEOUT
;
/* Only attempt a write when there is pending output. */
176 if (EVBUFFER_LENGTH(bufev
->output
)) {
177 res
= evbuffer_write(bufev
->output
, fd
);
180 /* TODO: evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
181 * set errno, thus this error checking is not portable. */
182 if (errno
== EAGAIN
||
184 errno
== EINPROGRESS
)
187 what
|= EVBUFFER_ERROR
;
193 } else if (res
== 0) {
195 what
|= EVBUFFER_EOF
;
/* Keep the write event armed while data is still queued. */
201 if (EVBUFFER_LENGTH(bufev
->output
) != 0)
202 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
205 * Invoke the user callback if our buffer is drained or below the
208 if (bufev
->writecb
!= NULL
&&
209 EVBUFFER_LENGTH(bufev
->output
) <= bufev
->wm_write
.low
)
210 (*bufev
->writecb
)(bufev
, bufev
->cbarg
);
215 if (EVBUFFER_LENGTH(bufev
->output
) != 0)
216 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
/* Report the accumulated condition flags to the user error callback. */
220 (*bufev
->errorcb
)(bufev
, what
, bufev
->cbarg
);
224 * Create a new buffered event object.
226 * The read callback is invoked whenever we read new data.
227 * The write callback is invoked whenever the output buffer is drained.
228 * The error callback is invoked on a write/read error or on EOF.
230 * Both read and write callbacks may be NULL. The error callback is not
231 * allowed to be NULL and must always be provided.
/*
 * Allocate and initialise a struct bufferevent for `fd`.
 *
 * fd      - descriptor both internal events are set up on.
 * readcb  - user read callback, forwarded to bufferevent_setcb().
 * writecb - user write callback, forwarded to bufferevent_setcb().
 * errorcb - user error callback, forwarded to bufferevent_setcb().
 * cbarg   - opaque argument forwarded to bufferevent_setcb().
 *
 * The structure is zero-allocated with calloc(); the input and output
 * evbuffers are created with evbuffer_new(), and the input buffer is freed
 * again if creating the output buffer fails.  ev_read / ev_write are set up
 * with bufferevent_readcb / bufferevent_writecb as handlers and `bufev` as
 * their argument.  Finally bufev->enabled is set to EV_WRITE.
 *
 * NOTE(review): the failure-path cleanup of `bufev` itself and the return
 * statements are not visible in this view; confirm against the full file.
 */
235 bufferevent_new(int fd
, evbuffercb readcb
, evbuffercb writecb
,
236 everrorcb errorcb
, void *cbarg
)
238 struct bufferevent
*bufev
;
240 if ((bufev
= calloc(1, sizeof(struct bufferevent
))) == NULL
)
243 if ((bufev
->input
= evbuffer_new()) == NULL
) {
248 if ((bufev
->output
= evbuffer_new()) == NULL
) {
/* Undo the input buffer allocation when the output buffer cannot be made. */
249 evbuffer_free(bufev
->input
);
254 event_set(&bufev
->ev_read
, fd
, EV_READ
, bufferevent_readcb
, bufev
);
255 event_set(&bufev
->ev_write
, fd
, EV_WRITE
, bufferevent_writecb
, bufev
);
257 bufferevent_setcb(bufev
, readcb
, writecb
, errorcb
, cbarg
);
260 * Set to EV_WRITE so that using bufferevent_write is going to
261 * trigger a callback. Reading needs to be explicitly enabled
262 * because otherwise no data will be available.
264 bufev
->enabled
= EV_WRITE
;
/*
 * Install the user callbacks on a bufferevent.
 *
 * bufev   - bufferevent to update.
 * readcb  - stored in bufev->readcb.
 * writecb - stored in bufev->writecb.
 * errorcb - stored in bufev->errorcb.
 * cbarg   - stored in bufev->cbarg; the handlers in this file pass it to
 *           every user callback they invoke.
 *
 * The visible code only assigns the four fields; it performs no validation.
 */
270 bufferevent_setcb(struct bufferevent
*bufev
,
271 evbuffercb readcb
, evbuffercb writecb
, everrorcb errorcb
, void *cbarg
)
273 bufev
->readcb
= readcb
;
274 bufev
->writecb
= writecb
;
275 bufev
->errorcb
= errorcb
;
277 bufev
->cbarg
= cbarg
;
/*
 * Rebind a bufferevent's read and write events to a new descriptor.
 *
 * bufev - bufferevent to update.
 * fd    - descriptor the events are re-initialised with.
 *
 * Both events are deleted first, then set up again with event_set() using
 * the same handlers (bufferevent_readcb / bufferevent_writecb).  When
 * bufev->ev_base is not NULL the events are re-attached to that base with
 * event_base_set().  The visible code does not add the events back; see the
 * trailing comment.
 */
281 bufferevent_setfd(struct bufferevent
*bufev
, int fd
)
283 event_del(&bufev
->ev_read
);
284 event_del(&bufev
->ev_write
);
286 event_set(&bufev
->ev_read
, fd
, EV_READ
, bufferevent_readcb
, bufev
);
287 event_set(&bufev
->ev_write
, fd
, EV_WRITE
, bufferevent_writecb
, bufev
);
/* Re-attach to the configured event base, if one was set. */
288 if (bufev
->ev_base
!= NULL
) {
289 event_base_set(bufev
->ev_base
, &bufev
->ev_read
);
290 event_base_set(bufev
->ev_base
, &bufev
->ev_write
);
293 /* might have to manually trigger event registration */
/*
 * Apply `priority` to both the read and the write event of `bufev` through
 * event_priority_set().
 *
 * bufev    - bufferevent whose events are updated.
 * priority - value passed unchanged to event_priority_set().
 *
 * Each call is checked for a -1 result.
 * NOTE(review): the statements executed on failure and the final return are
 * not visible in this view; presumably -1 is propagated -- confirm.
 */
297 bufferevent_priority_set(struct bufferevent
*bufev
, int priority
)
299 if (event_priority_set(&bufev
->ev_read
, priority
) == -1)
301 if (event_priority_set(&bufev
->ev_write
, priority
) == -1)
307 /* Closing the file descriptor is the responsibility of the caller */
/*
 * Tear down a bufferevent: delete its read and write events, then free its
 * input and output evbuffers.
 *
 * bufev - bufferevent to release.
 *
 * NOTE(review): releasing the `bufev` structure itself is not visible in
 * this view; confirm against the full file.
 */
310 bufferevent_free(struct bufferevent
*bufev
)
/* Remove the events before the buffers they operate on go away. */
312 event_del(&bufev
->ev_read
);
313 event_del(&bufev
->ev_write
);
315 evbuffer_free(bufev
->input
);
316 evbuffer_free(bufev
->output
);
322 * Returns 0 on success;
/*
 * Queue `size` bytes from `data` on the bufferevent's output buffer.
 *
 * bufev - bufferevent to write to.
 * data  - source bytes, appended with evbuffer_add().
 * size  - number of bytes to append.
 *
 * The evbuffer_add() result is stored in `res`.  When `size` is greater than
 * zero and EV_WRITE is set in bufev->enabled, the write event is added with
 * bufev->timeout_write so the data gets flushed.
 *
 * NOTE(review): the check of `res` and the return statements are not visible
 * in this view; confirm against the full file.
 */
327 bufferevent_write(struct bufferevent
*bufev
, const void *data
, size_t size
)
331 res
= evbuffer_add(bufev
->output
, data
, size
);
336 /* If everything is okay, we need to schedule a write */
337 if (size
> 0 && (bufev
->enabled
& EV_WRITE
))
338 bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
);
/*
 * Queue the contents of an evbuffer on a bufferevent's output.
 *
 * bufev - destination bufferevent.
 * buf   - source evbuffer; buf->buffer and buf->off are passed to
 *         bufferevent_write() as data pointer and length.
 *
 * After the write call, buf->off bytes are drained from `buf`.
 * NOTE(review): whether the drain is conditional on `res` and what is
 * returned are not visible in this view; confirm against the full file.
 */
344 bufferevent_write_buffer(struct bufferevent
*bufev
, struct evbuffer
*buf
)
348 res
= bufferevent_write(bufev
, buf
->buffer
, buf
->off
);
/* Remove the bytes that were handed to the bufferevent. */
350 evbuffer_drain(buf
, buf
->off
);
/*
 * Copy data out of the bufferevent's input buffer into caller memory.
 *
 * bufev - bufferevent to read from.
 * data  - destination; must have room for the number of bytes copied.
 * size  - number of bytes requested.
 *
 * The visible code copies `size` bytes from buf->buffer with memcpy() and
 * then drains `size` bytes from the input buffer.
 * NOTE(review): any clamping of `size` to the amount of buffered data, and
 * the return value, are not visible in this view; confirm against the full
 * file before relying on either.
 */
356 bufferevent_read(struct bufferevent
*bufev
, void *data
, size_t size
)
358 struct evbuffer
*buf
= bufev
->input
;
363 /* Copy the available data to the user buffer */
364 memcpy(data
, buf
->buffer
, size
);
/* Consume what was just copied. */
367 evbuffer_drain(buf
, size
);
/*
 * Enable reading and/or writing on a bufferevent.
 *
 * bufev - bufferevent to update.
 * event - bit mask; EV_READ and EV_WRITE are examined.
 *
 * For each requested direction the matching event is added through
 * bufferevent_add() with the stored timeout, and the result is checked for
 * -1.  The requested bits are then OR-ed into bufev->enabled.
 * NOTE(review): the statements executed on a -1 result and the final return
 * are not visible in this view; confirm against the full file.
 */
373 bufferevent_enable(struct bufferevent
*bufev
, short event
)
375 if (event
& EV_READ
) {
376 if (bufferevent_add(&bufev
->ev_read
, bufev
->timeout_read
) == -1)
379 if (event
& EV_WRITE
) {
380 if (bufferevent_add(&bufev
->ev_write
, bufev
->timeout_write
) == -1)
/* Record the newly enabled directions. */
384 bufev
->enabled
|= event
;
/*
 * Disable reading and/or writing on a bufferevent.
 *
 * bufev - bufferevent to update.
 * event - bit mask; EV_READ and EV_WRITE are examined.
 *
 * For each requested direction the matching event is removed with
 * event_del() and the result is checked for -1.  The requested bits are then
 * cleared from bufev->enabled.
 * NOTE(review): the statements executed on a -1 result and the final return
 * are not visible in this view; confirm against the full file.
 */
389 bufferevent_disable(struct bufferevent
*bufev
, short event
)
391 if (event
& EV_READ
) {
392 if (event_del(&bufev
->ev_read
) == -1)
395 if (event
& EV_WRITE
) {
396 if (event_del(&bufev
->ev_write
) == -1)
/* Clear the disabled directions from the enabled mask. */
400 bufev
->enabled
&= ~event
;
405 * Sets the read and write timeout for a buffered event.
/*
 * Store new read and write timeouts on a bufferevent.
 *
 * bufev         - bufferevent to update.
 * timeout_read  - stored in bufev->timeout_read.
 * timeout_write - stored in bufev->timeout_write.
 *
 * After storing the values, each event that event_pending() reports as
 * pending is added again through bufferevent_add() with its new timeout.
 */
409 bufferevent_settimeout(struct bufferevent
*bufev
,
410 int timeout_read
, int timeout_write
) {
411 bufev
->timeout_read
= timeout_read
;
412 bufev
->timeout_write
= timeout_write
;
/* Re-add only events that are currently pending. */
414 if (event_pending(&bufev
->ev_read
, EV_READ
, NULL
))
415 bufferevent_add(&bufev
->ev_read
, timeout_read
);
416 if (event_pending(&bufev
->ev_write
, EV_WRITE
, NULL
))
417 bufferevent_add(&bufev
->ev_write
, timeout_write
);
421 * Sets the water marks
/*
 * Set the low and high watermarks for reading and/or writing.
 *
 * bufev    - bufferevent to update.
 * events   - bit mask; EV_READ selects wm_read, EV_WRITE selects wm_write.
 * lowmark  - stored in the selected watermark's `low` field.
 * highmark - stored in the selected watermark's `high` field.
 *
 * Afterwards bufferevent_read_pressure_cb() is called directly with the
 * input buffer, 0 and the current input length, so reading can be
 * rescheduled if the new read watermark allows it.
 */
425 bufferevent_setwatermark(struct bufferevent
*bufev
, short events
,
426 size_t lowmark
, size_t highmark
)
428 if (events
& EV_READ
) {
429 bufev
->wm_read
.low
= lowmark
;
430 bufev
->wm_read
.high
= highmark
;
433 if (events
& EV_WRITE
) {
434 bufev
->wm_write
.low
= lowmark
;
435 bufev
->wm_write
.high
= highmark
;
438 /* If the watermarks changed then see if we should call read again */
439 bufferevent_read_pressure_cb(bufev
->input
,
440 0, EVBUFFER_LENGTH(bufev
->input
), bufev
);
/*
 * Associate a bufferevent with an event base.
 *
 * base  - event base to attach to; also stored in bufev->ev_base.
 * bufev - bufferevent to update.
 *
 * event_base_set() is called for the read event and then for the write
 * event, each result being stored in `res`.
 * NOTE(review): the checks on `res` and the return statements are not
 * visible in this view; confirm against the full file.
 */
444 bufferevent_base_set(struct event_base
*base
, struct bufferevent
*bufev
)
/* Remember the base on the bufferevent as well as on its events. */
448 bufev
->ev_base
= base
;
450 res
= event_base_set(base
, &bufev
->ev_read
);
454 res
= event_base_set(base
, &bufev
->ev_write
);