/*
 * Connection oriented routing
 * Copyright (C) 2007-2008 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
21 #include <linux/mutex.h>
/* Slab cache for struct data_buf_item; created once in forward_init(). */
struct kmem_cache *data_buf_item_slab;

/* System page size in bytes; used as the chunk size for TYPE_BUF items. */
#define PAGESIZE (1 << PAGE_SHIFT)
29 void databuf_init(struct data_buf
*data
)
31 memset(data
, 0, sizeof(struct data_buf
));
32 INIT_LIST_HEAD(&(data
->items
));
35 static void databuf_item_free(struct data_buf_item
*item
)
37 if (item
->type
== TYPE_BUF
) {
38 kfree(item
->data
.buf
.buf
);
39 } else if (item
->type
== TYPE_SKB
) {
40 kfree_skb(item
->data
.skb
);
45 list_del(&(item
->buf_list
));
47 kmem_cache_free(data_buf_item_slab
, item
);
50 void databuf_free(struct data_buf
*data
)
52 while (!list_empty(&(data
->items
))) {
53 struct data_buf_item
*item
= container_of(data
->items
.next
,
54 struct data_buf_item
, buf_list
);
55 databuf_item_free(item
);
59 static void databuf_nextreadchunk(struct data_buf
*data
)
61 if (data
->lastread
== 0) {
62 BUG_ON(data
->last_read_offset
!= 0);
63 BUG_ON(list_empty(&(data
->items
)));
64 data
->lastread
= container_of(data
->items
.next
,
65 struct data_buf_item
, buf_list
);
66 } else if (&(data
->lastread
->buf_list
) != data
->items
.prev
) {
67 data
->lastread
= container_of(data
->lastread
->buf_list
.next
,
68 struct data_buf_item
, buf_list
);
70 data
->last_read_offset
= 0;
74 int _databuf_pull(struct data_buf
*data
, char *dst
, int len
, int userbuf
)
78 BUG_ON(data
->read_remaining
< len
);
80 if (data
->lastread
== 0)
81 databuf_nextreadchunk(data
);
90 char *srcbufcpystart
= 0;
93 BUG_ON(data
->lastread
== 0);
95 if (data
->lastread
->type
== TYPE_BUF
) {
96 srcbuf
= data
->lastread
->data
.buf
.buf
;
97 srcbuflen
= data
->lastread
->data
.buf
.datalen
;
98 } else if (data
->lastread
->type
== TYPE_SKB
) {
99 srcbuf
= data
->lastread
->data
.skb
->data
;
100 srcbuflen
= data
->lastread
->data
.skb
->len
;
105 srcbufcpystart
= srcbuf
+ data
->last_read_offset
;
106 srcbufcpylen
= srcbuflen
- data
->last_read_offset
;
108 if (cpy
> srcbufcpylen
)
112 int notcopied
= copy_to_user(dst
, srcbufcpystart
, cpy
);
117 memcpy(dst
, srcbufcpystart
, cpy
);
124 data
->read_remaining
-= cpy
;
125 data
->last_read_offset
+= cpy
;
127 if (cpy
== srcbufcpylen
)
128 databuf_nextreadchunk(data
);
/**
 * databuf_pull - kernel-space convenience wrapper around _databuf_pull
 * @data: source buffer
 * @dst:  kernel destination buffer
 * @len:  number of bytes to copy; must not exceed read_remaining
 */
void databuf_pull(struct data_buf *data, char *dst, int len)
{
	_databuf_pull(data, dst, len, 0);
}
145 size_t databuf_pulluser(struct conn
*sconn
, struct msghdr
*msg
)
151 while (iovidx
< msg
->msg_iovlen
) {
154 struct iovec
*iov
= msg
->msg_iov
+ iovidx
;
155 __user
char *msg
= iov
->iov_base
+ iovread
;
156 unsigned int len
= iov
->iov_len
- iovread
;
164 if (sconn
->buf
.read_remaining
== 0) {
165 if (sconn
->sourcetype
== SOURCE_NONE
)
170 if (len
> sconn
->buf
.read_remaining
)
171 len
= sconn
->buf
.read_remaining
;
173 rc
= _databuf_pull(&(sconn
->buf
), msg
, len
, 1);
191 /* ack up to *not* including pos (e.g. pos=0 ack nothing) */
192 void databuf_ack(struct data_buf
*buf
, __u64 pos
)
194 if (pos
<= buf
->first_offset
)
196 BUG_ON(pos
> buf
->read_offset
);
199 struct list_head
*first
= buf
->items
.next
;
200 struct data_buf_item
*firstitem
= container_of(first
,
201 struct data_buf_item
, buf_list
);
204 BUG_ON(list_empty(&(buf
->items
)));
206 if (firstitem
->type
== TYPE_BUF
) {
207 firstlen
= firstitem
->data
.buf
.datalen
;
208 } else if (firstitem
->type
== TYPE_SKB
) {
209 firstlen
= firstitem
->data
.skb
->len
;
214 if (buf
->first_offset
+ firstlen
> pos
||
215 firstitem
== buf
->lastread
)
218 buf
->first_offset
+= firstlen
;
220 databuf_item_free(firstitem
);
224 void databuf_ackread(struct data_buf
*buf
)
226 databuf_ack(buf
, buf
->read_offset
);
229 int databuf_maypush(struct data_buf
*buf
)
231 return 16384 - buf
->read_remaining
;
234 static void local_delivery(struct conn
*rconn
)
236 wake_up_interruptible(&(rconn
->target
.sock
.wait
));
239 void flush_buf(struct conn
*rconn
)
241 switch (rconn
->targettype
) {
242 case TARGET_UNCONNECTED
:
244 if (rconn
->targettype
!= TARGET_UNCONNECTED
)
248 local_delivery(rconn
);
258 static int _receive_buf(struct conn
*rconn
, char *buf
, int len
, int userbuf
)
260 struct data_buf_item
*item
= 0;
264 BUG_ON(databuf_maypush(&(rconn
->buf
)) < len
);
266 if (list_empty(&(rconn
->buf
.items
)) == 0) {
267 struct list_head
*last
= rconn
->buf
.items
.prev
;
268 item
= container_of(last
, struct data_buf_item
, buf_list
);
270 if (item
->type
!= TYPE_BUF
|| rconn
->buf
.last_buflen
<=
271 item
->data
.buf
.datalen
)
279 __u32 buflen
= PAGESIZE
;
282 item
= kmem_cache_alloc(data_buf_item_slab
, GFP_KERNEL
);
287 memset(item
, 0, sizeof(item
));
288 item
->type
= TYPE_BUF
;
289 item
->data
.buf
.buf
= kmalloc(buflen
, GFP_KERNEL
);
292 if (item
->data
.buf
.buf
== 0) {
293 kmem_cache_free(data_buf_item_slab
, item
);
297 item
->data
.buf
.datalen
= 0;
298 list_add_tail(&(item
->buf_list
), &(rconn
->buf
.items
));
299 rconn
->buf
.last_buflen
= buflen
;
302 BUG_ON(item
->type
!= TYPE_BUF
);
303 BUG_ON(rconn
->buf
.last_buflen
<= item
->data
.buf
.datalen
);
305 if (rconn
->buf
.last_buflen
- item
->data
.buf
.datalen
< cpy
)
306 cpy
= (rconn
->buf
.last_buflen
- item
->data
.buf
.datalen
);
309 int notcopied
= copy_from_user(item
->data
.buf
.buf
+
310 item
->data
.buf
.datalen
, buf
, cpy
);
315 memcpy(item
->data
.buf
.buf
+ item
->data
.buf
.datalen
,
323 item
->data
.buf
.datalen
+= cpy
;
333 rconn
->buf
.read_remaining
+= totalcpy
;
338 int receive_userbuf(struct conn
*rconn
, struct msghdr
*msg
)
344 if (databuf_maypush(&(rconn
->buf
)) <= 0)
347 while (iovidx
< msg
->msg_iovlen
) {
348 struct iovec
*iov
= msg
->msg_iov
+ iovidx
;
349 __user
char *userbuf
= iov
->iov_base
+ iovread
;
350 int len
= iov
->iov_len
- iovread
;
360 pushlimit
= databuf_maypush(&(rconn
->buf
));
362 if (pushlimit
<= 0) {
363 if (rconn
->targettype
== TARGET_UNCONNECTED
)
371 rc
= _receive_buf(rconn
, userbuf
, len
, 1);
390 void receive_buf(struct conn
*rconn
, char *buf
, int len
)
392 BUG_ON(databuf_maypush(&(rconn
->buf
)) < len
);
393 _receive_buf(rconn
, buf
, len
, 0);
397 int receive_skb(struct conn
*rconn
, struct sk_buff
*skb
)
399 struct data_buf_item
*item
;
401 if (databuf_maypush(&(rconn
->buf
)) < skb
->len
)
404 item
= kmem_cache_alloc(data_buf_item_slab
, GFP_KERNEL
);
409 item
->data
.skb
= skb
;
410 item
->type
= TYPE_SKB
;
411 list_add_tail(&(item
->buf_list
), &(rconn
->buf
.items
));
412 rconn
->buf
.read_remaining
+= skb
->len
;
413 rconn
->buf
.last_buflen
= 0;
420 void wake_sender(struct conn
*rconn
)
422 switch (rconn
->sourcetype
) {
427 wake_up_interruptible(&(rconn
->source
.sock
.wait
));
430 drain_ooo_queue(rconn
);
437 void forward_init(void)
439 data_buf_item_slab
= kmem_cache_create("cor_data_buf_item",
440 sizeof(struct data_buf_item
), 8, 0, 0);