/**
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/mutex.h>

#include "cor.h" /* project-local declarations (struct conn, data_buf_item, ...) */
struct kmem_cache *data_buf_item_slab;

#define PAGESIZE (1 << PAGE_SHIFT)
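
/*
 * Buffer bookkeeping used throughout this file (a summary inferred from the
 * code below, not from a separate specification):
 *  - data_buf.items:          list of data_buf_item entries, each holding
 *                             either a kmalloc'd buffer (TYPE_BUF) or an
 *                             sk_buff (TYPE_SKB)
 *  - data_buf.totalsize:      payload bytes currently queued
 *  - data_buf.overhead:       bookkeeping overhead (unused buffer space plus
 *                             struct sizes), accounted separately
 *  - data_buf.read_remaining: queued bytes not yet read
 *  - data_buf.first_offset:   seqno of the first byte still in the buffer
 *  - data_buf.lastread,
 *    data_buf.last_read_offset: read cursor (item and offset within it)
 *  - data_buf.cpacket_buffer: space reserved for control packet responses
 */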
void databuf_init(struct conn *conn)
{
	memset(&(conn->data_buf), 0, sizeof(conn->data_buf));
	INIT_LIST_HEAD(&(conn->data_buf.items));
}
static void databuf_item_free(struct conn *conn, struct data_buf_item *item)
{
	if (item->type == TYPE_BUF) {
		kfree(item->data.buf.buf);
		conn->data_buf.overhead -= item->data.buf.buflen -
				item->data.buf.datalen;
	} else if (item->type == TYPE_SKB) {
		kfree_skb(item->data.skb);
		conn->data_buf.overhead -= sizeof(struct sk_buff);
	}

	conn->data_buf.overhead -= sizeof(struct data_buf_item);

	list_del(&(item->buf_list));

	kmem_cache_free(data_buf_item_slab, item);
}
void reset_seqno(struct conn *conn, __u32 initseqno)
{
	conn->data_buf.first_offset = initseqno -
			conn->data_buf.last_read_offset;
}
void databuf_free(struct conn *conn)
{
	while (!list_empty(&(conn->data_buf.items))) {
		struct data_buf_item *item = container_of(
				conn->data_buf.items.next,
				struct data_buf_item, buf_list);
		if (item->type == TYPE_BUF) {
			conn->data_buf.totalsize -= item->data.buf.datalen;
		} else if (item->type == TYPE_SKB) {
			conn->data_buf.totalsize -= item->data.skb->len;
		}

		databuf_item_free(conn, item);
	}

	BUG_ON(conn->data_buf.totalsize != 0);
	BUG_ON(conn->data_buf.overhead != 0);

	if (conn->data_buf.cpacket_buffer != 0) {
		free_cpacket_buffer(conn->data_buf.cpacket_buffer);
		conn->data_buf.cpacket_buffer = 0;
	}
}
static void databuf_nextreadchunk(struct conn *conn)
{
	if (conn->data_buf.lastread == 0) {
		BUG_ON(conn->data_buf.last_read_offset != 0);
		BUG_ON(list_empty(&(conn->data_buf.items)));
		conn->data_buf.lastread = container_of(conn->data_buf.items.next,
				struct data_buf_item, buf_list);
	} else if (&(conn->data_buf.lastread->buf_list) !=
			conn->data_buf.items.prev) {
		conn->data_buf.lastread = container_of(
				conn->data_buf.lastread->buf_list.next,
				struct data_buf_item, buf_list);

		conn->data_buf.last_read_offset = 0;
	}
}
static int _databuf_pull(struct conn *conn, char *dst, int len, int userbuf)
{
	int rc = 0;

	BUG_ON(conn->data_buf.read_remaining < len);

	if (conn->data_buf.lastread == 0)
		databuf_nextreadchunk(conn);

	while (len > 0) {
		int cpy = len;

		char *srcbuf = 0;
		int srcbuflen = 0;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(conn->data_buf.lastread == 0);

		if (conn->data_buf.lastread->type == TYPE_BUF) {
			srcbuf = conn->data_buf.lastread->data.buf.buf;
			srcbuflen = conn->data_buf.lastread->data.buf.datalen;
		} else if (conn->data_buf.lastread->type == TYPE_SKB) {
			srcbuf = conn->data_buf.lastread->data.skb->data;
			srcbuflen = conn->data_buf.lastread->data.skb->len;
		}

		srcbufcpystart = srcbuf + conn->data_buf.last_read_offset;
		srcbufcpylen = srcbuflen - conn->data_buf.last_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		if (userbuf) {
			int notcopied = copy_to_user(dst, srcbufcpystart, cpy);

			if (unlikely(notcopied > 0))
				rc = -EFAULT;
		} else {
			memcpy(dst, srcbufcpystart, cpy);
		}

		dst += cpy;
		len -= cpy;

		conn->data_buf.read_remaining -= cpy;
		conn->data_buf.last_read_offset += cpy;

		if (cpy == srcbufcpylen)
			databuf_nextreadchunk(conn);

		if (unlikely(rc < 0))
			break;
	}

	return rc;
}
void databuf_pull(struct conn *conn, char *dst, int len)
{
	_databuf_pull(conn, dst, len, 0);
}
size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg)
{
	size_t copied = 0;
	int iovidx = 0;
	int iovread = 0;

	while (iovidx < msg->msg_iovlen) {
		int rc;

		struct iovec *iov = msg->msg_iov + iovidx;
		__user char *msg = iov->iov_base + iovread;
		unsigned int len = iov->iov_len - iovread;

		if (len == 0) {
			iovidx++;
			iovread = 0;
			continue;
		}

		if (sconn->data_buf.read_remaining == 0) {
			if (sconn->sourcetype == SOURCE_NONE)
				; /* ... */
			break;
		}

		if (len > sconn->data_buf.read_remaining)
			len = sconn->data_buf.read_remaining;

		rc = _databuf_pull(sconn, msg, len, 1);

		if (unlikely(rc < 0))
			break;

		copied += len;
		iovread += len;
	}

	return copied;
}
void databuf_unpull(struct conn *conn, __u32 bytes)
{
	conn->data_buf.read_remaining += bytes;

	BUG_ON(conn->data_buf.lastread == 0);

	while (bytes > conn->data_buf.last_read_offset) {
		bytes -= conn->data_buf.last_read_offset;
		conn->data_buf.lastread = container_of(
				conn->data_buf.lastread->buf_list.prev,
				struct data_buf_item, buf_list);
		BUG_ON(&(conn->data_buf.lastread->buf_list) ==
				&(conn->data_buf.items));

		/* step back to the end of the previous item */
		if (conn->data_buf.lastread->type == TYPE_BUF)
			conn->data_buf.last_read_offset =
					conn->data_buf.lastread->data.buf.datalen;
		else
			conn->data_buf.last_read_offset =
					conn->data_buf.lastread->data.skb->len;
	}

	conn->data_buf.last_read_offset -= bytes;
}
void databuf_pullold(struct conn *conn, __u32 startpos, char *dst, int len)
{
	__u32 pos = conn->data_buf.first_offset;
	struct data_buf_item *dbi = container_of(conn->data_buf.items.next,
			struct data_buf_item, buf_list);

	/* skip items that lie entirely before startpos */
	while (1) {
		int srcbuflen = 0;

		BUG_ON(&(dbi->buf_list) == &(conn->data_buf.items));

		if (dbi->type == TYPE_BUF) {
			srcbuflen = dbi->data.buf.datalen;
		} else if (dbi->type == TYPE_SKB) {
			srcbuflen = dbi->data.skb->len;
		}

		if (((__s32) (pos + srcbuflen - startpos)) > 0)
			break;

		pos += srcbuflen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbuf = 0;
		int srcbuflen = 0;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(&(dbi->buf_list) == &(conn->data_buf.items));

		if (dbi->type == TYPE_BUF) {
			srcbuf = dbi->data.buf.buf;
			srcbuflen = dbi->data.buf.datalen;
		} else if (dbi->type == TYPE_SKB) {
			srcbuf = dbi->data.skb->data;
			srcbuflen = dbi->data.skb->len;
		}

		BUG_ON(((__s32) (pos - startpos)) > 0);

		srcbufcpystart = srcbuf + ((__s32) (startpos - pos));
		srcbufcpylen = srcbuflen - ((__s32) (startpos - pos));

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;
		pos += srcbuflen;

		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}
}
/* ack up to *not* including pos */
void databuf_ack(struct conn *rconn, __u32 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(rconn->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				rconn->data_buf.items.next, struct data_buf_item,
				buf_list);
		__u32 firstlen = 0;

		if (firstitem == rconn->data_buf.lastread)
			break;

		if (firstitem->type == TYPE_BUF) {
			firstlen = firstitem->data.buf.datalen;
		} else if (firstitem->type == TYPE_SKB) {
			firstlen = firstitem->data.skb->len;
		}

		if (((__s32)(rconn->data_buf.first_offset + firstlen - pos)) > 0)
			break;

		rconn->data_buf.first_offset += firstlen;
		acked += firstlen;

		databuf_item_free(rconn, firstitem);
	}

	rconn->data_buf.totalsize -= acked;

	BUG_ON(rconn->data_buf.totalsize == 0 && rconn->data_buf.overhead != 0);

	if (unlikely(rconn->data_buf.cpacket_buffer != 0)) {
		__u32 amount = acked > rconn->data_buf.cpacket_buffer ?
				acked : rconn->data_buf.cpacket_buffer;
		free_cpacket_buffer(amount);
		rconn->data_buf.cpacket_buffer -= amount;
	}

	if (rconn->sourcetype == SOURCE_IN)
		refresh_speedstat(rconn, acked);
}
void databuf_ackread(struct conn *rconn)
{
	__u32 acked = 0;

	while (!list_empty(&(rconn->data_buf.items)) &&
			rconn->data_buf.lastread != 0) {
		struct data_buf_item *firstitem = container_of(
				rconn->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == rconn->data_buf.lastread)
			break;

		if (firstitem->type == TYPE_BUF) {
			acked += firstitem->data.buf.datalen;
		} else if (firstitem->type == TYPE_SKB) {
			acked += firstitem->data.skb->len;
		}

		databuf_item_free(rconn, firstitem);
	}

	rconn->data_buf.first_offset += acked;
	rconn->data_buf.totalsize -= acked;

	BUG_ON(rconn->data_buf.totalsize == 0 && rconn->data_buf.overhead != 0);

	if (unlikely(rconn->data_buf.cpacket_buffer != 0)) {
		__u32 amount = acked > rconn->data_buf.cpacket_buffer ?
				acked : rconn->data_buf.cpacket_buffer;
		free_cpacket_buffer(amount);
		rconn->data_buf.cpacket_buffer -= amount;
	}

	if (rconn->sourcetype == SOURCE_IN)
		refresh_speedstat(rconn, acked);
}
void flush_buf(struct conn *rconn)
{
	int rc = RC_FLUSH_CONN_OUT_OK;
	mutex_lock(&(rconn->rcv_lock));

	switch (rconn->targettype) {
	case TARGET_UNCONNECTED:
		mutex_unlock(&(rconn->rcv_lock));
		break;
	case TARGET_SOCK:
		if (rconn->sourcetype != SOURCE_SOCK ||
				rconn->source.sock.delay_flush == 0 ||
				rconn->data_buf.totalsize +
				rconn->data_buf.overhead -
				rconn->data_buf.cpacket_buffer >=
				BUFFERLIMIT_SOCK_SOCK/2)
			wake_up_interruptible(&(rconn->target.sock.wait));
		mutex_unlock(&(rconn->rcv_lock));
		break;
	case TARGET_OUT:
		rc = flush_out(rconn, 0, 0);
		mutex_unlock(&(rconn->rcv_lock));
		break;
	}

	refresh_conn_credits(rconn, 0, 0);
	unreserve_sock_buffer(rconn);

	if (rc == RC_FLUSH_CONN_OUT_CONG) {
		qos_enqueue(rconn->target.out.nb->dev, &(rconn->target.out.rb),
				/* ... */);
	} else if (rc == RC_FLUSH_CONN_OUT_OOM) {
		printk(KERN_DEBUG "oom");
		qos_enqueue(rconn->target.out.nb->dev, &(rconn->target.out.rb),
				/* ... */);
	} else if (rc == RC_FLUSH_CONN_OUT_OK_SENT) {
		/* ... */
	}
}
static __s64 _receive_buf(struct conn *rconn, char *buf, __u32 len, int userbuf,
		__u32 maxcpy, __u32 maxusage)
{
	struct data_buf_item *item = 0;

	__s64 totalcpy = 0;
	int rc = 0;

	if (list_empty(&(rconn->data_buf.items)) == 0) {
		struct list_head *last = rconn->data_buf.items.prev;
		item = container_of(last, struct data_buf_item, buf_list);
	}

	while (len > 0) {
		int cpy = len;

		if (item == 0 || item->type != TYPE_BUF ||
				item->data.buf.buflen <=
				item->data.buf.datalen) {
			__u32 buflen = len;

			if (maxusage != 0) {
				if (rconn->data_buf.totalsize +
						rconn->data_buf.overhead >
						maxusage)
					break;

				buflen = maxusage -
						rconn->data_buf.totalsize -
						rconn->data_buf.overhead -
						sizeof(struct data_buf_item);
			}

			if (totalcpy + 64 > maxcpy &&
					totalcpy + len > maxcpy) {
				break;
			}

			if (totalcpy + buflen < maxcpy)
				buflen = maxcpy - totalcpy;

			if (buflen > PAGESIZE)
				buflen = PAGESIZE;

			item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
			if (unlikely(item == 0)) {
				rc = -ENOMEM;
				break;
			}

			memset(item, 0, sizeof(struct data_buf_item));
			item->type = TYPE_BUF;
			item->data.buf.buf = kmalloc(buflen, GFP_KERNEL);

			if (unlikely(item->data.buf.buf == 0)) {
				kmem_cache_free(data_buf_item_slab, item);
				rc = -ENOMEM;
				break;
			}
			item->data.buf.datalen = 0;

			list_add_tail(&(item->buf_list),
					&(rconn->data_buf.items));
			item->data.buf.buflen = buflen;
			rconn->data_buf.overhead += buflen +
					sizeof(struct data_buf_item);
		}

		BUG_ON(item->type != TYPE_BUF);
		BUG_ON(item->data.buf.buflen <= item->data.buf.datalen);

		if (item->data.buf.buflen - item->data.buf.datalen < cpy)
			cpy = (item->data.buf.buflen - item->data.buf.datalen);

		if (userbuf) {
			int notcopied = copy_from_user(item->data.buf.buf +
					item->data.buf.datalen, buf, cpy);

			if (unlikely(notcopied > 0))
				rc = -EFAULT;
		} else {
			memcpy(item->data.buf.buf + item->data.buf.datalen,
					buf, cpy);
		}

		buf += cpy;
		len -= cpy;
		totalcpy += cpy;

		rconn->data_buf.read_remaining += cpy;
		rconn->data_buf.totalsize += cpy;
		rconn->data_buf.overhead -= cpy;
		BUG_ON(rconn->data_buf.totalsize == 0 &&
				rconn->data_buf.overhead != 0);

		item->data.buf.datalen += cpy;

		if (unlikely(rc < 0))
			break;
	}

	if (rc < 0)
		return rc;

	return totalcpy;
}
__s64 receive_userbuf(struct conn *rconn, struct msghdr *msg, __u32 maxcpy,
		__u32 maxusage)
{
	__s64 copied = 0;
	int iovidx = 0;
	int iovread = 0;

	while (iovidx < msg->msg_iovlen) {
		__s64 rc;

		struct iovec *iov = msg->msg_iov + iovidx;
		__user char *userbuf = iov->iov_base + iovread;
		__u32 len = iov->iov_len - iovread;

		if (len == 0) {
			iovidx++;
			iovread = 0;
			continue;
		}

		BUG_ON(copied > maxcpy);
		rc = _receive_buf(rconn, userbuf, len, 1, maxcpy - copied,
				maxusage);

		if (rc < 0) {
			if (copied == 0)
				copied = rc;
			break;
		}

		copied += rc;
		iovread += rc;

		if (rc < len)
			break;
	}

	return copied;
}
void receive_cpacketresp(struct conn *rconn, char *buf, int len)
{
	__s64 rc;

	BUG_ON(rconn->data_buf.cpacket_buffer < rconn->data_buf.totalsize + len);
	rc = _receive_buf(rconn, buf, len, 0, len, 0);
	BUG_ON(rc != len);
}
int receive_skb(struct conn *rconn, struct sk_buff *skb)
{
	struct data_buf_item *item;

	item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);

	if (unlikely(item == 0))
		return 1;

	item->data.skb = skb;
	item->type = TYPE_SKB;
	list_add_tail(&(item->buf_list), &(rconn->data_buf.items));
	rconn->data_buf.read_remaining += skb->len;
	rconn->data_buf.totalsize += skb->len;
	rconn->data_buf.overhead += sizeof(struct data_buf_item) +
			sizeof(struct sk_buff);

	BUG_ON(rconn->data_buf.totalsize == 0 && rconn->data_buf.overhead != 0);

	return 0;
}
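
/*
 * wake_sender() below is called once buffered data has been consumed so that
 * the source may send more: for SOURCE_NONE the reverse connection is parsed
 * again, for SOURCE_SOCK the writing socket is woken up, and for SOURCE_IN an
 * ack_conn message is sent once the announced window limit had been reached,
 * so the peer learns about the freed buffer space. (Summary inferred from the
 * code below.)
 */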
static void _wake_sender_in(struct conn *rconn)
{
	int windowlimitreached = (rconn->source.in.next_seqno ==
			rconn->source.in.window_seqnolimit_last);
	struct neighbor *nb = rconn->source.in.nb;
	__u32 conn_id = rconn->reversedir->target.out.conn_id;
	__u32 next_seqno = rconn->source.in.next_seqno;

	__u32 window;

	struct control_msg_out *cm;

	drain_ooo_queue(rconn);
	mutex_unlock(&(rconn->rcv_lock));

	if (windowlimitreached == 0)
		return;

	window = get_window(rconn);
	if (window == 0)
		return;

	cm = alloc_control_msg(nb, ACM_PRIORITY_HIGH);
	if (unlikely(cm == 0))
		send_ping_all_conns(nb);
	else
		send_ack_conn(cm, rconn, conn_id, next_seqno);
}
void wake_sender(struct conn *rconn)
{
	mutex_lock(&(rconn->rcv_lock));
	switch (rconn->sourcetype) {
	case SOURCE_NONE:
		mutex_unlock(&(rconn->rcv_lock));
		parse(rconn->reversedir, 0);
		break;
	case SOURCE_SOCK:
		wake_up_interruptible(&(rconn->source.sock.wait));
		mutex_unlock(&(rconn->rcv_lock));
		break;
	case SOURCE_IN:
		_wake_sender_in(rconn); /* mutex_unlock inside */
		break;
	default:
		BUG();
	}
}
void forward_init(void)
{
	data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct data_buf_item), 8, 0, 0);
}

MODULE_LICENSE("GPL");