// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>

#include <asm/local.h>

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}
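
/*
 * Illustration (a standalone userspace sketch, not part of this file;
 * build it by itself with cc): the compressed header printed above is a
 * single 32-bit word of 5 + 27 bits, followed by the 32-bit array word.
 * The struct below mirrors the layout in include/linux/ring_buffer.h,
 * using a fixed array[1] so sizeof works outside the kernel.
 */
#if 0	/* standalone example, not built with the kernel */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

struct rb_event_hdr {
	uint32_t type_len   : 5;	/* 0-28: payload len / 4; 29-31: special */
	uint32_t time_delta : 27;	/* delta from the previous timestamp */
	uint32_t array[1];		/* length word and/or start of payload */
};

int main(void)
{
	struct rb_event_hdr ev = { .type_len = 2,
				   .time_delta = (1u << 27) - 1 };

	/* the header itself is exactly one 32-bit word */
	assert(sizeof(ev) - sizeof(ev.array) == sizeof(uint32_t));
	printf("max delta: %u\n", ev.time_delta);	/* 134217727 */
	printf("data len : %u bytes\n", ev.type_len * 4);	/* 8 */
	return 0;
}
#endif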
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 * (diagrams elided: the reader page starts detached from the ring buffer;
 *  it is linked into the ring in front of the head page, and the old head
 *  page is then unlinked to become the new reader page)
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 */
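
/*
 * Illustration (standalone userspace sketch, not kernel code): the swap
 * described above on a plain circular doubly linked list. This is a toy
 * model with no flag bits or cmpxchg; the real code must additionally
 * cope with concurrent writers, which is what the flag bits described
 * further down are for.
 */
#if 0	/* standalone example, not built with the kernel */
#include <stdio.h>

struct page {
	struct page *next, *prev;
	const char *name;
};

/* The spare reader page takes the head's slot in the ring; the old head,
 * full of data, comes out and becomes the new reader page. */
static struct page *swap_reader(struct page *head, struct page *reader)
{
	reader->prev = head->prev;
	reader->next = head->next;
	head->prev->next = reader;
	head->next->prev = reader;
	head->next = head->prev = head;	/* old head is now detached */
	return head;
}

int main(void)
{
	struct page a = { .name = "A" }, b = { .name = "B" }, c = { .name = "C" };
	struct page spare = { .name = "R" };

	/* circular ring A -> B -> C -> A */
	a.next = &b; b.next = &c; c.next = &a;
	a.prev = &c; b.prev = &a; c.prev = &b;

	struct page *reader = swap_reader(&a, &spare);
	printf("reader holds %s; ring: %s -> %s -> %s\n", reader->name,
	       spare.name, spare.next->name, spare.next->next->name);
	/* prints: reader holds A; ring: R -> B -> C */
	return 0;
}
#endif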
/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline int rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);
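
/*
 * Illustration (standalone userspace sketch, not kernel code): the two
 * length encodings used above. Small payloads are stored inline as
 * type_len * 4; larger ones set type_len to 0 and put the byte count in
 * array[0]. The constant 28 is RINGBUF_TYPE_DATA_TYPE_LEN_MAX's usual
 * value (29-31 are the special types).
 */
#if 0	/* standalone example, not built with the kernel */
#include <stdio.h>

#define RB_ALIGNMENT	4U
#define TYPE_LEN_MAX	28U

/* returns the type_len field; 0 means "byte count lives in array[0]" */
static unsigned encode_type_len(unsigned payload)
{
	return payload > TYPE_LEN_MAX * RB_ALIGNMENT ? 0 : (payload + 3) / 4;
}

int main(void)
{
	printf("%u\n", encode_type_len(8));	/* 2  (2 * 4 = 8 bytes)      */
	printf("%u\n", encode_type_len(112));	/* 28 (largest inline length) */
	printf("%u\n", encode_type_len(120));	/* 0  (length spills out)     */
	return 0;
}
#endif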
/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

/**
 * ring_buffer_event_time_stamp - return the event's extended timestamp
 * @event: the event to get the timestamp of
 *
 * Returns the extended timestamp associated with a data event.
 * An extended time_stamp is a 64-bit timestamp represented
 * internally in a special way that makes the best use of space
 * contained within a ring buffer event. This function decodes
 * it and maps it to a straight u64 value.
 */
u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}
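
/*
 * Illustration (standalone userspace sketch, not kernel code): a
 * round-trip of the split-timestamp encoding decoded above. The low 27
 * bits travel in time_delta, the remaining upper bits in array[0].
 */
#if 0	/* standalone example, not built with the kernel */
#include <assert.h>
#include <stdint.h>

#define TS_SHIFT 27
#define TS_MASK  ((1ULL << TS_SHIFT) - 1)

int main(void)
{
	uint64_t ts = 123456789012345ULL;	/* arbitrary nanosecond stamp */
	uint32_t time_delta = ts & TS_MASK;	/* low 27 bits */
	uint32_t array0 = ts >> TS_SHIFT;	/* upper bits */

	/* decode exactly as ring_buffer_event_time_stamp() does */
	uint64_t back = ((uint64_t)array0 << TS_SHIFT) + time_delta;
	assert(back == ts);
	return 0;
}
#endif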
/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_FLAGS		(RB_MISSED_EVENTS|RB_MISSED_STORED)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)
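
/*
 * Illustration (standalone userspace sketch, not kernel code): the 20/12
 * split counter just described. The low 20 bits hold the write index;
 * adding RB_WRITE_INTCNT bumps only the updater count in the upper bits,
 * so a later cmpxchg can detect that an interrupt slipped in.
 */
#if 0	/* standalone example, not built with the kernel */
#include <assert.h>
#include <stdio.h>

#define RB_WRITE_MASK	0xfffff
#define RB_WRITE_INTCNT	(1 << 20)

int main(void)
{
	unsigned long write = 512;		/* current write index */

	/* an updater announces itself in the upper bits ... */
	write += RB_WRITE_INTCNT;

	/* ... leaving the index intact, but changing the word as a whole */
	assert((write & RB_WRITE_MASK) == 512);
	printf("updaters in flight: %lu\n", write >> 20);	/* 1 */
	return 0;
}
#endif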
static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this issue out.
 */
static void free_buffer_page(struct buffer_page *bpage)
{
	free_page((unsigned long)bpage->page);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)

/* Max payload is BUF_PAGE_SIZE - header (8bytes) */
#define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))

int ring_buffer_print_page_header(struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)BUF_PAGE_SIZE,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for which event context the event is in.
 *
 * See trace_recursive_lock() comment below for more details.
 */

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	struct ring_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	local_t				entries_bytes;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read_bytes;
	/* pages removed since last reset */
	unsigned long			pages_removed;
	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages;	/* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct ring_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;
	u64				(*clock)(void);

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
};

/**
 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
 */
size_t ring_buffer_nr_pages(struct ring_buffer *buffer, int cpu)
{
	return buffer->buffers[cpu]->nr_pages;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct ring_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (read > cnt + 1) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct ring_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	dirty = ring_buffer_nr_dirty_pages(buffer, cpu);

	return (dirty * 100) > (full * nr_pages);
}
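
/*
 * Illustration (standalone userspace sketch, not kernel code): the
 * percentage test above, with dirty * 100 > full * nr_pages. With 8
 * pages and full == 50, a waiter is woken only once more than 4 pages
 * hold data.
 */
#if 0	/* standalone example, not built with the kernel */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

static bool full_hit(size_t dirty, size_t nr_pages, size_t full)
{
	return dirty * 100 > full * nr_pages;
}

int main(void)
{
	printf("%d\n", full_hit(4, 8, 50));	/* 0: 400 > 400 is false */
	printf("%d\n", full_hit(5, 8, 50));	/* 1: 500 > 400 */
	return 0;
}
#endif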
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;
		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct ring_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	DEFINE_WAIT(wait);
	struct rb_irq_work *work;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	while (true) {
		if (full)
			prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
		else
			prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);

		/*
		 * The events can happen in critical sections where
		 * checking a work queue can cause deadlocks.
		 * After adding a task to the queue, this flag is set
		 * only to notify events to try to wake up the queue
		 * using irq_work.
		 *
		 * We don't clear it even if the buffer is no longer
		 * empty. The flag only causes the next event to run
		 * irq_work to do the work queue wake up. The worst
		 * that can happen if we race with !trace_empty() is that
		 * an event will cause an irq_work to try to wake up
		 * an empty queue.
		 *
		 * There's no reason to protect this flag either, as
		 * the work queue and irq_work logic will do the necessary
		 * synchronization for the wake ups. The only thing
		 * that is necessary is that the wake up happens after
		 * a task has been queued. It's OK for spurious wake ups.
		 */
		if (full)
			work->full_waiters_pending = true;
		else
			work->waiters_pending = true;

		if (signal_pending(current)) {
			ret = -EINTR;
			break;
		}

		if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
			break;

		if (cpu != RING_BUFFER_ALL_CPUS &&
		    !ring_buffer_empty_cpu(buffer, cpu)) {
			unsigned long flags;
			bool pagebusy;
			bool done;

			if (!full)
				break;

			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
			pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
			done = !pagebusy && full_hit(buffer, cpu, full);

			if (!cpu_buffer->shortest_full ||
			    cpu_buffer->shortest_full > full)
				cpu_buffer->shortest_full = full;
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			if (done)
				break;
		}

		schedule();
	}

	if (full)
		finish_wait(&work->full_waiters, &wait);
	else
		finish_wait(&work->waiters, &wait);

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *work;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		work = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -EINVAL;

		cpu_buffer = buffer->buffers[cpu];
		work = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &work->full_waiters, poll_table);
		work->full_waiters_pending = true;
	} else {
		poll_wait(filp, &work->waiters, poll_table);
		work->waiters_pending = true;
	}

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if (full)
		return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct ring_buffer *buffer)
{
	/* shift to debug/test normalization and TIME_EXTENTS */
	return buffer->clock() << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 *                              -------        -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * (diagram elided: the reader has just swapped its page R into the ring
 *  in place of the head, while a writer sits on the tail page T with the
 *  HEAD flag set in its pointer to the next page N)
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL

#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL
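
/*
 * Illustration (standalone userspace sketch, not kernel code): the
 * flag-in-pointer trick from the table above. List pointers are at
 * least 4-byte aligned, so bits 0-1 are free to encode HEAD/UPDATE,
 * and one compare-and-swap on the whole word flips flag and target
 * atomically.
 */
#if 0	/* standalone example, not built with the kernel */
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define RB_PAGE_HEAD	1UL
#define RB_PAGE_UPDATE	2UL
#define RB_FLAG_MASK	3UL

struct node { struct node *next; };

int main(void)
{
	static struct node head;	/* pointer-aligned: bits 0-1 are free */
	_Atomic uintptr_t link;

	/* the link to the head page carries the HEAD flag in bit 0 */
	atomic_store(&link, (uintptr_t)&head | RB_PAGE_HEAD);

	/* a writer claims the head page: HEAD -> UPDATE, atomically */
	uintptr_t expect = (uintptr_t)&head | RB_PAGE_HEAD;
	int ok = atomic_compare_exchange_strong(&link, &expect,
				(uintptr_t)&head | RB_PAGE_UPDATE);
	assert(ok);

	/* stripping the flag bits recovers the real pointer */
	assert((atomic_load(&link) & ~RB_FLAG_MASK) == (uintptr_t)&head);
	return 0;
}
#endif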
/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(cpu_buffer, head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(cpu_buffer, &page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;
	unsigned long ret;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	ret = cmpxchg(ptr, val, (unsigned long)&new->list);

	return ret == val;
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	local_inc(&cpu_buffer->pages_touched);
	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Again, either we update tail_page or an interrupt does */
		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
	}
}

static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
		return 1;

	return 0;
}

/*
 * rb_check_list - make sure a pointer to a list has the last bits zero
 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
		return 1;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
		return 1;
	return 0;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	/* Reset the head page if it exists */
	if (cpu_buffer->head_page)
		rb_set_head_page(cpu_buffer);

	rb_head_page_deactivate(cpu_buffer);

	if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
		return -1;
	if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
		return -1;

	if (rb_check_list(cpu_buffer, head))
		return -1;

	list_for_each_entry_safe(bpage, tmp, head, list) {
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.next->prev != &bpage->list))
			return -1;
		if (RB_WARN_ON(cpu_buffer,
			       bpage->list.prev->next != &bpage->list))
			return -1;
		if (rb_check_list(cpu_buffer, &bpage->list))
			return -1;
	}

	rb_head_page_activate(cpu_buffer);

	return 0;
}

static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu)
{
	struct buffer_page *bpage, *tmp;
	bool user_thread = current->mm != NULL;
	gfp_t mflags;
	long i;

	/*
	 * Check if the available memory is there first.
	 * Note, si_mem_available() only gives us a rough estimate of available
	 * memory. It may not be accurate. But we don't care, we just want
	 * to prevent doing any allocation when it is obvious that it is
	 * not going to succeed.
	 */
	i = si_mem_available();
	if (i < nr_pages)
		return -ENOMEM;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;

	/*
	 * If a user thread allocates too much and si_mem_available()
	 * reported that there was enough memory even though there was not,
	 * make sure the OOM killer kills this thread. This can happen
	 * even with RETRY_MAYFAIL because another task may be doing
	 * an allocation after this task has taken all memory.
	 * This is the task the OOM killer needs to take out during this
	 * loop, even if it was triggered by an allocation somewhere else.
	 */
	if (user_thread)
		set_current_oom_origin();
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				     mflags, cpu_to_node(cpu));
		if (!bpage)
			goto free_pages;

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu), mflags, 0);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		rb_init_page(bpage->page);

		if (user_thread && fatal_signal_pending(current))
			goto free_pages;
	}
	if (user_thread)
		clear_current_oom_origin();

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (user_thread)
		clear_current_oom_origin();

	return -ENOMEM;
}
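
/*
 * Illustration (standalone userspace sketch, not kernel code): the
 * allocate-then-unwind pattern used above. Every record is linked into
 * the list before its page is allocated, so a single cleanup loop can
 * free whatever was built up when any allocation fails.
 */
#if 0	/* standalone example, not built with the kernel */
#include <stdio.h>
#include <stdlib.h>

struct rec {
	struct rec *next;
	void *page;
};

static struct rec *alloc_all(int n)
{
	struct rec *head = NULL;

	for (int i = 0; i < n; i++) {
		struct rec *r = calloc(1, sizeof(*r));
		if (!r)
			goto free_all;
		r->next = head;
		head = r;		/* linked before the page exists */
		r->page = malloc(4096);
		if (!r->page)
			goto free_all;
	}
	return head;

free_all:				/* mirrors the free_pages: label */
	while (head) {
		struct rec *r = head;
		head = r->next;
		free(r->page);
		free(r);
	}
	return NULL;
}

int main(void)
{
	struct rec *list = alloc_all(8);
	printf("%s\n", list ? "allocated" : "failed");
	return 0;
}
#endif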
static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;
	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	irq_work_sync(&cpu_buffer->irq_work.work);

	free_buffer_page(cpu_buffer->reader_page);

	if (head) {
		rb_head_page_deactivate(cpu_buffer);

		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	kfree(cpu_buffer);
}

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct ring_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct ring_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	irq_work_sync(&buffer->irq_work.work);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct ring_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

void ring_buffer_set_time_stamp_abs(struct ring_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}

bool ring_buffer_time_stamp_abs(struct ring_buffer *buffer)
{
	return buffer->time_stamp_abs;
}

static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static int
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}
	/* Read iterators need to reset themselves when some pages removed */
	cpu_buffer->pages_removed += nr_removed;

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
	 * Make sure that we have head_bit value preserved for the
	 * next page.
	 */
	tail_page->next = (struct list_head *)((unsigned long)next_page |
						head_bit);
	next_page = rb_list_head(next_page);
	next_page->prev = tail_page;

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;

	/* update head page */
	if (head_bit)
		cpu_buffer->head_page = list_entry(next_page,
						struct buffer_page, list);

	/* pages are removed, resume tracing and then free the pages */
	atomic_dec(&cpu_buffer->record_disabled);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));

	/* last buffer page to remove */
	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
				list);
	tmp_iter_page = first_page;

	do {
		cond_resched();

		to_remove_page = tmp_iter_page;
		rb_inc_page(cpu_buffer, &tmp_iter_page);

		/* update the counters */
		page_entries = rb_page_entries(to_remove_page);
		if (page_entries) {
			/*
			 * If something was added to this page, it was full
			 * since it is not the tail page. So we deduct the
			 * bytes consumed in ring buffer from here.
			 * Increment overrun to account for the lost events.
			 */
			local_add(page_entries, &cpu_buffer->overrun);
			local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
			local_inc(&cpu_buffer->pages_lost);
		}

		/*
		 * We have already removed references to this list item, just
		 * free up the buffer_page and its page
		 */
		free_buffer_page(to_remove_page);
		nr_removed--;

	} while (to_remove_page != last_page);

	RB_WARN_ON(cpu_buffer, nr_removed);

	return nr_removed == 0;
}

static int
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *pages = &cpu_buffer->new_pages;
	int retries, success;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	/*
	 * We are holding the reader lock, so the reader page won't be swapped
	 * in the ring buffer. Now we are racing with the writer trying to
	 * move head page and the tail page.
	 * We are going to adapt the reader page update process where:
	 * 1. We first splice the start and end of list of new pages between
	 *    the head page and its previous page.
	 * 2. We cmpxchg the prev_page->next to point from head page to the
	 *    start of new pages list.
	 * 3. Finally, we update the head->prev to the end of new list.
	 *
	 * We will try this process 10 times, to make sure that we don't keep
	 * adding and removing pages in an infinite loop.
	 */
	retries = 10;
	success = 0;
	while (retries--) {
		struct list_head *head_page, *prev_page, *r;
		struct list_head *last_page, *first_page;
		struct list_head *head_page_with_bit;

		head_page = &rb_set_head_page(cpu_buffer)->list;
		if (!head_page)
			break;
		prev_page = head_page->prev;

		first_page = pages->next;
		last_page  = pages->prev;

		head_page_with_bit = (struct list_head *)
				     ((unsigned long)head_page | RB_PAGE_HEAD);

		last_page->next = head_page_with_bit;
		first_page->prev = prev_page;

		r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);

		if (r == head_page_with_bit) {
			/*
			 * yay, we replaced the page pointer to our new list,
			 * now, we just have to update to head page's prev
			 * pointer to point to end of list
			 */
			head_page->prev = last_page;
			success = 1;
			break;
		}
	}

	if (success)
		INIT_LIST_HEAD(pages);
	/*
	 * If we weren't successful in adding in new pages, warn and stop
	 * tracing
	 */
	RB_WARN_ON(cpu_buffer, !success);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	/* free pages if they weren't inserted */
	if (!success) {
		struct buffer_page *bpage, *tmp;
		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					 list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}
	return success;
}
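
/*
 * Illustration (standalone userspace sketch, not kernel code): the
 * three-step splice from the comment above, on untagged pointers. The
 * sublist's outward links are prepared first, one CAS on prev->next
 * publishes the whole sublist, and the back pointer is fixed last
 * (readers never trust prev links).
 */
#if 0	/* standalone example, not built with the kernel */
#include <assert.h>
#include <stdatomic.h>

struct node {
	_Atomic(struct node *) next;
	struct node *prev;
};

int main(void)
{
	struct node prev, head, n1, n2;	/* splice n1..n2 in front of head */

	atomic_store(&prev.next, &head);
	head.prev = &prev;
	atomic_store(&n1.next, &n2);
	n2.prev = &n1;

	/* 1) point the new sublist at its future neighbours */
	atomic_store(&n2.next, &head);
	n1.prev = &prev;

	/* 2) one CAS on prev.next publishes the whole sublist */
	struct node *expect = &head;
	int ok = atomic_compare_exchange_strong(&prev.next, &expect, &n1);
	assert(ok);

	/* 3) fix the back pointer afterwards */
	head.prev = &n2;

	assert(atomic_load(&prev.next) == &n1);
	assert(atomic_load(&n2.next) == &head);
	return 0;
}
#endif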
static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	int success;

	if (cpu_buffer->nr_pages_to_update > 0)
		success = rb_insert_pages(cpu_buffer);
	else
		success = rb_remove_pages(cpu_buffer,
					  -cpu_buffer->nr_pages_to_update);

	if (success)
		cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
}

static void update_pages_handler(struct work_struct *work)
{
	struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
			struct ring_buffer_per_cpu, update_pages_work);
	rb_update_pages(cpu_buffer);
	complete(&cpu_buffer->update_done);
}

/**
 * ring_buffer_resize - resize the ring buffer
 * @buffer: the buffer to resize.
 * @size: the new size.
 * @cpu_id: the cpu buffer to resize
 *
 * Minimum size is 2 * BUF_PAGE_SIZE.
 *
 * Returns 0 on success and < 0 on failure.
 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
		       int cpu_id)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long nr_pages;
	int cpu, err = 0;

	/*
	 * Always succeed at resizing a non-existent buffer:
	 */
	if (!buffer)
		return size;

	/* Make sure the requested buffer exists */
	if (cpu_id != RING_BUFFER_ALL_CPUS &&
	    !cpumask_test_cpu(cpu_id, buffer->cpumask))
		return size;

	nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);

	/* we need a minimum of two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	size = nr_pages * BUF_PAGE_SIZE;

	/*
	 * Don't succeed if resizing is disabled, as a reader might be
	 * manipulating the ring buffer and is expecting a sane state while
	 * this is true.
	 */
	if (atomic_read(&buffer->resize_disabled))
		return -EBUSY;

	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);

	if (cpu_id == RING_BUFFER_ALL_CPUS) {
		/* calculate the pages to update */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];

			cpu_buffer->nr_pages_to_update = nr_pages -
							cpu_buffer->nr_pages;
			/*
			 * nothing more to do for removing pages or no update
			 */
			if (cpu_buffer->nr_pages_to_update <= 0)
				continue;
			/*
			 * to add pages, make sure all new pages can be
			 * allocated without receiving ENOMEM
			 */
			INIT_LIST_HEAD(&cpu_buffer->new_pages);
			if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
						&cpu_buffer->new_pages, cpu)) {
				/* not enough memory for new pages */
				err = -ENOMEM;
				goto out_err;
			}
		}

		/*
		 * Fire off all the required work handlers
		 * We can't schedule on offline CPUs, but it's not necessary
		 * since we can change their buffer sizes without any race.
		 */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (!cpu_buffer->nr_pages_to_update)
				continue;

			/* Can't run something on an offline CPU. */
			if (!cpu_online(cpu)) {
				rb_update_pages(cpu_buffer);
				cpu_buffer->nr_pages_to_update = 0;
			} else {
				schedule_work_on(cpu,
						&cpu_buffer->update_pages_work);
			}
		}

		/* wait for all the updates to complete */
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			if (!cpu_buffer->nr_pages_to_update)
				continue;

			if (cpu_online(cpu))
				wait_for_completion(&cpu_buffer->update_done);
			cpu_buffer->nr_pages_to_update = 0;
		}
	} else {
		/* Make sure this CPU has been initialized */
		if (!cpumask_test_cpu(cpu_id, buffer->cpumask))
			goto out;

		cpu_buffer = buffer->buffers[cpu_id];

		if (nr_pages == cpu_buffer->nr_pages)
			goto out;

		cpu_buffer->nr_pages_to_update = nr_pages -
						cpu_buffer->nr_pages;

		INIT_LIST_HEAD(&cpu_buffer->new_pages);
		if (cpu_buffer->nr_pages_to_update > 0 &&
			__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
					    &cpu_buffer->new_pages, cpu_id)) {
			err = -ENOMEM;
			goto out_err;
		}

		/* Can't run something on an offline CPU. */
		if (!cpu_online(cpu_id))
			rb_update_pages(cpu_buffer);
		else {
			schedule_work_on(cpu_id,
					 &cpu_buffer->update_pages_work);
			wait_for_completion(&cpu_buffer->update_done);
		}

		cpu_buffer->nr_pages_to_update = 0;
	}

 out:
	/*
	 * The ring buffer resize can happen with the ring buffer
	 * enabled, so that the update disturbs the tracing as little
	 * as possible. But if the buffer is disabled, we do not need
	 * to worry about that, and we can take the time to verify
	 * that the buffer is not corrupt.
	 */
	if (atomic_read(&buffer->record_disabled)) {
		atomic_inc(&buffer->record_disabled);
		/*
		 * Even though the buffer was disabled, we must make sure
		 * that it is truly disabled before calling rb_check_pages.
		 * There could have been a race between checking
		 * record_disabled and incrementing it.
		 */
		synchronize_rcu();
		for_each_buffer_cpu(buffer, cpu) {
			cpu_buffer = buffer->buffers[cpu];
			rb_check_pages(cpu_buffer);
		}
		atomic_dec(&buffer->record_disabled);
	}

	mutex_unlock(&buffer->mutex);
	return size;

 out_err:
	for_each_buffer_cpu(buffer, cpu) {
		struct buffer_page *bpage, *tmp;

		cpu_buffer = buffer->buffers[cpu];
		cpu_buffer->nr_pages_to_update = 0;

		if (list_empty(&cpu_buffer->new_pages))
			continue;

		list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
					 list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
	}
	mutex_unlock(&buffer->mutex);
	return err;
}
EXPORT_SYMBOL_GPL(ring_buffer_resize);

void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
{
	mutex_lock(&buffer->mutex);
	if (val)
		buffer->flags |= RB_FL_OVERWRITE;
	else
		buffer->flags &= ~RB_FL_OVERWRITE;
	mutex_unlock(&buffer->mutex);
}
EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);

static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
{
	return bpage->page->data + index;
}

static __always_inline struct ring_buffer_event *
rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
{
	return __rb_page_index(cpu_buffer->reader_page,
			       cpu_buffer->reader_page->read);
}

static __always_inline struct ring_buffer_event *
rb_iter_head_event(struct ring_buffer_iter *iter)
{
	return __rb_page_index(iter->head_page, iter->head);
}

static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

/* Size is determined by what has been committed */
static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
{
	return rb_page_commit(bpage);
}

static __always_inline unsigned
rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
{
	return rb_page_commit(cpu_buffer->commit_page);
}

static __always_inline unsigned
rb_event_index(struct ring_buffer_event *event)
{
	unsigned long addr = (unsigned long)event;

	return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
}
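
/*
 * Illustration (standalone userspace sketch, not kernel code): the index
 * math above. An event's offset within its page is its address modulo
 * the page size, minus the page header. BUF_PAGE_HDR_SIZE is assumed to
 * be 16 here (u64 time_stamp plus the commit word on a 64-bit build).
 */
#if 0	/* standalone example, not built with the kernel */
#include <assert.h>
#include <stdint.h>

#define PAGE_SIZE	  4096UL
#define PAGE_MASK	  (~(PAGE_SIZE - 1))
#define BUF_PAGE_HDR_SIZE 16UL

int main(void)
{
	static _Alignas(4096) char page[4096];
	char *event = page + BUF_PAGE_HDR_SIZE + 32;	/* an event on the page */

	uintptr_t addr = (uintptr_t)event;
	assert(((addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE) == 32);
	return 0;
}
#endif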
static void rb_inc_iter(struct ring_buffer_iter *iter)
{
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;

	/*
	 * The iterator could be on the reader page (it starts there).
	 * But the head could have moved, since the reader was
	 * found. Check for this case and assign the iterator
	 * to the head page instead of next.
	 */
	if (iter->head_page == cpu_buffer->reader_page)
		iter->head_page = rb_set_head_page(cpu_buffer);
	else
		rb_inc_page(cpu_buffer, &iter->head_page);

	iter->read_stamp = iter->head_page->page->time_stamp;
	iter->head = 0;
}

/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);
		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
		local_inc(&cpu_buffer->pages_lost);

		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * otherwise we are an interrupt, and only
	 * want the outer most commit to reset it.
	 */
	new_head = next_page;
	rb_inc_page(cpu_buffer, &new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* okay, all good */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		struct buffer_page *buffer_tail_page;

		buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (buffer_tail_page != tail_page &&
		    buffer_tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outer most commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      unsigned long tail, struct rb_event_info *info)
{
	struct buffer_page *tail_page = info->tail_page;
	struct ring_buffer_event *event;
	unsigned long length = info->length;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= BUF_PAGE_SIZE) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end. Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == BUF_PAGE_SIZE)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);

	/* account for padding bytes */
	local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add lost event
	 * counter.
	 */
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
	 */
	if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
		/* No room for any events */

		/* Mark the rest of the page with padding */
		rb_event_set_padding(event);

		/* Make sure the padding is visible before the write update */
		smp_wmb();

		/* Set the write back to the previous setting */
		local_sub(length, &tail_page->write);
		return;
	}

	/* Put in a discarded event */
	event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	event->type_len = RINGBUF_TYPE_PADDING;
	/* time delta must be non zero */
	event->time_delta = 1;

	/* Make sure the padding is visible before the tail_page->write update */
	smp_wmb();

	/* Set write to end of buffer */
	length = (tail + length) - BUF_PAGE_SIZE;
	local_sub(length, &tail_page->write);
}
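
/*
 * Illustration (standalone userspace sketch, not kernel code): the
 * discarded padding event written above claims every byte from the
 * failed reservation to the end of the page, so a reader skipping
 * header + array[0] bytes lands exactly on the page boundary. The
 * constants are the usual 64-bit values (BUF_PAGE_SIZE = 4096 - 16,
 * RINGBUF_TYPE_PADDING = 29) and are assumptions of this sketch.
 */
#if 0	/* standalone example, not built with the kernel */
#include <assert.h>
#include <stdint.h>

#define BUF_PAGE_SIZE	 4080U
#define RB_EVNT_HDR_SIZE 4U

struct event {
	uint32_t type_len : 5, time_delta : 27;
	uint32_t array[1];
};

int main(void)
{
	unsigned tail = 4000;	/* next write would cross the page end */
	struct event pad = { .type_len = 29 /* RINGBUF_TYPE_PADDING */ };

	pad.array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
	pad.time_delta = 1;	/* non-zero: a sized, skippable padding event */

	/* the reader skips the padding and lands on the page boundary */
	assert(tail + RB_EVNT_HDR_SIZE + pad.array[0] == BUF_PAGE_SIZE);
	return 0;
}
#endif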
2254 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2257 * This is the slow path, force gcc not to inline it.
2259 static noinline struct ring_buffer_event *
2260 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2261 unsigned long tail, struct rb_event_info *info)
2263 struct buffer_page *tail_page = info->tail_page;
2264 struct buffer_page *commit_page = cpu_buffer->commit_page;
2265 struct ring_buffer *buffer = cpu_buffer->buffer;
2266 struct buffer_page *next_page;
2269 next_page = tail_page;
2271 rb_inc_page(cpu_buffer, &next_page);
2274 * If for some reason, we had an interrupt storm that made
2275 * it all the way around the buffer, bail, and warn
2278 if (unlikely(next_page == commit_page)) {
2279 local_inc(&cpu_buffer->commit_overrun);
2284 * This is where the fun begins!
2286 * We are fighting against races between a reader that
2287 * could be on another CPU trying to swap its reader
2288 * page with the buffer head.
2290 * We are also fighting against interrupts coming in and
2291 * moving the head or tail on us as well.
2293 * If the next page is the head page then we have filled
2294 * the buffer, unless the commit page is still on the
2297 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
2300 * If the commit is not on the reader page, then
2301 * move the header page.
2303 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2305 * If we are not in overwrite mode,
2306 * this is easy, just stop here.
2308 if (!(buffer->flags & RB_FL_OVERWRITE)) {
2309 local_inc(&cpu_buffer->dropped_events);
2313 ret = rb_handle_head_page(cpu_buffer,
2322 * We need to be careful here too. The
2323 * commit page could still be on the reader
2324 * page. We could have a small buffer, and
2325 * have filled up the buffer with events
2326 * from interrupts and such, and wrapped.
2328 * Note, if the tail page is also on the
2329 * reader_page, we let it move out.
2331 if (unlikely((cpu_buffer->commit_page !=
2332 cpu_buffer->tail_page) &&
2333 (cpu_buffer->commit_page ==
2334 cpu_buffer->reader_page))) {
2335 local_inc(&cpu_buffer->commit_overrun);
2341 rb_tail_page_update(cpu_buffer, tail_page, next_page);
2345 rb_reset_tail(cpu_buffer, tail, info);
2347 /* Commit what we have for now. */
2348 rb_end_commit(cpu_buffer);
2349 /* rb_end_commit() decs committing */
2350 local_inc(&cpu_buffer->committing);
2352 /* fail and let the caller try again */
2353 return ERR_PTR(-EAGAIN);
2357 rb_reset_tail(cpu_buffer, tail, info);
2362 /* Slow path, do not inline */
2363 static noinline struct ring_buffer_event *
2364 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
2367 event->type_len = RINGBUF_TYPE_TIME_STAMP;
2369 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2371 /* An absolute timestamp, or not the first event on the page? */
2372 if (abs || rb_event_index(event)) {
2373 event->time_delta = delta & TS_MASK;
2374 event->array[0] = delta >> TS_SHIFT;
2376 /* nope, just zero it */
2377 event->time_delta = 0;
2378 event->array[0] = 0;
2381 return skip_time_extend(event);
2384 static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2385 struct ring_buffer_event *event);
2388 * rb_update_event - update event type and data
2389 * @event: the event to update
2390 * @type: the type of event
2391 * @length: the size of the event field in the ring buffer
2393 * Update the type and data fields of the event. The length
2394 * is the actual size that is written to the ring buffer,
2395 * and with this, we can determine what to place into the
2399 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2400 struct ring_buffer_event *event,
2401 struct rb_event_info *info)
2403 unsigned length = info->length;
2404 u64 delta = info->delta;
2406 /* Only a commit updates the timestamp */
2407 if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
2411 * If we need to add a timestamp, then we
2412 * add it to the start of the reserved space.
2414 if (unlikely(info->add_timestamp)) {
2415 bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer);
2417 event = rb_add_time_stamp(event, abs ? info->delta : delta, abs);
2418 length -= RB_LEN_TIME_EXTEND;
2422 event->time_delta = delta;
2423 length -= RB_EVNT_HDR_SIZE;
2424 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2425 event->type_len = 0;
2426 event->array[0] = length;
2428 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2431 static unsigned rb_calculate_event_length(unsigned length)
2433 struct ring_buffer_event event; /* Used only for sizeof array */
2435 /* zero length can cause confusion */
2439 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2440 length += sizeof(event.array[0]);
2442 length += RB_EVNT_HDR_SIZE;
2443 length = ALIGN(length, RB_ARCH_ALIGNMENT);
2446 * In case the time delta is larger than the 27 bits for it
2447 * in the header, we need to add a timestamp. If another
2448 * event comes in when trying to discard this one to increase
2449 * the length, then the timestamp will be added in the allocated
2450 * space of this event. If length is bigger than the size needed
2451 * for the TIME_EXTEND, then padding has to be used. The events
2452 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2453 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2454 * As length is a multiple of 4, we only need to worry if it
2455 * is 12 (RB_LEN_TIME_EXTEND + 4).
2457 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2458 length += RB_ALIGNMENT;
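/*
 * Editor's worked example (illustrative, assuming a 32-bit-aligned
 * arch where RB_FORCE_8BYTE_ALIGNMENT is 0): a requested payload of
 * 6 bytes becomes 6 + RB_EVNT_HDR_SIZE (4) = 10, aligned up to 12,
 * which is exactly RB_LEN_TIME_EXTEND + 4, so the rule above bumps
 * it to 16. A payload of 10 bytes becomes 14, aligned up to 16, and
 * needs no adjustment.
 */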
2463 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2464 static inline bool sched_clock_stable(void)
2471 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2472 struct ring_buffer_event *event)
2474 unsigned long new_index, old_index;
2475 struct buffer_page *bpage;
2476 unsigned long index;
2479 new_index = rb_event_index(event);
2480 old_index = new_index + rb_event_ts_length(event);
2481 addr = (unsigned long)event;
2484 bpage = READ_ONCE(cpu_buffer->tail_page);
2486 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2487 unsigned long write_mask =
2488 local_read(&bpage->write) & ~RB_WRITE_MASK;
2489 unsigned long event_length = rb_event_length(event);
2491 * This is on the tail page. It is possible that
2492 * a write could come in and move the tail page
2493 * and write to the next page. That is fine
2494 * because we just shorten what is on this page.
2496 old_index += write_mask;
2497 new_index += write_mask;
2498 index = local_cmpxchg(&bpage->write, old_index, new_index);
2499 if (index == old_index) {
2500 /* update counters */
2501 local_sub(event_length, &cpu_buffer->entries_bytes);
2506 /* could not discard */
2510 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2512 local_inc(&cpu_buffer->committing);
2513 local_inc(&cpu_buffer->commits);
2516 static __always_inline void
2517 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2519 unsigned long max_count;
2522 * We only race with interrupts and NMIs on this CPU.
2523 * If we own the commit event, then we can commit
2524 * all others that interrupted us, since the interruptions
2525 * are in stack format (they finish before they come
2526 * back to us). This allows us to do a simple loop to
2527 * assign the commit to the tail.
2530 max_count = cpu_buffer->nr_pages * 100;
2532 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2533 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2535 if (RB_WARN_ON(cpu_buffer,
2536 rb_is_reader_page(cpu_buffer->tail_page)))
2539 * No need for a memory barrier here, as the update
2540 * of the tail_page did it for this page.
2542 local_set(&cpu_buffer->commit_page->page->commit,
2543 rb_page_write(cpu_buffer->commit_page));
2544 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
2545 /* Only update the write stamp if the page has an event */
2546 if (rb_page_write(cpu_buffer->commit_page))
2547 cpu_buffer->write_stamp =
2548 cpu_buffer->commit_page->page->time_stamp;
2549 /* add barrier to keep gcc from optimizing too much */
2552 while (rb_commit_index(cpu_buffer) !=
2553 rb_page_write(cpu_buffer->commit_page)) {
2555 /* Make sure the readers see the content of what is committed. */
2557 local_set(&cpu_buffer->commit_page->page->commit,
2558 rb_page_write(cpu_buffer->commit_page));
2559 RB_WARN_ON(cpu_buffer,
2560 local_read(&cpu_buffer->commit_page->page->commit) &
2565 /* again, keep gcc from optimizing */
2569 * If an interrupt came in just after the first while loop
2570 * and pushed the tail page forward, we will be left with
2571 * a dangling commit that will never go forward.
2573 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
2577 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
2579 unsigned long commits;
2581 if (RB_WARN_ON(cpu_buffer,
2582 !local_read(&cpu_buffer->committing)))
2586 commits = local_read(&cpu_buffer->commits);
2587 /* synchronize with interrupts */
2589 if (local_read(&cpu_buffer->committing) == 1)
2590 rb_set_commit_to_write(cpu_buffer);
2592 local_dec(&cpu_buffer->committing);
2594 /* synchronize with interrupts */
2598 * Need to account for interrupts coming in between the
2599 * updating of the commit page and the clearing of the
2600 * committing counter.
2602 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
2603 !local_read(&cpu_buffer->committing)) {
2604 local_inc(&cpu_buffer->committing);
2609 static inline void rb_event_discard(struct ring_buffer_event *event)
2611 if (extended_time(event))
2612 event = skip_time_extend(event);
2614 /* array[0] holds the actual length for the discarded event */
2615 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
2616 event->type_len = RINGBUF_TYPE_PADDING;
2617 /* time delta must be non zero */
2618 if (!event->time_delta)
2619 event->time_delta = 1;
2622 static __always_inline bool
2623 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2624 struct ring_buffer_event *event)
2626 unsigned long addr = (unsigned long)event;
2627 unsigned long index;
2629 index = rb_event_index(event);
2632 return cpu_buffer->commit_page->page == (void *)addr &&
2633 rb_commit_index(cpu_buffer) == index;
2636 static __always_inline void
2637 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2638 struct ring_buffer_event *event)
2643 * The event first in the commit queue updates the
2646 if (rb_event_is_commit(cpu_buffer, event)) {
2648 * A commit event that is first on a page
2649 * updates the write timestamp with the page stamp
2651 if (!rb_event_index(event))
2652 cpu_buffer->write_stamp =
2653 cpu_buffer->commit_page->page->time_stamp;
2654 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
2655 delta = ring_buffer_event_time_stamp(event);
2656 cpu_buffer->write_stamp += delta;
2657 } else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
2658 delta = ring_buffer_event_time_stamp(event);
2659 cpu_buffer->write_stamp = delta;
2661 cpu_buffer->write_stamp += event->time_delta;
2665 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
2666 struct ring_buffer_event *event)
2668 local_inc(&cpu_buffer->entries);
2669 rb_update_write_stamp(cpu_buffer, event);
2670 rb_end_commit(cpu_buffer);
2673 static __always_inline void
2674 rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
2676 if (buffer->irq_work.waiters_pending) {
2677 buffer->irq_work.waiters_pending = false;
2678 /* irq_work_queue() supplies its own memory barriers */
2679 irq_work_queue(&buffer->irq_work.work);
2682 if (cpu_buffer->irq_work.waiters_pending) {
2683 cpu_buffer->irq_work.waiters_pending = false;
2684 /* irq_work_queue() supplies its own memory barriers */
2685 irq_work_queue(&cpu_buffer->irq_work.work);
2688 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
2691 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
2694 if (!cpu_buffer->irq_work.full_waiters_pending)
2697 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
2699 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
2702 cpu_buffer->irq_work.wakeup_full = true;
2703 cpu_buffer->irq_work.full_waiters_pending = false;
2704 /* irq_work_queue() supplies its own memory barriers */
2705 irq_work_queue(&cpu_buffer->irq_work.work);
2709 * The lock and unlock are done within a preempt disable section.
2710 * The current_context per_cpu variable can only be modified
2711 * by the current task between lock and unlock. But it can
2712 * be modified more than once via an interrupt. To pass this
2713 * information from the lock to the unlock without having to
2714 * access the 'in_interrupt()' functions again (which do show
2715 * a bit of overhead in something as critical as function tracing),
2716 * we use a bitmask trick.
2718 * bit 1 = NMI context
2719 * bit 2 = IRQ context
2720 * bit 3 = SoftIRQ context
2721 * bit 4 = normal context.
2723 * This works because this is the order of contexts that can
2724 * preempt other contexts. A SoftIRQ never preempts an IRQ
2727 * When the context is determined, the corresponding bit is
2728 * checked and set (if it was set, then a recursion of that context
2731 * On unlock, we need to clear this bit. To do so, just subtract
2732 * 1 from the current_context and AND it to itself.
2736 * 101 & 100 = 100 (clearing bit zero)
2739 * 1010 & 1001 = 1000 (clearing bit 1)
2741 * The least significant bit can be cleared this way, and it
2742 * just so happens that it is the same bit corresponding to
2743 * the current context.
2745 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
2746 * is set when a recursion is detected at the current context, and if
2747 * the TRANSITION bit is already set, it will fail the recursion.
2748 * This is needed because there's a lag between the changing of
2749 * interrupt context and updating the preempt count. In this case,
2750 * a false positive will be found. To handle this, one extra recursion
2751 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
2752 * bit is already set, then it is considered a recursion and the function
2753 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
2755 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
2756 * to be cleared, even if it wasn't the context that set it. That is,
2757 * if an interrupt comes in while the NORMAL bit is set and the ring buffer
2758 * is called before preempt_count() is updated, since the check will
2759 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
2760 * NMI then comes in, it will set the NMI bit, but when the NMI code
2761 * does the trace_recursive_unlock(), it will clear the TRANSITION bit
2762 * and leave the NMI bit set. But this is fine, because the interrupt
2763 * code that set the TRANSITION bit will then clear the NMI bit when it
2764 * calls trace_recursive_unlock(). If another NMI comes in, it will
2765 * set the TRANSITION bit and continue.
2767 * Note: The TRANSITION bit only handles a single transition between contexts.
2770 static __always_inline int
2771 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
2773 unsigned int val = cpu_buffer->current_context;
2774 unsigned long pc = preempt_count();
2777 if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET)))
2778 bit = RB_CTX_NORMAL;
2780 bit = pc & NMI_MASK ? RB_CTX_NMI :
2781 pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ;
2783 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
2785 * It is possible that this was called by transitioning
2786 * between interrupt context, and preempt_count() has not
2787 * been updated yet. In this case, use the TRANSITION bit.
2789 bit = RB_CTX_TRANSITION;
2790 if (val & (1 << (bit + cpu_buffer->nest)))
2794 val |= (1 << (bit + cpu_buffer->nest));
2795 cpu_buffer->current_context = val;
2800 static __always_inline void
2801 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
2803 cpu_buffer->current_context &=
2804 cpu_buffer->current_context - (1 << cpu_buffer->nest);
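/*
 * Editor's illustrative sketch (not part of the kernel source): with
 * nest == 0, the unlock above is exactly the classic "clear the lowest
 * set bit" identity x & (x - 1):
 *
 *	static unsigned int clear_lowest_bit(unsigned int x)
 *	{
 *		return x & (x - 1);
 *	}
 *
 * For example, binary 1010 becomes 1000, and 0101 becomes 0100. With a
 * non-zero nest shift, subtracting (1 << nest) borrows only through the
 * bits at and above the shift, so the lowest context bit set there is
 * cleared while the bits below the shift are left untouched.
 */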
2807 /* The recursive locking above uses 5 bits */
2808 #define NESTED_BITS 5
2811 * ring_buffer_nest_start - Allow tracing while nested
2812 * @buffer: The ring buffer to modify
2814 * The ring buffer has a safety mechanism to prevent recursion.
2815 * But there may be a case where a trace needs to be done while
2816 * tracing something else. In this case, calling this function
2817 * will allow another ring_buffer_lock_reserve() to nest within a
2818 * currently active one.
2820 * Call this function before calling another ring_buffer_lock_reserve() and
2821 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
2823 void ring_buffer_nest_start(struct ring_buffer *buffer)
2825 struct ring_buffer_per_cpu *cpu_buffer;
2828 /* Enabled by ring_buffer_nest_end() */
2829 preempt_disable_notrace();
2830 cpu = raw_smp_processor_id();
2831 cpu_buffer = buffer->buffers[cpu];
2832 /* This is the shift value for the above recursive locking */
2833 cpu_buffer->nest += NESTED_BITS;
2837 * ring_buffer_nest_end - End allowing tracing while nested
2838 * @buffer: The ring buffer to modify
2840 * Must be called after ring_buffer_nest_start() and after the
2841 * ring_buffer_unlock_commit().
2843 void ring_buffer_nest_end(struct ring_buffer *buffer)
2845 struct ring_buffer_per_cpu *cpu_buffer;
2848 /* disabled by ring_buffer_nest_start() */
2849 cpu = raw_smp_processor_id();
2850 cpu_buffer = buffer->buffers[cpu];
2851 /* This is the shift value for the above recursive locking */
2852 cpu_buffer->nest -= NESTED_BITS;
2853 preempt_enable_notrace();
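/*
 * Editor's illustrative usage sketch (buffer_a, buffer_b and the
 * payload variables are hypothetical, not kernel code): nesting a
 * second reserve inside an active one.
 *
 *	event = ring_buffer_lock_reserve(buffer_a, len_a);
 *	...
 *	ring_buffer_nest_start(buffer_b);
 *	nested = ring_buffer_lock_reserve(buffer_b, len_b);
 *	if (nested) {
 *		memcpy(ring_buffer_event_data(nested), payload, len_b);
 *		ring_buffer_unlock_commit(buffer_b, nested);
 *	}
 *	ring_buffer_nest_end(buffer_b);
 *	...
 *	ring_buffer_unlock_commit(buffer_a, event);
 */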
2857 * ring_buffer_unlock_commit - commit a reserved event
2858 * @buffer: The buffer to commit to
2859 * @event: The event pointer to commit.
2861 * This commits the data to the ring buffer, and releases any locks held.
2863 * Must be paired with ring_buffer_lock_reserve.
2865 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
2866 struct ring_buffer_event *event)
2868 struct ring_buffer_per_cpu *cpu_buffer;
2869 int cpu = raw_smp_processor_id();
2871 cpu_buffer = buffer->buffers[cpu];
2873 rb_commit(cpu_buffer, event);
2875 rb_wakeups(buffer, cpu_buffer);
2877 trace_recursive_unlock(cpu_buffer);
2879 preempt_enable_notrace();
2883 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
2885 static noinline void
2886 rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2887 struct rb_event_info *info)
2889 WARN_ONCE(info->delta > (1ULL << 59),
2890 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
2891 (unsigned long long)info->delta,
2892 (unsigned long long)info->ts,
2893 (unsigned long long)cpu_buffer->write_stamp,
2894 sched_clock_stable() ? "" :
2895 "If you just came from a suspend/resume,\n"
2896 "please switch to the trace global clock:\n"
2897 " echo global > /sys/kernel/debug/tracing/trace_clock\n"
2898 "or add trace_clock=global to the kernel command line\n");
2899 info->add_timestamp = 1;
2902 static struct ring_buffer_event *
2903 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
2904 struct rb_event_info *info)
2906 struct ring_buffer_event *event;
2907 struct buffer_page *tail_page;
2908 unsigned long tail, write;
2911 * If the time delta since the last event is too big to
2912 * hold in the time field of the event, then we append a
2913 * TIME EXTEND event ahead of the data event.
2915 if (unlikely(info->add_timestamp))
2916 info->length += RB_LEN_TIME_EXTEND;
2918 /* Don't let the compiler play games with cpu_buffer->tail_page */
2919 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
2920 write = local_add_return(info->length, &tail_page->write);
2922 /* set write to only the index of the write */
2923 write &= RB_WRITE_MASK;
2924 tail = write - info->length;
2927 * If this is the first commit on the page, then it has the same
2928 * timestamp as the page itself.
2930 if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer))
2933 /* See if we shot past the end of this buffer page */
2934 if (unlikely(write > BUF_PAGE_SIZE))
2935 return rb_move_tail(cpu_buffer, tail, info);
2937 /* We reserved something on the buffer */
2939 event = __rb_page_index(tail_page, tail);
2940 rb_update_event(cpu_buffer, event, info);
2942 local_inc(&tail_page->entries);
2945 * If this is the first commit on the page, then update
2949 tail_page->page->time_stamp = info->ts;
2951 /* account for these added bytes */
2952 local_add(info->length, &cpu_buffer->entries_bytes);
2957 static __always_inline struct ring_buffer_event *
2958 rb_reserve_next_event(struct ring_buffer *buffer,
2959 struct ring_buffer_per_cpu *cpu_buffer,
2960 unsigned long length)
2962 struct ring_buffer_event *event;
2963 struct rb_event_info info;
2967 rb_start_commit(cpu_buffer);
2969 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
2971 * Due to the ability to swap a cpu buffer from a buffer
2972 * it is possible it was swapped before we committed.
2973 * (committing stops a swap). We check for it here and
2974 * if it happened, we have to fail the write.
2977 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
2978 local_dec(&cpu_buffer->committing);
2979 local_dec(&cpu_buffer->commits);
2984 info.length = rb_calculate_event_length(length);
2986 info.add_timestamp = 0;
2990 * We allow for interrupts to reenter here and do a trace.
2991 * If one does, it will cause this original code to loop
2992 * back here. Even with heavy interrupts happening, this
2993 * should only happen a few times in a row. If this happens
2994 * 1000 times in a row, there must be either an interrupt
2995 * storm or something buggy.
2998 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
3001 info.ts = rb_time_stamp(cpu_buffer->buffer);
3002 diff = info.ts - cpu_buffer->write_stamp;
3004 /* make sure this diff is calculated here */
3007 if (ring_buffer_time_stamp_abs(buffer)) {
3008 info.delta = info.ts;
3009 rb_handle_timestamp(cpu_buffer, &info);
3010 } else /* Did the write stamp get updated already? */
3011 if (likely(info.ts >= cpu_buffer->write_stamp)) {
3013 if (unlikely(test_time_stamp(info.delta)))
3014 rb_handle_timestamp(cpu_buffer, &info);
3017 event = __rb_reserve_next(cpu_buffer, &info);
3019 if (unlikely(PTR_ERR(event) == -EAGAIN)) {
3020 if (info.add_timestamp)
3021 info.length -= RB_LEN_TIME_EXTEND;
3031 rb_end_commit(cpu_buffer);
3036 * ring_buffer_lock_reserve - reserve a part of the buffer
3037 * @buffer: the ring buffer to reserve from
3038 * @length: the length of the data to reserve (excluding event header)
3040 * Returns a reserved event on the ring buffer to copy directly to.
3041 * The user of this interface will need to get the body to write into
3042 * and can use the ring_buffer_event_data() interface.
3044 * The length is the length of the data needed, not the event length
3045 * which also includes the event header.
3047 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3048 * If NULL is returned, then nothing has been allocated or locked.
3050 struct ring_buffer_event *
3051 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
3053 struct ring_buffer_per_cpu *cpu_buffer;
3054 struct ring_buffer_event *event;
3057 /* If we are tracing schedule, we don't want to recurse */
3058 preempt_disable_notrace();
3060 if (unlikely(atomic_read(&buffer->record_disabled)))
3063 cpu = raw_smp_processor_id();
3065 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
3068 cpu_buffer = buffer->buffers[cpu];
3070 if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
3073 if (unlikely(length > BUF_MAX_DATA_SIZE))
3076 if (unlikely(trace_recursive_lock(cpu_buffer)))
3079 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3086 trace_recursive_unlock(cpu_buffer);
3088 preempt_enable_notrace();
3091 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
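/*
 * Editor's illustrative usage sketch (struct my_entry and the buffer
 * variable are hypothetical): the canonical reserve/commit pairing
 * described above.
 *
 *	struct my_entry *entry;
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (event) {
 *		entry = ring_buffer_event_data(event);
 *		entry->val = 42;
 *		ring_buffer_unlock_commit(buffer, event);
 *	}
 */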
3094 * Decrement the entry count of the page that an event is on.
3095 * The event does not even need to exist, only the pointer
3096 * to the page it is on. This may only be called before the commit
3100 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3101 struct ring_buffer_event *event)
3103 unsigned long addr = (unsigned long)event;
3104 struct buffer_page *bpage = cpu_buffer->commit_page;
3105 struct buffer_page *start;
3109 /* Do the likely case first */
3110 if (likely(bpage->page == (void *)addr)) {
3111 local_dec(&bpage->entries);
3116 * Because the commit page may be on the reader page, we
3117 * start with the next page and check for the end of the loop there.
3119 rb_inc_page(cpu_buffer, &bpage);
3122 if (bpage->page == (void *)addr) {
3123 local_dec(&bpage->entries);
3126 rb_inc_page(cpu_buffer, &bpage);
3127 } while (bpage != start);
3129 /* commit not part of this buffer?? */
3130 RB_WARN_ON(cpu_buffer, 1);
3134 * ring_buffer_commit_discard - discard an event that has not been committed
3135 * @buffer: the ring buffer
3136 * @event: non committed event to discard
3138 * Sometimes an event that is in the ring buffer needs to be ignored.
3139 * This function lets the user discard an event in the ring buffer
3140 * and then that event will not be read later.
3142 * This function only works if it is called before the item has been
3143 * committed. It will try to free the event from the ring buffer
3144 * if another event has not been added behind it.
3146 * If another event has been added behind it, it will set the event
3147 * up as discarded, and perform the commit.
3149 * If this function is called, do not call ring_buffer_unlock_commit on
3152 void ring_buffer_discard_commit(struct ring_buffer *buffer,
3153 struct ring_buffer_event *event)
3155 struct ring_buffer_per_cpu *cpu_buffer;
3158 /* The event is discarded regardless */
3159 rb_event_discard(event);
3161 cpu = smp_processor_id();
3162 cpu_buffer = buffer->buffers[cpu];
3165 * This must only be called if the event has not been
3166 * committed yet. Thus we can assume that preemption
3167 * is still disabled.
3169 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3171 rb_decrement_entry(cpu_buffer, event);
3172 if (rb_try_to_discard(cpu_buffer, event))
3176 * The commit is still visible to the reader, so we
3177 * must still update the timestamp.
3179 rb_update_write_stamp(cpu_buffer, event);
3181 rb_end_commit(cpu_buffer);
3183 trace_recursive_unlock(cpu_buffer);
3185 preempt_enable_notrace();
3188 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
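/*
 * Editor's illustrative usage sketch (reject_entry() is hypothetical,
 * and entry/buffer continue the sketch shown earlier): reserving an
 * event and then dropping it, e.g. after a filter rejects the data.
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (event) {
 *		entry = ring_buffer_event_data(event);
 *		entry->val = 42;
 *		if (reject_entry(entry))
 *			ring_buffer_discard_commit(buffer, event);
 *		else
 *			ring_buffer_unlock_commit(buffer, event);
 *	}
 */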
3191 * ring_buffer_write - write data to the buffer without reserving
3192 * @buffer: The ring buffer to write to.
3193 * @length: The length of the data being written (excluding the event header)
3194 * @data: The data to write to the buffer.
3196 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3197 * one function. If you already have the data to write to the buffer, it
3198 * may be easier to simply call this function.
3200 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3201 * and not the length of the event which would hold the header.
3203 int ring_buffer_write(struct ring_buffer *buffer,
3204 unsigned long length,
3207 struct ring_buffer_per_cpu *cpu_buffer;
3208 struct ring_buffer_event *event;
3213 preempt_disable_notrace();
3215 if (atomic_read(&buffer->record_disabled))
3218 cpu = raw_smp_processor_id();
3220 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3223 cpu_buffer = buffer->buffers[cpu];
3225 if (atomic_read(&cpu_buffer->record_disabled))
3228 if (length > BUF_MAX_DATA_SIZE)
3231 if (unlikely(trace_recursive_lock(cpu_buffer)))
3234 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3238 body = rb_event_data(event);
3240 memcpy(body, data, length);
3242 rb_commit(cpu_buffer, event);
3244 rb_wakeups(buffer, cpu_buffer);
3249 trace_recursive_unlock(cpu_buffer);
3252 preempt_enable_notrace();
3256 EXPORT_SYMBOL_GPL(ring_buffer_write);
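/*
 * Editor's illustrative usage sketch (struct my_entry and buffer are
 * hypothetical): the one-shot write path, equivalent to the
 * reserve/commit pair when the payload already exists. A non-zero
 * return means the write failed (recording disabled or no room).
 *
 *	struct my_entry entry = { .val = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *		pr_debug("ring buffer write failed\n");
 */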
3258 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3260 struct buffer_page *reader = cpu_buffer->reader_page;
3261 struct buffer_page *head = rb_set_head_page(cpu_buffer);
3262 struct buffer_page *commit = cpu_buffer->commit_page;
3264 /* In case of error, head will be NULL */
3265 if (unlikely(!head))
3268 /* Reader should exhaust content in reader page */
3269 if (reader->read != rb_page_commit(reader))
3273 * If writers are committing on the reader page, then since all
3274 * committed content has been read, the ring buffer is empty.
3276 if (commit == reader)
3280 * If writers are committing on a page other than the reader page
3281 * and head page, there should always be content to read.
3287 * Writers are committing on the head page; we just need
3288 * to check whether there is committed data, and the reader will
3289 * swap the reader page with the head page when it is ready to read.
3291 return rb_page_commit(commit) == 0;
3295 * ring_buffer_record_disable - stop all writes into the buffer
3296 * @buffer: The ring buffer to stop writes to.
3298 * This prevents all writes to the buffer. Any attempt to write
3299 * to the buffer after this will fail and return NULL.
3301 * The caller should call synchronize_rcu() after this.
3303 void ring_buffer_record_disable(struct ring_buffer *buffer)
3305 atomic_inc(&buffer->record_disabled);
3307 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3310 * ring_buffer_record_enable - enable writes to the buffer
3311 * @buffer: The ring buffer to enable writes
3313 * Note, multiple disables will need the same number of enables
3314 * to truly enable the writing (much like preempt_disable).
3316 void ring_buffer_record_enable(struct ring_buffer *buffer)
3318 atomic_dec(&buffer->record_disabled);
3320 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
3323 * ring_buffer_record_off - stop all writes into the buffer
3324 * @buffer: The ring buffer to stop writes to.
3326 * This prevents all writes to the buffer. Any attempt to write
3327 * to the buffer after this will fail and return NULL.
3329 * This is different from ring_buffer_record_disable() as
3330 * it works like an on/off switch, whereas the disable() version
3331 * must be paired with an enable().
3333 void ring_buffer_record_off(struct ring_buffer *buffer)
3336 unsigned int new_rd;
3339 rd = atomic_read(&buffer->record_disabled);
3340 new_rd = rd | RB_BUFFER_OFF;
3341 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3343 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
3346 * ring_buffer_record_on - restart writes into the buffer
3347 * @buffer: The ring buffer to start writes to.
3349 * This enables all writes to the buffer that was disabled by
3350 * ring_buffer_record_off().
3352 * This is different from ring_buffer_record_enable() as
3353 * it works like an on/off switch, whereas the enable() version
3354 * must be paired with a disable().
3356 void ring_buffer_record_on(struct ring_buffer *buffer)
3359 unsigned int new_rd;
3362 rd = atomic_read(&buffer->record_disabled);
3363 new_rd = rd & ~RB_BUFFER_OFF;
3364 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3366 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
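/*
 * Editor's illustrative sketch: the counting disable and the on/off
 * latch compose independently. Starting from an enabled buffer:
 *
 *	ring_buffer_record_disable(buffer);	counter is now 1
 *	ring_buffer_record_off(buffer);		latch set
 *	ring_buffer_record_on(buffer);		latch cleared
 *	ring_buffer_record_is_on(buffer);	still false: counter is 1
 *	ring_buffer_record_enable(buffer);	counter back to 0
 *	ring_buffer_record_is_on(buffer);	true again
 */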
3369 * ring_buffer_record_is_on - return true if the ring buffer can write
3370 * @buffer: The ring buffer to check if writes are enabled
3372 * Returns true if the ring buffer is in a state that it accepts writes.
3374 bool ring_buffer_record_is_on(struct ring_buffer *buffer)
3376 return !atomic_read(&buffer->record_disabled);
3380 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
3381 * @buffer: The ring buffer to check if writes are set enabled
3383 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
3384 * Note that this does NOT mean it is in a writable state.
3386 * It may return true when the ring buffer has been disabled by
3387 * ring_buffer_record_disable(), as that is a temporary disabling of
3390 bool ring_buffer_record_is_set_on(struct ring_buffer *buffer)
3392 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
3396 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
3397 * @buffer: The ring buffer to stop writes to.
3398 * @cpu: The CPU buffer to stop
3400 * This prevents all writes to the buffer. Any attempt to write
3401 * to the buffer after this will fail and return NULL.
3403 * The caller should call synchronize_rcu() after this.
3405 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
3407 struct ring_buffer_per_cpu *cpu_buffer;
3409 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3412 cpu_buffer = buffer->buffers[cpu];
3413 atomic_inc(&cpu_buffer->record_disabled);
3415 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
3418 * ring_buffer_record_enable_cpu - enable writes to the buffer
3419 * @buffer: The ring buffer to enable writes
3420 * @cpu: The CPU to enable.
3422 * Note, multiple disables will need the same number of enables
3423 * to truly enable the writing (much like preempt_disable).
3425 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
3427 struct ring_buffer_per_cpu *cpu_buffer;
3429 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3432 cpu_buffer = buffer->buffers[cpu];
3433 atomic_dec(&cpu_buffer->record_disabled);
3435 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
3438 * The total entries in the ring buffer is the running counter
3439 * of entries entered into the ring buffer, minus the sum of
3440 * the entries read from the ring buffer and the number of
3441 * entries that were overwritten.
3443 static inline unsigned long
3444 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
3446 return local_read(&cpu_buffer->entries) -
3447 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
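/*
 * Editor's worked example (illustrative): with 1000 events written,
 * 100 overwritten by the buffer wrapping and 250 already read, the
 * buffer still holds 1000 - (100 + 250) = 650 entries.
 */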
3451 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
3452 * @buffer: The ring buffer
3453 * @cpu: The per CPU buffer to read from.
3455 u64 ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu)
3457 unsigned long flags;
3458 struct ring_buffer_per_cpu *cpu_buffer;
3459 struct buffer_page *bpage;
3462 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3465 cpu_buffer = buffer->buffers[cpu];
3466 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3468 * if the tail is on the reader_page, the oldest time stamp is on the reader
3471 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
3472 bpage = cpu_buffer->reader_page;
3474 bpage = rb_set_head_page(cpu_buffer);
3476 ret = bpage->page->time_stamp;
3477 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3481 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
3484 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer
3485 * @buffer: The ring buffer
3486 * @cpu: The per CPU buffer to read from.
3488 unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu)
3490 struct ring_buffer_per_cpu *cpu_buffer;
3493 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3496 cpu_buffer = buffer->buffers[cpu];
3497 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
3501 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
3504 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
3505 * @buffer: The ring buffer
3506 * @cpu: The per CPU buffer to get the entries from.
3508 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
3510 struct ring_buffer_per_cpu *cpu_buffer;
3512 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3515 cpu_buffer = buffer->buffers[cpu];
3517 return rb_num_of_entries(cpu_buffer);
3519 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
3522 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
3523 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
3524 * @buffer: The ring buffer
3525 * @cpu: The per CPU buffer to get the number of overruns from
3527 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
3529 struct ring_buffer_per_cpu *cpu_buffer;
3532 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3535 cpu_buffer = buffer->buffers[cpu];
3536 ret = local_read(&cpu_buffer->overrun);
3540 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
3543 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
3544 * commits failing due to the buffer wrapping around while there are uncommitted
3545 * events, such as during an interrupt storm.
3546 * @buffer: The ring buffer
3547 * @cpu: The per CPU buffer to get the number of overruns from
3550 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
3552 struct ring_buffer_per_cpu *cpu_buffer;
3555 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3558 cpu_buffer = buffer->buffers[cpu];
3559 ret = local_read(&cpu_buffer->commit_overrun);
3563 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
3566 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
3567 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
3568 * @buffer: The ring buffer
3569 * @cpu: The per CPU buffer to get the number of overruns from
3572 ring_buffer_dropped_events_cpu(struct ring_buffer *buffer, int cpu)
3574 struct ring_buffer_per_cpu *cpu_buffer;
3577 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3580 cpu_buffer = buffer->buffers[cpu];
3581 ret = local_read(&cpu_buffer->dropped_events);
3585 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
3588 * ring_buffer_read_events_cpu - get the number of events successfully read
3589 * @buffer: The ring buffer
3590 * @cpu: The per CPU buffer to get the number of events read
3593 ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu)
3595 struct ring_buffer_per_cpu *cpu_buffer;
3597 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3600 cpu_buffer = buffer->buffers[cpu];
3601 return cpu_buffer->read;
3603 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
3606 * ring_buffer_entries - get the number of entries in a buffer
3607 * @buffer: The ring buffer
3609 * Returns the total number of entries in the ring buffer
3612 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
3614 struct ring_buffer_per_cpu *cpu_buffer;
3615 unsigned long entries = 0;
3618 /* if you care about this being correct, lock the buffer */
3619 for_each_buffer_cpu(buffer, cpu) {
3620 cpu_buffer = buffer->buffers[cpu];
3621 entries += rb_num_of_entries(cpu_buffer);
3626 EXPORT_SYMBOL_GPL(ring_buffer_entries);
3629 * ring_buffer_overruns - get the number of overruns in buffer
3630 * @buffer: The ring buffer
3632 * Returns the total number of overruns in the ring buffer
3635 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
3637 struct ring_buffer_per_cpu *cpu_buffer;
3638 unsigned long overruns = 0;
3641 /* if you care about this being correct, lock the buffer */
3642 for_each_buffer_cpu(buffer, cpu) {
3643 cpu_buffer = buffer->buffers[cpu];
3644 overruns += local_read(&cpu_buffer->overrun);
3649 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
3651 static void rb_iter_reset(struct ring_buffer_iter *iter)
3653 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3655 /* Iterator usage is expected to have recording disabled */
3656 iter->head_page = cpu_buffer->reader_page;
3657 iter->head = cpu_buffer->reader_page->read;
3659 iter->cache_reader_page = iter->head_page;
3660 iter->cache_read = cpu_buffer->read;
3661 iter->cache_pages_removed = cpu_buffer->pages_removed;
3664 iter->read_stamp = cpu_buffer->read_stamp;
3666 iter->read_stamp = iter->head_page->page->time_stamp;
3670 * ring_buffer_iter_reset - reset an iterator
3671 * @iter: The iterator to reset
3673 * Resets the iterator, so that it will start from the beginning
3676 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
3678 struct ring_buffer_per_cpu *cpu_buffer;
3679 unsigned long flags;
3684 cpu_buffer = iter->cpu_buffer;
3686 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3687 rb_iter_reset(iter);
3688 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3690 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
3693 * ring_buffer_iter_empty - check if an iterator has no more to read
3694 * @iter: The iterator to check
3696 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
3698 struct ring_buffer_per_cpu *cpu_buffer;
3699 struct buffer_page *reader;
3700 struct buffer_page *head_page;
3701 struct buffer_page *commit_page;
3704 cpu_buffer = iter->cpu_buffer;
3706 /* Remember, trace recording is off when the iterator is in use */
3707 reader = cpu_buffer->reader_page;
3708 head_page = cpu_buffer->head_page;
3709 commit_page = cpu_buffer->commit_page;
3710 commit = rb_page_commit(commit_page);
3712 return ((iter->head_page == commit_page && iter->head == commit) ||
3713 (iter->head_page == reader && commit_page == head_page &&
3714 head_page->read == commit &&
3715 iter->head == rb_page_commit(cpu_buffer->reader_page)));
3717 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
3720 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
3721 struct ring_buffer_event *event)
3725 switch (event->type_len) {
3726 case RINGBUF_TYPE_PADDING:
3729 case RINGBUF_TYPE_TIME_EXTEND:
3730 delta = ring_buffer_event_time_stamp(event);
3731 cpu_buffer->read_stamp += delta;
3734 case RINGBUF_TYPE_TIME_STAMP:
3735 delta = ring_buffer_event_time_stamp(event);
3736 cpu_buffer->read_stamp = delta;
3739 case RINGBUF_TYPE_DATA:
3740 cpu_buffer->read_stamp += event->time_delta;
3750 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
3751 struct ring_buffer_event *event)
3755 switch (event->type_len) {
3756 case RINGBUF_TYPE_PADDING:
3759 case RINGBUF_TYPE_TIME_EXTEND:
3760 delta = ring_buffer_event_time_stamp(event);
3761 iter->read_stamp += delta;
3764 case RINGBUF_TYPE_TIME_STAMP:
3765 delta = ring_buffer_event_time_stamp(event);
3766 iter->read_stamp = delta;
3769 case RINGBUF_TYPE_DATA:
3770 iter->read_stamp += event->time_delta;
3779 static struct buffer_page *
3780 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
3782 struct buffer_page *reader = NULL;
3783 unsigned long overwrite;
3784 unsigned long flags;
3788 local_irq_save(flags);
3789 arch_spin_lock(&cpu_buffer->lock);
3793 * This should normally only loop twice. But because the
3794 * start of the reader inserts an empty page, it causes
3795 * a case where we will loop three times. There should be no
3796 * reason to loop four times (that I know of).
3798 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
3803 reader = cpu_buffer->reader_page;
3805 /* If there's more to read, return this page */
3806 if (cpu_buffer->reader_page->read < rb_page_size(reader))
3809 /* Never should we have an index greater than the size */
3810 if (RB_WARN_ON(cpu_buffer,
3811 cpu_buffer->reader_page->read > rb_page_size(reader)))
3814 /* check if we caught up to the tail */
3816 if (cpu_buffer->commit_page == cpu_buffer->reader_page)
3819 /* Don't bother swapping if the ring buffer is empty */
3820 if (rb_num_of_entries(cpu_buffer) == 0)
3824 * Reset the reader page to size zero.
3826 local_set(&cpu_buffer->reader_page->write, 0);
3827 local_set(&cpu_buffer->reader_page->entries, 0);
3828 local_set(&cpu_buffer->reader_page->page->commit, 0);
3829 cpu_buffer->reader_page->real_end = 0;
3833 * Splice the empty reader page into the list around the head.
3835 reader = rb_set_head_page(cpu_buffer);
3838 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
3839 cpu_buffer->reader_page->list.prev = reader->list.prev;
3842 * cpu_buffer->pages just needs to point to the buffer, it
3843 * has no specific buffer page to point to. Let's move it out
3844 * of our way so we don't accidentally swap it.
3846 cpu_buffer->pages = reader->list.prev;
3848 /* The reader page will be pointing to the new head */
3849 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
3852 * We want to make sure we read the overruns after we set up our
3853 * pointers to the next object. The writer side does a
3854 * cmpxchg to cross pages which acts as the mb on the writer
3855 * side. Note, the reader will constantly fail the swap
3856 * while the writer is updating the pointers, so this
3857 * guarantees that the overwrite recorded here is the one we
3858 * want to compare with the last_overrun.
3861 overwrite = local_read(&(cpu_buffer->overrun));
3864 * Here's the tricky part.
3866 * We need to move the pointer past the header page.
3867 * But we can only do that if a writer is not currently
3868 * moving it. The page before the header page has the
3869 * flag bit '1' set if it is pointing to the page we want.
3870 * but if the writer is in the process of moving it,
3871 * then it will be '2', or '0' if already moved.
3874 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
3877 * If we did not convert it, then we must try again.
3883 * Yay! We succeeded in replacing the page.
3885 * Now make the new head point back to the reader page.
3887 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
3888 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
3890 local_inc(&cpu_buffer->pages_read);
3892 /* Finally update the reader page to the new head */
3893 cpu_buffer->reader_page = reader;
3894 cpu_buffer->reader_page->read = 0;
3896 if (overwrite != cpu_buffer->last_overrun) {
3897 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
3898 cpu_buffer->last_overrun = overwrite;
3904 /* Update the read_stamp on the first event */
3905 if (reader && reader->read == 0)
3906 cpu_buffer->read_stamp = reader->page->time_stamp;
3908 arch_spin_unlock(&cpu_buffer->lock);
3909 local_irq_restore(flags);
3912 * The writer has preemption disabled; wait for it, but not forever
3913 * (although 1 second is pretty much "forever").
3915 #define USECS_WAIT 1000000
3916 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
3917 /* If the write is past the end of page, a writer is still updating it */
3918 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE))
3923 /* Get the latest version of the reader write value */
3927 /* The writer is not moving forward? Something is wrong */
3928 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
3932 * Make sure we see any padding after the write update
3933 * (see rb_reset_tail()).
3935 * In addition, a writer may be writing on the reader page
3936 * if the page has not been fully filled, so the read barrier
3937 * is also needed to make sure we see the content of what is
3938 * committed by the writer (see rb_set_commit_to_write()).
3946 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
3948 struct ring_buffer_event *event;
3949 struct buffer_page *reader;
3952 reader = rb_get_reader_page(cpu_buffer);
3954 /* This function should not be called when buffer is empty */
3955 if (RB_WARN_ON(cpu_buffer, !reader))
3958 event = rb_reader_event(cpu_buffer);
3960 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
3963 rb_update_read_stamp(cpu_buffer, event);
3965 length = rb_event_length(event);
3966 cpu_buffer->reader_page->read += length;
3969 static void rb_advance_iter(struct ring_buffer_iter *iter)
3971 struct ring_buffer_per_cpu *cpu_buffer;
3972 struct ring_buffer_event *event;
3975 cpu_buffer = iter->cpu_buffer;
3978 * Check if we are at the end of the buffer.
3980 if (iter->head >= rb_page_size(iter->head_page)) {
3981 /* discarded commits can make the page empty */
3982 if (iter->head_page == cpu_buffer->commit_page)
3988 event = rb_iter_head_event(iter);
3990 length = rb_event_length(event);
3993 * This should not be called to advance the header if we are
3994 * at the tail of the buffer.
3996 if (RB_WARN_ON(cpu_buffer,
3997 (iter->head_page == cpu_buffer->commit_page) &&
3998 (iter->head + length > rb_commit_index(cpu_buffer))))
4001 rb_update_iter_read_stamp(iter, event);
4003 iter->head += length;
4005 /* check for end of page padding */
4006 if ((iter->head >= rb_page_size(iter->head_page)) &&
4007 (iter->head_page != cpu_buffer->commit_page))
4011 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4013 return cpu_buffer->lost_events;
4016 static struct ring_buffer_event *
4017 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4018 unsigned long *lost_events)
4020 struct ring_buffer_event *event;
4021 struct buffer_page *reader;
4028 * We repeat when a time extend is encountered.
4029 * Since the time extend is always attached to a data event,
4030 * we should never loop more than once.
4031 * (We never hit the following condition more than twice).
4033 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
4036 reader = rb_get_reader_page(cpu_buffer);
4040 event = rb_reader_event(cpu_buffer);
4042 switch (event->type_len) {
4043 case RINGBUF_TYPE_PADDING:
4044 if (rb_null_event(event))
4045 RB_WARN_ON(cpu_buffer, 1);
4047 * Because the writer could be discarding every
4048 * event it creates (which would probably be bad),
4049 * if we were to go back to "again" we might never
4050 * catch up, and would trigger the warn on or lock
4051 * up the box. Return the padding, and we will release
4052 * the current locks and try again.
4056 case RINGBUF_TYPE_TIME_EXTEND:
4057 /* Internal data, OK to advance */
4058 rb_advance_reader(cpu_buffer);
4061 case RINGBUF_TYPE_TIME_STAMP:
4063 *ts = ring_buffer_event_time_stamp(event);
4064 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4065 cpu_buffer->cpu, ts);
4067 /* Internal data, OK to advance */
4068 rb_advance_reader(cpu_buffer);
4071 case RINGBUF_TYPE_DATA:
4073 *ts = cpu_buffer->read_stamp + event->time_delta;
4074 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4075 cpu_buffer->cpu, ts);
4078 *lost_events = rb_lost_events(cpu_buffer);
4087 EXPORT_SYMBOL_GPL(ring_buffer_peek);
4089 static struct ring_buffer_event *
4090 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4092 struct ring_buffer *buffer;
4093 struct ring_buffer_per_cpu *cpu_buffer;
4094 struct ring_buffer_event *event;
4100 cpu_buffer = iter->cpu_buffer;
4101 buffer = cpu_buffer->buffer;
4104 * Check if someone performed a consuming read to the buffer
4105 * or removed some pages from the buffer. In these cases,
4106 * the iterator was invalidated and we need to reset it.
4108 if (unlikely(iter->cache_read != cpu_buffer->read ||
4109 iter->cache_reader_page != cpu_buffer->reader_page ||
4110 iter->cache_pages_removed != cpu_buffer->pages_removed))
4111 rb_iter_reset(iter);
4114 if (ring_buffer_iter_empty(iter))
4118 * We repeat when a time extend is encountered or we hit
4119 * the end of the page. Since the time extend is always attached
4120 * to a data event, we should never loop more than three times.
4121 * Once for going to the next page, once on a time extend, and
4122 * finally once to get the event.
4123 * (We never hit the following condition more than thrice).
4125 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3))
4128 if (rb_per_cpu_empty(cpu_buffer))
4131 if (iter->head >= rb_page_size(iter->head_page)) {
4136 event = rb_iter_head_event(iter);
4138 switch (event->type_len) {
4139 case RINGBUF_TYPE_PADDING:
4140 if (rb_null_event(event)) {
4144 rb_advance_iter(iter);
4147 case RINGBUF_TYPE_TIME_EXTEND:
4148 /* Internal data, OK to advance */
4149 rb_advance_iter(iter);
4152 case RINGBUF_TYPE_TIME_STAMP:
4154 *ts = ring_buffer_event_time_stamp(event);
4155 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4156 cpu_buffer->cpu, ts);
4158 /* Internal data, OK to advance */
4159 rb_advance_iter(iter);
4162 case RINGBUF_TYPE_DATA:
4164 *ts = iter->read_stamp + event->time_delta;
4165 ring_buffer_normalize_time_stamp(buffer,
4166 cpu_buffer->cpu, ts);
4176 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4178 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4180 if (likely(!in_nmi())) {
4181 raw_spin_lock(&cpu_buffer->reader_lock);
4186 * If an NMI die dumps out the content of the ring buffer,
4187 * trylock must be used to prevent a deadlock if the NMI
4188 * preempted a task that holds the ring buffer locks. If
4189 * we get the lock then all is fine, if not, then continue
4190 * to do the read, but this can corrupt the ring buffer,
4191 * so it must be permanently disabled from future writes.
4192 * Reading from an NMI is a one-shot deal.
4194 if (raw_spin_trylock(&cpu_buffer->reader_lock))
4197 /* Continue without locking, but disable the ring buffer */
4198 atomic_inc(&cpu_buffer->record_disabled);
4203 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4206 raw_spin_unlock(&cpu_buffer->reader_lock);
4211 * ring_buffer_peek - peek at the next event to be read
4212 * @buffer: The ring buffer to read
4213 * @cpu: The cpu to peek at
4214 * @ts: The timestamp counter of this event.
4215 * @lost_events: a variable to store if events were lost (may be NULL)
4217 * This will return the event that will be read next, but does
4218 * not consume the data.
4220 struct ring_buffer_event *
4221 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
4222 unsigned long *lost_events)
4224 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4225 struct ring_buffer_event *event;
4226 unsigned long flags;
4229 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4233 local_irq_save(flags);
4234 dolock = rb_reader_lock(cpu_buffer);
4235 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4236 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4237 rb_advance_reader(cpu_buffer);
4238 rb_reader_unlock(cpu_buffer, dolock);
4239 local_irq_restore(flags);
4241 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4248 * ring_buffer_iter_peek - peek at the next event to be read
4249 * @iter: The ring buffer iterator
4250 * @ts: The timestamp counter of this event.
4252 * This will return the event that will be read next, but does
4253 * not increment the iterator.
4255 struct ring_buffer_event *
4256 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4258 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4259 struct ring_buffer_event *event;
4260 unsigned long flags;
4263 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4264 event = rb_iter_peek(iter, ts);
4265 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4267 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4274 * ring_buffer_consume - return an event and consume it
4275 * @buffer: The ring buffer to get the next event from
4276 * @cpu: the cpu to read the buffer from
4277 * @ts: a variable to store the timestamp (may be NULL)
4278 * @lost_events: a variable to store if events were lost (may be NULL)
4280 * Returns the next event in the ring buffer, and that event is consumed.
4281 * Meaning that sequential reads will keep returning a different event,
4282 * and eventually empty the ring buffer if the producer is slower.
4284 struct ring_buffer_event *
4285 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
4286 unsigned long *lost_events)
4288 struct ring_buffer_per_cpu *cpu_buffer;
4289 struct ring_buffer_event *event = NULL;
4290 unsigned long flags;
4294 /* might be called in atomic */
4297 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4300 cpu_buffer = buffer->buffers[cpu];
4301 local_irq_save(flags);
4302 dolock = rb_reader_lock(cpu_buffer);
4304 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4306 cpu_buffer->lost_events = 0;
4307 rb_advance_reader(cpu_buffer);
4310 rb_reader_unlock(cpu_buffer, dolock);
4311 local_irq_restore(flags);
4316 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4321 EXPORT_SYMBOL_GPL(ring_buffer_consume);
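/*
 * Editor's illustrative usage sketch (buffer, cpu and process() are
 * hypothetical; ts and lost_events may be NULL if unwanted): draining
 * one CPU's buffer with consuming reads.
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
 *		void *data = ring_buffer_event_data(event);
 *		process(data, ring_buffer_event_length(event));
 *	}
 */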
4324 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
4325 * @buffer: The ring buffer to read from
4326 * @cpu: The cpu buffer to iterate over
4327 * @flags: gfp flags to use for memory allocation
4329 * This performs the initial preparations necessary to iterate
4330 * through the buffer. Memory is allocated, buffer recording
4331 * is disabled, and the iterator pointer is returned to the caller.
4333 * Disabling buffer recording prevents the reading from being
4334 * corrupted. This is not a consuming read, so a producer is not
4337 * After a sequence of ring_buffer_read_prepare calls, the user is
4338 * expected to make at least one call to ring_buffer_read_prepare_sync.
4339 * Afterwards, ring_buffer_read_start is invoked to get things going
4342 * This overall must be paired with ring_buffer_read_finish.
4344 struct ring_buffer_iter *
4345 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu, gfp_t flags)
4347 struct ring_buffer_per_cpu *cpu_buffer;
4348 struct ring_buffer_iter *iter;
4350 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4353 iter = kmalloc(sizeof(*iter), flags);
4357 cpu_buffer = buffer->buffers[cpu];
4359 iter->cpu_buffer = cpu_buffer;
4361 atomic_inc(&buffer->resize_disabled);
4362 atomic_inc(&cpu_buffer->record_disabled);
4366 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
4369 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
4371 * All previously invoked ring_buffer_read_prepare calls to prepare
4372 * iterators will be synchronized. Afterwards, ring_buffer_read_start
4373 * calls on those iterators are allowed.
4376 ring_buffer_read_prepare_sync(void)
4380 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
4383 * ring_buffer_read_start - start a non consuming read of the buffer
4384 * @iter: The iterator returned by ring_buffer_read_prepare
4386 * This finalizes the startup of an iteration through the buffer.
4387 * The iterator comes from a call to ring_buffer_read_prepare and
4388 * an intervening ring_buffer_read_prepare_sync must have been
4391 * Must be paired with ring_buffer_read_finish.
4394 ring_buffer_read_start(struct ring_buffer_iter *iter)
4396 struct ring_buffer_per_cpu *cpu_buffer;
4397 unsigned long flags;
4402 cpu_buffer = iter->cpu_buffer;
4404 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4405 arch_spin_lock(&cpu_buffer->lock);
4406 rb_iter_reset(iter);
4407 arch_spin_unlock(&cpu_buffer->lock);
4408 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4410 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
4413 * ring_buffer_read_finish - finish reading the iterator of the buffer
4414 * @iter: The iterator retrieved by ring_buffer_read_prepare
4416 * This re-enables the recording to the buffer, and frees the
4420 ring_buffer_read_finish(struct ring_buffer_iter *iter)
4422 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4423 unsigned long flags;
4426 * Ring buffer is disabled from recording, here's a good place
4427 * to check the integrity of the ring buffer.
4428 * Must prevent readers from trying to read, as the check
4429 * clears the HEAD page and readers require it.
4431 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4432 rb_check_pages(cpu_buffer);
4433 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4435 atomic_dec(&cpu_buffer->record_disabled);
4436 atomic_dec(&cpu_buffer->buffer->resize_disabled);
4439 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
/**
 * ring_buffer_read - read the next item in the ring buffer by the iterator
 * @iter: The ring buffer iterator
 * @ts: The time stamp of the event read.
 *
 * This reads the next event in the ring buffer and increments the iterator.
 */
struct ring_buffer_event *
ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
{
	struct ring_buffer_event *event;
	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
	unsigned long flags;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 again:
	event = rb_iter_peek(iter, ts);
	if (!event)
		goto out;

	if (event->type_len == RINGBUF_TYPE_PADDING)
		goto again;

	rb_advance_iter(iter);
 out:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	return event;
}
EXPORT_SYMBOL_GPL(ring_buffer_read);
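
/*
 * Example (an illustrative sketch only): the full non consuming read
 * sequence built from the calls above. "process" is a placeholder for
 * the caller's event handler; error handling is trimmed for brevity.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return -ENOMEM;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		process(event, ts);
 *	ring_buffer_read_finish(iter);
 */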
/**
 * ring_buffer_size - return the size of the ring buffer (in bytes)
 * @buffer: The ring buffer.
 * @cpu: The CPU to get the ring buffer size from.
 */
unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu)
{
	/*
	 * Earlier, this method returned
	 *	BUF_PAGE_SIZE * buffer->nr_pages
	 * Since the nr_pages field is now removed, we have converted this to
	 * return the per cpu buffer value.
	 */
	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
}
EXPORT_SYMBOL_GPL(ring_buffer_size);
static void rb_clear_buffer_page(struct buffer_page *page)
{
	local_set(&page->write, 0);
	local_set(&page->entries, 0);
	rb_init_page(page->page);
	page->read = 0;
}

static void
rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *page;

	rb_head_page_deactivate(cpu_buffer);

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	rb_clear_buffer_page(cpu_buffer->head_page);
	list_for_each_entry(page, cpu_buffer->pages, list) {
		rb_clear_buffer_page(page);
	}

	cpu_buffer->tail_page = cpu_buffer->head_page;
	cpu_buffer->commit_page = cpu_buffer->head_page;

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);
	rb_clear_buffer_page(cpu_buffer->reader_page);

	local_set(&cpu_buffer->entries_bytes, 0);
	local_set(&cpu_buffer->overrun, 0);
	local_set(&cpu_buffer->commit_overrun, 0);
	local_set(&cpu_buffer->dropped_events, 0);
	local_set(&cpu_buffer->entries, 0);
	local_set(&cpu_buffer->committing, 0);
	local_set(&cpu_buffer->commits, 0);
	local_set(&cpu_buffer->pages_touched, 0);
	local_set(&cpu_buffer->pages_lost, 0);
	local_set(&cpu_buffer->pages_read, 0);
	cpu_buffer->last_pages_touch = 0;
	cpu_buffer->shortest_full = 0;
	cpu_buffer->read = 0;
	cpu_buffer->read_bytes = 0;

	cpu_buffer->write_stamp = 0;
	cpu_buffer->read_stamp = 0;

	cpu_buffer->lost_events = 0;
	cpu_buffer->last_overrun = 0;

	rb_head_page_activate(cpu_buffer);
	cpu_buffer->pages_removed = 0;
}
/**
 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
 * @buffer: The ring buffer to reset a per cpu buffer of
 * @cpu: The CPU buffer to be reset
 */
void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	unsigned long flags;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return;
	/* prevent another thread from changing buffer sizes */
	mutex_lock(&buffer->mutex);

	atomic_inc(&buffer->resize_disabled);
	atomic_inc(&cpu_buffer->record_disabled);

	/* Make sure all commits have finished */
	synchronize_rcu();

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
		goto out;

	arch_spin_lock(&cpu_buffer->lock);

	rb_reset_cpu(cpu_buffer);

	arch_spin_unlock(&cpu_buffer->lock);

 out:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	atomic_dec(&cpu_buffer->record_disabled);
	atomic_dec(&buffer->resize_disabled);

	mutex_unlock(&buffer->mutex);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
/**
 * ring_buffer_reset - reset a ring buffer
 * @buffer: The ring buffer to reset all cpu buffers
 */
void ring_buffer_reset(struct ring_buffer *buffer)
{
	int cpu;

	for_each_buffer_cpu(buffer, cpu)
		ring_buffer_reset_cpu(buffer, cpu);
}
EXPORT_SYMBOL_GPL(ring_buffer_reset);
/**
 * ring_buffer_empty - is the ring buffer empty?
 * @buffer: The ring buffer to test
 */
bool ring_buffer_empty(struct ring_buffer *buffer)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	bool dolock;
	int cpu;
	int ret;

	/* yes this is racy, but if you don't like the race, lock the buffer */
	for_each_buffer_cpu(buffer, cpu) {
		cpu_buffer = buffer->buffers[cpu];
		local_irq_save(flags);
		dolock = rb_reader_lock(cpu_buffer);
		ret = rb_per_cpu_empty(cpu_buffer);
		rb_reader_unlock(cpu_buffer, dolock);
		local_irq_restore(flags);

		if (!ret)
			return false;
	}

	return true;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty);
/**
 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
 * @buffer: The ring buffer
 * @cpu: The CPU buffer to test
 */
bool ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long flags;
	bool dolock;
	int ret;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return true;

	cpu_buffer = buffer->buffers[cpu];
	local_irq_save(flags);
	dolock = rb_reader_lock(cpu_buffer);
	ret = rb_per_cpu_empty(cpu_buffer);
	rb_reader_unlock(cpu_buffer, dolock);
	local_irq_restore(flags);

	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
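
/*
 * Example (an illustrative sketch only): a crude poll loop that waits
 * for data on one CPU before consuming. Callers that can sleep should
 * prefer ring_buffer_wait() over busy polling like this.
 *
 *	while (ring_buffer_empty_cpu(buffer, cpu))
 *		msleep(10);
 */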
#ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
/**
 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
 * @buffer_a: One buffer to swap with
 * @buffer_b: The other buffer to swap with
 * @cpu: the CPU of the buffers to swap
 *
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
 */
int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
			 struct ring_buffer *buffer_b, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer_a;
	struct ring_buffer_per_cpu *cpu_buffer_b;
	int ret = -EINVAL;

	if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
	    !cpumask_test_cpu(cpu, buffer_b->cpumask))
		goto out;

	cpu_buffer_a = buffer_a->buffers[cpu];
	cpu_buffer_b = buffer_b->buffers[cpu];

	/* At least make sure the two buffers are somewhat the same */
	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
		goto out;

	ret = -EAGAIN;

	if (atomic_read(&buffer_a->record_disabled))
		goto out;

	if (atomic_read(&buffer_b->record_disabled))
		goto out;

	if (atomic_read(&cpu_buffer_a->record_disabled))
		goto out;

	if (atomic_read(&cpu_buffer_b->record_disabled))
		goto out;

	/*
	 * We can't do a synchronize_rcu here because this
	 * function can be called in atomic context.
	 * Normally this will be called from the same CPU as cpu.
	 * If not it's up to the caller to protect this.
	 */
	atomic_inc(&cpu_buffer_a->record_disabled);
	atomic_inc(&cpu_buffer_b->record_disabled);

	ret = -EBUSY;
	if (local_read(&cpu_buffer_a->committing))
		goto out_dec;
	if (local_read(&cpu_buffer_b->committing))
		goto out_dec;

	buffer_a->buffers[cpu] = cpu_buffer_b;
	buffer_b->buffers[cpu] = cpu_buffer_a;

	cpu_buffer_b->buffer = buffer_a;
	cpu_buffer_a->buffer = buffer_b;

	ret = 0;

out_dec:
	atomic_dec(&cpu_buffer_a->record_disabled);
	atomic_dec(&cpu_buffer_b->record_disabled);
out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
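
/*
 * Example (an illustrative sketch only): the snapshot pattern described
 * above. "main_buffer", "snapshot_buffer" and "read_snapshot" are
 * placeholders owned by the caller.
 *
 *	if (!ring_buffer_swap_cpu(main_buffer, snapshot_buffer, cpu))
 *		read_snapshot(snapshot_buffer, cpu);
 *
 * A return of -EAGAIN or -EBUSY means recording was disabled or a
 * commit was in flight; the caller may simply retry later.
 */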
4728 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
/**
 * ring_buffer_alloc_read_page - allocate a page to read from buffer
 * @buffer: the buffer to allocate for.
 * @cpu: the cpu buffer to allocate.
 *
 * This function is used in conjunction with ring_buffer_read_page.
 * When reading a full page from the ring buffer, these functions
 * can be used to speed up the process. The calling function should
 * allocate a few pages first with this function. Then when it
 * needs to get pages from the ring buffer, it passes the result
 * of this function into ring_buffer_read_page, which will swap
 * the page that was allocated, with the read page of the buffer.
 *
 * Returns:
 *  The page allocated, or ERR_PTR
 */
void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_data_page *bpage = NULL;
	unsigned long flags;
	struct page *page;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		return ERR_PTR(-ENODEV);

	cpu_buffer = buffer->buffers[cpu];
	local_irq_save(flags);
	arch_spin_lock(&cpu_buffer->lock);

	if (cpu_buffer->free_page) {
		bpage = cpu_buffer->free_page;
		cpu_buffer->free_page = NULL;
	}

	arch_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

	if (bpage)
		goto out;

	page = alloc_pages_node(cpu_to_node(cpu),
				GFP_KERNEL | __GFP_NORETRY, 0);
	if (!page)
		return ERR_PTR(-ENOMEM);

	bpage = page_address(page);

 out:
	rb_init_page(bpage);

	return bpage;
}
EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
/**
 * ring_buffer_free_read_page - free an allocated read page
 * @buffer: the buffer the page was allocated for
 * @cpu: the cpu buffer the page came from
 * @data: the page to free
 *
 * Free a page allocated from ring_buffer_alloc_read_page.
 */
void ring_buffer_free_read_page(struct ring_buffer *buffer, int cpu, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_data_page *bpage = data;
	struct page *page = virt_to_page(bpage);
	unsigned long flags;

	if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
		return;

	cpu_buffer = buffer->buffers[cpu];

	/* If the page is still in use someplace else, we can't reuse it */
	if (page_ref_count(page) > 1)
		goto out;

	local_irq_save(flags);
	arch_spin_lock(&cpu_buffer->lock);

	if (!cpu_buffer->free_page) {
		cpu_buffer->free_page = bpage;
		bpage = NULL;
	}

	arch_spin_unlock(&cpu_buffer->lock);
	local_irq_restore(flags);

 out:
	free_page((unsigned long)bpage);
}
EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
/**
 * ring_buffer_read_page - extract a page from the ring buffer
 * @buffer: buffer to extract from
 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
 * @len: amount to extract
 * @cpu: the cpu of the buffer to extract
 * @full: should the extraction only happen when the page is full.
 *
 * This function will pull out a page from the ring buffer and consume it.
 * @data_page must be the address of the variable that was returned
 * from ring_buffer_alloc_read_page. This is because the page might be used
 * to swap with a page in the ring buffer.
 *
 * for example:
 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
 *	if (IS_ERR(rpage))
 *		return PTR_ERR(rpage);
 *	ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
 *	if (ret >= 0)
 *		process_page(rpage, ret);
 *
 * When @full is set, the read will only succeed if the writer is off
 * the reader page.
 *
 * Note: it is up to the calling functions to handle sleeps and wakeups.
 *  The ring buffer can be used anywhere in the kernel and can not
 *  blindly call wake_up. The layer that uses the ring buffer must be
 *  responsible for that.
 *
 * Returns:
 *  >=0 if data has been transferred, returns the offset of consumed data.
 *  <0 if no data has been transferred.
 */
int ring_buffer_read_page(struct ring_buffer *buffer,
			  void **data_page, size_t len, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	struct ring_buffer_event *event;
	struct buffer_data_page *bpage;
	struct buffer_page *reader;
	unsigned long missed_events;
	unsigned long flags;
	unsigned int commit;
	unsigned int read;
	u64 save_timestamp;
	int ret = -1;

	if (!cpumask_test_cpu(cpu, buffer->cpumask))
		goto out;

	/*
	 * If len is not big enough to hold the page header, then
	 * we can not copy anything.
	 */
	if (len <= BUF_PAGE_HDR_SIZE)
		goto out;

	len -= BUF_PAGE_HDR_SIZE;

	if (!data_page)
		goto out;

	bpage = *data_page;
	if (!bpage)
		goto out;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	reader = rb_get_reader_page(cpu_buffer);
	if (!reader)
		goto out_unlock;

	event = rb_reader_event(cpu_buffer);

	read = reader->read;
	commit = rb_page_commit(reader);

	/* Check if any events were dropped */
	missed_events = cpu_buffer->lost_events;

	/*
	 * If this page has been partially read or
	 * if len is not big enough to read the rest of the page or
	 * a writer is still on the page, then
	 * we must copy the data from the page to the buffer.
	 * Otherwise, we can simply swap the page with the one passed in.
	 */
	if (read || (len < (commit - read)) ||
	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
		unsigned int rpos = read;
		unsigned int pos = 0;
		unsigned int size;

		/*
		 * If a full page is expected, this can still be returned
		 * if there's been a previous partial read and the
		 * rest of the page can be read and the commit page is off
		 * the reader page.
		 */
		if (full &&
		    (!read || (len < (commit - read)) ||
		     cpu_buffer->reader_page == cpu_buffer->commit_page))
			goto out_unlock;

		if (len > (commit - read))
			len = (commit - read);

		/* Always keep the time extend and data together */
		size = rb_event_ts_length(event);

		/* Nothing can be copied if not even one event fits */
		if (len < size)
			goto out_unlock;

		/* save the current timestamp, since the user will need it */
		save_timestamp = cpu_buffer->read_stamp;

		/* Need to copy one event at a time */
		do {
			/* We need the size of one event, because
			 * rb_advance_reader only advances by one event,
			 * whereas rb_event_ts_length may include the size of
			 * one or two events.
			 * We have already ensured there's enough space if this
			 * is a time extend. */
			size = rb_event_length(event);
			memcpy(bpage->data + pos, rpage->data + rpos, size);

			len -= size;

			rb_advance_reader(cpu_buffer);
			rpos = reader->read;
			pos += size;

			if (rpos >= commit)
				break;

			event = rb_reader_event(cpu_buffer);
			/* Always keep the time extend and data together */
			size = rb_event_ts_length(event);
		} while (len >= size);

		/* update bpage */
		local_set(&bpage->commit, pos);
		bpage->time_stamp = save_timestamp;

		/* we copied everything to the beginning */
		read = 0;
	} else {
		/* update the entry counter */
		cpu_buffer->read += rb_page_entries(reader);
		cpu_buffer->read_bytes += BUF_PAGE_SIZE;

		/* swap the pages */
		rb_init_page(bpage);
		bpage = reader->page;
		reader->page = *data_page;
		local_set(&reader->write, 0);
		local_set(&reader->entries, 0);
		reader->read = 0;
		*data_page = bpage;

		/*
		 * Use the real_end for the data size,
		 * This gives us a chance to store the lost events
		 * on the page.
		 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}
	ret = read;

	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/* If there is room at the end of the page to save the
		 * missed events, then record it there.
		 */
		if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < BUF_PAGE_SIZE)
		memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);

 out_unlock:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);
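
/*
 * Example (an illustrative sketch only): a complete allocate/read/free
 * cycle for pulling whole pages out of the buffer. "consume_page" is a
 * placeholder for the caller's page handler.
 *
 *	void *rpage;
 *	int ret;
 *
 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
 *	if (IS_ERR(rpage))
 *		return PTR_ERR(rpage);
 *	ret = ring_buffer_read_page(buffer, &rpage, PAGE_SIZE, cpu, 0);
 *	if (ret >= 0)
 *		consume_page(rpage, ret);
 *	ring_buffer_free_read_page(buffer, cpu, rpage);
 */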
/*
 * We only allocate new buffers, never free them if the CPU goes down.
 * If we were to free the buffer, then the user would lose any trace that was in
 * the buffer.
 */
int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
{
	struct ring_buffer *buffer;
	long nr_pages_same;
	int cpu_i;
	unsigned long nr_pages;

	buffer = container_of(node, struct ring_buffer, node);
	if (cpumask_test_cpu(cpu, buffer->cpumask))
		return 0;

	nr_pages = 0;
	nr_pages_same = 1;
	/* check if all cpu sizes are same */
	for_each_buffer_cpu(buffer, cpu_i) {
		/* fill in the size from first enabled cpu */
		if (nr_pages == 0)
			nr_pages = buffer->buffers[cpu_i]->nr_pages;
		if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
			nr_pages_same = 0;
			break;
		}
	}
	/* allocate minimum pages, user can later expand it */
	if (!nr_pages_same)
		nr_pages = 2;
	buffer->buffers[cpu] =
		rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu]) {
		WARN(1, "failed to allocate ring buffer on CPU %u\n",
		     cpu);
		return -ENOMEM;
	}
	smp_wmb();
	cpumask_set_cpu(cpu, buffer->cpumask);
	return 0;
}
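
/*
 * Note: this callback is wired up through the CPU hotplug state
 * machine. A sketch of the wiring (the multi-state itself is
 * registered by the tracing core, and each buffer instance is added
 * when it is allocated):
 *
 *	cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
 *				"trace/RB:prepare",
 *				trace_rb_cpu_prepare, NULL);
 *	...
 *	cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
 */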
#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
/*
 * This is a basic integrity check of the ring buffer.
 * Late in the boot cycle this test will run when configured in.
 * It will kick off a thread per CPU that will go into a loop
 * writing to the per cpu ring buffer various sizes of data.
 * Some of the data will be large items, some small.
 *
 * Another thread is created that goes into a spin, sending out
 * IPIs to the other CPUs to also write into the ring buffer.
 * This is to test the nesting ability of the buffer.
 *
 * Basic stats are recorded and reported. If something in the
 * ring buffer should happen that's not expected, a big warning
 * is displayed and all ring buffers are disabled.
 */
static struct task_struct *rb_threads[NR_CPUS] __initdata;

struct rb_test_data {
	struct ring_buffer	*buffer;
	unsigned long		events;
	unsigned long		bytes_written;
	unsigned long		bytes_alloc;
	unsigned long		bytes_dropped;
	unsigned long		events_nested;
	unsigned long		bytes_written_nested;
	unsigned long		bytes_alloc_nested;
	unsigned long		bytes_dropped_nested;
	int			min_size_nested;
	int			max_size_nested;
	int			max_size;
	int			min_size;
	int			cpu;
	int			cnt;
};

static struct rb_test_data rb_data[NR_CPUS] __initdata;

/* 1 meg per cpu */
#define RB_TEST_BUFFER_SIZE	1048576

static char rb_string[] __initdata =
	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";

static bool rb_test_started __initdata;

struct rb_item {
	int size;
	char str[];
};
static __init int rb_write_something(struct rb_test_data *data, bool nested)
{
	struct ring_buffer_event *event;
	struct rb_item *item;
	bool started;
	int event_len;
	int size;
	int len;
	int cnt;

	/* Have nested writes different than what is written */
	cnt = data->cnt + (nested ? 27 : 0);

	/* Multiply cnt by ~e (68/25 ~= 2.72) to make some unique increment */
	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);

	len = size + sizeof(struct rb_item);

	started = rb_test_started;
	/* read rb_test_started before checking buffer enabled */
	smp_rmb();

	event = ring_buffer_lock_reserve(data->buffer, len);
	if (!event) {
		/* Ignore dropped events before test starts. */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer, event);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}
	__set_current_state(TASK_RUNNING);

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {
		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non preempt, let others run */
		schedule();
	}

	return 0;
}
static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct ring_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warning("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
						 "rbtester/%d", cpu);
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}

		kthread_bind(rb_threads[cpu], cpu);
		wake_up_process(rb_threads[cpu]);
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show buffer is enabled before setting rb_test_started.
	 * Yes there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events. We care about events dropped after
	 * the threads see that the buffer is active.
	 */
	smp_wmb();
	rb_test_started = true;

	set_current_state(TASK_INTERRUPTIBLE);
	/* Just run for 10 seconds */
	schedule_timeout(10 * HZ);

	kthread_stop(rb_hammer);

 out_free:
	for_each_online_cpu(cpu) {
		if (!rb_threads[cpu])
			break;
		kthread_stop(rb_threads[cpu]);
	}
	if (ret) {
		ring_buffer_free(buffer);
		return ret;
	}

	pr_info("finished\n");
	for_each_online_cpu(cpu) {
		struct ring_buffer_event *event;
		struct rb_test_data *data = &rb_data[cpu];
		struct rb_item *item;
		unsigned long total_events;
		unsigned long total_dropped;
		unsigned long total_written;
		unsigned long total_alloc;
		unsigned long total_read = 0;
		unsigned long total_size = 0;
		unsigned long total_len = 0;
		unsigned long total_lost = 0;
		unsigned long lost;
		int big_event_size;
		int small_event_size;

		ret = -1;

		total_events = data->events + data->events_nested;
		total_written = data->bytes_written + data->bytes_written_nested;
		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;

		big_event_size = data->max_size + data->max_size_nested;
		small_event_size = data->min_size + data->min_size_nested;

		pr_info("CPU %d:\n", cpu);
		pr_info("              events:    %ld\n", total_events);
		pr_info("       dropped bytes:    %ld\n", total_dropped);
		pr_info("       alloced bytes:    %ld\n", total_alloc);
		pr_info("       written bytes:    %ld\n", total_written);
		pr_info("       biggest event:    %d\n", big_event_size);
		pr_info("      smallest event:    %d\n", small_event_size);

		if (RB_WARN_ON(buffer, total_dropped))
			break;

		ret = 0;

		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
			total_lost += lost;
			item = ring_buffer_event_data(event);
			total_len += ring_buffer_event_length(event);
			total_size += item->size + sizeof(struct rb_item);
			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
				pr_info("FAILED!\n");
				pr_info("buffer had: %.*s\n", item->size, item->str);
				pr_info("expected:   %.*s\n", item->size, rb_string);
				RB_WARN_ON(buffer, 1);
				ret = -1;
				break;
			}
			total_read++;
		}
		if (ret)
			break;

		ret = -1;

		pr_info("         read events:   %ld\n", total_read);
		pr_info("         lost events:   %ld\n", total_lost);
		pr_info("        total events:   %ld\n", total_lost + total_read);
		pr_info("  recorded len bytes:   %ld\n", total_len);
		pr_info(" recorded size bytes:   %ld\n", total_size);
		if (total_lost)
			pr_info(" With dropped events, record len and size may not match\n"
				" alloced and written from above\n");
		if (!total_lost) {
			if (RB_WARN_ON(buffer, total_len != total_alloc ||
				       total_size != total_written))
				break;
		}
		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
			break;

		ret = 0;
	}
	if (!ret)
		pr_info("Ring buffer PASSED!\n");

	ring_buffer_free(buffer);
	return 0;
}

late_initcall(test_ringbuffer);
#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */