/*
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/mutex.h>

#include "cor.h" /* project declarations (struct conn, struct data_buf_item, ...) */

struct kmem_cache *data_buf_item_slab;

#define PAGESIZE (1 << PAGE_SHIFT)
void databuf_init(struct conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}
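
/*
 * Unlink one item from the buffer list and release its memory. TYPE_BUF
 * items own a separately allocated data buffer; TYPE_SKB items live inside
 * an skb's procstate, so the enclosing skb is freed instead.
 */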
static void databuf_item_free(struct conn *cn_l, struct data_buf_item *item)
{
	list_del(&(item->buf_list));
	if (item->type == TYPE_BUF) {
		cn_l->data_buf.overhead -= sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
		kfree(item->buf);
		kmem_cache_free(data_buf_item_slab, item);
	} else if (item->type == TYPE_SKB) {
		struct sk_buff *skb = skb_from_pstate(container_of(item,
				struct skb_procstate, funcstate.rcv2.dbi));
		cn_l->data_buf.overhead -= sizeof(struct sk_buff);
		kfree_skb(skb);
	} else {
		BUG();
	}
}
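
/*
 * Discard all buffered data, e.g. when a connection is torn down. On return
 * the buffer must account to zero (see the BUG_ONs below).
 */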
void databuf_ackdiscard(struct conn *cn_l)
{
	__u32 freed = 0;

	while (!list_empty(&(cn_l->data_buf.items))) {
		struct data_buf_item *item = container_of(
				cn_l->data_buf.items.next,
				struct data_buf_item, buf_list);
		freed += item->datalen;
		databuf_item_free(cn_l, item);
	}

	cn_l->data_buf.totalsize -= freed;
	cn_l->data_buf.first_offset += freed;

	BUG_ON(cn_l->data_buf.totalsize != 0);
	BUG_ON(cn_l->data_buf.overhead != 0);

	if (cn_l->data_buf.cpacket_buffer != 0) {
		free_cpacket_buffer(cn_l->data_buf.cpacket_buffer);
		cn_l->data_buf.cpacket_buffer = 0;
	}

	cn_l->data_buf.read_remaining = 0;
	cn_l->data_buf.last_read_offset = 0;
	cn_l->data_buf.lastread = 0;

	if (cn_l->isreset == 0 && cn_l->sourcetype == SOURCE_IN)
		refresh_speedstat(cn_l, freed);
}
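
/*
 * Rebase first_offset so that the next unread byte gets the sequence number
 * initseqno (already-read bytes account for totalsize - read_remaining).
 */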
void reset_seqno(struct conn *cn_l, __u32 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.totalsize +
			cn_l->data_buf.read_remaining;
}
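
/* advance the read pointer to the next chunk and restart at offset 0 */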
static void databuf_nextreadchunk(struct conn *cn_l)
{
	if (cn_l->data_buf.lastread == 0) {
		BUG_ON(cn_l->data_buf.last_read_offset != 0);
		BUG_ON(list_empty(&(cn_l->data_buf.items)));
		cn_l->data_buf.lastread = container_of(
				cn_l->data_buf.items.next,
				struct data_buf_item, buf_list);
	} else if (&(cn_l->data_buf.lastread->buf_list) !=
			cn_l->data_buf.items.prev) {
		cn_l->data_buf.lastread = container_of(
				cn_l->data_buf.lastread->buf_list.next,
				struct data_buf_item, buf_list);
	}

	cn_l->data_buf.last_read_offset = 0;
}
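
/*
 * Copy len bytes from the current read position into dst, walking the chunk
 * list; with userbuf != 0, dst is a userspace pointer and copy_to_user() is
 * used instead of memcpy().
 */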
static int _databuf_pull(struct conn *cn_l, char *dst, int len, int userbuf)
{
	int rc = 0;

	BUG_ON(cn_l->data_buf.read_remaining < len);

	if (cn_l->data_buf.lastread == 0)
		databuf_nextreadchunk(cn_l);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_l->data_buf.lastread == 0);

		srcbufcpystart = cn_l->data_buf.lastread->buf +
				cn_l->data_buf.last_read_offset;
		srcbufcpylen = cn_l->data_buf.lastread->datalen -
				cn_l->data_buf.last_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		if (userbuf) {
			int notcopied = copy_to_user(dst, srcbufcpystart, cpy);
			if (unlikely(notcopied > 0))
				rc = -EFAULT;
		} else {
			memcpy(dst, srcbufcpystart, cpy);
		}

		dst += cpy;
		len -= cpy;

		cn_l->data_buf.read_remaining -= cpy;
		cn_l->data_buf.last_read_offset += cpy;

		if (cpy == srcbufcpylen)
			databuf_nextreadchunk(cn_l);

		if (unlikely(rc < 0)) {
			break;
		}
	}

	return rc;
}
void databuf_pull(struct conn *cn_l, char *dst, int len)
{
	_databuf_pull(cn_l, dst, len, 0);
}
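
/* recvmsg path: fill the iovecs in msg from the buffer, returns bytes copied */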
size_t databuf_pulluser(struct conn *trgt_sock_l, struct msghdr *msg)
{
	size_t copied = 0;
	int iovidx = 0;
	int iovread = 0;

	while (iovidx < msg->msg_iovlen) {
		int rc;

		struct iovec *iov = msg->msg_iov + iovidx;
		__user char *userbuf = iov->iov_base + iovread;
		unsigned int len = iov->iov_len - iovread;

		if (len == 0) {
			iovidx++;
			iovread = 0;
			continue;
		}

		if (trgt_sock_l->data_buf.read_remaining == 0) {
			break;
		}

		if (len > trgt_sock_l->data_buf.read_remaining)
			len = trgt_sock_l->data_buf.read_remaining;

		rc = _databuf_pull(trgt_sock_l, userbuf, len, 1);
		if (unlikely(rc < 0))
			break;

		copied += len;
		iovread += len;
	}

	return copied;
}
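
/*
 * Rewind the read position by bytes, walking the lastread pointer backwards
 * across chunk boundaries as needed.
 */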
void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(trgt_out_l->data_buf.lastread == 0);

	while (bytes > trgt_out_l->data_buf.last_read_offset) {
		bytes -= trgt_out_l->data_buf.last_read_offset;
		trgt_out_l->data_buf.lastread = container_of(
				trgt_out_l->data_buf.lastread->buf_list.prev,
				struct data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.lastread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.last_read_offset =
				trgt_out_l->data_buf.lastread->datalen;
	}

	trgt_out_l->data_buf.last_read_offset -= bytes;
}
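
/*
 * Copy already-read data again, starting at absolute position startpos (used
 * for retransmission); positions wrap around, hence the __s32 casts.
 */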
void databuf_pullold(struct conn *trgt_out_l, __u32 startpos, char *dst,
		int len)
{
	__u32 pos = trgt_out_l->data_buf.first_offset;
	struct data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct data_buf_item, buf_list);

	/* skip items that lie entirely before startpos */
	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (((__s32) (pos + dbi->datalen - startpos)) > 0)
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(((__s32) (pos - startpos)) > 0);

		srcbufcpystart = dbi->buf + ((__s32) (startpos - pos));
		srcbufcpylen = dbi->datalen - ((__s32) (startpos - pos));

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}
}
/* ack up to *not* including pos */
void databuf_ack(struct conn *trgt_out_l, __u32 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.lastread)
			break;

		if (((__s32) (trgt_out_l->data_buf.first_offset +
				firstitem->datalen - pos)) > 0)
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		databuf_item_free(trgt_out_l, firstitem);
	}

	trgt_out_l->data_buf.totalsize -= acked;

	BUG_ON(trgt_out_l->data_buf.totalsize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (unlikely(trgt_out_l->data_buf.cpacket_buffer != 0)) {
		/* release at most the remaining reservation */
		__u32 amount = acked > trgt_out_l->data_buf.cpacket_buffer ?
				trgt_out_l->data_buf.cpacket_buffer : acked;
		free_cpacket_buffer(amount);
		trgt_out_l->data_buf.cpacket_buffer -= amount;
	}

	if (trgt_out_l->sourcetype == SOURCE_IN)
		refresh_speedstat(trgt_out_l, acked);
}
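
/* free all items which have been read completely (everything before lastread) */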
void databuf_ackread(struct conn *cn_l)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_l->data_buf.items)) &&
			cn_l->data_buf.lastread != 0) {
		struct data_buf_item *firstitem = container_of(
				cn_l->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == cn_l->data_buf.lastread)
			break;

		acked += firstitem->datalen;

		databuf_item_free(cn_l, firstitem);
	}

	cn_l->data_buf.first_offset += acked;
	cn_l->data_buf.totalsize -= acked;

	BUG_ON(cn_l->data_buf.totalsize == 0 && cn_l->data_buf.overhead != 0);

	if (unlikely(cn_l->data_buf.cpacket_buffer != 0)) {
		/* release at most the remaining reservation */
		__u32 amount = acked > cn_l->data_buf.cpacket_buffer ?
				cn_l->data_buf.cpacket_buffer : acked;
		free_cpacket_buffer(amount);
		cn_l->data_buf.cpacket_buffer -= amount;
	}

	if (cn_l->sourcetype == SOURCE_IN)
		refresh_speedstat(cn_l, acked);
}
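
/*
 * Append up to len bytes to the buffer, allocating new chunks of at most one
 * page as needed. At most maxcpy bytes are copied and buffer growth is
 * bounded by maxusage; returns the number of bytes consumed or a negative
 * errno.
 */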
static __s64 _receive_buf(struct conn *cn_l, char *buf, __u32 len, int userbuf,
		__u32 maxcpy, __u32 maxusage)
{
	struct data_buf_item *item = 0;

	__s64 totalcpy = 0;

	if (list_empty(&(cn_l->data_buf.items)) == 0) {
		struct list_head *last = cn_l->data_buf.items.prev;
		item = container_of(last, struct data_buf_item, buf_list);
	}

	while (len > 0) {
		int rc = 0;
		__u32 cpy = len;

		if (item == 0 || item->buflen <= item->datalen) {
			__u32 buflen;

			/* no room for a new chunk within maxusage? */
			if (cn_l->data_buf.totalsize +
					cn_l->data_buf.overhead >
					maxusage - sizeof(struct data_buf_item))
				break;

			buflen = maxusage - cn_l->data_buf.totalsize -
					cn_l->data_buf.overhead -
					sizeof(struct data_buf_item);

			/* nearly at the copy limit and unable to finish */
			if (totalcpy + 64 > maxcpy &&
					totalcpy + len > maxcpy)
				break;

			if (totalcpy + buflen < maxcpy)
				buflen = maxcpy - totalcpy;

			if (buflen > PAGESIZE)
				buflen = PAGESIZE;

			if (unlikely(unlikely(cn_l->data_buf.totalsize +
					buflen > (1 << 30)) || unlikely(
					cn_l->data_buf.overhead > (1 << 30))))
				break;

			item = kmem_cache_alloc(data_buf_item_slab,
					GFP_KERNEL);
			if (unlikely(item == 0)) {
				if (totalcpy == 0)
					totalcpy = -ENOMEM;
				break;
			}
			memset(item, 0, sizeof(struct data_buf_item));
			item->type = TYPE_BUF;
			item->buf = kmalloc(buflen, GFP_KERNEL);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(data_buf_item_slab, item);
				if (totalcpy == 0)
					totalcpy = -ENOMEM;
				break;
			}

			list_add_tail(&(item->buf_list),
					&(cn_l->data_buf.items));
			item->buflen = buflen;
			cn_l->data_buf.overhead += buflen +
					sizeof(struct data_buf_item);
		}

		BUG_ON(item->type != TYPE_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		if (userbuf) {
			int notcopied = copy_from_user(item->buf +
					item->datalen, buf, cpy);
			if (unlikely(notcopied > 0))
				rc = -EFAULT;
		} else {
			memcpy(item->buf + item->datalen, buf, cpy);
		}

		buf += cpy;
		len -= cpy;
		totalcpy += cpy;

		cn_l->data_buf.read_remaining += cpy;
		cn_l->data_buf.totalsize += cpy;
		cn_l->data_buf.overhead -= cpy;
		BUG_ON(cn_l->data_buf.totalsize == 0 &&
				cn_l->data_buf.overhead != 0);

		item->datalen += cpy;

		if (unlikely(rc < 0)) {
			if (totalcpy == 0)
				totalcpy = rc;
			break;
		}
	}

	return totalcpy;
}
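
/* sendmsg path: feed the iovecs in msg into the buffer via _receive_buf() */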
__s64 receive_userbuf(struct conn *src_sock_l, struct msghdr *msg, __u32 maxcpy,
		__u32 maxusage)
{
	__s64 copied = 0;
	int iovidx = 0;
	int iovread = 0;

	while (iovidx < msg->msg_iovlen) {
		__s64 rc;

		struct iovec *iov = msg->msg_iov + iovidx;
		__user char *userbuf = iov->iov_base + iovread;
		__u32 len = iov->iov_len - iovread;

		if (len == 0) {
			iovidx++;
			iovread = 0;
			continue;
		}

		BUG_ON(copied > maxcpy);
		rc = _receive_buf(src_sock_l, userbuf, len, 1, maxcpy - copied,
				maxusage);

		if (unlikely(rc < 0)) {
			if (copied == 0)
				copied = rc;
			break;
		}

		copied += rc;
		iovread += rc;

		/* the buffer limits stopped the copy early */
		if (rc < len)
			break;
	}

	return copied;
}
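
/*
 * Buffer a locally generated control packet response; space has been
 * reserved in cpacket_buffer beforehand, so the copy must not fail.
 */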
void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len)
{
	__s64 rc;

	BUG_ON(trtg_unconn_l->data_buf.cpacket_buffer <
			trtg_unconn_l->data_buf.totalsize + len);

	rc = _receive_buf(trtg_unconn_l, buf, len, 0, len, 0);
	BUG_ON(rc != len);
}
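
/*
 * Zero-copy receive: link the skb's data into the buffer list through the
 * data_buf_item embedded in its procstate. Returns 0 on success, nonzero if
 * the buffer limits would be exceeded.
 */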
int receive_skb(struct conn *src_in_l, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct data_buf_item *item = &(ps->funcstate.rcv2.dbi);

	if (unlikely(unlikely(src_in_l->data_buf.totalsize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 1;

	item->type = TYPE_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.totalsize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	return 0;
}
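
/* drains the out-of-order queue and announces a new window; unlocks rcv_lock */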
static void _wake_sender_in(struct conn *src_in_l)
{
	drain_ooo_queue(src_in_l);
	mutex_unlock(&(src_in_l->rcv_lock));
	get_window(src_in_l, 0, 0, 0);
}
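
/*
 * Wake whoever feeds this connection: the reverse-direction parser for
 * unconnected sources, a writing socket, or the window update path for
 * SOURCE_IN.
 */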
void wake_sender(struct conn *cn)
{
	unreserve_sock_buffer(cn);

	mutex_lock(&(cn->rcv_lock));
	switch (cn->sourcetype) {
	case SOURCE_NONE:
		mutex_unlock(&(cn->rcv_lock));
		parse(cn->reversedir, 0);
		break;
	case SOURCE_SOCK:
		wake_up_interruptible(&(cn->source.sock.wait));
		mutex_unlock(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		_wake_sender_in(cn); /* mutex_unlock inside */
		break;
	default:
		BUG();
	}
}
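
/*
 * Push buffered data towards the target: parse buffered control data for
 * unconnected targets, wake a reading socket, call flush_out() for a
 * next-hop target, or discard for TARGET_DISCARD.
 */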
void flush_buf(struct conn *cn)
{
	int rc;
	int sent = 0;

	mutex_lock(&(cn->rcv_lock));

	switch (cn->targettype) {
	case TARGET_UNCONNECTED:
		mutex_unlock(&(cn->rcv_lock));
		parse(cn, 0);
		break;
	case TARGET_SOCK:
		if (cn->sourcetype != SOURCE_SOCK ||
				cn->source.sock.delay_flush == 0 ||
				cn->data_buf.totalsize +
				cn->data_buf.overhead -
				cn->data_buf.cpacket_buffer >=
				BUFFERLIMIT_SOCK_SOCK/2)
			wake_up_interruptible(&(cn->target.sock.wait));
		mutex_unlock(&(cn->rcv_lock));
		break;
	case TARGET_OUT:
		rc = flush_out(cn, 0, 0);
		mutex_unlock(&(cn->rcv_lock));
		sent = (rc == RC_FLUSH_CONN_OUT_OK_SENT);
		break;
	case TARGET_DISCARD:
		databuf_ackdiscard(cn);
		mutex_unlock(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}

	refresh_conn_credits(cn, 0, 0);
}
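
/* module init: create the slab cache for struct data_buf_item */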
void __init forward_init(void)
{
	data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct data_buf_item), 8, 0, 0);
}

MODULE_LICENSE("GPL");