/* net/cor/sock.c */
/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <net/sock.h>
#include <linux/net.h>
#include <asm/uaccess.h>

#include "cor.h"

struct kmem_cache *sock_slab;

/**
 * sock_bt_wait_list and waiting_conns are ordered by min amount first, the
 * order in which resuming will happen
 */

DEFINE_SPINLOCK(sock_bufferlimits_lock);
LIST_HEAD(sock_bt_list);
LIST_HEAD(sock_bt_wait_list);
static __u64 sock_bufferusage;

static struct work_struct outofsockbufferspace_work;
static int outofsockbufferspace_scheduled;

static void free_sbt(struct kref *ref)
{
	struct sock_buffertracker *sbt = container_of(ref,
			struct sock_buffertracker, ref);

	BUG_ON(sbt->usage != 0);
	BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);

	list_del(&(sbt->lh));
	kfree(sbt);
}

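/*
 * Look up the buffer tracker for the given uid on sock_bt_list and
 * sock_bt_wait_list, taking a reference if one is found; otherwise a new
 * tracker is allocated with GFP_ATOMIC. Callers in this file hold
 * sock_bufferlimits_lock around the call, which appears to be required
 * since the global lists are walked without further locking.
 */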
static struct sock_buffertracker *get_sock_buffertracker(uid_t uid)
{
	struct sock_buffertracker *sbt;
	struct list_head *curr;

	curr = sock_bt_list.next;
	while (curr != &sock_bt_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (sbt->uid == uid)
			goto found;
		curr = curr->next;
	}

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (sbt->uid == uid)
			goto found;
		curr = curr->next;
	}

	sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_ATOMIC);
	if (sbt != 0) {
		memset(sbt, 0, sizeof(struct sock_buffertracker));
		sbt->uid = uid;
		list_add_tail(&(sbt->lh), &sock_bt_list);
		INIT_LIST_HEAD(&(sbt->delflush_conns));
		INIT_LIST_HEAD(&(sbt->waiting_conns));
		kref_init(&(sbt->ref));
	}

	if (0) {
found:
		kref_get(&(sbt->ref));
	}

	return sbt;
}

static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker *sbt,
		int waitingconnremoved)
{
	if (waitingconnremoved && list_empty(&(sbt->waiting_conns))) {
		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &sock_bt_list);
		return;
	}

	if (list_empty(&(sbt->waiting_conns)))
		return;

	while (sbt->lh.next != &sock_bt_wait_list) {
		struct sock_buffertracker *next = container_of(sbt->lh.next,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (sbt->usage <= next->usage)
			break;

		list_del(&(sbt->lh));
		list_add(&(sbt->lh), &(next->lh));
	}
}

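/*
 * Out-of-buffer-space path: force a flush of conns that had their flush
 * delayed (delay_flush set) so their buffered data gets transmitted and the
 * buffers can be freed. oosbs_resumesbt() drops sock_bufferlimits_lock
 * before the first flush_buf() call and reports this via its return value,
 * so the callers restart their list walk afterwards.
 */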
static int oosbs_resumesbt(struct sock_buffertracker *sbt)
{
	int restart = 0;
	struct list_head *curr = sbt->delflush_conns.next;

	while (curr != &(sbt->delflush_conns)) {
		struct conn *src_in_o = container_of(curr, struct conn,
				source.sock.delflush_list);
		int flush = 0;

		spin_lock_bh(&(src_in_o->rcv_lock));

		BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);

		BUG_ON(src_in_o->source.sock.delay_flush == 0);

		if (src_in_o->data_buf.read_remaining != 0) {
			src_in_o->source.sock.delay_flush = 0;
			list_del(&(src_in_o->source.sock.delflush_list));
			flush = 1;
		}

		spin_unlock_bh(&(src_in_o->rcv_lock));

		if (flush) {
			if (restart == 0) {
				restart = 1;
				kref_get(&(sbt->ref));
				spin_unlock_bh(&sock_bufferlimits_lock);
			}
			flush_buf(src_in_o);
		}

		curr = curr->next;
	}

	if (restart)
		kref_put(&(sbt->ref), free_sbt);

	return restart;
}

static void oosbs_global(void)
{
	struct list_head *curr;

	if (0) {
restart:
		spin_lock_bh(&sock_bufferlimits_lock);
	}

	curr = sock_bt_list.prev;
	while (curr != &sock_bt_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}
}

static void oosbs_user(void)
{
	struct list_head *curr;

	if (0) {
restart:
		spin_lock_bh(&sock_bufferlimits_lock);
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));

		if (sbt->usage < (BUFFERLIMIT_SOCK_USER * 3 / 4))
			break;

		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}
}

static void outofsockbufferspace(struct work_struct *work)
{
	spin_lock_bh(&sock_bufferlimits_lock);
	if (sock_bufferusage < (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) {
		oosbs_user();
		if (sock_bufferusage >= (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4))
			goto global;
	} else {
global:
		oosbs_global();
	}
	outofsockbufferspace_scheduled = 0;
	spin_unlock_bh(&sock_bufferlimits_lock);
}

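/*
 * Could not (fully) grant the requested allocation limit: queue the conn on
 * its tracker's waiting_conns list and move the tracker onto
 * sock_bt_wait_list, both ordered as described at the top of this file
 * (smallest usage/alloclimit first), and kick off the out-of-buffer-space
 * worker if it is not already scheduled.
 */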
static void _reserve_sock_buffer_inswl(struct conn *src_in_l)
{
	struct sock_buffertracker *sbt = src_in_l->source.sock.sbt;
	struct list_head *curr;

	BUG_ON(sbt == 0);

	if (list_empty(&(sbt->waiting_conns)) == 0)
		goto wlinserted;

	list_del(&(sbt->lh));

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		if (sbt->usage < currsbt->usage) {
			list_add(&(sbt->lh), curr);
			goto wlinserted;
		}
		curr = curr->next;
	}

	list_add_tail(&(sbt->lh), &sock_bt_wait_list);

wlinserted:
	curr = sbt->waiting_conns.next;
	while (curr != &(sbt->waiting_conns)) {
		struct conn *currrconn = container_of(curr, struct conn,
				source.sock.alwait_list);
		BUG_ON(currrconn->sourcetype != SOURCE_SOCK);
		if (src_in_l->source.sock.alloclimit <
				currrconn->source.sock.alloclimit) {
			list_add(&(src_in_l->source.sock.alwait_list), curr);
			goto wcinserted;
		}
		curr = curr->next;
	}

	list_add_tail(&(src_in_l->source.sock.alwait_list),
			&(sbt->waiting_conns));

wcinserted:
	src_in_l->source.sock.in_alwait_list = 1;

	if (outofsockbufferspace_scheduled == 0) {
		schedule_work(&outofsockbufferspace_work);
		outofsockbufferspace_scheduled = 1;
	}
}

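/*
 * Try to raise the conn's alloclimit to "amount" plus what is already
 * buffered, capped at BUFFERLIMIT_SOCK_SOCK. If granting the increase would
 * exceed the per-uid or global limit, or another tracker with equal or lower
 * usage is already waiting, the conn is put on the wait list via
 * _reserve_sock_buffer_inswl() instead. Called with sock_bufferlimits_lock
 * and the conn's rcv_lock held.
 */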
static void reserve_sock_buffer(struct conn *src_in_l, __u64 amount)
{
	struct sock_buffertracker *sbt = src_in_l->source.sock.sbt;
	struct sock_buffertracker *first_wait_sbt = list_empty(
			&sock_bt_wait_list) ? 0 : container_of(
			sock_bt_wait_list.next, struct sock_buffertracker, lh);

	__u32 max = (1 << 30) - 1;

	BUG_ON(sbt == 0);

	if (unlikely(amount > max))
		amount = max;

	amount += src_in_l->data_buf.totalsize + src_in_l->data_buf.overhead -
			src_in_l->data_buf.cpacket_buffer;

	if (unlikely(amount > max))
		amount = max;

	if (amount > BUFFERLIMIT_SOCK_SOCK)
		amount = BUFFERLIMIT_SOCK_SOCK;

	if (amount <= src_in_l->source.sock.alloclimit)
		return;

	if ((list_empty(&sock_bt_wait_list) == 0 && first_wait_sbt != 0 &&
			first_wait_sbt != sbt &&
			first_wait_sbt->usage <= sbt->usage) ||
			amount - src_in_l->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_USER - sbt->usage ||
			amount - src_in_l->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_GLOBAL - sock_bufferusage) {
		_reserve_sock_buffer_inswl(src_in_l);
	} else {
		int waitingconnremoved = 0;
		sbt->usage += amount - src_in_l->source.sock.alloclimit;
		sock_bufferusage += amount - src_in_l->source.sock.alloclimit;
		src_in_l->source.sock.alloclimit = amount;

		if (src_in_l->source.sock.in_alwait_list) {
			list_del(&(src_in_l->source.sock.alwait_list));
			src_in_l->source.sock.in_alwait_list = 0;
			waitingconnremoved = 1;
		}
		_reserve_sock_buffer_reord_bt(sbt, waitingconnremoved);
	}
}

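/*
 * Walk the waiting conns of one tracker (smallest alloclimit first) and
 * retry their pending reservations, waking the sender of each conn that now
 * fits its wait_len. Stops at the first conn that still does not fit.
 */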
static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt)
{
	int failed = 0;

	/* resume conns while some are waiting and space keeps being granted */
	while (list_empty(&(sbt->waiting_conns)) == 0 && failed == 0) {
		struct conn *src_in_o = container_of(sbt->waiting_conns.next,
				struct conn, source.sock.alwait_list);
		spin_lock_bh(&(src_in_o->rcv_lock));

		BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);
		BUG_ON(src_in_o->source.sock.in_alwait_list == 0);
		BUG_ON(src_in_o->source.sock.wait_len == 0);

		reserve_sock_buffer(src_in_o, src_in_o->source.sock.wait_len);

		if (src_in_o->source.sock.alloclimit +
				src_in_o->data_buf.cpacket_buffer <=
				src_in_o->data_buf.totalsize +
				src_in_o->data_buf.overhead) {
			failed = 1;
			goto out;
		}

		wake_up_interruptible(&(src_in_o->source.sock.wait));

out:
		spin_unlock_bh(&(src_in_o->rcv_lock));
	}

	return failed;
}

static void resume_bufferwaiting_socks(void)
{
	struct list_head *curr = sock_bt_wait_list.next;

	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		curr = curr->next;

		if (_resume_bufferwaiting_socks(currsbt))
			return;
	}
}

static void reorder_sock_bt_wait_list(struct sock_buffertracker *sbt)
{
	if (list_empty(&(sbt->waiting_conns)))
		return;

	while (sbt->lh.prev != &sock_bt_wait_list) {
		struct sock_buffertracker *prevsbt = container_of(sbt->lh.prev,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (prevsbt->usage <= sbt->usage)
			break;

		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &(prevsbt->lh));
	}
}

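/*
 * Detach a conn from socket buffer accounting when it is reset: drop it from
 * the wait and delayed-flush lists, subtract its alloclimit from the
 * tracker's usage and release the tracker reference.
 */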
void connreset_sbt(struct conn *cn)
{
	struct sock_buffertracker *sbt;

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(cn->rcv_lock));

	if (cn->sourcetype != SOURCE_SOCK)
		goto out;

	sbt = cn->source.sock.sbt;
	BUG_ON(sbt == 0);

	if (cn->source.sock.in_alwait_list) {
		list_del(&(cn->source.sock.alwait_list));
		cn->source.sock.in_alwait_list = 0;

		if (list_empty(&(sbt->waiting_conns))) {
			list_del(&(sbt->lh));
			list_add_tail(&(sbt->lh), &sock_bt_list);
		}
	}
	reorder_sock_bt_wait_list(sbt);

	sbt->usage -= cn->source.sock.alloclimit;
	if (cn->source.sock.delay_flush) {
		cn->source.sock.delay_flush = 0;
		list_del(&(cn->source.sock.delflush_list));
	}
	kref_put(&(sbt->ref), free_sbt);
	cn->source.sock.sbt = 0;

out:
	spin_unlock_bh(&(cn->rcv_lock));
	spin_unlock_bh(&sock_bufferlimits_lock);
}

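/*
 * Shrink the conn's alloclimit back down to what is actually buffered right
 * now (totalsize + overhead - cpacket_buffer) and return the difference to
 * the per-uid and global counters. If anything was freed, conns waiting for
 * buffer space are given another chance.
 */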
void unreserve_sock_buffer(struct conn *cn)
{
	int freed = 0;
	struct sock_buffertracker *sbt;

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(cn->rcv_lock));

	if (cn->sourcetype != SOURCE_SOCK)
		goto out;

	if (unlikely(cn->isreset != 0))
		goto out;

	sbt = cn->source.sock.sbt;
	BUG_ON(sbt == 0);

	if (cn->data_buf.totalsize + cn->data_buf.overhead <=
			cn->source.sock.alloclimit +
			cn->data_buf.cpacket_buffer)
		goto out;

	freed = 1;

	BUG_ON(cn->source.sock.alloclimit > sbt->usage);
	BUG_ON(cn->source.sock.alloclimit > sock_bufferusage);
	BUG_ON(cn->data_buf.cpacket_buffer > cn->data_buf.totalsize +
			cn->data_buf.overhead);

	sbt->usage -= cn->source.sock.alloclimit;
	sbt->usage += cn->data_buf.totalsize;
	sbt->usage += cn->data_buf.overhead;
	sbt->usage -= cn->data_buf.cpacket_buffer;

	sock_bufferusage -= cn->source.sock.alloclimit;
	sock_bufferusage += cn->data_buf.totalsize;
	sock_bufferusage += cn->data_buf.overhead;
	sock_bufferusage -= cn->data_buf.cpacket_buffer;

	cn->source.sock.alloclimit = cn->data_buf.totalsize +
			cn->data_buf.overhead - cn->data_buf.cpacket_buffer;

	if (cn->source.sock.alloclimit == 0 &&
			cn->source.sock.in_alwait_list) {
		list_del(&(cn->source.sock.alwait_list));
		cn->source.sock.in_alwait_list = 0;

		if (list_empty(&(sbt->waiting_conns))) {
			list_del(&(sbt->lh));
			list_add_tail(&(sbt->lh), &sock_bt_list);
		}
	}

	reorder_sock_bt_wait_list(sbt);

out:
	spin_unlock_bh(&(cn->rcv_lock));

	if (freed)
		resume_bufferwaiting_socks();

	spin_unlock_bh(&sock_bufferlimits_lock);
}

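/* socket layer: proto_ops handlers for PF_COR sockets */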
int cor_socket_release(struct socket *sock)
{
	struct cor_sock *cs = (struct cor_sock *) sock->sk;

	if (cs->type == SOCKTYPE_UNCONNECTED) {
	} else if (cs->type == SOCKTYPE_LISTENER) {
		close_port(cs);
	} else if (cs->type == SOCKTYPE_CONN) {
		reset_conn(cs->data.conn.src_sock);
		kref_put(&(cs->data.conn.src_sock->ref), free_conn);
		kref_put(&(cs->data.conn.trgt_sock->ref), free_conn);

		if (cs->data.conn.rcvitem != 0) {
			databuf_item_free(cs->data.conn.rcvitem);
			cs->data.conn.rcvitem = 0;
		}
	} else {
		BUG();
	}

	kmem_cache_free(sock_slab, cs);

	return 0;
}

int cor_socket_bind(struct socket *sock, struct sockaddr *myaddr,
		int sockaddr_len)
{
	int rc = 0;
	struct cor_sock *cs = (struct cor_sock *) sock->sk;
	struct cor_sockaddr *addr = (struct cor_sockaddr *) myaddr;

	if (unlikely(sockaddr_len < sizeof(struct cor_sockaddr)))
		return -EINVAL;

	if (unlikely(addr->type != SOCKADDRTYPE_PORT))
		return -EINVAL;

	spin_lock_bh(&(cs->lock));

	if (unlikely(cs->type != SOCKTYPE_UNCONNECTED)) {
		rc = -EINVAL;
		goto out;
	}

	rc = open_port(cs, addr->addr.port);

out:
	spin_unlock_bh(&(cs->lock));

	return rc;
}

int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr,
		int sockaddr_len, int flags)
{
	struct sock_buffertracker *sbt;

	struct conn *src_sock;

	struct cor_sock *cs = (struct cor_sock *) sock->sk;

	src_sock = alloc_conn(GFP_KERNEL);

	if (unlikely(src_sock == 0))
		return -ENOMEM;

	src_sock->is_client = 1;

	spin_lock_bh(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	spin_unlock_bh(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0)) {
		reset_conn(src_sock);
		return -ENOMEM;
	}

	spin_lock_bh(&(src_sock->rcv_lock));
	spin_lock_bh(&(src_sock->reversedir->rcv_lock));
	spin_lock_bh(&(cs->lock));
	if (cs->type != SOCKTYPE_UNCONNECTED) {
		spin_unlock_bh(&(cs->lock));
		spin_unlock_bh(&(src_sock->reversedir->rcv_lock));
		spin_unlock_bh(&(src_sock->rcv_lock));
		reset_conn(src_sock);
		kref_put(&(sbt->ref), free_sbt);
		return -EISCONN;
	}

	conn_init_sock_source(src_sock);
	src_sock->source.sock.sbt = sbt;
	conn_init_sock_target(src_sock->reversedir);

	memset(&(cs->data), 0, sizeof(cs->data));
	cs->type = SOCKTYPE_CONN;
	cs->data.conn.src_sock = src_sock;
	cs->data.conn.trgt_sock = src_sock->reversedir;
	mutex_init(&(cs->data.conn.rcvbuf_lock));
	kref_get(&(src_sock->ref));
	kref_get(&(src_sock->reversedir->ref));

	spin_unlock_bh(&(cs->lock));
	spin_unlock_bh(&(src_sock->reversedir->rcv_lock));
	spin_unlock_bh(&(src_sock->rcv_lock));

	sock->state = SS_CONNECTED;

	return 0;
}

static int cor_rdytoaccept(struct cor_sock *cs)
{
	int rc;
	spin_lock_bh(&(cs->lock));
	BUG_ON(cs->type != SOCKTYPE_LISTENER);
	rc = (list_empty(&(cs->data.listener.conn_queue)) == 0);
	spin_unlock_bh(&(cs->lock));
	return rc;
}

const struct proto_ops cor_proto_ops;

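/*
 * Take the first queued conn off the listener, attach the buffer tracker of
 * the accepting user and wrap it in a freshly allocated cor_sock for
 * newsock. Blocks (interruptibly) until the queue is non-empty; conns that
 * were reset while queued are skipped by restarting.
 */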
int cor_socket_accept(struct socket *sock, struct socket *newsock, int flags)
{
	struct sock_buffertracker *sbt;

	struct cor_sock *cs = (struct cor_sock *) sock->sk;

	struct conn *src_sock_o;

	struct cor_sock *newcs;

	newcs = kmem_cache_alloc(sock_slab, GFP_KERNEL);
	if (unlikely(newcs == 0))
		return -ENOMEM;
	memset(newcs, 0, sizeof(struct cor_sock));
	newcs->type = SOCKTYPE_CONN;
	spin_lock_init(&(newcs->lock));

	spin_lock_bh(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	spin_unlock_bh(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0)) {
		kmem_cache_free(sock_slab, newcs);
		return -ENOMEM;
	}

restart:
	spin_lock_bh(&(cs->lock));

	BUG_ON(cs->type != SOCKTYPE_UNCONNECTED &&
			cs->type != SOCKTYPE_LISTENER &&
			cs->type != SOCKTYPE_CONN);

	if (unlikely(cs->type != SOCKTYPE_LISTENER)) {
		spin_unlock_bh(&(cs->lock));
		kref_put(&(sbt->ref), free_sbt);
		kmem_cache_free(sock_slab, newcs);
		return -EINVAL;
	}

	if (unlikely(cs->data.listener.queue_maxlen <= 0)) {
		spin_unlock_bh(&(cs->lock));
		kref_put(&(sbt->ref), free_sbt);
		kmem_cache_free(sock_slab, newcs);
		return -EINVAL;
	}

	while (list_empty(&(cs->data.listener.conn_queue))) {
		spin_unlock_bh(&(cs->lock));
		if (wait_event_interruptible(cs->data.listener.wait,
				cor_rdytoaccept(cs))) {
			kref_put(&(sbt->ref), free_sbt);
			kmem_cache_free(sock_slab, newcs);
			return -ERESTARTSYS;
		}
		spin_lock_bh(&(cs->lock));
	}

	src_sock_o = container_of(cs->data.listener.conn_queue.next,
			struct conn, source.sock.cl_list);

	BUG_ON(src_sock_o->sourcetype != SOURCE_SOCK);

	list_del(cs->data.listener.conn_queue.next);

	cs->data.listener.queue_len--;

	spin_unlock_bh(&(cs->lock));

	spin_lock_bh(&(src_sock_o->rcv_lock));
	if (unlikely(src_sock_o->isreset != 0)) {
		spin_unlock_bh(&(src_sock_o->rcv_lock));
		kref_put(&(src_sock_o->ref), free_conn);
		goto restart;
	}
	src_sock_o->source.sock.sbt = sbt;
	spin_unlock_bh(&(src_sock_o->rcv_lock));

	newcs->data.conn.src_sock = src_sock_o;
	newcs->data.conn.trgt_sock = src_sock_o->reversedir;
	kref_get(&(src_sock_o->reversedir->ref));

	newsock->ops = &cor_proto_ops;
	newsock->sk = (struct sock *) newcs;
	newsock->state = SS_CONNECTED;

	return 0;
}

int cor_socket_listen(struct socket *sock, int len)
{
	struct cor_sock *cs = (struct cor_sock *) sock->sk;

	spin_lock_bh(&(cs->lock));

	BUG_ON(cs->type != SOCKTYPE_UNCONNECTED &&
			cs->type != SOCKTYPE_LISTENER &&
			cs->type != SOCKTYPE_CONN);

	if (unlikely(cs->type != SOCKTYPE_LISTENER)) {
		spin_unlock_bh(&(cs->lock));
		return -EOPNOTSUPP;
	}

	cs->data.listener.queue_maxlen = len;

	spin_unlock_bh(&(cs->lock));

	return 0;
}

int cor_socket_shutdown(struct socket *sock, int flags)
{
	return -ENOTSUPP;
}

int cor_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
{
	return -ENOIOCTLCMD;
}

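/*
 * Wait condition for blocking sendmsg: nonzero once the socket or conn state
 * has changed (the retry will then fail with -EPIPE), the conn is no longer
 * waiting (wait_len == 0), or buffer space for the waiting bytes is
 * available or could be reserved.
 */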
static int sendmsg_maypush(struct cor_sock *cs, struct conn *src_sock)
{
	int ret = 0;

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(src_sock->rcv_lock));
	spin_lock_bh(&(cs->lock));

	if (unlikely(unlikely(src_sock != cs->data.conn.src_sock))) {
		ret = 1;
	} else if (unlikely(src_sock->isreset != 0)) {
		ret = 1;
	} else if (src_sock->source.sock.wait_len == 0) {
		ret = 1;
	} else if (src_sock->source.sock.alloclimit +
			src_sock->data_buf.cpacket_buffer >
			src_sock->data_buf.totalsize +
			src_sock->data_buf.overhead) {
		ret = 1;
	} else {
		reserve_sock_buffer(src_sock,
				src_sock->source.sock.wait_len);
		if (src_sock->source.sock.alloclimit +
				src_sock->data_buf.cpacket_buffer >
				src_sock->data_buf.totalsize +
				src_sock->data_buf.overhead)
			ret = 1;
	}

	spin_unlock_bh(&(cs->lock));
	spin_unlock_bh(&(src_sock->rcv_lock));
	spin_unlock_bh(&sock_bufferlimits_lock);

	return ret;
}

__s32 ___cor_sendmsg(char *buf, __u32 bufread, __u32 buflen,
		__u32 totalremaining, struct cor_sock *cs,
		struct conn *src_sock, struct conn *trgt_sock)
{
	__s32 rc = 0;
	__s64 bufferfree;

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(src_sock->rcv_lock));
	spin_lock_bh(&(cs->lock));

	if (unlikely(unlikely(src_sock != cs->data.conn.src_sock) ||
			unlikely(trgt_sock != cs->data.conn.trgt_sock) ||
			unlikely(src_sock->isreset != 0))) {
		spin_unlock_bh(&(cs->lock));
		spin_unlock_bh(&(src_sock->rcv_lock));
		spin_unlock_bh(&sock_bufferlimits_lock);
		return -EPIPE;
	}

	spin_unlock_bh(&(cs->lock));

	reserve_sock_buffer(src_sock, (__u32) (buflen +
			sizeof(struct data_buf_item)));

	bufferfree = (__s64) src_sock->source.sock.alloclimit +
			(__s64) src_sock->data_buf.cpacket_buffer -
			(__s64) src_sock->data_buf.totalsize -
			(__s64) src_sock->data_buf.overhead;

	spin_unlock_bh(&sock_bufferlimits_lock);

	if (bufferfree < (buflen + sizeof(struct data_buf_item))) {
		kfree(buf);
		rc = -EAGAIN;
		printk(KERN_ERR "2");
		goto out;
	}

	rc = receive_buf(src_sock, buf, bufread, buflen, 0);

out:
	if (rc == -EAGAIN)
		src_sock->source.sock.wait_len = totalremaining +
				sizeof(struct data_buf_item);
	else
		src_sock->source.sock.wait_len = 0;

	spin_unlock_bh(&(src_sock->rcv_lock));

	return rc;
}

__s32 __cor_sendmsg(struct msghdr *msg, __u32 totallen, int *iovidx,
		int *iovread, struct cor_sock *cs, struct conn *src_sock,
		struct conn *trgt_sock)
{
	char *buf = 0;
	__u32 bufread = 0;
	__u32 buflen = buf_optlen(totallen);

	buf = kmalloc(buflen, GFP_KERNEL);
	if (unlikely(buf == 0))
		return -ENOMEM;

	memset(buf, 0, buflen);

	while (bufread < buflen && bufread < totallen) {
		struct iovec *iov = msg->msg_iov + *iovidx;
		__user char *userbuf = iov->iov_base + *iovread;
		__u32 len = iov->iov_len - *iovread;
		int notcopied;

		if (len == 0) {
			(*iovidx)++;
			(*iovread) = 0;
			BUG_ON(*iovidx >= msg->msg_iovlen);
			continue;
		}

		if (len > (buflen - bufread))
			len = buflen - bufread;
		if (len > (totallen - bufread))
			len = totallen - bufread;

		notcopied = copy_from_user(buf + bufread, userbuf, len);

		bufread += len - notcopied;
		(*iovread) += len - notcopied;
		if (unlikely(notcopied == len) && bufread == 0) {
			kfree(buf);
			return -EFAULT;
		}

		if (unlikely(notcopied > 0))
			break;
	}

	return ___cor_sendmsg(buf, bufread, buflen, totallen, cs, src_sock,
			trgt_sock);
}

__s32 _cor_sendmsg(struct msghdr *msg, size_t total_len, struct cor_sock *cs,
		struct conn *src_sock, struct conn *trgt_sock)
{
	int flush = (msg->msg_flags & MSG_MORE) == 0;

	__s32 copied = 0;
	__s32 rc = 0;
	__u64 max = (1LL << 31) - 1;
	__u32 totallen = (total_len > max ? max : total_len);

	int iovidx = 0;
	int iovread = 0;

	while (rc >= 0 && copied < totallen) {
		rc = __cor_sendmsg(msg, totallen - copied, &iovidx, &iovread,
				cs, src_sock, trgt_sock);

		if (rc > 0 || copied == 0)
			copied += rc;
	}

	unreserve_sock_buffer(src_sock);

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(src_sock->rcv_lock));

	if (unlikely(src_sock->isreset != 0)) {
		spin_unlock_bh(&(src_sock->rcv_lock));
		spin_unlock_bh(&sock_bufferlimits_lock);
		return -EPIPE;
	}

	if (flush == 0 && copied > 0 && copied == total_len &&
			src_sock->data_buf.totalsize +
			src_sock->data_buf.overhead -
			src_sock->data_buf.cpacket_buffer <
			(BUFFERLIMIT_SOCK_SOCK*3)/4) {
		if (src_sock->source.sock.delay_flush == 0) {
			struct sock_buffertracker *sbt =
					src_sock->source.sock.sbt;
			BUG_ON(sbt == 0);
			list_add_tail(&(src_sock->source.sock.delflush_list),
					&(sbt->delflush_conns));
		}
		src_sock->source.sock.delay_flush = 1;
	} else {
		if (src_sock->source.sock.delay_flush) {
			list_del(&(src_sock->source.sock.delflush_list));
		}
		src_sock->source.sock.delay_flush = 0;
	}

	spin_unlock_bh(&(src_sock->rcv_lock));
	spin_unlock_bh(&sock_bufferlimits_lock);

	return copied;
}

int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len)
{
	__s32 rc = 0;

	int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0;

	struct cor_sock *cs = (struct cor_sock *) sock->sk;
	struct conn *src_sock;
	struct conn *trgt_sock;

	spin_lock_bh(&(cs->lock));

	BUG_ON(cs->type != SOCKTYPE_UNCONNECTED &&
			cs->type != SOCKTYPE_LISTENER &&
			cs->type != SOCKTYPE_CONN);

	if (unlikely(cs->type != SOCKTYPE_CONN)) {
		spin_unlock_bh(&(cs->lock));
		return -EBADF;
	}

	src_sock = cs->data.conn.src_sock;
	trgt_sock = cs->data.conn.trgt_sock;

	kref_get(&(src_sock->ref));
	kref_get(&(trgt_sock->ref));

	spin_unlock_bh(&(cs->lock));

	BUG_ON(src_sock == 0);
	BUG_ON(trgt_sock == 0);

send:
	rc = _cor_sendmsg(msg, total_len, cs, src_sock, trgt_sock);

	if (likely(rc > 0 || rc == -EAGAIN))
		flush_buf(src_sock);

	if (rc == -EAGAIN && blocking) {
#warning todo move waitqueue to cor_sock
		if (wait_event_interruptible(src_sock->source.sock.wait,
				sendmsg_maypush(cs, src_sock)) == 0)
			goto send;
		rc = -ERESTARTSYS;
	}

	BUG_ON(rc > total_len);
	return rc;
}

static int cor_readytoread(struct conn *trgt_sock_o)
{
	int rc = 0;
	spin_lock_bh(&(trgt_sock_o->rcv_lock));
	rc = (trgt_sock_o->data_buf.read_remaining != 0) ||
			unlikely(trgt_sock_o->isreset != 0);
	spin_unlock_bh(&(trgt_sock_o->rcv_lock));
	return rc;
}

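/*
 * Copy data from the currently held receive item (cs->data.conn.rcvitem)
 * into the user iovecs. Returns the number of bytes copied, -EFAULT if the
 * first copy attempt faults, or -EAGAIN if no data was available.
 */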
static int __cor_recvmsg(struct msghdr *msg, __u32 totallen,
		int *iovidx, int *iovwritten,
		struct cor_sock *cs, struct conn *trgt_sock)
{
	struct data_buf_item *dbi = cs->data.conn.rcvitem;
	int written = 0;

	while (written < totallen && dbi != 0) {
		struct iovec *iov = msg->msg_iov + *iovidx;
		__user char *userbuf = iov->iov_base + *iovwritten;
		__u32 len = iov->iov_len - *iovwritten;
		int notcopied;

		if (len == 0) {
			(*iovidx)++;
			(*iovwritten) = 0;
			BUG_ON(*iovidx >= msg->msg_iovlen);
			continue;
		}

		if (dbi->datalen == cs->data.conn.rcvoffset) {
			databuf_item_free(cs->data.conn.rcvitem);
			cs->data.conn.rcvitem = 0;
			cs->data.conn.rcvoffset = 0;
			break;
		}

		if (len > (dbi->datalen - cs->data.conn.rcvoffset))
			len = dbi->datalen - cs->data.conn.rcvoffset;
		if (len > (totallen - written))
			len = totallen - written;

		notcopied = copy_to_user(userbuf, dbi->buf +
				cs->data.conn.rcvoffset, len);

		written += (len - notcopied);
		(*iovwritten) += (len - notcopied);
		cs->data.conn.rcvoffset += (len - notcopied);

		if (unlikely(notcopied == len) && written == 0)
			return -EFAULT;

		if (notcopied > 0)
			break;
	}

	if (written == 0)
		return -EAGAIN;
	return written;
}

static int _cor_recvmsg(struct msghdr *msg, size_t total_len,
		struct cor_sock *cs, struct conn *trgt_sock)
{
	int copied = 0;
	int rc = 0;

	__u64 max = (1LL << 31) - 1;
	__u32 totallen = (total_len > max ? max : total_len);

	int iovidx = 0;
	int iovwritten = 0;

	mutex_lock(&(cs->data.conn.rcvbuf_lock));

	while (rc >= 0 && copied < totallen) {
		spin_lock_bh(&(trgt_sock->rcv_lock));
		spin_lock_bh(&(cs->lock));
		if (unlikely(unlikely(trgt_sock != cs->data.conn.trgt_sock) ||
				unlikely(trgt_sock->isreset != 0))) {
			spin_unlock_bh(&(cs->lock));
			spin_unlock_bh(&(trgt_sock->rcv_lock));
			mutex_unlock(&(cs->data.conn.rcvbuf_lock));
			return -EPIPE;
		}

		spin_unlock_bh(&(cs->lock));

		if (cs->data.conn.rcvitem == 0)
			databuf_pull_dbi(cs, trgt_sock);

		spin_unlock_bh(&(trgt_sock->rcv_lock));

		rc = __cor_recvmsg(msg, totallen - copied, &iovidx, &iovwritten,
				cs, trgt_sock);

		if (rc > 0 || copied == 0)
			copied += rc;
	}

	mutex_unlock(&(cs->data.conn.rcvbuf_lock));

	return copied;
}

int cor_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len, int flags)
{
	size_t copied = 0;
	int blocking = (flags & MSG_DONTWAIT) == 0;

	struct cor_sock *cs = (struct cor_sock *) sock->sk;
	struct conn *src_sock;
	struct conn *trgt_sock;

	spin_lock_bh(&(cs->lock));

	BUG_ON(cs->type != SOCKTYPE_UNCONNECTED &&
			cs->type != SOCKTYPE_LISTENER &&
			cs->type != SOCKTYPE_CONN);

	if (unlikely(cs->type != SOCKTYPE_CONN)) {
		spin_unlock_bh(&(cs->lock));
		return -EBADF;
	}

	src_sock = cs->data.conn.src_sock;
	trgt_sock = cs->data.conn.trgt_sock;

	BUG_ON(src_sock == 0);
	BUG_ON(trgt_sock == 0);

	kref_get(&(src_sock->ref));
	kref_get(&(trgt_sock->ref));

	spin_unlock_bh(&(cs->lock));

recv:
	copied = _cor_recvmsg(msg, total_len, cs, trgt_sock);

	if (likely(copied > 0)) {
		refresh_conn_credits(trgt_sock, 0, 0);
		wake_sender(trgt_sock);
	}

	if (copied == -EAGAIN && blocking) {
		if (wait_event_interruptible(trgt_sock->target.sock.wait,
				cor_readytoread(trgt_sock)) == 0)
			goto recv;
		copied = -ERESTARTSYS;
	}

	kref_put(&(src_sock->ref), free_conn);
	kref_put(&(trgt_sock->ref), free_conn);

	return copied;
}

const struct proto_ops cor_proto_ops = {
	.family = PF_COR,
	.owner = THIS_MODULE,
	.release = cor_socket_release,
	.bind = cor_socket_bind,
	.connect = cor_socket_connect,
	.accept = cor_socket_accept,
	.listen = cor_socket_listen,
	.shutdown = cor_socket_shutdown,
	.ioctl = cor_ioctl,
	.sendmsg = cor_sendmsg,
	.recvmsg = cor_recvmsg
};

/*socketpair
getname
poll
compat_ioctl
setsockopt
getsockopt
compat_setsockopt
compat_getsockopt
mmap
sendpage
splice_read*/

int cor_createsock(struct net *net, struct socket *sock, int protocol)
{
	struct cor_sock *cs;

	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	cs = kmem_cache_alloc(sock_slab, GFP_KERNEL);
	if (unlikely(cs == 0))
		return -ENOMEM;
	memset(cs, 0, sizeof(struct cor_sock));

	cs->type = SOCKTYPE_UNCONNECTED;
	spin_lock_init(&(cs->lock));

	sock->state = SS_UNCONNECTED;
	sock->ops = &cor_proto_ops;
	sock->sk = (struct sock *) cs;

	return 0;
}

static struct net_proto_family cor_net_proto_family = {
	.family = PF_COR,
	.create = cor_createsock,
	.owner = THIS_MODULE
};

static int __init cor_sock_init(void)
{
	sock_slab = kmem_cache_create("cor_sock",
			sizeof(struct cor_sock), 8, 0, 0);

	INIT_WORK(&outofsockbufferspace_work, outofsockbufferspace);
	outofsockbufferspace_scheduled = 0;

	sock_register(&cor_net_proto_family);
	sock_bufferusage = 0;
	return 0;
}

module_init(cor_sock_init);

MODULE_LICENSE("GPL");