1 /* $NetBSD: bufferevent_pair.c,v 1.1.1.2 2015/01/29 06:38:09 spz Exp $ */
3 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 #include <sys/types.h>
34 #include "event2/event-config.h"
35 #include <sys/cdefs.h>
36 __RCSID("$NetBSD: bufferevent_pair.c,v 1.1.1.2 2015/01/29 06:38:09 spz Exp $");
38 #include "event2/util.h"
39 #include "event2/buffer.h"
40 #include "event2/bufferevent.h"
41 #include "event2/bufferevent_struct.h"
42 #include "event2/event.h"
43 #include "defer-internal.h"
44 #include "bufferevent-internal.h"
45 #include "mm-internal.h"
46 #include "util-internal.h"
/* One endpoint of a paired bufferevent.  It embeds the common
 * bufferevent_private state as `bev` and points at the other endpoint
 * through `partner`.
 * NOTE(review): the end of this struct definition is not visible in this
 * listing. */
48 struct bufferevent_pair
{
49 struct bufferevent_private bev
;
/* The other endpoint; be_pair_destruct() resets it to NULL on both sides. */
50 struct bufferevent_pair
*partner
;
54 /* Given a bufferevent that's really the bev part of a bufferevent_pair,
55 * return that bufferevent_pair. Returns NULL otherwise. */
/* Mechanics: bev->be_ops is first compared against &bufferevent_ops_pair;
 * EVUTIL_UPCAST then maps the embedded bev.bev member back to its
 * containing struct bufferevent_pair, and an assertion re-checks be_ops on
 * the result.
 * NOTE(review): the braces and return statements are not visible in this
 * listing; presumably the mismatch branch returns NULL and the normal path
 * returns bev_p -- confirm against the full source. */
56 static inline struct bufferevent_pair
*
57 upcast(struct bufferevent
*bev
)
59 struct bufferevent_pair
*bev_p
;
60 if (bev
->be_ops
!= &bufferevent_ops_pair
)
62 bev_p
= EVUTIL_UPCAST(bev
, struct bufferevent_pair
, bev
.bev
);
63 EVUTIL_ASSERT(bev_p
->bev
.bev
.be_ops
== &bufferevent_ops_pair
);
/* Convert a struct bufferevent_pair pointer into a pointer to its embedded
 * struct bufferevent (the bev.bev member).  The argument is parenthesized
 * and evaluated once. */
67 #define downcast(bev_pair) (&(bev_pair)->bev.bev)
/* Call _bufferevent_incref_and_lock() on b, then on b's partner endpoint.
 * NOTE(review): bevp is declared here without an initializer, and the
 * lines that assign it (and any check that bevp->partner is non-NULL) are
 * not visible in this listing -- confirm against the full source. */
70 incref_and_lock(struct bufferevent
*b
)
72 struct bufferevent_pair
*bevp
;
73 _bufferevent_incref_and_lock(b
);
76 _bufferevent_incref_and_lock(downcast(bevp
->partner
));
/* Counterpart of incref_and_lock(): calls _bufferevent_decref_and_unlock()
 * on the partner endpoint first and on b last, i.e. in the reverse of the
 * order used when acquiring.
 * NOTE(review): any guard on bevp->partner before the first call is not
 * visible in this listing -- confirm against the full source. */
80 decref_and_unlock(struct bufferevent
*b
)
82 struct bufferevent_pair
*bevp
= upcast(b
);
84 _bufferevent_decref_and_unlock(downcast(bevp
->partner
));
85 _bufferevent_decref_and_unlock(b
);
88 /* XXX Handle close */
/* Forward declaration: bufferevent_pair_elt_new() registers this callback
 * with evbuffer_add_cb() before the definition appears in the file. */
90 static void be_pair_outbuf_cb(struct evbuffer
*,
91 const struct evbuffer_cb_info
*, void *);
/* Allocate and initialize a single endpoint of a pair.
 * Visible steps: zero-allocate a struct bufferevent_pair with mm_calloc();
 * initialize the embedded bufferevent_private with
 * bufferevent_init_common() using bufferevent_ops_pair; register
 * be_pair_outbuf_cb on the output buffer with bufev itself as the callback
 * argument; set up the generic timeout callbacks.
 * NOTE(review): the second parameter, the bodies of the failure branches
 * (other than the bufferevent_free() call) and the return statements are
 * not visible in this listing -- confirm against the full source. */
93 static struct bufferevent_pair
*
94 bufferevent_pair_elt_new(struct event_base
*base
,
97 struct bufferevent_pair
*bufev
;
98 if (! (bufev
= mm_calloc(1, sizeof(struct bufferevent_pair
))))
100 if (bufferevent_init_common(&bufev
->bev
, base
, &bufferevent_ops_pair
,
/* If the callback cannot be registered, the endpoint is freed again. */
105 if (!evbuffer_add_cb(bufev
->bev
.bev
.output
, be_pair_outbuf_cb
, bufev
)) {
106 bufferevent_free(downcast(bufev
));
110 _bufferevent_init_generic_timeout_cbs(&bufev
->bev
.bev
);
/* Create two linked bufferevents and store them in pair[0] and pair[1].
 * BEV_OPT_DEFER_CALLBACKS is always OR-ed into options.  The second
 * endpoint is created with BEV_OPT_THREADSAFE masked out (tmp_options);
 * when the caller requested BEV_OPT_THREADSAFE, it is instead handed the
 * first endpoint's lock through bufferevent_enable_locking(), presumably
 * so that both ends share one lock.
 * NOTE(review): the return type, the declaration of tmp_options, the
 * checks on the two bufferevent_pair_elt_new() results and the return
 * statements are not visible in this listing -- confirm against the full
 * source. */
116 bufferevent_pair_new(struct event_base
*base
, int options
,
117 struct bufferevent
*pair
[2])
119 struct bufferevent_pair
*bufev1
= NULL
, *bufev2
= NULL
;
122 options
|= BEV_OPT_DEFER_CALLBACKS
;
123 tmp_options
= options
& ~BEV_OPT_THREADSAFE
;
125 bufev1
= bufferevent_pair_elt_new(base
, options
);
128 bufev2
= bufferevent_pair_elt_new(base
, tmp_options
);
/* NOTE(review): presumably reached only when creating bufev2 failed; the
 * guarding condition is not visible in this listing. */
130 bufferevent_free(downcast(bufev1
));
134 if (options
& BEV_OPT_THREADSAFE
) {
135 /*XXXX check return */
136 bufferevent_enable_locking(downcast(bufev2
), bufev1
->bev
.lock
);
/* Link the two endpoints to each other. */
139 bufev1
->partner
= bufev2
;
140 bufev2
->partner
= bufev1
;
/* Freeze each endpoint's input (second argument 0) and output (second
 * argument 1); be_pair_transfer() unfreezes them with the same arguments
 * while it moves data and freezes them again afterwards. */
142 evbuffer_freeze(downcast(bufev1
)->input
, 0);
143 evbuffer_freeze(downcast(bufev1
)->output
, 1);
144 evbuffer_freeze(downcast(bufev2
)->input
, 0);
145 evbuffer_freeze(downcast(bufev2
)->output
, 1);
147 pair
[0] = downcast(bufev1
);
148 pair
[1] = downcast(bufev2
);
/* Move data from src's output buffer into dst's input buffer and run the
 * resulting callbacks.
 * Visible steps: unfreeze src->output and dst->input; when dst has a
 * nonzero read high-water mark and its input is shorter than that mark,
 * move only (wm_read.high - dst_size) bytes with evbuffer_remove_buffer();
 * on the other visible paths append all of src->output with
 * evbuffer_add_buffer(); adjust dst's generic read/write timeouts; run
 * dst's read callback when its input length is >= wm_read.low and src's
 * write callback when its output length is <= wm_write.low; finally
 * freeze both buffers again.
 * NOTE(review): the third parameter, the declaration of n, the else/guard
 * lines between the branches and the function's braces are not visible in
 * this listing -- confirm the exact branch structure against the full
 * source. */
154 be_pair_transfer(struct bufferevent
*src
, struct bufferevent
*dst
,
157 size_t src_size
, dst_size
;
/* The buffers are kept frozen outside of this function (see the matching
 * evbuffer_freeze() calls at the end), so they must be unfrozen before
 * data can be moved. */
160 evbuffer_unfreeze(src
->output
, 1);
161 evbuffer_unfreeze(dst
->input
, 0);
163 if (dst
->wm_read
.high
) {
164 dst_size
= evbuffer_get_length(dst
->input
);
165 if (dst_size
< dst
->wm_read
.high
) {
/* Only fill dst's input up to its read high-water mark. */
166 n
= dst
->wm_read
.high
- dst_size
;
167 evbuffer_remove_buffer(src
->output
, dst
->input
, n
);
171 n
= evbuffer_get_length(src
->output
);
172 evbuffer_add_buffer(dst
->input
, src
->output
);
175 n
= evbuffer_get_length(src
->output
);
176 evbuffer_add_buffer(dst
->input
, src
->output
);
180 BEV_RESET_GENERIC_READ_TIMEOUT(dst
);
182 if (evbuffer_get_length(dst
->output
))
183 BEV_RESET_GENERIC_WRITE_TIMEOUT(dst
);
185 BEV_DEL_GENERIC_WRITE_TIMEOUT(dst
);
/* Re-measure both buffers after the move to decide which callbacks run. */
188 src_size
= evbuffer_get_length(src
->output
);
189 dst_size
= evbuffer_get_length(dst
->input
);
191 if (dst_size
>= dst
->wm_read
.low
) {
192 _bufferevent_run_readcb(dst
);
194 if (src_size
<= src
->wm_write
.low
) {
195 _bufferevent_run_writecb(src
);
198 evbuffer_freeze(src
->output
, 1);
199 evbuffer_freeze(dst
->input
, 0);
/* Evaluates to true when data can flow from src to dst right now: src has
 * EV_WRITE enabled, dst has EV_READ enabled, dst's reading is not
 * suspended, and src's output buffer is non-empty.
 * NOTE(review): the return type and braces are not visible in this
 * listing. */
203 be_pair_wants_to_talk(struct bufferevent_pair
*src
,
204 struct bufferevent_pair
*dst
)
206 return (downcast(src
)->enabled
& EV_WRITE
) &&
207 (downcast(dst
)->enabled
& EV_READ
) &&
208 !dst
->bev
.read_suspended
&&
209 evbuffer_get_length(downcast(src
)->output
);
/* evbuffer callback installed on an endpoint's output buffer; arg is the
 * owning struct bufferevent_pair.  With the pair referenced and locked, it
 * checks whether the change added more bytes than it deleted
 * (info->n_added > info->n_deleted) and whether a partner still exists;
 * if be_pair_wants_to_talk() agrees, the data is handed to the partner via
 * be_pair_transfer() with a third argument of 0.  The reference and lock
 * are released before returning.
 * NOTE(review): the comment that begins "We got more data" below has no
 * terminator in this listing (its continuation line is missing), and the
 * function's braces are likewise not visible -- confirm against the full
 * source. */
213 be_pair_outbuf_cb(struct evbuffer
*outbuf
,
214 const struct evbuffer_cb_info
*info
, void *arg
)
216 struct bufferevent_pair
*bev_pair
= arg
;
217 struct bufferevent_pair
*partner
= bev_pair
->partner
;
219 incref_and_lock(downcast(bev_pair
));
221 if (info
->n_added
> info
->n_deleted
&& partner
) {
222 /* We got more data. If the other side's reading, then
224 if (be_pair_wants_to_talk(bev_pair
, partner
)) {
225 be_pair_transfer(downcast(bev_pair
), downcast(partner
), 0);
229 decref_and_unlock(downcast(bev_pair
));
/* Enable handler for a pair endpoint.  With the pair referenced and
 * locked, it resets the generic read timeout when EV_READ is being
 * enabled, resets the generic write timeout when EV_WRITE is being enabled
 * and output is pending, and then attempts a transfer in each newly
 * possible direction (partner -> bufev for EV_READ, bufev -> partner for
 * EV_WRITE), each guarded by be_pair_wants_to_talk() and a non-NULL
 * partner.
 * NOTE(review): the return type, return statement and braces are not
 * visible in this listing. */
233 be_pair_enable(struct bufferevent
*bufev
, short events
)
235 struct bufferevent_pair
*bev_p
= upcast(bufev
);
236 struct bufferevent_pair
*partner
= bev_p
->partner
;
238 incref_and_lock(bufev
);
240 if (events
& EV_READ
) {
241 BEV_RESET_GENERIC_READ_TIMEOUT(bufev
);
243 if ((events
& EV_WRITE
) && evbuffer_get_length(bufev
->output
))
244 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev
);
246 /* We're starting to read! Does the other side have anything to write?*/
247 if ((events
& EV_READ
) && partner
&&
248 be_pair_wants_to_talk(partner
, bev_p
)) {
249 be_pair_transfer(downcast(partner
), bufev
, 0);
251 /* We're starting to write! Does the other side want to read? */
252 if ((events
& EV_WRITE
) && partner
&&
253 be_pair_wants_to_talk(bev_p
, partner
)) {
254 be_pair_transfer(bufev
, downcast(partner
), 0);
256 decref_and_unlock(bufev
);
/* Disable handler for a pair endpoint: removes the generic read timeout
 * when EV_READ is being disabled and the generic write timeout when
 * EV_WRITE is being disabled.  No data is moved here.
 * NOTE(review): the return type, return statement and braces are not
 * visible in this listing. */
261 be_pair_disable(struct bufferevent
*bev
, short events
)
263 if (events
& EV_READ
) {
264 BEV_DEL_GENERIC_READ_TIMEOUT(bev
);
266 if (events
& EV_WRITE
)
267 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev
);
/* Destructor hook for a pair endpoint: if a partner is still attached,
 * clear the link in both directions so the surviving endpoint no longer
 * points at this one, then remove the generic timeout callbacks.
 * NOTE(review): the return type and braces are not visible in this
 * listing. */
272 be_pair_destruct(struct bufferevent
*bev
)
274 struct bufferevent_pair
*bev_p
= upcast(bev
);
276 if (bev_p
->partner
) {
/* Clear the partner's back-pointer first, while bev_p->partner is still
 * valid. */
277 bev_p
->partner
->partner
= NULL
;
278 bev_p
->partner
= NULL
;
281 _bufferevent_del_generic_timeout_cbs(bev
);
/* Flush handler for a pair endpoint.  With the pair referenced and locked
 * it forces a transfer partner -> bev when iotype includes EV_READ and
 * bev -> partner when iotype includes EV_WRITE, passing 1 as the third
 * argument to be_pair_transfer() (the other callers in this file pass 0).
 * For BEV_FINISHED it additionally reports iotype|BEV_EVENT_EOF to the
 * partner's event callback.
 * NOTE(review): the statements between incref_and_lock() and the
 * assignment of partner, and the statement controlled by the
 * `mode == BEV_NORMAL` test, are not visible in this listing.  If any of
 * them returns early, it would do so without the matching
 * decref_and_unlock(), leaking the reference and lock -- confirm against
 * the full source.  The return type and return statements are likewise
 * not visible. */
285 be_pair_flush(struct bufferevent
*bev
, short iotype
,
286 enum bufferevent_flush_mode mode
)
288 struct bufferevent_pair
*bev_p
= upcast(bev
);
289 struct bufferevent
*partner
;
290 incref_and_lock(bev
);
294 partner
= downcast(bev_p
->partner
);
296 if (mode
== BEV_NORMAL
)
299 if ((iotype
& EV_READ
) != 0)
300 be_pair_transfer(partner
, bev
, 1);
302 if ((iotype
& EV_WRITE
) != 0)
303 be_pair_transfer(bev
, partner
, 1);
305 if (mode
== BEV_FINISHED
) {
306 _bufferevent_run_eventcb(partner
, iotype
|BEV_EVENT_EOF
);
308 decref_and_unlock(bev
);
/* Look up the bufferevent at the other end of the pair that bev belongs
 * to.  The partner pointer is read between incref_and_lock() and
 * decref_and_unlock(); `partner` starts out as NULL.
 * NOTE(review): the return type, the assignment of bev_p, any NULL checks
 * on bev_p or bev_p->partner and the return statement are not visible in
 * this listing -- confirm against the full source. */
313 bufferevent_pair_get_partner(struct bufferevent
*bev
)
315 struct bufferevent_pair
*bev_p
;
316 struct bufferevent
*partner
= NULL
;
321 incref_and_lock(bev
);
323 partner
= downcast(bev_p
->partner
);
324 decref_and_unlock(bev
);
328 const struct bufferevent_ops bufferevent_ops_pair
= {
330 evutil_offsetof(struct bufferevent_pair
, bev
.bev
),
334 _bufferevent_generic_adj_timeouts
,