/*
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/net.h>
#include <asm/uaccess.h>

struct kmem_cache *sock_slab;

/*
 * sock_bt_wait_list and waiting_conns are ordered by the smallest amount
 * first; this is the order in which resuming will happen.
 */
DEFINE_SPINLOCK(sock_bufferlimits_lock);
LIST_HEAD(sock_bt_list);
LIST_HEAD(sock_bt_wait_list);
static __u64 sock_bufferusage;

static struct work_struct outofsockbufferspace_work;
static int outofsockbufferspace_scheduled;

static void free_sbt(struct kref *ref)
{
	struct sock_buffertracker *sbt = container_of(ref,
			struct sock_buffertracker, ref);

	BUG_ON(sbt->usage != 0);
	BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
	/* ... */
}

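/*
 * Find the sock_buffertracker for the given uid on sock_bt_list or
 * sock_bt_wait_list, or allocate a fresh one; a reference is taken for the
 * caller. The callers hold sock_bufferlimits_lock, hence GFP_ATOMIC.
 */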
static struct sock_buffertracker *get_sock_buffertracker(uid_t uid)
{
	struct sock_buffertracker *sbt;
	struct list_head *curr;

	curr = sock_bt_list.next;
	while (curr != &sock_bt_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		/* ... */
	}

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		/* ... */
	}

	sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_ATOMIC);
	/* ... */
	memset(sbt, 0, sizeof(struct sock_buffertracker));
	/* ... */
	list_add_tail(&(sbt->lh), &sock_bt_list);
	INIT_LIST_HEAD(&(sbt->delflush_conns));
	INIT_LIST_HEAD(&(sbt->waiting_conns));
	kref_init(&(sbt->ref));

	/* ... */
	kref_get(&(sbt->ref));
	/* ... */
}

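/*
 * Keep sock_bt_wait_list sorted by usage after a tracker's usage grew: move
 * sbt towards the tail until its successor has at least as much usage. A
 * tracker whose last waiting conn was removed goes back to sock_bt_list.
 */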
static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker *sbt,
		int waitingconnremoved)
{
	if (waitingconnremoved && list_empty(&(sbt->waiting_conns))) {
		/* ... */
		list_add_tail(&(sbt->lh), &sock_bt_list);
		/* ... */
	}

	if (list_empty(&(sbt->waiting_conns)))
		/* ... */

	while (sbt->lh.next != &sock_bt_wait_list) {
		struct sock_buffertracker *next = container_of(sbt->lh.next,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (sbt->usage <= next->usage)
			/* ... */

		list_del(&(sbt->lh));
		list_add(&(sbt->lh), &(next->lh));
	}
}

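/*
 * Out-of-buffer-space handling: oosbs_resumesbt() clears the delayed-flush
 * state of conns that still have unread buffered data, oosbs_user() walks
 * the waiting trackers from the heaviest user downwards (apparently stopping
 * below 3/4 of BUFFERLIMIT_SOCK_USER) and oosbs_global() walks all trackers.
 */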
static int oosbs_resumesbt(struct sock_buffertracker *sbt)
{
	/* ... */
	struct list_head *curr = sbt->delflush_conns.next;

	while (curr != &(sbt->delflush_conns)) {
		struct conn *src_in_o = container_of(curr, struct conn,
				source.sock.delflush_list);

		spin_lock_bh(&(src_in_o->rcv_lock));

		BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);

		BUG_ON(src_in_o->source.sock.delay_flush == 0);

		if (src_in_o->data_buf.read_remaining != 0) {
			src_in_o->source.sock.delay_flush = 0;
			list_del(&(src_in_o->source.sock.delflush_list));
			/* ... */
		}

		spin_unlock_bh(&(src_in_o->rcv_lock));
		/* ... */
	}

	kref_get(&(sbt->ref));
	spin_unlock_bh(&sock_bufferlimits_lock);
	/* ... */
	kref_put(&(sbt->ref), free_sbt);
	/* ... */
}

static void oosbs_global(void)
{
	struct list_head *curr;

	spin_lock_bh(&sock_bufferlimits_lock);

	curr = sock_bt_list.prev;
	while (curr != &sock_bt_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (oosbs_resumesbt(sbt))
			/* ... */
		/* ... */
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (oosbs_resumesbt(sbt))
			/* ... */
		/* ... */
	}
	/* ... */
}

static void oosbs_user(void)
{
	struct list_head *curr;

	spin_lock_bh(&sock_bufferlimits_lock);

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));

		if (sbt->usage < (BUFFERLIMIT_SOCK_USER * 3 / 4))
			/* ... */

		if (oosbs_resumesbt(sbt))
			/* ... */
		/* ... */
	}
	/* ... */
}

static void outofsockbufferspace(struct work_struct *work)
{
	spin_lock_bh(&sock_bufferlimits_lock);
	if (sock_bufferusage < (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) {
		/* ... */
		if (sock_bufferusage >= (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4))
			/* ... */
		/* ... */
	}

	outofsockbufferspace_scheduled = 0;
	spin_unlock_bh(&sock_bufferlimits_lock);
}

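/*
 * The requested reservation could not be granted: put src_in_l on its
 * tracker's waiting_conns list (kept sorted by alloclimit), requeue the
 * tracker on sock_bt_wait_list according to its usage and kick the
 * out-of-buffer-space worker.
 */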
static void _reserve_sock_buffer_inswl(struct conn *src_in_l)
{
	struct sock_buffertracker *sbt = src_in_l->source.sock.sbt;
	struct list_head *curr;

	/* ... */

	if (list_empty(&(sbt->waiting_conns)) == 0)
		/* ... */

	list_del(&(sbt->lh));

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		if (sbt->usage < currsbt->usage) {
			list_add(&(sbt->lh), curr);
			/* ... */
		}
		/* ... */
	}

	list_add_tail(&(sbt->lh), &sock_bt_wait_list);

	curr = sbt->waiting_conns.next;
	while (curr != &(sbt->waiting_conns)) {
		struct conn *currrconn = container_of(curr, struct conn,
				source.sock.alwait_list);
		BUG_ON(currrconn->sourcetype != SOURCE_SOCK);
		if (src_in_l->source.sock.alloclimit <
				currrconn->source.sock.alloclimit) {
			list_add(&(src_in_l->source.sock.alwait_list), curr);
			/* ... */
		}
		/* ... */
	}

	list_add_tail(&(src_in_l->source.sock.alwait_list),
			&(sbt->waiting_conns));

	src_in_l->source.sock.in_alwait_list = 1;

	if (outofsockbufferspace_scheduled == 0) {
		schedule_work(&outofsockbufferspace_work);
		outofsockbufferspace_scheduled = 1;
	}
}

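/*
 * Try to raise src_in_l's alloclimit so that it covers "amount" bytes on top
 * of what the data_buf already holds. The request is clamped to
 * BUFFERLIMIT_SOCK_SOCK; if granting it would overrun the per-user or global
 * limit, or a tracker with no more usage than ours is already waiting, the
 * conn is queued via _reserve_sock_buffer_inswl() instead. The callers hold
 * sock_bufferlimits_lock and the conn's rcv_lock.
 */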
static void reserve_sock_buffer(struct conn *src_in_l, __u64 amount)
{
	struct sock_buffertracker *sbt = src_in_l->source.sock.sbt;
	struct sock_buffertracker *first_wait_sbt = list_empty(
			&sock_bt_wait_list) ? 0 : container_of(
			sock_bt_wait_list.next, struct sock_buffertracker, lh);

	__u32 max = (1 << 30) - 1;

	/* ... */

	if (unlikely(amount > max))
		/* ... */

	amount += src_in_l->data_buf.totalsize + src_in_l->data_buf.overhead -
			src_in_l->data_buf.cpacket_buffer;

	if (unlikely(amount > max))
		/* ... */

	if (amount > BUFFERLIMIT_SOCK_SOCK)
		amount = BUFFERLIMIT_SOCK_SOCK;

	if (amount <= src_in_l->source.sock.alloclimit)
		/* ... */

	if ((list_empty(&sock_bt_wait_list) == 0 && first_wait_sbt != 0 &&
			first_wait_sbt != sbt &&
			first_wait_sbt->usage <= sbt->usage) ||
			amount - src_in_l->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_USER - sbt->usage ||
			amount - src_in_l->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_GLOBAL - sock_bufferusage) {
		_reserve_sock_buffer_inswl(src_in_l);
	} else {
		int waitingconnremoved = 0;
		sbt->usage += amount - src_in_l->source.sock.alloclimit;
		sock_bufferusage += amount - src_in_l->source.sock.alloclimit;
		src_in_l->source.sock.alloclimit = amount;

		if (src_in_l->source.sock.in_alwait_list) {
			list_del(&(src_in_l->source.sock.alwait_list));
			src_in_l->source.sock.in_alwait_list = 0;
			waitingconnremoved = 1;
		}
		_reserve_sock_buffer_reord_bt(sbt, waitingconnremoved);
	}
}

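/*
 * Retry the reservations of the conns queued on this tracker, waking each
 * sender whose pending wait_len could be satisfied; gives up once a conn
 * still cannot get enough space.
 */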
static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt)
{
	int failed = 0;

	while (list_empty(&(sbt->waiting_conns)) == 0 && failed == 0) {
		struct conn *src_in_o = container_of(sbt->waiting_conns.next,
				struct conn, source.sock.alwait_list);

		spin_lock_bh(&(src_in_o->rcv_lock));

		BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);
		BUG_ON(src_in_o->source.sock.in_alwait_list == 0);
		BUG_ON(src_in_o->source.sock.wait_len == 0);

		reserve_sock_buffer(src_in_o, src_in_o->source.sock.wait_len);

		if (src_in_o->source.sock.alloclimit +
				src_in_o->data_buf.cpacket_buffer <=
				src_in_o->data_buf.totalsize +
				src_in_o->data_buf.overhead) {
			/* ... */
		}

		wake_up_interruptible(&(src_in_o->source.sock.wait));

		spin_unlock_bh(&(src_in_o->rcv_lock));
		/* ... */
	}

	return failed;
}

static void resume_bufferwaiting_socks(void)
{
	struct list_head *curr = sock_bt_wait_list.next;

	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));

		if (_resume_bufferwaiting_socks(currsbt))
			/* ... */
		/* ... */
	}
}

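/*
 * Counterpart of _reserve_sock_buffer_reord_bt(): after a tracker's usage
 * dropped, move it towards the head of sock_bt_wait_list until the list is
 * sorted by usage again.
 */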
static void reorder_sock_bt_wait_list(struct sock_buffertracker *sbt)
{
	if (list_empty(&(sbt->waiting_conns)))
		/* ... */

	while (sbt->lh.prev != &sock_bt_wait_list) {
		struct sock_buffertracker *prevsbt = container_of(sbt->lh.prev,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (prevsbt->usage <= sbt->usage)
			/* ... */

		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &(prevsbt->lh));
	}
}

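/*
 * Called when a conn gets reset: detach it from its sock_buffertracker,
 * remove it from the alloc-wait and delayed-flush lists, give its allocation
 * back and drop the tracker reference.
 */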
void connreset_sbt(struct conn *cn)
{
	struct sock_buffertracker *sbt;

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(cn->rcv_lock));

	if (cn->sourcetype != SOURCE_SOCK)
		/* ... */

	sbt = cn->source.sock.sbt;
	/* ... */
	if (cn->source.sock.in_alwait_list) {
		list_del(&(cn->source.sock.alwait_list));
		cn->source.sock.in_alwait_list = 0;

		if (list_empty(&(sbt->waiting_conns))) {
			list_del(&(sbt->lh));
			list_add_tail(&(sbt->lh), &sock_bt_list);
		}
		/* ... */
		reorder_sock_bt_wait_list(sbt);
	}

	sbt->usage -= cn->source.sock.alloclimit;
	if (cn->source.sock.delay_flush) {
		cn->source.sock.delay_flush = 0;
		list_del(&(cn->source.sock.delflush_list));
	}
	kref_put(&(sbt->ref), free_sbt);
	cn->source.sock.sbt = 0;

	/* ... */
	spin_unlock_bh(&(cn->rcv_lock));
	spin_unlock_bh(&sock_bufferlimits_lock);
}

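/*
 * Shrink the conn's alloclimit back to what its data_buf actually occupies,
 * return the difference to the per-user and global accounting and then give
 * waiting sockets a chance to get their reservations.
 */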
void unreserve_sock_buffer(struct conn *cn)
{
	/* ... */
	struct sock_buffertracker *sbt;

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(cn->rcv_lock));

	if (cn->sourcetype != SOURCE_SOCK)
		/* ... */

	if (unlikely(cn->isreset != 0))
		/* ... */

	sbt = cn->source.sock.sbt;

	if (cn->data_buf.totalsize + cn->data_buf.overhead <=
			cn->source.sock.alloclimit +
			cn->data_buf.cpacket_buffer)
		/* ... */

	BUG_ON(cn->source.sock.alloclimit > sbt->usage);
	BUG_ON(cn->source.sock.alloclimit > sock_bufferusage);
	BUG_ON(cn->data_buf.cpacket_buffer > cn->data_buf.totalsize +
			cn->data_buf.overhead);

	sbt->usage -= cn->source.sock.alloclimit;
	sbt->usage += cn->data_buf.totalsize;
	sbt->usage += cn->data_buf.overhead;
	sbt->usage -= cn->data_buf.cpacket_buffer;

	sock_bufferusage -= cn->source.sock.alloclimit;
	sock_bufferusage += cn->data_buf.totalsize;
	sock_bufferusage += cn->data_buf.overhead;
	sock_bufferusage -= cn->data_buf.cpacket_buffer;

	cn->source.sock.alloclimit = cn->data_buf.totalsize +
			cn->data_buf.overhead - cn->data_buf.cpacket_buffer;

	if (cn->source.sock.alloclimit == 0 &&
			cn->source.sock.in_alwait_list) {
		list_del(&(cn->source.sock.alwait_list));
		cn->source.sock.in_alwait_list = 0;

		if (list_empty(&(sbt->waiting_conns))) {
			list_del(&(sbt->lh));
			list_add_tail(&(sbt->lh), &sock_bt_list);
		}
		/* ... */
		reorder_sock_bt_wait_list(sbt);
	}

	/* ... */
	spin_unlock_bh(&(cn->rcv_lock));

	resume_bufferwaiting_socks();

	spin_unlock_bh(&sock_bufferlimits_lock);
}

int cor_socket_release(struct socket *sock)
{
	struct cor_sock *cs = (struct cor_sock *) sock->sk;

	if (cs->type == SOCKTYPE_UNCONNECTED) {
	} else if (cs->type == SOCKTYPE_LISTENER) {
		/* ... */
	} else if (cs->type == SOCKTYPE_CONN) {
		reset_conn(cs->data.conn.src_sock);
		kref_put(&(cs->data.conn.src_sock->ref), free_conn);
		kref_put(&(cs->data.conn.trgt_sock->ref), free_conn);

		if (cs->data.conn.rcvitem != 0) {
			databuf_item_free(cs->data.conn.rcvitem);
			cs->data.conn.rcvitem = 0;
		}
		/* ... */
	}

	kmem_cache_free(sock_slab, cs);

	/* ... */
}

int cor_socket_bind(struct socket *sock, struct sockaddr *myaddr,
		int sockaddr_len)
{
	int rc;
	struct cor_sock *cs = (struct cor_sock *) sock->sk;
	struct cor_sockaddr *addr = (struct cor_sockaddr *) myaddr;

	if (unlikely(sockaddr_len < sizeof(struct cor_sockaddr)))
		/* ... */

	if (unlikely(addr->type != SOCKADDRTYPE_PORT))
		/* ... */

	spin_lock_bh(&(cs->lock));

	if (unlikely(cs->type != SOCKTYPE_UNCONNECTED)) {
		/* ... */
	}

	rc = open_port(cs, addr->addr.port);

	spin_unlock_bh(&(cs->lock));

	/* ... */
}

int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr,
		int sockaddr_len, int flags)
{
	struct sock_buffertracker *sbt;

	struct conn *src_sock;

	struct cor_sock *cs = (struct cor_sock *) sock->sk;

	src_sock = alloc_conn(GFP_KERNEL);

	if (unlikely(src_sock == 0))
		/* ... */

	src_sock->is_client = 1;

	spin_lock_bh(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	spin_unlock_bh(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0)) {
		reset_conn(src_sock);
		/* ... */
	}

	spin_lock_bh(&(src_sock->rcv_lock));
	spin_lock_bh(&(src_sock->reversedir->rcv_lock));
	spin_lock_bh(&(cs->lock));
	if (cs->type != SOCKTYPE_UNCONNECTED) {
		spin_unlock_bh(&(cs->lock));
		spin_unlock_bh(&(src_sock->reversedir->rcv_lock));
		spin_unlock_bh(&(src_sock->rcv_lock));
		reset_conn(src_sock);
		kref_put(&(sbt->ref), free_sbt);
		/* ... */
	}

	conn_init_sock_source(src_sock);
	src_sock->source.sock.sbt = sbt;
	conn_init_sock_target(src_sock->reversedir);

	memset(&(cs->data), 0, sizeof(cs->data));
	cs->type = SOCKTYPE_CONN;
	cs->data.conn.src_sock = src_sock;
	cs->data.conn.trgt_sock = src_sock->reversedir;
	mutex_init(&(cs->data.conn.rcvbuf_lock));
	kref_get(&(src_sock->ref));
	kref_get(&(src_sock->reversedir->ref));

	spin_unlock_bh(&(cs->lock));
	spin_unlock_bh(&(src_sock->reversedir->rcv_lock));
	spin_unlock_bh(&(src_sock->rcv_lock));

	sock->state = SS_CONNECTED;

	/* ... */
}

static int cor_rdytoaccept(struct cor_sock *cs)
{
	int rc;
	spin_lock_bh(&(cs->lock));
	BUG_ON(cs->type != SOCKTYPE_LISTENER);
	rc = (list_empty(&(cs->data.listener.conn_queue)) == 0);
	spin_unlock_bh(&(cs->lock));
	return rc;
}

const struct proto_ops cor_proto_ops;

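/*
 * accept(): take the first queued conn off the listening socket, attach the
 * caller's buffertracker to it and hand it to userspace wrapped in a newly
 * allocated cor_sock.
 */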
int cor_socket_accept(struct socket *sock, struct socket *newsock, int flags)
{
	struct sock_buffertracker *sbt;

	struct cor_sock *cs = (struct cor_sock *) sock->sk;

	struct conn *src_sock_o;

	struct cor_sock *newcs;

	newcs = kmem_cache_alloc(sock_slab, GFP_KERNEL);
	if (unlikely(newcs == 0))
		/* ... */
	memset(newcs, 0, sizeof(struct cor_sock));
	newcs->type = SOCKTYPE_CONN;
	spin_lock_init(&(newcs->lock));

	spin_lock_bh(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	spin_unlock_bh(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0)) {
		kmem_cache_free(sock_slab, newcs);
		/* ... */
	}

	spin_lock_bh(&(cs->lock));

	BUG_ON(cs->type != SOCKTYPE_UNCONNECTED &&
			cs->type != SOCKTYPE_LISTENER &&
			cs->type != SOCKTYPE_CONN);

	if (unlikely(cs->type != SOCKTYPE_LISTENER)) {
		spin_unlock_bh(&(cs->lock));
		kref_put(&(sbt->ref), free_sbt);
		kmem_cache_free(sock_slab, newcs);
		/* ... */
	}

	if (unlikely(cs->data.listener.queue_maxlen <= 0)) {
		spin_unlock_bh(&(cs->lock));
		kref_put(&(sbt->ref), free_sbt);
		kmem_cache_free(sock_slab, newcs);
		/* ... */
	}

	while (list_empty(&(cs->data.listener.conn_queue))) {
		spin_unlock_bh(&(cs->lock));
		if (wait_event_interruptible(cs->data.listener.wait,
				cor_rdytoaccept(cs))) {
			kref_put(&(sbt->ref), free_sbt);
			kmem_cache_free(sock_slab, newcs);
			/* ... */
		}
		spin_lock_bh(&(cs->lock));
	}

	src_sock_o = container_of(cs->data.listener.conn_queue.next,
			struct conn, source.sock.cl_list);

	BUG_ON(src_sock_o->sourcetype != SOURCE_SOCK);

	list_del(cs->data.listener.conn_queue.next);

	cs->data.listener.queue_len--;

	spin_unlock_bh(&(cs->lock));

	spin_lock_bh(&(src_sock_o->rcv_lock));
	if (unlikely(src_sock_o->isreset != 0)) {
		spin_unlock_bh(&(src_sock_o->rcv_lock));
		kref_put(&(src_sock_o->ref), free_conn);
		/* ... */
	}
	src_sock_o->source.sock.sbt = sbt;
	spin_unlock_bh(&(src_sock_o->rcv_lock));

	newcs->data.conn.src_sock = src_sock_o;
	newcs->data.conn.trgt_sock = src_sock_o->reversedir;
	kref_get(&(src_sock_o->reversedir->ref));

	newsock->ops = &cor_proto_ops;
	newsock->sk = (struct sock *) newcs;
	newsock->state = SS_CONNECTED;

	/* ... */
}

int cor_socket_listen(struct socket *sock, int len)
{
	struct cor_sock *cs = (struct cor_sock *) sock->sk;

	spin_lock_bh(&(cs->lock));

	BUG_ON(cs->type != SOCKTYPE_UNCONNECTED &&
			cs->type != SOCKTYPE_LISTENER &&
			cs->type != SOCKTYPE_CONN);

	if (unlikely(cs->type != SOCKTYPE_LISTENER)) {
		spin_unlock_bh(&(cs->lock));
		/* ... */
	}

	cs->data.listener.queue_maxlen = len;

	spin_unlock_bh(&(cs->lock));

	/* ... */
}

int cor_socket_shutdown(struct socket *sock, int flags)
{
	/* ... */
}

int cor_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
{
	/* ... */
}

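/*
 * Wait condition for a blocking sendmsg: retries the pending reservation
 * (wait_len) and reports whether the sender should wake up and try again
 * (also when the conn meanwhile got reset or replaced).
 */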
static int sendmsg_maypush(struct cor_sock *cs, struct conn *src_sock)
{
	/* ... */

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(src_sock->rcv_lock));
	spin_lock_bh(&(cs->lock));

	if (unlikely(src_sock != cs->data.conn.src_sock)) {
		/* ... */
	} else if (unlikely(src_sock->isreset != 0)) {
		/* ... */
	} else if (src_sock->source.sock.wait_len == 0) {
		/* ... */
	} else if (src_sock->source.sock.alloclimit +
			src_sock->data_buf.cpacket_buffer >
			src_sock->data_buf.totalsize +
			src_sock->data_buf.overhead) {
		/* ... */
	} else {
		reserve_sock_buffer(src_sock,
				src_sock->source.sock.wait_len);
		if (src_sock->source.sock.alloclimit +
				src_sock->data_buf.cpacket_buffer >
				src_sock->data_buf.totalsize +
				src_sock->data_buf.overhead)
			/* ... */
	}

	spin_unlock_bh(&(cs->lock));
	spin_unlock_bh(&(src_sock->rcv_lock));
	spin_unlock_bh(&sock_bufferlimits_lock);

	/* ... */
}

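/*
 * Innermost send step: reserve buffer space for one chunk of user data and
 * pass it to receive_buf(); wait_len is updated so a blocked sender knows
 * how much it still needs once space becomes available again.
 */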
__s32 ___cor_sendmsg(char *buf, __u32 bufread, __u32 buflen,
		__u32 totalremaining, struct cor_sock *cs,
		struct conn *src_sock, struct conn *trgt_sock)
{
	/* ... */

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(src_sock->rcv_lock));
	spin_lock_bh(&(cs->lock));

	if (unlikely(unlikely(src_sock != cs->data.conn.src_sock) ||
			unlikely(trgt_sock != cs->data.conn.trgt_sock) ||
			unlikely(src_sock->isreset != 0))) {
		spin_unlock_bh(&(cs->lock));
		spin_unlock_bh(&(src_sock->rcv_lock));
		spin_unlock_bh(&sock_bufferlimits_lock);
		/* ... */
	}

	spin_unlock_bh(&(cs->lock));

	reserve_sock_buffer(src_sock, (__u32) (buflen +
			sizeof(struct data_buf_item)));

	bufferfree = (__s64) src_sock->source.sock.alloclimit +
			(__s64) src_sock->data_buf.cpacket_buffer -
			(__s64) src_sock->data_buf.totalsize -
			(__s64) src_sock->data_buf.overhead;

	spin_unlock_bh(&sock_bufferlimits_lock);

	if (bufferfree < (buflen + sizeof(struct data_buf_item))) {
		/* ... */
		printk(KERN_ERR "2");
		/* ... */
	}

	rc = receive_buf(src_sock, buf, bufread, buflen, 0);

	/* ... */
	src_sock->source.sock.wait_len = totalremaining +
			sizeof(struct data_buf_item);
	/* ... */
	src_sock->source.sock.wait_len = 0;

	spin_unlock_bh(&(src_sock->rcv_lock));

	/* ... */
}

__s32 __cor_sendmsg(struct msghdr *msg, __u32 totallen, int *iovidx,
		int *iovread, struct cor_sock *cs, struct conn *src_sock,
		struct conn *trgt_sock)
{
	/* ... */
	__u32 buflen = buf_optlen(totallen);

	buf = kmalloc(buflen, GFP_KERNEL);
	if (unlikely(buf == 0))
		/* ... */

	memset(buf, 0, buflen);

	while (bufread < buflen && bufread < totallen) {
		struct iovec *iov = msg->msg_iov + *iovidx;
		__user char *userbuf = iov->iov_base + *iovread;
		__u32 len = iov->iov_len - *iovread;
		/* ... */
		BUG_ON(*iovidx >= msg->msg_iovlen);
		/* ... */
		if (len > (buflen - bufread))
			len = buflen - bufread;
		if (len > (totallen - bufread))
			len = totallen - bufread;

		notcopied = copy_from_user(buf + bufread, userbuf, len);

		bufread += len - notcopied;
		(*iovread) += len - notcopied;

		if (unlikely(notcopied == buflen) && bufread == 0) {
			/* ... */
		}

		if (unlikely(notcopied > 0))
			/* ... */
	}

	return ___cor_sendmsg(buf, bufread, buflen, totallen, cs, src_sock,
			trgt_sock);
}

__s32 _cor_sendmsg(struct msghdr *msg, size_t total_len, struct cor_sock *cs,
		struct conn *src_sock, struct conn *trgt_sock)
{
	int flush = (msg->msg_flags & MSG_MORE) == 0;
	/* ... */
	__u64 max = (1LL << 31) - 1;
	__u32 totallen = (total_len > max ? max : total_len);

	/* ... */

	while (rc >= 0 && copied < totallen) {
		rc = __cor_sendmsg(msg, totallen - copied, &iovidx, &iovread,
				cs, src_sock, trgt_sock);

		if (rc > 0 || copied == 0)
			/* ... */
	}

	unreserve_sock_buffer(src_sock);

	spin_lock_bh(&sock_bufferlimits_lock);
	spin_lock_bh(&(src_sock->rcv_lock));

	if (unlikely(src_sock->isreset != 0)) {
		spin_unlock_bh(&(src_sock->rcv_lock));
		spin_unlock_bh(&sock_bufferlimits_lock);
		/* ... */
	}

	if (flush == 0 && copied > 0 && copied == total_len &&
			src_sock->data_buf.totalsize +
			src_sock->data_buf.overhead -
			src_sock->data_buf.cpacket_buffer <
			(BUFFERLIMIT_SOCK_SOCK * 3) / 4) {
		if (src_sock->source.sock.delay_flush == 0) {
			struct sock_buffertracker *sbt =
					src_sock->source.sock.sbt;

			list_add_tail(&(src_sock->source.sock.delflush_list),
					&(sbt->delflush_conns));
		}
		src_sock->source.sock.delay_flush = 1;
	} else {
		if (src_sock->source.sock.delay_flush) {
			list_del(&(src_sock->source.sock.delflush_list));

			src_sock->source.sock.delay_flush = 0;
		}
	}

	spin_unlock_bh(&(src_sock->rcv_lock));
	spin_unlock_bh(&sock_bufferlimits_lock);

	/* ... */
}

int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len)
{
	/* ... */

	int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0;

	struct cor_sock *cs = (struct cor_sock *) sock->sk;
	struct conn *src_sock;
	struct conn *trgt_sock;

	spin_lock_bh(&(cs->lock));

	BUG_ON(cs->type != SOCKTYPE_UNCONNECTED &&
			cs->type != SOCKTYPE_LISTENER &&
			cs->type != SOCKTYPE_CONN);

	if (unlikely(cs->type != SOCKTYPE_CONN)) {
		spin_unlock_bh(&(cs->lock));
		/* ... */
	}

	src_sock = cs->data.conn.src_sock;
	trgt_sock = cs->data.conn.trgt_sock;

	kref_get(&(src_sock->ref));
	kref_get(&(trgt_sock->ref));

	spin_unlock_bh(&(cs->lock));

	BUG_ON(src_sock == 0);
	BUG_ON(trgt_sock == 0);

	rc = _cor_sendmsg(msg, total_len, cs, src_sock, trgt_sock);

	if (likely(rc > 0 || rc == -EAGAIN))
		/* ... */

	if (rc == -EAGAIN && blocking) {
#warning todo move waitqueue to cor_sock
		if (wait_event_interruptible(src_sock->source.sock.wait,
				sendmsg_maypush(cs, src_sock)) == 0)
			/* ... */
	}

	BUG_ON(rc > total_len);

	/* ... */
}

static int cor_readytoread(struct conn *trgt_sock_o)
{
	int rc;
	spin_lock_bh(&(trgt_sock_o->rcv_lock));
	rc = (trgt_sock_o->data_buf.read_remaining != 0) ||
			unlikely(trgt_sock_o->isreset != 0);
	spin_unlock_bh(&(trgt_sock_o->rcv_lock));
	return rc;
}

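/*
 * Copy data from the conn's current receive item (cs->data.conn.rcvitem)
 * into the user iovec, advancing rcvoffset and releasing the item once it
 * has been fully consumed.
 */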
static int __cor_recvmsg(struct msghdr *msg, __u32 totallen,
		int *iovidx, int *iovwritten,
		struct cor_sock *cs, struct conn *trgt_sock)
{
	struct data_buf_item *dbi = cs->data.conn.rcvitem;
	/* ... */

	while (written < totallen && dbi != 0) {
		struct iovec *iov = msg->msg_iov + *iovidx;
		__user char *userbuf = iov->iov_base + *iovwritten;
		__u32 len = iov->iov_len - *iovwritten;
		/* ... */
		BUG_ON(*iovidx >= msg->msg_iovlen);
		/* ... */
		if (dbi->datalen == cs->data.conn.rcvoffset) {
			databuf_item_free(cs->data.conn.rcvitem);
			cs->data.conn.rcvitem = 0;
			cs->data.conn.rcvoffset = 0;
			/* ... */
		}

		if (len > (dbi->datalen - cs->data.conn.rcvoffset))
			len = dbi->datalen - cs->data.conn.rcvoffset;
		if (len > (totallen - written))
			len = totallen - written;

		notcopied = copy_to_user(userbuf, dbi->buf +
				cs->data.conn.rcvoffset, len);

		written += (len - notcopied);
		(*iovwritten) += (len - notcopied);
		cs->data.conn.rcvoffset += (len - notcopied);

		if (unlikely(notcopied == len) && written == 0)
			/* ... */
	}

	/* ... */
}

static int _cor_recvmsg(struct msghdr *msg, size_t total_len,
		struct cor_sock *cs, struct conn *trgt_sock)
{
	/* ... */
	__u64 max = (1LL << 31) - 1;
	__u32 totallen = (total_len > max ? max : total_len);

	mutex_lock(&(cs->data.conn.rcvbuf_lock));

	while (rc >= 0 && copied < totallen) {
		spin_lock_bh(&(trgt_sock->rcv_lock));
		spin_lock_bh(&(cs->lock));
		if (unlikely(unlikely(trgt_sock != cs->data.conn.trgt_sock) ||
				unlikely(trgt_sock->isreset != 0))) {
			spin_unlock_bh(&(cs->lock));
			spin_unlock_bh(&(trgt_sock->rcv_lock));
			mutex_unlock(&(cs->data.conn.rcvbuf_lock));
			/* ... */
		}

		spin_unlock_bh(&(cs->lock));

		if (cs->data.conn.rcvitem == 0)
			databuf_pull_dbi(cs, trgt_sock);

		spin_unlock_bh(&(trgt_sock->rcv_lock));

		rc = __cor_recvmsg(msg, totallen - copied, &iovidx, &iovwritten,
				cs, trgt_sock);

		if (rc > 0 || copied == 0)
			/* ... */
	}

	mutex_unlock(&(cs->data.conn.rcvbuf_lock));

	/* ... */
}

int cor_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len, int flags)
{
	/* ... */
	int blocking = (flags & MSG_DONTWAIT) == 0;

	struct cor_sock *cs = (struct cor_sock *) sock->sk;
	struct conn *src_sock;
	struct conn *trgt_sock;

	spin_lock_bh(&(cs->lock));

	BUG_ON(cs->type != SOCKTYPE_UNCONNECTED &&
			cs->type != SOCKTYPE_LISTENER &&
			cs->type != SOCKTYPE_CONN);

	if (unlikely(cs->type != SOCKTYPE_CONN)) {
		spin_unlock_bh(&(cs->lock));
		/* ... */
	}

	src_sock = cs->data.conn.src_sock;
	trgt_sock = cs->data.conn.trgt_sock;

	BUG_ON(src_sock == 0);
	BUG_ON(trgt_sock == 0);

	kref_get(&(src_sock->ref));
	kref_get(&(trgt_sock->ref));

	spin_unlock_bh(&(cs->lock));

	copied = _cor_recvmsg(msg, total_len, cs, trgt_sock);

	if (likely(copied > 0)) {
		refresh_conn_credits(trgt_sock, 0, 0);
		wake_sender(trgt_sock);
	}

	if (copied == -EAGAIN && blocking) {
		if (wait_event_interruptible(trgt_sock->target.sock.wait,
				cor_readytoread(trgt_sock)) == 0)
			/* ... */
		copied = -ERESTARTSYS;
	}

	kref_put(&(src_sock->ref), free_conn);
	kref_put(&(trgt_sock->ref), free_conn);

	/* ... */
}

const struct proto_ops cor_proto_ops = {
	/* ... */
	.owner = THIS_MODULE,
	.release = cor_socket_release,
	.bind = cor_socket_bind,
	.connect = cor_socket_connect,
	.accept = cor_socket_accept,
	.listen = cor_socket_listen,
	.shutdown = cor_socket_shutdown,
	/* ... */
	.sendmsg = cor_sendmsg,
	.recvmsg = cor_recvmsg
	/* ... */
};

int cor_createsock(struct net *net, struct socket *sock, int protocol)
{
	struct cor_sock *cs;

	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	cs = kmem_cache_alloc(sock_slab, GFP_KERNEL);
	if (unlikely(cs == 0))
		/* ... */
	memset(cs, 0, sizeof(struct cor_sock));

	cs->type = SOCKTYPE_UNCONNECTED;
	spin_lock_init(&(cs->lock));

	sock->state = SS_UNCONNECTED;
	sock->ops = &cor_proto_ops;
	sock->sk = (struct sock *) cs;

	/* ... */
}

static struct net_proto_family cor_net_proto_family = {
	/* ... */
	.create = cor_createsock,
	.owner = THIS_MODULE
};

static int __init cor_sock_init(void)
{
	sock_slab = kmem_cache_create("cor_sock",
			sizeof(struct cor_sock), 8, 0, 0);

	INIT_WORK(&outofsockbufferspace_work, outofsockbufferspace);
	outofsockbufferspace_scheduled = 0;

	sock_register(&cor_net_proto_family);
	sock_bufferusage = 0;

	/* ... */
}

module_init(cor_sock_init);

MODULE_LICENSE("GPL");