#include <linux/irq_work.h>
#include <linux/slab.h>
#include <linux/filter.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/poll.h>
#include <linux/kmemleak.h>
#include <uapi/linux/btf.h>
#include <linux/btf_ids.h>
#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)

/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
	(offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT)
/* consumer page and producer page */
#define RINGBUF_POS_PAGES 2
#define RINGBUF_NR_META_PAGES (RINGBUF_PGOFF + RINGBUF_POS_PAGES)

#define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4)

struct bpf_ringbuf {
	wait_queue_head_t waitq;
	struct irq_work work;
	u64 mask;
	struct page **pages;
	int nr_pages;
	raw_spinlock_t spinlock ____cacheline_aligned_in_smp;
	/* For user-space producer ring buffers, an atomic_t busy bit is used
	 * to synchronize access to the ring buffers in the kernel, rather than
	 * the spinlock that is used for kernel-producer ring buffers. This is
	 * done because the ring buffer must hold a lock across a BPF program's
	 * callback:
	 *
	 *    __bpf_user_ringbuf_peek() // lock acquired
	 * -> program callback_fn()
	 * -> __bpf_user_ringbuf_sample_release() // lock released
	 *
	 * It is unsafe and incorrect to hold an IRQ spinlock across what could
	 * be a long execution window, so we instead simply disallow concurrent
	 * access to the ring buffer by kernel consumers, and return -EBUSY from
	 * __bpf_user_ringbuf_peek() if the busy bit is held by another task.
	 */
	atomic_t busy ____cacheline_aligned_in_smp;
	/* Consumer and producer counters are put into separate pages to
	 * allow each position to be mapped with different permissions.
	 * This prevents a user-space application from modifying the
	 * position and ruining in-kernel tracking. The permissions of the
	 * pages depend on who is producing samples: user-space or the
	 * kernel. Note that the pending counter is placed in the same
	 * page as the producer, so that it shares the same cache line.
	 *
	 * Kernel-producer
	 * ---------------
	 * The producer position and data pages are mapped as r/o in
	 * userspace. For this approach, bits in the header of samples are
	 * used to signal to user-space, and to other producers, whether a
	 * sample is currently being written.
	 *
	 * User-space producer
	 * -------------------
	 * Only the page containing the consumer position is mapped r/o in
	 * user-space. User-space producers also use bits of the header to
	 * communicate to the kernel, but the kernel must carefully check and
	 * validate each sample to ensure that it is correctly formatted and
	 * fully contained within the ring buffer.
	 */
	unsigned long consumer_pos __aligned(PAGE_SIZE);
	unsigned long producer_pos __aligned(PAGE_SIZE);
	unsigned long pending_pos;
	char data[] __aligned(PAGE_SIZE);
};
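
/* Example (illustrative sketch, not part of this file): how a user-space
 * consumer of a kernel-producer ring buffer typically mmap()s the map fd
 * according to the layout above. The consumer-position page is mapped
 * writable, while the producer-position page and the (double-mapped) data
 * pages are mapped read-only; this mirrors what libbpf's ring_buffer API
 * does internally. Error handling is omitted, and "map_fd" and "data_sz"
 * are assumed to come from the caller.
 *
 *	size_t page_sz = sysconf(_SC_PAGESIZE);
 *	unsigned long *consumer_pos, *producer_pos;
 *	void *data;
 *
 *	consumer_pos = mmap(NULL, page_sz, PROT_READ | PROT_WRITE,
 *			    MAP_SHARED, map_fd, 0);
 *	producer_pos = mmap(NULL, page_sz + 2 * data_sz, PROT_READ,
 *			    MAP_SHARED, map_fd, page_sz);
 *	data = (void *)producer_pos + page_sz;
 */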

struct bpf_ringbuf_map {
	struct bpf_map map;
	struct bpf_ringbuf *rb;
};

/* 8-byte ring buffer record header structure */
struct bpf_ringbuf_hdr {
	u32 len;
	u32 pg_off;
};

static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node)
{
	const gfp_t flags = GFP_KERNEL_ACCOUNT | __GFP_RETRY_MAYFAIL |
			    __GFP_NOWARN | __GFP_ZERO;
	int nr_meta_pages = RINGBUF_NR_META_PAGES;
	int nr_data_pages = data_sz >> PAGE_SHIFT;
	int nr_pages = nr_meta_pages + nr_data_pages;
	struct page **pages, *page;
	struct bpf_ringbuf *rb;
	size_t array_size;
	int i;

	/* Each data page is mapped twice to allow "virtual"
	 * continuous read of samples wrapping around the end of ring
	 * buffer area:
	 * ------------------------------------------------------
	 * | meta pages |  real data pages  |  same data pages  |
	 * ------------------------------------------------------
	 * |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
	 * ------------------------------------------------------
	 * |            | TA             DA | TA             DA |
	 * ------------------------------------------------------
	 *
	 * Here, there is no need to worry about special handling of
	 * wrapped-around data due to double-mapped data pages. This works
	 * both in the kernel and when mmap()'ed in user-space, simplifying
	 * both kernel and user-space implementations significantly.
	 */
	array_size = (nr_meta_pages + 2 * nr_data_pages) * sizeof(*pages);
	pages = bpf_map_area_alloc(array_size, numa_node);
	if (!pages)
		return NULL;

	for (i = 0; i < nr_pages; i++) {
		page = alloc_pages_node(numa_node, flags, 0);
		if (!page) {
			nr_pages = i;
			goto err_free_pages;
		}
		pages[i] = page;
		if (i >= nr_meta_pages)
			pages[nr_data_pages + i] = page;
	}

	rb = vmap(pages, nr_meta_pages + 2 * nr_data_pages,
		  VM_MAP | VM_USERMAP, PAGE_KERNEL);
	if (rb) {
		kmemleak_not_leak(pages);
		rb->pages = pages;
		rb->nr_pages = nr_pages;
		return rb;
	}

err_free_pages:
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	bpf_map_area_free(pages);
	return NULL;
}
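
/* Illustrative sketch (not part of this file): because the data pages are
 * mapped twice back-to-back, a consumer can address a record that wraps
 * around the end of the ring with plain linear pointer arithmetic. Masking
 * the logical position keeps the pointer inside the first copy, and a read
 * that runs past the ring size simply continues into the second copy of the
 * same physical pages:
 *
 *	void *sample = rb->data + (cons_pos & rb->mask);
 *	memcpy(out, sample, len);
 *
 * The memcpy() above stays valid even when cons_pos + len crosses the ring
 * size. The mmap()'ed user-space view relies on the same trick.
 */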

static void bpf_ringbuf_notify(struct irq_work *work)
{
	struct bpf_ringbuf *rb = container_of(work, struct bpf_ringbuf, work);

	wake_up_all(&rb->waitq);
}

/* Maximum size of ring buffer area is limited by 32-bit page offset within
 * record header, counted in pages. Reserving 8 bits for extensibility, and
 * taking into account a few extra pages for consumer/producer pages and the
 * non-mmap()'able parts, the current maximum size is:
 *
 *     (((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)
 *
 * This gives a 64GB limit, which seems plenty for a single ring buffer. Now
 * considering that the maximum value of data_sz is (4GB - 1), there
 * will be no overflow, so just note the size limit in the comments.
 */
static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
{
	struct bpf_ringbuf *rb;

	rb = bpf_ringbuf_area_alloc(data_sz, numa_node);
	if (!rb)
		return NULL;

	raw_spin_lock_init(&rb->spinlock);
	atomic_set(&rb->busy, 0);
	init_waitqueue_head(&rb->waitq);
	init_irq_work(&rb->work, bpf_ringbuf_notify);

	rb->mask = data_sz - 1;
	rb->consumer_pos = 0;
	rb->producer_pos = 0;
	rb->pending_pos = 0;

	return rb;
}

static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
	struct bpf_ringbuf_map *rb_map;

	if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
		return ERR_PTR(-EINVAL);

	if (attr->key_size || attr->value_size ||
	    !is_power_of_2(attr->max_entries) ||
	    !PAGE_ALIGNED(attr->max_entries))
		return ERR_PTR(-EINVAL);

	rb_map = bpf_map_area_alloc(sizeof(*rb_map), NUMA_NO_NODE);
	if (!rb_map)
		return ERR_PTR(-ENOMEM);

	bpf_map_init_from_attr(&rb_map->map, attr);

	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
	if (!rb_map->rb) {
		bpf_map_area_free(rb_map);
		return ERR_PTR(-ENOMEM);
	}

	return &rb_map->map;
}
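
/* Example (illustrative sketch, not part of this file): the constraints
 * enforced above as seen from user space. A ring buffer map is created with
 * zero key/value size and a max_entries value that is both a power of two
 * and a multiple of the page size; anything else fails with -EINVAL.
 * Assumes libbpf's bpf_map_create(); the map name "rb" is arbitrary.
 *
 *	int map_fd = bpf_map_create(BPF_MAP_TYPE_RINGBUF, "rb", 0, 0,
 *				    256 * 1024, NULL);
 */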

static void bpf_ringbuf_free(struct bpf_ringbuf *rb)
{
	/* copy pages pointer and nr_pages to local variables, as we are going
	 * to unmap rb itself with vunmap() below
	 */
	struct page **pages = rb->pages;
	int i, nr_pages = rb->nr_pages;

	vunmap(rb);
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	bpf_map_area_free(pages);
}

static void ringbuf_map_free(struct bpf_map *map)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	bpf_ringbuf_free(rb_map->rb);
	bpf_map_area_free(rb_map);
}

static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key)
{
	return ERR_PTR(-ENOTSUPP);
}

static long ringbuf_map_update_elem(struct bpf_map *map, void *key, void *value,
				    u64 flags)
{
	return -ENOTSUPP;
}

static long ringbuf_map_delete_elem(struct bpf_map *map, void *key)
{
	return -ENOTSUPP;
}

static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
				    void *next_key)
{
	return -ENOTSUPP;
}

static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);

	if (vma->vm_flags & VM_WRITE) {
		/* allow writable mapping for the consumer_pos only */
		if (vma->vm_pgoff != 0 || vma->vm_end - vma->vm_start != PAGE_SIZE)
			return -EPERM;
	} else {
		vm_flags_clear(vma, VM_MAYWRITE);
	}
	/* remap_vmalloc_range() checks size and offset constraints */
	return remap_vmalloc_range(vma, rb_map->rb,
				   vma->vm_pgoff + RINGBUF_PGOFF);
}

static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);

	if (vma->vm_flags & VM_WRITE) {
		if (vma->vm_pgoff == 0)
			/* Disallow writable mappings to the consumer pointer,
			 * and allow writable mappings to both the producer
			 * position, and the ring buffer data itself.
			 */
			return -EPERM;
	} else {
		vm_flags_clear(vma, VM_MAYWRITE);
	}
	/* remap_vmalloc_range() checks size and offset constraints */
	return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
}

static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
	unsigned long cons_pos, prod_pos;

	cons_pos = smp_load_acquire(&rb->consumer_pos);
	prod_pos = smp_load_acquire(&rb->producer_pos);
	return prod_pos - cons_pos;
}

static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
{
	return rb->mask + 1;
}

static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
				      struct poll_table_struct *pts)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	poll_wait(filp, &rb_map->rb->waitq, pts);

	if (ringbuf_avail_data_sz(rb_map->rb))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
				      struct poll_table_struct *pts)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	poll_wait(filp, &rb_map->rb->waitq, pts);

	if (ringbuf_avail_data_sz(rb_map->rb) < ringbuf_total_data_sz(rb_map->rb))
		return EPOLLOUT | EPOLLWRNORM;
	return 0;
}
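
/* Example (illustrative sketch, not part of this file): the EPOLLIN
 * notification produced by ringbuf_map_poll_kern() is what libbpf's
 * ring_buffer API waits on. A minimal user-space consumer, assuming an
 * already-obtained ring buffer map fd "map_fd" and a hypothetical per-sample
 * callback "handle_event":
 *
 *	static int handle_event(void *ctx, void *data, size_t len)
 *	{
 *		return 0;
 *	}
 *
 *	struct ring_buffer *rb;
 *
 *	rb = ring_buffer__new(map_fd, handle_event, NULL, NULL);
 *	while (!stop)
 *		ring_buffer__poll(rb, 100);
 *	ring_buffer__free(rb);
 *
 * The second argument to ring_buffer__poll() is a timeout in milliseconds.
 */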

static u64 ringbuf_map_mem_usage(const struct bpf_map *map)
{
	struct bpf_ringbuf *rb;
	int nr_data_pages;
	int nr_meta_pages;
	u64 usage = sizeof(struct bpf_ringbuf_map);

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
	usage += (u64)rb->nr_pages << PAGE_SHIFT;
	nr_meta_pages = RINGBUF_NR_META_PAGES;
	nr_data_pages = map->max_entries >> PAGE_SHIFT;
	usage += (nr_meta_pages + 2 * nr_data_pages) * sizeof(struct page *);
	return usage;
}

BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
const struct bpf_map_ops ringbuf_map_ops = {
	.map_meta_equal = bpf_map_meta_equal,
	.map_alloc = ringbuf_map_alloc,
	.map_free = ringbuf_map_free,
	.map_mmap = ringbuf_map_mmap_kern,
	.map_poll = ringbuf_map_poll_kern,
	.map_lookup_elem = ringbuf_map_lookup_elem,
	.map_update_elem = ringbuf_map_update_elem,
	.map_delete_elem = ringbuf_map_delete_elem,
	.map_get_next_key = ringbuf_map_get_next_key,
	.map_mem_usage = ringbuf_map_mem_usage,
	.map_btf_id = &ringbuf_map_btf_ids[0],
};

BTF_ID_LIST_SINGLE(user_ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
const struct bpf_map_ops user_ringbuf_map_ops = {
	.map_meta_equal = bpf_map_meta_equal,
	.map_alloc = ringbuf_map_alloc,
	.map_free = ringbuf_map_free,
	.map_mmap = ringbuf_map_mmap_user,
	.map_poll = ringbuf_map_poll_user,
	.map_lookup_elem = ringbuf_map_lookup_elem,
	.map_update_elem = ringbuf_map_update_elem,
	.map_delete_elem = ringbuf_map_delete_elem,
	.map_get_next_key = ringbuf_map_get_next_key,
	.map_mem_usage = ringbuf_map_mem_usage,
	.map_btf_id = &user_ringbuf_map_btf_ids[0],
};

/* Given a pointer to ring buffer record metadata and struct bpf_ringbuf
 * itself, calculate the offset from the record metadata to the ring buffer
 * in pages, rounded down. This page offset is stored as part of the record
 * metadata and allows restoring struct bpf_ringbuf * from a record pointer.
 * The page offset is stored at offset 4 of the record metadata header.
 */
static size_t bpf_ringbuf_rec_pg_off(struct bpf_ringbuf *rb,
				     struct bpf_ringbuf_hdr *hdr)
{
	return ((void *)hdr - (void *)rb) >> PAGE_SHIFT;
}

/* Given a pointer to a ring buffer record header, restore the pointer to
 * struct bpf_ringbuf itself by using the page offset stored at offset 4.
 */
static struct bpf_ringbuf *
bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
{
	unsigned long addr = (unsigned long)(void *)hdr;
	unsigned long off  = (unsigned long)hdr->pg_off << PAGE_SHIFT;

	return (void*)((addr & PAGE_MASK) - off);
}
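
/* Worked example (illustrative, values made up): with 4KB pages, a ring
 * buffer whose struct bpf_ringbuf starts at 0xffff888100000000 and a record
 * header placed 5 pages plus 0x230 bytes into it gets pg_off = 5, since
 * (0x5230 >> PAGE_SHIFT) == 5. To go back, bpf_ringbuf_restore_from_rec()
 * rounds the header address down to a page boundary (0xffff888100005000)
 * and subtracts 5 << PAGE_SHIFT, landing exactly on the start of
 * struct bpf_ringbuf. This is why only a page offset, not a full pointer,
 * needs to fit into the 8-byte record header.
 */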

static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
	unsigned long cons_pos, prod_pos, new_prod_pos, pend_pos, flags;
	struct bpf_ringbuf_hdr *hdr;
	u32 len, pg_off, tmp_size, hdr_len;

	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
		return NULL;

	len = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
	if (len > ringbuf_total_data_sz(rb))
		return NULL;

	cons_pos = smp_load_acquire(&rb->consumer_pos);

	if (in_nmi()) {
		if (!raw_spin_trylock_irqsave(&rb->spinlock, flags))
			return NULL;
	} else {
		raw_spin_lock_irqsave(&rb->spinlock, flags);
	}

	pend_pos = rb->pending_pos;
	prod_pos = rb->producer_pos;
	new_prod_pos = prod_pos + len;

	while (pend_pos < prod_pos) {
		hdr = (void *)rb->data + (pend_pos & rb->mask);
		hdr_len = READ_ONCE(hdr->len);
		if (hdr_len & BPF_RINGBUF_BUSY_BIT)
			break;
		tmp_size = hdr_len & ~BPF_RINGBUF_DISCARD_BIT;
		tmp_size = round_up(tmp_size + BPF_RINGBUF_HDR_SZ, 8);
		pend_pos += tmp_size;
	}
	rb->pending_pos = pend_pos;

	/* check for out of ringbuf space:
	 * - by ensuring the producer position doesn't advance more than
	 *   (ringbuf_size - 1) ahead
	 * - by ensuring the span from the oldest not-yet-committed record
	 *   to the newest record doesn't exceed (ringbuf_size - 1)
	 */
	if (new_prod_pos - cons_pos > rb->mask ||
	    new_prod_pos - pend_pos > rb->mask) {
		raw_spin_unlock_irqrestore(&rb->spinlock, flags);
		return NULL;
	}

	hdr = (void *)rb->data + (prod_pos & rb->mask);
	pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pg_off = pg_off;

	/* pairs with consumer's smp_load_acquire() */
	smp_store_release(&rb->producer_pos, new_prod_pos);

	raw_spin_unlock_irqrestore(&rb->spinlock, flags);

	return (void *)hdr + BPF_RINGBUF_HDR_SZ;
}

BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
{
	struct bpf_ringbuf_map *rb_map;

	if (unlikely(flags))
		return 0;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
}

const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
	.func = bpf_ringbuf_reserve,
	.ret_type = RET_PTR_TO_RINGBUF_MEM_OR_NULL,
	.arg1_type = ARG_CONST_MAP_PTR,
	.arg2_type = ARG_CONST_ALLOC_SIZE_OR_ZERO,
	.arg3_type = ARG_ANYTHING,
};
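
/* Example (illustrative sketch, not part of this file): the reserve/submit
 * pattern as used from a BPF program built against libbpf's bpf_helpers.h.
 * The map name "events" and struct event are made up for illustration; the
 * flags argument of bpf_ringbuf_reserve() must currently be 0.
 *
 *	struct event {
 *		__u32 pid;
 *	};
 *
 *	struct {
 *		__uint(type, BPF_MAP_TYPE_RINGBUF);
 *		__uint(max_entries, 256 * 1024);
 *	} events SEC(".maps");
 *
 *	SEC("tracepoint/syscalls/sys_enter_execve")
 *	int handle_execve(void *ctx)
 *	{
 *		struct event *e;
 *
 *		e = bpf_ringbuf_reserve(&events, sizeof(*e), 0);
 *		if (!e)
 *			return 0;
 *		e->pid = bpf_get_current_pid_tgid() >> 32;
 *		bpf_ringbuf_submit(e, 0);
 *		return 0;
 *	}
 */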

static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
{
	unsigned long rec_pos, cons_pos;
	struct bpf_ringbuf_hdr *hdr;
	struct bpf_ringbuf *rb;
	u32 new_len;

	hdr = sample - BPF_RINGBUF_HDR_SZ;
	rb = bpf_ringbuf_restore_from_rec(hdr);
	new_len = hdr->len ^ BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* update record header with correct final size prefix */
	xchg(&hdr->len, new_len);

	/* if consumer caught up and is waiting for our record, notify about
	 * new data availability
	 */
	rec_pos = (void *)hdr - (void *)rb->data;
	cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;

	if (flags & BPF_RB_FORCE_WAKEUP)
		irq_work_queue(&rb->work);
	else if (cons_pos == rec_pos && !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);
}

BPF_CALL_2(bpf_ringbuf_submit, void *, sample, u64, flags)
{
	bpf_ringbuf_commit(sample, flags, false /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_submit_proto = {
	.func = bpf_ringbuf_submit,
	.ret_type = RET_VOID,
	.arg1_type = ARG_PTR_TO_RINGBUF_MEM | OBJ_RELEASE,
	.arg2_type = ARG_ANYTHING,
};

BPF_CALL_2(bpf_ringbuf_discard, void *, sample, u64, flags)
{
	bpf_ringbuf_commit(sample, flags, true /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_discard_proto = {
	.func = bpf_ringbuf_discard,
	.ret_type = RET_VOID,
	.arg1_type = ARG_PTR_TO_RINGBUF_MEM | OBJ_RELEASE,
	.arg2_type = ARG_ANYTHING,
};

BPF_CALL_4(bpf_ringbuf_output, struct bpf_map *, map, void *, data, u64, size,
	   u64, flags)
{
	struct bpf_ringbuf_map *rb_map;
	void *rec;

	if (unlikely(flags & ~(BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP)))
		return -EINVAL;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	rec = __bpf_ringbuf_reserve(rb_map->rb, size);
	if (!rec)
		return -ENOMEM;

	memcpy(rec, data, size);
	bpf_ringbuf_commit(rec, flags, false /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_output_proto = {
	.func = bpf_ringbuf_output,
	.ret_type = RET_INTEGER,
	.arg1_type = ARG_CONST_MAP_PTR,
	.arg2_type = ARG_PTR_TO_MEM | MEM_RDONLY,
	.arg3_type = ARG_CONST_SIZE_OR_ZERO,
	.arg4_type = ARG_ANYTHING,
};
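
/* Example (illustrative sketch, not part of this file): bpf_ringbuf_output()
 * copies an already-prepared buffer into the ring in one call, at the cost
 * of an extra memcpy compared to the zero-copy reserve/submit pattern. A
 * hypothetical BPF program snippet, reusing the "events" map from the
 * earlier sketch; a non-zero return means the ring was full or the flags
 * were invalid:
 *
 *	struct event e = {};
 *
 *	e.pid = bpf_get_current_pid_tgid() >> 32;
 *	if (bpf_ringbuf_output(&events, &e, sizeof(e), 0))
 *		return 0;
 */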

BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
{
	struct bpf_ringbuf *rb;

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

	switch (flags) {
	case BPF_RB_AVAIL_DATA:
		return ringbuf_avail_data_sz(rb);
	case BPF_RB_RING_SIZE:
		return ringbuf_total_data_sz(rb);
	case BPF_RB_CONS_POS:
		return smp_load_acquire(&rb->consumer_pos);
	case BPF_RB_PROD_POS:
		return smp_load_acquire(&rb->producer_pos);
	default:
		return 0;
	}
}

const struct bpf_func_proto bpf_ringbuf_query_proto = {
	.func = bpf_ringbuf_query,
	.ret_type = RET_INTEGER,
	.arg1_type = ARG_CONST_MAP_PTR,
	.arg2_type = ARG_ANYTHING,
};
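
/* Example (illustrative sketch): a BPF program can use bpf_ringbuf_query()
 * to make a best-effort decision before reserving, e.g. dropping
 * low-priority events once the ring is mostly full. The returned values are
 * momentary snapshots and inherently racy; the 75% threshold below is
 * arbitrary and the "events" map is the hypothetical one from the earlier
 * sketch:
 *
 *	__u64 avail = bpf_ringbuf_query(&events, BPF_RB_AVAIL_DATA);
 *	__u64 size = bpf_ringbuf_query(&events, BPF_RB_RING_SIZE);
 *
 *	if (avail * 4 > size * 3)
 *		return 0;
 */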

BPF_CALL_4(bpf_ringbuf_reserve_dynptr, struct bpf_map *, map, u32, size, u64, flags,
	   struct bpf_dynptr_kern *, ptr)
{
	struct bpf_ringbuf_map *rb_map;
	void *sample;
	int err;

	if (unlikely(flags)) {
		bpf_dynptr_set_null(ptr);
		return -EINVAL;
	}

	err = bpf_dynptr_check_size(size);
	if (err) {
		bpf_dynptr_set_null(ptr);
		return err;
	}

	rb_map = container_of(map, struct bpf_ringbuf_map, map);

	sample = __bpf_ringbuf_reserve(rb_map->rb, size);
	if (!sample) {
		bpf_dynptr_set_null(ptr);
		return -EINVAL;
	}

	bpf_dynptr_init(ptr, sample, BPF_DYNPTR_TYPE_RINGBUF, 0, size);

	return 0;
}

const struct bpf_func_proto bpf_ringbuf_reserve_dynptr_proto = {
	.func = bpf_ringbuf_reserve_dynptr,
	.ret_type = RET_INTEGER,
	.arg1_type = ARG_CONST_MAP_PTR,
	.arg2_type = ARG_ANYTHING,
	.arg3_type = ARG_ANYTHING,
	.arg4_type = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | MEM_UNINIT | MEM_WRITE,
};
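
/* Example (illustrative sketch, not part of this file): the dynptr variants
 * as used from a BPF program, again reusing the hypothetical "events" map
 * and struct event. The dynptr carries the reserved region, so the program
 * accesses it through bpf_dynptr_data() (or bpf_dynptr_write()) before
 * submitting; the dynptr must be released on every path, including the
 * failure path:
 *
 *	struct bpf_dynptr ptr;
 *	struct event *e;
 *
 *	if (bpf_ringbuf_reserve_dynptr(&events, sizeof(*e), 0, &ptr)) {
 *		bpf_ringbuf_discard_dynptr(&ptr, 0);
 *		return 0;
 *	}
 *	e = bpf_dynptr_data(&ptr, 0, sizeof(*e));
 *	if (e)
 *		e->pid = bpf_get_current_pid_tgid() >> 32;
 *	bpf_ringbuf_submit_dynptr(&ptr, 0);
 *	return 0;
 */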

BPF_CALL_2(bpf_ringbuf_submit_dynptr, struct bpf_dynptr_kern *, ptr, u64, flags)
{
	if (!ptr->data)
		return 0;

	bpf_ringbuf_commit(ptr->data, flags, false /* discard */);

	bpf_dynptr_set_null(ptr);

	return 0;
}

const struct bpf_func_proto bpf_ringbuf_submit_dynptr_proto = {
	.func = bpf_ringbuf_submit_dynptr,
	.ret_type = RET_VOID,
	.arg1_type = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
	.arg2_type = ARG_ANYTHING,
};

BPF_CALL_2(bpf_ringbuf_discard_dynptr, struct bpf_dynptr_kern *, ptr, u64, flags)
{
	if (!ptr->data)
		return 0;

	bpf_ringbuf_commit(ptr->data, flags, true /* discard */);

	bpf_dynptr_set_null(ptr);

	return 0;
}

const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
	.func = bpf_ringbuf_discard_dynptr,
	.ret_type = RET_VOID,
	.arg1_type = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
	.arg2_type = ARG_ANYTHING,
};

static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size)
{
	int err;
	u32 hdr_len, sample_len, total_len, flags, *hdr;
	u64 cons_pos, prod_pos;

	/* Synchronizes with smp_store_release() in user-space producer. */
	prod_pos = smp_load_acquire(&rb->producer_pos);
	if (prod_pos % 8)
		return -EINVAL;

	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
	cons_pos = smp_load_acquire(&rb->consumer_pos);
	if (cons_pos >= prod_pos)
		return -ENODATA;

	hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
	/* Synchronizes with smp_store_release() in user-space producer. */
	hdr_len = smp_load_acquire(hdr);
	flags = hdr_len & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT);
	sample_len = hdr_len & ~flags;
	total_len = round_up(sample_len + BPF_RINGBUF_HDR_SZ, 8);

	/* The sample must fit within the region advertised by the producer position. */
	if (total_len > prod_pos - cons_pos)
		return -EINVAL;

	/* The sample must fit within the data region of the ring buffer. */
	if (total_len > ringbuf_total_data_sz(rb))
		return -E2BIG;

	/* The sample must fit into a struct bpf_dynptr. */
	err = bpf_dynptr_check_size(sample_len);
	if (err)
		return -E2BIG;

	if (flags & BPF_RINGBUF_DISCARD_BIT) {
		/* If the discard bit is set, the sample should be skipped.
		 *
		 * Update the consumer pos, and return -EAGAIN so the caller
		 * knows to skip this sample and try to read the next one.
		 */
		smp_store_release(&rb->consumer_pos, cons_pos + total_len);
		return -EAGAIN;
	}

	if (flags & BPF_RINGBUF_BUSY_BIT)
		return -EBUSY;

	*sample = (void *)((uintptr_t)rb->data +
			   (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
	*size = sample_len;
	return 0;
}

static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
{
	u64 consumer_pos;
	u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);

	/* Using smp_load_acquire() is unnecessary here, as the busy-bit
	 * prevents another task from writing to consumer_pos after it was read
	 * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
	 */
	consumer_pos = rb->consumer_pos;
	/* Synchronizes with smp_load_acquire() in user-space producer. */
	smp_store_release(&rb->consumer_pos, consumer_pos + rounded_size);
}

BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
	   void *, callback_fn, void *, callback_ctx, u64, flags)
{
	struct bpf_ringbuf *rb;
	long samples, discarded_samples = 0, ret = 0;
	bpf_callback_t callback = (bpf_callback_t)callback_fn;
	u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
	int busy = 0;

	if (unlikely(flags & ~wakeup_flags))
		return -EINVAL;

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

	/* If another consumer is already consuming a sample, wait for them to finish. */
	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
		return -EBUSY;

	for (samples = 0; samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0; samples++) {
		int err;
		u32 size;
		void *sample;
		struct bpf_dynptr_kern dynptr;

		err = __bpf_user_ringbuf_peek(rb, &sample, &size);
		if (err) {
			if (err == -ENODATA) {
				break;
			} else if (err == -EAGAIN) {
				discarded_samples++;
				continue;
			} else {
				ret = err;
				goto schedule_work_return;
			}
		}

		bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size);
		ret = callback((uintptr_t)&dynptr, (uintptr_t)callback_ctx, 0, 0, 0);
		__bpf_user_ringbuf_sample_release(rb, size, flags);
	}
	ret = samples - discarded_samples;

schedule_work_return:
	/* Prevent the clearing of the busy-bit from being reordered before the
	 * storing of any rb consumer or producer positions.
	 */
	atomic_set_release(&rb->busy, 0);

	if (flags & BPF_RB_FORCE_WAKEUP)
		irq_work_queue(&rb->work);
	else if (!(flags & BPF_RB_NO_WAKEUP) && samples > 0)
		irq_work_queue(&rb->work);

	return ret;
}

const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
	.func = bpf_user_ringbuf_drain,
	.ret_type = RET_INTEGER,
	.arg1_type = ARG_CONST_MAP_PTR,
	.arg2_type = ARG_PTR_TO_FUNC,
	.arg3_type = ARG_PTR_TO_STACK_OR_NULL,
	.arg4_type = ARG_ANYTHING,
};
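
/* Example (illustrative sketch, not part of this file): the two halves of a
 * BPF_MAP_TYPE_USER_RINGBUF. The names "user_rb", "handle_sample" and
 * struct msg are made up; struct msg is assumed to be shared between the
 * BPF program and user space. On the BPF side, bpf_user_ringbuf_drain()
 * invokes the callback once per committed sample, and a non-zero callback
 * return stops the drain early:
 *
 *	struct msg {
 *		int value;
 *	};
 *
 *	struct {
 *		__uint(type, BPF_MAP_TYPE_USER_RINGBUF);
 *		__uint(max_entries, 256 * 1024);
 *	} user_rb SEC(".maps");
 *
 *	static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
 *	{
 *		struct msg *m = bpf_dynptr_data(dynptr, 0, sizeof(*m));
 *
 *		return m ? 0 : 1;
 *	}
 *
 *	SEC("tracepoint/syscalls/sys_enter_getpgid")
 *	int drain(void *ctx)
 *	{
 *		bpf_user_ringbuf_drain(&user_rb, handle_sample, NULL, 0);
 *		return 0;
 *	}
 *
 * On the user-space side, libbpf's user_ring_buffer API reserves a sample,
 * fills it, and commits it by releasing the busy bit in the sample header,
 * which is what __bpf_user_ringbuf_peek() above synchronizes against:
 *
 *	struct user_ring_buffer *urb = user_ring_buffer__new(map_fd, NULL);
 *	struct msg *m = user_ring_buffer__reserve(urb, sizeof(*m));
 *
 *	if (m) {
 *		m->value = 42;
 *		user_ring_buffer__submit(urb, m);
 *	}
 */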