2 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * 3. The name of the author may not be used to endorse or promote products
13 * derived from this software without specific prior written permission.
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 #include <sys/types.h>
33 #include "event2/event-config.h"
35 #include "event2/util.h"
36 #include "event2/buffer.h"
37 #include "event2/bufferevent.h"
38 #include "event2/bufferevent_struct.h"
39 #include "event2/event.h"
40 #include "defer-internal.h"
41 #include "bufferevent-internal.h"
42 #include "mm-internal.h"
43 #include "util-internal.h"
/* One endpoint of a paired bufferevent: the shared bufferevent_private
 * state, plus a link to the endpoint it is paired with. */
45 struct bufferevent_pair
{
/* Embedded base object; upcast() and downcast() convert between a pointer
 * to this member's inner struct bufferevent and the enclosing pair. */
46 struct bufferevent_private bev
;
/* The other endpoint. Assigned in bufferevent_pair_new() and reset to NULL
 * by be_pair_destruct(). */
47 struct bufferevent_pair
*partner
;
51 /* Given a bufferevent that's really a bev part of a bufferevent_pair,
52 * return that bufferevent_pair. Returns NULL otherwise.*/
53 static inline struct bufferevent_pair
*
54 upcast(struct bufferevent
*bev
)
56 struct bufferevent_pair
*bev_p
;
/* A bufferevent belongs to a pair only if it uses the pair ops table. */
57 if (bev
->be_ops
!= &bufferevent_ops_pair
)
/* Recover the enclosing bufferevent_pair from the embedded bev.bev member. */
59 bev_p
= EVUTIL_UPCAST(bev
, struct bufferevent_pair
, bev
.bev
);
/* Sanity check: the recovered container must also use the pair ops table. */
60 EVUTIL_ASSERT(bev_p
->bev
.bev
.be_ops
== &bufferevent_ops_pair
);
/* Convert a struct bufferevent_pair pointer into a pointer to the struct
 * bufferevent embedded inside it (the opposite direction from upcast()). */
64 #define downcast(bev_pair) (&(bev_pair)->bev.bev)
/* Call _bufferevent_incref_and_lock() on b and then on the bufferevent of
 * b's partner, so that both endpoints are handled together.
 * NOTE(review): the statement that assigns bevp and any NULL check on
 * bevp->partner are not visible in this listing -- confirm that the partner
 * call is guarded before relying on it with an unpaired bufferevent. */
67 incref_and_lock(struct bufferevent
*b
)
69 struct bufferevent_pair
*bevp
;
70 _bufferevent_incref_and_lock(b
);
73 _bufferevent_incref_and_lock(downcast(bevp
->partner
));
/* Counterpart of incref_and_lock(): call _bufferevent_decref_and_unlock()
 * on the partner's bufferevent first, and then on b itself.
 * NOTE(review): any NULL check on bevp->partner is not visible in this
 * listing -- confirm that the partner call is guarded. */
77 decref_and_unlock(struct bufferevent
*b
)
79 struct bufferevent_pair
*bevp
= upcast(b
);
81 _bufferevent_decref_and_unlock(downcast(bevp
->partner
));
82 _bufferevent_decref_and_unlock(b
);
85 /* XXX Handle close */
87 static void be_pair_outbuf_cb(struct evbuffer
*,
88 const struct evbuffer_cb_info
*, void *);
/* Allocate and set up a single endpoint of a pair on the given event base.
 * The endpoint is zero-allocated, initialized with the pair ops table, and
 * gets be_pair_outbuf_cb() registered on its output buffer.
 * NOTE(review): the remaining parameters and the return statements are not
 * visible in this listing; presumably NULL is returned on each failure path
 * -- confirm. */
90 static struct bufferevent_pair
*
91 bufferevent_pair_elt_new(struct event_base
*base
,
94 struct bufferevent_pair
*bufev
;
/* Zero-filled allocation, so partner starts out as NULL. */
95 if (! (bufev
= mm_calloc(1, sizeof(struct bufferevent_pair
))))
97 if (bufferevent_init_common(&bufev
->bev
, base
, &bufferevent_ops_pair
,
/* Watch the output buffer; if the callback cannot be registered, the
 * partially built endpoint is freed again. */
102 if (!evbuffer_add_cb(bufev
->bev
.bev
.output
, be_pair_outbuf_cb
, bufev
)) {
103 bufferevent_free(downcast(bufev
));
107 _bufferevent_init_generic_timeout_cbs(&bufev
->bev
.bev
);
/* Create two linked pair endpoints on base and store their bufferevents in
 * pair[0] and pair[1].
 * BEV_OPT_DEFER_CALLBACKS is always added to the requested options.
 * NOTE(review): the return type, the declaration of tmp_options, the failure
 * checks and the return statements are not visible in this listing. */
113 bufferevent_pair_new(struct event_base
*base
, int options
,
114 struct bufferevent
*pair
[2])
116 struct bufferevent_pair
*bufev1
= NULL
, *bufev2
= NULL
;
119 options
|= BEV_OPT_DEFER_CALLBACKS
;
/* The second endpoint is created without BEV_OPT_THREADSAFE; when that
 * option was requested it is given the first endpoint's lock below. */
120 tmp_options
= options
& ~BEV_OPT_THREADSAFE
;
122 bufev1
= bufferevent_pair_elt_new(base
, options
);
125 bufev2
= bufferevent_pair_elt_new(base
, tmp_options
);
/* Cleanup of the first endpoint (presumably when creating the second one
 * failed -- the guarding condition is not visible here). */
127 bufferevent_free(downcast(bufev1
));
131 if (options
& BEV_OPT_THREADSAFE
) {
132 /*XXXX check return */
133 bufferevent_enable_locking(downcast(bufev2
), bufev1
->bev
.lock
);
/* Link the two endpoints to each other. */
136 bufev1
->partner
= bufev2
;
137 bufev2
->partner
= bufev1
;
/* Freeze every input (second argument 0) and output (second argument 1)
 * buffer; be_pair_transfer() unfreezes them only while it moves data. */
139 evbuffer_freeze(downcast(bufev1
)->input
, 0);
140 evbuffer_freeze(downcast(bufev1
)->output
, 1);
141 evbuffer_freeze(downcast(bufev2
)->input
, 0);
142 evbuffer_freeze(downcast(bufev2
)->output
, 1);
144 pair
[0] = downcast(bufev1
);
145 pair
[1] = downcast(bufev2
);
/* Move data from src's output buffer into dst's input buffer, update the
 * generic timeouts, and run the read/write callbacks whose low-watermark
 * conditions are met.
 * NOTE(review): the third parameter (callers pass 0 or 1), the declaration
 * of n, and several branch/else lines are not visible in this listing, so
 * the exact conditions joining the statements below should be confirmed
 * against the full source. */
151 be_pair_transfer(struct bufferevent
*src
, struct bufferevent
*dst
,
154 size_t src_size
, dst_size
;
/* The buffers are kept frozen between transfers; open them up here and
 * freeze them again at the end of the function. */
157 evbuffer_unfreeze(src
->output
, 1);
158 evbuffer_unfreeze(dst
->input
, 0);
/* A non-zero read high-watermark on dst limits how much may be moved. */
160 if (dst
->wm_read
.high
) {
161 dst_size
= evbuffer_get_length(dst
->input
);
162 if (dst_size
< dst
->wm_read
.high
) {
/* Move only as many bytes as fit below the high-watermark. */
163 n
= dst
->wm_read
.high
- dst_size
;
164 evbuffer_remove_buffer(src
->output
, dst
->input
, n
);
/* Move everything that src has queued. */
168 n
= evbuffer_get_length(src
->output
);
169 evbuffer_add_buffer(dst
->input
, src
->output
);
/* Move everything that src has queued. */
172 n
= evbuffer_get_length(src
->output
);
173 evbuffer_add_buffer(dst
->input
, src
->output
);
177 BEV_RESET_GENERIC_READ_TIMEOUT(dst
);
/* dst's write timeout is reset while it still has output pending; the
 * delete call below presumably belongs to the else branch -- confirm. */
179 if (evbuffer_get_length(dst
->output
))
180 BEV_RESET_GENERIC_WRITE_TIMEOUT(dst
);
182 BEV_DEL_GENERIC_WRITE_TIMEOUT(dst
);
/* Measure both buffers again after the move to decide which callbacks run. */
185 src_size
= evbuffer_get_length(src
->output
);
186 dst_size
= evbuffer_get_length(dst
->input
);
/* dst has at least its read low-watermark of input: run its read callback. */
188 if (dst_size
>= dst
->wm_read
.low
) {
189 _bufferevent_run_readcb(dst
);
/* src's output is at or below its write low-watermark: run its write
 * callback. */
191 if (src_size
<= src
->wm_write
.low
) {
192 _bufferevent_run_writecb(src
);
195 evbuffer_freeze(src
->output
, 1);
196 evbuffer_freeze(dst
->input
, 0);
/* Return nonzero when data can flow from src to dst right now: src has
 * EV_WRITE enabled, dst has EV_READ enabled, dst's reading is not
 * suspended, and src's output buffer is not empty. */
200 be_pair_wants_to_talk(struct bufferevent_pair
*src
,
201 struct bufferevent_pair
*dst
)
203 return (downcast(src
)->enabled
& EV_WRITE
) &&
204 (downcast(dst
)->enabled
& EV_READ
) &&
205 !dst
->bev
.read_suspended
&&
206 evbuffer_get_length(downcast(src
)->output
);
/* evbuffer callback that bufferevent_pair_elt_new() registers on an
 * endpoint's output buffer; arg is that endpoint's bufferevent_pair.
 * When info reports more bytes added than deleted and a partner exists,
 * data is transferred to the partner if be_pair_wants_to_talk() agrees.
 * The whole check runs between incref_and_lock() and decref_and_unlock().
 * NOTE(review): the in-body comment further down is cut off in this listing
 * (its closing line is missing). */
210 be_pair_outbuf_cb(struct evbuffer
*outbuf
,
211 const struct evbuffer_cb_info
*info
, void *arg
)
213 struct bufferevent_pair
*bev_pair
= arg
;
214 struct bufferevent_pair
*partner
= bev_pair
->partner
;
216 incref_and_lock(downcast(bev_pair
));
218 if (info
->n_added
> info
->n_deleted
&& partner
) {
219 /* We got more data. If the other side's reading, then
221 if (be_pair_wants_to_talk(bev_pair
, partner
)) {
222 be_pair_transfer(downcast(bev_pair
), downcast(partner
), 0);
226 decref_and_unlock(downcast(bev_pair
));
/* Handle enabling of events on a pair endpoint (presumably the enable entry
 * of bufferevent_ops_pair -- confirm). Resets the generic timeouts for the
 * newly enabled directions and then tries to move data in whichever
 * direction has just become possible.
 * NOTE(review): the return type and return statement are not visible in
 * this listing. */
230 be_pair_enable(struct bufferevent
*bufev
, short events
)
232 struct bufferevent_pair
*bev_p
= upcast(bufev
);
233 struct bufferevent_pair
*partner
= bev_p
->partner
;
235 incref_and_lock(bufev
);
237 if (events
& EV_READ
) {
238 BEV_RESET_GENERIC_READ_TIMEOUT(bufev
);
/* The write timeout is only reset when output is already pending. */
240 if ((events
& EV_WRITE
) && evbuffer_get_length(bufev
->output
))
241 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev
);
243 /* We're starting to read! Does the other side have anything to write?*/
244 if ((events
& EV_READ
) && partner
&&
245 be_pair_wants_to_talk(partner
, bev_p
)) {
246 be_pair_transfer(downcast(partner
), bufev
, 0);
248 /* We're starting to write! Does the other side want to read? */
249 if ((events
& EV_WRITE
) && partner
&&
250 be_pair_wants_to_talk(bev_p
, partner
)) {
251 be_pair_transfer(bufev
, downcast(partner
), 0);
253 decref_and_unlock(bufev
);
/* Handle disabling of events on a pair endpoint: remove the generic read
 * and/or write timeout for each direction named in events.
 * NOTE(review): the return type and return statement are not visible in
 * this listing. */
258 be_pair_disable(struct bufferevent
*bev
, short events
)
260 if (events
& EV_READ
) {
261 BEV_DEL_GENERIC_READ_TIMEOUT(bev
);
263 if (events
& EV_WRITE
)
264 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev
);
/* Tear down a pair endpoint: unlink it from its partner in both directions
 * and remove its generic timeout callbacks. */
269 be_pair_destruct(struct bufferevent
*bev
)
271 struct bufferevent_pair
*bev_p
= upcast(bev
);
273 if (bev_p
->partner
) {
/* Clear the partner's back-pointer first, while it is still reachable. */
274 bev_p
->partner
->partner
= NULL
;
275 bev_p
->partner
= NULL
;
278 _bufferevent_del_generic_timeout_cbs(bev
);
/* Flush a pair endpoint in the directions selected by iotype: EV_READ pulls
 * data from the partner into bev, EV_WRITE pushes data from bev to the
 * partner. Both transfers pass 1 as be_pair_transfer()'s third argument,
 * where the other callers in this file pass 0. With mode BEV_FINISHED the
 * partner's event callback is run with iotype|BEV_EVENT_EOF.
 * NOTE(review): the return type, any check for a missing partner, the body
 * of the BEV_NORMAL test and the return statements are not visible in this
 * listing -- confirm against the full source. */
282 be_pair_flush(struct bufferevent
*bev
, short iotype
,
283 enum bufferevent_flush_mode mode
)
285 struct bufferevent_pair
*bev_p
= upcast(bev
);
286 struct bufferevent
*partner
;
287 incref_and_lock(bev
);
291 partner
= downcast(bev_p
->partner
);
293 if (mode
== BEV_NORMAL
)
296 if ((iotype
& EV_READ
) != 0)
297 be_pair_transfer(partner
, bev
, 1);
299 if ((iotype
& EV_WRITE
) != 0)
300 be_pair_transfer(bev
, partner
, 1);
/* Tell the other side that this endpoint has finished. */
302 if (mode
== BEV_FINISHED
) {
303 _bufferevent_run_eventcb(partner
, iotype
|BEV_EVENT_EOF
);
305 decref_and_unlock(bev
);
/* Look up the bufferevent paired with bev. The partner pointer is read
 * between incref_and_lock() and decref_and_unlock().
 * NOTE(review): the return type, the assignment of bev_p, any NULL checks
 * and the return statement are not visible in this listing; presumably the
 * partner is returned -- confirm. */
310 bufferevent_pair_get_partner(struct bufferevent
*bev
)
312 struct bufferevent_pair
*bev_p
;
313 struct bufferevent
*partner
;
318 incref_and_lock(bev
);
319 partner
= downcast(bev_p
->partner
);
320 decref_and_unlock(bev
);
324 const struct bufferevent_ops bufferevent_ops_pair
= {
326 evutil_offsetof(struct bufferevent_pair
, bev
.bev
),
330 _bufferevent_generic_adj_timeouts
,