/*
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/ftrace_irq.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>

#include <asm/local.h>
/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
        int ret;

        ret = trace_seq_printf(s, "# compressed entry header\n");
        ret = trace_seq_printf(s, "\ttype_len : 5 bits\n");
        ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n");
        ret = trace_seq_printf(s, "\tarray : 32 bits\n");
        ret = trace_seq_printf(s, "\n");
        ret = trace_seq_printf(s, "\tpadding : type == %d\n",
                               RINGBUF_TYPE_PADDING);
        ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
                               RINGBUF_TYPE_TIME_EXTEND);
        ret = trace_seq_printf(s, "\tdata max type_len == %d\n",
                               RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

        return ret;
}
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * [ ASCII art diagrams appear here in the original: they show the reader
 *   page sitting outside the ring of buffer pages, then being linked into
 *   the ring in place of the head page, and finally the old head page
 *   dropping out as the reader's new private page. ]
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 */
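/*
 * Editor's note: the sketch below is NOT part of the original file.  It
 * reduces the swap described above to plain list operations, ignoring the
 * HEAD-flag bits and the cmpxchg protection that the real code further
 * down in this file needs for locklessness.  "reader" is the reader's
 * private page, "head" is the current head page inside the ring.
 */
static inline void rb_illustrate_reader_swap(struct list_head *reader,
                                             struct list_head *head)
{
        /* splice the reader page into the ring where the head page was */
        list_replace(head, reader);
        /* the old head page drops out and becomes the new reader page */
        INIT_LIST_HEAD(head);
}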
/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 */

/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *  ON   DISABLED
 * ---- ----------
 *   0      0      : ring buffers are off
 *   1      0      : ring buffers are on
 *   X      1      : ring buffers are permanently disabled
 */
enum {
        RB_BUFFERS_ON_BIT       = 0,
        RB_BUFFERS_DISABLED_BIT = 1,
};

enum {
        RB_BUFFERS_ON           = 1 << RB_BUFFERS_ON_BIT,
        RB_BUFFERS_DISABLED     = 1 << RB_BUFFERS_DISABLED_BIT,
};

static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
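/*
 * Editor's illustrative sketch, not in the original source: how the two
 * bits of ring_buffer_flags combine.  A set DISABLED bit overrides the
 * ON bit, matching the table above.
 */
static inline int rb_illustrate_flags_allow_recording(void)
{
        if (test_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags))
                return 0;
        return test_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}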
#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
        set_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_on);

/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
        clear_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags);
}
EXPORT_SYMBOL_GPL(tracing_off);

/**
 * tracing_off_permanent - permanently disable ring buffers
 *
 * This function, once called, will disable all ring buffers
 * permanently.
 */
void tracing_off_permanent(void)
{
        set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
}

/**
 * tracing_is_on - show state of ring buffers enabled
 */
int tracing_is_on(void)
{
        return ring_buffer_flags == RB_BUFFERS_ON;
}
EXPORT_SYMBOL_GPL(tracing_is_on);
#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT            4U
#define RB_MAX_SMALL_DATA       (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE        8U      /* two 32bit words */

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
        RB_LEN_TIME_EXTEND = 8,
        RB_LEN_TIME_STAMP = 16,
};

static inline int rb_null_event(struct ring_buffer_event *event)
{
        return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
        /* padding has a NULL time_delta */
        event->type_len = RINGBUF_TYPE_PADDING;
        event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
        unsigned length;

        if (event->type_len)
                length = event->type_len * RB_ALIGNMENT;
        else
                length = event->array[0];
        return length + RB_EVNT_HDR_SIZE;
}
/* inline for ring buffer fast paths */
static unsigned
rb_event_length(struct ring_buffer_event *event)
{
        switch (event->type_len) {
        case RINGBUF_TYPE_PADDING:
                if (rb_null_event(event))
                        /* undefined */
                        return -1;
                return event->array[0] + RB_EVNT_HDR_SIZE;

        case RINGBUF_TYPE_TIME_EXTEND:
                return RB_LEN_TIME_EXTEND;

        case RINGBUF_TYPE_TIME_STAMP:
                return RB_LEN_TIME_STAMP;

        case RINGBUF_TYPE_DATA:
                return rb_event_data_length(event);
        default:
                BUG();
        }
        /* not hit */
        return 0;
}
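/*
 * Editor's illustrative sketch, not in the original source: the inverse of
 * the decode above.  A payload of at most RB_MAX_SMALL_DATA (28 * 4 = 112)
 * bytes is encoded directly in the 5 bit type_len field as length / 4;
 * anything larger sets type_len to 0 and spills the length into array[0]
 * (see rb_update_event() further down).
 */
static inline unsigned rb_illustrate_encode_type_len(unsigned aligned_len)
{
        if (aligned_len > RB_MAX_SMALL_DATA)
                return 0;               /* length lives in array[0] */
        return DIV_ROUND_UP(aligned_len, RB_ALIGNMENT);
}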
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
        unsigned length = rb_event_length(event);
        if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
                return length;
        length -= RB_EVNT_HDR_SIZE;
        if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
                length -= sizeof(event->array[0]);
        return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);
/* inline for ring buffer fast paths */
static void *
rb_event_data(struct ring_buffer_event *event)
{
        BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
        /* If length is in len field, then array[0] has the data */
        if (event->type_len)
                return (void *)&event->array[0];
        /* Otherwise length is in array[0] and array[1] has the data */
        return (void *)&event->array[1];
}
/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
        return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
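/*
 * Editor's usage sketch, not in the original source: how a consumer of the
 * two exported accessors above typically looks at an event it has been
 * handed.
 */
static inline void rb_illustrate_inspect_event(struct ring_buffer_event *event)
{
        void *body = ring_buffer_event_data(event);
        unsigned length = ring_buffer_event_length(event);

        /* "body" points at "length" bytes of payload written by the producer */
        (void)body;
        (void)length;
}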
#define for_each_buffer_cpu(buffer, cpu)                \
        for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT        27
#define TS_MASK         ((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST   (~TS_MASK)

struct buffer_data_page {
        u64             time_stamp;     /* page time stamp */
        local_t         commit;         /* write committed index */
        unsigned char   data[];         /* data of buffer page */
};
/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
        struct list_head list;          /* list of buffer pages */
        local_t          write;         /* index for next write */
        unsigned         read;          /* index for next read */
        local_t          entries;       /* entries on this page */
        struct buffer_data_page *page;  /* Actual data page */
};
/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK           0xfffff
#define RB_WRITE_INTCNT         (1 << 20)
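/*
 * Editor's illustrative sketch, not in the original source: splitting a raw
 * bpage->write value into the two counters described above.  The low 20
 * bits are the byte index on the page; everything above counts updaters
 * that raced in while the tail page was being moved.
 */
static inline void rb_illustrate_split_write(unsigned long write,
                                             unsigned long *index,
                                             unsigned long *updaters)
{
        *index    = write & RB_WRITE_MASK;
        *updaters = write / RB_WRITE_INTCNT;
}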
static void rb_init_page(struct buffer_data_page *bpage)
{
        local_set(&bpage->commit, 0);
}

/**
 * ring_buffer_page_len - the size of data on the page.
 * @page: The page to read
 *
 * Returns the amount of data on the page, including buffer page header.
 */
size_t ring_buffer_page_len(void *page)
{
        return local_read(&((struct buffer_data_page *)page)->commit)
                + BUF_PAGE_HDR_SIZE;
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
        free_page((unsigned long)bpage->page);
        kfree(bpage);
}
/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
        if (delta & TS_DELTA_TEST)
                return 1;
        return 0;
}
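/*
 * Editor's illustrative sketch, not in the original source: when
 * test_time_stamp() fires, the full delta is split across a TIME_EXTEND
 * event exactly like this (see rb_add_time_stamp() below): the low
 * TS_SHIFT bits go into time_delta, the rest into array[0].
 */
static inline void rb_illustrate_split_delta(u64 delta, u32 *low, u32 *high)
{
        *low  = delta & TS_MASK;        /* 27 bit time_delta field */
        *high = delta >> TS_SHIFT;      /* carried in array[0] */
}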
#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

/* Max number of timestamps that can fit on a page */
#define RB_TIMESTAMPS_PER_PAGE  (BUF_PAGE_SIZE / RB_LEN_TIME_STAMP)

int ring_buffer_print_page_header(struct trace_seq *s)
{
        struct buffer_data_page field;
        int ret;

        ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
                               "offset:0;\tsize:%u;\tsigned:%u;\n",
                               (unsigned int)sizeof(field.time_stamp),
                               (unsigned int)is_signed_type(u64));

        ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
                               "offset:%u;\tsize:%u;\tsigned:%u;\n",
                               (unsigned int)offsetof(typeof(field), commit),
                               (unsigned int)sizeof(field.commit),
                               (unsigned int)is_signed_type(long));

        ret = trace_seq_printf(s, "\tfield: char data;\t"
                               "offset:%u;\tsize:%u;\tsigned:%u;\n",
                               (unsigned int)offsetof(typeof(field), data),
                               (unsigned int)BUF_PAGE_SIZE,
                               (unsigned int)is_signed_type(char));

        return ret;
}
/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
        int                             cpu;
        struct ring_buffer              *buffer;
        spinlock_t                      reader_lock;    /* serialize readers */
        arch_spinlock_t                 lock;
        struct lock_class_key           lock_key;
        struct list_head                *pages;
        struct buffer_page              *head_page;     /* read from head */
        struct buffer_page              *tail_page;     /* write to tail */
        struct buffer_page              *commit_page;   /* committed pages */
        struct buffer_page              *reader_page;
        local_t                         commit_overrun;
        local_t                         overrun;
        local_t                         entries;
        local_t                         committing;
        local_t                         commits;
        unsigned long                   read;
        u64                             write_stamp;
        u64                             read_stamp;
        atomic_t                        record_disabled;
};

struct ring_buffer {
        unsigned                        pages;
        unsigned                        flags;
        int                             cpus;
        atomic_t                        record_disabled;
        cpumask_var_t                   cpumask;

        struct lock_class_key           *reader_lock_key;

        struct mutex                    mutex;

        struct ring_buffer_per_cpu      **buffers;

#ifdef CONFIG_HOTPLUG_CPU
        struct notifier_block           cpu_notify;
#endif
        u64                             (*clock)(void);
};

struct ring_buffer_iter {
        struct ring_buffer_per_cpu      *cpu_buffer;
        unsigned long                   head;
        struct buffer_page              *head_page;
        struct buffer_page              *cache_reader_page;
        unsigned long                   cache_read;
        u64                             read_stamp;
};

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)                                             \
        ({                                                              \
                int _____ret = unlikely(cond);                          \
                if (_____ret) {                                         \
                        if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
                                struct ring_buffer_per_cpu *__b =       \
                                        (void *)b;                      \
                                atomic_inc(&__b->buffer->record_disabled); \
                        } else                                          \
                                atomic_inc(&b->record_disabled);        \
                        WARN_ON(1);                                     \
                }                                                       \
                _____ret;                                               \
        })

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
        /* shift to debug/test normalization and TIME_EXTENTS */
        return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
        u64 time;

        preempt_disable_notrace();
        time = rb_time_stamp(buffer);
        preempt_enable_no_resched_notrace();

        return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
                                      int cpu, u64 *ts)
{
        /* Just stupid testing the normalize function and deltas */
        *ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 *                              -------        -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * [ ASCII diagram in the original: the tail page's next pointer carries
 *   the HEAD flag while the reader page's prev still points into the
 *   ring.  Key:  ---X-->  HEAD flag set in pointer,  T = tail page,
 *   R = reader page,  N = next page. ]
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */
#define RB_PAGE_NORMAL          0UL
#define RB_PAGE_HEAD            1UL
#define RB_PAGE_UPDATE          2UL

#define RB_FLAG_MASK            3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED           4UL
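/*
 * Editor's illustrative sketch, not in the original source: because buffer
 * pages are cache-line aligned, the two low bits of a list pointer are
 * always zero and can carry one of the states above.  rb_list_head()
 * below strips the bits again.
 */
static inline struct list_head *rb_illustrate_tag_pointer(struct list_head *list,
                                                          unsigned long flag)
{
        return (struct list_head *)((unsigned long)list | (flag & RB_FLAG_MASK));
}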
/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
        unsigned long val = (unsigned long)list;

        return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
                struct buffer_page *page, struct list_head *list)
{
        unsigned long val;

        val = (unsigned long)list->next;

        if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
                return RB_PAGE_MOVED;

        return val & RB_FLAG_MASK;
}

/*
 * The unique thing about the reader page is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static int rb_is_reader_page(struct buffer_page *page)
{
        struct list_head *list = page->list.prev;

        return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
                                struct list_head *list)
{
        unsigned long *ptr;

        ptr = (unsigned long *)&list->next;
        *ptr |= RB_PAGE_HEAD;
        *ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct buffer_page *head;

        head = cpu_buffer->head_page;
        if (!head)
                return;

        /*
         * Set the previous list pointer to have the HEAD flag.
         */
        rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
        unsigned long *ptr = (unsigned long *)&list->next;

        *ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct list_head *hd;

        /* Go through the whole list and clear any pointers found. */
        rb_list_head_clear(cpu_buffer->pages);

        list_for_each(hd, cpu_buffer->pages)
                rb_list_head_clear(hd);
}
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
                            struct buffer_page *head,
                            struct buffer_page *prev,
                            int old_flag, int new_flag)
{
        struct list_head *list;
        unsigned long val = (unsigned long)&head->list;
        unsigned long ret;

        list = &prev->list;

        val &= ~RB_FLAG_MASK;

        ret = cmpxchg((unsigned long *)&list->next,
                      val | old_flag, val | new_flag);

        /* check if the reader took the page */
        if ((ret & ~RB_FLAG_MASK) != val)
                return RB_PAGE_MOVED;

        return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
                                   struct buffer_page *head,
                                   struct buffer_page *prev,
                                   int old_flag)
{
        return rb_head_page_set(cpu_buffer, head, prev,
                                old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
                                 struct buffer_page *head,
                                 struct buffer_page *prev,
                                 int old_flag)
{
        return rb_head_page_set(cpu_buffer, head, prev,
                                old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
                                   struct buffer_page *head,
                                   struct buffer_page *prev,
                                   int old_flag)
{
        return rb_head_page_set(cpu_buffer, head, prev,
                                old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
                               struct buffer_page **bpage)
{
        struct list_head *p = rb_list_head((*bpage)->list.next);

        *bpage = list_entry(p, struct buffer_page, list);
}
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct buffer_page *head;
        struct buffer_page *page;
        struct list_head *list;
        int i;

        if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
                return NULL;

        list = cpu_buffer->pages;
        if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
                return NULL;

        page = head = cpu_buffer->head_page;
        /*
         * It is possible that the writer moves the header behind
         * where we started, and we miss in one loop.
         * A second loop should grab the header, but we'll do
         * three loops just because I'm paranoid.
         */
        for (i = 0; i < 3; i++) {
                do {
                        if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
                                cpu_buffer->head_page = page;
                                return page;
                        }
                        rb_inc_page(cpu_buffer, &page);
                } while (page != head);
        }

        RB_WARN_ON(cpu_buffer, 1);

        return NULL;
}

static int rb_head_page_replace(struct buffer_page *old,
                                struct buffer_page *new)
{
        unsigned long *ptr = (unsigned long *)&old->list.prev->next;
        unsigned long val;
        unsigned long ret;

        val = *ptr & ~RB_FLAG_MASK;
        val |= RB_PAGE_HEAD;

        ret = cmpxchg(ptr, val, (unsigned long)&new->list);

        return ret == val;
}
/*
 * rb_tail_page_update - move the tail page forward
 *
 * Returns 1 if moved tail page, 0 if someone else did.
 */
static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
                               struct buffer_page *tail_page,
                               struct buffer_page *next_page)
{
        struct buffer_page *old_tail;
        unsigned long old_entries;
        unsigned long old_write;
        int ret = 0;

        /*
         * The tail page now needs to be moved forward.
         *
         * We need to reset the tail page, but without messing
         * with possible erasing of data brought in by interrupts
         * that have moved the tail page and are currently on it.
         *
         * We add a counter to the write field to denote this.
         */
        old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
        old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

        /*
         * Just make sure we have seen our old_write and synchronize
         * with any interrupts that come in.
         */
        barrier();

        /*
         * If the tail page is still the same as what we think
         * it is, then it is up to us to update the tail
         * pointer.
         */
        if (tail_page == cpu_buffer->tail_page) {
                /* Zero the write counter */
                unsigned long val = old_write & ~RB_WRITE_MASK;
                unsigned long eval = old_entries & ~RB_WRITE_MASK;

                /*
                 * This will only succeed if an interrupt did
                 * not come in and change it. In which case, we
                 * do not want to modify it.
                 *
                 * We add (void) to let the compiler know that we do not care
                 * about the return value of these functions. We use the
                 * cmpxchg to only update if an interrupt did not already
                 * do it for us. If the cmpxchg fails, we don't care.
                 */
                (void)local_cmpxchg(&next_page->write, old_write, val);
                (void)local_cmpxchg(&next_page->entries, old_entries, eval);

                /*
                 * No need to worry about races with clearing out the commit.
                 * it only can increment when a commit takes place. But that
                 * only happens in the outer most nested commit.
                 */
                local_set(&next_page->page->commit, 0);

                old_tail = cmpxchg(&cpu_buffer->tail_page,
                                   tail_page, next_page);

                if (old_tail == tail_page)
                        ret = 1;
        }

        return ret;
}
static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
                          struct buffer_page *bpage)
{
        unsigned long val = (unsigned long)bpage;

        if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
                return 1;

        return 0;
}

/*
 * rb_check_list - make sure a pointer to a list has the last bits zero
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
                         struct list_head *list)
{
        if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
                return 1;
        if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
                return 1;
        return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct list_head *head = cpu_buffer->pages;
        struct buffer_page *bpage, *tmp;

        rb_head_page_deactivate(cpu_buffer);

        if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
                return -1;
        if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
                return -1;

        if (rb_check_list(cpu_buffer, head))
                return -1;

        list_for_each_entry_safe(bpage, tmp, head, list) {
                if (RB_WARN_ON(cpu_buffer,
                               bpage->list.next->prev != &bpage->list))
                        return -1;
                if (RB_WARN_ON(cpu_buffer,
                               bpage->list.prev->next != &bpage->list))
                        return -1;
                if (rb_check_list(cpu_buffer, &bpage->list))
                        return -1;
        }

        rb_head_page_activate(cpu_buffer);

        return 0;
}
static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
                             unsigned nr_pages)
{
        struct buffer_page *bpage, *tmp;
        unsigned long addr;
        LIST_HEAD(pages);
        unsigned i;

        for (i = 0; i < nr_pages; i++) {
                bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                                     GFP_KERNEL, cpu_to_node(cpu_buffer->cpu));
                if (!bpage)
                        goto free_pages;

                rb_check_bpage(cpu_buffer, bpage);

                list_add(&bpage->list, &pages);

                addr = __get_free_page(GFP_KERNEL);
                if (!addr)
                        goto free_pages;
                bpage->page = (void *)addr;
                rb_init_page(bpage->page);
        }

        /*
         * The ring buffer page list is a circular list that does not
         * start and end with a list head. All page list items point to
         * other pages.
         */
        cpu_buffer->pages = pages.next;
        list_del(&pages);

        rb_check_pages(cpu_buffer);

        return 0;

 free_pages:
        list_for_each_entry_safe(bpage, tmp, &pages, list) {
                list_del_init(&bpage->list);
                free_buffer_page(bpage);
        }
        return -ENOMEM;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct buffer_page *bpage;
        unsigned long addr;
        int ret;

        cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
                                  GFP_KERNEL, cpu_to_node(cpu));
        if (!cpu_buffer)
                return NULL;

        cpu_buffer->cpu = cpu;
        cpu_buffer->buffer = buffer;
        spin_lock_init(&cpu_buffer->reader_lock);
        lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
        cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;

        bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                             GFP_KERNEL, cpu_to_node(cpu));
        if (!bpage)
                goto fail_free_buffer;

        rb_check_bpage(cpu_buffer, bpage);

        cpu_buffer->reader_page = bpage;
        addr = __get_free_page(GFP_KERNEL);
        if (!addr)
                goto fail_free_reader;
        bpage->page = (void *)addr;
        rb_init_page(bpage->page);

        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);

        ret = rb_allocate_pages(cpu_buffer, buffer->pages);
        if (ret < 0)
                goto fail_free_reader;

        cpu_buffer->head_page
                = list_entry(cpu_buffer->pages, struct buffer_page, list);
        cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

        rb_head_page_activate(cpu_buffer);

        return cpu_buffer;

 fail_free_reader:
        free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
        kfree(cpu_buffer);
        return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct list_head *head = cpu_buffer->pages;
        struct buffer_page *bpage, *tmp;

        free_buffer_page(cpu_buffer->reader_page);

        rb_head_page_deactivate(cpu_buffer);

        if (head) {
                list_for_each_entry_safe(bpage, tmp, head, list) {
                        list_del_init(&bpage->list);
                        free_buffer_page(bpage);
                }
                bpage = list_entry(head, struct buffer_page, list);
                free_buffer_page(bpage);
        }

        kfree(cpu_buffer);
}

#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
                         unsigned long action, void *hcpu);
#endif
/**
 * ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
                                        struct lock_class_key *key)
{
        struct ring_buffer *buffer;
        int bsize;
        int cpu;

        /* keep it in its own cache line */
        buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
                         GFP_KERNEL);
        if (!buffer)
                return NULL;

        if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
                goto fail_free_buffer;

        buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
        buffer->flags = flags;
        buffer->clock = trace_clock_local;
        buffer->reader_lock_key = key;

        /* need at least two pages */
        if (buffer->pages < 2)
                buffer->pages = 2;

        /*
         * In case of non-hotplug cpu, if the ring-buffer is allocated
         * in early initcall, it will not be notified of secondary cpus.
         * In that off case, we need to allocate for all possible cpus.
         */
#ifdef CONFIG_HOTPLUG_CPU
        cpumask_copy(buffer->cpumask, cpu_online_mask);
#else
        cpumask_copy(buffer->cpumask, cpu_possible_mask);
#endif
        buffer->cpus = nr_cpu_ids;

        bsize = sizeof(void *) * nr_cpu_ids;
        buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
                                  GFP_KERNEL);
        if (!buffer->buffers)
                goto fail_free_cpumask;

        for_each_buffer_cpu(buffer, cpu) {
                buffer->buffers[cpu] =
                        rb_allocate_cpu_buffer(buffer, cpu);
                if (!buffer->buffers[cpu])
                        goto fail_free_buffers;
        }

#ifdef CONFIG_HOTPLUG_CPU
        buffer->cpu_notify.notifier_call = rb_cpu_notify;
        buffer->cpu_notify.priority = 0;
        register_cpu_notifier(&buffer->cpu_notify);
#endif

        mutex_init(&buffer->mutex);

        return buffer;

 fail_free_buffers:
        for_each_buffer_cpu(buffer, cpu) {
                if (buffer->buffers[cpu])
                        rb_free_cpu_buffer(buffer->buffers[cpu]);
        }
        kfree(buffer->buffers);

 fail_free_cpumask:
        free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
        kfree(buffer);
        return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
        int cpu;

#ifdef CONFIG_HOTPLUG_CPU
        unregister_cpu_notifier(&buffer->cpu_notify);
#endif

        for_each_buffer_cpu(buffer, cpu)
                rb_free_cpu_buffer(buffer->buffers[cpu]);

        kfree(buffer->buffers);
        free_cpumask_var(buffer->cpumask);

        kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct ring_buffer *buffer,
                           u64 (*clock)(void))
{
        buffer->clock = clock;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static void
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
{
        struct buffer_page *bpage;
        struct list_head *p;
        unsigned i;

        spin_lock_irq(&cpu_buffer->reader_lock);
        rb_head_page_deactivate(cpu_buffer);

        for (i = 0; i < nr_pages; i++) {
                if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
                        goto out;
                p = cpu_buffer->pages->next;
                bpage = list_entry(p, struct buffer_page, list);
                list_del_init(&bpage->list);
                free_buffer_page(bpage);
        }
        if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
                goto out;

        rb_reset_cpu(cpu_buffer);
        rb_check_pages(cpu_buffer);

 out:
        spin_unlock_irq(&cpu_buffer->reader_lock);
}

static void
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
                struct list_head *pages, unsigned nr_pages)
{
        struct buffer_page *bpage;
        struct list_head *p;
        unsigned i;

        spin_lock_irq(&cpu_buffer->reader_lock);
        rb_head_page_deactivate(cpu_buffer);

        for (i = 0; i < nr_pages; i++) {
                if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
                        goto out;
                p = pages->next;
                bpage = list_entry(p, struct buffer_page, list);
                list_del_init(&bpage->list);
                list_add_tail(&bpage->list, cpu_buffer->pages);
        }
        rb_reset_cpu(cpu_buffer);
        rb_check_pages(cpu_buffer);

 out:
        spin_unlock_irq(&cpu_buffer->reader_lock);
}
/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns -1 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        unsigned nr_pages, rm_pages, new_pages;
        struct buffer_page *bpage, *tmp;
        unsigned long buffer_size;
        unsigned long addr;
        LIST_HEAD(pages);
        int i, cpu;

        /*
         * Always succeed at resizing a non-existent buffer:
         */
        if (!buffer)
                return size;

        size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
        size *= BUF_PAGE_SIZE;
        buffer_size = buffer->pages * BUF_PAGE_SIZE;

        /* we need a minimum of two pages */
        if (size < BUF_PAGE_SIZE * 2)
                size = BUF_PAGE_SIZE * 2;

        if (size == buffer_size)
                return size;

        atomic_inc(&buffer->record_disabled);

        /* Make sure all writers are done with this buffer. */
        synchronize_sched();

        mutex_lock(&buffer->mutex);

        nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

        if (size < buffer_size) {

                /* easy case, just free pages */
                if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
                        goto out_fail;

                rm_pages = buffer->pages - nr_pages;

                for_each_buffer_cpu(buffer, cpu) {
                        cpu_buffer = buffer->buffers[cpu];
                        rb_remove_pages(cpu_buffer, rm_pages);
                }
                goto out;
        }

        /*
         * This is a bit more difficult. We only want to add pages
         * when we can allocate enough for all CPUs. We do this
         * by allocating all the pages and storing them on a local
         * link list. If we succeed in our allocation, then we
         * add these pages to the cpu_buffers. Otherwise we just free
         * them all and return -ENOMEM;
         */
        if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
                goto out_fail;

        new_pages = nr_pages - buffer->pages;

        for_each_buffer_cpu(buffer, cpu) {
                for (i = 0; i < new_pages; i++) {
                        bpage = kzalloc_node(ALIGN(sizeof(*bpage),
                                                   cache_line_size()),
                                             GFP_KERNEL, cpu_to_node(cpu));
                        if (!bpage)
                                goto free_pages;
                        list_add(&bpage->list, &pages);
                        addr = __get_free_page(GFP_KERNEL);
                        if (!addr)
                                goto free_pages;
                        bpage->page = (void *)addr;
                        rb_init_page(bpage->page);
                }
        }

        for_each_buffer_cpu(buffer, cpu) {
                cpu_buffer = buffer->buffers[cpu];
                rb_insert_pages(cpu_buffer, &pages, new_pages);
        }

        if (RB_WARN_ON(buffer, !list_empty(&pages)))
                goto out_fail;

 out:
        buffer->pages = nr_pages;
        mutex_unlock(&buffer->mutex);

        atomic_dec(&buffer->record_disabled);

        return size;

 free_pages:
        list_for_each_entry_safe(bpage, tmp, &pages, list) {
                list_del_init(&bpage->list);
                free_buffer_page(bpage);
        }
        mutex_unlock(&buffer->mutex);
        atomic_dec(&buffer->record_disabled);
        return -ENOMEM;

        /*
         * Something went totally wrong, and we are too paranoid
         * to even clean up the mess.
         */
 out_fail:
        mutex_unlock(&buffer->mutex);
        atomic_dec(&buffer->record_disabled);
        return -1;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);
static inline void *
__rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
{
        return bpage->data + index;
}

static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
        return bpage->page->data + index;
}

static inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
        return __rb_page_index(cpu_buffer->reader_page,
                               cpu_buffer->reader_page->read);
}

static inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
        return __rb_page_index(iter->head_page, iter->head);
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
        return local_read(&bpage->write) & RB_WRITE_MASK;
}

static inline unsigned rb_page_commit(struct buffer_page *bpage)
{
        return local_read(&bpage->page->commit);
}

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
        return local_read(&bpage->entries) & RB_WRITE_MASK;
}

/* Size is determined by what has been committed */
static inline unsigned rb_page_size(struct buffer_page *bpage)
{
        return rb_page_commit(bpage);
}

static inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
        return rb_page_commit(cpu_buffer->commit_page);
}

static inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
        unsigned long addr = (unsigned long)event;

        return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}

static inline int
rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
                   struct ring_buffer_event *event)
{
        unsigned long addr = (unsigned long)event;
        unsigned long index;

        index = rb_event_index(event);
        addr &= PAGE_MASK;

        return cpu_buffer->commit_page->page == (void *)addr &&
                rb_commit_index(cpu_buffer) == index;
}
static void
rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
{
        unsigned long max_count;

        /*
         * We only race with interrupts and NMIs on this CPU.
         * If we own the commit event, then we can commit
         * all others that interrupted us, since the interruptions
         * are in stack format (they finish before they come
         * back to us). This allows us to do a simple loop to
         * assign the commit to the tail.
         */
 again:
        max_count = cpu_buffer->buffer->pages * 100;

        while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
                if (RB_WARN_ON(cpu_buffer, !(--max_count)))
                        return;
                if (RB_WARN_ON(cpu_buffer,
                               rb_is_reader_page(cpu_buffer->tail_page)))
                        return;
                local_set(&cpu_buffer->commit_page->page->commit,
                          rb_page_write(cpu_buffer->commit_page));
                rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
                cpu_buffer->write_stamp =
                        cpu_buffer->commit_page->page->time_stamp;
                /* add barrier to keep gcc from optimizing too much */
                barrier();
        }
        while (rb_commit_index(cpu_buffer) !=
               rb_page_write(cpu_buffer->commit_page)) {

                local_set(&cpu_buffer->commit_page->page->commit,
                          rb_page_write(cpu_buffer->commit_page));
                RB_WARN_ON(cpu_buffer,
                           local_read(&cpu_buffer->commit_page->page->commit) &
                           ~RB_WRITE_MASK);
                barrier();
        }

        /* again, keep gcc from optimizing */
        barrier();

        /*
         * If an interrupt came in just after the first while loop
         * and pushed the tail page forward, we will be left with
         * a dangling commit that will never go forward.
         */
        if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
                goto again;
}
static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
        cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
        cpu_buffer->reader_page->read = 0;
}

static void rb_inc_iter(struct ring_buffer_iter *iter)
{
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

        /*
         * The iterator could be on the reader page (it starts there).
         * But the head could have moved, since the reader was
         * found. Check for this case and assign the iterator
         * to the head page instead of next.
         */
        if (iter->head_page == cpu_buffer->reader_page)
                iter->head_page = rb_set_head_page(cpu_buffer);
        else
                rb_inc_page(cpu_buffer, &iter->head_page);

        iter->read_stamp = iter->head_page->page->time_stamp;
        iter->head = 0;
}

/**
 * ring_buffer_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
 */
static void
rb_update_event(struct ring_buffer_event *event,
                unsigned type, unsigned length)
{
        event->type_len = type;

        switch (type) {

        case RINGBUF_TYPE_PADDING:
        case RINGBUF_TYPE_TIME_EXTEND:
        case RINGBUF_TYPE_TIME_STAMP:
                break;

        case 0:
                length -= RB_EVNT_HDR_SIZE;
                if (length > RB_MAX_SMALL_DATA)
                        event->array[0] = length;
                else
                        event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
                break;
        default:
                BUG();
        }
}
/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
                    struct buffer_page *tail_page,
                    struct buffer_page *next_page)
{
        struct buffer_page *new_head;
        int entries;
        int type;
        int ret;

        entries = rb_page_entries(next_page);

        /*
         * The hard part is here. We need to move the head
         * forward, and protect against both readers on
         * other CPUs and writers coming in via interrupts.
         */
        type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
                                       RB_PAGE_HEAD);

        /*
         * type can be one of four:
         *  NORMAL - an interrupt already moved it for us
         *  HEAD   - we are the first to get here.
         *  UPDATE - we are the interrupt interrupting
         *           a current move.
         *  MOVED  - a reader on another CPU moved the next
         *           pointer to its reader page. Give up
         *           and try again.
         */

        switch (type) {
        case RB_PAGE_HEAD:
                /*
                 * We changed the head to UPDATE, thus
                 * it is our responsibility to update
                 * the counters.
                 */
                local_add(entries, &cpu_buffer->overrun);

                /*
                 * The entries will be zeroed out when we move the
                 * tail page.
                 */

                /* still more to do */
                break;

        case RB_PAGE_UPDATE:
                /*
                 * This is an interrupt that interrupted the
                 * previous update. Still more to do.
                 */
                break;
        case RB_PAGE_NORMAL:
                /*
                 * An interrupt came in before the update
                 * and processed this for us.
                 * Nothing left to do.
                 */
                return 1;
        case RB_PAGE_MOVED:
                /*
                 * The reader is on another CPU and just did
                 * a swap with our next_page.
                 * Try again.
                 */
                return 1;
        default:
                RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
                return -1;
        }

        /*
         * Now that we are here, the old head pointer is
         * set to UPDATE. This will keep the reader from
         * swapping the head page with the reader page.
         * The reader (on another CPU) will spin till
         * we are finished.
         *
         * We just need to protect against interrupts
         * doing the job. We will set the next pointer
         * to HEAD. After that, we set the old pointer
         * to NORMAL, but only if it was HEAD before.
         * otherwise we are an interrupt, and only
         * want the outer most commit to reset it.
         */
        new_head = next_page;
        rb_inc_page(cpu_buffer, &new_head);

        ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
                                    RB_PAGE_NORMAL);

        /*
         * Valid returns are:
         *  HEAD   - an interrupt came in and already set it.
         *  NORMAL - One of two things:
         *            1) We really set it.
         *            2) A bunch of interrupts came in and moved
         *               the page forward again.
         */
        switch (ret) {
        case RB_PAGE_HEAD:
        case RB_PAGE_NORMAL:
                break;
        default:
                RB_WARN_ON(cpu_buffer, 1);
                return -1;
        }

        /*
         * It is possible that an interrupt came in,
         * set the head up, then more interrupts came in
         * and moved it again. When we get back here,
         * the page would have been set to NORMAL but we
         * just set it back to HEAD.
         *
         * How do you detect this? Well, if that happened
         * the tail page would have moved.
         */
        if (ret == RB_PAGE_NORMAL) {
                /*
                 * If the tail had moved past next, then we need
                 * to reset the pointer.
                 */
                if (cpu_buffer->tail_page != tail_page &&
                    cpu_buffer->tail_page != next_page)
                        rb_head_page_set_normal(cpu_buffer, new_head,
                                                next_page,
                                                RB_PAGE_HEAD);
        }

        /*
         * If this was the outer most commit (the one that
         * changed the original pointer from HEAD to UPDATE),
         * then it is up to us to reset it to NORMAL.
         */
        if (type == RB_PAGE_HEAD) {
                ret = rb_head_page_set_normal(cpu_buffer, next_page,
                                              tail_page,
                                              RB_PAGE_UPDATE);
                if (RB_WARN_ON(cpu_buffer,
                               ret != RB_PAGE_UPDATE))
                        return -1;
        }

        return 0;
}
static unsigned rb_calculate_event_length(unsigned length)
{
        struct ring_buffer_event event; /* Used only for sizeof array */

        /* zero length can cause confusions */
        if (!length)
                length = 1;

        if (length > RB_MAX_SMALL_DATA)
                length += sizeof(event.array[0]);

        length += RB_EVNT_HDR_SIZE;
        length = ALIGN(length, RB_ALIGNMENT);

        return length;
}
static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
              struct buffer_page *tail_page,
              unsigned long tail, unsigned long length)
{
        struct ring_buffer_event *event;

        /*
         * Only the event that crossed the page boundary
         * must fill the old tail_page with padding.
         */
        if (tail >= BUF_PAGE_SIZE) {
                local_sub(length, &tail_page->write);
                return;
        }

        event = __rb_page_index(tail_page, tail);
        kmemcheck_annotate_bitfield(event, bitfield);

        /*
         * If this event is bigger than the minimum size, then
         * we need to be careful that we don't subtract the
         * write counter enough to allow another writer to slip
         * in on this page.
         * We put in a discarded commit instead, to make sure
         * that this space is not used again.
         *
         * If we are less than the minimum size, we don't need to
         * worry about it.
         */
        if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
                /* No room for any events */

                /* Mark the rest of the page with padding */
                rb_event_set_padding(event);

                /* Set the write back to the previous setting */
                local_sub(length, &tail_page->write);
                return;
        }

        /* Put in a discarded event */
        event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
        event->type_len = RINGBUF_TYPE_PADDING;
        /* time delta must be non zero */
        event->time_delta = 1;

        /* Set write to end of buffer */
        length = (tail + length) - BUF_PAGE_SIZE;
        local_sub(length, &tail_page->write);
}

static struct ring_buffer_event *
rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
             unsigned long length, unsigned long tail,
             struct buffer_page *tail_page, u64 *ts)
{
        struct buffer_page *commit_page = cpu_buffer->commit_page;
        struct ring_buffer *buffer = cpu_buffer->buffer;
        struct buffer_page *next_page;
        int ret;

        next_page = tail_page;

        rb_inc_page(cpu_buffer, &next_page);

        /*
         * If for some reason, we had an interrupt storm that made
         * it all the way around the buffer, bail, and warn
         * about it.
         */
        if (unlikely(next_page == commit_page)) {
                local_inc(&cpu_buffer->commit_overrun);
                goto out_reset;
        }

        /*
         * This is where the fun begins!
         *
         * We are fighting against races between a reader that
         * could be on another CPU trying to swap its reader
         * page with the buffer head.
         *
         * We are also fighting against interrupts coming in and
         * moving the head or tail on us as well.
         *
         * If the next page is the head page then we have filled
         * the buffer, unless the commit page is still on the
         * reader page.
         */
        if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {

                /*
                 * If the commit is not on the reader page, then
                 * move the header page.
                 */
                if (!rb_is_reader_page(cpu_buffer->commit_page)) {
                        /*
                         * If we are not in overwrite mode,
                         * this is easy, just stop here.
                         */
                        if (!(buffer->flags & RB_FL_OVERWRITE))
                                goto out_reset;

                        ret = rb_handle_head_page(cpu_buffer,
                                                  tail_page,
                                                  next_page);
                        if (ret < 0)
                                goto out_reset;
                        if (ret)
                                goto out_again;
                } else {
                        /*
                         * We need to be careful here too. The
                         * commit page could still be on the reader
                         * page. We could have a small buffer, and
                         * have filled up the buffer with events
                         * from interrupts and such, and wrapped.
                         *
                         * Note, if the tail page is also on the
                         * reader_page, we let it move out.
                         */
                        if (unlikely((cpu_buffer->commit_page !=
                                      cpu_buffer->tail_page) &&
                                     (cpu_buffer->commit_page ==
                                      cpu_buffer->reader_page))) {
                                local_inc(&cpu_buffer->commit_overrun);
                                goto out_reset;
                        }
                }
        }

        ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
        if (ret) {
                /*
                 * Nested commits always have zero deltas, so
                 * just reread the time stamp
                 */
                *ts = rb_time_stamp(buffer);
                next_page->page->time_stamp = *ts;
        }

 out_again:

        rb_reset_tail(cpu_buffer, tail_page, tail, length);

        /* fail and let the caller try again */
        return ERR_PTR(-EAGAIN);

 out_reset:
        /* reset write */
        rb_reset_tail(cpu_buffer, tail_page, tail, length);

        return NULL;
}
static struct ring_buffer_event *
__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                  unsigned type, unsigned long length, u64 *ts)
{
        struct buffer_page *tail_page;
        struct ring_buffer_event *event;
        unsigned long tail, write;

        tail_page = cpu_buffer->tail_page;
        write = local_add_return(length, &tail_page->write);

        /* set write to only the index of the write */
        write &= RB_WRITE_MASK;
        tail = write - length;

        /* See if we shot past the end of this buffer page */
        if (write > BUF_PAGE_SIZE)
                return rb_move_tail(cpu_buffer, length, tail,
                                    tail_page, ts);

        /* We reserved something on the buffer */

        event = __rb_page_index(tail_page, tail);
        kmemcheck_annotate_bitfield(event, bitfield);
        rb_update_event(event, type, length);

        /* The passed in type is zero for DATA */
        if (likely(!type))
                local_inc(&tail_page->entries);

        /*
         * If this is the first commit on the page, then update
         * its timestamp.
         */
        if (!tail)
                tail_page->page->time_stamp = *ts;

        return event;
}
static inline int
rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
                  struct ring_buffer_event *event)
{
        unsigned long new_index, old_index;
        struct buffer_page *bpage;
        unsigned long index;
        unsigned long addr;

        new_index = rb_event_index(event);
        old_index = new_index + rb_event_length(event);
        addr = (unsigned long)event;
        addr &= PAGE_MASK;

        bpage = cpu_buffer->tail_page;

        if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
                unsigned long write_mask =
                        local_read(&bpage->write) & ~RB_WRITE_MASK;
                /*
                 * This is on the tail page. It is possible that
                 * a write could come in and move the tail page
                 * and write to the next page. That is fine
                 * because we just shorten what is on this page.
                 */
                old_index += write_mask;
                new_index += write_mask;
                index = local_cmpxchg(&bpage->write, old_index, new_index);
                if (index == old_index)
                        return 1;
        }

        /* could not discard */
        return 0;
}
static int
rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
                  u64 *ts, u64 *delta)
{
        struct ring_buffer_event *event;
        static int once;
        int ret;

        if (unlikely(*delta > (1ULL << 59) && !once++)) {
                printk(KERN_WARNING "Delta way too big! %llu"
                       " ts=%llu write stamp = %llu\n",
                       (unsigned long long)*delta,
                       (unsigned long long)*ts,
                       (unsigned long long)cpu_buffer->write_stamp);
                WARN_ON(1);
        }

        /*
         * The delta is too big, we need to add a
         * timestamp.
         */
        event = __rb_reserve_next(cpu_buffer,
                                  RINGBUF_TYPE_TIME_EXTEND,
                                  RB_LEN_TIME_EXTEND,
                                  ts);
        if (!event)
                return -EBUSY;

        if (PTR_ERR(event) == -EAGAIN)
                return -EAGAIN;

        /* Only a committed time event can update the write stamp */
        if (rb_event_is_commit(cpu_buffer, event)) {
                /*
                 * If this is the first on the page, then it was
                 * updated with the page itself. Try to discard it
                 * and if we can't just make it zero.
                 */
                if (rb_event_index(event)) {
                        event->time_delta = *delta & TS_MASK;
                        event->array[0] = *delta >> TS_SHIFT;
                } else {
                        /* try to discard, since we do not need this */
                        if (!rb_try_to_discard(cpu_buffer, event)) {
                                /* nope, just zero it */
                                event->time_delta = 0;
                                event->array[0] = 0;
                        }
                }
                cpu_buffer->write_stamp = *ts;
                /* let the caller know this was the commit */
                ret = 1;
        } else {
                /* Try to discard the event */
                if (!rb_try_to_discard(cpu_buffer, event)) {
                        /* Darn, this is just wasted space */
                        event->time_delta = 0;
                        event->array[0] = 0;
                }
                ret = 0;
        }

        *delta = 0;

        return ret;
}
static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
        local_inc(&cpu_buffer->committing);
        local_inc(&cpu_buffer->commits);
}

static void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
{
        unsigned long commits;

        if (RB_WARN_ON(cpu_buffer,
                       !local_read(&cpu_buffer->committing)))
                return;

 again:
        commits = local_read(&cpu_buffer->commits);
        /* synchronize with interrupts */
        barrier();
        if (local_read(&cpu_buffer->committing) == 1)
                rb_set_commit_to_write(cpu_buffer);

        local_dec(&cpu_buffer->committing);

        /* synchronize with interrupts */
        barrier();

        /*
         * Need to account for interrupts coming in between the
         * updating of the commit page and the clearing of the
         * committing counter.
         */
        if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
            !local_read(&cpu_buffer->committing)) {
                local_inc(&cpu_buffer->committing);
                goto again;
        }
}
static struct ring_buffer_event *
rb_reserve_next_event(struct ring_buffer *buffer,
                      struct ring_buffer_per_cpu *cpu_buffer,
                      unsigned long length)
{
        struct ring_buffer_event *event;
        u64 ts, delta = 0;
        int commit = 0;
        int nr_loops = 0;

        rb_start_commit(cpu_buffer);

#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
        /*
         * Due to the ability to swap a cpu buffer from a buffer
         * it is possible it was swapped before we committed.
         * (committing stops a swap). We check for it here and
         * if it happened, we have to fail the write.
         */
        barrier();
        if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
                local_dec(&cpu_buffer->committing);
                local_dec(&cpu_buffer->commits);
                return NULL;
        }
#endif

        length = rb_calculate_event_length(length);
 again:
        /*
         * We allow for interrupts to reenter here and do a trace.
         * If one does, it will cause this original code to loop
         * back here. Even with heavy interrupts happening, this
         * should only happen a few times in a row. If this happens
         * 1000 times in a row, there must be either an interrupt
         * storm or we have something buggy.
         * Bail!
         */
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
                goto out_fail;

        ts = rb_time_stamp(cpu_buffer->buffer);

        /*
         * Only the first commit can update the timestamp.
         * Yes there is a race here. If an interrupt comes in
         * just after the conditional and it traces too, then it
         * will also check the deltas. More than one timestamp may
         * also be made. But only the entry that did the actual
         * commit will be something other than zero.
         */
        if (likely(cpu_buffer->tail_page == cpu_buffer->commit_page &&
                   rb_page_write(cpu_buffer->tail_page) ==
                   rb_commit_index(cpu_buffer))) {
                u64 diff;

                diff = ts - cpu_buffer->write_stamp;

                /* make sure this diff is calculated here */
                barrier();

                /* Did the write stamp get updated already? */
                if (unlikely(ts < cpu_buffer->write_stamp))
                        goto get_event;

                delta = diff;
                if (unlikely(test_time_stamp(delta))) {

                        commit = rb_add_time_stamp(cpu_buffer, &ts, &delta);
                        if (commit == -EBUSY)
                                goto out_fail;

                        if (commit == -EAGAIN)
                                goto again;

                        RB_WARN_ON(cpu_buffer, commit < 0);
                }
        }

 get_event:
        event = __rb_reserve_next(cpu_buffer, 0, length, &ts);
        if (unlikely(PTR_ERR(event) == -EAGAIN))
                goto again;

        if (!event)
                goto out_fail;

        if (!rb_event_is_commit(cpu_buffer, event))
                delta = 0;

        event->time_delta = delta;

        return event;

 out_fail:
        rb_end_commit(cpu_buffer);
        return NULL;
}
#ifdef CONFIG_TRACING

#define TRACE_RECURSIVE_DEPTH 16

static int trace_recursive_lock(void)
{
        current->trace_recursion++;

        if (likely(current->trace_recursion < TRACE_RECURSIVE_DEPTH))
                return 0;

        /* Disable all tracing before we do anything else */
        tracing_off_permanent();

        printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:"
                    "HC[%lu]:SC[%lu]:NMI[%lu]\n",
                    current->trace_recursion,
                    hardirq_count() >> HARDIRQ_SHIFT,
                    softirq_count() >> SOFTIRQ_SHIFT,
                    in_nmi());

        WARN_ON_ONCE(1);
        return -1;
}

static void trace_recursive_unlock(void)
{
        WARN_ON_ONCE(!current->trace_recursion);

        current->trace_recursion--;
}

#else

#define trace_recursive_lock()          (0)
#define trace_recursive_unlock()        do { } while (0)

#endif

static DEFINE_PER_CPU(int, rb_need_resched);
/**
 * ring_buffer_lock_reserve - reserve a part of the buffer
 * @buffer: the ring buffer to reserve from
 * @length: the length of the data to reserve (excluding event header)
 *
 * Returns a reserved event on the ring buffer to copy directly to.
 * The user of this interface will need to get the body to write into
 * and can use the ring_buffer_event_data() interface.
 *
 * The length is the length of the data needed, not the event length
 * which also includes the event header.
 *
 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
 * If NULL is returned, then nothing has been allocated or locked.
 */
struct ring_buffer_event *
ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
{
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_event *event;
        int cpu, resched;

        if (ring_buffer_flags != RB_BUFFERS_ON)
                return NULL;

        /* If we are tracing schedule, we don't want to recurse */
        resched = ftrace_preempt_disable();

        if (atomic_read(&buffer->record_disabled))
                goto out_nocheck;

        if (trace_recursive_lock())
                goto out_nocheck;

        cpu = raw_smp_processor_id();

        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                goto out;

        cpu_buffer = buffer->buffers[cpu];

        if (atomic_read(&cpu_buffer->record_disabled))
                goto out;

        if (length > BUF_MAX_DATA_SIZE)
                goto out;

        event = rb_reserve_next_event(buffer, cpu_buffer, length);
        if (!event)
                goto out;

        /*
         * Need to store resched state on this cpu.
         * Only the first needs to.
         */

        if (preempt_count() == 1)
                per_cpu(rb_need_resched, cpu) = resched;

        return event;

 out:
        trace_recursive_unlock();

 out_nocheck:
        ftrace_preempt_enable(resched);
        return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
2282 rb_update_write_stamp(struct ring_buffer_per_cpu
*cpu_buffer
,
2283 struct ring_buffer_event
*event
)
2286 * The event first in the commit queue updates the
2289 if (rb_event_is_commit(cpu_buffer
, event
))
2290 cpu_buffer
->write_stamp
+= event
->time_delta
;
2293 static void rb_commit(struct ring_buffer_per_cpu
*cpu_buffer
,
2294 struct ring_buffer_event
*event
)
2296 local_inc(&cpu_buffer
->entries
);
2297 rb_update_write_stamp(cpu_buffer
, event
);
2298 rb_end_commit(cpu_buffer
);
2302 * ring_buffer_unlock_commit - commit a reserved
2303 * @buffer: The buffer to commit to
2304 * @event: The event pointer to commit.
2306 * This commits the data to the ring buffer, and releases any locks held.
2308 * Must be paired with ring_buffer_lock_reserve.
2310 int ring_buffer_unlock_commit(struct ring_buffer
*buffer
,
2311 struct ring_buffer_event
*event
)
2313 struct ring_buffer_per_cpu
*cpu_buffer
;
2314 int cpu
= raw_smp_processor_id();
2316 cpu_buffer
= buffer
->buffers
[cpu
];
2318 rb_commit(cpu_buffer
, event
);
2320 trace_recursive_unlock();
2323 * Only the last preempt count needs to restore preemption.
2325 if (preempt_count() == 1)
2326 ftrace_preempt_enable(per_cpu(rb_need_resched
, cpu
));
2328 preempt_enable_no_resched_notrace();
2332 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit
);
2334 static inline void rb_event_discard(struct ring_buffer_event
*event
)
2336 /* array[0] holds the actual length for the discarded event */
2337 event
->array
[0] = rb_event_data_length(event
) - RB_EVNT_HDR_SIZE
;
2338 event
->type_len
= RINGBUF_TYPE_PADDING
;
2339 /* time delta must be non zero */
2340 if (!event
->time_delta
)
2341 event
->time_delta
= 1;
2345 * Decrement the entries to the page that an event is on.
2346 * The event does not even need to exist, only the pointer
2347 * to the page it is on. This may only be called before the commit
2351 rb_decrement_entry(struct ring_buffer_per_cpu
*cpu_buffer
,
2352 struct ring_buffer_event
*event
)
2354 unsigned long addr
= (unsigned long)event
;
2355 struct buffer_page
*bpage
= cpu_buffer
->commit_page
;
2356 struct buffer_page
*start
;
2360 /* Do the likely case first */
2361 if (likely(bpage
->page
== (void *)addr
)) {
2362 local_dec(&bpage
->entries
);
2367 * Because the commit page may be on the reader page we
2368 * start with the next page and check the end loop there.
2370 rb_inc_page(cpu_buffer
, &bpage
);
2373 if (bpage
->page
== (void *)addr
) {
2374 local_dec(&bpage
->entries
);
2377 rb_inc_page(cpu_buffer
, &bpage
);
2378 } while (bpage
!= start
);
2380 /* commit not part of this buffer?? */
2381 RB_WARN_ON(cpu_buffer
, 1);
/**
 * ring_buffer_discard_commit - discard an event that has not been committed
 * @buffer: the ring buffer
 * @event: non committed event to discard
 *
 * Sometimes an event that is in the ring buffer needs to be ignored.
 * This function lets the user discard an event in the ring buffer
 * and then that event will not be read later.
 *
 * This function only works if it is called before the item has been
 * committed. It will try to free the event from the ring buffer
 * if another event has not been added behind it.
 *
 * If another event has been added behind it, it will set the event
 * up as discarded, and perform the commit.
 *
 * If this function is called, do not call ring_buffer_unlock_commit on
 * the event.
 */
void ring_buffer_discard_commit(struct ring_buffer *buffer,
				struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	int cpu;

	/* The event is discarded regardless */
	rb_event_discard(event);

	cpu = smp_processor_id();
	cpu_buffer = buffer->buffers[cpu];

	/*
	 * This must only be called if the event has not been
	 * committed yet. Thus we can assume that preemption
	 * is still disabled.
	 */
	RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));

	rb_decrement_entry(cpu_buffer, event);
	if (rb_try_to_discard(cpu_buffer, event))
		goto out;

	/*
	 * The commit is still visible by the reader, so we
	 * must still update the timestamp.
	 */
	rb_update_write_stamp(cpu_buffer, event);
 out:
	rb_end_commit(cpu_buffer);

	trace_recursive_unlock();

	/*
	 * Only the last preempt count needs to restore preemption.
	 */
	if (preempt_count() == 1)
		ftrace_preempt_enable(per_cpu(rb_need_resched, cpu));
	else
		preempt_enable_no_resched_notrace();
}
EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
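
/*
 * Example: reserving an event and then throwing it away. Illustrative
 * sketch; the filter check is a placeholder for whatever test the
 * caller applies after filling in the data, not something defined in
 * this file.
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*payload));
 *	if (!event)
 *		return;
 *	payload = ring_buffer_event_data(event);
 *	*payload = value;
 *	if (event_should_be_dropped(payload))	// hypothetical filter
 *		ring_buffer_discard_commit(buffer, event);
 *	else
 *		ring_buffer_unlock_commit(buffer, event);
 */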
/**
 * ring_buffer_write - write data to the buffer without reserving
 * @buffer: The ring buffer to write to.
 * @length: The length of the data being written (excluding the event header)
 * @data: The data to write to the buffer.
 *
 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
 * one function. If you already have the data to write to the buffer, it
 * may be easier to simply call this function.
 *
 * Note, like ring_buffer_lock_reserve, the length is the length of the data
 * and not the length of the event which would hold the header.
 */
int ring_buffer_write(struct ring_buffer *buffer,
		      unsigned long length,
		      void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	void *body;
	int ret = -EBUSY;
	int cpu, resched;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		return -EBUSY;

	resched = ftrace_preempt_disable();

	if (atomic_read(&buffer->record_disabled))
		goto out;

	cpu = raw_smp_processor_id();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];

	if (atomic_read(&cpu_buffer->record_disabled))
		goto out;

	if (length > BUF_MAX_DATA_SIZE)
		goto out;

	event = rb_reserve_next_event(buffer, cpu_buffer, length);
	if (!event)
		goto out;

	body = rb_event_data(event);

	memcpy(body, data, length);

	rb_commit(cpu_buffer, event);

	ret = 0;
 out:
	ftrace_preempt_enable(resched);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_write);
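
/*
 * Example: copying a ready-made record straight into the buffer.
 * Illustrative sketch; struct my_record is hypothetical and stands in
 * for whatever the caller already has laid out in memory.
 *
 *	struct my_record rec = { .pid = current->pid };
 *
 *	if (ring_buffer_write(buffer, sizeof(rec), &rec))
 *		;	// buffer disabled or record too large, data dropped
 */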
static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = cpu_buffer->reader_page;
	struct buffer_page *head = rb_set_head_page(cpu_buffer);
	struct buffer_page *commit = cpu_buffer->commit_page;

	/* In case of error, head will be NULL */
	if (unlikely(!head))
		return 1;

	return reader->read == rb_page_commit(reader) &&
		(commit == reader ||
		 (commit == head &&
		  head->read == rb_page_commit(commit)));
}
/**
 * ring_buffer_record_disable - stop all writes into the buffer
 * @buffer: The ring buffer to stop writes to.
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable(struct ring_buffer *buffer)
{
	atomic_inc(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
/**
 * ring_buffer_record_enable - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable(struct ring_buffer *buffer)
{
	atomic_dec(&buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
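
/*
 * Example: quiescing the buffer before inspecting it from the read
 * side. Illustrative sketch; the middle step stands in for any reader
 * that must not race with writers still inside a commit.
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_sched();		// wait for in-flight writers
 *	...				// inspect or reset the buffer
 *	ring_buffer_record_enable(buffer);
 */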
/**
 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
 * @buffer: The ring buffer to stop writes to.
 * @cpu: The CPU buffer to stop
 *
 * This prevents all writes to the buffer. Any attempt to write
 * to the buffer after this will fail and return NULL.
 *
 * The caller should call synchronize_sched() after this.
 */
void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_inc(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
/**
 * ring_buffer_record_enable_cpu - enable writes to the buffer
 * @buffer: The ring buffer to enable writes
 * @cpu: The CPU to enable.
 *
 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
 */
void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	cpu_buffer = buffer->buffers[cpu];
	atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
/**
 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the entries from.
 */
unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	ret = (local_read(&cpu_buffer->entries) - local_read(&cpu_buffer->overrun))
		- cpu_buffer->read;

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
/**
 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	ret = local_read(&cpu_buffer->overrun);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
/**
 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by commits
 * @buffer: The ring buffer
 * @cpu: The per CPU buffer to get the number of overruns from
 */
unsigned long
ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	cpu_buffer = buffer->buffers[cpu];
	ret = local_read(&cpu_buffer->commit_overrun);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
/**
 * ring_buffer_entries - get the number of entries in a buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of entries in the ring buffer
 * (all CPU entries).
 */
unsigned long ring_buffer_entries(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long entries = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		entries += (local_read(&cpu_buffer->entries) -
			    local_read(&cpu_buffer->overrun)) - cpu_buffer->read;
	}

	return entries;
}
EXPORT_SYMBOL_GPL(ring_buffer_entries);
/**
 * ring_buffer_overruns - get the number of overruns in buffer
 * @buffer: The ring buffer
 *
 * Returns the total number of overruns in the ring buffer
 * (all CPU entries).
 */
unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long overruns = 0;
	int cpu;

	/* if you care about this being correct, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		overruns += local_read(&cpu_buffer->overrun);
	}

	return overruns;
}
EXPORT_SYMBOL_GPL(ring_buffer_overruns);
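
/*
 * Example: a rough health check of the buffer. Illustrative sketch;
 * as the comments above note, the totals are racy unless the caller
 * serializes against writers.
 *
 *	unsigned long entries = ring_buffer_entries(buffer);
 *	unsigned long lost = ring_buffer_overruns(buffer);
 *
 *	pr_info("ring buffer: %lu entries, %lu overwritten\n",
 *		entries, lost);
 */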
static void rb_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/* Iterator usage is expected to have record disabled */
	if (list_empty(&cpu_buffer->reader_page->list)) {
		iter->head_page = rb_set_head_page(cpu_buffer);
		if (unlikely(!iter->head_page))
			return;
		iter->head = iter->head_page->read;
	} else {
		iter->head_page = cpu_buffer->reader_page;
		iter->head = cpu_buffer->reader_page->read;
	}
	if (iter->head)
		iter->read_stamp = cpu_buffer->read_stamp;
	else
		iter->read_stamp = iter->head_page->page->time_stamp;
	iter->cache_reader_page = cpu_buffer->reader_page;
	iter->cache_read = cpu_buffer->read;
}
/**
 * ring_buffer_iter_reset - reset an iterator
 * @iter: The iterator to reset
 *
 * Resets the iterator, so that it will start from the beginning
 * again.
 */
void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;

	if (!iter)
		return;

	cpu_buffer = iter->cpu_buffer;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	rb_iter_reset(iter);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
/**
 * ring_buffer_iter_empty - check if an iterator has no more to read
 * @iter: The iterator to check
 */
int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer;

	cpu_buffer = iter->cpu_buffer;

	return iter->head_page == cpu_buffer->commit_page &&
		iter->head == rb_commit_index(cpu_buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
static void
rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
		     struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		cpu_buffer->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		cpu_buffer->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}
static void
rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
			  struct ring_buffer_event *event)
{
	u64 delta;

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		return;

	case RINGBUF_TYPE_TIME_EXTEND:
		delta = event->array[0];
		delta <<= TS_SHIFT;
		delta += event->time_delta;
		iter->read_stamp += delta;
		return;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		return;

	case RINGBUF_TYPE_DATA:
		iter->read_stamp += event->time_delta;
		return;

	default:
		BUG();
	}
	return;
}
static struct buffer_page *
rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *reader = NULL;
	unsigned long flags;
	int nr_loops = 0;
	int ret;

	local_irq_save(flags);
	arch_spin_lock(&cpu_buffer->lock);

 again:
	/*
	 * This should normally only loop twice. But because the
	 * start of the reader inserts an empty page, it causes
	 * a case where we will loop three times. There should be no
	 * reason to loop four times (that I know of).
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
		reader = NULL;
		goto out;
	}

	reader = cpu_buffer->reader_page;

	/* If there's more to read, return this page */
	if (cpu_buffer->reader_page->read < rb_page_size(reader))
		goto out;

	/* Never should we have an index greater than the size */
	if (RB_WARN_ON(cpu_buffer,
		       cpu_buffer->reader_page->read > rb_page_size(reader)))
		goto out;

	/* check if we caught up to the tail */
	reader = NULL;
	if (cpu_buffer->commit_page == cpu_buffer->reader_page)
		goto out;

	/*
	 * Reset the reader page to size zero.
	 */
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);

 spin:
	/*
	 * Splice the empty reader page into the list around the head.
	 */
	reader = rb_set_head_page(cpu_buffer);
	cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
	cpu_buffer->reader_page->list.prev = reader->list.prev;

	/*
	 * cpu_buffer->pages just needs to point to the buffer, it
	 * has no specific buffer page to point to. Let's move it out
	 * of our way so we don't accidentally swap it.
	 */
	cpu_buffer->pages = reader->list.prev;

	/* The reader page will be pointing to the new head */
	rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);

	/*
	 * Here's the tricky part.
	 *
	 * We need to move the pointer past the header page.
	 * But we can only do that if a writer is not currently
	 * moving it. The page before the header page has the
	 * flag bit '1' set if it is pointing to the page we want.
	 * But if the writer is in the process of moving it
	 * then it will be '2' or already moved '0'.
	 */
	ret = rb_head_page_replace(reader, cpu_buffer->reader_page);

	/*
	 * If we did not convert it, then we must try again.
	 */
	if (!ret)
		goto spin;

	/*
	 * Yeah! We succeeded in replacing the page.
	 *
	 * Now make the new head point back to the reader page.
	 */
	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
	rb_inc_page(cpu_buffer, &cpu_buffer->head_page);

	/* Finally update the reader page to the new head */
	cpu_buffer->reader_page = reader;
	rb_reset_reader_page(cpu_buffer);

	goto again;

 out:
	arch_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

	return reader;
}
static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	unsigned length;

	reader = rb_get_reader_page(cpu_buffer);

	/* This function should not be called when buffer is empty */
	if (RB_WARN_ON(cpu_buffer, !reader))
		return;

	event = rb_reader_event(cpu_buffer);

	if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		cpu_buffer->read++;

	rb_update_read_stamp(cpu_buffer, event);

	length = rb_event_length(event);
	cpu_buffer->reader_page->read += length;
}
static void rb_advance_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	unsigned length;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if we are at the end of the buffer.
	 */
	if (iter->head >= rb_page_size(iter->head_page)) {
		/* discarded commits can make the page empty */
		if (iter->head_page == cpu_buffer->commit_page)
			return;
		rb_inc_iter(iter);
		return;
	}

	event = rb_iter_head_event(iter);

	length = rb_event_length(event);

	/*
	 * This should not be called to advance the header if we are
	 * at the tail of the buffer.
	 */
	if (RB_WARN_ON(cpu_buffer,
		       (iter->head_page == cpu_buffer->commit_page) &&
		       (iter->head + length > rb_commit_index(cpu_buffer))))
		return;

	rb_update_iter_read_stamp(iter, event);

	iter->head += length;

	/* check for end of page padding */
	if ((iter->head >= rb_page_size(iter->head_page)) &&
	    (iter->head_page != cpu_buffer->commit_page))
		rb_advance_iter(iter);
}
static struct ring_buffer_event *
rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
{
	struct ring_buffer_event *event;
	struct buffer_page *reader;
	int nr_loops = 0;

 again:
	/*
	 * We repeat when a timestamp is encountered. It is possible
	 * to get multiple timestamps from an interrupt entering just
	 * as one timestamp is about to be written, or from discarded
	 * commits. The most that we can have is the number on a single page.
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
		return NULL;

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		return NULL;

	event = rb_reader_event(cpu_buffer);

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			RB_WARN_ON(cpu_buffer, 1);
		/*
		 * Because the writer could be discarding every
		 * event it creates (which would probably be bad)
		 * if we were to go back to "again" then we may never
		 * catch up, and will trigger the warn on, or lock
		 * the box. Return the padding, and we will release
		 * the current locks, and try again.
		 */
		return event;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_reader(cpu_buffer);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = cpu_buffer->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
							 cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_peek);
static struct ring_buffer_event *
rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer *buffer;
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event;
	int nr_loops = 0;

	cpu_buffer = iter->cpu_buffer;
	buffer = cpu_buffer->buffer;

	/*
	 * Check if someone performed a consuming read to
	 * the buffer. A consuming read invalidates the iterator
	 * and we need to reset the iterator in this case.
	 */
	if (unlikely(iter->cache_read != cpu_buffer->read ||
		     iter->cache_reader_page != cpu_buffer->reader_page))
		rb_iter_reset(iter);

 again:
	if (ring_buffer_iter_empty(iter))
		return NULL;

	/*
	 * We repeat when a timestamp is encountered.
	 * We can get multiple timestamps by nested interrupts or also
	 * if filtering is on (discarding commits). Since discarding
	 * commits can be frequent we can get a lot of timestamps.
	 * But we limit them by not adding timestamps if they begin
	 * at the start of a page.
	 */
	if (RB_WARN_ON(cpu_buffer, ++nr_loops > RB_TIMESTAMPS_PER_PAGE))
		return NULL;

	if (rb_per_cpu_empty(cpu_buffer))
		return NULL;

	if (iter->head >= local_read(&iter->head_page->page->commit)) {
		rb_inc_iter(iter);
		goto again;
	}

	event = rb_iter_head_event(iter);

	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event)) {
			rb_inc_iter(iter);
			goto again;
		}
		rb_advance_iter(iter);
		return event;

	case RINGBUF_TYPE_TIME_EXTEND:
		/* Internal data, OK to advance */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_TIME_STAMP:
		/* FIXME: not implemented */
		rb_advance_iter(iter);
		goto again;

	case RINGBUF_TYPE_DATA:
		if (ts) {
			*ts = iter->read_stamp + event->time_delta;
			ring_buffer_normalize_time_stamp(buffer,
							 cpu_buffer->cpu, ts);
		}
		return event;

	default:
		BUG();
	}

	return NULL;
}
EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
static inline int rb_ok_to_lock(void)
{
	/*
	 * If an NMI die dumps out the content of the ring buffer
	 * do not grab locks. We also permanently disable the ring
	 * buffer too. A one time deal is all you get from reading
	 * the ring buffer from an NMI.
	 */
	if (likely(!in_nmi()))
		return 1;

	tracing_off_permanent();
	return 0;
}
/**
 * ring_buffer_peek - peek at the next event to be read
 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not consume the data.
 */
struct ring_buffer_event *
ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	unsigned long flags;
	int dolock;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	dolock = rb_ok_to_lock();
 again:
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);
	event = rb_buffer_peek(cpu_buffer, ts);
	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		rb_advance_reader(cpu_buffer);
	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	return event;
}
/**
 * ring_buffer_iter_peek - peek at the next event to be read
 * @iter: The ring buffer iterator
 * @ts: The timestamp counter of this event.
 *
 * This will return the event that will be read next, but does
 * not increment the iterator.
 */
struct ring_buffer_event *
ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	struct ring_buffer_event *event;
	unsigned long flags;

 again:
	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	event = rb_iter_peek(iter, ts);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	return event;
}
/**
 * ring_buffer_consume - return an event and consume it
 * @buffer: The ring buffer to get the next event from
 * @cpu: The CPU buffer to read from
 * @ts: Where to store the timestamp of the event (may be NULL)
 *
 * Returns the next event in the ring buffer, and that event is consumed.
 * Meaning, that sequential reads will keep returning a different event,
 * and eventually empty the ring buffer if the producer is slower.
 */
struct ring_buffer_event *
ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_event *event = NULL;
	unsigned long flags;
	int dolock;

	dolock = rb_ok_to_lock();

 again:
	/* might be called in atomic */
	preempt_disable();

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	cpu_buffer = buffer->buffers[cpu];
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);

	event = rb_buffer_peek(cpu_buffer, ts);
	if (event)
		rb_advance_reader(cpu_buffer);

	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

 out:
	preempt_enable();

	if (event && event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_consume);
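
/*
 * Example: draining one CPU's buffer with consuming reads.
 * Illustrative sketch; process_event() is hypothetical and stands in
 * for whatever the reader does with each record.
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts)))
 *		process_event(ring_buffer_event_data(event), ts);
 */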
/**
 * ring_buffer_read_start - start a non consuming read of the buffer
 * @buffer: The ring buffer to read from
 * @cpu: The cpu buffer to iterate over
 *
 * This starts up an iteration through the buffer. It also disables
 * the recording to the buffer until the reading is finished.
 * This prevents the reading from being corrupted. This is not
 * a consuming read, so a producer is not expected.
 *
 * Must be paired with ring_buffer_read_finish.
 */
struct ring_buffer_iter *
ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct ring_buffer_iter *iter;
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return NULL;

	iter = kmalloc(sizeof(*iter), GFP_KERNEL);
	if (!iter)
		return NULL;

	cpu_buffer = buffer->buffers[cpu];

	iter->cpu_buffer = cpu_buffer;

	atomic_inc(&cpu_buffer->record_disabled);
	synchronize_sched();

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	arch_spin_lock(&cpu_buffer->lock);
	rb_iter_reset(iter);
	arch_spin_unlock(&cpu_buffer->lock);
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return iter;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_start);
/**
 * ring_buffer_read_finish - finish reading the iterator of the buffer
 * @iter: The iterator retrieved by ring_buffer_read_start
 *
 * This re-enables the recording to the buffer, and frees the
 * iterator.
 */
void
ring_buffer_read_finish(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	atomic_dec(&cpu_buffer->record_disabled);
	kfree(iter);
}
EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	unsigned long flags;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 again:
	event = rb_iter_peek(iter, ts);
	if (!event)
		goto out;

	if (event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	rb_advance_iter(iter);
 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);
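
/*
 * Example: a non consuming walk over one CPU's buffer. Illustrative
 * sketch; show_event() is hypothetical. Recording on this CPU buffer
 * stays disabled between read_start and read_finish.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		show_event(ring_buffer_event_data(event), ts);
 *	ring_buffer_read_finish(iter);
 */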
/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer)
{
	return BUF_PAGE_SIZE * buffer->pages;
}
EXPORT_SYMBOL_GPL(ring_buffer_size);
static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	rb_head_page_deactivate(cpu_buffer);

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	local_set(&cpu_buffer->head_page->write, 0);
	local_set(&cpu_buffer->head_page->entries, 0);
	local_set(&cpu_buffer->head_page->page->commit, 0);

	cpu_buffer->head_page->read = 0;

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	local_set(&cpu_buffer->reader_page->write, 0);
	local_set(&cpu_buffer->reader_page->entries, 0);
	local_set(&cpu_buffer->reader_page->page->commit, 0);
	cpu_buffer->reader_page->read = 0;

	local_set(&cpu_buffer->commit_overrun, 0);
	local_set(&cpu_buffer->overrun, 0);
	local_set(&cpu_buffer->entries, 0);
	local_set(&cpu_buffer->committing, 0);
	local_set(&cpu_buffer->commits, 0);
	cpu_buffer->read = 0;

	cpu_buffer->write_stamp = 0;
	cpu_buffer->read_stamp = 0;

	rb_head_page_activate(cpu_buffer);
}
/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;

	atomic_inc(&cpu_buffer->record_disabled);

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
		goto out;

	arch_spin_lock(&cpu_buffer->lock);

	rb_reset_cpu(cpu_buffer);

	arch_spin_unlock(&cpu_buffer->lock);

 out:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	atomic_dec(&cpu_buffer->record_disabled);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset);
/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
int ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	int dolock;
	int cpu;
	int ret;

	dolock = rb_ok_to_lock();

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		local_irq_save(flags);
		if (dolock)
			spin_lock(&cpu_buffer->reader_lock);
		ret = rb_per_cpu_empty(cpu_buffer);
		if (dolock)
			spin_unlock(&cpu_buffer->reader_lock);
		local_irq_restore(flags);

		if (!ret)
			return 0;
	}

	return 1;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty);
/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	int dolock;
	int ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 1;

	dolock = rb_ok_to_lock();

	cpu_buffer = buffer->buffers[cpu];
	local_irq_save(flags);
	if (dolock)
		spin_lock(&cpu_buffer->reader_lock);
	ret = rb_per_cpu_empty(cpu_buffer);
	if (dolock)
		spin_unlock(&cpu_buffer->reader_lock);
	local_irq_restore(flags);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;
	int ret = -EINVAL;

	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
		goto out;

	/* At least make sure the two buffers are somewhat the same */
	if (buffer_a->pages != buffer_b->pages)
		goto out;

	ret = -EAGAIN;

	if (ring_buffer_flags != RB_BUFFERS_ON)
		goto out;

	if (atomic_read(&buffer_a->record_disabled))
		goto out;

	if (atomic_read(&buffer_b->record_disabled))
		goto out;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	if (atomic_read(&cpu_buffer_a->record_disabled))
		goto out;

	if (atomic_read(&cpu_buffer_b->record_disabled))
		goto out;

	/*
	 * We can't do a synchronize_sched here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	ret = -EBUSY;
	if (local_read(&cpu_buffer_a->committing))
		goto out_dec;
	if (local_read(&cpu_buffer_b->committing))
		goto out_dec;

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	ret = 0;

 out_dec:
	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);
 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
#endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
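
/*
 * Example: the "snapshot" pattern the comment above describes.
 * Illustrative sketch; max_buffer is a hypothetical spare buffer
 * allocated with the same number of pages as the live one.
 *
 *	if (ring_buffer_swap_cpu(live_buffer, max_buffer, cpu) == 0)
 *		;	// max_buffer now holds the old live data for
 *			// this cpu and can be read while tracing continues
 */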
/**
 * ring_buffer_alloc_read_page - allocate a page to read from buffer
 * @buffer: the buffer to allocate for.
 *
 * This function is used in conjunction with ring_buffer_read_page.
 * When reading a full page from the ring buffer, these functions
 * can be used to speed up the process. The calling function should
 * allocate a few pages first with this function. Then when it
 * needs to get pages from the ring buffer, it passes the result
 * of this function into ring_buffer_read_page, which will swap
 * the page that was allocated, with the read page of the buffer.
 *
 * Returns:
 *  The page allocated, or NULL on error.
 */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer)
{
	struct buffer_data_page *bpage;
	unsigned long addr;

	addr = __get_free_page(GFP_KERNEL);
	if (!addr)
		return NULL;

	bpage = (void *)addr;

	rb_init_page(bpage);

	return bpage;
}
EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
/**
 * ring_buffer_free_read_page - free an allocated read page
 * @buffer: the buffer the page was allocated for
 * @data: the page to free
 *
 * Free a page allocated from ring_buffer_alloc_read_page.
 */
void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
{
	free_page((unsigned long)data);
}
EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
/**
 * ring_buffer_read_page - extract a page from the ring buffer
 * @buffer: buffer to extract from
 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
 * @len: amount to extract
 * @cpu: the cpu of the buffer to extract
 * @full: should the extraction only happen when the page is full.
 *
 * This function will pull out a page from the ring buffer and consume it.
 * @data_page must be the address of the variable that was returned
 * from ring_buffer_alloc_read_page. This is because the page might be used
 * to swap with a page in the ring buffer.
 *
 * For example:
 *	rpage = ring_buffer_alloc_read_page(buffer);
 *	if (!rpage)
 *		return error;
 *	ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
 *	if (ret >= 0)
 *		process_page(rpage, ret);
 *
 * When @full is set, the function will not return true unless
 * the writer is off the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 * The ring buffer can be used anywhere in the kernel and can not
 * blindly call wake_up. The layer that uses the ring buffer must be
 * responsible for that.
 *
 * Returns:
 *  >=0 if data has been transferred, returns the offset of consumed data.
 *  <0 if no data has been transferred.
 */
int ring_buffer_read_page(struct ring_buffer *buffer,
			  void **data_page, size_t len, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	struct buffer_page *reader;
	unsigned long flags;
	unsigned int commit;
	unsigned int read;
	u64 save_timestamp;
	int ret = -1;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	/*
	 * If len is not big enough to hold the page header, then
	 * we can not copy anything.
	 */
	if (len <= BUF_PAGE_HDR_SIZE)
		goto out;

	len -= BUF_PAGE_HDR_SIZE;

	if (!data_page)
		goto out;

	bpage = *data_page;
	if (!bpage)
		goto out;

	spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		goto out_unlock;

	event = rb_reader_event(cpu_buffer);

	read = reader->read;
	commit = rb_page_commit(reader);

	/*
	 * If this page has been partially read or
	 * if len is not big enough to read the rest of the page or
	 * a writer is still on the page, then
	 * we must copy the data from the page to the buffer.
	 * Otherwise, we can simply swap the page with the one passed in.
	 */
	if (read || (len < (commit - read)) ||
	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
		unsigned int rpos = read;
		unsigned int pos = 0;
		unsigned int size;

		if (full)
			goto out_unlock;

		if (len > (commit - read))
			len = (commit - read);

		size = rb_event_length(event);

		if (len < size)
			goto out_unlock;

		/* save the current timestamp, since the user will need it */
		save_timestamp = cpu_buffer->read_stamp;

		/* Need to copy one event at a time */
		do {
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			rpos = reader->read;
			pos += size;

			event = rb_reader_event(cpu_buffer);
			size = rb_event_length(event);
		} while (len > size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += rb_page_entries(reader);

		/* swap the pages */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = *data_page;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		*data_page = bpage;
	}
	ret = read;

 out_unlock:
	spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
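
/*
 * Example: pulling whole pages out of the buffer, the path used by
 * splice style readers. Illustrative sketch; consume_page() is
 * hypothetical and PAGE_SIZE is used as the read length.
 *
 *	void *rpage = ring_buffer_alloc_read_page(buffer);
 *	int ret;
 *
 *	if (!rpage)
 *		return;
 *	while ((ret = ring_buffer_read_page(buffer, &rpage,
 *					    PAGE_SIZE, cpu, 0)) >= 0)
 *		consume_page(rpage, ret);
 *	ring_buffer_free_read_page(buffer, rpage);
 */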
#ifdef CONFIG_TRACING
static ssize_t
rb_simple_read(struct file *filp, char __user *ubuf,
	       size_t cnt, loff_t *ppos)
{
	unsigned long *p = filp->private_data;
	char buf[64];
	int r;

	if (test_bit(RB_BUFFERS_DISABLED_BIT, p))
		r = sprintf(buf, "permanently disabled\n");
	else
		r = sprintf(buf, "%d\n", test_bit(RB_BUFFERS_ON_BIT, p));

	return simple_read_from_buffer(ubuf, cnt, ppos, buf, r);
}
static ssize_t
rb_simple_write(struct file *filp, const char __user *ubuf,
		size_t cnt, loff_t *ppos)
{
	unsigned long *p = filp->private_data;
	char buf[64];
	unsigned long val;
	int ret;

	if (cnt >= sizeof(buf))
		return -EINVAL;

	if (copy_from_user(&buf, ubuf, cnt))
		return -EFAULT;

	buf[cnt] = 0;

	ret = strict_strtoul(buf, 10, &val);
	if (ret < 0)
		return ret;

	if (val)
		set_bit(RB_BUFFERS_ON_BIT, p);
	else
		clear_bit(RB_BUFFERS_ON_BIT, p);

	(*ppos)++;

	return cnt;
}
static const struct file_operations rb_simple_fops = {
	.open		= tracing_open_generic,
	.read		= rb_simple_read,
	.write		= rb_simple_write,
};

static __init int rb_init_debugfs(void)
{
	struct dentry *d_tracer;

	d_tracer = tracing_init_dentry();

	trace_create_file("tracing_on", 0644, d_tracer,
			  &ring_buffer_flags, &rb_simple_fops);

	return 0;
}

fs_initcall(rb_init_debugfs);
#endif /* CONFIG_TRACING */
#ifdef CONFIG_HOTPLUG_CPU
static int rb_cpu_notify(struct notifier_block *self,
			 unsigned long action, void *hcpu)
{
	struct ring_buffer *buffer =
		container_of(self, struct ring_buffer, cpu_notify);
	long cpu = (long)hcpu;

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		if (cpumask_test_cpu(cpu, buffer->cpumask))
			return NOTIFY_OK;

		buffer->buffers[cpu] =
			rb_allocate_cpu_buffer(buffer, cpu);
		if (!buffer->buffers[cpu]) {
			WARN(1, "failed to allocate ring buffer on CPU %ld\n",
			     cpu);
			return NOTIFY_OK;
		}
		smp_wmb();
		cpumask_set_cpu(cpu, buffer->cpumask);
		break;
	case CPU_DOWN_PREPARE:
	case CPU_DOWN_PREPARE_FROZEN:
		/*
		 * Do nothing.
		 * If we were to free the buffer, then the user would
		 * lose any trace that was in the buffer.