4 #include <linux/irq_work.h>
5 #include <linux/slab.h>
6 #include <linux/filter.h>
8 #include <linux/vmalloc.h>
9 #include <linux/wait.h>
10 #include <linux/poll.h>
11 #include <uapi/linux/btf.h>
/* Map creation flags accepted by ringbuf_map_alloc(); any other bit set in
 * attr->map_flags makes it return -EINVAL.
 */
13 #define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)
15 /* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
16 #define RINGBUF_PGOFF \
17 (offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT)
18 /* consumer page and producer page */
19 #define RINGBUF_POS_PAGES 2
/* Upper bound that __bpf_ringbuf_reserve() checks the requested record
 * size against.
 */
21 #define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4)
23 /* Maximum size of ring buffer area is limited by 32-bit page offset within
24 * record header, counted in pages. Reserve 8 bits for extensibility, and take
25 * into account a few extra pages for consumer/producer pages and
26 * non-mmap()'able parts. This gives a 64GB limit, which seems plenty for a single
27 * ring buffer.
28 */
/* Largest data area size in bytes: 2^24 pages minus the position pages and
 * the non-mmapable prefix. ringbuf_map_alloc() compares attr->max_entries
 * against it and returns -E2BIG when the limit is exceeded.
 */
29 #define RINGBUF_MAX_DATA_SZ \
30 (((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)
/* Ring buffer object members.
 * waitq is woken by bpf_ringbuf_notify() and waited on in ringbuf_map_poll().
 * spinlock is held in __bpf_ringbuf_reserve() while producer_pos is advanced.
 * consumer_pos, producer_pos and data are each aligned to PAGE_SIZE.
 */
33 wait_queue_head_t waitq
;
38 spinlock_t spinlock ____cacheline_aligned_in_smp
;
39 /* Consumer and producer counters are put into separate pages to allow
40 * mapping consumer page as r/w, but restrict producer page to r/o.
41 * This protects producer position from being modified by user-space
42 * application and ruining in-kernel position tracking.
44 unsigned long consumer_pos
__aligned(PAGE_SIZE
);
45 unsigned long producer_pos
__aligned(PAGE_SIZE
);
46 char data
[] __aligned(PAGE_SIZE
);
/* Map object wrapping a ring buffer. rb is set from bpf_ringbuf_alloc() in
 * ringbuf_map_alloc() and released via bpf_ringbuf_free() in
 * ringbuf_map_free().
 */
49 struct bpf_ringbuf_map
{
51 struct bpf_map_memory memory
;
52 struct bpf_ringbuf
*rb
;
55 /* 8-byte ring buffer record header structure */
/* Code in this file reads and writes its len and pg_off members. */
56 struct bpf_ringbuf_hdr
{
/* Allocate the pages backing a ring buffer and map them into one contiguous
 * kernel virtual area.
 *
 * @data_sz: size of the data area in bytes, converted to pages with PAGE_SHIFT
 * @numa_node: node handed to the array and page allocators
 *
 * The page pointer array has room for the meta pages plus two slots per data
 * page. It comes from vmalloc_node() when larger than PAGE_SIZE and from
 * kmalloc_node() otherwise. Each data page pointer is stored a second time
 * nr_data_pages slots later, so vmap() maps the data area twice back to back.
 * On success rb->nr_pages records the number of distinct pages; the loop at
 * the end frees the pages allocated so far.
 *
 * NOTE(review): vmap() is called with VM_ALLOC | VM_USERMAP. VM_MAP is the
 * flag usually associated with vmap()ed pages; confirm VM_ALLOC is intended.
 * NOTE(review): the caller appears to treat a NULL result as -ENOMEM; confirm
 * against the failure paths of this function.
 */
61 static struct bpf_ringbuf
*bpf_ringbuf_area_alloc(size_t data_sz
, int numa_node
)
63 const gfp_t flags
= GFP_KERNEL
| __GFP_RETRY_MAYFAIL
| __GFP_NOWARN
|
/* meta pages: non-mmapable prefix plus the consumer and producer pages */
65 int nr_meta_pages
= RINGBUF_PGOFF
+ RINGBUF_POS_PAGES
;
66 int nr_data_pages
= data_sz
>> PAGE_SHIFT
;
/* number of distinct physical pages, data pages counted once */
67 int nr_pages
= nr_meta_pages
+ nr_data_pages
;
68 struct page
**pages
, *page
;
69 struct bpf_ringbuf
*rb
;
73 /* Each data page is mapped twice to allow "virtual"
74 * continuous read of samples wrapping around the end of ring
76 * ------------------------------------------------------
77 * | meta pages | real data pages | same data pages |
78 * ------------------------------------------------------
79 * | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
80 * ------------------------------------------------------
82 * ------------------------------------------------------
85 * Here, no need to worry about special handling of wrapped-around
86 * data due to double-mapped data pages. This works both in kernel and
87 * when mmap()'ed in user-space, simplifying both kernel and
88 * user-space implementations significantly.
90 array_size
= (nr_meta_pages
+ 2 * nr_data_pages
) * sizeof(*pages
);
91 if (array_size
> PAGE_SIZE
)
92 pages
= vmalloc_node(array_size
, numa_node
);
94 pages
= kmalloc_node(array_size
, flags
, numa_node
);
98 for (i
= 0; i
< nr_pages
; i
++) {
99 page
= alloc_pages_node(numa_node
, flags
, 0);
105 if (i
>= nr_meta_pages
)
106 pages
[nr_data_pages
+ i
] = page
;
109 rb
= vmap(pages
, nr_meta_pages
+ 2 * nr_data_pages
,
110 VM_ALLOC
| VM_USERMAP
, PAGE_KERNEL
);
113 rb
->nr_pages
= nr_pages
;
118 for (i
= 0; i
< nr_pages
; i
++)
119 __free_page(pages
[i
]);
/* irq_work callback installed by init_irq_work() in bpf_ringbuf_alloc().
 * Recovers the ring buffer from its embedded work member and wakes every
 * waiter on rb->waitq.
 */
124 static void bpf_ringbuf_notify(struct irq_work
*work
)
126 struct bpf_ringbuf
*rb
= container_of(work
, struct bpf_ringbuf
, work
);
128 wake_up_all(&rb
->waitq
);
/* Create and initialize a ring buffer.
 *
 * @data_sz: size of the data area in bytes
 * @numa_node: node forwarded to bpf_ringbuf_area_alloc()
 *
 * Allocates the area, then sets up the spinlock, the wait queue and the
 * irq_work whose handler is bpf_ringbuf_notify(). An ERR_PTR(-ENOMEM) return
 * covers the allocation failure path.
 */
131 static struct bpf_ringbuf
*bpf_ringbuf_alloc(size_t data_sz
, int numa_node
)
133 struct bpf_ringbuf
*rb
;
135 rb
= bpf_ringbuf_area_alloc(data_sz
, numa_node
);
137 return ERR_PTR(-ENOMEM
);
139 spin_lock_init(&rb
->spinlock
);
140 init_waitqueue_head(&rb
->waitq
);
141 init_irq_work(&rb
->work
, bpf_ringbuf_notify
);
/* mask turns a position into a data offset; ringbuf_map_alloc() only
 * passes power-of-2 sizes, so data_sz - 1 has all low bits set
 */
143 rb
->mask
= data_sz
- 1;
/* both positions start at zero, i.e. the buffer is empty */
144 rb
->consumer_pos
= 0;
145 rb
->producer_pos
= 0;
/* map_alloc callback: validate the creation attributes and build a ring
 * buffer map.
 *
 * @attr: creation attributes; max_entries is used as the data area size
 *
 * Returns ERR_PTR(-EINVAL) for unknown flags, a non-zero key or value size,
 * or a max_entries that is not a page-aligned power of 2. Returns
 * ERR_PTR(-E2BIG) when max_entries exceeds RINGBUF_MAX_DATA_SZ. An
 * ERR_PTR(-ENOMEM) return covers failure to allocate the map object.
 */
150 static struct bpf_map
*ringbuf_map_alloc(union bpf_attr
*attr
)
152 struct bpf_ringbuf_map
*rb_map
;
/* only flags in RINGBUF_CREATE_FLAG_MASK are allowed */
156 if (attr
->map_flags
& ~RINGBUF_CREATE_FLAG_MASK
)
157 return ERR_PTR(-EINVAL
);
/* ring buffer maps have no key or value; max_entries is the data size and
 * must be a page-aligned power of 2
 */
159 if (attr
->key_size
|| attr
->value_size
||
160 !is_power_of_2(attr
->max_entries
) ||
161 !PAGE_ALIGNED(attr
->max_entries
))
162 return ERR_PTR(-EINVAL
);
165 /* on 32-bit arch, it's impossible to overflow record's hdr->pgoff */
166 if (attr
->max_entries
> RINGBUF_MAX_DATA_SZ
)
167 return ERR_PTR(-E2BIG
);
170 rb_map
= kzalloc(sizeof(*rb_map
), GFP_USER
);
172 return ERR_PTR(-ENOMEM
);
174 bpf_map_init_from_attr(&rb_map
->map
, attr
);
/* memory charge includes the sizes of both bookkeeping structures */
176 cost
= sizeof(struct bpf_ringbuf_map
) +
177 sizeof(struct bpf_ringbuf
) +
179 err
= bpf_map_charge_init(&rb_map
->map
.memory
, cost
);
/* the ring buffer is allocated on the NUMA node recorded in the map */
183 rb_map
->rb
= bpf_ringbuf_alloc(attr
->max_entries
, rb_map
->map
.numa_node
);
184 if (IS_ERR(rb_map
->rb
)) {
185 err
= PTR_ERR(rb_map
->rb
);
192 bpf_map_charge_finish(&rb_map
->map
.memory
);
/* Release a ring buffer.
 *
 * @rb: ring buffer to free
 *
 * rb->pages and rb->nr_pages are copied to locals first, then every page is
 * handed to __free_page().
 */
198 static void bpf_ringbuf_free(struct bpf_ringbuf
*rb
)
200 /* copy pages pointer and nr_pages to local variable, as we are going
201 * to unmap rb itself with vunmap() below
203 struct page
**pages
= rb
->pages
;
204 int i
, nr_pages
= rb
->nr_pages
;
207 for (i
= 0; i
< nr_pages
; i
++)
208 __free_page(pages
[i
]);
/* map_free callback: converts map to its enclosing bpf_ringbuf_map and
 * frees the ring buffer it owns with bpf_ringbuf_free().
 */
212 static void ringbuf_map_free(struct bpf_map
*map
)
214 struct bpf_ringbuf_map
*rb_map
;
216 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
217 bpf_ringbuf_free(rb_map
->rb
);
/* map_lookup_elem callback: element lookup is not offered by ring buffer
 * maps, so this returns ERR_PTR(-ENOTSUPP).
 */
221 static void *ringbuf_map_lookup_elem(struct bpf_map
*map
, void *key
)
223 return ERR_PTR(-ENOTSUPP
);
/* map_update_elem callback registered in ringbuf_map_ops. */
226 static int ringbuf_map_update_elem(struct bpf_map
*map
, void *key
, void *value
,
/* map_delete_elem callback registered in ringbuf_map_ops. */
232 static int ringbuf_map_delete_elem(struct bpf_map
*map
, void *key
)
/* map_get_next_key callback registered in ringbuf_map_ops. */
237 static int ringbuf_map_get_next_key(struct bpf_map
*map
, void *key
,
/* Number of pages covered by a full user-space mapping of the ring buffer.
 *
 * @rb: ring buffer; mask + 1 is its data area size in bytes
 *
 * Returns the position pages plus the data pages counted twice.
 */
243 static size_t bpf_ringbuf_mmap_page_cnt(const struct bpf_ringbuf
*rb
)
245 size_t data_pages
= (rb
->mask
+ 1) >> PAGE_SHIFT
;
247 /* consumer page + producer page + 2 x data pages */
248 return RINGBUF_POS_PAGES
+ 2 * data_pages
;
/* map_mmap callback: map the ring buffer into a user-space VMA.
 *
 * @map: ring buffer map
 * @vma: target VMA; vm_pgoff is relative to the consumer page
 *
 * The mappable size comes from bpf_ringbuf_mmap_page_cnt(). The requested
 * byte offset plus the VMA length is compared against it. The remap then
 * starts RINGBUF_PGOFF pages into the kernel area, which skips the
 * non-mmapable prefix. Returns the result of remap_vmalloc_range().
 *
 * NOTE(review): vm_pgoff * PAGE_SIZE is computed in unsigned long; confirm a
 * very large vm_pgoff cannot wrap and slip past the size check.
 */
251 static int ringbuf_map_mmap(struct bpf_map
*map
, struct vm_area_struct
*vma
)
253 struct bpf_ringbuf_map
*rb_map
;
256 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
257 mmap_sz
= bpf_ringbuf_mmap_page_cnt(rb_map
->rb
) << PAGE_SHIFT
;
259 if (vma
->vm_pgoff
* PAGE_SIZE
+ (vma
->vm_end
- vma
->vm_start
) > mmap_sz
)
262 return remap_vmalloc_range(vma
, rb_map
->rb
,
263 vma
->vm_pgoff
+ RINGBUF_PGOFF
);
/* Amount of data produced but not yet consumed.
 *
 * @rb: ring buffer to inspect
 *
 * Both positions are read with smp_load_acquire(), consumer first. Returns
 * producer_pos minus consumer_pos.
 */
266 static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf
*rb
)
268 unsigned long cons_pos
, prod_pos
;
270 cons_pos
= smp_load_acquire(&rb
->consumer_pos
);
271 prod_pos
= smp_load_acquire(&rb
->producer_pos
);
272 return prod_pos
- cons_pos
;
/* map_poll callback.
 *
 * @map: ring buffer map
 * @filp: file being polled
 * @pts: poll table to register with
 *
 * Registers the caller on the ring buffer wait queue, then returns
 * EPOLLIN | EPOLLRDNORM when ringbuf_avail_data_sz() is non-zero.
 */
275 static __poll_t
ringbuf_map_poll(struct bpf_map
*map
, struct file
*filp
,
276 struct poll_table_struct
*pts
)
278 struct bpf_ringbuf_map
*rb_map
;
280 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
281 poll_wait(filp
, &rb_map
->rb
->waitq
, pts
);
283 if (ringbuf_avail_data_sz(rb_map
->rb
))
284 return EPOLLIN
| EPOLLRDNORM
;
/* Storage for the BTF id referenced by map_btf_id below. */
288 static int ringbuf_map_btf_id
;
/* Operations table for ring buffer maps. It wires the alloc, free, mmap and
 * poll callbacks plus the element-access callbacks defined in this file, and
 * names bpf_ringbuf_map as the BTF type of the map object.
 */
289 const struct bpf_map_ops ringbuf_map_ops
= {
290 .map_alloc
= ringbuf_map_alloc
,
291 .map_free
= ringbuf_map_free
,
292 .map_mmap
= ringbuf_map_mmap
,
293 .map_poll
= ringbuf_map_poll
,
294 .map_lookup_elem
= ringbuf_map_lookup_elem
,
295 .map_update_elem
= ringbuf_map_update_elem
,
296 .map_delete_elem
= ringbuf_map_delete_elem
,
297 .map_get_next_key
= ringbuf_map_get_next_key
,
298 .map_btf_name
= "bpf_ringbuf_map",
299 .map_btf_id
= &ringbuf_map_btf_id
,
302 /* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
303 * calculate offset from record metadata to ring buffer in pages, rounded
304 * down. This page offset is stored as part of record metadata and allows to
305 * restore struct bpf_ringbuf * from record pointer. This page offset is
306 * stored at offset 4 of record metadata header.
307 */
/* Distance from the ring buffer object to a record header, in whole pages.
 *
 * @rb: ring buffer that contains the record
 * @hdr: record header inside the data area of rb
 *
 * Returns the byte difference hdr - rb shifted right by PAGE_SHIFT.
 * __bpf_ringbuf_reserve() stores the result in hdr->pg_off.
 */
308 static size_t bpf_ringbuf_rec_pg_off(struct bpf_ringbuf
*rb
,
309 struct bpf_ringbuf_hdr
*hdr
)
311 return ((void *)hdr
- (void *)rb
) >> PAGE_SHIFT
;
314 /* Given pointer to ring buffer record header, restore pointer to struct
315 * bpf_ringbuf itself by using page offset stored at offset 4
316 */
/* Inverse of bpf_ringbuf_rec_pg_off().
 *
 * @hdr: record header whose pg_off member was filled in at reserve time
 *
 * Rounds the header address down to its page start with PAGE_MASK and
 * subtracts pg_off pages. Returns the resulting address as the ring buffer
 * pointer.
 */
317 static struct bpf_ringbuf
*
318 bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr
*hdr
)
320 unsigned long addr
= (unsigned long)(void *)hdr
;
321 unsigned long off
= (unsigned long)hdr
->pg_off
<< PAGE_SHIFT
;
323 return (void*)((addr
& PAGE_MASK
) - off
);
/* Reserve space for one record in the ring buffer.
 *
 * @rb: ring buffer to reserve from
 * @size: payload size in bytes, checked against RINGBUF_MAX_RECORD_SZ
 *
 * The record length is the payload plus BPF_RINGBUF_HDR_SZ, rounded up to a
 * multiple of 8. With the spinlock held, the new producer position is
 * computed and the reservation is refused when it would run more than
 * rb->mask bytes ahead of the consumer position. Otherwise the header at
 * the masked producer offset gets its page offset and a length with
 * BPF_RINGBUF_BUSY_BIT set. The new producer position is then published
 * with smp_store_release().
 *
 * Returns a pointer to the payload, which starts right after the header.
 *
 * NOTE(review): both spin_trylock_irqsave() and spin_lock_irqsave() appear;
 * confirm which execution context selects the trylock path.
 */
326 static void *__bpf_ringbuf_reserve(struct bpf_ringbuf
*rb
, u64 size
)
328 unsigned long cons_pos
, prod_pos
, new_prod_pos
, flags
;
330 struct bpf_ringbuf_hdr
*hdr
;
332 if (unlikely(size
> RINGBUF_MAX_RECORD_SZ
))
/* total record length: header plus payload, 8-byte aligned */
335 len
= round_up(size
+ BPF_RINGBUF_HDR_SZ
, 8);
/* the consumer position is sampled before the lock is taken */
336 cons_pos
= smp_load_acquire(&rb
->consumer_pos
);
339 if (!spin_trylock_irqsave(&rb
->spinlock
, flags
))
342 spin_lock_irqsave(&rb
->spinlock
, flags
);
345 prod_pos
= rb
->producer_pos
;
346 new_prod_pos
= prod_pos
+ len
;
348 /* check for out of ringbuf space by ensuring producer position
349 * doesn't advance more than (ringbuf_size - 1) ahead
351 if (new_prod_pos
- cons_pos
> rb
->mask
) {
352 spin_unlock_irqrestore(&rb
->spinlock
, flags
);
356 hdr
= (void *)rb
->data
+ (prod_pos
& rb
->mask
);
357 pg_off
= bpf_ringbuf_rec_pg_off(rb
, hdr
);
358 hdr
->len
= size
| BPF_RINGBUF_BUSY_BIT
;
359 hdr
->pg_off
= pg_off
;
361 /* pairs with consumer's smp_load_acquire() */
362 smp_store_release(&rb
->producer_pos
, new_prod_pos
);
364 spin_unlock_irqrestore(&rb
->spinlock
, flags
);
366 return (void *)hdr
+ BPF_RINGBUF_HDR_SZ
;
/* BPF helper: reserve size bytes in the ring buffer behind map.
 *
 * Converts map to its bpf_ringbuf_map and returns the pointer from
 * __bpf_ringbuf_reserve() cast to unsigned long.
 */
369 BPF_CALL_3(bpf_ringbuf_reserve
, struct bpf_map
*, map
, u64
, size
, u64
, flags
)
371 struct bpf_ringbuf_map
*rb_map
;
376 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
377 return (unsigned long)__bpf_ringbuf_reserve(rb_map
->rb
, size
);
/* Prototype of the bpf_ringbuf_reserve helper: a constant map pointer, a
 * constant allocation size (zero allowed) and unrestricted flags. The return
 * value is a pointer to allocated memory or NULL.
 */
380 const struct bpf_func_proto bpf_ringbuf_reserve_proto
= {
381 .func
= bpf_ringbuf_reserve
,
382 .ret_type
= RET_PTR_TO_ALLOC_MEM_OR_NULL
,
383 .arg1_type
= ARG_CONST_MAP_PTR
,
384 .arg2_type
= ARG_CONST_ALLOC_SIZE_OR_ZERO
,
385 .arg3_type
= ARG_ANYTHING
,
/* Finish a previously reserved record.
 *
 * @sample: payload pointer as returned by __bpf_ringbuf_reserve()
 * @flags: wakeup control, BPF_RB_FORCE_WAKEUP or BPF_RB_NO_WAKEUP
 * @discard: requests that the record be marked as discarded
 *
 * The header sits BPF_RINGBUF_HDR_SZ bytes before the payload and the ring
 * buffer is recovered from it with bpf_ringbuf_restore_from_rec(). The final
 * length, with the busy bit toggled off, is written with xchg(). The
 * irq_work is queued unconditionally when BPF_RB_FORCE_WAKEUP is set.
 * Without it, the work is queued only when the consumer position points at
 * this record and BPF_RB_NO_WAKEUP is not set.
 */
388 static void bpf_ringbuf_commit(void *sample
, u64 flags
, bool discard
)
390 unsigned long rec_pos
, cons_pos
;
391 struct bpf_ringbuf_hdr
*hdr
;
392 struct bpf_ringbuf
*rb
;
395 hdr
= sample
- BPF_RINGBUF_HDR_SZ
;
396 rb
= bpf_ringbuf_restore_from_rec(hdr
);
/* the busy bit was set at reserve time, so XOR clears it */
397 new_len
= hdr
->len
^ BPF_RINGBUF_BUSY_BIT
;
/* NOTE(review): presumably applied only when discard is true; confirm */
399 new_len
|= BPF_RINGBUF_DISCARD_BIT
;
401 /* update record header with correct final size prefix */
402 xchg(&hdr
->len
, new_len
);
404 /* if consumer caught up and is waiting for our record, notify about
405 * new data availability
407 rec_pos
= (void *)hdr
- (void *)rb
->data
;
408 cons_pos
= smp_load_acquire(&rb
->consumer_pos
) & rb
->mask
;
410 if (flags
& BPF_RB_FORCE_WAKEUP
)
411 irq_work_queue(&rb
->work
);
412 else if (cons_pos
== rec_pos
&& !(flags
& BPF_RB_NO_WAKEUP
))
413 irq_work_queue(&rb
->work
);
/* BPF helper: submit a reserved sample. Forwards to bpf_ringbuf_commit()
 * with discard set to false.
 */
416 BPF_CALL_2(bpf_ringbuf_submit
, void *, sample
, u64
, flags
)
418 bpf_ringbuf_commit(sample
, flags
, false /* discard */);
/* Prototype of the bpf_ringbuf_submit helper: takes a pointer to allocated
 * memory and unrestricted flags, returns nothing.
 */
422 const struct bpf_func_proto bpf_ringbuf_submit_proto
= {
423 .func
= bpf_ringbuf_submit
,
424 .ret_type
= RET_VOID
,
425 .arg1_type
= ARG_PTR_TO_ALLOC_MEM
,
426 .arg2_type
= ARG_ANYTHING
,
/* BPF helper: discard a reserved sample. Forwards to bpf_ringbuf_commit()
 * with discard set to true.
 */
429 BPF_CALL_2(bpf_ringbuf_discard
, void *, sample
, u64
, flags
)
431 bpf_ringbuf_commit(sample
, flags
, true /* discard */);
/* Prototype of the bpf_ringbuf_discard helper: takes a pointer to allocated
 * memory and unrestricted flags, returns nothing.
 */
435 const struct bpf_func_proto bpf_ringbuf_discard_proto
= {
436 .func
= bpf_ringbuf_discard
,
437 .ret_type
= RET_VOID
,
438 .arg1_type
= ARG_PTR_TO_ALLOC_MEM
,
439 .arg2_type
= ARG_ANYTHING
,
/* BPF helper: copy size bytes from data into the ring buffer behind map in
 * a single call.
 *
 * flags is checked for bits other than BPF_RB_NO_WAKEUP and
 * BPF_RB_FORCE_WAKEUP. A record is reserved with __bpf_ringbuf_reserve(),
 * filled with memcpy() and committed through bpf_ringbuf_commit() with
 * discard set to false.
 */
442 BPF_CALL_4(bpf_ringbuf_output
, struct bpf_map
*, map
, void *, data
, u64
, size
,
445 struct bpf_ringbuf_map
*rb_map
;
/* only the two wakeup control bits are valid flags */
448 if (unlikely(flags
& ~(BPF_RB_NO_WAKEUP
| BPF_RB_FORCE_WAKEUP
)))
451 rb_map
= container_of(map
, struct bpf_ringbuf_map
, map
);
452 rec
= __bpf_ringbuf_reserve(rb_map
->rb
, size
);
456 memcpy(rec
, data
, size
);
457 bpf_ringbuf_commit(rec
, flags
, false /* discard */);
/* Prototype of the bpf_ringbuf_output helper: a constant map pointer, a
 * memory pointer with its constant size (zero allowed) and unrestricted
 * flags. The return value is an integer.
 */
461 const struct bpf_func_proto bpf_ringbuf_output_proto
= {
462 .func
= bpf_ringbuf_output
,
463 .ret_type
= RET_INTEGER
,
464 .arg1_type
= ARG_CONST_MAP_PTR
,
465 .arg2_type
= ARG_PTR_TO_MEM
,
466 .arg3_type
= ARG_CONST_SIZE_OR_ZERO
,
467 .arg4_type
= ARG_ANYTHING
,
/* BPF helper: report a property of the ring buffer behind map.
 *
 * flags selects the property. BPF_RB_AVAIL_DATA returns
 * ringbuf_avail_data_sz(). BPF_RB_CONS_POS and BPF_RB_PROD_POS return the
 * respective position read with smp_load_acquire(). BPF_RB_RING_SIZE is
 * also handled.
 */
470 BPF_CALL_2(bpf_ringbuf_query
, struct bpf_map
*, map
, u64
, flags
)
472 struct bpf_ringbuf
*rb
;
474 rb
= container_of(map
, struct bpf_ringbuf_map
, map
)->rb
;
477 case BPF_RB_AVAIL_DATA
:
478 return ringbuf_avail_data_sz(rb
);
479 case BPF_RB_RING_SIZE
:
481 case BPF_RB_CONS_POS
:
482 return smp_load_acquire(&rb
->consumer_pos
);
483 case BPF_RB_PROD_POS
:
484 return smp_load_acquire(&rb
->producer_pos
);
/* Prototype of the bpf_ringbuf_query helper: a constant map pointer and
 * unrestricted flags. The return value is an integer.
 */
490 const struct bpf_func_proto bpf_ringbuf_query_proto
= {
491 .func
= bpf_ringbuf_query
,
492 .ret_type
= RET_INTEGER
,
493 .arg1_type
= ARG_CONST_MAP_PTR
,
494 .arg2_type
= ARG_ANYTHING
,