/*
 * Connection oriented routing
 * Copyright (C) 2007-2008 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
21 #include <linux/mutex.h>
/* slab cache for struct data_buf_item allocations; created in forward_init() */
struct kmem_cache *data_buf_item_slab;
/* size of one TYPE_BUF data chunk (one page); PAGE_SHIFT comes from the
 * kernel headers */
#define PAGESIZE (1 << PAGE_SHIFT)
29 void databuf_init(struct data_buf
*data
)
31 memset(data
, 0, sizeof(struct data_buf
));
32 INIT_LIST_HEAD(&(data
->items
));
35 static void databuf_item_free(struct data_buf_item
*item
)
37 if (item
->type
== TYPE_BUF
) {
38 kfree(item
->data
.buf
.buf
);
39 } else if (item
->type
== TYPE_SKB
) {
40 kfree_skb(item
->data
.skb
);
45 list_del(&(item
->buf_list
));
47 kmem_cache_free(data_buf_item_slab
, item
);
50 void databuf_free(struct data_buf
*data
)
52 while (!list_empty(&(data
->items
))) {
53 struct data_buf_item
*item
= container_of(data
->items
.next
,
54 struct data_buf_item
, buf_list
);
55 databuf_item_free(item
);
59 static void databuf_nextreadchunk(struct data_buf
*data
)
61 if (list_empty(&(data
->items
)))
64 data
->lastread
= container_of(data
->items
.next
,
65 struct data_buf_item
, buf_list
);
67 data
->last_read_offset
= 0;
70 int _databuf_pull(struct data_buf
*data
, char *dst
, int len
, int userbuf
)
74 BUG_ON(data
->read_remaining
< len
);
76 if (data
->lastread
== 0)
77 databuf_nextreadchunk(data
);
86 char *srcbufcpystart
= 0;
89 BUG_ON(data
->lastread
== 0);
91 if (data
->lastread
->type
== TYPE_BUF
) {
92 srcbuf
= data
->lastread
->data
.buf
.buf
;
93 srcbuflen
= data
->lastread
->data
.buf
.datalen
;
94 } else if (data
->lastread
->type
== TYPE_SKB
) {
95 srcbuf
= data
->lastread
->data
.skb
->data
;
96 srcbuflen
= data
->lastread
->data
.skb
->len
;
101 srcbufcpystart
= srcbuf
+ data
->last_read_offset
;
102 srcbufcpylen
= srcbuflen
- data
->last_read_offset
;
104 if (cpy
> srcbufcpylen
)
108 int notcopied
= copy_to_user(dst
, srcbufcpystart
, cpy
);
113 memcpy(dst
, srcbufcpystart
, cpy
);
120 data
->read_remaining
-= cpy
;
121 data
->last_read_offset
+= cpy
;
123 if (cpy
== srcbufcpylen
)
124 databuf_nextreadchunk(data
);
/*
 * databuf_pull - kernel-space convenience wrapper around _databuf_pull.
 */
void databuf_pull(struct data_buf *data, char *dst, int len)
{
	_databuf_pull(data, dst, len, 0);
}
141 size_t databuf_pulluser(struct conn
*sconn
, struct msghdr
*msg
)
147 while (iovidx
< msg
->msg_iovlen
) {
150 struct iovec
*iov
= msg
->msg_iov
+ iovidx
;
151 __user
char *msg
= iov
->iov_base
+ iovread
;
152 unsigned int len
= iov
->iov_len
- iovread
;
160 if (sconn
->buf
.read_remaining
== 0) {
161 if (sconn
->sourcetype
== SOURCE_NONE
)
166 if (len
> sconn
->buf
.read_remaining
)
167 len
= sconn
->buf
.read_remaining
;
169 rc
= _databuf_pull(&(sconn
->buf
), msg
, len
, 1);
187 /* ack up to *not* including pos (e.g. pos=0 ack nothing) */
188 void databuf_ack(struct data_buf
*buf
, __u64 pos
)
190 if (pos
<= buf
->first_offset
)
192 BUG_ON(pos
> buf
->read_offset
);
195 struct list_head
*first
= buf
->items
.next
;
196 struct data_buf_item
*firstitem
= container_of(first
,
197 struct data_buf_item
, buf_list
);
200 BUG_ON(list_empty(&(buf
->items
)));
202 if (firstitem
->type
== TYPE_BUF
) {
203 firstlen
= firstitem
->data
.buf
.datalen
;
204 } else if (firstitem
->type
== TYPE_SKB
) {
205 firstlen
= firstitem
->data
.skb
->len
;
210 if (buf
->first_offset
+ firstlen
> pos
)
213 buf
->first_offset
+= firstlen
;
215 databuf_item_free(firstitem
);
219 void databuf_ackread(struct data_buf
*buf
)
221 databuf_ack(buf
, buf
->read_offset
);
224 int databuf_maypush(struct data_buf
*buf
)
226 return 16384 - buf
->read_remaining
;
229 static void local_delivery(struct conn
*rconn
)
231 wake_up_interruptible(&(rconn
->target
.sock
.wait
));
234 void flush_buf(struct conn
*rconn
)
236 switch (rconn
->targettype
) {
237 case TARGET_UNCONNECTED
:
239 if (rconn
->targettype
!= TARGET_UNCONNECTED
)
243 local_delivery(rconn
);
253 static int _receive_buf(struct conn
*rconn
, char *buf
, int len
, int userbuf
)
255 struct data_buf_item
*item
= 0;
259 BUG_ON(databuf_maypush(&(rconn
->buf
)) < len
);
261 if (list_empty(&(rconn
->buf
.items
)) == 0) {
262 struct list_head
*last
= rconn
->buf
.items
.prev
;
263 item
= container_of(last
, struct data_buf_item
, buf_list
);
265 if (item
->type
!= TYPE_BUF
|| rconn
->buf
.last_buflen
<=
266 item
->data
.buf
.datalen
)
274 item
= kmem_cache_alloc(data_buf_item_slab
, GFP_KERNEL
);
279 memset(item
, 0, sizeof(item
));
280 item
->type
= TYPE_BUF
;
281 item
->data
.buf
.buf
= kmalloc(PAGESIZE
, GFP_KERNEL
);
282 if (item
->data
.buf
.buf
== 0) {
283 kmem_cache_free(data_buf_item_slab
, item
);
287 item
->data
.buf
.datalen
= 0;
288 list_add_tail(&(item
->buf_list
), &(rconn
->buf
.items
));
289 rconn
->buf
.last_buflen
= PAGESIZE
;
292 BUG_ON(item
->type
!= TYPE_BUF
);
293 BUG_ON(rconn
->buf
.last_buflen
<= item
->data
.buf
.datalen
);
295 if (rconn
->buf
.last_buflen
- item
->data
.buf
.datalen
< cpy
)
296 cpy
= (rconn
->buf
.last_buflen
- item
->data
.buf
.datalen
);
299 int notcopied
= copy_from_user(item
->data
.buf
.buf
+
300 item
->data
.buf
.datalen
, buf
, cpy
);
305 memcpy(item
->data
.buf
.buf
+ item
->data
.buf
.datalen
,
313 item
->data
.buf
.datalen
+= cpy
;
323 rconn
->buf
.read_remaining
+= totalcpy
;
328 int receive_userbuf(struct conn
*rconn
, struct msghdr
*msg
)
334 if (databuf_maypush(&(rconn
->buf
)) <= 0)
337 while (iovidx
< msg
->msg_iovlen
) {
338 struct iovec
*iov
= msg
->msg_iov
+ iovidx
;
339 __user
char *userbuf
= iov
->iov_base
+ iovread
;
340 int len
= iov
->iov_len
- iovread
;
350 pushlimit
= databuf_maypush(&(rconn
->buf
));
352 if (pushlimit
<= 0) {
353 if (rconn
->targettype
== TARGET_UNCONNECTED
)
361 rc
= _receive_buf(rconn
, userbuf
, len
, 1);
380 void receive_buf(struct conn
*rconn
, char *buf
, int len
)
382 BUG_ON(databuf_maypush(&(rconn
->buf
)) < len
);
383 _receive_buf(rconn
, buf
, len
, 0);
387 int receive_skb(struct conn
*rconn
, struct sk_buff
*skb
)
389 struct data_buf_item
*item
;
391 if (databuf_maypush(&(rconn
->buf
)) < skb
->len
)
394 item
= kmem_cache_alloc(data_buf_item_slab
, GFP_KERNEL
);
399 item
->data
.skb
= skb
;
400 item
->type
= TYPE_SKB
;
401 list_add_tail(&(item
->buf_list
), &(rconn
->buf
.items
));
402 rconn
->buf
.read_remaining
+= skb
->len
;
403 rconn
->buf
.last_buflen
= 0;
410 void wake_sender(struct conn
*rconn
)
412 switch (rconn
->sourcetype
) {
417 wake_up_interruptible(&(rconn
->target
.sock
.wait
));
420 drain_ooo_queue(rconn
);
427 void forward_init(void)
429 data_buf_item_slab
= kmem_cache_create("cor_data_buf_item",
430 sizeof(struct data_buf_item
), 8, 0, 0);