#include <linux/bpf.h>
#include <linux/errno.h>
#include <linux/errqueue.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/net.h>
#include <linux/netdevice.h>
#include <linux/poll.h>
#include <linux/rculist.h>
#include <linux/skbuff.h>
#include <linux/socket.h>
#include <linux/uaccess.h>
#include <linux/workqueue.h>
#include <net/kcm.h>
#include <net/netns/generic.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <uapi/linux/kcm.h>
22 unsigned int kcm_net_id
;
24 static struct kmem_cache
*kcm_psockp __read_mostly
;
25 static struct kmem_cache
*kcm_muxp __read_mostly
;
26 static struct workqueue_struct
*kcm_wq
;
/* Convert a generic socket pointer to its enclosing KCM socket.
 *
 * This is a plain pointer cast, so it assumes struct sock is the first
 * member of struct kcm_sock. The cast also drops the const qualifier.
 */
static inline struct kcm_sock *kcm_sk(const struct sock *sk)
{
	return (struct kcm_sock *)sk;
}
33 static inline struct kcm_tx_msg
*kcm_tx_msg(struct sk_buff
*skb
)
35 return (struct kcm_tx_msg
*)skb
->cb
;
38 static inline struct kcm_rx_msg
*kcm_rx_msg(struct sk_buff
*skb
)
40 return (struct kcm_rx_msg
*)((void *)skb
->cb
+
41 offsetof(struct qdisc_skb_cb
, data
));
44 static void report_csk_error(struct sock
*csk
, int err
)
47 csk
->sk_error_report(csk
);
50 /* Callback lock held */
51 static void kcm_abort_rx_psock(struct kcm_psock
*psock
, int err
,
54 struct sock
*csk
= psock
->sk
;
56 /* Unrecoverable error in receive */
58 del_timer(&psock
->rx_msg_timer
);
60 if (psock
->rx_stopped
)
63 psock
->rx_stopped
= 1;
64 KCM_STATS_INCR(psock
->stats
.rx_aborts
);
66 /* Report an error on the lower socket */
67 report_csk_error(csk
, err
);
70 static void kcm_abort_tx_psock(struct kcm_psock
*psock
, int err
,
73 struct sock
*csk
= psock
->sk
;
74 struct kcm_mux
*mux
= psock
->mux
;
76 /* Unrecoverable error in transmit */
78 spin_lock_bh(&mux
->lock
);
80 if (psock
->tx_stopped
) {
81 spin_unlock_bh(&mux
->lock
);
85 psock
->tx_stopped
= 1;
86 KCM_STATS_INCR(psock
->stats
.tx_aborts
);
89 /* Take off psocks_avail list */
90 list_del(&psock
->psock_avail_list
);
91 } else if (wakeup_kcm
) {
92 /* In this case psock is being aborted while outside of
93 * write_msgs and psock is reserved. Schedule tx_work
94 * to handle the failure there. Need to commit tx_stopped
95 * before queuing work.
99 queue_work(kcm_wq
, &psock
->tx_kcm
->tx_work
);
102 spin_unlock_bh(&mux
->lock
);
104 /* Report error on lower socket */
105 report_csk_error(csk
, err
);
108 /* RX mux lock held. */
109 static void kcm_update_rx_mux_stats(struct kcm_mux
*mux
,
110 struct kcm_psock
*psock
)
112 KCM_STATS_ADD(mux
->stats
.rx_bytes
,
113 psock
->stats
.rx_bytes
- psock
->saved_rx_bytes
);
114 mux
->stats
.rx_msgs
+=
115 psock
->stats
.rx_msgs
- psock
->saved_rx_msgs
;
116 psock
->saved_rx_msgs
= psock
->stats
.rx_msgs
;
117 psock
->saved_rx_bytes
= psock
->stats
.rx_bytes
;
120 static void kcm_update_tx_mux_stats(struct kcm_mux
*mux
,
121 struct kcm_psock
*psock
)
123 KCM_STATS_ADD(mux
->stats
.tx_bytes
,
124 psock
->stats
.tx_bytes
- psock
->saved_tx_bytes
);
125 mux
->stats
.tx_msgs
+=
126 psock
->stats
.tx_msgs
- psock
->saved_tx_msgs
;
127 psock
->saved_tx_msgs
= psock
->stats
.tx_msgs
;
128 psock
->saved_tx_bytes
= psock
->stats
.tx_bytes
;
/* Forward declaration: kcm_rcv_ready() uses it before its definition. */
static int kcm_queue_rcv_skb(struct sock *sk, struct sk_buff *skb);
133 /* KCM is ready to receive messages on its queue-- either the KCM is new or
134 * has become unblocked after being blocked on full socket buffer. Queue any
135 * pending ready messages on a psock. RX mux lock held.
137 static void kcm_rcv_ready(struct kcm_sock
*kcm
)
139 struct kcm_mux
*mux
= kcm
->mux
;
140 struct kcm_psock
*psock
;
143 if (unlikely(kcm
->rx_wait
|| kcm
->rx_psock
|| kcm
->rx_disabled
))
146 while (unlikely((skb
= __skb_dequeue(&mux
->rx_hold_queue
)))) {
147 if (kcm_queue_rcv_skb(&kcm
->sk
, skb
)) {
148 /* Assuming buffer limit has been reached */
149 skb_queue_head(&mux
->rx_hold_queue
, skb
);
150 WARN_ON(!sk_rmem_alloc_get(&kcm
->sk
));
155 while (!list_empty(&mux
->psocks_ready
)) {
156 psock
= list_first_entry(&mux
->psocks_ready
, struct kcm_psock
,
159 if (kcm_queue_rcv_skb(&kcm
->sk
, psock
->ready_rx_msg
)) {
160 /* Assuming buffer limit has been reached */
161 WARN_ON(!sk_rmem_alloc_get(&kcm
->sk
));
165 /* Consumed the ready message on the psock. Schedule rx_work to
168 list_del(&psock
->psock_ready_list
);
169 psock
->ready_rx_msg
= NULL
;
171 /* Commit clearing of ready_rx_msg for queuing work */
174 queue_work(kcm_wq
, &psock
->rx_work
);
177 /* Buffer limit is okay now, add to ready list */
178 list_add_tail(&kcm
->wait_rx_list
,
179 &kcm
->mux
->kcm_rx_waiters
);
183 static void kcm_rfree(struct sk_buff
*skb
)
185 struct sock
*sk
= skb
->sk
;
186 struct kcm_sock
*kcm
= kcm_sk(sk
);
187 struct kcm_mux
*mux
= kcm
->mux
;
188 unsigned int len
= skb
->truesize
;
190 sk_mem_uncharge(sk
, len
);
191 atomic_sub(len
, &sk
->sk_rmem_alloc
);
193 /* For reading rx_wait and rx_psock without holding lock */
194 smp_mb__after_atomic();
196 if (!kcm
->rx_wait
&& !kcm
->rx_psock
&&
197 sk_rmem_alloc_get(sk
) < sk
->sk_rcvlowat
) {
198 spin_lock_bh(&mux
->rx_lock
);
200 spin_unlock_bh(&mux
->rx_lock
);
204 static int kcm_queue_rcv_skb(struct sock
*sk
, struct sk_buff
*skb
)
206 struct sk_buff_head
*list
= &sk
->sk_receive_queue
;
208 if (atomic_read(&sk
->sk_rmem_alloc
) >= sk
->sk_rcvbuf
)
211 if (!sk_rmem_schedule(sk
, skb
, skb
->truesize
))
218 skb
->destructor
= kcm_rfree
;
219 atomic_add(skb
->truesize
, &sk
->sk_rmem_alloc
);
220 sk_mem_charge(sk
, skb
->truesize
);
222 skb_queue_tail(list
, skb
);
224 if (!sock_flag(sk
, SOCK_DEAD
))
225 sk
->sk_data_ready(sk
);
230 /* Requeue received messages for a kcm socket to other kcm sockets. This is
231 * called with a kcm socket is receive disabled.
234 static void requeue_rx_msgs(struct kcm_mux
*mux
, struct sk_buff_head
*head
)
237 struct kcm_sock
*kcm
;
239 while ((skb
= __skb_dequeue(head
))) {
240 /* Reset destructor to avoid calling kcm_rcv_ready */
241 skb
->destructor
= sock_rfree
;
244 if (list_empty(&mux
->kcm_rx_waiters
)) {
245 skb_queue_tail(&mux
->rx_hold_queue
, skb
);
249 kcm
= list_first_entry(&mux
->kcm_rx_waiters
,
250 struct kcm_sock
, wait_rx_list
);
252 if (kcm_queue_rcv_skb(&kcm
->sk
, skb
)) {
253 /* Should mean socket buffer full */
254 list_del(&kcm
->wait_rx_list
);
255 kcm
->rx_wait
= false;
257 /* Commit rx_wait to read in kcm_free */
265 /* Lower sock lock held */
266 static struct kcm_sock
*reserve_rx_kcm(struct kcm_psock
*psock
,
267 struct sk_buff
*head
)
269 struct kcm_mux
*mux
= psock
->mux
;
270 struct kcm_sock
*kcm
;
272 WARN_ON(psock
->ready_rx_msg
);
275 return psock
->rx_kcm
;
277 spin_lock_bh(&mux
->rx_lock
);
280 spin_unlock_bh(&mux
->rx_lock
);
281 return psock
->rx_kcm
;
284 kcm_update_rx_mux_stats(mux
, psock
);
286 if (list_empty(&mux
->kcm_rx_waiters
)) {
287 psock
->ready_rx_msg
= head
;
288 list_add_tail(&psock
->psock_ready_list
,
290 spin_unlock_bh(&mux
->rx_lock
);
294 kcm
= list_first_entry(&mux
->kcm_rx_waiters
,
295 struct kcm_sock
, wait_rx_list
);
296 list_del(&kcm
->wait_rx_list
);
297 kcm
->rx_wait
= false;
300 kcm
->rx_psock
= psock
;
302 spin_unlock_bh(&mux
->rx_lock
);
307 static void kcm_done(struct kcm_sock
*kcm
);
309 static void kcm_done_work(struct work_struct
*w
)
311 kcm_done(container_of(w
, struct kcm_sock
, done_work
));
314 /* Lower sock held */
315 static void unreserve_rx_kcm(struct kcm_psock
*psock
,
318 struct kcm_sock
*kcm
= psock
->rx_kcm
;
319 struct kcm_mux
*mux
= psock
->mux
;
324 spin_lock_bh(&mux
->rx_lock
);
326 psock
->rx_kcm
= NULL
;
327 kcm
->rx_psock
= NULL
;
329 /* Commit kcm->rx_psock before sk_rmem_alloc_get to sync with
334 if (unlikely(kcm
->done
)) {
335 spin_unlock_bh(&mux
->rx_lock
);
337 /* Need to run kcm_done in a task since we need to qcquire
338 * callback locks which may already be held here.
340 INIT_WORK(&kcm
->done_work
, kcm_done_work
);
341 schedule_work(&kcm
->done_work
);
345 if (unlikely(kcm
->rx_disabled
)) {
346 requeue_rx_msgs(mux
, &kcm
->sk
.sk_receive_queue
);
347 } else if (rcv_ready
|| unlikely(!sk_rmem_alloc_get(&kcm
->sk
))) {
348 /* Check for degenerative race with rx_wait that all
349 * data was dequeued (accounted for in kcm_rfree).
353 spin_unlock_bh(&mux
->rx_lock
);
356 static void kcm_start_rx_timer(struct kcm_psock
*psock
)
358 if (psock
->sk
->sk_rcvtimeo
)
359 mod_timer(&psock
->rx_msg_timer
, psock
->sk
->sk_rcvtimeo
);
/* Run BPF program @prog over @ctx (the message head skb in kcm_tcp_recv)
 * and evaluate to its result. kcm_tcp_recv treats the result as the
 * message length, with 0 meaning "need more header". Arguments are
 * parenthesized so that arbitrary expressions can be passed.
 */
#define KCM_RUN_FILTER(prog, ctx) \
	(*(prog)->bpf_func)((ctx), (prog)->insnsi)
366 /* Lower socket lock held */
367 static int kcm_tcp_recv(read_descriptor_t
*desc
, struct sk_buff
*orig_skb
,
368 unsigned int orig_offset
, size_t orig_len
)
370 struct kcm_psock
*psock
= (struct kcm_psock
*)desc
->arg
.data
;
371 struct kcm_rx_msg
*rxm
;
372 struct kcm_sock
*kcm
;
373 struct sk_buff
*head
, *skb
;
374 size_t eaten
= 0, cand_len
;
377 bool cloned_orig
= false;
379 if (psock
->ready_rx_msg
)
382 head
= psock
->rx_skb_head
;
384 /* Message already in progress */
386 rxm
= kcm_rx_msg(head
);
387 if (unlikely(rxm
->early_eaten
)) {
388 /* Already some number of bytes on the receive sock
389 * data saved in rx_skb_head, just indicate they
392 eaten
= orig_len
<= rxm
->early_eaten
?
393 orig_len
: rxm
->early_eaten
;
394 rxm
->early_eaten
-= eaten
;
399 if (unlikely(orig_offset
)) {
400 /* Getting data with a non-zero offset when a message is
401 * in progress is not expected. If it does happen, we
402 * need to clone and pull since we can't deal with
403 * offsets in the skbs for a message expect in the head.
405 orig_skb
= skb_clone(orig_skb
, GFP_ATOMIC
);
407 KCM_STATS_INCR(psock
->stats
.rx_mem_fail
);
408 desc
->error
= -ENOMEM
;
411 if (!pskb_pull(orig_skb
, orig_offset
)) {
412 KCM_STATS_INCR(psock
->stats
.rx_mem_fail
);
414 desc
->error
= -ENOMEM
;
421 if (!psock
->rx_skb_nextp
) {
422 /* We are going to append to the frags_list of head.
423 * Need to unshare the frag_list.
425 err
= skb_unclone(head
, GFP_ATOMIC
);
427 KCM_STATS_INCR(psock
->stats
.rx_mem_fail
);
432 if (unlikely(skb_shinfo(head
)->frag_list
)) {
433 /* We can't append to an sk_buff that already
434 * has a frag_list. We create a new head, point
435 * the frag_list of that to the old head, and
436 * then are able to use the old head->next for
437 * appending to the message.
439 if (WARN_ON(head
->next
)) {
440 desc
->error
= -EINVAL
;
444 skb
= alloc_skb(0, GFP_ATOMIC
);
446 KCM_STATS_INCR(psock
->stats
.rx_mem_fail
);
447 desc
->error
= -ENOMEM
;
450 skb
->len
= head
->len
;
451 skb
->data_len
= head
->len
;
452 skb
->truesize
= head
->truesize
;
453 *kcm_rx_msg(skb
) = *kcm_rx_msg(head
);
454 psock
->rx_skb_nextp
= &head
->next
;
455 skb_shinfo(skb
)->frag_list
= head
;
456 psock
->rx_skb_head
= skb
;
459 psock
->rx_skb_nextp
=
460 &skb_shinfo(head
)->frag_list
;
465 while (eaten
< orig_len
) {
466 /* Always clone since we will consume something */
467 skb
= skb_clone(orig_skb
, GFP_ATOMIC
);
469 KCM_STATS_INCR(psock
->stats
.rx_mem_fail
);
470 desc
->error
= -ENOMEM
;
474 cand_len
= orig_len
- eaten
;
476 head
= psock
->rx_skb_head
;
479 psock
->rx_skb_head
= head
;
480 /* Will set rx_skb_nextp on next packet if needed */
481 psock
->rx_skb_nextp
= NULL
;
482 rxm
= kcm_rx_msg(head
);
483 memset(rxm
, 0, sizeof(*rxm
));
484 rxm
->offset
= orig_offset
+ eaten
;
486 /* Unclone since we may be appending to an skb that we
487 * already share a frag_list with.
489 err
= skb_unclone(skb
, GFP_ATOMIC
);
491 KCM_STATS_INCR(psock
->stats
.rx_mem_fail
);
496 rxm
= kcm_rx_msg(head
);
497 *psock
->rx_skb_nextp
= skb
;
498 psock
->rx_skb_nextp
= &skb
->next
;
499 head
->data_len
+= skb
->len
;
500 head
->len
+= skb
->len
;
501 head
->truesize
+= skb
->truesize
;
504 if (!rxm
->full_len
) {
507 len
= KCM_RUN_FILTER(psock
->bpf_prog
, head
);
510 /* Need more header to determine length */
511 if (!rxm
->accum_len
) {
512 /* Start RX timer for new message */
513 kcm_start_rx_timer(psock
);
515 rxm
->accum_len
+= cand_len
;
517 KCM_STATS_INCR(psock
->stats
.rx_need_more_hdr
);
518 WARN_ON(eaten
!= orig_len
);
520 } else if (len
> psock
->sk
->sk_rcvbuf
) {
521 /* Message length exceeds maximum allowed */
522 KCM_STATS_INCR(psock
->stats
.rx_msg_too_big
);
523 desc
->error
= -EMSGSIZE
;
524 psock
->rx_skb_head
= NULL
;
525 kcm_abort_rx_psock(psock
, EMSGSIZE
, head
);
527 } else if (len
<= (ssize_t
)head
->len
-
528 skb
->len
- rxm
->offset
) {
529 /* Length must be into new skb (and also
532 KCM_STATS_INCR(psock
->stats
.rx_bad_hdr_len
);
533 desc
->error
= -EPROTO
;
534 psock
->rx_skb_head
= NULL
;
535 kcm_abort_rx_psock(psock
, EPROTO
, head
);
542 extra
= (ssize_t
)(rxm
->accum_len
+ cand_len
) - rxm
->full_len
;
545 /* Message not complete yet. */
546 if (rxm
->full_len
- rxm
->accum_len
>
547 tcp_inq(psock
->sk
)) {
548 /* Don't have the whole messages in the socket
549 * buffer. Set psock->rx_need_bytes to wait for
550 * the rest of the message. Also, set "early
551 * eaten" since we've already buffered the skb
552 * but don't consume yet per tcp_read_sock.
555 if (!rxm
->accum_len
) {
556 /* Start RX timer for new message */
557 kcm_start_rx_timer(psock
);
560 psock
->rx_need_bytes
= rxm
->full_len
-
562 rxm
->accum_len
+= cand_len
;
563 rxm
->early_eaten
= cand_len
;
564 KCM_STATS_ADD(psock
->stats
.rx_bytes
, cand_len
);
565 desc
->count
= 0; /* Stop reading socket */
568 rxm
->accum_len
+= cand_len
;
570 WARN_ON(eaten
!= orig_len
);
574 /* Positive extra indicates ore bytes than needed for the
578 WARN_ON(extra
> cand_len
);
580 eaten
+= (cand_len
- extra
);
582 /* Hurray, we have a new message! */
583 del_timer(&psock
->rx_msg_timer
);
584 psock
->rx_skb_head
= NULL
;
585 KCM_STATS_INCR(psock
->stats
.rx_msgs
);
588 kcm
= reserve_rx_kcm(psock
, head
);
590 /* Unable to reserve a KCM, message is held in psock. */
594 if (kcm_queue_rcv_skb(&kcm
->sk
, head
)) {
595 /* Should mean socket buffer full */
596 unreserve_rx_kcm(psock
, false);
604 KCM_STATS_ADD(psock
->stats
.rx_bytes
, eaten
);
609 /* Called with lock held on lower socket */
610 static int psock_tcp_read_sock(struct kcm_psock
*psock
)
612 read_descriptor_t desc
;
614 desc
.arg
.data
= psock
;
616 desc
.count
= 1; /* give more than one skb per call */
618 /* sk should be locked here, so okay to do tcp_read_sock */
619 tcp_read_sock(psock
->sk
, &desc
, kcm_tcp_recv
);
621 unreserve_rx_kcm(psock
, true);
626 /* Lower sock lock held */
627 static void psock_tcp_data_ready(struct sock
*sk
)
629 struct kcm_psock
*psock
;
631 read_lock_bh(&sk
->sk_callback_lock
);
633 psock
= (struct kcm_psock
*)sk
->sk_user_data
;
634 if (unlikely(!psock
|| psock
->rx_stopped
))
637 if (psock
->ready_rx_msg
)
640 if (psock
->rx_need_bytes
) {
641 if (tcp_inq(sk
) >= psock
->rx_need_bytes
)
642 psock
->rx_need_bytes
= 0;
647 if (psock_tcp_read_sock(psock
) == -ENOMEM
)
648 queue_delayed_work(kcm_wq
, &psock
->rx_delayed_work
, 0);
651 read_unlock_bh(&sk
->sk_callback_lock
);
654 static void do_psock_rx_work(struct kcm_psock
*psock
)
656 read_descriptor_t rd_desc
;
657 struct sock
*csk
= psock
->sk
;
659 /* We need the read lock to synchronize with psock_tcp_data_ready. We
660 * need the socket lock for calling tcp_read_sock.
663 read_lock_bh(&csk
->sk_callback_lock
);
665 if (unlikely(csk
->sk_user_data
!= psock
))
668 if (unlikely(psock
->rx_stopped
))
671 if (psock
->ready_rx_msg
)
674 rd_desc
.arg
.data
= psock
;
676 if (psock_tcp_read_sock(psock
) == -ENOMEM
)
677 queue_delayed_work(kcm_wq
, &psock
->rx_delayed_work
, 0);
680 read_unlock_bh(&csk
->sk_callback_lock
);
684 static void psock_rx_work(struct work_struct
*w
)
686 do_psock_rx_work(container_of(w
, struct kcm_psock
, rx_work
));
689 static void psock_rx_delayed_work(struct work_struct
*w
)
691 do_psock_rx_work(container_of(w
, struct kcm_psock
,
692 rx_delayed_work
.work
));
695 static void psock_tcp_state_change(struct sock
*sk
)
697 /* TCP only does a POLLIN for a half close. Do a POLLHUP here
698 * since application will normally not poll with POLLIN
699 * on the TCP sockets.
702 report_csk_error(sk
, EPIPE
);
705 static void psock_tcp_write_space(struct sock
*sk
)
707 struct kcm_psock
*psock
;
709 struct kcm_sock
*kcm
;
711 read_lock_bh(&sk
->sk_callback_lock
);
713 psock
= (struct kcm_psock
*)sk
->sk_user_data
;
714 if (unlikely(!psock
))
719 spin_lock_bh(&mux
->lock
);
721 /* Check if the socket is reserved so someone is waiting for sending. */
724 queue_work(kcm_wq
, &kcm
->tx_work
);
726 spin_unlock_bh(&mux
->lock
);
728 read_unlock_bh(&sk
->sk_callback_lock
);
731 static void unreserve_psock(struct kcm_sock
*kcm
);
733 /* kcm sock is locked. */
734 static struct kcm_psock
*reserve_psock(struct kcm_sock
*kcm
)
736 struct kcm_mux
*mux
= kcm
->mux
;
737 struct kcm_psock
*psock
;
739 psock
= kcm
->tx_psock
;
741 smp_rmb(); /* Must read tx_psock before tx_wait */
744 WARN_ON(kcm
->tx_wait
);
745 if (unlikely(psock
->tx_stopped
))
746 unreserve_psock(kcm
);
748 return kcm
->tx_psock
;
751 spin_lock_bh(&mux
->lock
);
753 /* Check again under lock to see if psock was reserved for this
754 * psock via psock_unreserve.
756 psock
= kcm
->tx_psock
;
757 if (unlikely(psock
)) {
758 WARN_ON(kcm
->tx_wait
);
759 spin_unlock_bh(&mux
->lock
);
760 return kcm
->tx_psock
;
763 if (!list_empty(&mux
->psocks_avail
)) {
764 psock
= list_first_entry(&mux
->psocks_avail
,
767 list_del(&psock
->psock_avail_list
);
769 list_del(&kcm
->wait_psock_list
);
770 kcm
->tx_wait
= false;
772 kcm
->tx_psock
= psock
;
774 KCM_STATS_INCR(psock
->stats
.reserved
);
775 } else if (!kcm
->tx_wait
) {
776 list_add_tail(&kcm
->wait_psock_list
,
777 &mux
->kcm_tx_waiters
);
781 spin_unlock_bh(&mux
->lock
);
787 static void psock_now_avail(struct kcm_psock
*psock
)
789 struct kcm_mux
*mux
= psock
->mux
;
790 struct kcm_sock
*kcm
;
792 if (list_empty(&mux
->kcm_tx_waiters
)) {
793 list_add_tail(&psock
->psock_avail_list
,
796 kcm
= list_first_entry(&mux
->kcm_tx_waiters
,
799 list_del(&kcm
->wait_psock_list
);
800 kcm
->tx_wait
= false;
803 /* Commit before changing tx_psock since that is read in
804 * reserve_psock before queuing work.
808 kcm
->tx_psock
= psock
;
809 KCM_STATS_INCR(psock
->stats
.reserved
);
810 queue_work(kcm_wq
, &kcm
->tx_work
);
814 /* kcm sock is locked. */
815 static void unreserve_psock(struct kcm_sock
*kcm
)
817 struct kcm_psock
*psock
;
818 struct kcm_mux
*mux
= kcm
->mux
;
820 spin_lock_bh(&mux
->lock
);
822 psock
= kcm
->tx_psock
;
824 if (WARN_ON(!psock
)) {
825 spin_unlock_bh(&mux
->lock
);
829 smp_rmb(); /* Read tx_psock before tx_wait */
831 kcm_update_tx_mux_stats(mux
, psock
);
833 WARN_ON(kcm
->tx_wait
);
835 kcm
->tx_psock
= NULL
;
836 psock
->tx_kcm
= NULL
;
837 KCM_STATS_INCR(psock
->stats
.unreserved
);
839 if (unlikely(psock
->tx_stopped
)) {
842 list_del(&psock
->psock_list
);
845 fput(psock
->sk
->sk_socket
->file
);
846 kmem_cache_free(kcm_psockp
, psock
);
849 /* Don't put back on available list */
851 spin_unlock_bh(&mux
->lock
);
856 psock_now_avail(psock
);
858 spin_unlock_bh(&mux
->lock
);
861 static void kcm_report_tx_retry(struct kcm_sock
*kcm
)
863 struct kcm_mux
*mux
= kcm
->mux
;
865 spin_lock_bh(&mux
->lock
);
866 KCM_STATS_INCR(mux
->stats
.tx_retries
);
867 spin_unlock_bh(&mux
->lock
);
870 /* Write any messages ready on the kcm socket. Called with kcm sock lock
871 * held. Return bytes actually sent or error.
873 static int kcm_write_msgs(struct kcm_sock
*kcm
)
875 struct sock
*sk
= &kcm
->sk
;
876 struct kcm_psock
*psock
;
877 struct sk_buff
*skb
, *head
;
878 struct kcm_tx_msg
*txm
;
879 unsigned short fragidx
, frag_offset
;
880 unsigned int sent
, total_sent
= 0;
883 kcm
->tx_wait_more
= false;
884 psock
= kcm
->tx_psock
;
885 if (unlikely(psock
&& psock
->tx_stopped
)) {
886 /* A reserved psock was aborted asynchronously. Unreserve
887 * it and we'll retry the message.
889 unreserve_psock(kcm
);
890 kcm_report_tx_retry(kcm
);
891 if (skb_queue_empty(&sk
->sk_write_queue
))
894 kcm_tx_msg(skb_peek(&sk
->sk_write_queue
))->sent
= 0;
896 } else if (skb_queue_empty(&sk
->sk_write_queue
)) {
900 head
= skb_peek(&sk
->sk_write_queue
);
901 txm
= kcm_tx_msg(head
);
904 /* Send of first skbuff in queue already in progress */
905 if (WARN_ON(!psock
)) {
910 frag_offset
= txm
->frag_offset
;
911 fragidx
= txm
->fragidx
;
918 psock
= reserve_psock(kcm
);
924 txm
= kcm_tx_msg(head
);
928 if (WARN_ON(!skb_shinfo(skb
)->nr_frags
)) {
933 for (fragidx
= 0; fragidx
< skb_shinfo(skb
)->nr_frags
;
939 frag
= &skb_shinfo(skb
)->frags
[fragidx
];
940 if (WARN_ON(!frag
->size
)) {
945 ret
= kernel_sendpage(psock
->sk
->sk_socket
,
947 frag
->page_offset
+ frag_offset
,
948 frag
->size
- frag_offset
,
951 if (ret
== -EAGAIN
) {
952 /* Save state to try again when there's
953 * write space on the socket
956 txm
->frag_offset
= frag_offset
;
957 txm
->fragidx
= fragidx
;
964 /* Hard failure in sending message, abort this
965 * psock since it has lost framing
966 * synchonization and retry sending the
967 * message from the beginning.
969 kcm_abort_tx_psock(psock
, ret
? -ret
: EPIPE
,
971 unreserve_psock(kcm
);
974 kcm_report_tx_retry(kcm
);
982 KCM_STATS_ADD(psock
->stats
.tx_bytes
, ret
);
983 if (frag_offset
< frag
->size
) {
984 /* Not finished with this frag */
990 if (skb_has_frag_list(skb
)) {
991 skb
= skb_shinfo(skb
)->frag_list
;
994 } else if (skb
->next
) {
999 /* Successfully sent the whole packet, account for it. */
1000 skb_dequeue(&sk
->sk_write_queue
);
1002 sk
->sk_wmem_queued
-= sent
;
1004 KCM_STATS_INCR(psock
->stats
.tx_msgs
);
1005 } while ((head
= skb_peek(&sk
->sk_write_queue
)));
1008 /* Done with all queued messages. */
1009 WARN_ON(!skb_queue_empty(&sk
->sk_write_queue
));
1010 unreserve_psock(kcm
);
1013 /* Check if write space is available */
1014 sk
->sk_write_space(sk
);
1016 return total_sent
? : ret
;
1019 static void kcm_tx_work(struct work_struct
*w
)
1021 struct kcm_sock
*kcm
= container_of(w
, struct kcm_sock
, tx_work
);
1022 struct sock
*sk
= &kcm
->sk
;
1027 /* Primarily for SOCK_DGRAM sockets, also handle asynchronous tx
1030 err
= kcm_write_msgs(kcm
);
1032 /* Hard failure in write, report error on KCM socket */
1033 pr_warn("KCM: Hard failure on kcm_write_msgs %d\n", err
);
1034 report_csk_error(&kcm
->sk
, -err
);
1038 /* Primarily for SOCK_SEQPACKET sockets */
1039 if (likely(sk
->sk_socket
) &&
1040 test_bit(SOCK_NOSPACE
, &sk
->sk_socket
->flags
)) {
1041 clear_bit(SOCK_NOSPACE
, &sk
->sk_socket
->flags
);
1042 sk
->sk_write_space(sk
);
1049 static void kcm_push(struct kcm_sock
*kcm
)
1051 if (kcm
->tx_wait_more
)
1052 kcm_write_msgs(kcm
);
1055 static ssize_t
kcm_sendpage(struct socket
*sock
, struct page
*page
,
1056 int offset
, size_t size
, int flags
)
1059 struct sock
*sk
= sock
->sk
;
1060 struct kcm_sock
*kcm
= kcm_sk(sk
);
1061 struct sk_buff
*skb
= NULL
, *head
= NULL
;
1062 long timeo
= sock_sndtimeo(sk
, flags
& MSG_DONTWAIT
);
1067 if (flags
& MSG_SENDPAGE_NOTLAST
)
1070 /* No MSG_EOR from splice, only look at MSG_MORE */
1071 eor
= !(flags
& MSG_MORE
);
1075 sk_clear_bit(SOCKWQ_ASYNC_NOSPACE
, sk
);
1082 /* Previously opened message */
1083 head
= kcm
->seq_skb
;
1084 skb
= kcm_tx_msg(head
)->last_skb
;
1085 i
= skb_shinfo(skb
)->nr_frags
;
1087 if (skb_can_coalesce(skb
, i
, page
, offset
)) {
1088 skb_frag_size_add(&skb_shinfo(skb
)->frags
[i
- 1], size
);
1089 skb_shinfo(skb
)->tx_flags
|= SKBTX_SHARED_FRAG
;
1093 if (i
>= MAX_SKB_FRAGS
) {
1094 struct sk_buff
*tskb
;
1096 tskb
= alloc_skb(0, sk
->sk_allocation
);
1099 err
= sk_stream_wait_memory(sk
, &timeo
);
1105 skb_shinfo(head
)->frag_list
= tskb
;
1110 skb
->ip_summed
= CHECKSUM_UNNECESSARY
;
1114 /* Call the sk_stream functions to manage the sndbuf mem. */
1115 if (!sk_stream_memory_free(sk
)) {
1117 set_bit(SOCK_NOSPACE
, &sk
->sk_socket
->flags
);
1118 err
= sk_stream_wait_memory(sk
, &timeo
);
1123 head
= alloc_skb(0, sk
->sk_allocation
);
1126 err
= sk_stream_wait_memory(sk
, &timeo
);
1136 skb_fill_page_desc(skb
, i
, page
, offset
, size
);
1137 skb_shinfo(skb
)->tx_flags
|= SKBTX_SHARED_FRAG
;
1141 skb
->data_len
+= size
;
1142 skb
->truesize
+= size
;
1143 sk
->sk_wmem_queued
+= size
;
1144 sk_mem_charge(sk
, size
);
1148 head
->data_len
+= size
;
1149 head
->truesize
+= size
;
1153 bool not_busy
= skb_queue_empty(&sk
->sk_write_queue
);
1155 /* Message complete, queue it on send buffer */
1156 __skb_queue_tail(&sk
->sk_write_queue
, head
);
1157 kcm
->seq_skb
= NULL
;
1158 KCM_STATS_INCR(kcm
->stats
.tx_msgs
);
1160 if (flags
& MSG_BATCH
) {
1161 kcm
->tx_wait_more
= true;
1162 } else if (kcm
->tx_wait_more
|| not_busy
) {
1163 err
= kcm_write_msgs(kcm
);
1165 /* We got a hard error in write_msgs but have
1166 * already queued this message. Report an error
1167 * in the socket, but don't affect return value
1170 pr_warn("KCM: Hard failure on kcm_write_msgs\n");
1171 report_csk_error(&kcm
->sk
, -err
);
1175 /* Message not complete, save state */
1176 kcm
->seq_skb
= head
;
1177 kcm_tx_msg(head
)->last_skb
= skb
;
1180 KCM_STATS_ADD(kcm
->stats
.tx_bytes
, size
);
1188 err
= sk_stream_error(sk
, flags
, err
);
1190 /* make sure we wake any epoll edge trigger waiter */
1191 if (unlikely(skb_queue_len(&sk
->sk_write_queue
) == 0 && err
== -EAGAIN
))
1192 sk
->sk_write_space(sk
);
1198 static int kcm_sendmsg(struct socket
*sock
, struct msghdr
*msg
, size_t len
)
1200 struct sock
*sk
= sock
->sk
;
1201 struct kcm_sock
*kcm
= kcm_sk(sk
);
1202 struct sk_buff
*skb
= NULL
, *head
= NULL
;
1203 size_t copy
, copied
= 0;
1204 long timeo
= sock_sndtimeo(sk
, msg
->msg_flags
& MSG_DONTWAIT
);
1205 int eor
= (sock
->type
== SOCK_DGRAM
) ?
1206 !(msg
->msg_flags
& MSG_MORE
) : !!(msg
->msg_flags
& MSG_EOR
);
1211 /* Per tcp_sendmsg this should be in poll */
1212 sk_clear_bit(SOCKWQ_ASYNC_NOSPACE
, sk
);
1218 /* Previously opened message */
1219 head
= kcm
->seq_skb
;
1220 skb
= kcm_tx_msg(head
)->last_skb
;
1224 /* Call the sk_stream functions to manage the sndbuf mem. */
1225 if (!sk_stream_memory_free(sk
)) {
1227 set_bit(SOCK_NOSPACE
, &sk
->sk_socket
->flags
);
1228 err
= sk_stream_wait_memory(sk
, &timeo
);
1233 /* New message, alloc head skb */
1234 head
= alloc_skb(0, sk
->sk_allocation
);
1237 err
= sk_stream_wait_memory(sk
, &timeo
);
1241 head
= alloc_skb(0, sk
->sk_allocation
);
1246 /* Set ip_summed to CHECKSUM_UNNECESSARY to avoid calling
1247 * csum_and_copy_from_iter from skb_do_copy_data_nocache.
1249 skb
->ip_summed
= CHECKSUM_UNNECESSARY
;
1252 while (msg_data_left(msg
)) {
1254 int i
= skb_shinfo(skb
)->nr_frags
;
1255 struct page_frag
*pfrag
= sk_page_frag(sk
);
1257 if (!sk_page_frag_refill(sk
, pfrag
))
1258 goto wait_for_memory
;
1260 if (!skb_can_coalesce(skb
, i
, pfrag
->page
,
1262 if (i
== MAX_SKB_FRAGS
) {
1263 struct sk_buff
*tskb
;
1265 tskb
= alloc_skb(0, sk
->sk_allocation
);
1267 goto wait_for_memory
;
1270 skb_shinfo(head
)->frag_list
= tskb
;
1275 skb
->ip_summed
= CHECKSUM_UNNECESSARY
;
1281 copy
= min_t(int, msg_data_left(msg
),
1282 pfrag
->size
- pfrag
->offset
);
1284 if (!sk_wmem_schedule(sk
, copy
))
1285 goto wait_for_memory
;
1287 err
= skb_copy_to_page_nocache(sk
, &msg
->msg_iter
, skb
,
1294 /* Update the skb. */
1296 skb_frag_size_add(&skb_shinfo(skb
)->frags
[i
- 1], copy
);
1298 skb_fill_page_desc(skb
, i
, pfrag
->page
,
1299 pfrag
->offset
, copy
);
1300 get_page(pfrag
->page
);
1303 pfrag
->offset
+= copy
;
1307 head
->data_len
+= copy
;
1314 err
= sk_stream_wait_memory(sk
, &timeo
);
1320 bool not_busy
= skb_queue_empty(&sk
->sk_write_queue
);
1322 /* Message complete, queue it on send buffer */
1323 __skb_queue_tail(&sk
->sk_write_queue
, head
);
1324 kcm
->seq_skb
= NULL
;
1325 KCM_STATS_INCR(kcm
->stats
.tx_msgs
);
1327 if (msg
->msg_flags
& MSG_BATCH
) {
1328 kcm
->tx_wait_more
= true;
1329 } else if (kcm
->tx_wait_more
|| not_busy
) {
1330 err
= kcm_write_msgs(kcm
);
1332 /* We got a hard error in write_msgs but have
1333 * already queued this message. Report an error
1334 * in the socket, but don't affect return value
1337 pr_warn("KCM: Hard failure on kcm_write_msgs\n");
1338 report_csk_error(&kcm
->sk
, -err
);
1342 /* Message not complete, save state */
1344 kcm
->seq_skb
= head
;
1345 kcm_tx_msg(head
)->last_skb
= skb
;
1348 KCM_STATS_ADD(kcm
->stats
.tx_bytes
, copied
);
1356 if (copied
&& sock
->type
== SOCK_SEQPACKET
) {
1357 /* Wrote some bytes before encountering an
1358 * error, return partial success.
1360 goto partial_message
;
1363 if (head
!= kcm
->seq_skb
)
1366 err
= sk_stream_error(sk
, msg
->msg_flags
, err
);
1368 /* make sure we wake any epoll edge trigger waiter */
1369 if (unlikely(skb_queue_len(&sk
->sk_write_queue
) == 0 && err
== -EAGAIN
))
1370 sk
->sk_write_space(sk
);
1376 static struct sk_buff
*kcm_wait_data(struct sock
*sk
, int flags
,
1377 long timeo
, int *err
)
1379 struct sk_buff
*skb
;
1381 while (!(skb
= skb_peek(&sk
->sk_receive_queue
))) {
1383 *err
= sock_error(sk
);
1387 if (sock_flag(sk
, SOCK_DONE
))
1390 if ((flags
& MSG_DONTWAIT
) || !timeo
) {
1395 sk_wait_data(sk
, &timeo
, NULL
);
1397 /* Handle signals */
1398 if (signal_pending(current
)) {
1399 *err
= sock_intr_errno(timeo
);
1407 static int kcm_recvmsg(struct socket
*sock
, struct msghdr
*msg
,
1408 size_t len
, int flags
)
1410 struct sock
*sk
= sock
->sk
;
1411 struct kcm_sock
*kcm
= kcm_sk(sk
);
1414 struct kcm_rx_msg
*rxm
;
1416 struct sk_buff
*skb
;
1418 timeo
= sock_rcvtimeo(sk
, flags
& MSG_DONTWAIT
);
1422 skb
= kcm_wait_data(sk
, flags
, timeo
, &err
);
1426 /* Okay, have a message on the receive queue */
1428 rxm
= kcm_rx_msg(skb
);
1430 if (len
> rxm
->full_len
)
1431 len
= rxm
->full_len
;
1433 err
= skb_copy_datagram_msg(skb
, rxm
->offset
, msg
, len
);
1438 if (likely(!(flags
& MSG_PEEK
))) {
1439 KCM_STATS_ADD(kcm
->stats
.rx_bytes
, copied
);
1440 if (copied
< rxm
->full_len
) {
1441 if (sock
->type
== SOCK_DGRAM
) {
1442 /* Truncated message */
1443 msg
->msg_flags
|= MSG_TRUNC
;
1446 rxm
->offset
+= copied
;
1447 rxm
->full_len
-= copied
;
1450 /* Finished with message */
1451 msg
->msg_flags
|= MSG_EOR
;
1452 KCM_STATS_INCR(kcm
->stats
.rx_msgs
);
1453 skb_unlink(skb
, &sk
->sk_receive_queue
);
1461 return copied
? : err
;
1464 static ssize_t
kcm_sock_splice(struct sock
*sk
,
1465 struct pipe_inode_info
*pipe
,
1466 struct splice_pipe_desc
*spd
)
1471 ret
= splice_to_pipe(pipe
, spd
);
1477 static ssize_t
kcm_splice_read(struct socket
*sock
, loff_t
*ppos
,
1478 struct pipe_inode_info
*pipe
, size_t len
,
1481 struct sock
*sk
= sock
->sk
;
1482 struct kcm_sock
*kcm
= kcm_sk(sk
);
1484 struct kcm_rx_msg
*rxm
;
1487 struct sk_buff
*skb
;
1489 /* Only support splice for SOCKSEQPACKET */
1491 timeo
= sock_rcvtimeo(sk
, flags
& MSG_DONTWAIT
);
1495 skb
= kcm_wait_data(sk
, flags
, timeo
, &err
);
1499 /* Okay, have a message on the receive queue */
1501 rxm
= kcm_rx_msg(skb
);
1503 if (len
> rxm
->full_len
)
1504 len
= rxm
->full_len
;
1506 copied
= skb_splice_bits(skb
, sk
, rxm
->offset
, pipe
, len
, flags
,
1513 KCM_STATS_ADD(kcm
->stats
.rx_bytes
, copied
);
1515 rxm
->offset
+= copied
;
1516 rxm
->full_len
-= copied
;
1518 /* We have no way to return MSG_EOR. If all the bytes have been
1519 * read we still leave the message in the receive socket buffer.
1520 * A subsequent recvmsg needs to be done to return MSG_EOR and
1521 * finish reading the message.
1534 /* kcm sock lock held */
1535 static void kcm_recv_disable(struct kcm_sock
*kcm
)
1537 struct kcm_mux
*mux
= kcm
->mux
;
1539 if (kcm
->rx_disabled
)
1542 spin_lock_bh(&mux
->rx_lock
);
1544 kcm
->rx_disabled
= 1;
1546 /* If a psock is reserved we'll do cleanup in unreserve */
1547 if (!kcm
->rx_psock
) {
1549 list_del(&kcm
->wait_rx_list
);
1550 kcm
->rx_wait
= false;
1553 requeue_rx_msgs(mux
, &kcm
->sk
.sk_receive_queue
);
1556 spin_unlock_bh(&mux
->rx_lock
);
1559 /* kcm sock lock held */
1560 static void kcm_recv_enable(struct kcm_sock
*kcm
)
1562 struct kcm_mux
*mux
= kcm
->mux
;
1564 if (!kcm
->rx_disabled
)
1567 spin_lock_bh(&mux
->rx_lock
);
1569 kcm
->rx_disabled
= 0;
1572 spin_unlock_bh(&mux
->rx_lock
);
1575 static int kcm_setsockopt(struct socket
*sock
, int level
, int optname
,
1576 char __user
*optval
, unsigned int optlen
)
1578 struct kcm_sock
*kcm
= kcm_sk(sock
->sk
);
1582 if (level
!= SOL_KCM
)
1583 return -ENOPROTOOPT
;
1585 if (optlen
< sizeof(int))
1588 if (get_user(val
, (int __user
*)optval
))
1591 valbool
= val
? 1 : 0;
1594 case KCM_RECV_DISABLE
:
1595 lock_sock(&kcm
->sk
);
1597 kcm_recv_disable(kcm
);
1599 kcm_recv_enable(kcm
);
1600 release_sock(&kcm
->sk
);
1609 static int kcm_getsockopt(struct socket
*sock
, int level
, int optname
,
1610 char __user
*optval
, int __user
*optlen
)
1612 struct kcm_sock
*kcm
= kcm_sk(sock
->sk
);
1615 if (level
!= SOL_KCM
)
1616 return -ENOPROTOOPT
;
1618 if (get_user(len
, optlen
))
1621 len
= min_t(unsigned int, len
, sizeof(int));
1626 case KCM_RECV_DISABLE
:
1627 val
= kcm
->rx_disabled
;
1630 return -ENOPROTOOPT
;
1633 if (put_user(len
, optlen
))
1635 if (copy_to_user(optval
, &val
, len
))
1640 static void init_kcm_sock(struct kcm_sock
*kcm
, struct kcm_mux
*mux
)
1642 struct kcm_sock
*tkcm
;
1643 struct list_head
*head
;
1646 /* For SOCK_SEQPACKET sock type, datagram_poll checks the sk_state, so
1647 * we set sk_state, otherwise epoll_wait always returns right away with
1650 kcm
->sk
.sk_state
= TCP_ESTABLISHED
;
1652 /* Add to mux's kcm sockets list */
1654 spin_lock_bh(&mux
->lock
);
1656 head
= &mux
->kcm_socks
;
1657 list_for_each_entry(tkcm
, &mux
->kcm_socks
, kcm_sock_list
) {
1658 if (tkcm
->index
!= index
)
1660 head
= &tkcm
->kcm_sock_list
;
1664 list_add(&kcm
->kcm_sock_list
, head
);
1667 mux
->kcm_socks_cnt
++;
1668 spin_unlock_bh(&mux
->lock
);
1670 INIT_WORK(&kcm
->tx_work
, kcm_tx_work
);
1672 spin_lock_bh(&mux
->rx_lock
);
1674 spin_unlock_bh(&mux
->rx_lock
);
1677 static void kcm_rx_msg_timeout(unsigned long arg
)
1679 struct kcm_psock
*psock
= (struct kcm_psock
*)arg
;
1681 /* Message assembly timed out */
1682 KCM_STATS_INCR(psock
->stats
.rx_msg_timeouts
);
1683 kcm_abort_rx_psock(psock
, ETIMEDOUT
, NULL
);
1686 static int kcm_attach(struct socket
*sock
, struct socket
*csock
,
1687 struct bpf_prog
*prog
)
1689 struct kcm_sock
*kcm
= kcm_sk(sock
->sk
);
1690 struct kcm_mux
*mux
= kcm
->mux
;
1692 struct kcm_psock
*psock
= NULL
, *tpsock
;
1693 struct list_head
*head
;
1696 if (csock
->ops
->family
!= PF_INET
&&
1697 csock
->ops
->family
!= PF_INET6
)
1704 /* Only support TCP for now */
1705 if (csk
->sk_protocol
!= IPPROTO_TCP
)
1708 psock
= kmem_cache_zalloc(kcm_psockp
, GFP_KERNEL
);
1714 psock
->bpf_prog
= prog
;
1716 setup_timer(&psock
->rx_msg_timer
, kcm_rx_msg_timeout
,
1717 (unsigned long)psock
);
1719 INIT_WORK(&psock
->rx_work
, psock_rx_work
);
1720 INIT_DELAYED_WORK(&psock
->rx_delayed_work
, psock_rx_delayed_work
);
1724 write_lock_bh(&csk
->sk_callback_lock
);
1725 psock
->save_data_ready
= csk
->sk_data_ready
;
1726 psock
->save_write_space
= csk
->sk_write_space
;
1727 psock
->save_state_change
= csk
->sk_state_change
;
1728 csk
->sk_user_data
= psock
;
1729 csk
->sk_data_ready
= psock_tcp_data_ready
;
1730 csk
->sk_write_space
= psock_tcp_write_space
;
1731 csk
->sk_state_change
= psock_tcp_state_change
;
1732 write_unlock_bh(&csk
->sk_callback_lock
);
1734 /* Finished initialization, now add the psock to the MUX. */
1735 spin_lock_bh(&mux
->lock
);
1736 head
= &mux
->psocks
;
1737 list_for_each_entry(tpsock
, &mux
->psocks
, psock_list
) {
1738 if (tpsock
->index
!= index
)
1740 head
= &tpsock
->psock_list
;
1744 list_add(&psock
->psock_list
, head
);
1745 psock
->index
= index
;
1747 KCM_STATS_INCR(mux
->stats
.psock_attach
);
1749 psock_now_avail(psock
);
1750 spin_unlock_bh(&mux
->lock
);
1752 /* Schedule RX work in case there are already bytes queued */
1753 queue_work(kcm_wq
, &psock
->rx_work
);
1758 static int kcm_attach_ioctl(struct socket
*sock
, struct kcm_attach
*info
)
1760 struct socket
*csock
;
1761 struct bpf_prog
*prog
;
1764 csock
= sockfd_lookup(info
->fd
, &err
);
1768 prog
= bpf_prog_get(info
->bpf_fd
);
1770 err
= PTR_ERR(prog
);
1774 if (prog
->type
!= BPF_PROG_TYPE_SOCKET_FILTER
) {
1780 err
= kcm_attach(sock
, csock
, prog
);
1786 /* Keep reference on file also */
1794 static void kcm_unattach(struct kcm_psock
*psock
)
1796 struct sock
*csk
= psock
->sk
;
1797 struct kcm_mux
*mux
= psock
->mux
;
1799 /* Stop getting callbacks from TCP socket. After this there should
1800 * be no way to reserve a kcm for this psock.
1802 write_lock_bh(&csk
->sk_callback_lock
);
1803 csk
->sk_user_data
= NULL
;
1804 csk
->sk_data_ready
= psock
->save_data_ready
;
1805 csk
->sk_write_space
= psock
->save_write_space
;
1806 csk
->sk_state_change
= psock
->save_state_change
;
1807 psock
->rx_stopped
= 1;
1809 if (WARN_ON(psock
->rx_kcm
)) {
1810 write_unlock_bh(&csk
->sk_callback_lock
);
1814 spin_lock_bh(&mux
->rx_lock
);
1816 /* Stop receiver activities. After this point psock should not be
1817 * able to get onto ready list either through callbacks or work.
1819 if (psock
->ready_rx_msg
) {
1820 list_del(&psock
->psock_ready_list
);
1821 kfree_skb(psock
->ready_rx_msg
);
1822 psock
->ready_rx_msg
= NULL
;
1823 KCM_STATS_INCR(mux
->stats
.rx_ready_drops
);
1826 spin_unlock_bh(&mux
->rx_lock
);
1828 write_unlock_bh(&csk
->sk_callback_lock
);
1830 del_timer_sync(&psock
->rx_msg_timer
);
1831 cancel_work_sync(&psock
->rx_work
);
1832 cancel_delayed_work_sync(&psock
->rx_delayed_work
);
1834 bpf_prog_put(psock
->bpf_prog
);
1836 kfree_skb(psock
->rx_skb_head
);
1837 psock
->rx_skb_head
= NULL
;
1839 spin_lock_bh(&mux
->lock
);
1841 aggregate_psock_stats(&psock
->stats
, &mux
->aggregate_psock_stats
);
1843 KCM_STATS_INCR(mux
->stats
.psock_unattach
);
1845 if (psock
->tx_kcm
) {
1846 /* psock was reserved. Just mark it finished and we will clean
1847 * up in the kcm paths, we need kcm lock which can not be
1850 KCM_STATS_INCR(mux
->stats
.psock_unattach_rsvd
);
1851 spin_unlock_bh(&mux
->lock
);
1853 /* We are unattaching a socket that is reserved. Abort the
1854 * socket since we may be out of sync in sending on it. We need
1855 * to do this without the mux lock.
1857 kcm_abort_tx_psock(psock
, EPIPE
, false);
1859 spin_lock_bh(&mux
->lock
);
1860 if (!psock
->tx_kcm
) {
1861 /* psock now unreserved in window mux was unlocked */
1866 /* Commit done before queuing work to process it */
1869 /* Queue tx work to make sure psock->done is handled */
1870 queue_work(kcm_wq
, &psock
->tx_kcm
->tx_work
);
1871 spin_unlock_bh(&mux
->lock
);
1874 if (!psock
->tx_stopped
)
1875 list_del(&psock
->psock_avail_list
);
1876 list_del(&psock
->psock_list
);
1878 spin_unlock_bh(&mux
->lock
);
1881 fput(csk
->sk_socket
->file
);
1882 kmem_cache_free(kcm_psockp
, psock
);
1886 static int kcm_unattach_ioctl(struct socket
*sock
, struct kcm_unattach
*info
)
1888 struct kcm_sock
*kcm
= kcm_sk(sock
->sk
);
1889 struct kcm_mux
*mux
= kcm
->mux
;
1890 struct kcm_psock
*psock
;
1891 struct socket
*csock
;
1895 csock
= sockfd_lookup(info
->fd
, &err
);
1907 spin_lock_bh(&mux
->lock
);
1909 list_for_each_entry(psock
, &mux
->psocks
, psock_list
) {
1910 if (psock
->sk
!= csk
)
1913 /* Found the matching psock */
1915 if (psock
->unattaching
|| WARN_ON(psock
->done
)) {
1920 psock
->unattaching
= 1;
1922 spin_unlock_bh(&mux
->lock
);
1924 kcm_unattach(psock
);
1930 spin_unlock_bh(&mux
->lock
);
1937 static struct proto kcm_proto
= {
1939 .owner
= THIS_MODULE
,
1940 .obj_size
= sizeof(struct kcm_sock
),
1943 /* Clone a kcm socket. */
1944 static int kcm_clone(struct socket
*osock
, struct kcm_clone
*info
,
1945 struct socket
**newsockp
)
1947 struct socket
*newsock
;
1949 struct file
*newfile
;
1953 newsock
= sock_alloc();
1957 newsock
->type
= osock
->type
;
1958 newsock
->ops
= osock
->ops
;
1960 __module_get(newsock
->ops
->owner
);
1962 newfd
= get_unused_fd_flags(0);
1963 if (unlikely(newfd
< 0)) {
1968 newfile
= sock_alloc_file(newsock
, 0, osock
->sk
->sk_prot_creator
->name
);
1969 if (unlikely(IS_ERR(newfile
))) {
1970 err
= PTR_ERR(newfile
);
1971 goto out_sock_alloc_fail
;
1974 newsk
= sk_alloc(sock_net(osock
->sk
), PF_KCM
, GFP_KERNEL
,
1978 goto out_sk_alloc_fail
;
1981 sock_init_data(newsock
, newsk
);
1982 init_kcm_sock(kcm_sk(newsk
), kcm_sk(osock
->sk
)->mux
);
1984 fd_install(newfd
, newfile
);
1985 *newsockp
= newsock
;
1992 out_sock_alloc_fail
:
1993 put_unused_fd(newfd
);
1995 sock_release(newsock
);
2000 static int kcm_ioctl(struct socket
*sock
, unsigned int cmd
, unsigned long arg
)
2005 case SIOCKCMATTACH
: {
2006 struct kcm_attach info
;
2008 if (copy_from_user(&info
, (void __user
*)arg
, sizeof(info
)))
2011 err
= kcm_attach_ioctl(sock
, &info
);
2015 case SIOCKCMUNATTACH
: {
2016 struct kcm_unattach info
;
2018 if (copy_from_user(&info
, (void __user
*)arg
, sizeof(info
)))
2021 err
= kcm_unattach_ioctl(sock
, &info
);
2025 case SIOCKCMCLONE
: {
2026 struct kcm_clone info
;
2027 struct socket
*newsock
= NULL
;
2029 if (copy_from_user(&info
, (void __user
*)arg
, sizeof(info
)))
2032 err
= kcm_clone(sock
, &info
, &newsock
);
2035 if (copy_to_user((void __user
*)arg
, &info
,
2038 sock_release(newsock
);
2052 static void free_mux(struct rcu_head
*rcu
)
2054 struct kcm_mux
*mux
= container_of(rcu
,
2055 struct kcm_mux
, rcu
);
2057 kmem_cache_free(kcm_muxp
, mux
);
2060 static void release_mux(struct kcm_mux
*mux
)
2062 struct kcm_net
*knet
= mux
->knet
;
2063 struct kcm_psock
*psock
, *tmp_psock
;
2065 /* Release psocks */
2066 list_for_each_entry_safe(psock
, tmp_psock
,
2067 &mux
->psocks
, psock_list
) {
2068 if (!WARN_ON(psock
->unattaching
))
2069 kcm_unattach(psock
);
2072 if (WARN_ON(mux
->psocks_cnt
))
2075 __skb_queue_purge(&mux
->rx_hold_queue
);
2077 mutex_lock(&knet
->mutex
);
2078 aggregate_mux_stats(&mux
->stats
, &knet
->aggregate_mux_stats
);
2079 aggregate_psock_stats(&mux
->aggregate_psock_stats
,
2080 &knet
->aggregate_psock_stats
);
2081 list_del_rcu(&mux
->kcm_mux_list
);
2083 mutex_unlock(&knet
->mutex
);
2085 call_rcu(&mux
->rcu
, free_mux
);
2088 static void kcm_done(struct kcm_sock
*kcm
)
2090 struct kcm_mux
*mux
= kcm
->mux
;
2091 struct sock
*sk
= &kcm
->sk
;
2094 spin_lock_bh(&mux
->rx_lock
);
2095 if (kcm
->rx_psock
) {
2096 /* Cleanup in unreserve_rx_kcm */
2098 kcm
->rx_disabled
= 1;
2100 spin_unlock_bh(&mux
->rx_lock
);
2105 list_del(&kcm
->wait_rx_list
);
2106 kcm
->rx_wait
= false;
2108 /* Move any pending receive messages to other kcm sockets */
2109 requeue_rx_msgs(mux
, &sk
->sk_receive_queue
);
2111 spin_unlock_bh(&mux
->rx_lock
);
2113 if (WARN_ON(sk_rmem_alloc_get(sk
)))
2116 /* Detach from MUX */
2117 spin_lock_bh(&mux
->lock
);
2119 list_del(&kcm
->kcm_sock_list
);
2120 mux
->kcm_socks_cnt
--;
2121 socks_cnt
= mux
->kcm_socks_cnt
;
2123 spin_unlock_bh(&mux
->lock
);
2126 /* We are done with the mux now. */
2130 WARN_ON(kcm
->rx_wait
);
2135 /* Called by kcm_release to close a KCM socket.
2136 * If this is the last KCM socket on the MUX, destroy the MUX.
2138 static int kcm_release(struct socket
*sock
)
2140 struct sock
*sk
= sock
->sk
;
2141 struct kcm_sock
*kcm
;
2142 struct kcm_mux
*mux
;
2143 struct kcm_psock
*psock
;
2152 kfree_skb(kcm
->seq_skb
);
2155 /* Purge queue under lock to avoid race condition with tx_work trying
2156 * to act when queue is nonempty. If tx_work runs after this point
2157 * it will just return.
2159 __skb_queue_purge(&sk
->sk_write_queue
);
2162 spin_lock_bh(&mux
->lock
);
2164 /* Take of tx_wait list, after this point there should be no way
2165 * that a psock will be assigned to this kcm.
2167 list_del(&kcm
->wait_psock_list
);
2168 kcm
->tx_wait
= false;
2170 spin_unlock_bh(&mux
->lock
);
2172 /* Cancel work. After this point there should be no outside references
2173 * to the kcm socket.
2175 cancel_work_sync(&kcm
->tx_work
);
2178 psock
= kcm
->tx_psock
;
2180 /* A psock was reserved, so we need to kill it since it
2181 * may already have some bytes queued from a message. We
2182 * need to do this after removing kcm from tx_wait list.
2184 kcm_abort_tx_psock(psock
, EPIPE
, false);
2185 unreserve_psock(kcm
);
2189 WARN_ON(kcm
->tx_wait
);
2190 WARN_ON(kcm
->tx_psock
);
2199 static const struct proto_ops kcm_dgram_ops
= {
2201 .owner
= THIS_MODULE
,
2202 .release
= kcm_release
,
2203 .bind
= sock_no_bind
,
2204 .connect
= sock_no_connect
,
2205 .socketpair
= sock_no_socketpair
,
2206 .accept
= sock_no_accept
,
2207 .getname
= sock_no_getname
,
2208 .poll
= datagram_poll
,
2210 .listen
= sock_no_listen
,
2211 .shutdown
= sock_no_shutdown
,
2212 .setsockopt
= kcm_setsockopt
,
2213 .getsockopt
= kcm_getsockopt
,
2214 .sendmsg
= kcm_sendmsg
,
2215 .recvmsg
= kcm_recvmsg
,
2216 .mmap
= sock_no_mmap
,
2217 .sendpage
= kcm_sendpage
,
2220 static const struct proto_ops kcm_seqpacket_ops
= {
2222 .owner
= THIS_MODULE
,
2223 .release
= kcm_release
,
2224 .bind
= sock_no_bind
,
2225 .connect
= sock_no_connect
,
2226 .socketpair
= sock_no_socketpair
,
2227 .accept
= sock_no_accept
,
2228 .getname
= sock_no_getname
,
2229 .poll
= datagram_poll
,
2231 .listen
= sock_no_listen
,
2232 .shutdown
= sock_no_shutdown
,
2233 .setsockopt
= kcm_setsockopt
,
2234 .getsockopt
= kcm_getsockopt
,
2235 .sendmsg
= kcm_sendmsg
,
2236 .recvmsg
= kcm_recvmsg
,
2237 .mmap
= sock_no_mmap
,
2238 .sendpage
= kcm_sendpage
,
2239 .splice_read
= kcm_splice_read
,
2242 /* Create proto operation for kcm sockets */
2243 static int kcm_create(struct net
*net
, struct socket
*sock
,
2244 int protocol
, int kern
)
2246 struct kcm_net
*knet
= net_generic(net
, kcm_net_id
);
2248 struct kcm_mux
*mux
;
2250 switch (sock
->type
) {
2252 sock
->ops
= &kcm_dgram_ops
;
2254 case SOCK_SEQPACKET
:
2255 sock
->ops
= &kcm_seqpacket_ops
;
2258 return -ESOCKTNOSUPPORT
;
2261 if (protocol
!= KCMPROTO_CONNECTED
)
2262 return -EPROTONOSUPPORT
;
2264 sk
= sk_alloc(net
, PF_KCM
, GFP_KERNEL
, &kcm_proto
, kern
);
2268 /* Allocate a kcm mux, shared between KCM sockets */
2269 mux
= kmem_cache_zalloc(kcm_muxp
, GFP_KERNEL
);
2275 spin_lock_init(&mux
->lock
);
2276 spin_lock_init(&mux
->rx_lock
);
2277 INIT_LIST_HEAD(&mux
->kcm_socks
);
2278 INIT_LIST_HEAD(&mux
->kcm_rx_waiters
);
2279 INIT_LIST_HEAD(&mux
->kcm_tx_waiters
);
2281 INIT_LIST_HEAD(&mux
->psocks
);
2282 INIT_LIST_HEAD(&mux
->psocks_ready
);
2283 INIT_LIST_HEAD(&mux
->psocks_avail
);
2287 /* Add new MUX to list */
2288 mutex_lock(&knet
->mutex
);
2289 list_add_rcu(&mux
->kcm_mux_list
, &knet
->mux_list
);
2291 mutex_unlock(&knet
->mutex
);
2293 skb_queue_head_init(&mux
->rx_hold_queue
);
2295 /* Init KCM socket */
2296 sock_init_data(sock
, sk
);
2297 init_kcm_sock(kcm_sk(sk
), mux
);
2302 static struct net_proto_family kcm_family_ops
= {
2304 .create
= kcm_create
,
2305 .owner
= THIS_MODULE
,
2308 static __net_init
int kcm_init_net(struct net
*net
)
2310 struct kcm_net
*knet
= net_generic(net
, kcm_net_id
);
2312 INIT_LIST_HEAD_RCU(&knet
->mux_list
);
2313 mutex_init(&knet
->mutex
);
2318 static __net_exit
void kcm_exit_net(struct net
*net
)
2320 struct kcm_net
*knet
= net_generic(net
, kcm_net_id
);
2322 /* All KCM sockets should be closed at this point, which should mean
2323 * that all multiplexors and psocks have been destroyed.
2325 WARN_ON(!list_empty(&knet
->mux_list
));
2328 static struct pernet_operations kcm_net_ops
= {
2329 .init
= kcm_init_net
,
2330 .exit
= kcm_exit_net
,
2332 .size
= sizeof(struct kcm_net
),
2335 static int __init
kcm_init(void)
2339 kcm_muxp
= kmem_cache_create("kcm_mux_cache",
2340 sizeof(struct kcm_mux
), 0,
2341 SLAB_HWCACHE_ALIGN
| SLAB_PANIC
, NULL
);
2345 kcm_psockp
= kmem_cache_create("kcm_psock_cache",
2346 sizeof(struct kcm_psock
), 0,
2347 SLAB_HWCACHE_ALIGN
| SLAB_PANIC
, NULL
);
2351 kcm_wq
= create_singlethread_workqueue("kkcmd");
2355 err
= proto_register(&kcm_proto
, 1);
2359 err
= sock_register(&kcm_family_ops
);
2361 goto sock_register_fail
;
2363 err
= register_pernet_device(&kcm_net_ops
);
2367 err
= kcm_proc_init();
2369 goto proc_init_fail
;
2374 unregister_pernet_device(&kcm_net_ops
);
2377 sock_unregister(PF_KCM
);
2380 proto_unregister(&kcm_proto
);
2383 kmem_cache_destroy(kcm_muxp
);
2384 kmem_cache_destroy(kcm_psockp
);
2387 destroy_workqueue(kcm_wq
);
2392 static void __exit
kcm_exit(void)
2395 unregister_pernet_device(&kcm_net_ops
);
2396 sock_unregister(PF_KCM
);
2397 proto_unregister(&kcm_proto
);
2398 destroy_workqueue(kcm_wq
);
2400 kmem_cache_destroy(kcm_muxp
);
2401 kmem_cache_destroy(kcm_psockp
);
2404 module_init(kcm_init
);
2405 module_exit(kcm_exit
);
2407 MODULE_LICENSE("GPL");
2408 MODULE_ALIAS_NETPROTO(PF_KCM
);