socket sleep bugfix, socket wakeup bugfix, drain_ooo_queue locking bugfix
[cor_2_6_31.git] / net / cor / forward.c
bloba7fb134cf5fce1a17e0a11d12dd5a68011c494dd
1 /*
2 * Connection oriented routing
3 * Copyright (C) 2007-2008 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/mutex.h>
23 #include "cor.h"
/*
 * Slab cache from which the struct data_buf_item objects used in this file
 * are allocated; it is created in forward_init().
 */
struct kmem_cache *data_buf_item_slab;

/*
 * Size in bytes of the linear buffer that _receive_buf() allocates for each
 * new TYPE_BUF item.
 */
#define PAGESIZE (1 << PAGE_SHIFT)
29 void databuf_init(struct data_buf *data)
31 memset(data, 0, sizeof(struct data_buf));
32 INIT_LIST_HEAD(&(data->items));
35 static void databuf_item_free(struct data_buf_item *item)
37 if (item->type == TYPE_BUF) {
38 kfree(item->data.buf.buf);
39 } else if (item->type == TYPE_SKB) {
40 kfree_skb(item->data.skb);
41 } else {
42 BUG();
45 list_del(&(item->buf_list));
47 kmem_cache_free(data_buf_item_slab, item);
50 void databuf_free(struct data_buf *data)
52 while (!list_empty(&(data->items))) {
53 struct data_buf_item *item = container_of(data->items.next,
54 struct data_buf_item, buf_list);
55 databuf_item_free(item);
59 static void databuf_nextreadchunk(struct data_buf *data)
61 if (data->lastread == 0) {
62 BUG_ON(data->last_read_offset != 0);
63 BUG_ON(list_empty(&(data->items)));
64 data->lastread = container_of(data->items.next,
65 struct data_buf_item, buf_list);
66 } else if (&(data->lastread->buf_list) != data->items.prev) {
67 data->lastread = container_of(data->lastread->buf_list.next,
68 struct data_buf_item, buf_list);
70 data->last_read_offset = 0;
74 int _databuf_pull(struct data_buf *data, char *dst, int len, int userbuf)
76 int totalcpy = 0;
78 BUG_ON(data->read_remaining < len);
80 if (data->lastread == 0)
81 databuf_nextreadchunk(data);
83 while(len > 0) {
84 int rc = 0;
85 int cpy = len;
87 char *srcbuf = 0;
88 int srcbuflen = 0;
90 char *srcbufcpystart = 0;
91 int srcbufcpylen = 0;
93 BUG_ON(data->lastread == 0);
95 if (data->lastread->type == TYPE_BUF) {
96 srcbuf = data->lastread->data.buf.buf;
97 srcbuflen = data->lastread->data.buf.datalen;
98 } else if (data->lastread->type == TYPE_SKB) {
99 srcbuf = data->lastread->data.skb->data;
100 srcbuflen = data->lastread->data.skb->len;
101 } else {
102 BUG();
105 srcbufcpystart = srcbuf + data->last_read_offset;
106 srcbufcpylen = srcbuflen - data->last_read_offset;
108 if (cpy > srcbufcpylen)
109 cpy = srcbufcpylen;
111 if (userbuf) {
112 int notcopied = copy_to_user(dst, srcbufcpystart, cpy);
113 cpy -= notcopied;
114 if (notcopied > 0)
115 rc = -EFAULT;
116 } else {
117 memcpy(dst, srcbufcpystart, cpy);
120 dst += cpy;
121 len -= cpy;
122 totalcpy += cpy;
124 data->read_remaining -= cpy;
125 data->last_read_offset += cpy;
127 if (cpy == srcbufcpylen)
128 databuf_nextreadchunk(data);
130 if (rc < 0) {
131 if (totalcpy == 0)
132 totalcpy = rc;
133 break;
137 return totalcpy;
/*
 * Copy len unread bytes from the buffer into the kernel memory at dst.
 * Thin wrapper around _databuf_pull() with userbuf == 0; the byte count it
 * returns is discarded.
 */
void databuf_pull(struct data_buf *data, char *dst, int len)
{
	(void) _databuf_pull(data, dst, len, 0);
}
145 size_t databuf_pulluser(struct conn *sconn, struct msghdr *msg)
147 size_t copied = 0;
148 int iovidx = 0;
149 int iovread = 0;
151 while (iovidx < msg->msg_iovlen) {
152 int rc;
154 struct iovec *iov = msg->msg_iov + iovidx;
155 __user char *msg = iov->iov_base + iovread;
156 unsigned int len = iov->iov_len - iovread;
158 if (len == 0) {
159 iovidx++;
160 iovread = 0;
161 continue;
164 if (sconn->buf.read_remaining == 0) {
165 if (sconn->sourcetype == SOURCE_NONE)
166 rc = -EPIPE;
167 else
168 rc = -EAGAIN;
169 } else {
170 if (len > sconn->buf.read_remaining)
171 len = sconn->buf.read_remaining;
173 rc = _databuf_pull(&(sconn->buf), msg, len, 1);
176 BUG_ON(rc == 0);
178 if (rc < 0) {
179 if (copied == 0)
180 copied = rc;
181 break;
184 copied += rc;
185 iovread += rc;
188 return copied;
191 /* ack up to *not* including pos (e.g. pos=0 ack nothing) */
192 void databuf_ack(struct data_buf *buf, __u64 pos)
194 if (pos <= buf->first_offset)
195 return;
196 BUG_ON(pos > buf->read_offset);
198 while (1) {
199 struct list_head *first = buf->items.next;
200 struct data_buf_item *firstitem = container_of(first,
201 struct data_buf_item, buf_list);
202 int firstlen = 0;
204 BUG_ON(list_empty(&(buf->items)));
206 if (firstitem->type == TYPE_BUF) {
207 firstlen = firstitem->data.buf.datalen;
208 } else if (firstitem->type == TYPE_SKB) {
209 firstlen = firstitem->data.skb->len;
210 } else {
211 BUG();
214 if (buf->first_offset + firstlen > pos ||
215 firstitem == buf->lastread)
216 break;
218 buf->first_offset += firstlen;
220 databuf_item_free(firstitem);
224 void databuf_ackread(struct data_buf *buf)
226 databuf_ack(buf, buf->read_offset);
229 int databuf_maypush(struct data_buf *buf)
231 return 16384 - buf->read_remaining;
234 static void local_delivery(struct conn *rconn)
236 wake_up_interruptible(&(rconn->target.sock.wait));
239 void flush_buf(struct conn *rconn)
241 switch (rconn->targettype) {
242 case TARGET_UNCONNECTED:
243 parse(rconn);
244 if (rconn->targettype != TARGET_UNCONNECTED)
245 flush_buf(rconn);
246 break;
247 case TARGET_SOCK:
248 local_delivery(rconn);
249 break;
250 case TARGET_OUT:
251 flush_out(rconn);
252 break;
253 default:
254 BUG();
258 static int _receive_buf(struct conn *rconn, char *buf, int len, int userbuf)
260 struct data_buf_item *item = 0;
262 int totalcpy = 0;
264 BUG_ON(databuf_maypush(&(rconn->buf)) < len);
266 if (list_empty(&(rconn->buf.items)) == 0) {
267 struct list_head *last = rconn->buf.items.prev;
268 item = container_of(last, struct data_buf_item, buf_list);
270 if (item->type != TYPE_BUF || rconn->buf.last_buflen <=
271 item->data.buf.datalen)
272 item = 0;
275 while (len > 0) {
276 int rc = 0;
277 int cpy = len;
278 if (item == 0) {
279 item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
280 if (item == 0) {
281 rc = -ENOMEM;
282 goto error;
284 memset(item, 0, sizeof(item));
285 item->type = TYPE_BUF;
286 item->data.buf.buf = kmalloc(PAGESIZE, GFP_KERNEL);
287 if (item->data.buf.buf == 0) {
288 kmem_cache_free(data_buf_item_slab, item);
289 rc = -ENOMEM;
290 goto error;
292 item->data.buf.datalen = 0;
293 list_add_tail(&(item->buf_list), &(rconn->buf.items));
294 rconn->buf.last_buflen = PAGESIZE;
297 BUG_ON(item->type != TYPE_BUF);
298 BUG_ON(rconn->buf.last_buflen <= item->data.buf.datalen);
300 if (rconn->buf.last_buflen - item->data.buf.datalen < cpy)
301 cpy = (rconn->buf.last_buflen - item->data.buf.datalen);
303 if (userbuf) {
304 int notcopied = copy_from_user(item->data.buf.buf +
305 item->data.buf.datalen, buf, cpy);
306 cpy -= notcopied;
307 if (notcopied > 0)
308 rc = -EFAULT;
309 } else {
310 memcpy(item->data.buf.buf + item->data.buf.datalen,
311 buf, cpy);
314 buf += cpy;
315 len -= cpy;
316 totalcpy += cpy;
318 item->data.buf.datalen += cpy;
320 error:
321 if (rc < 0) {
322 if (totalcpy == 0)
323 return rc;
324 break;
328 rconn->buf.read_remaining += totalcpy;
330 return totalcpy;
333 int receive_userbuf(struct conn *rconn, struct msghdr *msg)
335 int copied = 0;
336 int iovidx = 0;
337 int iovread = 0;
339 if (databuf_maypush(&(rconn->buf)) <= 0)
340 return -EAGAIN;
342 while (iovidx < msg->msg_iovlen) {
343 struct iovec *iov = msg->msg_iov + iovidx;
344 __user char *userbuf = iov->iov_base + iovread;
345 int len = iov->iov_len - iovread;
346 int rc;
347 int pushlimit;
349 if (len == 0) {
350 iovidx++;
351 iovread = 0;
352 continue;
355 pushlimit = databuf_maypush(&(rconn->buf));
357 if (pushlimit <= 0) {
358 if (rconn->targettype == TARGET_UNCONNECTED)
359 rc = -EPIPE;
360 else
361 rc = -EAGAIN;
362 } else {
363 if (pushlimit < len)
364 len = pushlimit;
366 rc = _receive_buf(rconn, userbuf, len, 1);
369 if (rc < 0) {
370 if (copied == 0)
371 copied = rc;
372 break;
375 copied += rc;
376 iovread += rc;
379 if (copied > 0)
380 flush_buf(rconn);;
382 return copied;
385 void receive_buf(struct conn *rconn, char *buf, int len)
387 BUG_ON(databuf_maypush(&(rconn->buf)) < len);
388 _receive_buf(rconn, buf, len, 0);
389 flush_buf(rconn);
392 int receive_skb(struct conn *rconn, struct sk_buff *skb)
394 struct data_buf_item *item;
396 if (databuf_maypush(&(rconn->buf)) < skb->len)
397 return 1;
399 item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
401 if (item == 0)
402 return 1;
404 item->data.skb = skb;
405 item->type = TYPE_SKB;
406 list_add_tail(&(item->buf_list), &(rconn->buf.items));
407 rconn->buf.read_remaining += skb->len;
408 rconn->buf.last_buflen = 0;
410 flush_buf(rconn);
412 return 0;
415 void wake_sender(struct conn *rconn)
417 switch (rconn->sourcetype) {
418 case SOURCE_NONE:
419 /* nothing */
420 break;
421 case SOURCE_SOCK:
422 wake_up_interruptible(&(rconn->source.sock.wait));
423 break;
424 case SOURCE_IN:
425 drain_ooo_queue(rconn);
426 break;
427 default:
428 BUG();
432 void forward_init(void)
434 data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
435 sizeof(struct data_buf_item), 8, 0, 0);