#include <linux/bpf.h>
#include <linux/btf.h>
#include <linux/err.h>
#include <linux/irq_work.h>
#include <linux/slab.h>
#include <linux/filter.h>
#include <linux/mm.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/poll.h>
#include <uapi/linux/btf.h>

#define RINGBUF_CREATE_FLAG_MASK (BPF_F_NUMA_NODE)

/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
	(offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT)
/* consumer page and producer page */
#define RINGBUF_POS_PAGES 2

#define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4)

/* Maximum size of ring buffer area is limited by 32-bit page offset within
 * record header, counted in pages. Reserve 8 bits for extensibility, and take
 * into account few extra pages for consumer/producer pages and
 * non-mmap()'able parts. This gives 64GB limit, which seems plenty for single
 * ring buffer.
 */
#define RINGBUF_MAX_DATA_SZ \
	(((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)

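/* Worked example, assuming 4KB pages: 2^24 pages * 4KB/page = 2^36 bytes,
 * i.e. 64GB, minus the few pages reserved for the consumer/producer counters
 * and the non-mmap()'able head of struct bpf_ringbuf.
 */
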
struct bpf_ringbuf {
	wait_queue_head_t waitq;
	struct irq_work work;
	u64 mask;
	struct page **pages;
	int nr_pages;
	spinlock_t spinlock ____cacheline_aligned_in_smp;
	/* Consumer and producer counters are put into separate pages to allow
	 * mapping consumer page as r/w, but restrict producer page to r/o.
	 * This protects producer position from being modified by user-space
	 * application and ruining in-kernel position tracking.
	 */
	unsigned long consumer_pos __aligned(PAGE_SIZE);
	unsigned long producer_pos __aligned(PAGE_SIZE);
	char data[] __aligned(PAGE_SIZE);
};

struct bpf_ringbuf_map {
	struct bpf_map map;
	struct bpf_ringbuf *rb;
};

/* 8-byte ring buffer record header structure */
struct bpf_ringbuf_hdr {
	u32 len;
	u32 pg_off;
};

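/* The lower bits of len hold the sample length; the top bits carry
 * BPF_RINGBUF_BUSY_BIT (record reserved but not yet submitted) and
 * BPF_RINGBUF_DISCARD_BIT (record was discarded), set in
 * __bpf_ringbuf_reserve() and bpf_ringbuf_commit() below.
 */
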
static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node)
{
	const gfp_t flags = GFP_KERNEL_ACCOUNT | __GFP_RETRY_MAYFAIL |
			    __GFP_NOWARN | __GFP_ZERO;
	int nr_meta_pages = RINGBUF_PGOFF + RINGBUF_POS_PAGES;
	int nr_data_pages = data_sz >> PAGE_SHIFT;
	int nr_pages = nr_meta_pages + nr_data_pages;
	struct page **pages, *page;
	struct bpf_ringbuf *rb;
	size_t array_size;
	int i;

	/* Each data page is mapped twice to allow "virtual"
	 * continuous read of samples wrapping around the end of ring
	 * buffer area:
	 * ------------------------------------------------------
	 * | meta pages |  real data pages  |  same data pages  |
	 * ------------------------------------------------------
	 * |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
	 * ------------------------------------------------------
	 * Here, no need to worry about special handling of wrapped-around
	 * data due to double-mapped data pages. This works both in kernel and
	 * when mmap()'ed in user-space, simplifying both kernel and
	 * user-space implementations significantly.
	 */
	array_size = (nr_meta_pages + 2 * nr_data_pages) * sizeof(*pages);
	pages = bpf_map_area_alloc(array_size, numa_node);
	if (!pages)
		return NULL;

	for (i = 0; i < nr_pages; i++) {
		page = alloc_pages_node(numa_node, flags, 0);
		if (!page) {
			nr_pages = i;
			goto err_free_pages;
		}
		pages[i] = page;
		if (i >= nr_meta_pages)
			pages[nr_data_pages + i] = page;
	}

	rb = vmap(pages, nr_meta_pages + 2 * nr_data_pages,
		  VM_ALLOC | VM_USERMAP, PAGE_KERNEL);
	if (rb) {
		rb->pages = pages;
		rb->nr_pages = nr_pages;
		return rb;
	}

err_free_pages:
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	kvfree(pages);
	return NULL;
}

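/* irq_work callback: runs once it is safe to do so after a producer queued a
 * wakeup, and wakes any consumer blocked on the ring buffer's wait queue
 * (see ringbuf_map_poll() below).
 */
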
static void bpf_ringbuf_notify(struct irq_work *work)
{
	struct bpf_ringbuf *rb = container_of(work, struct bpf_ringbuf, work);

	wake_up_all(&rb->waitq);
}

static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
{
	struct bpf_ringbuf *rb;

	rb = bpf_ringbuf_area_alloc(data_sz, numa_node);
	if (!rb)
		return NULL;

	spin_lock_init(&rb->spinlock);
	init_waitqueue_head(&rb->waitq);
	init_irq_work(&rb->work, bpf_ringbuf_notify);

	rb->mask = data_sz - 1;
	rb->consumer_pos = 0;
	rb->producer_pos = 0;

	return rb;
}

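/* Note: rb->mask relies on data_sz being a power of two (enforced in
 * ringbuf_map_alloc() below), so positions can be wrapped into the data area
 * with a cheap "pos & rb->mask".
 */
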
static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
{
	struct bpf_ringbuf_map *rb_map;

	if (attr->map_flags & ~RINGBUF_CREATE_FLAG_MASK)
		return ERR_PTR(-EINVAL);

	if (attr->key_size || attr->value_size ||
	    !is_power_of_2(attr->max_entries) ||
	    !PAGE_ALIGNED(attr->max_entries))
		return ERR_PTR(-EINVAL);

#ifdef CONFIG_64BIT
	/* on 32-bit arch, it's impossible to overflow record's hdr->pgoff */
	if (attr->max_entries > RINGBUF_MAX_DATA_SZ)
		return ERR_PTR(-E2BIG);
#endif

	rb_map = kzalloc(sizeof(*rb_map), GFP_USER | __GFP_ACCOUNT);
	if (!rb_map)
		return ERR_PTR(-ENOMEM);

	bpf_map_init_from_attr(&rb_map->map, attr);

	rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
	if (!rb_map->rb) {
		kfree(rb_map);
		return ERR_PTR(-ENOMEM);
	}

	return &rb_map->map;
}

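/* For reference, a minimal BPF-side map declaration sketch (not part of this
 * file; the name "rb" and the size are illustrative):
 *
 *	struct {
 *		__uint(type, BPF_MAP_TYPE_RINGBUF);
 *		__uint(max_entries, 256 * 1024);  // bytes; power-of-2, page-aligned
 *	} rb SEC(".maps");
 *
 * max_entries is the size of the data area in bytes and must satisfy the
 * is_power_of_2()/PAGE_ALIGNED() checks in ringbuf_map_alloc() above.
 */
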
static void bpf_ringbuf_free(struct bpf_ringbuf *rb)
{
	/* copy pages pointer and nr_pages to local variable, as we are going
	 * to unmap rb itself with vunmap() below
	 */
	struct page **pages = rb->pages;
	int i, nr_pages = rb->nr_pages;

	vunmap(rb);
	for (i = 0; i < nr_pages; i++)
		__free_page(pages[i]);
	kvfree(pages);
}

static void ringbuf_map_free(struct bpf_map *map)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	bpf_ringbuf_free(rb_map->rb);
	kfree(rb_map);
}

static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key)
{
	return ERR_PTR(-ENOTSUPP);
}

static int ringbuf_map_update_elem(struct bpf_map *map, void *key, void *value,
				   u64 flags)
{
	return -ENOTSUPP;
}

static int ringbuf_map_delete_elem(struct bpf_map *map, void *key)
{
	return -ENOTSUPP;
}

static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
				    void *next_key)
{
	return -ENOTSUPP;
}

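/* Ring buffer maps have no per-element semantics: data flows only through the
 * reserve/submit/output helpers and the mmap()'ed region, so all element
 * operations above report -ENOTSUPP.
 */
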
static size_t bpf_ringbuf_mmap_page_cnt(const struct bpf_ringbuf *rb)
{
	size_t data_pages = (rb->mask + 1) >> PAGE_SHIFT;

	/* consumer page + producer page + 2 x data pages */
	return RINGBUF_POS_PAGES + 2 * data_pages;
}

static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
{
	struct bpf_ringbuf_map *rb_map;
	size_t mmap_sz;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	mmap_sz = bpf_ringbuf_mmap_page_cnt(rb_map->rb) << PAGE_SHIFT;

	if (vma->vm_pgoff * PAGE_SIZE + (vma->vm_end - vma->vm_start) > mmap_sz)
		return -EINVAL;

	return remap_vmalloc_range(vma, rb_map->rb,
				   vma->vm_pgoff + RINGBUF_PGOFF);
}

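/* Layout seen by user space after mmap()'ing the map fd: page offset 0 is the
 * consumer_pos page, offset 1 the producer_pos page, and offsets 2..N the data
 * area, which appears twice due to the double mapping set up in
 * bpf_ringbuf_area_alloc(). remap_vmalloc_range() shifts by RINGBUF_PGOFF so
 * the non-mmap()'able head of struct bpf_ringbuf is never exposed. Consumers
 * (e.g. libbpf) typically map the consumer page read-write and the
 * producer/data pages read-only.
 */
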
static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
	unsigned long cons_pos, prod_pos;

	cons_pos = smp_load_acquire(&rb->consumer_pos);
	prod_pos = smp_load_acquire(&rb->producer_pos);
	return prod_pos - cons_pos;
}

static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
				 struct poll_table_struct *pts)
{
	struct bpf_ringbuf_map *rb_map;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	poll_wait(filp, &rb_map->rb->waitq, pts);

	if (ringbuf_avail_data_sz(rb_map->rb))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

static int ringbuf_map_btf_id;
const struct bpf_map_ops ringbuf_map_ops = {
	.map_meta_equal = bpf_map_meta_equal,
	.map_alloc = ringbuf_map_alloc,
	.map_free = ringbuf_map_free,
	.map_mmap = ringbuf_map_mmap,
	.map_poll = ringbuf_map_poll,
	.map_lookup_elem = ringbuf_map_lookup_elem,
	.map_update_elem = ringbuf_map_update_elem,
	.map_delete_elem = ringbuf_map_delete_elem,
	.map_get_next_key = ringbuf_map_get_next_key,
	.map_btf_name = "bpf_ringbuf_map",
	.map_btf_id = &ringbuf_map_btf_id,
};

/* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
 * calculate offset from record metadata to ring buffer in pages, rounded
 * down. This page offset is stored as part of record metadata and allows to
 * restore struct bpf_ringbuf * from record pointer. This page offset is
 * stored at offset 4 of record metadata header.
 */
static size_t bpf_ringbuf_rec_pg_off(struct bpf_ringbuf *rb,
				     struct bpf_ringbuf_hdr *hdr)
{
	return ((void *)hdr - (void *)rb) >> PAGE_SHIFT;
}

/* Given pointer to ring buffer record header, restore pointer to struct
 * bpf_ringbuf itself by using page offset stored at offset 4
 */
static struct bpf_ringbuf *
bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
{
	unsigned long addr = (unsigned long)(void *)hdr;
	unsigned long off = (unsigned long)hdr->pg_off << PAGE_SHIFT;

	return (void*)((addr & PAGE_MASK) - off);
}

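/* The restore round trip works because the record header lives in the same
 * contiguous vmap()'ed area as struct bpf_ringbuf itself: pg_off counts whole
 * pages from rb to the header, and masking the header address with PAGE_MASK
 * recovers its page start, so subtracting pg_off pages lands back on rb,
 * which is page-aligned by construction.
 */
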
static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
{
	unsigned long cons_pos, prod_pos, new_prod_pos, flags;
	u32 len, pg_off;
	struct bpf_ringbuf_hdr *hdr;

	if (unlikely(size > RINGBUF_MAX_RECORD_SZ))
		return NULL;

	len = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
	cons_pos = smp_load_acquire(&rb->consumer_pos);

	if (in_nmi()) {
		if (!spin_trylock_irqsave(&rb->spinlock, flags))
			return NULL;
	} else {
		spin_lock_irqsave(&rb->spinlock, flags);
	}

	prod_pos = rb->producer_pos;
	new_prod_pos = prod_pos + len;

	/* check for out of ringbuf space by ensuring producer position
	 * doesn't advance more than (ringbuf_size - 1) ahead
	 */
	if (new_prod_pos - cons_pos > rb->mask) {
		spin_unlock_irqrestore(&rb->spinlock, flags);
		return NULL;
	}

	hdr = (void *)rb->data + (prod_pos & rb->mask);
	pg_off = bpf_ringbuf_rec_pg_off(rb, hdr);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pg_off = pg_off;

	/* pairs with consumer's smp_load_acquire() */
	smp_store_release(&rb->producer_pos, new_prod_pos);

	spin_unlock_irqrestore(&rb->spinlock, flags);

	return (void *)hdr + BPF_RINGBUF_HDR_SZ;
}

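/* Producer protocol in short: take the spinlock (trylock only when in NMI
 * context, to avoid deadlocking on the same CPU), carve out len bytes at
 * producer_pos, publish the header with BPF_RINGBUF_BUSY_BIT set so consumers
 * skip the record until it is finished, then advance producer_pos with
 * release semantics. The busy bit is cleared later in bpf_ringbuf_commit().
 */
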
BPF_CALL_3(bpf_ringbuf_reserve, struct bpf_map *, map, u64, size, u64, flags)
{
	struct bpf_ringbuf_map *rb_map;

	if (unlikely(flags))
		return 0;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	return (unsigned long)__bpf_ringbuf_reserve(rb_map->rb, size);
}

const struct bpf_func_proto bpf_ringbuf_reserve_proto = {
	.func = bpf_ringbuf_reserve,
	.ret_type = RET_PTR_TO_ALLOC_MEM_OR_NULL,
	.arg1_type = ARG_CONST_MAP_PTR,
	.arg2_type = ARG_CONST_ALLOC_SIZE_OR_ZERO,
	.arg3_type = ARG_ANYTHING,
};

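/* For reference, the typical BPF-side reserve/submit pattern (sketch, not part
 * of this file; "rb", "struct event" and fill_event() are placeholders):
 *
 *	struct event *e = bpf_ringbuf_reserve(&rb, sizeof(*e), 0);
 *	if (!e)
 *		return 0;	// ring buffer full, sample is dropped
 *	fill_event(e);
 *	bpf_ringbuf_submit(e, 0);
 *
 * bpf_ringbuf_discard() is used instead of submit when the reserved record
 * turns out to be unneeded.
 */
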
static void bpf_ringbuf_commit(void *sample, u64 flags, bool discard)
{
	unsigned long rec_pos, cons_pos;
	struct bpf_ringbuf_hdr *hdr;
	struct bpf_ringbuf *rb;
	u32 new_len;

	hdr = sample - BPF_RINGBUF_HDR_SZ;
	rb = bpf_ringbuf_restore_from_rec(hdr);
	new_len = hdr->len ^ BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* update record header with correct final size prefix */
	xchg(&hdr->len, new_len);

	/* if consumer caught up and is waiting for our record, notify about
	 * new data availability
	 */
	rec_pos = (void *)hdr - (void *)rb->data;
	cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;

	if (flags & BPF_RB_FORCE_WAKEUP)
		irq_work_queue(&rb->work);
	else if (cons_pos == rec_pos && !(flags & BPF_RB_NO_WAKEUP))
		irq_work_queue(&rb->work);
}

BPF_CALL_2(bpf_ringbuf_submit, void *, sample, u64, flags)
{
	bpf_ringbuf_commit(sample, flags, false /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_submit_proto = {
	.func = bpf_ringbuf_submit,
	.ret_type = RET_VOID,
	.arg1_type = ARG_PTR_TO_ALLOC_MEM,
	.arg2_type = ARG_ANYTHING,
};

BPF_CALL_2(bpf_ringbuf_discard, void *, sample, u64, flags)
{
	bpf_ringbuf_commit(sample, flags, true /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_discard_proto = {
	.func = bpf_ringbuf_discard,
	.ret_type = RET_VOID,
	.arg1_type = ARG_PTR_TO_ALLOC_MEM,
	.arg2_type = ARG_ANYTHING,
};

BPF_CALL_4(bpf_ringbuf_output, struct bpf_map *, map, void *, data, u64, size,
	   u64, flags)
{
	struct bpf_ringbuf_map *rb_map;
	void *rec;

	if (unlikely(flags & ~(BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP)))
		return -EINVAL;

	rb_map = container_of(map, struct bpf_ringbuf_map, map);
	rec = __bpf_ringbuf_reserve(rb_map->rb, size);
	if (!rec)
		return -EAGAIN;

	memcpy(rec, data, size);
	bpf_ringbuf_commit(rec, flags, false /* discard */);
	return 0;
}

const struct bpf_func_proto bpf_ringbuf_output_proto = {
	.func = bpf_ringbuf_output,
	.ret_type = RET_INTEGER,
	.arg1_type = ARG_CONST_MAP_PTR,
	.arg2_type = ARG_PTR_TO_MEM,
	.arg3_type = ARG_CONST_SIZE_OR_ZERO,
	.arg4_type = ARG_ANYTHING,
};

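/* bpf_ringbuf_output() trades an extra memcpy() for convenience: the program
 * builds the sample in its own buffer and the helper reserves, copies and
 * commits in one call, whereas bpf_ringbuf_reserve()/submit() lets the
 * program write directly into the reserved record with no copy.
 */
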
BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
{
	struct bpf_ringbuf *rb;

	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;

	switch (flags) {
	case BPF_RB_AVAIL_DATA:
		return ringbuf_avail_data_sz(rb);
	case BPF_RB_RING_SIZE:
		return rb->mask + 1;
	case BPF_RB_CONS_POS:
		return smp_load_acquire(&rb->consumer_pos);
	case BPF_RB_PROD_POS:
		return smp_load_acquire(&rb->producer_pos);
	default:
		return 0;
	}
}

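/* Note: all bpf_ringbuf_query() results are momentary snapshots; producers and
 * the consumer may move on before the caller acts on them, so the values are
 * best suited for debugging and heuristics rather than precise accounting.
 */
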
const struct bpf_func_proto bpf_ringbuf_query_proto = {
	.func = bpf_ringbuf_query,
	.ret_type = RET_INTEGER,
	.arg1_type = ARG_CONST_MAP_PTR,
	.arg2_type = ARG_ANYTHING,
};