buffering bugfixes
[cor_2_6_31.git] / net / cor / forward.c
blob9e71fee115a19d15f2fbdda086f8873b9acea4cb
1 /*
2 * Connection oriented routing
3 * Copyright (C) 2007-2010 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/mutex.h>
23 #include "cor.h"
25 struct kmem_cache *data_buf_item_slab;
27 #define PAGESIZE (1 << PAGE_SHIFT)
29 void databuf_init(struct data_buf *data)
31 memset(data, 0, sizeof(struct data_buf));
32 INIT_LIST_HEAD(&(data->items));
35 static void databuf_item_free(struct data_buf_item *item)
37 if (item->type == TYPE_BUF) {
38 kfree(item->data.buf.buf);
39 } else if (item->type == TYPE_SKB) {
40 kfree_skb(item->data.skb);
41 } else {
42 BUG();
45 list_del(&(item->buf_list));
47 kmem_cache_free(data_buf_item_slab, item);
50 void reset_seqno(struct data_buf *buf)
52 buf->first_offset = 0 - buf->last_read_offset;
55 void databuf_free(struct data_buf *data)
57 while (!list_empty(&(data->items))) {
58 struct data_buf_item *item = container_of(data->items.next,
59 struct data_buf_item, buf_list);
60 databuf_item_free(item);
64 static void databuf_nextreadchunk(struct data_buf *data)
66 if (data->lastread == 0) {
67 BUG_ON(data->last_read_offset != 0);
68 BUG_ON(list_empty(&(data->items)));
69 data->lastread = container_of(data->items.next,
70 struct data_buf_item, buf_list);
71 } else if (&(data->lastread->buf_list) != data->items.prev) {
72 data->lastread = container_of(data->lastread->buf_list.next,
73 struct data_buf_item, buf_list);
75 data->last_read_offset = 0;
/*
 * Copy up to len bytes from the buffer into dst, advancing the read cursor.
 * If userbuf is nonzero, dst is a userspace pointer and copy_to_user() is
 * used. Returns the number of bytes copied, or a negative errno (-EFAULT)
 * if nothing could be copied at all. A partial copy returns the partial
 * count; the error is only reported when totalcpy is still 0.
 */
static int _databuf_pull(struct data_buf *data, char *dst, int len, int userbuf)
{
	int totalcpy = 0;

	BUG_ON(data->read_remaining < len);

	/* lazily position the read cursor on the first item */
	if (data->lastread == 0)
		databuf_nextreadchunk(data);

	while(len > 0) {
		int rc = 0;
		int cpy = len;

		char *srcbuf = 0;
		int srcbuflen = 0;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(data->lastread == 0);

		/* locate the backing storage of the current item */
		if (data->lastread->type == TYPE_BUF) {
			srcbuf = data->lastread->data.buf.buf;
			srcbuflen = data->lastread->data.buf.datalen;
		} else if (data->lastread->type == TYPE_SKB) {
			srcbuf = data->lastread->data.skb->data;
			srcbuflen = data->lastread->data.skb->len;
		} else {
			BUG();
		}

		srcbufcpystart = srcbuf + data->last_read_offset;
		srcbufcpylen = srcbuflen - data->last_read_offset;

		/* never copy past the end of the current item */
		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		if (userbuf) {
			/* copy_to_user returns the number of bytes NOT
			 * copied; account only for what actually made it */
			int notcopied = copy_to_user(dst, srcbufcpystart, cpy);
			cpy -= notcopied;
			if (unlikely(notcopied > 0))
				rc = -EFAULT;
		} else {
			memcpy(dst, srcbufcpystart, cpy);
		}

		dst += cpy;
		len -= cpy;
		totalcpy += cpy;

		data->read_remaining -= cpy;
		data->last_read_offset += cpy;

		/* item fully consumed: move the cursor to the next one */
		if (cpy == srcbufcpylen)
			databuf_nextreadchunk(data);

		if (unlikely(rc < 0)) {
			/* report the error only if nothing was copied */
			if (totalcpy == 0)
				totalcpy = rc;
			break;
		}
	}

	return totalcpy;
}
/* Kernel-space pull: copy len bytes into dst without user access checks. */
void databuf_pull(struct data_buf *data, char *dst, int len)
{
	_databuf_pull(data, dst, len, 0);
}
150 size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg)
152 size_t copied = 0;
153 int iovidx = 0;
154 int iovread = 0;
156 while (iovidx < msg->msg_iovlen) {
157 int rc;
159 struct iovec *iov = msg->msg_iov + iovidx;
160 __user char *msg = iov->iov_base + iovread;
161 unsigned int len = iov->iov_len - iovread;
163 if (len == 0) {
164 iovidx++;
165 iovread = 0;
166 continue;
169 if (sconn->buf.read_remaining == 0) {
170 if (sconn->sourcetype == SOURCE_NONE)
171 rc = -EPIPE;
172 else
173 rc = -EAGAIN;
174 } else {
175 if (len > sconn->buf.read_remaining)
176 len = sconn->buf.read_remaining;
178 rc = _databuf_pull(&(sconn->buf), msg, len, 1);
181 BUG_ON(rc == 0);
183 if (rc < 0) {
184 if (copied == 0)
185 copied = rc;
186 break;
189 copied += rc;
190 iovread += rc;
193 return copied;
196 void databuf_unpull(struct data_buf *data, __u32 bytes)
198 data->read_remaining += bytes;
200 BUG_ON(data->lastread == 0);
202 while (bytes > data->last_read_offset) {
203 bytes -= data->last_read_offset;
204 data->lastread = container_of(data->lastread->buf_list.prev,
205 struct data_buf_item, buf_list);
206 BUG_ON(&(data->lastread->buf_list) == &(data->items));
209 data->last_read_offset -= bytes;
212 void databuf_pullold(struct data_buf *data, __u32 startpos, char *dst, int len)
214 __u32 pos = data->first_offset;
215 struct data_buf_item *dbi = container_of(data->items.next,
216 struct data_buf_item, buf_list);
218 while(1) {
219 int srcbuflen;
221 BUG_ON(&(dbi->buf_list) == &(data->items));
223 if (data->lastread->type == TYPE_BUF) {
224 srcbuflen = dbi->data.buf.datalen;
225 } else if (data->lastread->type == TYPE_SKB) {
226 srcbuflen = dbi->data.skb->len;
227 } else {
228 BUG();
231 if (((__s32) (pos + srcbuflen - startpos)) > 0)
232 break;
234 pos += srcbuflen;
235 dbi = container_of(dbi->buf_list.next, struct data_buf_item,
236 buf_list);
239 while (len > 0) {
240 int cpy = len;
242 char *srcbuf = 0;
243 int srcbuflen = 0;
245 char *srcbufcpystart = 0;
246 int srcbufcpylen = 0;
248 BUG_ON(&(dbi->buf_list) == &(data->items));
250 if (data->lastread->type == TYPE_BUF) {
251 srcbuf = data->lastread->data.buf.buf;
252 srcbuflen = data->lastread->data.buf.datalen;
253 } else if (data->lastread->type == TYPE_SKB) {
254 srcbuf = data->lastread->data.skb->data;
255 srcbuflen = data->lastread->data.skb->len;
256 } else {
257 BUG();
260 BUG_ON(((__s32) (pos - startpos)) > 0);
262 srcbufcpystart = srcbuf + ((__s32) (startpos - pos));
263 srcbufcpylen = srcbuflen - ((__s32) (startpos - pos));
265 if (cpy > srcbufcpylen)
266 cpy = srcbufcpylen;
268 memcpy(dst, srcbufcpystart, cpy);
270 dst += cpy;
271 len -= cpy;
272 startpos += cpy;
274 pos += srcbuflen;
275 dbi = container_of(dbi->buf_list.next, struct data_buf_item,
276 buf_list);
280 /* ack up to *not* including pos */
281 void databuf_ack(struct conn *rconn, __u32 pos)
283 __u32 acked = 0;
284 struct data_buf *buf = &(rconn->buf);
286 while (!list_empty(&(buf->items))) {
287 struct data_buf_item *firstitem = container_of(buf->items.next,
288 struct data_buf_item, buf_list);
289 int firstlen = 0;
291 if (firstitem == buf->lastread)
292 break;
294 if (firstitem->type == TYPE_BUF) {
295 firstlen = firstitem->data.buf.datalen;
296 } else if (firstitem->type == TYPE_SKB) {
297 firstlen = firstitem->data.skb->len;
298 } else {
299 BUG();
302 if (((__s32)(buf->first_offset + firstlen - pos)) > 0)
303 break;
305 buf->first_offset += firstlen;
306 acked += firstlen;
308 databuf_item_free(firstitem);
311 if (rconn-> sourcetype == SOURCE_IN)
312 refresh_speedstat(rconn, acked);
315 void databuf_ackread(struct conn *rconn)
317 __u32 acked = 0;
318 struct data_buf *buf = &(rconn->buf);
320 while (!list_empty(&(buf->items)) && buf->lastread != 0) {
321 struct data_buf_item *firstitem = container_of(buf->items.next,
322 struct data_buf_item, buf_list);
324 if (firstitem == buf->lastread)
325 break;
327 if (firstitem->type == TYPE_BUF) {
328 buf->first_offset += firstitem->data.buf.datalen;
329 acked += firstitem->data.buf.datalen;
330 } else if (firstitem->type == TYPE_SKB) {
331 buf->first_offset += firstitem->data.skb->len;
332 acked += firstitem->data.skb->len;
333 } else {
334 BUG();
337 databuf_item_free(firstitem);
340 if (rconn-> sourcetype == SOURCE_IN)
341 refresh_speedstat(rconn, acked);
344 int databuf_maypush(struct data_buf *buf)
346 return 16384 - buf->read_remaining;
/*
 * Push buffered data towards the connection's target. For an unconnected
 * target the buffered data is parsed first (it may contain the connect
 * request); if parsing established a target, flush again recursively.
 */
void flush_buf(struct conn *rconn)
{
	switch (rconn->targettype) {
	case TARGET_UNCONNECTED:
		parse(rconn);
		/* parse() may have connected us; deliver the rest now */
		if (rconn->targettype != TARGET_UNCONNECTED)
			flush_buf(rconn);
		break;
	case TARGET_SOCK:
		/* a local socket reader will pull the data itself */
		wake_up_interruptible(&(rconn->target.sock.wait));
		break;
	case TARGET_OUT:
		flush_out(rconn);
		break;
	default:
		BUG();
	}
}
368 static int _receive_buf(struct conn *rconn, char *buf, int len, int userbuf)
370 struct data_buf_item *item = 0;
372 int totalcpy = 0;
374 BUG_ON(databuf_maypush(&(rconn->buf)) < len);
376 if (list_empty(&(rconn->buf.items)) == 0) {
377 struct list_head *last = rconn->buf.items.prev;
378 item = container_of(last, struct data_buf_item, buf_list);
380 if (item->type != TYPE_BUF || rconn->buf.last_buflen <=
381 item->data.buf.datalen)
382 item = 0;
385 while (len > 0) {
386 int rc = 0;
387 int cpy = len;
388 if (item == 0) {
389 __u32 buflen = PAGESIZE;
390 if (buflen > 32768)
391 buflen = 32768;
392 item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
393 if (unlikely(item == 0)) {
394 rc = -ENOMEM;
395 goto error;
397 memset(item, 0, sizeof(item));
398 item->type = TYPE_BUF;
399 item->data.buf.buf = kmalloc(buflen, GFP_KERNEL);
402 if (unlikely(item->data.buf.buf == 0)) {
403 kmem_cache_free(data_buf_item_slab, item);
404 rc = -ENOMEM;
405 goto error;
407 item->data.buf.datalen = 0;
408 list_add_tail(&(item->buf_list), &(rconn->buf.items));
409 rconn->buf.last_buflen = buflen;
412 BUG_ON(item->type != TYPE_BUF);
413 BUG_ON(rconn->buf.last_buflen <= item->data.buf.datalen);
415 if (rconn->buf.last_buflen - item->data.buf.datalen < cpy)
416 cpy = (rconn->buf.last_buflen - item->data.buf.datalen);
418 if (userbuf) {
419 int notcopied = copy_from_user(item->data.buf.buf +
420 item->data.buf.datalen, buf, cpy);
421 cpy -= notcopied;
422 if (unlikely(notcopied > 0))
423 rc = -EFAULT;
424 } else {
425 memcpy(item->data.buf.buf + item->data.buf.datalen,
426 buf, cpy);
429 buf += cpy;
430 len -= cpy;
431 totalcpy += cpy;
433 item->data.buf.datalen += cpy;
435 error:
436 if (unlikely(rc < 0)) {
437 if (totalcpy == 0)
438 return rc;
439 break;
443 rconn->buf.read_remaining += totalcpy;
445 return totalcpy;
448 int receive_userbuf(struct conn *rconn, struct msghdr *msg)
450 int copied = 0;
451 int iovidx = 0;
452 int iovread = 0;
454 if (databuf_maypush(&(rconn->buf)) <= 0)
455 return -EAGAIN;
457 while (iovidx < msg->msg_iovlen) {
458 struct iovec *iov = msg->msg_iov + iovidx;
459 __user char *userbuf = iov->iov_base + iovread;
460 int len = iov->iov_len - iovread;
461 int rc;
462 int pushlimit;
464 if (len == 0) {
465 iovidx++;
466 iovread = 0;
467 continue;
470 pushlimit = databuf_maypush(&(rconn->buf));
472 if (pushlimit <= 0) {
473 if (rconn->targettype == TARGET_UNCONNECTED)
474 rc = -EPIPE;
475 else
476 rc = -EAGAIN;
477 } else {
478 if (pushlimit < len)
479 len = pushlimit;
481 rc = _receive_buf(rconn, userbuf, len, 1);
484 if (rc < 0) {
485 if (copied == 0)
486 copied = rc;
487 break;
490 copied += rc;
491 iovread += rc;
494 if (copied > 0)
495 flush_buf(rconn);;
497 return copied;
500 void receive_buf(struct conn *rconn, char *buf, int len)
502 BUG_ON(databuf_maypush(&(rconn->buf)) < len);
503 _receive_buf(rconn, buf, len, 0);
504 flush_buf(rconn);
507 int receive_skb(struct conn *rconn, struct sk_buff *skb)
509 struct data_buf_item *item;
511 if (databuf_maypush(&(rconn->buf)) < skb->len)
512 return 1;
514 item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
516 if (unlikely(item == 0))
517 return 1;
519 item->data.skb = skb;
520 item->type = TYPE_SKB;
521 list_add_tail(&(item->buf_list), &(rconn->buf.items));
522 rconn->buf.read_remaining += skb->len;
523 rconn->buf.last_buflen = 0;
525 flush_buf(rconn);
527 return 0;
530 void wake_sender(struct conn *rconn)
532 switch (rconn->sourcetype) {
533 case SOURCE_NONE:
534 /* nothing */
535 break;
536 case SOURCE_SOCK:
537 wake_up_interruptible(&(rconn->source.sock.wait));
538 break;
539 case SOURCE_IN:
540 drain_ooo_queue(rconn);
541 break;
542 default:
543 BUG();
/*
 * Create the slab cache for data_buf_item allocations.
 * NOTE(review): the return value is not checked; if the cache cannot be
 * created, data_buf_item_slab stays NULL and later allocations will
 * crash — TODO confirm whether module init should fail instead.
 */
void forward_init(void)
{
	data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct data_buf_item), 8, 0, 0);
}