4 #include <linux/irq_work.h>
5 #include <linux/slab.h>
6 #include <linux/filter.h>
8 #include <linux/vmalloc.h>
9 #include <linux/wait.h>
10 #include <linux/poll.h>
11 #include <uapi/linux/btf.h>
/* Only the NUMA-node flag is accepted when creating a ring buffer map */
#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)

/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
	(offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT)
/* consumer page and producer page */
#define RINGBUF_POS_PAGES 2

/* Upper bound on the payload size of a single record */
#define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4)

/* Maximum size of ring buffer area is limited by 32-bit page offset within
 * record header, counted in pages. Reserve 8 bits for extensibility, and take
 * into account few extra pages for consumer/producer pages and
 * non-mmap()'able parts. This gives 64GB limit, which seems plenty for single
 * ring buffer.
 */
#define RINGBUF_MAX_DATA_SZ \
	(((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)
/* Member declarations of struct bpf_ringbuf (RINGBUF_PGOFF takes
 * offsetof(struct bpf_ringbuf, consumer_pos)). consumer_pos, producer_pos and
 * the flexible data[] array are each aligned to PAGE_SIZE.
 *
 * NOTE(review): the struct's opening line, its closing brace and the other
 * members used elsewhere in this file (work, mask, pages, nr_pages) are not
 * present in this copy - restore them from the authoritative source.
 */
33 wait_queue_head_t waitq
;
38 spinlock_t spinlock ____cacheline_aligned_in_smp
;
39 /* Consumer and producer counters are put into separate pages to allow
40 * mapping consumer page as r/w, but restrict producer page to r/o.
41 * This protects producer position from being modified by user-space
42 * application and ruining in-kernel position tracking.
44 unsigned long consumer_pos
__aligned(PAGE_SIZE
);
45 unsigned long producer_pos
__aligned(PAGE_SIZE
);
46 char data
[] __aligned(PAGE_SIZE
);
49 struct bpf_ringbuf_map
{
51 struct bpf_map_memory memory
;
52 struct bpf_ringbuf
*rb
;
55 /* 8-byte ring buffer record header structure */
56 struct bpf_ringbuf_hdr
{
/* Allocate the pages backing a ring buffer with @data_sz bytes of data on NUMA
 * node @numa_node and map them with vmap() into one virtually contiguous area.
 *
 * The page-pointer array has room for nr_meta_pages + 2 * nr_data_pages
 * entries; every data page is additionally stored at index nr_data_pages + i,
 * so the data pages appear twice, back to back, in the resulting mapping.
 *
 * NOTE(review): this copy is incomplete - the declarations of i and
 * array_size, the last gfp flag, the else between the two array allocations,
 * all failure checks, the pages[i] assignment and the return statements are
 * not present. Restore them from the authoritative source before use.
 */
61 static struct bpf_ringbuf
*bpf_ringbuf_area_alloc(size_t data_sz
, int numa_node
)
63 const gfp_t flags
= GFP_KERNEL
| __GFP_RETRY_MAYFAIL
| __GFP_NOWARN
|
65 int nr_meta_pages
= RINGBUF_PGOFF
+ RINGBUF_POS_PAGES
;
66 int nr_data_pages
= data_sz
>> PAGE_SHIFT
;
67 int nr_pages
= nr_meta_pages
+ nr_data_pages
;
68 struct page
**pages
, *page
;
69 struct bpf_ringbuf
*rb
;
73 /* Each data page is mapped twice to allow "virtual"
74 * continuous read of samples wrapping around the end of ring
76 * ------------------------------------------------------
77 * | meta pages | real data pages | same data pages |
78 * ------------------------------------------------------
79 * | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
80 * ------------------------------------------------------
82 * ------------------------------------------------------
85 * Here, no need to worry about special handling of wrapped-around
86 * data due to double-mapped data pages. This works both in kernel and
87 * when mmap()'ed in user-space, simplifying both kernel and
88 * user-space implementations significantly.
90 array_size
= (nr_meta_pages
+ 2 * nr_data_pages
) * sizeof(*pages
);
91 if (array_size
> PAGE_SIZE
)
92 pages
= vmalloc_node(array_size
, numa_node
);
94 pages
= kmalloc_node(array_size
, flags
, numa_node
);
98 for (i
= 0; i
< nr_pages
; i
++) {
99 page
= alloc_pages_node(numa_node
, flags
, 0);
105 if (i
>= nr_meta_pages
)
106 pages
[nr_data_pages
+ i
] = page
;
109 rb
= vmap(pages
, nr_meta_pages
+ 2 * nr_data_pages
,
110 VM_ALLOC
| VM_USERMAP
, PAGE_KERNEL
);
113 rb
->nr_pages
= nr_pages
;
118 for (i
= 0; i
< nr_pages
; i
++)
119 __free_page(pages
[i
]);
124 static void bpf_ringbuf_notify(struct irq_work
*work
)
126 struct bpf_ringbuf
*rb
= container_of(work
, struct bpf_ringbuf
, work
);
128 wake_up_all(&rb
->waitq
);
131 static struct bpf_ringbuf
*bpf_ringbuf_alloc(size_t data_sz
, int numa_node
)
133 struct bpf_ringbuf
*rb
;
135 if (!data_sz
|| !PAGE_ALIGNED(data_sz
))
136 return ERR_PTR(-EINVAL
);
139 /* on 32-bit arch, it's impossible to overflow record's hdr->pgoff */
140 if (data_sz
> RINGBUF_MAX_DATA_SZ
)
141 return ERR_PTR(-E2BIG
);
144 rb
= bpf_ringbuf_area_alloc(data_sz
, numa_node
);
146 return ERR_PTR(-ENOMEM
);
148 spin_lock_init(&rb
->spinlock
);
149 init_waitqueue_head(&rb
->waitq
);
150 init_irq_work(&rb
->work
, bpf_ringbuf_notify
);
152 rb
->mask
= data_sz
- 1;
153 rb
->consumer_pos
= 0;
154 rb
->producer_pos
= 0;
/* Map creation callback (installed as .map_alloc in ringbuf_map_ops).
 *
 * Rejects with -EINVAL any map_flags outside RINGBUF_CREATE_FLAG_MASK, a
 * non-zero key_size or value_size, and a max_entries that is zero or not
 * page-aligned. max_entries is then passed to bpf_ringbuf_alloc() as the size
 * of the data area, together with the map's NUMA node.
 *
 * NOTE(review): this copy is incomplete - the declarations of cost and err,
 * the NULL check after kzalloc(), the last term of cost, the error-path jumps
 * and labels, and the final return statements are not present. Restore them
 * from the authoritative source before use.
 */
159 static struct bpf_map
*ringbuf_map_alloc(union bpf_attr
*attr
)
161 struct bpf_ringbuf_map
*rb_map
;
165 if (attr
->map_flags
& ~RINGBUF_CREATE_FLAG_MASK
)
166 return ERR_PTR(-EINVAL
);
168 if (attr
->key_size
|| attr
->value_size
||
169 attr
->max_entries
== 0 || !PAGE_ALIGNED(attr
->max_entries
))
170 return ERR_PTR(-EINVAL
);
172 rb_map
= kzalloc(sizeof(*rb_map
), GFP_USER
);
174 return ERR_PTR(-ENOMEM
);
176 bpf_map_init_from_attr(&rb_map
->map
, attr
);
178 cost
= sizeof(struct bpf_ringbuf_map
) +
179 sizeof(struct bpf_ringbuf
) +
181 err
= bpf_map_charge_init(&rb_map
->map
.memory
, cost
);
185 rb_map
->rb
= bpf_ringbuf_alloc(attr
->max_entries
, rb_map
->map
.numa_node
);
186 if (IS_ERR(rb_map
->rb
)) {
187 err
= PTR_ERR(rb_map
->rb
);
194 bpf_map_charge_finish(&rb_map
->map
.memory
);
200 static void bpf_ringbuf_free(struct bpf_ringbuf
*rb
)
202 /* copy pages pointer and nr_pages to local variable, as we are going
203 * to unmap rb itself with vunmap() below
205 struct page
**pages
= rb
->pages
;
206 int i
, nr_pages
= rb
->nr_pages
;
209 for (i
= 0; i
< nr_pages
; i
++)
210 __free_page(pages
[i
]);
/* Map destruction callback (installed as .map_free in ringbuf_map_ops):
 * recovers the bpf_ringbuf_map from @map and frees its ring buffer with
 * bpf_ringbuf_free().
 *
 * NOTE(review): this copy is incomplete - the wait that the inline comment
 * refers to and any release of rb_map itself are not present. Restore them
 * from the authoritative source before use.
 */
214 static void ringbuf_map_free(struct bpf_map
*map
)
216 struct bpf_ringbuf_map
*rb_map
;
218 /* at this point bpf_prog->aux->refcnt == 0 and this map->refcnt == 0,
219 * so the programs (can be more than one that used this map) were
220 * disconnected from events. Wait for outstanding critical sections in
221 * these programs to complete
225 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
226 bpf_ringbuf_free(rb_map
->rb
);
230 static void *ringbuf_map_lookup_elem(struct bpf_map
*map
, void *key
)
232 return ERR_PTR(-ENOTSUPP
);
/* Element update callback (installed as .map_update_elem in ringbuf_map_ops).
 *
 * NOTE(review): the parameter list is cut off after @value and the body is not
 * present in this copy. Presumably unsupported like ringbuf_map_lookup_elem()
 * - confirm against the authoritative source.
 */
235 static int ringbuf_map_update_elem(struct bpf_map
*map
, void *key
, void *value
,
/* Element delete callback (installed as .map_delete_elem in ringbuf_map_ops).
 *
 * NOTE(review): the body is not present in this copy. Presumably unsupported
 * like ringbuf_map_lookup_elem() - confirm against the authoritative source.
 */
241 static int ringbuf_map_delete_elem(struct bpf_map
*map
, void *key
)
/* Key iteration callback (installed as .map_get_next_key in ringbuf_map_ops).
 *
 * NOTE(review): the parameter list is cut off after @key and the body is not
 * present in this copy. Presumably unsupported like ringbuf_map_lookup_elem()
 * - confirm against the authoritative source.
 */
246 static int ringbuf_map_get_next_key(struct bpf_map
*map
, void *key
,
252 static size_t bpf_ringbuf_mmap_page_cnt(const struct bpf_ringbuf
*rb
)
254 size_t data_pages
= (rb
->mask
+ 1) >> PAGE_SHIFT
;
256 /* consumer page + producer page + 2 x data pages */
257 return RINGBUF_POS_PAGES
+ 2 * data_pages
;
260 static int ringbuf_map_mmap(struct bpf_map
*map
, struct vm_area_struct
*vma
)
262 struct bpf_ringbuf_map
*rb_map
;
265 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
266 mmap_sz
= bpf_ringbuf_mmap_page_cnt(rb_map
->rb
) << PAGE_SHIFT
;
268 if (vma
->vm_pgoff
* PAGE_SIZE
+ (vma
->vm_end
- vma
->vm_start
) > mmap_sz
)
271 return remap_vmalloc_range(vma
, rb_map
->rb
,
272 vma
->vm_pgoff
+ RINGBUF_PGOFF
);
275 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf
*rb
)
277 unsigned long cons_pos
, prod_pos
;
279 cons_pos
= smp_load_acquire(&rb
->consumer_pos
);
280 prod_pos
= smp_load_acquire(&rb
->producer_pos
);
281 return prod_pos
- cons_pos
;
284 static __poll_t
ringbuf_map_poll(struct bpf_map
*map
, struct file
*filp
,
285 struct poll_table_struct
*pts
)
287 struct bpf_ringbuf_map
*rb_map
;
289 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
290 poll_wait(filp
, &rb_map
->rb
->waitq
, pts
);
292 if (ringbuf_avail_data_sz(rb_map
->rb
))
293 return EPOLLIN
| EPOLLRDNORM
;
297 const struct bpf_map_ops ringbuf_map_ops
= {
298 .map_alloc
= ringbuf_map_alloc
,
299 .map_free
= ringbuf_map_free
,
300 .map_mmap
= ringbuf_map_mmap
,
301 .map_poll
= ringbuf_map_poll
,
302 .map_lookup_elem
= ringbuf_map_lookup_elem
,
303 .map_update_elem
= ringbuf_map_update_elem
,
304 .map_delete_elem
= ringbuf_map_delete_elem
,
305 .map_get_next_key
= ringbuf_map_get_next_key
,
308 /* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
309 * calculate offset from record metadata to ring buffer in pages, rounded
310 * down. This page offset is stored as part of record metadata and allows to
311 * restore struct bpf_ringbuf * from record pointer. This page offset is
312 * stored at offset 4 of record metadata header.
314 static size_t bpf_ringbuf_rec_pg_off(struct bpf_ringbuf
*rb
,
315 struct bpf_ringbuf_hdr
*hdr
)
317 return ((void *)hdr
- (void *)rb
) >> PAGE_SHIFT
;
320 /* Given pointer to ring buffer record header, restore pointer to struct
321 * bpf_ringbuf itself by using page offset stored at offset 4
323 static struct bpf_ringbuf
*
324 bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr
*hdr
)
326 unsigned long addr
= (unsigned long)(void *)hdr
;
327 unsigned long off
= (unsigned long)hdr
->pg_off
<< PAGE_SHIFT
;
329 return (void*)((addr
& PAGE_MASK
) - off
);
/* Reserve room in @rb for a record with @size bytes of payload and return a
 * pointer to the payload, which starts BPF_RINGBUF_HDR_SZ bytes past the
 * record header.
 *
 * The record occupies size + BPF_RINGBUF_HDR_SZ bytes rounded up to 8. Its
 * header is written at data + (prod_pos & mask) with len set to
 * size | BPF_RINGBUF_BUSY_BIT and pg_off set to the header's page offset from
 * @rb. The producer position is advanced under rb->spinlock and published
 * with smp_store_release(). The request is refused when the new producer
 * position would be more than rb->mask ahead of the consumer position.
 *
 * NOTE(review): this copy is incomplete - the declarations of len and pg_off,
 * the statement following the size check, the condition that chooses between
 * spin_trylock_irqsave() and spin_lock_irqsave(), and the early-return
 * statements are not present. Restore them from the authoritative source
 * before use.
 */
332 static void *__bpf_ringbuf_reserve(struct bpf_ringbuf
*rb
, u64 size
)
334 unsigned long cons_pos
, prod_pos
, new_prod_pos
, flags
;
336 struct bpf_ringbuf_hdr
*hdr
;
338 if (unlikely(size
> RINGBUF_MAX_RECORD_SZ
))
341 len
= round_up(size
+ BPF_RINGBUF_HDR_SZ
, 8);
342 cons_pos
= smp_load_acquire(&rb
->consumer_pos
);
345 if (!spin_trylock_irqsave(&rb
->spinlock
, flags
))
348 spin_lock_irqsave(&rb
->spinlock
, flags
);
351 prod_pos
= rb
->producer_pos
;
352 new_prod_pos
= prod_pos
+ len
;
354 /* check for out of ringbuf space by ensuring producer position
355 * doesn't advance more than (ringbuf_size - 1) ahead
357 if (new_prod_pos
- cons_pos
> rb
->mask
) {
358 spin_unlock_irqrestore(&rb
->spinlock
, flags
);
362 hdr
= (void *)rb
->data
+ (prod_pos
& rb
->mask
);
363 pg_off
= bpf_ringbuf_rec_pg_off(rb
, hdr
);
364 hdr
->len
= size
| BPF_RINGBUF_BUSY_BIT
;
365 hdr
->pg_off
= pg_off
;
367 /* pairs with consumer's smp_load_acquire() */
368 smp_store_release(&rb
->producer_pos
, new_prod_pos
);
370 spin_unlock_irqrestore(&rb
->spinlock
, flags
);
372 return (void *)hdr
+ BPF_RINGBUF_HDR_SZ
;
375 BPF_CALL_3(bpf_ringbuf_reserve
, struct bpf_map
*, map
, u64
, size
, u64
, flags
)
377 struct bpf_ringbuf_map
*rb_map
;
382 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
383 return (unsigned long)__bpf_ringbuf_reserve(rb_map
->rb
, size
);
386 const struct bpf_func_proto bpf_ringbuf_reserve_proto
= {
387 .func
= bpf_ringbuf_reserve
,
388 .ret_type
= RET_PTR_TO_ALLOC_MEM_OR_NULL
,
389 .arg1_type
= ARG_CONST_MAP_PTR
,
390 .arg2_type
= ARG_CONST_ALLOC_SIZE_OR_ZERO
,
391 .arg3_type
= ARG_ANYTHING
,
394 static void bpf_ringbuf_commit(void *sample
, u64 flags
, bool discard
)
396 unsigned long rec_pos
, cons_pos
;
397 struct bpf_ringbuf_hdr
*hdr
;
398 struct bpf_ringbuf
*rb
;
401 hdr
= sample
- BPF_RINGBUF_HDR_SZ
;
402 rb
= bpf_ringbuf_restore_from_rec(hdr
);
403 new_len
= hdr
->len
^ BPF_RINGBUF_BUSY_BIT
;
405 new_len
|= BPF_RINGBUF_DISCARD_BIT
;
407 /* update record header with correct final size prefix */
408 xchg(&hdr
->len
, new_len
);
410 /* if consumer caught up and is waiting for our record, notify about
411 * new data availability
413 rec_pos
= (void *)hdr
- (void *)rb
->data
;
414 cons_pos
= smp_load_acquire(&rb
->consumer_pos
) & rb
->mask
;
416 if (flags
& BPF_RB_FORCE_WAKEUP
)
417 irq_work_queue(&rb
->work
);
418 else if (cons_pos
== rec_pos
&& !(flags
& BPF_RB_NO_WAKEUP
))
419 irq_work_queue(&rb
->work
);
422 BPF_CALL_2(bpf_ringbuf_submit
, void *, sample
, u64
, flags
)
424 bpf_ringbuf_commit(sample
, flags
, false /* discard */);
428 const struct bpf_func_proto bpf_ringbuf_submit_proto
= {
429 .func
= bpf_ringbuf_submit
,
430 .ret_type
= RET_VOID
,
431 .arg1_type
= ARG_PTR_TO_ALLOC_MEM
,
432 .arg2_type
= ARG_ANYTHING
,
435 BPF_CALL_2(bpf_ringbuf_discard
, void *, sample
, u64
, flags
)
437 bpf_ringbuf_commit(sample
, flags
, true /* discard */);
441 const struct bpf_func_proto bpf_ringbuf_discard_proto
= {
442 .func
= bpf_ringbuf_discard
,
443 .ret_type
= RET_VOID
,
444 .arg1_type
= ARG_PTR_TO_ALLOC_MEM
,
445 .arg2_type
= ARG_ANYTHING
,
/* BPF helper: reserve a record of @size bytes in the ring buffer behind @map,
 * copy @size bytes from @data into it and commit it (not discarded), passing
 * flags on to bpf_ringbuf_commit(). Flags other than BPF_RB_NO_WAKEUP and
 * BPF_RB_FORCE_WAKEUP are treated as invalid.
 *
 * NOTE(review): this copy is incomplete - the last macro argument (flags), the
 * declaration of rec, the statements taken on invalid flags and on a failed
 * reservation, and the final return are not present. Restore them from the
 * authoritative source before use.
 */
448 BPF_CALL_4(bpf_ringbuf_output
, struct bpf_map
*, map
, void *, data
, u64
, size
,
451 struct bpf_ringbuf_map
*rb_map
;
454 if (unlikely(flags
& ~(BPF_RB_NO_WAKEUP
| BPF_RB_FORCE_WAKEUP
)))
457 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
458 rec
= __bpf_ringbuf_reserve(rb_map
->rb
, size
);
462 memcpy(rec
, data
, size
);
463 bpf_ringbuf_commit(rec
, flags
, false /* discard */);
467 const struct bpf_func_proto bpf_ringbuf_output_proto
= {
468 .func
= bpf_ringbuf_output
,
469 .ret_type
= RET_INTEGER
,
470 .arg1_type
= ARG_CONST_MAP_PTR
,
471 .arg2_type
= ARG_PTR_TO_MEM
,
472 .arg3_type
= ARG_CONST_SIZE_OR_ZERO
,
473 .arg4_type
= ARG_ANYTHING
,
476 BPF_CALL_2(bpf_ringbuf_query
, struct bpf_map
*, map
, u64
, flags
)
478 struct bpf_ringbuf
*rb
;
480 rb
= container_of(map
, struct bpf_ringbuf_map
, map
)->rb
;
483 case BPF_RB_AVAIL_DATA
:
484 return ringbuf_avail_data_sz(rb
);
485 case BPF_RB_RING_SIZE
:
487 case BPF_RB_CONS_POS
:
488 return smp_load_acquire(&rb
->consumer_pos
);
489 case BPF_RB_PROD_POS
:
490 return smp_load_acquire(&rb
->producer_pos
);
496 const struct bpf_func_proto bpf_ringbuf_query_proto
= {
497 .func
= bpf_ringbuf_query
,
498 .ret_type
= RET_INTEGER
,
499 .arg1_type
= ARG_CONST_MAP_PTR
,
500 .arg2_type
= ARG_ANYTHING
,