/*
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <net/sock.h>
#include <linux/net.h>
#include <asm/uaccess.h>

#include "cor.h"
/**
 * sock_bt_wait_list and waiting_conns are ordered by the smallest amount
 * (buffer usage resp. alloclimit) first; this is the order in which
 * resuming will happen.
 */

DEFINE_MUTEX(sock_bufferlimits_lock);
LIST_HEAD(sock_bt_list);
LIST_HEAD(sock_bt_wait_list);
static __u64 sock_bufferusage;

static struct work_struct outofsockbufferspace_work;
static int outofsockbufferspace_scheduled;
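/*
 * Added commentary: a sock_buffertracker accounts the socket buffer usage
 * of one uid.  Trackers without buffer-waiting conns sit on sock_bt_list;
 * trackers with waiting conns sit on sock_bt_wait_list, sorted by usage.
 */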
void free_sbt(struct kref *ref)
{
        struct sock_buffertracker *sbt = container_of(ref,
                        struct sock_buffertracker, ref);

        BUG_ON(sbt->usage != 0);
        BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);

        list_del(&(sbt->lh));
        kfree(sbt);
}
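/*
 * Added commentary: looks the uid up on both tracker lists and takes a
 * reference; if no tracker exists yet, one is allocated and inserted into
 * sock_bt_list with the initial reference from kref_init().
 */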
static struct sock_buffertracker *get_sock_buffertracker(uid_t uid)
{
        struct sock_buffertracker *sbt;
        struct list_head *curr;

        curr = sock_bt_list.next;
        while (curr != &sock_bt_list) {
                sbt = container_of(curr, struct sock_buffertracker, lh);
                /* trackers on sock_bt_list must not have waiting conns */
                BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
                if (sbt->uid == uid)
                        goto found;
                curr = curr->next;
        }

        curr = sock_bt_wait_list.next;
        while (curr != &sock_bt_wait_list) {
                sbt = container_of(curr, struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)));
                if (sbt->uid == uid)
                        goto found;
                curr = curr->next;
        }

        sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_KERNEL);
        if (sbt != 0) {
                memset(sbt, 0, sizeof(struct sock_buffertracker));
                sbt->uid = uid;
                list_add_tail(&(sbt->lh), &sock_bt_list);
                INIT_LIST_HEAD(&(sbt->delflush_conns));
                INIT_LIST_HEAD(&(sbt->waiting_conns));
                kref_init(&(sbt->ref));
        }

        if (0) {
found:
                /* uid found in one of the search loops above */
                kref_get(&(sbt->ref));
        }

        return sbt;
}
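/*
 * Added commentary: re-sorts sock_bt_wait_list after sbt->usage has grown,
 * moving sbt towards the tail until the successor uses at least as much.
 * If sbt's last waiting conn was just removed, it returns to sock_bt_list.
 */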
static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker *sbt,
                int waitingconnremoved)
{
        if (waitingconnremoved && list_empty(&(sbt->waiting_conns))) {
                list_del(&(sbt->lh));
                list_add_tail(&(sbt->lh), &sock_bt_list);
                return;
        }

        if (list_empty(&(sbt->waiting_conns)))
                return;

        while (sbt->lh.next != &sock_bt_wait_list) {
                struct sock_buffertracker *next = container_of(sbt->lh.next,
                                struct sock_buffertracker, lh);

                BUG_ON(sbt->lh.next == &sock_bt_list);

                if (sbt->usage <= next->usage)
                        break;

                list_del(&(sbt->lh));
                list_add(&(sbt->lh), &(next->lh));
        }
}
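/*
 * Added commentary: flushes the delayed-flush conns of one tracker that
 * have buffered data.  Because sock_bufferlimits_lock is dropped before
 * flush_buf(), a nonzero return tells the caller to restart its scan.
 */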
static int oosbs_resumesbt(struct sock_buffertracker *sbt)
{
        int restart = 0;
        struct list_head *curr = sbt->delflush_conns.next;

        while (curr != &(sbt->delflush_conns)) {
                struct conn *rconn = container_of(curr, struct conn,
                                source.sock.delflush_list);
                int flush = 0;

                mutex_lock(&(rconn->rcv_lock));

                BUG_ON(rconn->sourcetype != SOURCE_SOCK);

                BUG_ON(rconn->source.sock.delay_flush == 0);

                if (rconn->data_buf.read_remaining != 0) {
                        rconn->source.sock.delay_flush = 0;
                        list_del(&(rconn->source.sock.delflush_list));
                        flush = 1;
                }

                mutex_unlock(&(rconn->rcv_lock));

                if (flush) {
                        if (restart == 0) {
                                restart = 1;
                                kref_get(&(sbt->ref));
                                mutex_unlock(&sock_bufferlimits_lock);
                        }
                        flush_buf(rconn);
                }

                curr = curr->next;
        }

        if (restart)
                kref_put(&(sbt->ref), free_sbt);

        return restart;
}
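/*
 * Added commentary: global fallback when buffer space runs out; resumes
 * delayed flushes of every tracker, starting at the list tails where the
 * heaviest users sit.
 */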
static void oosbs_global(void)
{
        struct list_head *curr;

        if (0) {
restart:
                /* re-taken here after oosbs_resumesbt() dropped the lock */
                mutex_lock(&sock_bufferlimits_lock);
        }

        curr = sock_bt_list.prev;
        while (curr != &sock_bt_list) {
                struct sock_buffertracker *sbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
                if (oosbs_resumesbt(sbt))
                        goto restart;
                curr = curr->prev;
        }

        curr = sock_bt_wait_list.prev;
        while (curr != &sock_bt_wait_list) {
                struct sock_buffertracker *sbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)));
                if (oosbs_resumesbt(sbt))
                        goto restart;
                curr = curr->prev;
        }
}
static void oosbs_user(void)
{
        struct list_head *curr;

        if (0) {
restart:
                mutex_lock(&sock_bufferlimits_lock);
        }

        curr = sock_bt_wait_list.prev;
        while (curr != &sock_bt_wait_list) {
                struct sock_buffertracker *sbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)));

                if (sbt->usage < (BUFFERLIMIT_SOCK_USER * 3 / 4))
                        break;

                if (oosbs_resumesbt(sbt))
                        goto restart;
                curr = curr->prev;
        }
}
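/*
 * Added commentary: work item scheduled by _reserve_sock_buffer_inswl()
 * whenever a conn has to wait for buffer space.  Per-user flushing is
 * tried first; the global scan runs if usage stays above 3/4 of
 * BUFFERLIMIT_SOCK_GLOBAL.
 */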
static void outofsockbufferspace(struct work_struct *work)
{
        mutex_lock(&sock_bufferlimits_lock);
        if (sock_bufferusage < (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) {
                oosbs_user();
                if (sock_bufferusage >= (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4))
                        goto global;
        } else {
global:
                /* also reached from above when oosbs_user() freed too little */
                oosbs_global();
        }
        outofsockbufferspace_scheduled = 0;
        mutex_unlock(&sock_bufferlimits_lock);
}
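/*
 * Added commentary: called with sock_bufferlimits_lock and the conn's
 * rcv_lock held when a reservation cannot be granted.  Inserts the tracker
 * into sock_bt_wait_list (sorted by usage) and the conn into the tracker's
 * waiting_conns (sorted by alloclimit), then schedules the out-of-space
 * worker.
 */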
static void _reserve_sock_buffer_inswl(struct conn *rconn)
{
        struct sock_buffertracker *sbt = rconn->source.sock.sbt;
        struct list_head *curr;

        BUG_ON(sbt == 0);

        if (list_empty(&(sbt->waiting_conns)) == 0)
                goto wlinserted;

        list_del(&(sbt->lh));

        curr = sock_bt_wait_list.next;
        while (curr != &sock_bt_wait_list) {
                struct sock_buffertracker *currsbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(currsbt->waiting_conns)));
                if (sbt->usage < currsbt->usage) {
                        /* insert before the first tracker with more usage */
                        list_add_tail(&(sbt->lh), curr);
                        goto wlinserted;
                }
                curr = curr->next;
        }

        list_add_tail(&(sbt->lh), &sock_bt_wait_list);

wlinserted:
        curr = sbt->waiting_conns.next;
        while (curr != &(sbt->waiting_conns)) {
                struct conn *currrconn = container_of(curr, struct conn,
                                source.sock.alwait_list);
                BUG_ON(currrconn->sourcetype != SOURCE_SOCK);
                if (rconn->source.sock.alloclimit <
                                currrconn->source.sock.alloclimit) {
                        /* insert before the first conn with a higher limit */
                        list_add_tail(&(rconn->source.sock.alwait_list), curr);
                        goto wcinserted;
                }
                curr = curr->next;
        }

        list_add_tail(&(rconn->source.sock.alwait_list), &(sbt->waiting_conns));

wcinserted:
        rconn->source.sock.in_alwait_list = 1;

        if (outofsockbufferspace_scheduled == 0) {
                schedule_work(&outofsockbufferspace_work);
                outofsockbufferspace_scheduled = 1;
        }
}
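/*
 * Added commentary: tries to raise rconn's alloclimit so that "amount"
 * more bytes fit into the buffer.  If a uid with less usage is already
 * waiting, or the per-user/global limit would be exceeded, the conn is
 * queued via _reserve_sock_buffer_inswl() instead.
 */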
static void reserve_sock_buffer(struct conn *rconn, __u64 amount)
{
        struct sock_buffertracker *sbt = rconn->source.sock.sbt;
        struct sock_buffertracker *first_wait_sbt = list_empty(
                        &sock_bt_wait_list) ? 0 : container_of(
                        sock_bt_wait_list.next, struct sock_buffertracker, lh);

        __u32 max = (1 << 30) - 1;

        BUG_ON(sbt == 0);

        if (unlikely(amount > max))
                amount = max;

        amount += rconn->data_buf.totalsize + rconn->data_buf.overhead -
                        rconn->data_buf.cpacket_buffer;

        if (unlikely(amount > max))
                amount = max;

        if (amount > BUFFERLIMIT_SOCK_SOCK)
                amount = BUFFERLIMIT_SOCK_SOCK;

        if (amount <= rconn->source.sock.alloclimit)
                return;

        if ((list_empty(&sock_bt_wait_list) == 0 && first_wait_sbt != 0 &&
                        first_wait_sbt != sbt &&
                        first_wait_sbt->usage <= sbt->usage) ||
                        amount - rconn->source.sock.alloclimit >
                        BUFFERLIMIT_SOCK_USER - sbt->usage ||
                        amount - rconn->source.sock.alloclimit >
                        BUFFERLIMIT_SOCK_GLOBAL - sock_bufferusage) {
                _reserve_sock_buffer_inswl(rconn);
        } else {
                int waitingconnremoved = 0;
                sbt->usage += amount - rconn->source.sock.alloclimit;
                sock_bufferusage += amount - rconn->source.sock.alloclimit;
                rconn->source.sock.alloclimit = amount;

                if (rconn->source.sock.in_alwait_list) {
                        list_del(&(rconn->source.sock.alwait_list));
                        rconn->source.sock.in_alwait_list = 0;
                        waitingconnremoved = 1;
                }
                _reserve_sock_buffer_reord_bt(sbt, waitingconnremoved);
        }
}
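/*
 * Added commentary: wakes the buffer-waiting conns of one tracker,
 * smallest alloclimit first, and stops at the first conn whose
 * reservation still does not fit.
 */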
static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt)
{
        int failed = 0;

        while (list_empty(&(sbt->waiting_conns)) == 0 && failed == 0) {
                struct conn *rconn = container_of(sbt->waiting_conns.next,
                                struct conn, source.sock.alwait_list);
                mutex_lock(&(rconn->rcv_lock));

                BUG_ON(rconn->sourcetype != SOURCE_SOCK);
                BUG_ON(rconn->source.sock.in_alwait_list == 0);
                BUG_ON(rconn->source.sock.wait_len == 0);

                reserve_sock_buffer(rconn, rconn->source.sock.wait_len);

                if (rconn->source.sock.alloclimit +
                                rconn->data_buf.cpacket_buffer <=
                                rconn->data_buf.totalsize +
                                rconn->data_buf.overhead) {
                        failed = 1;
                        goto out;
                }

                wake_up_interruptible(&(rconn->source.sock.wait));

out:
                mutex_unlock(&(rconn->rcv_lock));
        }

        return failed;
}
static void resume_bufferwaiting_socks(void)
{
        struct list_head *curr = sock_bt_wait_list.next;

        while (curr != &sock_bt_wait_list) {
                struct sock_buffertracker *currsbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(currsbt->waiting_conns)));
                curr = curr->next;

                if (_resume_bufferwaiting_socks(currsbt))
                        return;
        }
}
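/*
 * Added commentary: gives buffer space back when a conn's data buffer has
 * shrunk below its alloclimit, updates the per-uid and global counters,
 * re-sorts the wait list and lets waiting senders retry.
 */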
void unreserve_sock_buffer(struct conn *conn)
{
        int freed = 0;
        struct sock_buffertracker *sbt;

        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(conn->rcv_lock));

        if (conn->sourcetype != SOURCE_SOCK)
                goto out;

        sbt = conn->source.sock.sbt;
        BUG_ON(sbt == 0);

        /* nothing to give back unless the limit exceeds the actual usage */
        if (conn->data_buf.totalsize + conn->data_buf.overhead >=
                        conn->source.sock.alloclimit +
                        conn->data_buf.cpacket_buffer)
                goto out;

        freed = 1;

        BUG_ON(conn->source.sock.alloclimit > sbt->usage);
        BUG_ON(conn->source.sock.alloclimit > sock_bufferusage);

        sbt->usage -= conn->source.sock.alloclimit +
                        conn->data_buf.cpacket_buffer -
                        conn->data_buf.totalsize -
                        conn->data_buf.overhead;

        sock_bufferusage -= conn->source.sock.alloclimit +
                        conn->data_buf.cpacket_buffer -
                        conn->data_buf.totalsize - conn->data_buf.overhead;

        conn->source.sock.alloclimit = conn->data_buf.totalsize +
                        conn->data_buf.overhead - conn->data_buf.cpacket_buffer;

        if (conn->source.sock.alloclimit == 0 &&
                        conn->source.sock.in_alwait_list) {
                list_del(&(conn->source.sock.alwait_list));
                conn->source.sock.in_alwait_list = 0;

                if (list_empty(&(sbt->waiting_conns))) {
                        list_del(&(sbt->lh));
                        list_add_tail(&(sbt->lh), &sock_bt_list);
                }
        }

        if (list_empty(&(sbt->waiting_conns)))
                goto out;

        while (sbt->lh.prev != &sock_bt_wait_list) {
                struct sock_buffertracker *prevsbt = container_of(sbt->lh.prev,
                                struct sock_buffertracker, lh);

                BUG_ON(sbt->lh.next == &sock_bt_list);

                if (prevsbt->usage <= sbt->usage)
                        break;

                list_del(&(sbt->lh));
                list_add_tail(&(sbt->lh), &(prevsbt->lh));
        }

out:
        mutex_unlock(&(conn->rcv_lock));

        if (freed)
                resume_bufferwaiting_socks();

        mutex_unlock(&sock_bufferlimits_lock);
}
static int check_connlistener_state(struct connlistener *cl)
{
        if (likely(cl != 0 && cl->sockstate == SOCKSTATE_LISTENER))
                return 0;

        return 1;
}

static int check_conn_state(struct conn *conn)
{
        if (likely(conn != 0 && conn->sockstate == SOCKSTATE_CONN))
                return 0;

        return 1;
}
int cor_socket_release(struct socket *sock)
{
        struct connlistener *cl = (struct connlistener *) sock->sk;
        struct conn *rconn = (struct conn *) sock->sk;

        if (sock->sk == 0)
                return 0;

        if (cl->sockstate == SOCKSTATE_LISTENER) {
                close_port(cl);
        } else if (rconn->sockstate == SOCKSTATE_CONN) {
                reset_conn(rconn);
                BUG_ON(rconn->sourcetype != SOURCE_SOCK);
                kref_put(&(rconn->ref), free_conn);
        } else {
                BUG();
        }

        return 0;
}
int cor_socket_bind(struct socket *sock, struct sockaddr *myaddr,
                int sockaddr_len)
{
        struct connlistener *listener;
        struct cor_sockaddr *addr = (struct cor_sockaddr *) myaddr;

        if (unlikely(sock->sk != 0))
                return -EINVAL;

        if (sockaddr_len < sizeof(struct cor_sockaddr))
                return -EINVAL;

        if (addr->type != SOCKADDRTYPE_PORT)
                return -EINVAL;

        listener = open_port(addr->addr.port);

        if (listener == 0)
                return -EADDRINUSE;

        sock->sk = (struct sock *) listener;

        return 0;
}
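/*
 * Added commentary: connect() allocates a conn pair, initializes this
 * direction as a socket source and the reverse direction as a socket
 * target, and attaches the calling uid's buffertracker.  The kref_get()
 * is the reference held via sock->sk.
 */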
int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr,
                int sockaddr_len, int flags)
{
        struct sock_buffertracker *sbt;

        struct conn *rconn;

        if (unlikely(sock->sk != 0))
                return -EISCONN;

        rconn = alloc_conn(GFP_KERNEL);

        if (unlikely(rconn == 0))
                return -ENOMEM;

        mutex_lock(&sock_bufferlimits_lock);
        sbt = get_sock_buffertracker(current_uid());
        mutex_unlock(&sock_bufferlimits_lock);

        if (unlikely(sbt == 0)) {
                reset_conn(rconn);
                return -ENOMEM;
        }

        kref_get(&(rconn->ref));

        mutex_lock(&(rconn->rcv_lock));
        mutex_lock(&(rconn->reversedir->rcv_lock));
        conn_init_sock_source(rconn);
        rconn->source.sock.sbt = sbt;
        conn_init_sock_target(rconn->reversedir);
        mutex_unlock(&(rconn->reversedir->rcv_lock));
        mutex_unlock(&(rconn->rcv_lock));

        sock->sk = (struct sock *) rconn;
        sock->state = SS_CONNECTED;

        return 0;
}
static int cor_rdytoaccept(struct connlistener *cl)
{
        int rc;
        mutex_lock(&(cl->lock));
        rc = (list_empty(&(cl->conn_queue)) == 0);
        mutex_unlock(&(cl->lock));
        return rc;
}

const struct proto_ops cor_proto_ops;
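/*
 * Added commentary: accept() sleeps interruptibly until the listener's
 * conn_queue is non-empty, dequeues the first conn, attaches the
 * accepting uid's buffertracker and hands the conn to newsock.
 */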
int cor_socket_accept(struct socket *sock, struct socket *newsock, int flags)
{
        struct sock_buffertracker *sbt;

        struct connlistener *cl = (struct connlistener *) sock->sk;

        int rc = check_connlistener_state(cl);

        struct conn *newconn;

        if (unlikely(rc))
                return -EINVAL;

        mutex_lock(&sock_bufferlimits_lock);
        sbt = get_sock_buffertracker(current_uid());
        mutex_unlock(&sock_bufferlimits_lock);

        if (unlikely(sbt == 0))
                return -ENOMEM;

        mutex_lock(&(cl->lock));

        if (unlikely(cl->queue_maxlen <= 0)) {
                mutex_unlock(&(cl->lock));
                return -EINVAL;
        }

        while (list_empty(&(cl->conn_queue))) {
                mutex_unlock(&(cl->lock));
                if (wait_event_interruptible(cl->wait, cor_rdytoaccept(cl))) {
                        kref_put(&(sbt->ref), free_sbt);
                        return -ERESTARTSYS;
                }
                mutex_lock(&(cl->lock));
        }

        newconn = container_of(cl->conn_queue.next, struct conn,
                        source.sock.cl_list);

        BUG_ON(newconn->sourcetype != SOURCE_SOCK);

        list_del(cl->conn_queue.next);

        cl->queue_len--;

        mutex_unlock(&(cl->lock));

        mutex_lock(&(newconn->rcv_lock));
        newconn->source.sock.sbt = sbt;
        mutex_unlock(&(newconn->rcv_lock));

        newsock->ops = &cor_proto_ops;
        newsock->sk = (struct sock *) newconn;
        newsock->state = SS_CONNECTED;

        return 0;
}
int cor_socket_listen(struct socket *sock, int len)
{
        struct connlistener *cl = (struct connlistener *) sock->sk;

        int rc = check_connlistener_state(cl);

        if (unlikely(rc))
                return -EOPNOTSUPP;

        mutex_lock(&(cl->lock));
        cl->queue_maxlen = len;
        mutex_unlock(&(cl->lock));

        return 0;
}

int cor_socket_shutdown(struct socket *sock, int flags)
{
        return -ENOTSUPP;
}

int cor_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
{
        return -ENOIOCTLCMD;
}
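/*
 * Added commentary: wait condition for a blocking sendmsg(); true once
 * the conn was reset, nothing is waited for any more, or a new
 * reservation attempt has made room below the alloclimit.
 */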
static int sendmsg_maypush(struct conn *rconn)
{
        int ret = 0;
        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(rconn->rcv_lock));
        if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
                ret = 1;
        } else if (rconn->source.sock.wait_len == 0) {
                ret = 1;
        } else if (rconn->source.sock.alloclimit +
                        rconn->data_buf.cpacket_buffer >
                        rconn->data_buf.totalsize +
                        rconn->data_buf.overhead) {
                ret = 1;
        } else {
                reserve_sock_buffer(rconn, rconn->source.sock.wait_len);
                if (rconn->source.sock.alloclimit +
                                rconn->data_buf.cpacket_buffer >
                                rconn->data_buf.totalsize +
                                rconn->data_buf.overhead)
                        ret = 1;
        }
        mutex_unlock(&(rconn->rcv_lock));
        mutex_unlock(&sock_bufferlimits_lock);
        return ret;
}
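/*
 * Added commentary: sendmsg() reserves buffer space, copies in as much of
 * the user buffer as fits, and implements MSG_MORE by delaying the flush
 * while the buffer holds less than 3/4 of BUFFERLIMIT_SOCK_SOCK;
 * otherwise the buffered data is flushed immediately.
 */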
int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                size_t total_len)
{
        __s64 copied = 0;

        struct conn *rconn = (struct conn *) sock->sk;

        int rc = check_conn_state(rconn);

        int flush = (msg->msg_flags & MSG_MORE) == 0;
        int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0;

        __s32 bufferfree;
        __u64 max = (((__u64) 1) << 32) - 1;
        __u32 totallen = (total_len > max ? max : total_len);

        if (unlikely(rc))
                return -EBADF;

recv:
        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(rconn->rcv_lock));

        if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
                mutex_unlock(&sock_bufferlimits_lock);
                copied = -EPIPE;
                goto out;
        }

        reserve_sock_buffer(rconn, totallen);

        mutex_unlock(&sock_bufferlimits_lock);

        bufferfree = (__s64) rconn->source.sock.alloclimit +
                        (__s64) rconn->data_buf.cpacket_buffer -
                        (__s64) rconn->data_buf.totalsize -
                        (__s64) rconn->data_buf.overhead;

        if (bufferfree <= 0) {
                if (copied == 0)
                        copied = -EAGAIN;
                goto out;
        }

        copied = receive_userbuf(rconn, msg, bufferfree, bufferfree >=
                        totallen ? 0 : (rconn->source.sock.alloclimit +
                        rconn->data_buf.cpacket_buffer));

        if (0) {
out:
                /* reached via the gotos above; recompute the free space for
                 * the flush decision below */
                bufferfree = (__s64) rconn->source.sock.alloclimit +
                                (__s64) rconn->data_buf.cpacket_buffer -
                                (__s64) rconn->data_buf.totalsize -
                                (__s64) rconn->data_buf.overhead;
        }

        if (copied == -EAGAIN)
                rconn->source.sock.wait_len = totallen;
        else
                rconn->source.sock.wait_len = 0;

        mutex_unlock(&(rconn->rcv_lock));

        unreserve_sock_buffer(rconn);

        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(rconn->rcv_lock));

        if (flush == 0 && rconn->data_buf.totalsize + rconn->data_buf.overhead -
                        rconn->data_buf.cpacket_buffer <
                        (BUFFERLIMIT_SOCK_SOCK * 3) / 4) {
                if (rconn->source.sock.delay_flush == 0) {
                        struct sock_buffertracker *sbt = rconn->source.sock.sbt;
                        list_add_tail(&(rconn->source.sock.delflush_list),
                                        &(sbt->delflush_conns));
                }
                rconn->source.sock.delay_flush = 1;
        } else {
                if (rconn->source.sock.delay_flush) {
                        list_del(&(rconn->source.sock.delflush_list));
                }
                rconn->source.sock.delay_flush = 0;
        }

        mutex_unlock(&(rconn->rcv_lock));
        mutex_unlock(&sock_bufferlimits_lock);

        if (likely(copied > 0 || bufferfree <= 0))
                flush_buf(rconn);

        if (copied == -EAGAIN && blocking) {
                if (wait_event_interruptible(rconn->source.sock.wait,
                                sendmsg_maypush(rconn)) == 0)
                        goto recv;
                copied = -ERESTARTSYS;
        }

        return copied;
}
static int cor_readytoread(struct conn *sconn)
{
        int rc = 0;
        mutex_lock(&(sconn->rcv_lock));
        rc = (sconn->data_buf.read_remaining != 0) ||
                        unlikely(atomic_read(&(sconn->isreset)) != 0);
        mutex_unlock(&(sconn->rcv_lock));
        return rc;
}
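/*
 * Added commentary: recvmsg() reads from the reverse-direction conn,
 * acknowledges the consumed data, refreshes credits and wakes the sender;
 * with no data buffered it sleeps interruptibly unless MSG_DONTWAIT is
 * set.
 */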
int cor_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                size_t total_len, int flags)
{
        struct conn *rconn = (struct conn *) sock->sk;
        struct conn *sconn = rconn->reversedir;
        ssize_t copied = 0; /* must be signed: error codes are stored here */

        int rc = check_conn_state(rconn);

        int blocking = (flags & MSG_DONTWAIT) == 0;

        if (unlikely(rc))
                return -EBADF;

        BUG_ON(sconn == 0);

recv:
        mutex_lock(&(sconn->rcv_lock));

        if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
                copied = -EPIPE;
                goto out;
        }

        copied = databuf_pulluser(sconn, msg);
        databuf_ackread(sconn);

out:
        mutex_unlock(&(sconn->rcv_lock));

        if (likely(copied > 0)) {
                refresh_conn_credits(sconn);
                unreserve_sock_buffer(sconn);
                wake_sender(sconn);
        }

        if (copied == -EAGAIN && blocking) {
                if (wait_event_interruptible(sconn->target.sock.wait,
                                cor_readytoread(sconn)) == 0)
                        goto recv;
                copied = -ERESTARTSYS;
        }

        return copied;
}
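/*
 * Added commentary: hypothetical userspace usage of this family, assuming
 * the cor_sockaddr layout used by cor_socket_bind() above (a "type" field
 * plus a port in "addr.port"):
 *
 *      int s = socket(PF_COR, SOCK_STREAM, 0);
 *      struct cor_sockaddr addr;
 *      addr.type = SOCKADDRTYPE_PORT;
 *      addr.addr.port = 1234;
 *      bind(s, (struct sockaddr *) &addr, sizeof(addr));
 *      listen(s, 16);
 *      int c = accept(s, 0, 0);
 *      ... send(c, buf, len, MSG_MORE) / recv(c, buf, len, 0) ...
 */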
const struct proto_ops cor_proto_ops = {
        .family = PF_COR,
        .owner = THIS_MODULE,
        .release = cor_socket_release,
        .bind = cor_socket_bind,
        .connect = cor_socket_connect,
        .accept = cor_socket_accept,
        .listen = cor_socket_listen,
        .shutdown = cor_socket_shutdown,
        .ioctl = cor_ioctl,
        .sendmsg = cor_sendmsg,
        .recvmsg = cor_recvmsg
};

/* not implemented:
socketpair
getname
poll
compat_ioctl
setsockopt
getsockopt
compat_setsockopt
compat_getsockopt
mmap
sendpage
splice_read */
int cor_createsock(struct net *net, struct socket *sock, int protocol)
{
        if (unlikely(protocol != 0))
                return -EPROTONOSUPPORT;

        sock->state = SS_UNCONNECTED;
        sock->ops = &cor_proto_ops;

        return 0;
}

static struct net_proto_family cor_net_proto_family = {
        .family = PF_COR,
        .create = cor_createsock,
        .owner = THIS_MODULE
};

static int __init cor_sock_init(void)
{
        INIT_WORK(&outofsockbufferspace_work, outofsockbufferspace);
        outofsockbufferspace_scheduled = 0;

        sock_register(&cor_net_proto_family);
        sock_bufferusage = 0;
        return 0;
}

module_init(cor_sock_init);

MODULE_LICENSE("GPL");