/*
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
21 #include <linux/mutex.h>
/* Slab cache from which struct data_buf_item objects are allocated;
 * created in forward_init(). */
struct kmem_cache *data_buf_item_slab;

/* Page size in bytes; used as the allocation size of TYPE_BUF chunks. */
#define PAGESIZE (1 << PAGE_SHIFT)
29 void databuf_init(struct data_buf
*data
)
31 memset(data
, 0, sizeof(struct data_buf
));
32 INIT_LIST_HEAD(&(data
->items
));
/*
 * Release one buffer item: free its payload (a kmalloc'd buffer or an skb,
 * depending on item->type), unlink it from its data_buf's item list and
 * return the item itself to the slab cache.
 */
static void databuf_item_free(struct data_buf_item *item)
{
	if (item->type == TYPE_BUF) {
		kfree(item->data.buf.buf);
	} else if (item->type == TYPE_SKB) {
		kfree_skb(item->data.skb);
	/* NOTE(review): original lines 41-44 and 46 are missing from this
	 * view - presumably a final else branch (e.g. BUG()) and the
	 * closing brace of the if chain.  Confirm against the original. */

	list_del(&(item->buf_list));

	kmem_cache_free(data_buf_item_slab, item);
50 void reset_seqno(struct data_buf
*buf
)
52 buf
->first_offset
= 0 - buf
->last_read_offset
;
55 void databuf_free(struct data_buf
*data
)
57 while (!list_empty(&(data
->items
))) {
58 struct data_buf_item
*item
= container_of(data
->items
.next
,
59 struct data_buf_item
, buf_list
);
60 databuf_item_free(item
);
/*
 * Advance data->lastread to the next chunk to read from.  On the very
 * first call (lastread == 0) this selects the head of the item list;
 * otherwise it steps forward one item unless lastread is already the tail.
 */
static void databuf_nextreadchunk(struct data_buf *data)
{
	if (data->lastread == 0) {
		/* first positioning ever: offset must still be zero and
		 * at least one item must be queued */
		BUG_ON(data->last_read_offset != 0);
		BUG_ON(list_empty(&(data->items)));
		data->lastread = container_of(data->items.next,
				struct data_buf_item, buf_list);
	} else if (&(data->lastread->buf_list) != data->items.prev) {
		/* not at the tail yet: move to the following item */
		data->lastread = container_of(data->lastread->buf_list.next,
				struct data_buf_item, buf_list);
	/* NOTE(review): original line 74 is missing from this view -
	 * presumably just the closing brace of the else-if.  It is unclear
	 * whether the offset reset below applies unconditionally or only
	 * inside the else-if branch - confirm against the original file. */

	data->last_read_offset = 0;
	/* [original lines 76-78 missing: closing brace(s)] */
/*
 * Copy up to len bytes from the buffer's current read position into dst,
 * walking the chunk list via data->lastread / data->last_read_offset.
 * userbuf selects copy_to_user() (dst is a user-space pointer) versus a
 * plain memcpy().
 *
 * NOTE(review): large parts of this function are missing from this view
 * (the per-chunk copy loop header, the declarations of srcbuf, srcbuflen,
 * cpy and rc, the error path and the return statement).  Only the visible
 * statements are reproduced and documented below.
 */
static int _databuf_pull(struct data_buf *data, char *dst, int len,
		int userbuf)
{
	/* the caller must never ask for more than is buffered */
	BUG_ON(data->read_remaining < len);

	/* lazily position lastread on the first chunk */
	if (data->lastread == 0)
		databuf_nextreadchunk(data);

	/* [original lines 87-94 missing: presumably the start of the copy
	 * loop and declarations of srcbuf/srcbuflen/cpy] */

	char *srcbufcpystart = 0;

	BUG_ON(data->lastread == 0);

	/* pick the source pointer/length according to the chunk type */
	if (data->lastread->type == TYPE_BUF) {
		srcbuf = data->lastread->data.buf.buf;
		srcbuflen = data->lastread->data.buf.datalen;
	} else if (data->lastread->type == TYPE_SKB) {
		srcbuf = data->lastread->data.skb->data;
		srcbuflen = data->lastread->data.skb->len;
	/* [original lines 106-109 missing] */

	/* copy window within the current chunk */
	srcbufcpystart = srcbuf + data->last_read_offset;
	srcbufcpylen = srcbuflen - data->last_read_offset;

	if (cpy > srcbufcpylen)
		/* [original line 114 missing: presumably cpy = srcbufcpylen;] */

	/* [missing: the userbuf branch selector] */
	int notcopied = copy_to_user(dst, srcbufcpystart, cpy);

	if (unlikely(notcopied > 0))
		/* [missing: error handling, presumably rc = -EFAULT] */

	memcpy(dst, srcbufcpystart, cpy);

	/* [original lines 123-128 missing: presumably dst/len/total
	 * bookkeeping] */

	data->read_remaining -= cpy;
	data->last_read_offset += cpy;

	/* chunk fully consumed: advance to the next one */
	if (cpy == srcbufcpylen)
		databuf_nextreadchunk(data);

	if (unlikely(rc < 0)) {
	/* [original lines 136-143 missing: error handling and return] */
/*
 * Kernel-space pull: copy len buffered bytes into dst.  The return value
 * of the worker is deliberately ignored - a kernel-space memcpy cannot
 * fault the way a user copy can.
 */
void databuf_pull(struct data_buf *data, char *dst, int len)
{
	_databuf_pull(data, dst, len, 0);
}
/*
 * Copy buffered data into the user-space iovecs of msg.
 *
 * NOTE(review): most of this function is missing from this view (the
 * declarations of iovidx/iovread/rc, the loop bookkeeping, the
 * empty-buffer handling and the final return value).
 */
size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg)
{
	while (iovidx < msg->msg_iovlen) {
		struct iovec *iov = msg->msg_iov + iovidx;
		/* NOTE(review): this local shadows the msg parameter - the
		 * msghdr becomes unreachable below this point; renaming the
		 * local would be clearer and less error-prone. */
		__user char *msg = iov->iov_base + iovread;
		unsigned int len = iov->iov_len - iovread;

		/* [original lines 162-168 missing] */

		/* nothing buffered: behaviour depends on whether a source
		 * is still attached */
		if (sconn->buf.read_remaining == 0) {
			if (sconn->sourcetype == SOURCE_NONE)
			/* [original lines 171-174 missing] */

		/* never pull more than is buffered */
		if (len > sconn->buf.read_remaining)
			len = sconn->buf.read_remaining;

		rc = _databuf_pull(&(sconn->buf), msg, len, 1);

		/* [original lines 179-195 missing: loop advance and return] */
/*
 * Undo a read of `bytes` bytes: move the read position (lastread /
 * last_read_offset) backwards and re-add the bytes to read_remaining.
 */
void databuf_unpull(struct data_buf *data, __u32 bytes)
{
	data->read_remaining += bytes;

	/* unpull only makes sense after something has been read */
	BUG_ON(data->lastread == 0);

	/* step back over whole chunks while more than the offset within
	 * the current chunk has to be rewound */
	while (bytes > data->last_read_offset) {
		bytes -= data->last_read_offset;
		data->lastread = container_of(data->lastread->buf_list.prev,
				struct data_buf_item, buf_list);
		/* must never rewind past the head of the item list */
		BUG_ON(&(data->lastread->buf_list) == &(data->items));
	/* NOTE(review): original lines 207-208 are missing from this view -
	 * presumably last_read_offset is reloaded with the previous chunk's
	 * length here (otherwise the loop condition is wrong) - confirm. */

	data->last_read_offset -= bytes;
	/* [original lines 210-211 missing: closing brace] */
/*
 * Copy len bytes of previously-read ("old") data starting at absolute
 * stream position startpos into dst, by walking the item list from the
 * front (first_offset) rather than from the current read position.
 *
 * NOTE(review): several lines are missing from this view (the seek and
 * copy loop headers, declarations of srcbuf/srcbuflen/cpy, the advance
 * of dst/len/pos, and the function close).  Only visible statements are
 * reproduced below.
 */
void databuf_pullold(struct data_buf *data, __u32 startpos, char *dst,
		int len)
{
	__u32 pos = data->first_offset;
	struct data_buf_item *dbi = container_of(data->items.next,
			struct data_buf_item, buf_list);

	/* seek: skip chunks that lie entirely before startpos */
	BUG_ON(&(dbi->buf_list) == &(data->items));

	/* BUG(review)?: the type test reads data->lastread->type while the
	 * length is taken from dbi - this looks like it should be
	 * dbi->type.  The same pattern repeats in the copy loop below,
	 * where srcbuf/srcbuflen also come from data->lastread instead of
	 * dbi.  Confirm against the upstream source. */
	if (data->lastread->type == TYPE_BUF) {
		srcbuflen = dbi->data.buf.datalen;
	} else if (data->lastread->type == TYPE_SKB) {
		srcbuflen = dbi->data.skb->len;
	/* [original lines 227-230 missing] */

	/* signed wraparound compare: chunk containing startpos found? */
	if (((__s32) (pos + srcbuflen - startpos)) > 0)
		/* [original lines 232-234 missing: presumably break] */

	dbi = container_of(dbi->buf_list.next, struct data_buf_item,
	/* [original lines 236-244 missing: buf_list); pos advance; loop end] */

	char *srcbufcpystart = 0;
	int srcbufcpylen = 0;

	BUG_ON(&(dbi->buf_list) == &(data->items));

	if (data->lastread->type == TYPE_BUF) {
		srcbuf = data->lastread->data.buf.buf;
		srcbuflen = data->lastread->data.buf.datalen;
	} else if (data->lastread->type == TYPE_SKB) {
		srcbuf = data->lastread->data.skb->data;
		srcbuflen = data->lastread->data.skb->len;
	/* [original lines 256-259 missing] */

	/* pos must not have run past startpos */
	BUG_ON(((__s32) (pos - startpos)) > 0);

	srcbufcpystart = srcbuf + ((__s32) (startpos - pos));
	srcbufcpylen = srcbuflen - ((__s32) (startpos - pos));

	if (cpy > srcbufcpylen)
		/* [original lines 266-267 missing: presumably cpy = srcbufcpylen;] */

	memcpy(dst, srcbufcpystart, cpy);

	/* [original lines 269-274 missing: dst/len/startpos/pos advance] */

	dbi = container_of(dbi->buf_list.next, struct data_buf_item,
	/* [original lines 276-279 missing: buf_list); loop and function close] */
/* ack up to *not* including pos */
/*
 * Drop fully-acknowledged items from the front of rconn's buffer and
 * advance first_offset accordingly.  The chunk currently being read
 * (buf->lastread) is never freed.
 *
 * NOTE(review): the declarations of firstlen and acked, the loop break
 * statements and several closing braces are missing from this view.
 */
void databuf_ack(struct conn *rconn, __u32 pos)
{
	struct data_buf *buf = &(rconn->buf);

	while (!list_empty(&(buf->items))) {
		struct data_buf_item *firstitem = container_of(buf->items.next,
				struct data_buf_item, buf_list);

		/* never free the chunk currently being read */
		if (firstitem == buf->lastread)
			/* [original lines 292-293 missing: presumably break;] */

		if (firstitem->type == TYPE_BUF) {
			firstlen = firstitem->data.buf.datalen;
		} else if (firstitem->type == TYPE_SKB) {
			firstlen = firstitem->data.skb->len;
		/* [original lines 298-301 missing] */

		/* stop once the first item reaches at or past pos
		 * (signed wraparound compare) */
		if (((__s32)(buf->first_offset + firstlen - pos)) > 0)
			/* [original lines 303-304 missing: presumably break;] */

		buf->first_offset += firstlen;
		/* [original lines 306-307 missing: presumably acked += firstlen;] */

		databuf_item_free(firstitem);
	/* [original lines 309-310 missing: closing brace] */

	/* incoming connections account the freed bytes for speed stats */
	if (rconn-> sourcetype == SOURCE_IN)
		refresh_speedstat(rconn, acked);
	/* [original lines 313-314 missing: closing brace] */
/*
 * Free every item that has been completely read (all items before
 * buf->lastread), advancing first_offset and accounting the freed bytes.
 *
 * NOTE(review): the declaration of acked, the loop break and several
 * closing braces are missing from this view.
 */
void databuf_ackread(struct conn *rconn)
{
	struct data_buf *buf = &(rconn->buf);

	/* nothing to do before the first read positioned lastread */
	while (!list_empty(&(buf->items)) && buf->lastread != 0) {
		struct data_buf_item *firstitem = container_of(buf->items.next,
				struct data_buf_item, buf_list);

		/* stop at the chunk currently being read */
		if (firstitem == buf->lastread)
			/* [original lines 325-326 missing: presumably break;] */

		if (firstitem->type == TYPE_BUF) {
			buf->first_offset += firstitem->data.buf.datalen;
			acked += firstitem->data.buf.datalen;
		} else if (firstitem->type == TYPE_SKB) {
			buf->first_offset += firstitem->data.skb->len;
			acked += firstitem->data.skb->len;
		/* [original lines 333-336 missing] */

		databuf_item_free(firstitem);
	/* [original lines 338-339 missing: closing brace] */

	/* incoming connections account the freed bytes for speed stats */
	if (rconn-> sourcetype == SOURCE_IN)
		refresh_speedstat(rconn, acked);
	/* [original lines 342-343 missing: closing brace] */
344 int databuf_maypush(struct data_buf
*buf
)
346 return 16384 - buf
->read_remaining
;
/*
 * Push buffered data towards rconn's target, dispatching on targettype.
 *
 * NOTE(review): most of this switch is missing from this view (the body
 * of the TARGET_UNCONNECTED case, the other case labels and the default
 * branch); it is not visible which case the wake_up below belongs to.
 */
void flush_buf(struct conn *rconn)
{
	switch (rconn->targettype) {
	case TARGET_UNCONNECTED:
		/* [original line 353 missing] */
		/* the target may have become connected meanwhile: re-check */
		if (rconn->targettype != TARGET_UNCONNECTED)
			/* [original lines 355-357 missing] */

		/* wake readers waiting on the socket target */
		wake_up_interruptible(&(rconn->target.sock.wait));
		/* [original lines 359-367 missing: remaining cases, close] */
/*
 * Append len bytes from buf into rconn's data buffer, reusing the last
 * TYPE_BUF item if it still has room, otherwise allocating a fresh
 * page-sized chunk.  userbuf selects copy_from_user() vs. memcpy().
 *
 * NOTE(review): several lines are missing from this view (declarations
 * of rc/cpy/totalcpy, the outer loop, parts of the copy bookkeeping and
 * the return statement).
 */
static int _receive_buf(struct conn *rconn, char *buf, int len, int userbuf)
{
	struct data_buf_item *item = 0;

	/* the caller must respect the push limit */
	BUG_ON(databuf_maypush(&(rconn->buf)) < len);

	/* try to append to the most recent item if it is a TYPE_BUF chunk
	 * with space left */
	if (list_empty(&(rconn->buf.items)) == 0) {
		struct list_head *last = rconn->buf.items.prev;
		item = container_of(last, struct data_buf_item, buf_list);

		if (item->type != TYPE_BUF || rconn->buf.last_buflen <=
				item->data.buf.datalen)
			/* [original lines 382-388 missing: presumably
			 * item = 0; to force a fresh chunk] */

	/* allocate a fresh page-sized chunk */
	__u32 buflen = PAGESIZE;

	item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
	if (unlikely(item == 0)) {
		/* [original lines 394-396 missing: error path] */

	/* BUG(review): sizeof(item) is the size of the POINTER (4/8
	 * bytes), not of the struct, so only the first few bytes of the
	 * item are zeroed.  Should be sizeof(*item) or
	 * sizeof(struct data_buf_item). */
	memset(item, 0, sizeof(item));
	item->type = TYPE_BUF;
	item->data.buf.buf = kmalloc(buflen, GFP_KERNEL);

	if (unlikely(item->data.buf.buf == 0)) {
		kmem_cache_free(data_buf_item_slab, item);
		/* [original lines 404-406 missing: error path] */

	item->data.buf.datalen = 0;
	list_add_tail(&(item->buf_list), &(rconn->buf.items));
	rconn->buf.last_buflen = buflen;

	BUG_ON(item->type != TYPE_BUF);
	BUG_ON(rconn->buf.last_buflen <= item->data.buf.datalen);

	/* clamp the copy to the space left in this chunk */
	if (rconn->buf.last_buflen - item->data.buf.datalen < cpy)
		cpy = (rconn->buf.last_buflen - item->data.buf.datalen);

	/* [missing: the userbuf branch selector] */
	int notcopied = copy_from_user(item->data.buf.buf +
			item->data.buf.datalen, buf, cpy);

	if (unlikely(notcopied > 0))
		/* [original lines 423-424 missing: presumably rc = -EFAULT] */

	memcpy(item->data.buf.buf + item->data.buf.datalen,
	/* [original lines 426-432 missing: buf, cpy); and bookkeeping] */

	item->data.buf.datalen += cpy;

	/* [original lines 434-435 missing] */
	if (unlikely(rc < 0)) {
		/* [original lines 437-442 missing] */

	rconn->buf.read_remaining += totalcpy;
	/* [original lines 444-447 missing: return] */
/*
 * Receive data from the user-space iovecs of msg into rconn's buffer,
 * respecting the push limit reported by databuf_maypush().
 *
 * NOTE(review): the declarations of iovidx/iovread/rc/pushlimit, the
 * loop bookkeeping, the backpressure handling and the return value are
 * missing from this view.
 */
int receive_userbuf(struct conn *rconn, struct msghdr *msg)
{
	/* no room at all: bail out early */
	if (databuf_maypush(&(rconn->buf)) <= 0)
		/* [original lines 455-456 missing: early return] */

	while (iovidx < msg->msg_iovlen) {
		struct iovec *iov = msg->msg_iov + iovidx;
		__user char *userbuf = iov->iov_base + iovread;
		int len = iov->iov_len - iovread;

		/* [original lines 461-469 missing] */

		pushlimit = databuf_maypush(&(rconn->buf));

		/* buffer full: behaviour depends on whether the target is
		 * still unconnected */
		if (pushlimit <= 0) {
			if (rconn->targettype == TARGET_UNCONNECTED)
				/* [original lines 474-480 missing] */

		rc = _receive_buf(rconn, userbuf, len, 1);

		/* [original lines 482-499 missing: loop advance, return] */
500 void receive_buf(struct conn
*rconn
, char *buf
, int len
)
502 BUG_ON(databuf_maypush(&(rconn
->buf
)) < len
);
503 _receive_buf(rconn
, buf
, len
, 0);
/*
 * Queue an skb as a TYPE_SKB item on rconn's data buffer, taking
 * ownership of the skb on success.
 *
 * NOTE(review): the two early-exit paths (buffer full, allocation
 * failure) and the final return are missing from this view; presumably
 * the failure paths return a nonzero value and the success path 0.
 */
int receive_skb(struct conn *rconn, struct sk_buff *skb)
{
	struct data_buf_item *item;

	/* NOTE(review): databuf_maypush() returns int and may be negative,
	 * while skb->len is unsigned - the comparison promotes to unsigned,
	 * so a negative push limit would NOT take this branch.  Confirm
	 * whether that can happen here. */
	if (databuf_maypush(&(rconn->buf)) < skb->len)
		/* [original lines 512-513 missing: failure return] */

	item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);

	if (unlikely(item == 0))
		/* [original lines 517-518 missing: failure return] */

	item->data.skb = skb;
	item->type = TYPE_SKB;
	list_add_tail(&(item->buf_list), &(rconn->buf.items));
	rconn->buf.read_remaining += skb->len;
	/* a new skb item leaves no appendable TYPE_BUF space at the tail */
	rconn->buf.last_buflen = 0;

	/* [original lines 524-529 missing: success return and close] */
/*
 * Wake whatever feeds this connection, dispatching on sourcetype.
 *
 * NOTE(review): the case labels are missing from this view; the
 * wake_up_interruptible() and drain_ooo_queue() calls presumably belong
 * to the socket-source and in-kernel-source cases respectively - confirm.
 */
void wake_sender(struct conn *rconn)
{
	switch (rconn->sourcetype) {
	/* [original lines 533-536 missing: case labels] */
		/* wake a user process blocked writing to the source socket */
		wake_up_interruptible(&(rconn->source.sock.wait));
	/* [original lines 538-539 missing] */
		drain_ooo_queue(rconn);
	/* [original lines 541-546 missing: remaining cases, close] */
547 void forward_init(void)
549 data_buf_item_slab
= kmem_cache_create("cor_data_buf_item",
550 sizeof(struct data_buf_item
), 8, 0, 0);