// SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
/*
 * Ring buffer operations.
 *
 * Copyright (C) 2020 Facebook, Inc.
 */
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <linux/err.h>
#include <linux/bpf.h>
#include <asm/barrier.h>
#include <sys/mman.h>
#include <sys/epoll.h>
#include <time.h>

#include "libbpf.h"
#include "libbpf_internal.h"
#include "bpf.h"
#include "str_error.h"

struct ring {
	ring_buffer_sample_fn sample_cb;
	void *ctx;
	void *data;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	unsigned long mask;
	int map_fd;
};

struct ring_buffer {
	struct epoll_event *events;
	struct ring **rings;
	size_t page_size;
	int epoll_fd;
	int ring_cnt;
};

struct user_ring_buffer {
	struct epoll_event event;
	unsigned long *consumer_pos;
	unsigned long *producer_pos;
	void *data;
	unsigned long mask;
	size_t page_size;
	int map_fd;
	int epoll_fd;
};

/* 8-byte ring buffer header structure */
struct ringbuf_hdr {
	__u32 len;
	__u32 pad;
};

static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
{
	if (r->consumer_pos) {
		munmap(r->consumer_pos, rb->page_size);
		r->consumer_pos = NULL;
	}
	if (r->producer_pos) {
		munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
		r->producer_pos = NULL;
	}

	free(r);
}

/* Add extra RINGBUF maps to this ring buffer manager */
int ring_buffer__add(struct ring_buffer *rb, int map_fd,
		     ring_buffer_sample_fn sample_cb, void *ctx)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	struct epoll_event *e;
	struct ring *r;
	__u64 mmap_sz;
	void *tmp;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("ringbuf: failed to get map info for fd=%d: %s\n",
			map_fd, errstr(err));
		return libbpf_err(err);
	}

	if (info.type != BPF_MAP_TYPE_RINGBUF) {
		pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
			map_fd);
		return libbpf_err(-EINVAL);
	}

	tmp = libbpf_reallocarray(rb->rings, rb->ring_cnt + 1, sizeof(*rb->rings));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->rings = tmp;

	tmp = libbpf_reallocarray(rb->events, rb->ring_cnt + 1, sizeof(*rb->events));
	if (!tmp)
		return libbpf_err(-ENOMEM);
	rb->events = tmp;

	r = calloc(1, sizeof(*r));
	if (!r)
		return libbpf_err(-ENOMEM);
	rb->rings[rb->ring_cnt] = r;

	r->map_fd = map_fd;
	r->sample_cb = sample_cb;
	r->ctx = ctx;
	r->mask = info.max_entries - 1;

	/* Map writable consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}
	r->consumer_pos = tmp;

	/* Map read-only producer page and data pages. We map twice as big
	 * data size to allow simple reading of samples that wrap around the
	 * end of a ring buffer. See kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		err = -E2BIG;
		pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries);
		goto err_out;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}
	r->producer_pos = tmp;
	r->data = tmp + rb->page_size;

	e = &rb->events[rb->ring_cnt];
	memset(e, 0, sizeof(*e));

	e->events = EPOLLIN;
	e->data.fd = rb->ring_cnt;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to epoll add map fd=%d: %s\n",
			map_fd, errstr(err));
		goto err_out;
	}

	rb->ring_cnt++;
	return 0;

err_out:
	ringbuf_free_ring(rb, r);
	return libbpf_err(err);
}

void ring_buffer__free(struct ring_buffer *rb)
{
	int i;

	if (!rb)
		return;

	for (i = 0; i < rb->ring_cnt; ++i)
		ringbuf_free_ring(rb, rb->rings[i]);
	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb->events);
	free(rb->rings);
	free(rb);
}

struct ring_buffer *
ring_buffer__new(int map_fd, ring_buffer_sample_fn sample_cb, void *ctx,
		 const struct ring_buffer_opts *opts)
{
	struct ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("ringbuf: failed to create epoll instance: %s\n", errstr(err));
		goto err_out;
	}

	err = ring_buffer__add(rb, map_fd, sample_cb, ctx);
	if (err)
		goto err_out;

	return rb;

err_out:
	ring_buffer__free(rb);
	return errno = -err, NULL;
}
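
/* Illustrative usage sketch (not part of libbpf): a minimal consumer built on
 * ring_buffer__new() and ring_buffer__poll(). The map fd is assumed to come
 * from an already-loaded BPF object; the callback name and the 100ms timeout
 * are arbitrary example choices.
 */
#if 0
static int handle_sample(void *ctx, void *data, size_t len)
{
	/* process one committed sample of 'len' bytes; return <0 to stop polling */
	return 0;
}

static int example_consume_loop(int ringbuf_map_fd)
{
	struct ring_buffer *rb;
	int err;

	rb = ring_buffer__new(ringbuf_map_fd, handle_sample, NULL, NULL);
	if (!rb)
		return -errno;

	/* wait up to 100ms per iteration, consuming whatever samples arrived */
	while ((err = ring_buffer__poll(rb, 100)) >= 0)
		;

	ring_buffer__free(rb);
	return err;
}
#endif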

static inline int roundup_len(__u32 len)
{
	/* clear out top 2 bits (discard and busy, if set) */
	len <<= 2;
	len >>= 2;
	/* add length prefix */
	len += BPF_RINGBUF_HDR_SZ;
	/* round up to 8 byte alignment */
	return (len + 7) / 8 * 8;
}
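
/* Worked example (assuming the 8-byte BPF_RINGBUF_HDR_SZ): a 12-byte sample
 * occupies roundup_len(12) = (12 + 8 + 7) / 8 * 8 = 24 bytes in the ring,
 * which is the step by which ringbuf_process_ring() below advances the
 * consumer position past each record.
 */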

static int64_t ringbuf_process_ring(struct ring *r, size_t n)
{
	int *len_ptr, len, err;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	int64_t cnt = 0;
	unsigned long cons_pos, prod_pos;
	bool got_new_data;
	void *sample;

	cons_pos = smp_load_acquire(r->consumer_pos);
	do {
		got_new_data = false;
		prod_pos = smp_load_acquire(r->producer_pos);
		while (cons_pos < prod_pos) {
			len_ptr = r->data + (cons_pos & r->mask);
			len = smp_load_acquire(len_ptr);

			/* sample not committed yet, bail out for now */
			if (len & BPF_RINGBUF_BUSY_BIT)
				goto done;

			got_new_data = true;
			cons_pos += roundup_len(len);

			if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
				sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
				err = r->sample_cb(r->ctx, sample, len);
				if (err < 0) {
					/* update consumer pos and bail out */
					smp_store_release(r->consumer_pos,
							  cons_pos);
					return err;
				}
				cnt++;
			}

			smp_store_release(r->consumer_pos, cons_pos);

			if (cnt >= n)
				goto done;
		}
	} while (got_new_data);
done:
	return cnt;
}

/* Consume available ring buffer(s) data without event polling, up to n
 * records.
 *
 * Returns number of records consumed across all registered ring buffers (or
 * n, whichever is less), or negative number if any of the callbacks return
 * error.
 */
int ring_buffer__consume_n(struct ring_buffer *rb, size_t n)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, n);
		if (err < 0)
			return libbpf_err(err);
		res += err;
		n -= err;

		if (n == 0)
			break;
	}
	return res > INT_MAX ? INT_MAX : res;
}
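
/* Illustrative sketch (not part of libbpf): draining without epoll, e.g. from
 * a latency-sensitive busy-poll loop. The batch size of 64 and the stop flag
 * are arbitrary example choices.
 */
#if 0
static void example_busy_poll(struct ring_buffer *rb, volatile bool *stop)
{
	while (!*stop) {
		/* consume at most 64 records across all registered rings */
		if (ring_buffer__consume_n(rb, 64) < 0)
			break;
	}
}
#endif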

/* Consume available ring buffer(s) data without event polling.
 * Returns number of records consumed across all registered ring buffers (or
 * INT_MAX, whichever is less), or negative number if any of the callbacks
 * return error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t err, res = 0;
	int i;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
		if (res > INT_MAX) {
			res = INT_MAX;
			break;
		}
	}
	return res;
}

/* Poll for available data and consume records, if any are available.
 * Returns number of records consumed (or INT_MAX, whichever is less), or
 * negative number, if any of the registered callbacks returned error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return libbpf_err(-errno);

	for (i = 0; i < cnt; i++) {
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = rb->rings[ring_id];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}
	return res > INT_MAX ? INT_MAX : res;
}

/* Get an fd that can be used to sleep until data is available in the ring(s) */
int ring_buffer__epoll_fd(const struct ring_buffer *rb)
{
	return rb->epoll_fd;
}
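
/* Illustrative sketch (not part of libbpf): registering the manager's epoll
 * fd with an application-owned epoll instance so that ring buffer readiness
 * can be multiplexed with other fds. The outer epoll fd and the function name
 * are assumptions for the example only.
 */
#if 0
static int example_register_with_epoll(int app_epoll_fd, struct ring_buffer *rb)
{
	struct epoll_event ev = {
		.events = EPOLLIN,
		.data.ptr = rb,
	};

	if (epoll_ctl(app_epoll_fd, EPOLL_CTL_ADD, ring_buffer__epoll_fd(rb), &ev) < 0)
		return -errno;

	/* when this fd reports EPOLLIN, call ring_buffer__consume(rb) */
	return 0;
}
#endif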

struct ring *ring_buffer__ring(struct ring_buffer *rb, unsigned int idx)
{
	if (idx >= rb->ring_cnt)
		return errno = ERANGE, NULL;

	return rb->rings[idx];
}

unsigned long ring__consumer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in ringbuf_process_ring(). */
	return smp_load_acquire(r->consumer_pos);
}

unsigned long ring__producer_pos(const struct ring *r)
{
	/* Synchronizes with smp_store_release() in __bpf_ringbuf_reserve() in
	 * the kernel.
	 */
	return smp_load_acquire(r->producer_pos);
}

size_t ring__avail_data_size(const struct ring *r)
{
	unsigned long cons_pos, prod_pos;

	cons_pos = ring__consumer_pos(r);
	prod_pos = ring__producer_pos(r);
	return prod_pos - cons_pos;
}

size_t ring__size(const struct ring *r)
{
	return r->mask + 1;
}

int ring__map_fd(const struct ring *r)
{
	return r->map_fd;
}

int ring__consume_n(struct ring *r, size_t n)
{
	int64_t res;

	res = ringbuf_process_ring(r, n);
	if (res < 0)
		return libbpf_err(res);

	return res > INT_MAX ? INT_MAX : res;
}

int ring__consume(struct ring *r)
{
	return ring__consume_n(r, INT_MAX);
}
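
/* Illustrative sketch (not part of libbpf): per-ring introspection with the
 * ring__*() accessors, e.g. logging the backlog before draining one ring.
 * The function name, the index parameter, and the cap of 128 records are
 * example assumptions.
 */
#if 0
static int example_drain_one_ring(struct ring_buffer *rb, unsigned int idx)
{
	struct ring *r = ring_buffer__ring(rb, idx);

	if (!r)
		return -errno;

	pr_debug("ring %u: %zu bytes pending out of %zu total\n",
		 idx, ring__avail_data_size(r), ring__size(r));

	/* drain at most 128 records from just this ring */
	return ring__consume_n(r, 128);
}
#endif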

static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
{
	if (rb->consumer_pos) {
		munmap(rb->consumer_pos, rb->page_size);
		rb->consumer_pos = NULL;
	}
	if (rb->producer_pos) {
		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
		rb->producer_pos = NULL;
	}
}

void user_ring_buffer__free(struct user_ring_buffer *rb)
{
	if (!rb)
		return;

	user_ringbuf_unmap_ring(rb);

	if (rb->epoll_fd >= 0)
		close(rb->epoll_fd);

	free(rb);
}

static int user_ringbuf_map(struct user_ring_buffer *rb, int map_fd)
{
	struct bpf_map_info info;
	__u32 len = sizeof(info);
	__u64 mmap_sz;
	void *tmp;
	struct epoll_event *rb_epoll;
	int err;

	memset(&info, 0, sizeof(info));

	err = bpf_map_get_info_by_fd(map_fd, &info, &len);
	if (err) {
		err = -errno;
		pr_warn("user ringbuf: failed to get map info for fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}

	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n", map_fd);
		return -EINVAL;
	}

	rb->map_fd = map_fd;
	rb->mask = info.max_entries - 1;

	/* Map read-only consumer page */
	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}
	rb->consumer_pos = tmp;

	/* Map read-write the producer page and data pages. We map the data
	 * region as twice the total size of the ring buffer to allow the
	 * simple reading and writing of samples that wrap around the end of
	 * the buffer. See the kernel implementation for details.
	 */
	mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
	if (mmap_sz != (__u64)(size_t)mmap_sz) {
		pr_warn("user ringbuf: ring buf size (%u) is too big\n", info.max_entries);
		return -E2BIG;
	}
	tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ | PROT_WRITE, MAP_SHARED,
		   map_fd, rb->page_size);
	if (tmp == MAP_FAILED) {
		err = -errno;
		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %s\n",
			map_fd, errstr(err));
		return err;
	}

	rb->producer_pos = tmp;
	rb->data = tmp + rb->page_size;

	rb_epoll = &rb->event;
	rb_epoll->events = EPOLLOUT;
	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to epoll add map fd=%d: %s\n", map_fd, errstr(err));
		return err;
	}

	return 0;
}

struct user_ring_buffer *
user_ring_buffer__new(int map_fd, const struct user_ring_buffer_opts *opts)
{
	struct user_ring_buffer *rb;
	int err;

	if (!OPTS_VALID(opts, user_ring_buffer_opts))
		return errno = EINVAL, NULL;

	rb = calloc(1, sizeof(*rb));
	if (!rb)
		return errno = ENOMEM, NULL;

	rb->page_size = getpagesize();

	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
	if (rb->epoll_fd < 0) {
		err = -errno;
		pr_warn("user ringbuf: failed to create epoll instance: %s\n", errstr(err));
		goto err_out;
	}

	err = user_ringbuf_map(rb, map_fd);
	if (err)
		goto err_out;

	return rb;

err_out:
	user_ring_buffer__free(rb);
	return errno = -err, NULL;
}

static void user_ringbuf_commit(struct user_ring_buffer *rb, void *sample, bool discard)
{
	__u32 new_len;
	struct ringbuf_hdr *hdr;
	uintptr_t hdr_offset;

	hdr_offset = rb->mask + 1 + (sample - rb->data) - BPF_RINGBUF_HDR_SZ;
	hdr = rb->data + (hdr_offset & rb->mask);

	new_len = hdr->len & ~BPF_RINGBUF_BUSY_BIT;
	if (discard)
		new_len |= BPF_RINGBUF_DISCARD_BIT;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	__atomic_exchange_n(&hdr->len, new_len, __ATOMIC_ACQ_REL);
}

void user_ring_buffer__discard(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, true);
}

void user_ring_buffer__submit(struct user_ring_buffer *rb, void *sample)
{
	user_ringbuf_commit(rb, sample, false);
}

void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
{
	__u32 avail_size, total_size, max_size;
	/* 64-bit to avoid overflow in case of extreme application behavior */
	__u64 cons_pos, prod_pos;
	struct ringbuf_hdr *hdr;

	/* The top two bits are used as special flags */
	if (size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT))
		return errno = E2BIG, NULL;

	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	cons_pos = smp_load_acquire(rb->consumer_pos);
	/* Synchronizes with smp_store_release() in user_ringbuf_commit() */
	prod_pos = smp_load_acquire(rb->producer_pos);

	max_size = rb->mask + 1;
	avail_size = max_size - (prod_pos - cons_pos);
	/* Round up total size to a multiple of 8. */
	total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;

	if (total_size > max_size)
		return errno = E2BIG, NULL;

	if (avail_size < total_size)
		return errno = ENOSPC, NULL;

	hdr = rb->data + (prod_pos & rb->mask);
	hdr->len = size | BPF_RINGBUF_BUSY_BIT;
	hdr->pad = 0;

	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_peek() in
	 * the kernel.
	 */
	smp_store_release(rb->producer_pos, prod_pos + total_size);

	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
}
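
/* Illustrative sketch (not part of libbpf): publishing one record to a
 * BPF_MAP_TYPE_USER_RINGBUF map through the reserve/submit API above. The map
 * fd and the payload layout ('struct example_msg') are assumptions for the
 * example only.
 */
#if 0
struct example_msg {
	__u32 type;
	__u32 value;
};

static int example_produce(int user_ringbuf_map_fd, __u32 type, __u32 value)
{
	struct user_ring_buffer *rb;
	struct example_msg *msg;

	rb = user_ring_buffer__new(user_ringbuf_map_fd, NULL);
	if (!rb)
		return -errno;

	msg = user_ring_buffer__reserve(rb, sizeof(*msg));
	if (!msg) {
		user_ring_buffer__free(rb);
		return -errno;
	}

	msg->type = type;
	msg->value = value;
	/* makes the record visible to bpf_user_ringbuf_drain() on the BPF side */
	user_ring_buffer__submit(rb, msg);

	user_ring_buffer__free(rb);
	return 0;
}
#endif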

static __u64
ns_elapsed_timespec(const struct timespec *start, const struct timespec *end)
{
	__u64 start_ns, end_ns, ns_per_s = 1000000000;

	start_ns = (__u64)start->tv_sec * ns_per_s + start->tv_nsec;
	end_ns = (__u64)end->tv_sec * ns_per_s + end->tv_nsec;

	return end_ns - start_ns;
}

void *user_ring_buffer__reserve_blocking(struct user_ring_buffer *rb, __u32 size, int timeout_ms)
{
	void *sample;
	int err, ms_remaining = timeout_ms;
	struct timespec start;

	if (timeout_ms < 0 && timeout_ms != -1)
		return errno = EINVAL, NULL;

	if (timeout_ms != -1) {
		err = clock_gettime(CLOCK_MONOTONIC, &start);
		if (err)
			return NULL;
	}

	do {
		int cnt, ms_elapsed;
		struct timespec curr;
		__u64 ns_per_ms = 1000000;

		sample = user_ring_buffer__reserve(rb, size);
		if (sample)
			return sample;
		else if (errno != ENOSPC)
			return NULL;

		/* The kernel guarantees at least one event notification
		 * delivery whenever at least one sample is drained from the
		 * ring buffer in an invocation to bpf_ringbuf_drain(). Other
		 * additional events may be delivered at any time, but only one
		 * event is guaranteed per bpf_ringbuf_drain() invocation,
		 * provided that a sample is drained, and the BPF program did
		 * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If
		 * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a
		 * wakeup event will be delivered even if no samples are
		 * drained.
		 */
		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining);
		if (cnt < 0)
			return NULL;

		if (timeout_ms == -1)
			continue;

		err = clock_gettime(CLOCK_MONOTONIC, &curr);
		if (err)
			return NULL;

		ms_elapsed = ns_elapsed_timespec(&start, &curr) / ns_per_ms;
		ms_remaining = timeout_ms - ms_elapsed;
	} while (ms_remaining > 0);

	/* Try one more time to reserve a sample after the specified timeout has elapsed. */
	return user_ring_buffer__reserve(rb, size);
}
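
/* Illustrative sketch (not part of libbpf): using the blocking reserve when
 * the ring may be full, waiting up to 500ms (an arbitrary example timeout)
 * for the BPF program to drain space before giving up.
 */
#if 0
static void *example_reserve_with_wait(struct user_ring_buffer *rb, __u32 size)
{
	void *sample;

	sample = user_ring_buffer__reserve_blocking(rb, size, 500);
	if (!sample) {
		/* errno is ENOSPC if the ring stayed full for the whole timeout */
		return NULL;
	}
	return sample;
}
#endif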