net/cor/forward.c

/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *data_buf_item_slab;

#define PAGESIZE (1 << PAGE_SHIFT)

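/*
 * Buffer handling for connection data. The *_l suffix on conn parameters
 * evidently marks functions that expect the caller to hold the conn's
 * rcv_lock (compare wake_sender() and flush_buf() below, which take the
 * lock before touching the buffer).
 */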
void databuf_init(struct conn *cn_init)
{
        memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
        INIT_LIST_HEAD(&(cn_init->data_buf.items));
}

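/*
 * Unlink an item and release its memory. TYPE_BUF items own a kmalloc'ed
 * buffer plus the slab object; TYPE_SKB items live inside the skb's
 * procstate, so freeing the skb releases the item too.
 */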
static void databuf_item_free(struct conn *cn_l, struct data_buf_item *item)
{
        list_del(&(item->buf_list));

        if (item->type == TYPE_BUF) {
                cn_l->data_buf.overhead -= sizeof(struct data_buf_item) +
                                item->buflen - item->datalen;
                kfree(item->buf);
                kmem_cache_free(data_buf_item_slab, item);
        } else if (item->type == TYPE_SKB) {
                struct sk_buff *skb = skb_from_pstate(container_of(item,
                                struct skb_procstate, funcstate.rcv2.dbi));
                cn_l->data_buf.overhead -= sizeof(struct sk_buff);
                kfree_skb(skb);
        } else {
                BUG();
        }
}

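/*
 * Throw away all buffered data without sending it and give back the
 * reserved cpacket_buffer. Afterwards all accounting fields must be zero
 * again, which the BUG_ONs below verify.
 */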
void databuf_ackdiscard(struct conn *cn_l)
{
        __u32 freed = 0;

        while (!list_empty(&(cn_l->data_buf.items))) {
                struct data_buf_item *item = container_of(
                                cn_l->data_buf.items.next,
                                struct data_buf_item, buf_list);
                freed += item->datalen;

                databuf_item_free(cn_l, item);
        }

        cn_l->data_buf.totalsize -= freed;
        cn_l->data_buf.first_offset += freed;

        BUG_ON(cn_l->data_buf.totalsize != 0);
        BUG_ON(cn_l->data_buf.overhead != 0);

        if (cn_l->data_buf.cpacket_buffer != 0) {
                free_cpacket_buffer(cn_l->data_buf.cpacket_buffer);
                cn_l->data_buf.cpacket_buffer = 0;
        }

        cn_l->data_buf.read_remaining = 0;
        cn_l->data_buf.last_read_offset = 0;
        cn_l->data_buf.lastread = 0;

        if (cn_l->isreset == 0 && cn_l->sourcetype == SOURCE_IN)
                refresh_speedstat(cn_l, freed);
}

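/*
 * Rebase first_offset so that the first unread byte gets sequence number
 * initseqno; offsets wrap modulo 2^32 throughout this file.
 */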
void reset_seqno(struct conn *cn_l, __u32 initseqno)
{
        cn_l->data_buf.first_offset = initseqno -
                        cn_l->data_buf.totalsize +
                        cn_l->data_buf.read_remaining;
}

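/* advance lastread to the first, resp. the following, item in the list */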
static void databuf_nextreadchunk(struct conn *cn_l)
{
        if (cn_l->data_buf.lastread == 0) {
                BUG_ON(cn_l->data_buf.last_read_offset != 0);
                BUG_ON(list_empty(&(cn_l->data_buf.items)));
                cn_l->data_buf.lastread = container_of(
                                cn_l->data_buf.items.next,
                                struct data_buf_item, buf_list);
        } else if (&(cn_l->data_buf.lastread->buf_list) !=
                        cn_l->data_buf.items.prev) {
                cn_l->data_buf.lastread = container_of(
                                cn_l->data_buf.lastread->buf_list.next,
                                struct data_buf_item, buf_list);

                cn_l->data_buf.last_read_offset = 0;
        }
}

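/*
 * Copy len bytes from the current read position to dst (a userspace
 * pointer if userbuf is set) and advance the read position. Returns the
 * number of bytes copied, or a negative errno if nothing was copied.
 * Requesting more than read_remaining bytes is a bug.
 */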
static int _databuf_pull(struct conn *cn_l, char *dst, int len, int userbuf)
{
        int totalcpy = 0;

        BUG_ON(cn_l->data_buf.read_remaining < len);

        if (cn_l->data_buf.lastread == 0)
                databuf_nextreadchunk(cn_l);

        while (len > 0) {
                int rc = 0;
                int cpy = len;

                char *srcbufcpystart = 0;
                int srcbufcpylen = 0;

                BUG_ON(cn_l->data_buf.lastread == 0);

                srcbufcpystart = cn_l->data_buf.lastread->buf +
                                cn_l->data_buf.last_read_offset;
                srcbufcpylen = cn_l->data_buf.lastread->datalen -
                                cn_l->data_buf.last_read_offset;

                if (cpy > srcbufcpylen)
                        cpy = srcbufcpylen;

                if (userbuf) {
                        int notcopied = copy_to_user(dst, srcbufcpystart, cpy);
                        cpy -= notcopied;
                        if (unlikely(notcopied > 0))
                                rc = -EFAULT;
                } else {
                        memcpy(dst, srcbufcpystart, cpy);
                }

                dst += cpy;
                len -= cpy;
                totalcpy += cpy;

                cn_l->data_buf.read_remaining -= cpy;
                cn_l->data_buf.last_read_offset += cpy;

                if (cpy == srcbufcpylen)
                        databuf_nextreadchunk(cn_l);

                if (unlikely(rc < 0)) {
                        if (totalcpy == 0)
                                totalcpy = rc;
                        break;
                }
        }

        return totalcpy;
}

void databuf_pull(struct conn *cn_l, char *dst, int len)
{
        _databuf_pull(cn_l, dst, len, 0);
}

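/*
 * Fill the iovecs of a recvmsg caller from the data buffer. Returns the
 * byte count, or a negative errno (-EAGAIN if no data was buffered)
 * returned through the unsigned size_t.
 */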
size_t databuf_pulluser(struct conn *trgt_sock_l, struct msghdr *msg)
{
        size_t copied = 0;
        int iovidx = 0;
        int iovread = 0;

        while (iovidx < msg->msg_iovlen) {
                int rc;

                struct iovec *iov = msg->msg_iov + iovidx;
                __user char *userbuf = iov->iov_base + iovread;
                unsigned int len = iov->iov_len - iovread;

                if (len == 0) {
                        iovidx++;
                        iovread = 0;
                        continue;
                }

                if (trgt_sock_l->data_buf.read_remaining == 0) {
                        rc = -EAGAIN;
                } else {
                        if (len > trgt_sock_l->data_buf.read_remaining)
                                len = trgt_sock_l->data_buf.read_remaining;

                        rc = _databuf_pull(trgt_sock_l, userbuf, len, 1);
                }

                BUG_ON(rc == 0);

                if (rc < 0) {
                        if (copied == 0)
                                copied = rc;
                        break;
                }

                copied += rc;
                iovread += rc;
        }

        return copied;
}

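/*
 * Move the read position back by the given number of bytes so they will
 * be read again, e.g. when not everything that was pulled could be sent.
 */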
void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
{
        trgt_out_l->data_buf.read_remaining += bytes;

        BUG_ON(trgt_out_l->data_buf.lastread == 0);

        while (bytes > trgt_out_l->data_buf.last_read_offset) {
                bytes -= trgt_out_l->data_buf.last_read_offset;
                trgt_out_l->data_buf.lastread = container_of(
                                trgt_out_l->data_buf.lastread->buf_list.prev,
                                struct data_buf_item, buf_list);
                BUG_ON(&(trgt_out_l->data_buf.lastread->buf_list) ==
                                &(trgt_out_l->data_buf.items));

                /* the previous item had been read completely */
                trgt_out_l->data_buf.last_read_offset =
                                trgt_out_l->data_buf.lastread->datalen;
        }

        trgt_out_l->data_buf.last_read_offset -= bytes;
}

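/*
 * Copy already-read data once more, starting at the absolute buffer
 * position startpos (same offset space as first_offset), without touching
 * the current read position; presumably used to retransmit unacked data.
 */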
void databuf_pullold(struct conn *trgt_out_l, __u32 startpos, char *dst,
                int len)
{
        __u32 pos = trgt_out_l->data_buf.first_offset;
        struct data_buf_item *dbi = container_of(
                        trgt_out_l->data_buf.items.next,
                        struct data_buf_item, buf_list);

        while (1) {
                BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

                if (((__s32) (pos + dbi->datalen - startpos)) > 0)
                        break;

                pos += dbi->datalen;
                dbi = container_of(dbi->buf_list.next, struct data_buf_item,
                                buf_list);
        }

        while (len > 0) {
                int cpy = len;

                char *srcbufcpystart = 0;
                int srcbufcpylen = 0;

                BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

                BUG_ON(((__s32) (pos - startpos)) > 0);

                srcbufcpystart = dbi->buf + ((__s32) (startpos - pos));
                srcbufcpylen = dbi->datalen - ((__s32) (startpos - pos));

                if (cpy > srcbufcpylen)
                        cpy = srcbufcpylen;

                memcpy(dst, srcbufcpystart, cpy);

                dst += cpy;
                len -= cpy;
                startpos += cpy;

                pos += dbi->datalen;
                dbi = container_of(dbi->buf_list.next, struct data_buf_item,
                                buf_list);
        }
}

/* ack up to *not* including pos */
void databuf_ack(struct conn *trgt_out_l, __u32 pos)
{
        __u32 acked = 0;

        while (!list_empty(&(trgt_out_l->data_buf.items))) {
                struct data_buf_item *firstitem = container_of(
                                trgt_out_l->data_buf.items.next,
                                struct data_buf_item, buf_list);

                if (firstitem == trgt_out_l->data_buf.lastread)
                        break;

                if (((__s32) (trgt_out_l->data_buf.first_offset +
                                firstitem->datalen - pos)) > 0)
                        break;

                trgt_out_l->data_buf.first_offset += firstitem->datalen;
                acked += firstitem->datalen;

                databuf_item_free(trgt_out_l, firstitem);
        }

        trgt_out_l->data_buf.totalsize -= acked;

        BUG_ON(trgt_out_l->data_buf.totalsize == 0 &&
                        trgt_out_l->data_buf.overhead != 0);

        if (unlikely(trgt_out_l->data_buf.cpacket_buffer != 0)) {
                __u32 amount = acked > trgt_out_l->data_buf.cpacket_buffer ?
                                acked : trgt_out_l->data_buf.cpacket_buffer;
                free_cpacket_buffer(amount);
                trgt_out_l->data_buf.cpacket_buffer -= amount;
        }

        if (trgt_out_l->sourcetype == SOURCE_IN)
                refresh_speedstat(trgt_out_l, acked);
}

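/*
 * Like databuf_ack(), but frees everything up to the current read
 * position instead of an explicitly given ack position.
 */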
void databuf_ackread(struct conn *cn_l)
{
        __u32 acked = 0;

        while (!list_empty(&(cn_l->data_buf.items)) &&
                        cn_l->data_buf.lastread != 0) {
                struct data_buf_item *firstitem = container_of(
                                cn_l->data_buf.items.next,
                                struct data_buf_item, buf_list);

                if (firstitem == cn_l->data_buf.lastread)
                        break;

                acked += firstitem->datalen;

                databuf_item_free(cn_l, firstitem);
        }

        cn_l->data_buf.first_offset += acked;
        cn_l->data_buf.totalsize -= acked;

        BUG_ON(cn_l->data_buf.totalsize == 0 && cn_l->data_buf.overhead != 0);

        if (unlikely(cn_l->data_buf.cpacket_buffer != 0)) {
                __u32 amount = acked > cn_l->data_buf.cpacket_buffer ?
                                acked : cn_l->data_buf.cpacket_buffer;
                free_cpacket_buffer(amount);
                cn_l->data_buf.cpacket_buffer -= amount;
        }

        if (cn_l->sourcetype == SOURCE_IN)
                refresh_speedstat(cn_l, acked);
}

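/*
 * Append len bytes from buf (userspace if userbuf is set) to the buffer,
 * allocating new TYPE_BUF items as needed. maxcpy is a soft limit on the
 * number of bytes copied; maxusage, if nonzero, instead limits
 * totalsize + overhead. Returns the byte count or a negative errno.
 */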
static __s64 _receive_buf(struct conn *cn_l, char *buf, __u32 len, int userbuf,
                __u32 maxcpy, __u32 maxusage)
{
        struct data_buf_item *item = 0;

        __s64 totalcpy = 0;

        if (list_empty(&(cn_l->data_buf.items)) == 0) {
                struct list_head *last = cn_l->data_buf.items.prev;
                item = container_of(last, struct data_buf_item, buf_list);
        }

        while (len > 0) {
                int rc = 0;
                int cpy = len;

                if (item == 0 || item->buflen <= item->datalen) {
                        __u32 buflen = len;

                        if (maxusage != 0) {
                                if (cn_l->data_buf.totalsize +
                                                cn_l->data_buf.overhead >
                                                maxusage) {
                                        rc = -EAGAIN;
                                        goto error;
                                }

                                buflen = maxusage - cn_l->data_buf.totalsize -
                                                cn_l->data_buf.overhead -
                                                sizeof(struct data_buf_item);
                        } else {
                                if (totalcpy + 64 > maxcpy &&
                                                totalcpy + len > maxcpy) {
                                        rc = -EAGAIN;
                                        goto error;
                                }

                                if (totalcpy + buflen < maxcpy)
                                        buflen = maxcpy - totalcpy;
                        }

                        if (buflen < 64)
                                buflen = 64;
                        if (buflen > PAGESIZE)
                                buflen = PAGESIZE;
                        if (buflen > 32768)
                                buflen = 32768;

                        if (unlikely(cn_l->data_buf.totalsize + buflen >
                                        (1 << 30) ||
                                        cn_l->data_buf.overhead > (1 << 30))) {
                                rc = -EAGAIN;
                                goto error;
                        }

                        item = kmem_cache_alloc(data_buf_item_slab,
                                        GFP_KERNEL);
                        if (unlikely(item == 0)) {
                                rc = -ENOMEM;
                                goto error;
                        }

                        memset(item, 0, sizeof(struct data_buf_item));
                        item->type = TYPE_BUF;
                        item->buf = kmalloc(buflen, GFP_KERNEL);

                        if (unlikely(item->buf == 0)) {
                                kmem_cache_free(data_buf_item_slab, item);
                                rc = -ENOMEM;
                                goto error;
                        }
                        item->datalen = 0;

                        list_add_tail(&(item->buf_list),
                                        &(cn_l->data_buf.items));
                        item->buflen = buflen;
                        cn_l->data_buf.overhead += buflen +
                                        sizeof(struct data_buf_item);
                }

                BUG_ON(item->type != TYPE_BUF);
                BUG_ON(item->buflen <= item->datalen);

                if (item->buflen - item->datalen < cpy)
                        cpy = (item->buflen - item->datalen);

                if (userbuf) {
                        int notcopied = copy_from_user(item->buf +
                                        item->datalen, buf, cpy);
                        cpy -= notcopied;
                        if (unlikely(notcopied > 0))
                                rc = -EFAULT;
                } else {
                        memcpy(item->buf + item->datalen, buf, cpy);
                }

                buf += cpy;
                len -= cpy;
                cn_l->data_buf.read_remaining += cpy;
                cn_l->data_buf.totalsize += cpy;
                cn_l->data_buf.overhead -= cpy;
                BUG_ON(cn_l->data_buf.totalsize == 0 &&
                                cn_l->data_buf.overhead != 0);
                totalcpy += cpy;

                item->datalen += cpy;

error:
                if (unlikely(rc < 0)) {
                        if (totalcpy == 0)
                                return rc;
                        break;
                }
        }

        return totalcpy;
}

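/*
 * Feed the iovecs of a sendmsg caller into the data buffer. Returns the
 * number of bytes accepted, or a negative errno if nothing was copied.
 */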
__s64 receive_userbuf(struct conn *src_sock_l, struct msghdr *msg, __u32 maxcpy,
                __u32 maxusage)
{
        __s64 copied = 0;
        int iovidx = 0;
        int iovread = 0;

        while (iovidx < msg->msg_iovlen) {
                struct iovec *iov = msg->msg_iov + iovidx;
                __user char *userbuf = iov->iov_base + iovread;
                __u32 len = iov->iov_len - iovread;
                __s64 rc;

                if (len == 0) {
                        iovidx++;
                        iovread = 0;
                        continue;
                }

                BUG_ON(copied < 0);
                BUG_ON(copied > maxcpy);
                rc = _receive_buf(src_sock_l, userbuf, len, 1, maxcpy - copied,
                                maxusage);

                if (rc < 0) {
                        if (copied == 0)
                                copied = rc;
                        break;
                }

                copied += rc;
                iovread += rc;

                if (rc < len)
                        break;
        }

        return copied;
}

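/*
 * Buffer a control packet response. Space must have been reserved in
 * cpacket_buffer beforehand, so this must not fail.
 */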
void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len)
{
        __s64 rc;
        BUG_ON(trtg_unconn_l->data_buf.cpacket_buffer <
                        trtg_unconn_l->data_buf.totalsize + len);
        rc = _receive_buf(trtg_unconn_l, buf, len, 0, len, 0);
        BUG_ON(rc < 0);
        BUG_ON(rc < len);
}

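/*
 * Queue an incoming skb as a TYPE_SKB item, avoiding a copy. Returns 0 on
 * success, 1 if the buffer limit would be exceeded (ownership of the skb
 * then stays with the caller).
 */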
int receive_skb(struct conn *src_in_l, struct sk_buff *skb)
{
        struct skb_procstate *ps = skb_pstate(skb);
        struct data_buf_item *item = &(ps->funcstate.rcv2.dbi);

        if (unlikely(src_in_l->data_buf.totalsize + skb->len > (1 << 30) ||
                        src_in_l->data_buf.overhead > (1 << 30)))
                return 1;

        item->type = TYPE_SKB;
        item->buf = skb->data;
        item->datalen = skb->len;
        item->buflen = item->datalen;
        list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));

        src_in_l->data_buf.read_remaining += item->datalen;
        src_in_l->data_buf.totalsize += item->datalen;
        src_in_l->data_buf.overhead += sizeof(struct sk_buff);

        return 0;
}

static void _wake_sender_in(struct conn *src_in_l)
{
        drain_ooo_queue(src_in_l);
        mutex_unlock(&(src_in_l->rcv_lock));
        get_window(src_in_l, 0, 0, 0);
}

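/*
 * Tell whoever feeds this conn that buffer space is available again:
 * re-parse buffered control data (SOURCE_NONE), wake a blocked userspace
 * sender (SOURCE_SOCK), or drain the out-of-order queue and announce a
 * new window (SOURCE_IN).
 */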
void wake_sender(struct conn *cn)
{
        unreserve_sock_buffer(cn);

        mutex_lock(&(cn->rcv_lock));
        switch (cn->sourcetype) {
        case SOURCE_NONE:
                mutex_unlock(&(cn->rcv_lock));
                parse(cn->reversedir, 0);
                break;
        case SOURCE_SOCK:
                wake_up_interruptible(&(cn->source.sock.wait));
                mutex_unlock(&(cn->rcv_lock));
                break;
        case SOURCE_IN:
                _wake_sender_in(cn); /* mutex_unlock inside */
                break;
        default:
                BUG();
        }
}

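/*
 * Push buffered data towards the conn's target: parse control data
 * (TARGET_UNCONNECTED), wake a reading socket (TARGET_SOCK, unless
 * delayed flushing is active and the buffer is still small), call
 * flush_out() (TARGET_OUT), or drop the data (TARGET_DISCARD). If data
 * left the buffer, the sender is woken up, since there is room for more.
 */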
void flush_buf(struct conn *cn)
{
        int rc;
        int sent = 0;
        mutex_lock(&(cn->rcv_lock));

        switch (cn->targettype) {
        case TARGET_UNCONNECTED:
                mutex_unlock(&(cn->rcv_lock));
                parse(cn, 0);
                break;
        case TARGET_SOCK:
                if (cn->sourcetype != SOURCE_SOCK ||
                                cn->source.sock.delay_flush == 0 ||
                                cn->data_buf.totalsize +
                                cn->data_buf.overhead -
                                cn->data_buf.cpacket_buffer >=
                                BUFFERLIMIT_SOCK_SOCK/2)
                        wake_up_interruptible(&(cn->target.sock.wait));
                mutex_unlock(&(cn->rcv_lock));
                break;
        case TARGET_OUT:
                rc = flush_out(cn, 0, 0);
                mutex_unlock(&(cn->rcv_lock));
                sent = (rc == RC_FLUSH_CONN_OUT_OK_SENT);
                break;
        case TARGET_DISCARD:
                databuf_ackdiscard(cn);
                mutex_unlock(&(cn->rcv_lock));
                sent = 1;
                break;
        default:
                BUG();
        }

        refresh_conn_credits(cn, 0, 0);

        if (sent) {
                wake_sender(cn);
        }
}

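/* set up the slab cache for TYPE_BUF items; called once at module init */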
void __init forward_init(void)
{
        data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
                        sizeof(struct data_buf_item), 8, 0, 0);
}

MODULE_LICENSE("GPL");