/*
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <net/sock.h>
#include <linux/net.h>
#include <asm/uaccess.h>

#include "cor.h"
/**
 * sock_bt_wait_list and waiting_conns are ordered by min amount first, the
 * order in which resuming will happen
 */
DEFINE_MUTEX(sock_bufferlimits_lock);
LIST_HEAD(sock_bt_list);
LIST_HEAD(sock_bt_wait_list);
static __u64 sock_bufferusage;

static struct work_struct outofsockbufferspace_work;
static int outofsockbufferspace_scheduled;

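/*
 * kref release callback for a sock_buffertracker; the list_del() implies
 * that the caller holds sock_bufferlimits_lock.
 */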
void free_sbt(struct kref *ref)
{
	struct sock_buffertracker *sbt = container_of(ref,
			struct sock_buffertracker, ref);

	BUG_ON(sbt->usage != 0);
	BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);

	list_del(&(sbt->lh));
	kfree(sbt);
}

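/*
 * Finds the buffer tracker of the given uid on sock_bt_list or
 * sock_bt_wait_list and takes a reference on it; if there is none, a fresh
 * tracker is allocated. Returns 0 if kmalloc() fails. Called with
 * sock_bufferlimits_lock held.
 */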
static struct sock_buffertracker *get_sock_buffertracker(uid_t uid)
{
	struct sock_buffertracker *sbt;
	struct list_head *curr;

	curr = sock_bt_list.next;
	while (curr != &sock_bt_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		/* trackers on sock_bt_list must have no waiting conns */
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (sbt->uid == uid)
			goto found;
		curr = curr->next;
	}

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (sbt->uid == uid)
			goto found;
		curr = curr->next;
	}

	sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_KERNEL);
	if (sbt != 0) {
		memset(sbt, 0, sizeof(struct sock_buffertracker));
		sbt->uid = uid;
		list_add_tail(&(sbt->lh), &sock_bt_list);
		INIT_LIST_HEAD(&(sbt->delflush_conns));
		INIT_LIST_HEAD(&(sbt->waiting_conns));
		kref_init(&(sbt->ref));
	}

	if (0) {
found:
		kref_get(&(sbt->ref));
	}

	return sbt;
}

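/*
 * Moves sbt towards the tail of sock_bt_wait_list until it is ordered by
 * usage again (smallest first); if its last waiting conn was just removed,
 * it goes back to sock_bt_list instead.
 */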
static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker *sbt,
		int waitingconnremoved)
{
	if (waitingconnremoved && list_empty(&(sbt->waiting_conns))) {
		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &sock_bt_list);
		return;
	}

	if (list_empty(&(sbt->waiting_conns)))
		return;

	while (sbt->lh.next != &sock_bt_wait_list) {
		struct sock_buffertracker *next = container_of(sbt->lh.next,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (sbt->usage <= next->usage)
			break;

		list_del(&(sbt->lh));
		list_add(&(sbt->lh), &(next->lh));
	}
}

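/*
 * Flushes the conns of this tracker whose flush was delayed and which have
 * buffered data. Before the first flush_buf() call, sock_bufferlimits_lock
 * is dropped; the return value tells the caller that this happened and the
 * scan has to be restarted.
 */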
static int oosbs_resumesbt(struct sock_buffertracker *sbt)
{
	int restart = 0;
	struct list_head *curr = sbt->delflush_conns.next;

	while (curr != &(sbt->delflush_conns)) {
		struct conn *rconn = container_of(curr, struct conn,
				source.sock.delflush_list);
		/* save the successor first, the list_del() below poisons
		   curr->next */
		struct list_head *next = curr->next;
		int flush = 0;

		mutex_lock(&(rconn->rcv_lock));

		BUG_ON(rconn->sourcetype != SOURCE_SOCK);
		BUG_ON(rconn->source.sock.delay_flush == 0);

		if (rconn->data_buf.read_remaining != 0) {
			rconn->source.sock.delay_flush = 0;
			list_del(&(rconn->source.sock.delflush_list));
			flush = 1;
		}

		mutex_unlock(&(rconn->rcv_lock));

		if (flush) {
			if (restart == 0) {
				restart = 1;
				kref_get(&(sbt->ref));
				mutex_unlock(&sock_bufferlimits_lock);
			}
			flush_buf(rconn);
		}

		curr = next;
	}

	if (restart)
		kref_put(&(sbt->ref), free_sbt);

	return restart;
}

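/*
 * Out-of-buffer-space pass over all trackers: walks both global lists from
 * the tail and resumes delayed flushes, restarting the scan whenever
 * oosbs_resumesbt() dropped the lock.
 */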
static void oosbs_global(void)
{
	struct list_head *curr;

	/*
	 * entered only via the label: re-take the lock that
	 * oosbs_resumesbt() dropped before flushing
	 */
	if (0) {
restart:
		mutex_lock(&sock_bufferlimits_lock);
	}

	curr = sock_bt_list.prev;
	while (curr != &sock_bt_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}
}

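/*
 * Like oosbs_global(), but only scans sock_bt_wait_list and stops once
 * usage drops below 3/4 of BUFFERLIMIT_SOCK_USER.
 */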
static void oosbs_user(void)
{
	struct list_head *curr;

	if (0) {
restart:
		mutex_lock(&sock_bufferlimits_lock);
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));

		if (sbt->usage < (BUFFERLIMIT_SOCK_USER * 3 / 4))
			break;

		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}
}

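/*
 * Worker scheduled by _reserve_sock_buffer_inswl(): tries the per-user
 * pass first while global usage is moderate and falls back to the global
 * pass when sock_bufferusage stays at or above 3/4 of
 * BUFFERLIMIT_SOCK_GLOBAL.
 */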
static void outofsockbufferspace(struct work_struct *work)
{
	mutex_lock(&sock_bufferlimits_lock);
	if (sock_bufferusage < (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) {
		oosbs_user();
		if (sock_bufferusage >= (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4))
			goto global;
	} else {
global:
		oosbs_global();
	}
	outofsockbufferspace_scheduled = 0;
	mutex_unlock(&sock_bufferlimits_lock);
}

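/*
 * Slow path of reserve_sock_buffer(): the conn could not get the space it
 * asked for, so it is queued on its tracker's waiting_conns list and the
 * tracker is (re)inserted into sock_bt_wait_list, both kept sorted; the
 * out-of-space worker is scheduled to make room.
 */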
static void _reserve_sock_buffer_inswl(struct conn *rconn)
{
	struct sock_buffertracker *sbt = rconn->source.sock.sbt;
	struct list_head *curr;

	BUG_ON(sbt == 0);

	if (list_empty(&(sbt->waiting_conns)) == 0)
		goto wlinserted;

	list_del(&(sbt->lh));

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		if (sbt->usage < currsbt->usage) {
			/* insert before the first tracker with higher usage */
			list_add_tail(&(sbt->lh), curr);
			goto wlinserted;
		}
		curr = curr->next;
	}

	list_add_tail(&(sbt->lh), &sock_bt_wait_list);

wlinserted:
	curr = sbt->waiting_conns.next;
	while (curr != &(sbt->waiting_conns)) {
		struct conn *currrconn = container_of(curr, struct conn,
				source.sock.alwait_list);
		BUG_ON(currrconn->sourcetype != SOURCE_SOCK);
		if (rconn->source.sock.alloclimit <
				currrconn->source.sock.alloclimit) {
			/* keep waiting_conns sorted by alloclimit */
			list_add_tail(&(rconn->source.sock.alwait_list), curr);
			goto wcinserted;
		}
		curr = curr->next;
	}

	list_add_tail(&(rconn->source.sock.alwait_list), &(sbt->waiting_conns));

wcinserted:
	rconn->source.sock.in_alwait_list = 1;

	if (outofsockbufferspace_scheduled == 0) {
		schedule_work(&outofsockbufferspace_work);
		outofsockbufferspace_scheduled = 1;
	}
}

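/*
 * Tries to raise the conn's alloclimit so that roughly "amount" more bytes
 * fit into its buffer, charging the increase to the per-user tracker and
 * the global counter. If another, not busier user is already waiting, or
 * the per-user or global limit would be exceeded, the conn is parked via
 * _reserve_sock_buffer_inswl() instead. Caller holds
 * sock_bufferlimits_lock and rconn->rcv_lock.
 */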
static void reserve_sock_buffer(struct conn *rconn, __u64 amount)
{
	struct sock_buffertracker *sbt = rconn->source.sock.sbt;
	struct sock_buffertracker *first_wait_sbt = list_empty(
			&sock_bt_wait_list) ? 0 : container_of(
			sock_bt_wait_list.next, struct sock_buffertracker, lh);

	__u32 max = (1 << 30) - 1;

	BUG_ON(sbt == 0);

	if (unlikely(amount > max))
		amount = max;

	amount += rconn->data_buf.totalsize + rconn->data_buf.overhead -
			rconn->data_buf.cpacket_buffer;

	if (unlikely(amount > max))
		amount = max;

	if (amount > BUFFERLIMIT_SOCK_SOCK)
		amount = BUFFERLIMIT_SOCK_SOCK;

	if (amount <= rconn->source.sock.alloclimit)
		return;

	if ((list_empty(&sock_bt_wait_list) == 0 && first_wait_sbt != 0 &&
			first_wait_sbt != sbt &&
			first_wait_sbt->usage <= sbt->usage) ||
			amount - rconn->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_USER - sbt->usage ||
			amount - rconn->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_GLOBAL - sock_bufferusage) {
		_reserve_sock_buffer_inswl(rconn);
	} else {
		int waitingconnremoved = 0;
		sbt->usage += amount - rconn->source.sock.alloclimit;
		sock_bufferusage += amount - rconn->source.sock.alloclimit;
		rconn->source.sock.alloclimit = amount;

		if (rconn->source.sock.in_alwait_list) {
			list_del(&(rconn->source.sock.alwait_list));
			rconn->source.sock.in_alwait_list = 0;
			waitingconnremoved = 1;
		}
		_reserve_sock_buffer_reord_bt(sbt, waitingconnremoved);
	}
}

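/*
 * Retries the pending reservations of this tracker's waiting conns
 * (smallest request first) and wakes the blocked senders; stops and
 * returns nonzero as soon as one reservation still does not fit.
 */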
static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt)
{
	int failed = 0;

	while (list_empty(&(sbt->waiting_conns)) == 0 && failed == 0) {
		struct conn *rconn = container_of(sbt->waiting_conns.next,
				struct conn, source.sock.alwait_list);
		mutex_lock(&(rconn->rcv_lock));

		BUG_ON(rconn->sourcetype != SOURCE_SOCK);
		BUG_ON(rconn->source.sock.in_alwait_list == 0);
		BUG_ON(rconn->source.sock.wait_len == 0);

		reserve_sock_buffer(rconn, rconn->source.sock.wait_len);

		if (rconn->source.sock.alloclimit +
				rconn->data_buf.cpacket_buffer <=
				rconn->data_buf.totalsize +
				rconn->data_buf.overhead) {
			failed = 1;
			goto out;
		}

		wake_up_interruptible(&(rconn->source.sock.wait));

out:
		mutex_unlock(&(rconn->rcv_lock));
	}

	return failed;
}

static void resume_bufferwaiting_socks(void)
{
	struct list_head *curr = sock_bt_wait_list.next;

	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		curr = curr->next;

		if (_resume_bufferwaiting_socks(currsbt))
			return;
	}
}

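/*
 * Returns reserved space the conn no longer needs (e.g. after buffered
 * data has been sent or read), shrinking alloclimit to the current buffer
 * occupancy; if anything was freed, waiting socks get a chance to retry.
 */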
void unreserve_sock_buffer(struct conn *conn)
{
	int freed = 0;
	struct sock_buffertracker *sbt;

	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(conn->rcv_lock));

	if (conn->sourcetype != SOURCE_SOCK)
		goto out;

	sbt = conn->source.sock.sbt;
	BUG_ON(sbt == 0);

	/*
	 * nothing to release unless the reservation exceeds what the buffer
	 * currently holds
	 */
	if (conn->data_buf.totalsize + conn->data_buf.overhead >=
			conn->source.sock.alloclimit +
			conn->data_buf.cpacket_buffer)
		goto out;

	freed = 1;

	BUG_ON(conn->source.sock.alloclimit > sbt->usage);
	BUG_ON(conn->source.sock.alloclimit > sock_bufferusage);

	sbt->usage -= conn->source.sock.alloclimit +
			conn->data_buf.cpacket_buffer -
			conn->data_buf.totalsize -
			conn->data_buf.overhead;

	sock_bufferusage -= conn->source.sock.alloclimit +
			conn->data_buf.cpacket_buffer -
			conn->data_buf.totalsize - conn->data_buf.overhead;

	conn->source.sock.alloclimit = conn->data_buf.totalsize +
			conn->data_buf.overhead - conn->data_buf.cpacket_buffer;

	if (conn->source.sock.alloclimit == 0 &&
			conn->source.sock.in_alwait_list) {
		list_del(&(conn->source.sock.alwait_list));
		conn->source.sock.in_alwait_list = 0;

		if (list_empty(&(sbt->waiting_conns))) {
			list_del(&(sbt->lh));
			list_add_tail(&(sbt->lh), &sock_bt_list);
		}
	}

	if (list_empty(&(sbt->waiting_conns)))
		goto out;

	while (sbt->lh.prev != &sock_bt_wait_list) {
		struct sock_buffertracker *prevsbt = container_of(sbt->lh.prev,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.prev == &sock_bt_list);

		if (prevsbt->usage <= sbt->usage)
			break;

		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &(prevsbt->lh));
	}

out:
	mutex_unlock(&(conn->rcv_lock));

	if (freed)
		resume_bufferwaiting_socks();

	mutex_unlock(&sock_bufferlimits_lock);
}

static int check_connlistener_state(struct connlistener *cl)
{
	if (likely(cl != 0 && cl->sockstate == SOCKSTATE_LISTENER))
		return 0;

	return 1;
}

static int check_conn_state(struct conn *conn)
{
	if (likely(conn != 0 && conn->sockstate == SOCKSTATE_CONN))
		return 0;

	return 1;
}

int cor_socket_release(struct socket *sock)
{
	struct connlistener *cl = (struct connlistener *) sock->sk;
	struct conn *rconn = (struct conn *) sock->sk;

	if (sock->sk == 0)
		return 0;

	if (cl->sockstate == SOCKSTATE_LISTENER) {
		close_port(cl);
	} else if (rconn->sockstate == SOCKSTATE_CONN) {
		reset_conn(rconn);
		BUG_ON(rconn->sourcetype != SOURCE_SOCK);
		kref_put(&(rconn->ref), free_conn);
	} else {
		BUG();
	}

	return 0;
}

int cor_socket_bind(struct socket *sock, struct sockaddr *myaddr,
		int sockaddr_len)
{
	struct connlistener *listener;
	struct cor_sockaddr *addr = (struct cor_sockaddr *) myaddr;

	if (unlikely(sock->sk != 0))
		return -EINVAL;

	if (sockaddr_len < sizeof(struct cor_sockaddr))
		return -EINVAL;

	if (addr->type != SOCKADDRTYPE_PORT)
		return -EINVAL;

	listener = open_port(addr->addr.port);

	if (listener == 0)
		return -EADDRINUSE;

	sock->sk = (struct sock *) listener;

	return 0;
}

int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr,
		int sockaddr_len, int flags)
{
	struct sock_buffertracker *sbt;

	struct conn *rconn;

	if (unlikely(sock->sk != 0))
		return -EISCONN;

	rconn = alloc_conn(GFP_KERNEL);

	if (unlikely(rconn == 0))
		return -ENOMEM;

	mutex_lock(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	mutex_unlock(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0)) {
		reset_conn(rconn);
		return -ENOMEM;
	}

	kref_get(&(rconn->ref));

	mutex_lock(&(rconn->rcv_lock));
	mutex_lock(&(rconn->reversedir->rcv_lock));
	conn_init_sock_source(rconn);
	rconn->source.sock.sbt = sbt;
	conn_init_sock_target(rconn->reversedir);
	rconn->source.sock.is_client = 1;
	mutex_unlock(&(rconn->reversedir->rcv_lock));
	mutex_unlock(&(rconn->rcv_lock));

	sock->sk = (struct sock *) rconn;
	sock->state = SS_CONNECTED;

	return 0;
}

static int cor_rdytoaccept(struct connlistener *cl)
{
	int rc;
	mutex_lock(&(cl->lock));
	rc = (list_empty(&(cl->conn_queue)) == 0);
	mutex_unlock(&(cl->lock));
	return rc;
}

const struct proto_ops cor_proto_ops;

int cor_socket_accept(struct socket *sock, struct socket *newsock, int flags)
{
	struct sock_buffertracker *sbt;

	struct connlistener *cl = (struct connlistener *) sock->sk;

	int rc = check_connlistener_state(cl);

	struct conn *newconn;

	if (unlikely(rc))
		return -EINVAL;

	mutex_lock(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	mutex_unlock(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0))
		return -ENOMEM;

	mutex_lock(&(cl->lock));

	if (unlikely(cl->queue_maxlen <= 0)) {
		mutex_unlock(&(cl->lock));
		return -EINVAL;
	}

	while (list_empty(&(cl->conn_queue))) {
		mutex_unlock(&(cl->lock));
		if (wait_event_interruptible(cl->wait, cor_rdytoaccept(cl))) {
			kref_put(&(sbt->ref), free_sbt);
			return -ERESTARTSYS;
		}
		mutex_lock(&(cl->lock));
	}

	newconn = container_of(cl->conn_queue.next, struct conn,
			source.sock.cl_list);

	BUG_ON(newconn->sourcetype != SOURCE_SOCK);

	list_del(cl->conn_queue.next);

	cl->queue_len--;

	mutex_unlock(&(cl->lock));

	mutex_lock(&(newconn->rcv_lock));
	newconn->source.sock.sbt = sbt;
	mutex_unlock(&(newconn->rcv_lock));

	newsock->ops = &cor_proto_ops;
	newsock->sk = (struct sock *) newconn;
	newsock->state = SS_CONNECTED;

	return 0;
}

int cor_socket_listen(struct socket *sock, int len)
{
	struct connlistener *cl = (struct connlistener *) sock->sk;

	int rc = check_connlistener_state(cl);

	if (unlikely(rc))
		return -EOPNOTSUPP;

	mutex_lock(&(cl->lock));
	cl->queue_maxlen = len;
	mutex_unlock(&(cl->lock));

	return 0;
}

int cor_socket_shutdown(struct socket *sock, int flags)
{
	return -ENOTSUPP;
}

int cor_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
{
	return -ENOIOCTLCMD;
}

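/*
 * Wait condition for a blocking cor_sendmsg(): true once the conn was
 * reset, nothing is waited for anymore, or enough buffer space is (or can
 * be made) available for the recorded wait_len.
 */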
static int sendmsg_maypush(struct conn *rconn)
{
	int ret = 0;
	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(rconn->rcv_lock));
	if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
		ret = 1;
	} else if (rconn->source.sock.wait_len == 0) {
		ret = 1;
	} else if (rconn->source.sock.alloclimit +
			rconn->data_buf.cpacket_buffer >
			rconn->data_buf.totalsize +
			rconn->data_buf.overhead) {
		ret = 1;
	} else {
		reserve_sock_buffer(rconn, rconn->source.sock.wait_len);
		if (rconn->source.sock.alloclimit +
				rconn->data_buf.cpacket_buffer >
				rconn->data_buf.totalsize +
				rconn->data_buf.overhead)
			ret = 1;
	}
	mutex_unlock(&(rconn->rcv_lock));
	mutex_unlock(&sock_bufferlimits_lock);
	return ret;
}

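/*
 * sendmsg: reserves buffer space, copies in as much of the message as fits
 * and flushes. With MSG_MORE set and the buffer still below 3/4 of
 * BUFFERLIMIT_SOCK_SOCK the flush is delayed instead. Without MSG_DONTWAIT
 * the call blocks until sendmsg_maypush() allows progress.
 */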
int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len)
{
	__s64 copied = 0;

	struct conn *rconn = (struct conn *) sock->sk;

	int rc = check_conn_state(rconn);

	int flush = (msg->msg_flags & MSG_MORE) == 0;
	int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0;

	__s32 bufferfree;
	__u64 max = (1LL << 32) - 1;
	__u32 totallen = (total_len > max ? max : total_len);

	if (unlikely(rc))
		return -EBADF;

recv:
	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(rconn->rcv_lock));

	if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
		mutex_unlock(&sock_bufferlimits_lock);
		copied = -EPIPE;
		goto out;
	}

	reserve_sock_buffer(rconn, totallen);

	mutex_unlock(&sock_bufferlimits_lock);

	bufferfree = (__s64) rconn->source.sock.alloclimit +
			(__s64) rconn->data_buf.cpacket_buffer -
			(__s64) rconn->data_buf.totalsize -
			(__s64) rconn->data_buf.overhead;

	if (bufferfree <= 0) {
		if (copied == 0)
			copied = -EAGAIN;
		goto out;
	}

	copied = receive_userbuf(rconn, msg, bufferfree, bufferfree >=
			totallen ? 0 : (rconn->source.sock.alloclimit +
			rconn->data_buf.cpacket_buffer));

	if (0) {
out:
		bufferfree = (__s64) rconn->source.sock.alloclimit +
				(__s64) rconn->data_buf.cpacket_buffer -
				(__s64) rconn->data_buf.totalsize -
				(__s64) rconn->data_buf.overhead;
	}

	if (copied == -EAGAIN)
		rconn->source.sock.wait_len = totallen;
	else
		rconn->source.sock.wait_len = 0;

	mutex_unlock(&(rconn->rcv_lock));

	unreserve_sock_buffer(rconn);

	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(rconn->rcv_lock));

	if (flush == 0 && rconn->data_buf.totalsize + rconn->data_buf.overhead -
			rconn->data_buf.cpacket_buffer <
			(BUFFERLIMIT_SOCK_SOCK * 3) / 4) {
		if (rconn->source.sock.delay_flush == 0) {
			struct sock_buffertracker *sbt = rconn->source.sock.sbt;
			list_add_tail(&(rconn->source.sock.delflush_list),
					&(sbt->delflush_conns));
		}
		rconn->source.sock.delay_flush = 1;
	} else {
		if (rconn->source.sock.delay_flush) {
			list_del(&(rconn->source.sock.delflush_list));
		}
		rconn->source.sock.delay_flush = 0;
	}

	mutex_unlock(&(rconn->rcv_lock));
	mutex_unlock(&sock_bufferlimits_lock);

	if (likely(copied > 0 || bufferfree <= 0))
		flush_buf(rconn);

	if (copied == -EAGAIN && blocking) {
		if (wait_event_interruptible(rconn->source.sock.wait,
				sendmsg_maypush(rconn)) == 0)
			goto recv;
		copied = -ERESTARTSYS;
	}

	BUG_ON(copied > total_len);
	return copied;
}

static int cor_readytoread(struct conn *sconn)
{
	int rc = 0;
	mutex_lock(&(sconn->rcv_lock));
	rc = (sconn->data_buf.read_remaining != 0) ||
			unlikely(atomic_read(&(sconn->isreset)) != 0);
	mutex_unlock(&(sconn->rcv_lock));
	return rc;
}

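/*
 * recvmsg: pulls buffered data from the reverse conn, then refreshes the
 * credits, releases no longer needed buffer space and wakes the sender.
 * Without MSG_DONTWAIT the call blocks until cor_readytoread() reports
 * data or a reset.
 */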
int cor_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len, int flags)
{
	struct conn *rconn = (struct conn *) sock->sk;
	struct conn *sconn = rconn->reversedir;
	/* must be signed, error codes are stored here */
	ssize_t copied = 0;

	int rc = check_conn_state(rconn);

	int blocking = (flags & MSG_DONTWAIT) == 0;

	if (unlikely(rc))
		return -EBADF;

	BUG_ON(sconn == 0);

recv:
	mutex_lock(&(sconn->rcv_lock));

	if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
		copied = -EPIPE;
		goto out;
	}

	copied = databuf_pulluser(sconn, msg);
	databuf_ackread(sconn);

out:
	mutex_unlock(&(sconn->rcv_lock));

	if (likely(copied > 0)) {
		refresh_conn_credits(sconn, 0, 0);
		unreserve_sock_buffer(sconn);
		wake_sender(sconn);
	}

	if (copied == -EAGAIN && blocking) {
		if (wait_event_interruptible(sconn->target.sock.wait,
				cor_readytoread(sconn)) == 0)
			goto recv;
		copied = -ERESTARTSYS;
	}

	return copied;
}

const struct proto_ops cor_proto_ops = {
	.family = PF_COR,
	.owner = THIS_MODULE,
	.release = cor_socket_release,
	.bind = cor_socket_bind,
	.connect = cor_socket_connect,
	.accept = cor_socket_accept,
	.listen = cor_socket_listen,
	.shutdown = cor_socket_shutdown,
	.ioctl = cor_ioctl,
	.sendmsg = cor_sendmsg,
	.recvmsg = cor_recvmsg

	/*socketpair
	getname
	poll
	compat_ioctl
	setsockopt
	getsockopt
	compat_setsockopt
	compat_getsockopt
	mmap
	sendpage
	splice_read*/
};

int cor_createsock(struct net *net, struct socket *sock, int protocol)
{
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	sock->state = SS_UNCONNECTED;
	sock->ops = &cor_proto_ops;

	return 0;
}

static struct net_proto_family cor_net_proto_family = {
	.family = PF_COR,
	.create = cor_createsock,
	.owner = THIS_MODULE
};

static int __init cor_sock_init(void)
{
	INIT_WORK(&outofsockbufferspace_work, outofsockbufferspace);
	outofsockbufferspace_scheduled = 0;

	sock_register(&cor_net_proto_family);
	sock_bufferusage = 0;
	return 0;
}

module_init(cor_sock_init);

MODULE_LICENSE("GPL");