#include <linux/irq_work.h>
#include <linux/slab.h>
#include <linux/filter.h>
#include <linux/mm.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/poll.h>
#include <uapi/linux/btf.h>

#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)

/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
	(offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT)
/* consumer page and producer page */
#define RINGBUF_POS_PAGES 2

#define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4)

/* Maximum size of ring buffer area is limited by 32-bit page offset within
 * record header, counted in pages. Reserve 8 bits for extensibility, and take
 * into account a few extra pages for consumer/producer pages and
 * non-mmap()'able parts. This gives a 64GB limit, which seems plenty for a
 * single ring buffer.
 */
#define RINGBUF_MAX_DATA_SZ \
	(((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)

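/* Worked example, assuming 4KB pages: 32 bits of page offset minus 8 reserved
 * bits leaves 24 bits, i.e. up to 2^24 pages of data area, and
 * 2^24 * 4KB = 64GB, minus the few position/meta pages subtracted above.
 */
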
struct bpf_ringbuf {
	wait_queue_head_t waitq;
	struct irq_work work;
	u64 mask;
	struct page **pages;
	int nr_pages;
	spinlock_t spinlock ____cacheline_aligned_in_smp;
	/* Consumer and producer counters are put into separate pages to allow
	 * mapping consumer page as r/w, but restrict producer page to r/o.
	 * This protects producer position from being modified by user-space
	 * application and ruining in-kernel position tracking.
	 */
	unsigned long consumer_pos __aligned(PAGE_SIZE);
	unsigned long producer_pos __aligned(PAGE_SIZE);
	char data[] __aligned(PAGE_SIZE);
};

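/* Illustrative sketch of how a user-space consumer (e.g. libbpf) is expected
 * to map the ring, matching the layout above: the consumer_pos page writable,
 * everything else read-only. Names below are hypothetical; page_sz is the
 * system page size and data_sz is the map's max_entries:
 *
 *	cons = mmap(NULL, page_sz, PROT_READ | PROT_WRITE, MAP_SHARED,
 *		    map_fd, 0);
 *	prod = mmap(NULL, page_sz + 2 * data_sz, PROT_READ, MAP_SHARED,
 *		    map_fd, page_sz);
 *
 * The second mapping covers the producer_pos page plus the double-mapped data
 * area (see bpf_ringbuf_area_alloc() below).
 */
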
struct bpf_ringbuf_map {
	struct bpf_map map;
	struct bpf_map_memory memory;
	struct bpf_ringbuf *rb;
};

/* 8-byte ring buffer record header structure */
struct bpf_ringbuf_hdr {
	u32 len;
	u32 pg_off;
};

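/* Note (not from the original source, based on the UAPI definitions used
 * below): the two top bits of len are flag bits (BPF_RINGBUF_BUSY_BIT while a
 * record is reserved but not yet committed, BPF_RINGBUF_DISCARD_BIT once it
 * is discarded), which is why record size is capped at RINGBUF_MAX_RECORD_SZ
 * (UINT_MAX/4). pg_off is the record's page offset from the start of struct
 * bpf_ringbuf, see bpf_ringbuf_rec_pg_off().
 */
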
static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node)
{
	const gfp_t flags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_NOWARN |
			    __GFP_ZERO;
	int nr_meta_pages = RINGBUF_PGOFF + RINGBUF_POS_PAGES;
	int nr_data_pages = data_sz >> PAGE_SHIFT;
	int nr_pages = nr_meta_pages + nr_data_pages;
	struct page **pages, *page;
	struct bpf_ringbuf *rb;
	size_t array_size;
	int i;

	/* Each data page is mapped twice to allow "virtual"
	 * continuous read of samples wrapping around the end of ring
	 * buffer area:
	 * ------------------------------------------------------
	 * | meta pages |  real data pages  |  same data pages  |
	 * ------------------------------------------------------
	 * |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
	 * ------------------------------------------------------
	 * Here, no need to worry about special handling of wrapped-around
	 * data due to double-mapped data pages. This works both in kernel and
	 * when mmap()'ed in user-space, simplifying both kernel and
	 * user-space implementations significantly.
	 */
	array_size = (nr_meta_pages + 2 * nr_data_pages) * sizeof(*pages);
	if (array_size > PAGE_SIZE)
		pages = vmalloc_node(array_size, numa_node);
	else
		pages = kmalloc_node(array_size, flags, numa_node);
	if (!pages)
		return NULL;

	for (i = 0; i < nr_pages; i++) {
		page = alloc_pages_node(numa_node, flags, 0);
		if (!page) {
			nr_pages = i;
			goto err_free_pages;
		}
		pages[i] = page;
		if (i >= nr_meta_pages)
			pages[nr_data_pages + i] = page;
	}

	rb = vmap(pages, nr_meta_pages + 2 * nr_data_pages,
		  VM_ALLOC | VM_USERMAP, PAGE_KERNEL);
	if (rb) {
		rb->pages = pages;
		rb->nr_pages = nr_pages;
		return rb;
	}

err_free_pages:
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	kvfree(pages);
	return NULL;
}

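/* Illustrative note: thanks to the double mapping set up above, a record that
 * starts near the end of the data area can be copied out with one linear
 * access, e.g. in a consumer (data_area, mask, cons_pos are hypothetical
 * names for the mapped data area and positions):
 *
 *	void *rec = data_area + (cons_pos & mask);
 *	memcpy(buf, rec, rec_len);	// fine even if rec crosses mask + 1
 *
 * because the pages right past the end alias the pages at the beginning.
 */
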
static void bpf_ringbuf_notify(struct irq_work *work)
{
	struct bpf_ringbuf *rb = container_of(work, struct bpf_ringbuf, work);

	wake_up_all(&rb->waitq);
}

static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
{
	struct bpf_ringbuf *rb;

	rb = bpf_ringbuf_area_alloc(data_sz, numa_node);
	if (!rb)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&rb->spinlock);
	init_waitqueue_head(&rb->waitq);
	init_irq_work(&rb->work, bpf_ringbuf_notify);

	/* data_sz is guaranteed to be a power of 2, see ringbuf_map_alloc() */
	rb->mask = data_sz - 1;
	rb->consumer_pos = 0;
	rb->producer_pos = 0;

	return rb;
}

static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
	struct bpf_ringbuf_map *rb_map;
	u64 cost;
	int err;

	if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
		return ERR_PTR(-EINVAL);

	if (attr->key_size || attr->value_size ||
	    !is_power_of_2(attr->max_entries) ||
	    !PAGE_ALIGNED(attr->max_entries))
		return ERR_PTR(-EINVAL);

#ifdef CONFIG_64BIT
	/* on 32-bit arch, it's impossible to overflow record's hdr->pgoff */
	if (attr->max_entries > RINGBUF_MAX_DATA_SZ)
		return ERR_PTR(-E2BIG);
#endif

	rb_map = kzalloc(sizeof(*rb_map), GFP_USER);
	if (!rb_map)
		return ERR_PTR(-ENOMEM);

	bpf_map_init_from_attr(&rb_map->map, attr);

	cost = sizeof(struct bpf_ringbuf_map) +
	       sizeof(struct bpf_ringbuf) +
	       attr->max_entries;
	err = bpf_map_charge_init(&rb_map->map.memory, cost);
	if (err)
		goto err_free_map;

	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
	if (IS_ERR(rb_map->rb)) {
		err = PTR_ERR(rb_map->rb);
		goto err_uncharge;
	}

	return &rb_map->map;

err_uncharge:
	bpf_map_charge_finish(&rb_map->map.memory);
err_free_map:
	kfree(rb_map);
	return ERR_PTR(err);
}

static void bpf_ringbuf_free(struct bpf_ringbuf *rb)
{
	/* copy pages pointer and nr_pages to local variable, as we are going
	 * to unmap rb itself with vunmap() below
	 */
	struct page **pages = rb->pages;
	int i, nr_pages = rb->nr_pages;

	vunmap(rb);
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	kvfree(pages);
}

static void ringbuf_map_free(struct bpf_map *map)
{
	struct bpf_ringbuf_map *rb_map;

	/* at this point bpf_prog->aux->refcnt == 0 and this map->refcnt == 0,
	 * so the programs (can be more than one that used this map) were
	 * disconnected from events. Wait for outstanding critical sections in
	 * these programs to complete
	 */
	synchronize_rcu();

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	bpf_ringbuf_free(rb_map->rb);
	kfree(rb_map);
}

static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key)
{
	return ERR_PTR(-ENOTSUPP);
}

static int ringbuf_map_update_elem(struct bpf_map *map, void *key, void *value,
				   u64 flags)
{
	return -ENOTSUPP;
}

static int ringbuf_map_delete_elem(struct bpf_map *map, void *key)
{
	return -ENOTSUPP;
}

static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
				    void *next_key)
{
	return -ENOTSUPP;
}

static size_t bpf_ringbuf_mmap_page_cnt(const struct bpf_ringbuf *rb)
{
	size_t data_pages = (rb->mask + 1) >> PAGE_SHIFT;

	/* consumer page + producer page + 2 x data pages */
	return RINGBUF_POS_PAGES + 2 * data_pages;
}

static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
{
	struct bpf_ringbuf_map *rb_map;
	size_t mmap_sz;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	mmap_sz = bpf_ringbuf_mmap_page_cnt(rb_map->rb) << PAGE_SHIFT;

	if (vma->vm_pgoff * PAGE_SIZE + (vma->vm_end - vma->vm_start) > mmap_sz)
		return -EINVAL;

	return remap_vmalloc_range(vma, rb_map->rb,
				   vma->vm_pgoff + RINGBUF_PGOFF);
}

static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
	unsigned long cons_pos, prod_pos;

	cons_pos = smp_load_acquire(&rb->consumer_pos);
	prod_pos = smp_load_acquire(&rb->producer_pos);
	return prod_pos - cons_pos;
}

static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
				 struct poll_table_struct *pts)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	poll_wait(filp, &rb_map->rb->waitq, pts);

	if (ringbuf_avail_data_sz(rb_map->rb))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

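/* Illustrative sketch of the expected user-space consume loop after poll()
 * reports readiness (hypothetical names; data, mask, prod_pos and
 * consumer_pos refer to the mappings described near struct bpf_ringbuf):
 *
 *	while (cons_pos < smp_load_acquire(&prod_pos)) {
 *		u32 *hdr = (u32 *)(data + (cons_pos & mask));
 *		u32 len = smp_load_acquire(hdr);
 *
 *		if (len & BPF_RINGBUF_BUSY_BIT)
 *			break;			// reserved, not committed yet
 *		if (!(len & BPF_RINGBUF_DISCARD_BIT))
 *			handle((void *)hdr + BPF_RINGBUF_HDR_SZ, len);
 *		len &= ~BPF_RINGBUF_DISCARD_BIT;
 *		cons_pos += round_up(len + BPF_RINGBUF_HDR_SZ, 8);
 *		smp_store_release(&consumer_pos, cons_pos);
 *	}
 */
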
const struct bpf_map_ops ringbuf_map_ops = {
	.map_alloc = ringbuf_map_alloc,
	.map_free = ringbuf_map_free,
	.map_mmap = ringbuf_map_mmap,
	.map_poll = ringbuf_map_poll,
	.map_lookup_elem = ringbuf_map_lookup_elem,
	.map_update_elem = ringbuf_map_update_elem,
	.map_delete_elem = ringbuf_map_delete_elem,
	.map_get_next_key = ringbuf_map_get_next_key,
};

/* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
 * calculate offset from record metadata to ring buffer in pages, rounded
 * down. This page offset is stored as part of record metadata and allows
 * restoring struct bpf_ringbuf * from the record pointer. This page offset is
 * stored at offset 4 of record metadata header.
 */
static size_t bpf_ringbuf_rec_pg_off(struct bpf_ringbuf *rb,
				     struct bpf_ringbuf_hdr *hdr)
{
	return ((void *)hdr - (void *)rb) >> PAGE_SHIFT;
}

/* Given pointer to ring buffer record header, restore pointer to struct
 * bpf_ringbuf itself by using page offset stored at offset 4
 */
static struct bpf_ringbuf *
bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
{
	unsigned long addr = (unsigned long)(void *)hdr;
	unsigned long off = (unsigned long)hdr->pg_off << PAGE_SHIFT;

	return (void *)((addr & PAGE_MASK) - off);
}

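/* Worked example: struct bpf_ringbuf comes from vmap() and is page-aligned,
 * so if a record header sits at rb + 5 * PAGE_SIZE + 100, pg_off is stored as
 * 5. On commit, addr & PAGE_MASK recovers rb + 5 * PAGE_SIZE and subtracting
 * 5 pages yields rb again, without storing a full pointer in the record.
 */
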
static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
	unsigned long cons_pos, prod_pos, new_prod_pos, flags;
	u32 len, pg_off;
	struct bpf_ringbuf_hdr *hdr;

	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
		return NULL;

	len = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
	cons_pos = smp_load_acquire(&rb->consumer_pos);

	if (in_nmi()) {
		if (!spin_trylock_irqsave(&rb->spinlock, flags))
			return NULL;
	} else {
		spin_lock_irqsave(&rb->spinlock, flags);
	}

	prod_pos = rb->producer_pos;
	new_prod_pos = prod_pos + len;

	/* check for out of ringbuf space by ensuring producer position
	 * doesn't advance more than (ringbuf_size - 1) ahead
	 */
	if (new_prod_pos - cons_pos > rb->mask) {
		spin_unlock_irqrestore(&rb->spinlock, flags);
		return NULL;
	}

	hdr = (void *)rb->data + (prod_pos & rb->mask);
	pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pg_off = pg_off;

	/* pairs with consumer's smp_load_acquire() */
	smp_store_release(&rb->producer_pos, new_prod_pos);

	spin_unlock_irqrestore(&rb->spinlock, flags);

	return (void *)hdr + BPF_RINGBUF_HDR_SZ;
}

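/* Worked example of the overflow check above: with a 4096-byte data area
 * (rb->mask == 4095), cons_pos == 0 and prod_pos == 4000, a 96-byte sample
 * needs len = round_up(96 + 8, 8) == 104, so new_prod_pos - cons_pos == 4104,
 * which exceeds rb->mask and the reservation fails: committing it would
 * overwrite data the consumer has not read yet.
 */
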
BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
{
	struct bpf_ringbuf_map *rb_map;

	if (unlikely(flags))
		return 0;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
}

const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
	.func		= bpf_ringbuf_reserve,
	.ret_type	= RET_PTR_TO_ALLOC_MEM_OR_NULL,
	.arg1_type	= ARG_CONST_MAP_PTR,
	.arg2_type	= ARG_CONST_ALLOC_SIZE_OR_ZERO,
	.arg3_type	= ARG_ANYTHING,
};

static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
{
	unsigned long rec_pos, cons_pos;
	struct bpf_ringbuf_hdr *hdr;
	struct bpf_ringbuf *rb;
	u32 new_len;

	hdr = sample - BPF_RINGBUF_HDR_SZ;
	rb = bpf_ringbuf_restore_from_rec(hdr);
	new_len = hdr->len ^ BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* update record header with correct final size prefix */
	xchg(&hdr->len, new_len);

	/* if consumer caught up and is waiting for our record, notify about
	 * new data availability
	 */
	rec_pos = (void *)hdr - (void *)rb->data;
	cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;

	if (flags & BPF_RB_FORCE_WAKEUP)
		irq_work_queue(&rb->work);
	else if (cons_pos == rec_pos && !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);
}

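/* Note: the wakeup goes through irq_work rather than a direct wake_up_all()
 * because these helpers may run in contexts where taking the waitqueue lock
 * is not safe (e.g. NMI, cf. the in_nmi() handling in __bpf_ringbuf_reserve());
 * bpf_ringbuf_notify() then performs the wakeup from a safe context.
 */
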
BPF_CALL_2(bpf_ringbuf_submit, void *, sample, u64, flags)
{
	bpf_ringbuf_commit(sample, flags, false /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_submit_proto = {
	.func		= bpf_ringbuf_submit,
	.ret_type	= RET_VOID,
	.arg1_type	= ARG_PTR_TO_ALLOC_MEM,
	.arg2_type	= ARG_ANYTHING,
};

BPF_CALL_2(bpf_ringbuf_discard, void *, sample, u64, flags)
{
	bpf_ringbuf_commit(sample, flags, true /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_discard_proto = {
	.func		= bpf_ringbuf_discard,
	.ret_type	= RET_VOID,
	.arg1_type	= ARG_PTR_TO_ALLOC_MEM,
	.arg2_type	= ARG_ANYTHING,
};

BPF_CALL_4(bpf_ringbuf_output, struct bpf_map *, map, void *, data, u64, size,
	   u64, flags)
{
	struct bpf_ringbuf_map *rb_map;
	void *rec;

	if (unlikely(flags & ~(BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP)))
		return -EINVAL;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	rec = __bpf_ringbuf_reserve(rb_map->rb, size);
	if (!rec)
		return -EAGAIN;

	memcpy(rec, data, size);
	bpf_ringbuf_commit(rec, flags, false /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_output_proto = {
	.func		= bpf_ringbuf_output,
	.ret_type	= RET_INTEGER,
	.arg1_type	= ARG_CONST_MAP_PTR,
	.arg2_type	= ARG_PTR_TO_MEM,
	.arg3_type	= ARG_CONST_SIZE_OR_ZERO,
	.arg4_type	= ARG_ANYTHING,
};

BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
{
	struct bpf_ringbuf *rb;

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

	switch (flags) {
	case BPF_RB_AVAIL_DATA:
		return ringbuf_avail_data_sz(rb);
	case BPF_RB_RING_SIZE:
		return rb->mask + 1;
	case BPF_RB_CONS_POS:
		return smp_load_acquire(&rb->consumer_pos);
	case BPF_RB_PROD_POS:
		return smp_load_acquire(&rb->producer_pos);
	default:
		return 0;
	}
}

const struct bpf_func_proto bpf_ringbuf_query_proto = {
	.func		= bpf_ringbuf_query,
	.ret_type	= RET_INTEGER,
	.arg1_type	= ARG_CONST_MAP_PTR,
	.arg2_type	= ARG_ANYTHING,
};

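/* Illustrative BPF-side usage sketch (not part of this file), roughly as it
 * would appear in a BPF program built with libbpf; struct event and
 * fill_event() are hypothetical:
 *
 *	struct {
 *		__uint(type, BPF_MAP_TYPE_RINGBUF);
 *		__uint(max_entries, 256 * 1024);   // power of 2, page-aligned
 *	} rb SEC(".maps");
 *
 *	SEC("tp/sched/sched_process_exec")
 *	int handle_exec(void *ctx)
 *	{
 *		struct event *e;
 *
 *		e = bpf_ringbuf_reserve(&rb, sizeof(*e), 0);
 *		if (!e)
 *			return 0;
 *		if (fill_event(e))
 *			bpf_ringbuf_submit(e, 0);
 *		else
 *			bpf_ringbuf_discard(e, 0);
 *		return 0;
 *	}
 */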