credit system
[cor_2_6_31.git] / net / cor / forward.c
blob41e0f047351d75ce9f8779a7d8c0ef0829dbb677
1 /*
2 * Connection oriented routing
3 * Copyright (C) 2007-2010 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/mutex.h>
23 #include "cor.h"
25 struct kmem_cache *data_buf_item_slab;
27 #define PAGESIZE (1 << PAGE_SHIFT)
29 void databuf_init(struct conn *conn)
31 memset(&(conn->data_buf), 0, sizeof(conn->data_buf));
32 INIT_LIST_HEAD(&(conn->data_buf.items));
35 static void databuf_item_free(struct conn *conn, struct data_buf_item *item)
37 if (item->type == TYPE_BUF) {
38 kfree(item->data.buf.buf);
39 conn->data_buf.overhead -= item->data.buf.buflen -
40 item->data.buf.datalen;
41 } else if (item->type == TYPE_SKB) {
42 kfree_skb(item->data.skb);
43 conn->data_buf.overhead -= sizeof(struct sk_buff);
44 } else {
45 BUG();
48 conn->data_buf.overhead -= sizeof(struct data_buf_item);
50 list_del(&(item->buf_list));
52 kmem_cache_free(data_buf_item_slab, item);
55 void reset_seqno(struct conn *conn, __u32 initseqno)
57 conn->data_buf.first_offset = initseqno -
58 conn->data_buf.last_read_offset;
61 void databuf_free(struct conn *conn)
63 while (!list_empty(&(conn->data_buf.items))) {
64 struct data_buf_item *item = container_of(
65 conn->data_buf.items.next,
66 struct data_buf_item, buf_list);
67 if (item->type == TYPE_BUF) {
68 conn->data_buf.totalsize -= item->data.buf.datalen;
69 } else if (item->type == TYPE_SKB) {
70 conn->data_buf.totalsize -= item->data.skb->len;
71 } else {
72 BUG();
75 databuf_item_free(conn, item);
78 BUG_ON(conn->data_buf.totalsize != 0);
79 BUG_ON(conn->data_buf.overhead != 0);
81 if (conn->data_buf.cpacket_buffer != 0) {
82 free_cpacket_buffer(conn->data_buf.cpacket_buffer);
83 conn->data_buf.cpacket_buffer = 0;
87 static void databuf_nextreadchunk(struct conn *conn)
89 if (conn->data_buf.lastread == 0) {
90 BUG_ON(conn->data_buf.last_read_offset != 0);
91 BUG_ON(list_empty(&(conn->data_buf.items)));
92 conn->data_buf.lastread = container_of(conn->data_buf.items.next,
93 struct data_buf_item, buf_list);
94 } else if (&(conn->data_buf.lastread->buf_list) !=
95 conn->data_buf.items.prev) {
96 conn->data_buf.lastread = container_of(
97 conn->data_buf.lastread->buf_list.next,
98 struct data_buf_item, buf_list);
100 conn->data_buf.last_read_offset = 0;
104 static int _databuf_pull(struct conn *conn, char *dst, int len, int userbuf)
106 int totalcpy = 0;
108 BUG_ON(conn->data_buf.read_remaining < len);
110 if (conn->data_buf.lastread == 0)
111 databuf_nextreadchunk(conn);
113 while(len > 0) {
114 int rc = 0;
115 int cpy = len;
117 char *srcbuf = 0;
118 int srcbuflen = 0;
120 char *srcbufcpystart = 0;
121 int srcbufcpylen = 0;
123 BUG_ON(conn->data_buf.lastread == 0);
125 if (conn->data_buf.lastread->type == TYPE_BUF) {
126 srcbuf = conn->data_buf.lastread->data.buf.buf;
127 srcbuflen = conn->data_buf.lastread->data.buf.datalen;
128 } else if (conn->data_buf.lastread->type == TYPE_SKB) {
129 srcbuf = conn->data_buf.lastread->data.skb->data;
130 srcbuflen = conn->data_buf.lastread->data.skb->len;
131 } else {
132 BUG();
135 srcbufcpystart = srcbuf + conn->data_buf.last_read_offset;
136 srcbufcpylen = srcbuflen - conn->data_buf.last_read_offset;
138 if (cpy > srcbufcpylen)
139 cpy = srcbufcpylen;
141 if (userbuf) {
142 int notcopied = copy_to_user(dst, srcbufcpystart, cpy);
143 cpy -= notcopied;
144 if (unlikely(notcopied > 0))
145 rc = -EFAULT;
146 } else {
147 memcpy(dst, srcbufcpystart, cpy);
150 dst += cpy;
151 len -= cpy;
152 totalcpy += cpy;
154 conn->data_buf.read_remaining -= cpy;
155 conn->data_buf.last_read_offset += cpy;
157 if (cpy == srcbufcpylen)
158 databuf_nextreadchunk(conn);
160 if (unlikely(rc < 0)) {
161 if (totalcpy == 0)
162 totalcpy = rc;
163 break;
167 return totalcpy;
/* Kernel-space variant of the pull: copy len readable bytes into dst. */
void databuf_pull(struct conn *conn, char *dst, int len)
{
	_databuf_pull(conn, dst, len, 0);
}
175 size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg)
177 size_t copied = 0;
178 int iovidx = 0;
179 int iovread = 0;
181 while (iovidx < msg->msg_iovlen) {
182 int rc;
184 struct iovec *iov = msg->msg_iov + iovidx;
185 __user char *msg = iov->iov_base + iovread;
186 unsigned int len = iov->iov_len - iovread;
188 if (len == 0) {
189 iovidx++;
190 iovread = 0;
191 continue;
194 if (sconn->data_buf.read_remaining == 0) {
195 if (sconn->sourcetype == SOURCE_NONE)
196 rc = -EPIPE;
197 else
198 rc = -EAGAIN;
199 } else {
200 if (len > sconn->data_buf.read_remaining)
201 len = sconn->data_buf.read_remaining;
203 rc = _databuf_pull(sconn, msg, len, 1);
206 BUG_ON(rc == 0);
208 if (rc < 0) {
209 if (copied == 0)
210 copied = rc;
211 break;
214 copied += rc;
215 iovread += rc;
218 return copied;
221 void databuf_unpull(struct conn *conn, __u32 bytes)
223 conn->data_buf.read_remaining += bytes;
225 BUG_ON(conn->data_buf.lastread == 0);
227 while (bytes > conn->data_buf.last_read_offset) {
228 bytes -= conn->data_buf.last_read_offset;
229 conn->data_buf.lastread = container_of(
230 conn->data_buf.lastread->buf_list.prev,
231 struct data_buf_item, buf_list);
232 BUG_ON(&(conn->data_buf.lastread->buf_list) ==
233 &(conn->data_buf.items));
236 conn->data_buf.last_read_offset -= bytes;
239 void databuf_pullold(struct conn *conn, __u32 startpos, char *dst, int len)
241 __u32 pos = conn->data_buf.first_offset;
242 struct data_buf_item *dbi = container_of(conn->data_buf.items.next,
243 struct data_buf_item, buf_list);
245 while(1) {
246 int srcbuflen;
248 BUG_ON(&(dbi->buf_list) == &(conn->data_buf.items));
250 if (conn->data_buf.lastread->type == TYPE_BUF) {
251 srcbuflen = dbi->data.buf.datalen;
252 } else if (conn->data_buf.lastread->type == TYPE_SKB) {
253 srcbuflen = dbi->data.skb->len;
254 } else {
255 BUG();
258 if (((__s32) (pos + srcbuflen - startpos)) > 0)
259 break;
261 pos += srcbuflen;
262 dbi = container_of(dbi->buf_list.next, struct data_buf_item,
263 buf_list);
266 while (len > 0) {
267 int cpy = len;
269 char *srcbuf = 0;
270 int srcbuflen = 0;
272 char *srcbufcpystart = 0;
273 int srcbufcpylen = 0;
275 BUG_ON(&(dbi->buf_list) == &(conn->data_buf.items));
277 if (conn->data_buf.lastread->type == TYPE_BUF) {
278 srcbuf = conn->data_buf.lastread->data.buf.buf;
279 srcbuflen = conn->data_buf.lastread->data.buf.datalen;
280 } else if (conn->data_buf.lastread->type == TYPE_SKB) {
281 srcbuf = conn->data_buf.lastread->data.skb->data;
282 srcbuflen = conn->data_buf.lastread->data.skb->len;
283 } else {
284 BUG();
287 BUG_ON(((__s32) (pos - startpos)) > 0);
289 srcbufcpystart = srcbuf + ((__s32) (startpos - pos));
290 srcbufcpylen = srcbuflen - ((__s32) (startpos - pos));
292 if (cpy > srcbufcpylen)
293 cpy = srcbufcpylen;
295 memcpy(dst, srcbufcpystart, cpy);
297 dst += cpy;
298 len -= cpy;
299 startpos += cpy;
301 pos += srcbuflen;
302 dbi = container_of(dbi->buf_list.next, struct data_buf_item,
303 buf_list);
307 /* ack up to *not* including pos */
308 void databuf_ack(struct conn *rconn, __u32 pos)
310 __u32 acked = 0;
312 while (!list_empty(&(rconn->data_buf.items))) {
313 struct data_buf_item *firstitem = container_of(
314 rconn->data_buf.items.next, struct data_buf_item,
315 buf_list);
316 int firstlen = 0;
318 if (firstitem == rconn->data_buf.lastread)
319 break;
321 if (firstitem->type == TYPE_BUF) {
322 firstlen = firstitem->data.buf.datalen;
323 } else if (firstitem->type == TYPE_SKB) {
324 firstlen = firstitem->data.skb->len;
325 } else {
326 BUG();
329 if (((__s32)(rconn->data_buf.first_offset + firstlen - pos)) >0)
330 break;
332 rconn->data_buf.first_offset += firstlen;
333 acked += firstlen;
335 databuf_item_free(rconn, firstitem);
338 rconn->data_buf.totalsize -= acked;
340 BUG_ON(rconn->data_buf.totalsize == 0 && rconn->data_buf.overhead != 0);
342 if (unlikely(rconn->data_buf.cpacket_buffer != 0)) {
343 __u32 amount = acked > rconn->data_buf.cpacket_buffer ?
344 acked : rconn->data_buf.cpacket_buffer;
345 free_cpacket_buffer(amount);
346 rconn->data_buf.cpacket_buffer -= amount;
349 if (rconn-> sourcetype == SOURCE_IN)
350 refresh_speedstat(rconn, acked);
353 void databuf_ackread(struct conn *rconn)
355 __u32 acked = 0;
357 while (!list_empty(&(rconn->data_buf.items)) &&
358 rconn->data_buf.lastread != 0) {
359 struct data_buf_item *firstitem = container_of(
360 rconn->data_buf.items.next,
361 struct data_buf_item, buf_list);
363 if (firstitem == rconn->data_buf.lastread)
364 break;
366 if (firstitem->type == TYPE_BUF) {
367 acked += firstitem->data.buf.datalen;
368 } else if (firstitem->type == TYPE_SKB) {
369 acked += firstitem->data.skb->len;
370 } else {
371 BUG();
374 databuf_item_free(rconn, firstitem);
377 rconn->data_buf.first_offset += acked;
378 rconn->data_buf.totalsize -= acked;
380 BUG_ON(rconn->data_buf.totalsize == 0 && rconn->data_buf.overhead != 0);
382 if (unlikely(rconn->data_buf.cpacket_buffer != 0)) {
383 __u32 amount = acked > rconn->data_buf.cpacket_buffer ?
384 acked : rconn->data_buf.cpacket_buffer;
385 free_cpacket_buffer(amount);
386 rconn->data_buf.cpacket_buffer -= amount;
389 if (rconn-> sourcetype == SOURCE_IN)
390 refresh_speedstat(rconn, acked);
393 void flush_buf(struct conn *rconn)
395 int rc = RC_FLUSH_CONN_OUT_OK;
396 mutex_lock(&(rconn->rcv_lock));
398 switch (rconn->targettype) {
399 case TARGET_UNCONNECTED:
400 mutex_unlock(&(rconn->rcv_lock));
401 parse(rconn, 0);
402 break;
403 case TARGET_SOCK:
404 if (rconn->sourcetype != SOURCE_SOCK ||
405 rconn->source.sock.delay_flush == 0 ||
406 rconn->data_buf.totalsize +
407 rconn->data_buf.overhead -
408 rconn->data_buf.cpacket_buffer >=
409 BUFFERLIMIT_SOCK_SOCK/2)
410 wake_up_interruptible(&(rconn->target.sock.wait));
411 mutex_unlock(&(rconn->rcv_lock));
412 break;
413 case TARGET_OUT:
414 rc = flush_out(rconn, 0, 0);
415 mutex_unlock(&(rconn->rcv_lock));
416 break;
417 default:
418 BUG();
421 refresh_conn_credits(rconn, 0, 0);
422 unreserve_sock_buffer(rconn);
424 if (rc == RC_FLUSH_CONN_OUT_CONG) {
425 qos_enqueue(rconn->target.out.nb->dev, &(rconn->target.out.rb),
426 QOS_CALLER_CONN);
427 } else if (rc == RC_FLUSH_CONN_OUT_OOM) {
428 printk(KERN_DEBUG "oom");
429 qos_enqueue(rconn->target.out.nb->dev, &(rconn->target.out.rb),
430 QOS_CALLER_CONN);
431 } else if (rc == RC_FLUSH_CONN_OUT_OK_SENT) {
432 wake_sender(rconn);
436 static __s64 _receive_buf(struct conn *rconn, char *buf, __u32 len, int userbuf,
437 __u32 maxcpy, __u32 maxusage)
439 struct data_buf_item *item = 0;
441 __s64 totalcpy = 0;
443 if (list_empty(&(rconn->data_buf.items)) == 0) {
444 struct list_head *last = rconn->data_buf.items.prev;
445 item = container_of(last, struct data_buf_item, buf_list);
448 while (len > 0) {
449 int rc = 0;
450 int cpy = len;
452 if (item == 0 || item->type != TYPE_BUF ||
453 item->data.buf.buflen <=
454 item->data.buf.datalen) {
455 __u32 buflen = len;
457 if (maxusage != 0) {
458 if (rconn->data_buf.totalsize +
459 rconn->data_buf.overhead >
460 maxusage) {
461 rc = -EAGAIN;
462 goto error;
465 buflen = maxusage - rconn->data_buf.totalsize -
466 rconn->data_buf.overhead -
467 sizeof(struct data_buf_item);
468 } else {
469 if (totalcpy + 64 > maxcpy &&
470 totalcpy + len > maxcpy) {
471 rc = -EAGAIN;
472 goto error;
475 if (totalcpy + buflen < maxcpy)
476 buflen = maxcpy - totalcpy;
479 if (buflen < 64)
480 buflen = 64;
481 if (buflen > PAGESIZE)
482 buflen = PAGESIZE;
483 if (buflen > 32768)
484 buflen = 32768;
486 item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
487 if (unlikely(item == 0)) {
488 rc = -ENOMEM;
489 goto error;
491 memset(item, 0, sizeof(item));
492 item->type = TYPE_BUF;
493 item->data.buf.buf = kmalloc(buflen, GFP_KERNEL);
495 if (unlikely(item->data.buf.buf == 0)) {
496 kmem_cache_free(data_buf_item_slab, item);
497 rc = -ENOMEM;
498 goto error;
500 item->data.buf.datalen = 0;
502 list_add_tail(&(item->buf_list),
503 &(rconn->data_buf.items));
504 item->data.buf.buflen = buflen;
505 rconn->data_buf.overhead += buflen +
506 sizeof(struct data_buf_item);
509 BUG_ON(item->type != TYPE_BUF);
510 BUG_ON(item->data.buf.buflen <= item->data.buf.datalen);
512 if (item->data.buf.buflen - item->data.buf.datalen < cpy)
513 cpy = (item->data.buf.buflen - item->data.buf.datalen);
515 if (userbuf) {
516 int notcopied = copy_from_user(item->data.buf.buf +
517 item->data.buf.datalen, buf, cpy);
518 cpy -= notcopied;
519 if (unlikely(notcopied > 0))
520 rc = -EFAULT;
521 } else {
522 memcpy(item->data.buf.buf + item->data.buf.datalen,
523 buf, cpy);
526 buf += cpy;
527 len -= cpy;
528 rconn->data_buf.read_remaining += cpy;
529 rconn->data_buf.totalsize += cpy;
530 rconn->data_buf.overhead -= cpy;
531 BUG_ON(rconn->data_buf.totalsize == 0 &&
532 rconn->data_buf.overhead != 0);
533 totalcpy += cpy;
535 item->data.buf.datalen += cpy;
537 error:
538 if (unlikely(rc < 0)) {
539 if (totalcpy == 0)
540 return rc;
541 break;
545 return totalcpy;
548 __s64 receive_userbuf(struct conn *rconn, struct msghdr *msg, __u32 maxcpy,
549 __u32 maxusage)
551 __s64 copied = 0;
552 int iovidx = 0;
553 int iovread = 0;
555 while (iovidx < msg->msg_iovlen) {
556 struct iovec *iov = msg->msg_iov + iovidx;
557 __user char *userbuf = iov->iov_base + iovread;
558 __u32 len = iov->iov_len - iovread;
559 __s64 rc;
561 if (len == 0) {
562 iovidx++;
563 iovread = 0;
564 continue;
567 BUG_ON(copied < 0);
568 BUG_ON(copied > maxcpy);
569 rc = _receive_buf(rconn, userbuf, len, 1, maxcpy - copied,
570 maxusage);
572 if (rc < 0) {
573 if (copied == 0)
574 copied = rc;
575 break;
578 copied += rc;
579 iovread += rc;
581 if (rc < len)
582 break;
585 return copied;
588 void receive_cpacketresp(struct conn *rconn, char *buf, int len)
590 __s64 rc;
591 BUG_ON(rconn->data_buf.cpacket_buffer < rconn->data_buf.totalsize +len);
592 rc = _receive_buf(rconn, buf, len, 0, len, 0);
593 BUG_ON(rc < 0);
594 BUG_ON(rc < len);
597 int receive_skb(struct conn *rconn, struct sk_buff *skb)
599 struct data_buf_item *item;
601 item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
603 if (unlikely(item == 0))
604 return 1;
606 item->data.skb = skb;
607 item->type = TYPE_SKB;
608 list_add_tail(&(item->buf_list), &(rconn->data_buf.items));
609 rconn->data_buf.read_remaining += skb->len;
610 rconn->data_buf.totalsize += skb->len;
611 rconn->data_buf.overhead += sizeof(struct data_buf_item) +
612 sizeof(struct sk_buff);
614 BUG_ON(rconn->data_buf.totalsize == 0 && rconn->data_buf.overhead != 0);
616 return 0;
619 static void _wake_sender_in(struct conn *rconn)
621 int windowlimitreached = (rconn->source.in.next_seqno ==
622 rconn->source.in.window_seqnolimit_last);
623 struct neighbor *nb = rconn->source.in.nb;
624 __u32 conn_id = rconn->reversedir->target.out.conn_id;
625 __u32 next_seqno = rconn->source.in.next_seqno;
627 __u8 window;
628 struct control_msg_out *cm;
630 drain_ooo_queue(rconn);
631 mutex_unlock(&(rconn->rcv_lock));
633 if (windowlimitreached == 0)
634 return;
636 window = get_window(rconn);
637 if (window == 0)
638 return;
640 cm = alloc_control_msg(nb, ACM_PRIORITY_HIGH);
641 if (unlikely(cm == 0))
642 send_ping_all_conns(nb);
643 else
644 send_ack_conn(cm, rconn, conn_id, next_seqno);
647 void wake_sender(struct conn *rconn)
649 mutex_lock(&(rconn->rcv_lock));
650 switch (rconn->sourcetype) {
651 case SOURCE_NONE:
652 mutex_unlock(&(rconn->rcv_lock));
653 parse(rconn->reversedir, 0);
654 break;
655 case SOURCE_SOCK:
656 wake_up_interruptible(&(rconn->source.sock.wait));
657 mutex_unlock(&(rconn->rcv_lock));
658 break;
659 case SOURCE_IN:
660 _wake_sender_in(rconn); /* mutex_unlock inside */
661 break;
662 default:
663 BUG();
667 void forward_init(void)
669 data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
670 sizeof(struct data_buf_item), 8, 0, 0);
673 MODULE_LICENSE("GPL");