qos queue
[cor_2_6_31.git] / net / cor / forward.c
blob1a96ad18b621af8ce38b91d94a3a2545a0fa810e
/*
 * Connection oriented routing
 * Copyright (C) 2007-2008 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
21 #include <linux/mutex.h>
23 #include "cor.h"
/* Slab cache from which every struct data_buf_item in this file is taken. */
struct kmem_cache *data_buf_item_slab;

/* Size of one page in bytes, derived from PAGE_SHIFT. */
#define PAGESIZE (1 << PAGE_SHIFT)
29 void databuf_init(struct data_buf *data)
31 memset(data, 0, sizeof(struct data_buf));
32 INIT_LIST_HEAD(&(data->items));
35 static void databuf_item_free(struct data_buf_item *item)
37 if (item->type == TYPE_BUF) {
38 kfree(item->data.buf.buf);
39 } else if (item->type == TYPE_SKB) {
40 kfree_skb(item->data.skb);
41 } else {
42 BUG();
45 list_del(&(item->buf_list));
47 kmem_cache_free(data_buf_item_slab, item);
50 void reset_seqno(struct data_buf *buf)
52 buf->first_offset = 0 - buf->last_read_offset;
55 void databuf_free(struct data_buf *data)
57 while (!list_empty(&(data->items))) {
58 struct data_buf_item *item = container_of(data->items.next,
59 struct data_buf_item, buf_list);
60 databuf_item_free(item);
64 static void databuf_nextreadchunk(struct data_buf *data)
66 if (data->lastread == 0) {
67 BUG_ON(data->last_read_offset != 0);
68 BUG_ON(list_empty(&(data->items)));
69 data->lastread = container_of(data->items.next,
70 struct data_buf_item, buf_list);
71 } else if (&(data->lastread->buf_list) != data->items.prev) {
72 data->lastread = container_of(data->lastread->buf_list.next,
73 struct data_buf_item, buf_list);
75 data->last_read_offset = 0;
79 int _databuf_pull(struct data_buf *data, char *dst, int len, int userbuf)
81 int totalcpy = 0;
83 BUG_ON(data->read_remaining < len);
85 if (data->lastread == 0)
86 databuf_nextreadchunk(data);
88 while(len > 0) {
89 int rc = 0;
90 int cpy = len;
92 char *srcbuf = 0;
93 int srcbuflen = 0;
95 char *srcbufcpystart = 0;
96 int srcbufcpylen = 0;
98 BUG_ON(data->lastread == 0);
100 if (data->lastread->type == TYPE_BUF) {
101 srcbuf = data->lastread->data.buf.buf;
102 srcbuflen = data->lastread->data.buf.datalen;
103 } else if (data->lastread->type == TYPE_SKB) {
104 srcbuf = data->lastread->data.skb->data;
105 srcbuflen = data->lastread->data.skb->len;
106 } else {
107 BUG();
110 srcbufcpystart = srcbuf + data->last_read_offset;
111 srcbufcpylen = srcbuflen - data->last_read_offset;
113 if (cpy > srcbufcpylen)
114 cpy = srcbufcpylen;
116 if (userbuf) {
117 int notcopied = copy_to_user(dst, srcbufcpystart, cpy);
118 cpy -= notcopied;
119 if (unlikely(notcopied > 0))
120 rc = -EFAULT;
121 } else {
122 memcpy(dst, srcbufcpystart, cpy);
125 dst += cpy;
126 len -= cpy;
127 totalcpy += cpy;
129 data->read_remaining -= cpy;
130 data->last_read_offset += cpy;
132 if (cpy == srcbufcpylen)
133 databuf_nextreadchunk(data);
135 if (unlikely(rc < 0)) {
136 if (totalcpy == 0)
137 totalcpy = rc;
138 break;
142 return totalcpy;
/*
 * Copy len unread bytes from the buffer into the kernel memory at dst.
 * The buffer must hold at least len unread bytes.
 */
void databuf_pull(struct data_buf *data, char *dst, int len)
{
	(void) _databuf_pull(data, dst, len, 0);
}
150 size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg)
152 size_t copied = 0;
153 int iovidx = 0;
154 int iovread = 0;
156 while (iovidx < msg->msg_iovlen) {
157 int rc;
159 struct iovec *iov = msg->msg_iov + iovidx;
160 __user char *msg = iov->iov_base + iovread;
161 unsigned int len = iov->iov_len - iovread;
163 if (len == 0) {
164 iovidx++;
165 iovread = 0;
166 continue;
169 if (sconn->buf.read_remaining == 0) {
170 if (sconn->sourcetype == SOURCE_NONE)
171 rc = -EPIPE;
172 else
173 rc = -EAGAIN;
174 } else {
175 if (len > sconn->buf.read_remaining)
176 len = sconn->buf.read_remaining;
178 rc = _databuf_pull(&(sconn->buf), msg, len, 1);
181 BUG_ON(rc == 0);
183 if (rc < 0) {
184 if (copied == 0)
185 copied = rc;
186 break;
189 copied += rc;
190 iovread += rc;
193 return copied;
196 void databuf_unpull(struct data_buf *data, __u32 bytes)
198 data->read_remaining += bytes;
200 BUG_ON(data->lastread == 0);
202 while (bytes > data->last_read_offset) {
203 bytes -= data->last_read_offset;
204 data->lastread = container_of(data->lastread->buf_list.prev,
205 struct data_buf_item, buf_list);
206 BUG_ON(&(data->lastread->buf_list) == &(data->items));
209 data->last_read_offset -= bytes;
212 void databuf_pullold(struct data_buf *data, __u32 startpos, char *dst, int len)
214 __u32 pos = data->first_offset;
215 struct data_buf_item *dbi = container_of(data->items.next,
216 struct data_buf_item, buf_list);
218 while(1) {
219 int srcbuflen;
221 BUG_ON(&(dbi->buf_list) == &(data->items));
223 if (data->lastread->type == TYPE_BUF) {
224 srcbuflen = dbi->data.buf.datalen;
225 } else if (data->lastread->type == TYPE_SKB) {
226 srcbuflen = dbi->data.skb->len;
227 } else {
228 BUG();
231 if (((__s32) (pos + srcbuflen - startpos)) > 0)
232 break;
234 pos += srcbuflen;
235 dbi = container_of(dbi->buf_list.next, struct data_buf_item,
236 buf_list);
239 while (len > 0) {
240 int cpy = len;
242 char *srcbuf = 0;
243 int srcbuflen = 0;
245 char *srcbufcpystart = 0;
246 int srcbufcpylen = 0;
248 BUG_ON(&(dbi->buf_list) == &(data->items));
250 if (data->lastread->type == TYPE_BUF) {
251 srcbuf = data->lastread->data.buf.buf;
252 srcbuflen = data->lastread->data.buf.datalen;
253 } else if (data->lastread->type == TYPE_SKB) {
254 srcbuf = data->lastread->data.skb->data;
255 srcbuflen = data->lastread->data.skb->len;
256 } else {
257 BUG();
260 BUG_ON(((__s32) (pos - startpos)) > 0);
262 srcbufcpystart = srcbuf + ((__s32) (startpos - pos));
263 srcbufcpylen = srcbuflen - ((__s32) (startpos - pos));
265 if (cpy > srcbufcpylen)
266 cpy = srcbufcpylen;
268 memcpy(dst, srcbufcpystart, cpy);
270 dst += cpy;
271 len -= cpy;
272 startpos += cpy;
274 pos += srcbuflen;
275 dbi = container_of(dbi->buf_list.next, struct data_buf_item,
276 buf_list);
280 /* ack up to *not* including pos */
281 void databuf_ack(struct data_buf *buf, __u32 pos)
283 while (!list_empty(&(buf->items))) {
284 struct data_buf_item *firstitem = container_of(buf->items.next,
285 struct data_buf_item, buf_list);
286 int firstlen = 0;
288 if (firstitem == buf->lastread)
289 break;
291 if (firstitem->type == TYPE_BUF) {
292 firstlen = firstitem->data.buf.datalen;
293 } else if (firstitem->type == TYPE_SKB) {
294 firstlen = firstitem->data.skb->len;
295 } else {
296 BUG();
299 if (((__s32)(buf->first_offset + firstlen - pos)) > 0)
300 break;
302 buf->first_offset += firstlen;
304 databuf_item_free(firstitem);
308 void databuf_ackread(struct data_buf *buf)
310 while (!list_empty(&(buf->items)) && buf->lastread != 0) {
311 struct data_buf_item *firstitem = container_of(buf->items.next,
312 struct data_buf_item, buf_list);
314 if (firstitem == buf->lastread)
315 break;
317 if (firstitem->type == TYPE_BUF) {
318 buf->first_offset += firstitem->data.buf.datalen;
319 } else if (firstitem->type == TYPE_SKB) {
320 buf->first_offset += firstitem->data.skb->len;
321 } else {
322 BUG();
325 databuf_item_free(firstitem);
329 int databuf_maypush(struct data_buf *buf)
331 return 16384 - buf->read_remaining;
334 void flush_buf(struct conn *rconn)
336 switch (rconn->targettype) {
337 case TARGET_UNCONNECTED:
338 parse(rconn);
339 if (rconn->targettype != TARGET_UNCONNECTED)
340 flush_buf(rconn);
341 break;
342 case TARGET_SOCK:
343 wake_up_interruptible(&(rconn->target.sock.wait));
344 break;
345 case TARGET_OUT:
346 flush_out(rconn);
347 break;
348 default:
349 BUG();
353 static int _receive_buf(struct conn *rconn, char *buf, int len, int userbuf)
355 struct data_buf_item *item = 0;
357 int totalcpy = 0;
359 BUG_ON(databuf_maypush(&(rconn->buf)) < len);
361 if (list_empty(&(rconn->buf.items)) == 0) {
362 struct list_head *last = rconn->buf.items.prev;
363 item = container_of(last, struct data_buf_item, buf_list);
365 if (item->type != TYPE_BUF || rconn->buf.last_buflen <=
366 item->data.buf.datalen)
367 item = 0;
370 while (len > 0) {
371 int rc = 0;
372 int cpy = len;
373 if (item == 0) {
374 __u32 buflen = PAGESIZE;
375 if (buflen > 32768)
376 buflen = 32768;
377 item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
378 if (unlikely(item == 0)) {
379 rc = -ENOMEM;
380 goto error;
382 memset(item, 0, sizeof(item));
383 item->type = TYPE_BUF;
384 item->data.buf.buf = kmalloc(buflen, GFP_KERNEL);
387 if (unlikely(item->data.buf.buf == 0)) {
388 kmem_cache_free(data_buf_item_slab, item);
389 rc = -ENOMEM;
390 goto error;
392 item->data.buf.datalen = 0;
393 list_add_tail(&(item->buf_list), &(rconn->buf.items));
394 rconn->buf.last_buflen = buflen;
397 BUG_ON(item->type != TYPE_BUF);
398 BUG_ON(rconn->buf.last_buflen <= item->data.buf.datalen);
400 if (rconn->buf.last_buflen - item->data.buf.datalen < cpy)
401 cpy = (rconn->buf.last_buflen - item->data.buf.datalen);
403 if (userbuf) {
404 int notcopied = copy_from_user(item->data.buf.buf +
405 item->data.buf.datalen, buf, cpy);
406 cpy -= notcopied;
407 if (unlikely(notcopied > 0))
408 rc = -EFAULT;
409 } else {
410 memcpy(item->data.buf.buf + item->data.buf.datalen,
411 buf, cpy);
414 buf += cpy;
415 len -= cpy;
416 totalcpy += cpy;
418 item->data.buf.datalen += cpy;
420 error:
421 if (unlikely(rc < 0)) {
422 if (totalcpy == 0)
423 return rc;
424 break;
428 rconn->buf.read_remaining += totalcpy;
430 return totalcpy;
433 int receive_userbuf(struct conn *rconn, struct msghdr *msg)
435 int copied = 0;
436 int iovidx = 0;
437 int iovread = 0;
439 if (databuf_maypush(&(rconn->buf)) <= 0)
440 return -EAGAIN;
442 while (iovidx < msg->msg_iovlen) {
443 struct iovec *iov = msg->msg_iov + iovidx;
444 __user char *userbuf = iov->iov_base + iovread;
445 int len = iov->iov_len - iovread;
446 int rc;
447 int pushlimit;
449 if (len == 0) {
450 iovidx++;
451 iovread = 0;
452 continue;
455 pushlimit = databuf_maypush(&(rconn->buf));
457 if (pushlimit <= 0) {
458 if (rconn->targettype == TARGET_UNCONNECTED)
459 rc = -EPIPE;
460 else
461 rc = -EAGAIN;
462 } else {
463 if (pushlimit < len)
464 len = pushlimit;
466 rc = _receive_buf(rconn, userbuf, len, 1);
469 if (rc < 0) {
470 if (copied == 0)
471 copied = rc;
472 break;
475 copied += rc;
476 iovread += rc;
479 if (copied > 0)
480 flush_buf(rconn);;
482 return copied;
485 void receive_buf(struct conn *rconn, char *buf, int len)
487 BUG_ON(databuf_maypush(&(rconn->buf)) < len);
488 _receive_buf(rconn, buf, len, 0);
489 flush_buf(rconn);
492 int receive_skb(struct conn *rconn, struct sk_buff *skb)
494 struct data_buf_item *item;
496 if (databuf_maypush(&(rconn->buf)) < skb->len)
497 return 1;
499 item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
501 if (unlikely(item == 0))
502 return 1;
504 item->data.skb = skb;
505 item->type = TYPE_SKB;
506 list_add_tail(&(item->buf_list), &(rconn->buf.items));
507 rconn->buf.read_remaining += skb->len;
508 rconn->buf.last_buflen = 0;
510 flush_buf(rconn);
512 return 0;
515 void wake_sender(struct conn *rconn)
517 switch (rconn->sourcetype) {
518 case SOURCE_NONE:
519 /* nothing */
520 break;
521 case SOURCE_SOCK:
522 wake_up_interruptible(&(rconn->source.sock.wait));
523 break;
524 case SOURCE_IN:
525 drain_ooo_queue(rconn);
526 break;
527 default:
528 BUG();
532 void forward_init(void)
534 data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
535 sizeof(struct data_buf_item), 8, 0, 0);