4 * Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2
8 * as published by the Free Software Foundation.
11 #include <linux/bpf.h>
12 #include <linux/errno.h>
13 #include <linux/errqueue.h>
14 #include <linux/file.h>
16 #include <linux/kernel.h>
17 #include <linux/module.h>
18 #include <linux/net.h>
19 #include <linux/netdevice.h>
20 #include <linux/poll.h>
21 #include <linux/rculist.h>
22 #include <linux/skbuff.h>
23 #include <linux/socket.h>
24 #include <linux/uaccess.h>
25 #include <linux/workqueue.h>
26 #include <net/strparser.h>
27 #include <net/netns/generic.h>
/* Workqueue on which the parsers' rx_work items are queued. It is created
 * under the name "kstrp" by strp_mod_init().
 */
30 static struct workqueue_struct
*strp_wq
;
33 /* Internal cb structure. struct strp_rx_msg must be first for passing
36 struct strp_rx_msg strp
;
41 static inline struct _strp_rx_msg
*_strp_rx_msg(struct sk_buff
*skb
)
43 return (struct _strp_rx_msg
*)((void *)skb
->cb
+
44 offsetof(struct qdisc_skb_cb
, data
));
/* Default abort_parser callback; strp_init() installs it when the caller's
 * callbacks do not supply one. The statements shown stop the receive
 * message timer and invoke the lower socket's sk_error_report hook.
 * NOTE(review): how @err is consumed is not visible in the lines shown
 * here - confirm before relying on its sign or meaning.
 */
48 static void strp_abort_rx_strp(struct strparser
*strp
, int err
)
50 struct sock
*csk
= strp
->sk
;
52 /* Unrecoverable error in receive */
54 del_timer(&strp
->rx_msg_timer
);
61 /* Report an error on the lower socket */
63 csk
->sk_error_report(csk
);
66 static void strp_start_rx_timer(struct strparser
*strp
)
68 if (strp
->sk
->sk_rcvtimeo
)
69 mod_timer(&strp
->rx_msg_timer
, strp
->sk
->sk_rcvtimeo
);
/* Abort assembly of the current message after a parse error: free the
 * partially built message at rx_skb_head, clear that pointer, and pass
 * @err on to the abort_parser callback.
 * NOTE(review): @desc is not referenced in the lines shown here - confirm
 * whether the error is also meant to be recorded in the read descriptor.
 */
73 static void strp_parser_err(struct strparser
*strp
, int err
,
74 read_descriptor_t
*desc
)
77 kfree_skb(strp
->rx_skb_head
);
78 strp
->rx_skb_head
= NULL
;
79 strp
->cb
.abort_parser(strp
, err
);
82 static inline int strp_peek_len(struct strparser
*strp
)
84 struct socket
*sock
= strp
->sk
->sk_socket
;
86 return sock
->ops
->peek_len(sock
);
/* Actor handed to the lower socket's read_sock op by strp_read_sock().
 *
 * @desc:        read descriptor; desc->arg.data carries the strparser and
 *               desc->error / desc->count are updated on failure or when
 *               reading should stop.
 * @orig_skb:    skb delivered by the lower socket.
 * @orig_offset: offset of the new data within @orig_skb.
 * @orig_len:    number of bytes of new data.
 *
 * Each pass of the loop clones @orig_skb and either starts a new message
 * (rx_skb_head) or chains the clone onto the message being assembled. While
 * the message length is unknown (full_len is zero) the parse_msg callback
 * is asked for it; the visible outcomes are "need more header", a negative
 * parser error, a length above sk_rcvbuf (-EMSGSIZE) and a length that does
 * not reach into the new skb (-EPROTO). Once a message is complete the
 * receive timer is stopped, rx_skb_head is cleared and the message is handed
 * to the rcv_msg callback. Allocation failures bump rx_mem_fail and set
 * desc->error to -ENOMEM.
 *
 * NOTE(review): the return statements are not visible in the lines shown
 * here; presumably the number of bytes consumed (eaten) is returned -
 * confirm.
 */
89 /* Lower socket lock held */
90 static int strp_recv(read_descriptor_t
*desc
, struct sk_buff
*orig_skb
,
91 unsigned int orig_offset
, size_t orig_len
)
93 struct strparser
*strp
= (struct strparser
*)desc
->arg
.data
;
94 struct _strp_rx_msg
*rxm
;
95 struct sk_buff
*head
, *skb
;
96 size_t eaten
= 0, cand_len
;
99 bool cloned_orig
= false;
104 head
= strp
->rx_skb_head
;
106 /* Message already in progress */
108 rxm
= _strp_rx_msg(head
);
109 if (unlikely(rxm
->early_eaten
)) {
110 /* Already some number of bytes on the receive sock
111 * data saved in rx_skb_head, just indicate they
114 eaten
= orig_len
<= rxm
->early_eaten
?
115 orig_len
: rxm
->early_eaten
;
116 rxm
->early_eaten
-= eaten
;
121 if (unlikely(orig_offset
)) {
122 /* Getting data with a non-zero offset when a message is
123 * in progress is not expected. If it does happen, we
124 * need to clone and pull since we can't deal with
125 * offsets in the skbs for a message except in the head.
127 orig_skb
= skb_clone(orig_skb
, GFP_ATOMIC
);
129 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
130 desc
->error
= -ENOMEM
;
133 if (!pskb_pull(orig_skb
, orig_offset
)) {
134 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
136 desc
->error
= -ENOMEM
;
143 if (!strp
->rx_skb_nextp
) {
144 /* We are going to append to the frag_list of head.
145 * Need to unshare the frag_list.
147 err
= skb_unclone(head
, GFP_ATOMIC
);
149 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
154 if (unlikely(skb_shinfo(head
)->frag_list
)) {
155 /* We can't append to an sk_buff that already
156 * has a frag_list. We create a new head, point
157 * the frag_list of that to the old head, and
158 * then are able to use the old head->next for
159 * appending to the message.
161 if (WARN_ON(head
->next
)) {
162 desc
->error
= -EINVAL
;
166 skb
= alloc_skb(0, GFP_ATOMIC
);
168 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
169 desc
->error
= -ENOMEM
;
172 skb
->len
= head
->len
;
173 skb
->data_len
= head
->len
;
174 skb
->truesize
= head
->truesize
;
175 *_strp_rx_msg(skb
) = *_strp_rx_msg(head
);
176 strp
->rx_skb_nextp
= &head
->next
;
177 skb_shinfo(skb
)->frag_list
= head
;
178 strp
->rx_skb_head
= skb
;
182 &skb_shinfo(head
)->frag_list
;
187 while (eaten
< orig_len
) {
188 /* Always clone since we will consume something */
189 skb
= skb_clone(orig_skb
, GFP_ATOMIC
);
191 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
192 desc
->error
= -ENOMEM
;
196 cand_len
= orig_len
- eaten
;
198 head
= strp
->rx_skb_head
;
201 strp
->rx_skb_head
= head
;
202 /* Will set rx_skb_nextp on next packet if needed */
203 strp
->rx_skb_nextp
= NULL
;
204 rxm
= _strp_rx_msg(head
);
205 memset(rxm
, 0, sizeof(*rxm
));
206 rxm
->strp
.offset
= orig_offset
+ eaten
;
208 /* Unclone since we may be appending to an skb that we
209 * already share a frag_list with.
211 err
= skb_unclone(skb
, GFP_ATOMIC
);
213 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
218 rxm
= _strp_rx_msg(head
);
219 *strp
->rx_skb_nextp
= skb
;
220 strp
->rx_skb_nextp
= &skb
->next
;
221 head
->data_len
+= skb
->len
;
222 head
->len
+= skb
->len
;
223 head
->truesize
+= skb
->truesize
;
226 if (!rxm
->strp
.full_len
) {
229 len
= (*strp
->cb
.parse_msg
)(strp
, head
);
232 /* Need more header to determine length */
233 if (!rxm
->accum_len
) {
234 /* Start RX timer for new message */
235 strp_start_rx_timer(strp
);
237 rxm
->accum_len
+= cand_len
;
239 STRP_STATS_INCR(strp
->stats
.rx_need_more_hdr
);
240 WARN_ON(eaten
!= orig_len
);
242 } else if (len
< 0) {
243 if (len
== -ESTRPIPE
&& rxm
->accum_len
) {
245 strp
->rx_unrecov_intr
= 1;
247 strp
->rx_interrupted
= 1;
249 strp_parser_err(strp
, len
, desc
);
251 } else if (len
> strp
->sk
->sk_rcvbuf
) {
252 /* Message length exceeds maximum allowed */
253 STRP_STATS_INCR(strp
->stats
.rx_msg_too_big
);
254 strp_parser_err(strp
, -EMSGSIZE
, desc
);
256 } else if (len
<= (ssize_t
)head
->len
-
257 skb
->len
- rxm
->strp
.offset
) {
258 /* Length must be into new skb (and also
261 STRP_STATS_INCR(strp
->stats
.rx_bad_hdr_len
);
262 strp_parser_err(strp
, -EPROTO
, desc
);
266 rxm
->strp
.full_len
= len
;
269 extra
= (ssize_t
)(rxm
->accum_len
+ cand_len
) -
273 /* Message not complete yet. */
274 if (rxm
->strp
.full_len
- rxm
->accum_len
>
275 strp_peek_len(strp
)) {
276 /* Don't have the whole message in the socket
277 * buffer. Set strp->rx_need_bytes to wait for
278 * the rest of the message. Also, set "early
279 * eaten" since we've already buffered the skb
280 * but don't consume yet per strp_read_sock.
283 if (!rxm
->accum_len
) {
284 /* Start RX timer for new message */
285 strp_start_rx_timer(strp
);
288 strp
->rx_need_bytes
= rxm
->strp
.full_len
-
290 rxm
->accum_len
+= cand_len
;
291 rxm
->early_eaten
= cand_len
;
292 STRP_STATS_ADD(strp
->stats
.rx_bytes
, cand_len
);
293 desc
->count
= 0; /* Stop reading socket */
296 rxm
->accum_len
+= cand_len
;
298 WARN_ON(eaten
!= orig_len
);
302 /* Positive extra indicates more bytes than needed for the
306 WARN_ON(extra
> cand_len
);
308 eaten
+= (cand_len
- extra
);
310 /* Hurray, we have a new message! */
311 del_timer(&strp
->rx_msg_timer
);
312 strp
->rx_skb_head
= NULL
;
313 STRP_STATS_INCR(strp
->stats
.rx_msgs
);
315 /* Give skb to upper layer */
316 strp
->cb
.rcv_msg(strp
, head
);
318 if (unlikely(strp
->rx_paused
)) {
319 /* Upper layer paused strp */
327 STRP_STATS_ADD(strp
->stats
.rx_bytes
, eaten
);
/* Fallback read_sock_done callback; strp_init() installs it when the
 * caller's callbacks do not provide one.
 */
332 static int default_read_sock_done(struct strparser
*strp
, int err
)
/* Run one read pass over the lower socket.
 *
 * Builds a read descriptor whose arg.data points at @strp, invokes the
 * socket's read_sock op with strp_recv() as the actor, then passes
 * desc.error through the read_sock_done callback and stores the result
 * back in desc.error.
 * NOTE(review): the return statement is not visible in the lines shown
 * here; callers compare the result against -ENOMEM.
 */
337 /* Called with lock held on lower socket */
338 static int strp_read_sock(struct strparser
*strp
)
340 struct socket
*sock
= strp
->sk
->sk_socket
;
341 read_descriptor_t desc
;
343 desc
.arg
.data
= strp
;
345 desc
.count
= 1; /* give more than one skb per call */
347 /* sk should be locked here, so okay to do read_sock */
348 sock
->ops
->read_sock(strp
->sk
, &desc
, strp_recv
);
350 desc
.error
= strp
->cb
.read_sock_done(strp
, desc
.error
);
/* Exported entry point for reporting that the lower socket has data.
 *
 * The visible steps are: test rx_stopped; if the socket is currently owned
 * by a user context, queue rx_work on strp_wq instead of reading here; if
 * rx_need_bytes is set and the socket's peek length has reached it, reset
 * rx_need_bytes to zero; finally call strp_read_sock() and queue rx_work
 * again when it reports -ENOMEM.
 */
355 /* Lower sock lock held */
356 void strp_data_ready(struct strparser
*strp
)
358 if (unlikely(strp
->rx_stopped
))
361 /* This check is needed to synchronize with do_strp_rx_work.
362 * do_strp_rx_work acquires a process lock (lock_sock) whereas
363 * the lock held here is bh_lock_sock. The two locks can be
364 * held by different threads at the same time, but bh_lock_sock
365 * allows a thread in BH context to safely check if the process
366 * lock is held. In this case, if the lock is held, queue work.
368 if (sock_owned_by_user(strp
->sk
)) {
369 queue_work(strp_wq
, &strp
->rx_work
);
376 if (strp
->rx_need_bytes
) {
377 if (strp_peek_len(strp
) >= strp
->rx_need_bytes
)
378 strp
->rx_need_bytes
= 0;
383 if (strp_read_sock(strp
) == -ENOMEM
)
384 queue_work(strp_wq
, &strp
->rx_work
);
386 EXPORT_SYMBOL_GPL(strp_data_ready
);
/* Deferred receive path, invoked from the rx_work handler strp_rx_work().
 *
 * Tests rx_stopped, then calls strp_read_sock() and re-queues rx_work on
 * strp_wq when that call reports -ENOMEM.
 * NOTE(review): rd_desc is assigned but never read in the lines shown
 * here - confirm whether it is still needed.
 */
388 static void do_strp_rx_work(struct strparser
*strp
)
390 read_descriptor_t rd_desc
;
391 struct sock
*csk
= strp
->sk
;
393 /* We need the read lock to synchronize with strp_data_ready. We
394 * need the socket lock for calling strp_read_sock.
398 if (unlikely(strp
->rx_stopped
))
404 rd_desc
.arg
.data
= strp
;
406 if (strp_read_sock(strp
) == -ENOMEM
)
407 queue_work(strp_wq
, &strp
->rx_work
);
413 static void strp_rx_work(struct work_struct
*w
)
415 do_strp_rx_work(container_of(w
, struct strparser
, rx_work
));
/* Timer callback for rx_msg_timer; strp_init() registers it with the
 * strparser pointer as @arg.
 *
 * Counts the timeout in rx_msg_timeouts and aborts the parser through the
 * abort_parser callback with ETIMEDOUT.
 * NOTE(review): ETIMEDOUT is passed as a positive value here, whereas
 * strp_recv() passes negative codes (-EMSGSIZE, -EPROTO) to the same
 * callback through strp_parser_err() - confirm which sign abort_parser
 * expects.
 */
418 static void strp_rx_msg_timeout(unsigned long arg
)
420 struct strparser
*strp
= (struct strparser
*)arg
;
422 /* Message assembly timed out */
423 STRP_STATS_INCR(strp
->stats
.rx_msg_timeouts
);
425 strp
->cb
.abort_parser(strp
, ETIMEDOUT
);
426 release_sock(strp
->sk
);
/* Initialise a stream parser on top of a lower socket.
 *
 * @strp: parser state to initialise; it is zeroed before being set up.
 * @csk:  lower socket; its ops must provide read_sock and peek_len,
 *        otherwise -EAFNOSUPPORT is returned.
 * @cb:   callbacks; rcv_msg and parse_msg are checked for presence, while
 *        read_sock_done and abort_parser fall back to
 *        default_read_sock_done() and strp_abort_rx_strp() when unset.
 *
 * Also sets up rx_msg_timer with strp_rx_msg_timeout() as its handler and
 * rx_work with strp_rx_work() as its handler.
 */
429 int strp_init(struct strparser
*strp
, struct sock
*csk
,
430 struct strp_callbacks
*cb
)
432 struct socket
*sock
= csk
->sk_socket
;
434 if (!cb
|| !cb
->rcv_msg
|| !cb
->parse_msg
)
437 if (!sock
->ops
->read_sock
|| !sock
->ops
->peek_len
)
438 return -EAFNOSUPPORT
;
440 memset(strp
, 0, sizeof(*strp
));
444 setup_timer(&strp
->rx_msg_timer
, strp_rx_msg_timeout
,
445 (unsigned long)strp
);
447 INIT_WORK(&strp
->rx_work
, strp_rx_work
);
449 strp
->cb
.rcv_msg
= cb
->rcv_msg
;
450 strp
->cb
.parse_msg
= cb
->parse_msg
;
451 strp
->cb
.read_sock_done
= cb
->read_sock_done
? : default_read_sock_done
;
452 strp
->cb
.abort_parser
= cb
->abort_parser
? : strp_abort_rx_strp
;
456 EXPORT_SYMBOL_GPL(strp_init
);
/* Exported helper that queues rx_work on strp_wq so that receive
 * processing is run again for @strp.
 */
458 void strp_unpause(struct strparser
*strp
)
462 /* Sync setting rx_paused with RX work */
465 queue_work(strp_wq
, &strp
->rx_work
);
467 EXPORT_SYMBOL_GPL(strp_unpause
);
469 /* strp must already be stopped so that strp_recv will no longer be called.
470 * Note that strp_done is not called with the lower socket held.
472 void strp_done(struct strparser
*strp
)
474 WARN_ON(!strp
->rx_stopped
);
476 del_timer_sync(&strp
->rx_msg_timer
);
477 cancel_work_sync(&strp
->rx_work
);
479 if (strp
->rx_skb_head
) {
480 kfree_skb(strp
->rx_skb_head
);
481 strp
->rx_skb_head
= NULL
;
484 EXPORT_SYMBOL_GPL(strp_done
);
486 void strp_stop(struct strparser
*strp
)
488 strp
->rx_stopped
= 1;
490 EXPORT_SYMBOL_GPL(strp_stop
);
492 void strp_check_rcv(struct strparser
*strp
)
494 queue_work(strp_wq
, &strp
->rx_work
);
496 EXPORT_SYMBOL_GPL(strp_check_rcv
);
/* Module init hook: creates the single-threaded "kstrp" workqueue and
 * stores it in strp_wq.
 */
498 static int __init
strp_mod_init(void)
500 strp_wq
= create_singlethread_workqueue("kstrp");
505 static void __exit
strp_mod_exit(void)
/* Register the module's init and exit hooks and declare its licence. */
508 module_init(strp_mod_init
);
509 module_exit(strp_mod_exit
);
510 MODULE_LICENSE("GPL");