GNU Linux-libre 6.8.9-gnu: kernel/trace/ring_buffer.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Generic ring buffer
4  *
5  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6  */
7 #include <linux/trace_recursion.h>
8 #include <linux/trace_events.h>
9 #include <linux/ring_buffer.h>
10 #include <linux/trace_clock.h>
11 #include <linux/sched/clock.h>
12 #include <linux/trace_seq.h>
13 #include <linux/spinlock.h>
14 #include <linux/irq_work.h>
15 #include <linux/security.h>
16 #include <linux/uaccess.h>
17 #include <linux/hardirq.h>
18 #include <linux/kthread.h>      /* for self test */
19 #include <linux/module.h>
20 #include <linux/percpu.h>
21 #include <linux/mutex.h>
22 #include <linux/delay.h>
23 #include <linux/slab.h>
24 #include <linux/init.h>
25 #include <linux/hash.h>
26 #include <linux/list.h>
27 #include <linux/cpu.h>
28 #include <linux/oom.h>
29
30 #include <asm/local64.h>
31 #include <asm/local.h>
32
33 /*
34  * The "absolute" timestamp in the buffer is only 59 bits.
35  * If a clock has the 5 MSBs set, it needs to be saved and
36  * reinserted.
37  */
38 #define TS_MSB          (0xf8ULL << 56)
39 #define ABS_TS_MASK     (~TS_MSB)
40
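/*
 * Editor's illustrative sketch (not part of the original file): how a clock
 * value with any of the 5 MSBs set would be split using the masks above.
 * The helper name is hypothetical and the block is not built.
 */
#if 0
static void example_split_abs_ts(u64 clock)
{
	u64 msb = clock & TS_MSB;	/* upper 5 bits, must be saved */
	u64 abs = clock & ABS_TS_MASK;	/* 59 bits that fit in the event */

	/* rb_fix_abs_ts() below recombines abs with the saved msb part */
}
#endif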
41 static void update_pages_handler(struct work_struct *work);
42
43 /*
44  * The ring buffer header is special. We must manually keep it up to date.
45  */
46 int ring_buffer_print_entry_header(struct trace_seq *s)
47 {
48         trace_seq_puts(s, "# compressed entry header\n");
49         trace_seq_puts(s, "\ttype_len    :    5 bits\n");
50         trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
51         trace_seq_puts(s, "\tarray       :   32 bits\n");
52         trace_seq_putc(s, '\n');
53         trace_seq_printf(s, "\tpadding     : type == %d\n",
54                          RINGBUF_TYPE_PADDING);
55         trace_seq_printf(s, "\ttime_extend : type == %d\n",
56                          RINGBUF_TYPE_TIME_EXTEND);
57         trace_seq_printf(s, "\ttime_stamp : type == %d\n",
58                          RINGBUF_TYPE_TIME_STAMP);
59         trace_seq_printf(s, "\tdata max type_len  == %d\n",
60                          RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
61
62         return !trace_seq_has_overflowed(s);
63 }
64
65 /*
66  * The ring buffer is made up of a list of pages. A separate list of pages is
67  * allocated for each CPU. A writer may only write to a buffer that is
68  * associated with the CPU it is currently executing on.  A reader may read
69  * from any per cpu buffer.
70  *
71  * The reader is special. For each per cpu buffer, the reader has its own
72  * reader page. When a reader has read the entire reader page, this reader
73  * page is swapped with another page in the ring buffer.
74  *
75  * Now, as long as the writer is off the reader page, the reader can do
76  * whatever it wants with that page. The writer will never write to that page
77  * again (as long as it is out of the ring buffer).
78  *
79  * Here's some silly ASCII art.
80  *
81  *   +------+
82  *   |reader|          RING BUFFER
83  *   |page  |
84  *   +------+        +---+   +---+   +---+
85  *                   |   |-->|   |-->|   |
86  *                   +---+   +---+   +---+
87  *                     ^               |
88  *                     |               |
89  *                     +---------------+
90  *
91  *
92  *   +------+
93  *   |reader|          RING BUFFER
94  *   |page  |------------------v
95  *   +------+        +---+   +---+   +---+
96  *                   |   |-->|   |-->|   |
97  *                   +---+   +---+   +---+
98  *                     ^               |
99  *                     |               |
100  *                     +---------------+
101  *
102  *
103  *   +------+
104  *   |reader|          RING BUFFER
105  *   |page  |------------------v
106  *   +------+        +---+   +---+   +---+
107  *      ^            |   |-->|   |-->|   |
108  *      |            +---+   +---+   +---+
109  *      |                              |
110  *      |                              |
111  *      +------------------------------+
112  *
113  *
114  *   +------+
115  *   |buffer|          RING BUFFER
116  *   |page  |------------------v
117  *   +------+        +---+   +---+   +---+
118  *      ^            |   |   |   |-->|   |
119  *      |   New      +---+   +---+   +---+
120  *      |  Reader------^               |
121  *      |   page                       |
122  *      +------------------------------+
123  *
124  *
125  * After we make this swap, the reader can hand this page off to the splice
126  * code and be done with it. It can even allocate a new page if it needs to
127  * and swap that into the ring buffer.
128  *
129  * cmpxchg is used to make all this lockless (see the design notes below).
130  *
131  */
132
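/*
 * Editor's illustrative sketch (not part of the original file): a minimal
 * consumer loop built on the public API from <linux/ring_buffer.h>. The
 * reader-page swap described above happens inside ring_buffer_consume();
 * the helper name and the pr_info() output are hypothetical.
 */
#if 0
static void example_drain_cpu(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	unsigned long lost_events;
	u64 ts;

	/* Consume events until this CPU's buffer is empty */
	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost_events))) {
		void *data = ring_buffer_event_data(event);
		unsigned int len = ring_buffer_event_length(event);

		pr_info("cpu%d ts=%llu len=%u lost=%lu data=%p\n",
			cpu, ts, len, lost_events, data);
	}
}
#endif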
133 /* Used for individual buffers (after the counter) */
134 #define RB_BUFFER_OFF           (1 << 20)
135
136 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
137
138 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
139 #define RB_ALIGNMENT            4U
140 #define RB_MAX_SMALL_DATA       (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
141 #define RB_EVNT_MIN_SIZE        8U      /* two 32bit words */
142
143 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
144 # define RB_FORCE_8BYTE_ALIGNMENT       0
145 # define RB_ARCH_ALIGNMENT              RB_ALIGNMENT
146 #else
147 # define RB_FORCE_8BYTE_ALIGNMENT       1
148 # define RB_ARCH_ALIGNMENT              8U
149 #endif
150
151 #define RB_ALIGN_DATA           __aligned(RB_ARCH_ALIGNMENT)
152
153 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
154 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
155
156 enum {
157         RB_LEN_TIME_EXTEND = 8,
158         RB_LEN_TIME_STAMP =  8,
159 };
160
161 #define skip_time_extend(event) \
162         ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
163
164 #define extended_time(event) \
165         (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
166
167 static inline bool rb_null_event(struct ring_buffer_event *event)
168 {
169         return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
170 }
171
172 static void rb_event_set_padding(struct ring_buffer_event *event)
173 {
174         /* padding has a NULL time_delta */
175         event->type_len = RINGBUF_TYPE_PADDING;
176         event->time_delta = 0;
177 }
178
179 static unsigned
180 rb_event_data_length(struct ring_buffer_event *event)
181 {
182         unsigned length;
183
184         if (event->type_len)
185                 length = event->type_len * RB_ALIGNMENT;
186         else
187                 length = event->array[0];
188         return length + RB_EVNT_HDR_SIZE;
189 }
190
191 /*
192  * Return the length of the given event. Will return
193  * the length of the time extend if the event is a
194  * time extend.
195  */
196 static inline unsigned
197 rb_event_length(struct ring_buffer_event *event)
198 {
199         switch (event->type_len) {
200         case RINGBUF_TYPE_PADDING:
201                 if (rb_null_event(event))
202                         /* undefined */
203                         return -1;
204                 return  event->array[0] + RB_EVNT_HDR_SIZE;
205
206         case RINGBUF_TYPE_TIME_EXTEND:
207                 return RB_LEN_TIME_EXTEND;
208
209         case RINGBUF_TYPE_TIME_STAMP:
210                 return RB_LEN_TIME_STAMP;
211
212         case RINGBUF_TYPE_DATA:
213                 return rb_event_data_length(event);
214         default:
215                 WARN_ON_ONCE(1);
216         }
217         /* not hit */
218         return 0;
219 }
220
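/*
 * Editor's worked example (illustrative, hypothetical values): a data event
 * whose 12-byte payload fits the small encoding stores 12 / RB_ALIGNMENT == 3
 * in type_len, so rb_event_data_length() returns 3 * RB_ALIGNMENT +
 * RB_EVNT_HDR_SIZE == 12 + 4 == 16 bytes.  A larger payload sets type_len to
 * 0 and keeps the length in array[0] instead.
 */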
221 /*
222  * Return total length of time extend and data,
223  *   or just the event length for all other events.
224  */
225 static inline unsigned
226 rb_event_ts_length(struct ring_buffer_event *event)
227 {
228         unsigned len = 0;
229
230         if (extended_time(event)) {
231                 /* time extends include the data event after it */
232                 len = RB_LEN_TIME_EXTEND;
233                 event = skip_time_extend(event);
234         }
235         return len + rb_event_length(event);
236 }
237
238 /**
239  * ring_buffer_event_length - return the length of the event
240  * @event: the event to get the length of
241  *
242  * Returns the size of the data load of a data event.
243  * If the event is something other than a data event, it
244  * returns the size of the event itself. The exception is a
245  * TIME EXTEND, for which it still returns the size of the
246  * data load of the data event that follows it.
247  */
248 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
249 {
250         unsigned length;
251
252         if (extended_time(event))
253                 event = skip_time_extend(event);
254
255         length = rb_event_length(event);
256         if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
257                 return length;
258         length -= RB_EVNT_HDR_SIZE;
259         if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
260                 length -= sizeof(event->array[0]);
261         return length;
262 }
263 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
264
265 /* inline for ring buffer fast paths */
266 static __always_inline void *
267 rb_event_data(struct ring_buffer_event *event)
268 {
269         if (extended_time(event))
270                 event = skip_time_extend(event);
271         WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
272         /* If length is in len field, then array[0] has the data */
273         if (event->type_len)
274                 return (void *)&event->array[0];
275         /* Otherwise length is in array[0] and array[1] has the data */
276         return (void *)&event->array[1];
277 }
278
279 /**
280  * ring_buffer_event_data - return the data of the event
281  * @event: the event to get the data from
282  */
283 void *ring_buffer_event_data(struct ring_buffer_event *event)
284 {
285         return rb_event_data(event);
286 }
287 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
288
289 #define for_each_buffer_cpu(buffer, cpu)                \
290         for_each_cpu(cpu, buffer->cpumask)
291
292 #define for_each_online_buffer_cpu(buffer, cpu)         \
293         for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)
294
295 #define TS_SHIFT        27
296 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
297 #define TS_DELTA_TEST   (~TS_MASK)
298
299 static u64 rb_event_time_stamp(struct ring_buffer_event *event)
300 {
301         u64 ts;
302
303         ts = event->array[0];
304         ts <<= TS_SHIFT;
305         ts += event->time_delta;
306
307         return ts;
308 }
309
310 /* Flag when events were overwritten */
311 #define RB_MISSED_EVENTS        (1 << 31)
312 /* Missed count stored at end */
313 #define RB_MISSED_STORED        (1 << 30)
314
315 struct buffer_data_page {
316         u64              time_stamp;    /* page time stamp */
317         local_t          commit;        /* write committed index */
318         unsigned char    data[] RB_ALIGN_DATA;  /* data of buffer page */
319 };
320
321 struct buffer_data_read_page {
322         unsigned                order;  /* order of the page */
323         struct buffer_data_page *data;  /* actual data, stored in this page */
324 };
325
326 /*
327  * Note, the buffer_page list must be first. The buffer pages
328  * are allocated with cache-line alignment, which means that each buffer
329  * page will start at the beginning of a cache line, and thus
330  * the least significant bits will be zero. We use this to
331  * add flags in the list struct pointers, to make the ring buffer
332  * lockless.
333  */
334 struct buffer_page {
335         struct list_head list;          /* list of buffer pages */
336         local_t          write;         /* index for next write */
337         unsigned         read;          /* index for next read */
338         local_t          entries;       /* entries on this page */
339         unsigned long    real_end;      /* real end of data */
340         unsigned         order;         /* order of the page */
341         struct buffer_data_page *page;  /* Actual data page */
342 };
343
344 /*
345  * The buffer page counters, write and entries, must be reset
346  * atomically when crossing page boundaries. To synchronize this
347  * update, two counters are packed into a single value. One is
348  * the actual counter for the write position or count on the page.
349  *
350  * The other is a counter of updaters. Before an update happens
351  * the update partition of the counter is incremented. This will
352  * allow the updater to update the counter atomically.
353  *
354  * The counter is 20 bits, and the state data is 12.
355  */
356 #define RB_WRITE_MASK           0xfffff
357 #define RB_WRITE_INTCNT         (1 << 20)
358
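/*
 * Editor's worked example (illustrative, hypothetical value): with
 * RB_WRITE_MASK == 0xfffff the low 20 bits of buffer_page->write hold the
 * write index and the upper 12 bits count in-flight updaters.  A raw value
 * of 0x300019 therefore decodes as updater count 3 (0x300019 >> 20) and
 * write index 0x19 (0x300019 & RB_WRITE_MASK); rb_tail_page_update() bumps
 * the updater count with local_add_return(RB_WRITE_INTCNT, ...).
 */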
359 static void rb_init_page(struct buffer_data_page *bpage)
360 {
361         local_set(&bpage->commit, 0);
362 }
363
364 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
365 {
366         return local_read(&bpage->page->commit);
367 }
368
369 static void free_buffer_page(struct buffer_page *bpage)
370 {
371         free_pages((unsigned long)bpage->page, bpage->order);
372         kfree(bpage);
373 }
374
375 /*
376  * We need to fit the time_stamp delta into 27 bits.
377  */
378 static inline bool test_time_stamp(u64 delta)
379 {
380         return !!(delta & TS_DELTA_TEST);
381 }
382
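/*
 * Editor's note (illustrative): TS_MASK is (1 << 27) - 1, so any delta of
 * 2^27 ns or more (roughly 134 ms with a nanosecond clock) has bits set in
 * TS_DELTA_TEST and cannot be stored as a 27-bit delta; the writer must
 * emit a time-extend (or absolute) time stamp instead.
 */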
383 struct rb_irq_work {
384         struct irq_work                 work;
385         wait_queue_head_t               waiters;
386         wait_queue_head_t               full_waiters;
387         atomic_t                        seq;
388         bool                            waiters_pending;
389         bool                            full_waiters_pending;
390         bool                            wakeup_full;
391 };
392
393 /*
394  * Structure to hold event state and handle nested events.
395  */
396 struct rb_event_info {
397         u64                     ts;
398         u64                     delta;
399         u64                     before;
400         u64                     after;
401         unsigned long           length;
402         struct buffer_page      *tail_page;
403         int                     add_timestamp;
404 };
405
406 /*
407  * Used for the add_timestamp
408  *  NONE
409  *  EXTEND - wants a time extend
410  *  ABSOLUTE - the buffer requests all events to have absolute time stamps
411  *  FORCE - force a full time stamp.
412  */
413 enum {
414         RB_ADD_STAMP_NONE               = 0,
415         RB_ADD_STAMP_EXTEND             = BIT(1),
416         RB_ADD_STAMP_ABSOLUTE           = BIT(2),
417         RB_ADD_STAMP_FORCE              = BIT(3)
418 };
419 /*
420  * Used to track which context the event is in.
421  *  TRANSITION = 0
422  *  NMI     = 1
423  *  IRQ     = 2
424  *  SOFTIRQ = 3
425  *  NORMAL  = 4
426  *
427  * See trace_recursive_lock() comment below for more details.
428  */
429 enum {
430         RB_CTX_TRANSITION,
431         RB_CTX_NMI,
432         RB_CTX_IRQ,
433         RB_CTX_SOFTIRQ,
434         RB_CTX_NORMAL,
435         RB_CTX_MAX
436 };
437
438 struct rb_time_struct {
439         local64_t       time;
440 };
441 typedef struct rb_time_struct rb_time_t;
442
443 #define MAX_NEST        5
444
445 /*
446  * If head_page == tail_page && head == tail, then the buffer is empty.
447  */
448 struct ring_buffer_per_cpu {
449         int                             cpu;
450         atomic_t                        record_disabled;
451         atomic_t                        resize_disabled;
452         struct trace_buffer     *buffer;
453         raw_spinlock_t                  reader_lock;    /* serialize readers */
454         arch_spinlock_t                 lock;
455         struct lock_class_key           lock_key;
456         struct buffer_data_page         *free_page;
457         unsigned long                   nr_pages;
458         unsigned int                    current_context;
459         struct list_head                *pages;
460         struct buffer_page              *head_page;     /* read from head */
461         struct buffer_page              *tail_page;     /* write to tail */
462         struct buffer_page              *commit_page;   /* committed pages */
463         struct buffer_page              *reader_page;
464         unsigned long                   lost_events;
465         unsigned long                   last_overrun;
466         unsigned long                   nest;
467         local_t                         entries_bytes;
468         local_t                         entries;
469         local_t                         overrun;
470         local_t                         commit_overrun;
471         local_t                         dropped_events;
472         local_t                         committing;
473         local_t                         commits;
474         local_t                         pages_touched;
475         local_t                         pages_lost;
476         local_t                         pages_read;
477         long                            last_pages_touch;
478         size_t                          shortest_full;
479         unsigned long                   read;
480         unsigned long                   read_bytes;
481         rb_time_t                       write_stamp;
482         rb_time_t                       before_stamp;
483         u64                             event_stamp[MAX_NEST];
484         u64                             read_stamp;
485         /* pages removed since last reset */
486         unsigned long                   pages_removed;
487         /* ring buffer pages to update, > 0 to add, < 0 to remove */
488         long                            nr_pages_to_update;
489         struct list_head                new_pages; /* new pages to add */
490         struct work_struct              update_pages_work;
491         struct completion               update_done;
492
493         struct rb_irq_work              irq_work;
494 };
495
496 struct trace_buffer {
497         unsigned                        flags;
498         int                             cpus;
499         atomic_t                        record_disabled;
500         atomic_t                        resizing;
501         cpumask_var_t                   cpumask;
502
503         struct lock_class_key           *reader_lock_key;
504
505         struct mutex                    mutex;
506
507         struct ring_buffer_per_cpu      **buffers;
508
509         struct hlist_node               node;
510         u64                             (*clock)(void);
511
512         struct rb_irq_work              irq_work;
513         bool                            time_stamp_abs;
514
515         unsigned int                    subbuf_size;
516         unsigned int                    subbuf_order;
517         unsigned int                    max_data_size;
518 };
519
520 struct ring_buffer_iter {
521         struct ring_buffer_per_cpu      *cpu_buffer;
522         unsigned long                   head;
523         unsigned long                   next_event;
524         struct buffer_page              *head_page;
525         struct buffer_page              *cache_reader_page;
526         unsigned long                   cache_read;
527         unsigned long                   cache_pages_removed;
528         u64                             read_stamp;
529         u64                             page_stamp;
530         struct ring_buffer_event        *event;
531         size_t                          event_size;
532         int                             missed_events;
533 };
534
535 int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
536 {
537         struct buffer_data_page field;
538
539         trace_seq_printf(s, "\tfield: u64 timestamp;\t"
540                          "offset:0;\tsize:%u;\tsigned:%u;\n",
541                          (unsigned int)sizeof(field.time_stamp),
542                          (unsigned int)is_signed_type(u64));
543
544         trace_seq_printf(s, "\tfield: local_t commit;\t"
545                          "offset:%u;\tsize:%u;\tsigned:%u;\n",
546                          (unsigned int)offsetof(typeof(field), commit),
547                          (unsigned int)sizeof(field.commit),
548                          (unsigned int)is_signed_type(long));
549
550         trace_seq_printf(s, "\tfield: int overwrite;\t"
551                          "offset:%u;\tsize:%u;\tsigned:%u;\n",
552                          (unsigned int)offsetof(typeof(field), commit),
553                          1,
554                          (unsigned int)is_signed_type(long));
555
556         trace_seq_printf(s, "\tfield: char data;\t"
557                          "offset:%u;\tsize:%u;\tsigned:%u;\n",
558                          (unsigned int)offsetof(typeof(field), data),
559                          (unsigned int)buffer->subbuf_size,
560                          (unsigned int)is_signed_type(char));
561
562         return !trace_seq_has_overflowed(s);
563 }
564
565 static inline void rb_time_read(rb_time_t *t, u64 *ret)
566 {
567         *ret = local64_read(&t->time);
568 }
569 static void rb_time_set(rb_time_t *t, u64 val)
570 {
571         local64_set(&t->time, val);
572 }
573
574 /*
575  * Enable this to make sure that the event passed to
576  * ring_buffer_event_time_stamp() is not committed and also
577  * is on the buffer that was passed in.
578  */
579 //#define RB_VERIFY_EVENT
580 #ifdef RB_VERIFY_EVENT
581 static struct list_head *rb_list_head(struct list_head *list);
582 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
583                          void *event)
584 {
585         struct buffer_page *page = cpu_buffer->commit_page;
586         struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
587         struct list_head *next;
588         long commit, write;
589         unsigned long addr = (unsigned long)event;
590         bool done = false;
591         int stop = 0;
592
593         /* Make sure the event exists and is not committed yet */
594         do {
595                 if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
596                         done = true;
597                 commit = local_read(&page->page->commit);
598                 write = local_read(&page->write);
599                 if (addr >= (unsigned long)&page->page->data[commit] &&
600                     addr < (unsigned long)&page->page->data[write])
601                         return;
602
603                 next = rb_list_head(page->list.next);
604                 page = list_entry(next, struct buffer_page, list);
605         } while (!done);
606         WARN_ON_ONCE(1);
607 }
608 #else
609 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
610                          void *event)
611 {
612 }
613 #endif
614
615 /*
616  * The absolute time stamp drops the 5 MSBs and some clocks may
617  * require them. The rb_fix_abs_ts() will take a previous full
618  * time stamp, and add the 5 MSBs of that time stamp on to the
619  * saved absolute time stamp. The two are then compared, to handle
620  * the unlikely case that the latest time stamp has incremented
621  * the 5 MSBs.
622  */
623 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
624 {
625         if (save_ts & TS_MSB) {
626                 abs |= save_ts & TS_MSB;
627                 /* Check for overflow */
628                 if (unlikely(abs < save_ts))
629                         abs += 1ULL << 59;
630         }
631         return abs;
632 }
633
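/*
 * Editor's worked example (illustrative, hypothetical values): if the
 * previously saved full time stamp was (1ULL << 59) + 100 and the newly
 * read 59-bit absolute stamp is 50, OR-ing in the saved TS_MSB bits gives
 * (1ULL << 59) + 50, which is smaller than the saved value, so
 * rb_fix_abs_ts() adds another 1ULL << 59 to account for the 59-bit part
 * having wrapped.
 */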
634 static inline u64 rb_time_stamp(struct trace_buffer *buffer);
635
636 /**
637  * ring_buffer_event_time_stamp - return the event's current time stamp
638  * @buffer: The buffer that the event is on
639  * @event: the event to get the time stamp of
640  *
641  * Note, this must be called after @event is reserved, and before it is
642  * committed to the ring buffer. And must be called from the same
643  * context where the event was reserved (normal, softirq, irq, etc).
644  *
645  * Returns the time stamp associated with the current event.
646  * If the event has an extended time stamp, then that is used as
647  * the time stamp to return.
648  * In the highly unlikely case that the event was nested more than
649  * the max nesting, then the write_stamp of the buffer is returned;
650  * otherwise the current time is returned. Really, neither of
651  * the last two cases should ever happen.
652  */
653 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
654                                  struct ring_buffer_event *event)
655 {
656         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
657         unsigned int nest;
658         u64 ts;
659
660         /* If the event includes an absolute time, then just use that */
661         if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
662                 ts = rb_event_time_stamp(event);
663                 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
664         }
665
666         nest = local_read(&cpu_buffer->committing);
667         verify_event(cpu_buffer, event);
668         if (WARN_ON_ONCE(!nest))
669                 goto fail;
670
671         /* Read the current saved nesting level time stamp */
672         if (likely(--nest < MAX_NEST))
673                 return cpu_buffer->event_stamp[nest];
674
675         /* Shouldn't happen, warn if it does */
676         WARN_ONCE(1, "nest (%d) greater than max", nest);
677
678  fail:
679         rb_time_read(&cpu_buffer->write_stamp, &ts);
680
681         return ts;
682 }
683
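/*
 * Editor's illustrative sketch (not part of the original file): the time
 * stamp must be read between reserve and commit, in the same context.  The
 * helper name is hypothetical and the ring_buffer_lock_reserve() /
 * ring_buffer_unlock_commit() signatures are assumed to match
 * <linux/ring_buffer.h> in this release.
 */
#if 0
static void example_stamp_event(struct trace_buffer *buffer)
{
	struct ring_buffer_event *event;
	u64 ts;

	event = ring_buffer_lock_reserve(buffer, sizeof(ts));
	if (!event)
		return;

	ts = ring_buffer_event_time_stamp(buffer, event);
	memcpy(ring_buffer_event_data(event), &ts, sizeof(ts));

	ring_buffer_unlock_commit(buffer);
}
#endif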
684 /**
685  * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
686  * @buffer: The ring_buffer to get the number of pages from
687  * @cpu: The cpu of the ring_buffer to get the number of pages from
688  *
689  * Returns the number of pages used by a per_cpu buffer of the ring buffer.
690  */
691 size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
692 {
693         return buffer->buffers[cpu]->nr_pages;
694 }
695
696 /**
697  * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
698  * @buffer: The ring_buffer to get the number of pages from
699  * @cpu: The cpu of the ring_buffer to get the number of pages from
700  *
701  * Returns the number of pages that have content in the ring buffer.
702  */
703 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
704 {
705         size_t read;
706         size_t lost;
707         size_t cnt;
708
709         read = local_read(&buffer->buffers[cpu]->pages_read);
710         lost = local_read(&buffer->buffers[cpu]->pages_lost);
711         cnt = local_read(&buffer->buffers[cpu]->pages_touched);
712
713         if (WARN_ON_ONCE(cnt < lost))
714                 return 0;
715
716         cnt -= lost;
717
718         /* The reader can read an empty page, but not more than that */
719         if (cnt < read) {
720                 WARN_ON_ONCE(read > cnt + 1);
721                 return 0;
722         }
723
724         return cnt - read;
725 }
726
727 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
728 {
729         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
730         size_t nr_pages;
731         size_t dirty;
732
733         nr_pages = cpu_buffer->nr_pages;
734         if (!nr_pages || !full)
735                 return true;
736
737         /*
738          * Add one as dirty will never equal nr_pages, as the sub-buffer
739          * that the writer is on is not counted as dirty.
740          * This is needed if "buffer_percent" is set to 100.
741          */
742         dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;
743
744         return (dirty * 100) >= (full * nr_pages);
745 }
746
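/*
 * Editor's worked example (illustrative): with nr_pages == 8 and a
 * watermark of full == 50 (percent), full_hit() returns true once
 * (dirty + 1) * 100 >= 50 * 8, i.e. once at least three sub-buffers other
 * than the one the writer is currently on contain data.
 */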
747 /*
748  * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
749  *
750  * Wakes up any tasks that are blocked on the ring buffer waiters
751  * queue. Runs as the ring buffer's irq_work callback.
752  */
753 static void rb_wake_up_waiters(struct irq_work *work)
754 {
755         struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
756
757         /* For waiters waiting for the first wake up */
758         (void)atomic_fetch_inc_release(&rbwork->seq);
759
760         wake_up_all(&rbwork->waiters);
761         if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
762                 /* Only cpu_buffer sets the above flags */
763                 struct ring_buffer_per_cpu *cpu_buffer =
764                         container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
765
766                 /* Called from interrupt context */
767                 raw_spin_lock(&cpu_buffer->reader_lock);
768                 rbwork->wakeup_full = false;
769                 rbwork->full_waiters_pending = false;
770
771                 /* Waking up all waiters, they will reset the shortest full */
772                 cpu_buffer->shortest_full = 0;
773                 raw_spin_unlock(&cpu_buffer->reader_lock);
774
775                 wake_up_all(&rbwork->full_waiters);
776         }
777 }
778
779 /**
780  * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
781  * @buffer: The ring buffer to wake waiters on
782  * @cpu: The CPU buffer to wake waiters on
783  *
784  * When a file that represents a ring buffer is being closed,
785  * it is prudent to wake up any waiters that are waiting on it.
786  */
787 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
788 {
789         struct ring_buffer_per_cpu *cpu_buffer;
790         struct rb_irq_work *rbwork;
791
792         if (!buffer)
793                 return;
794
795         if (cpu == RING_BUFFER_ALL_CPUS) {
796
797                 /* Wake up individual ones too. One level recursion */
798                 for_each_buffer_cpu(buffer, cpu)
799                         ring_buffer_wake_waiters(buffer, cpu);
800
801                 rbwork = &buffer->irq_work;
802         } else {
803                 if (WARN_ON_ONCE(!buffer->buffers))
804                         return;
805                 if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
806                         return;
807
808                 cpu_buffer = buffer->buffers[cpu];
809                 /* The CPU buffer may not have been initialized yet */
810                 if (!cpu_buffer)
811                         return;
812                 rbwork = &cpu_buffer->irq_work;
813         }
814
815         /* This can be called in any context */
816         irq_work_queue(&rbwork->work);
817 }
818
819 static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
820 {
821         struct ring_buffer_per_cpu *cpu_buffer;
822         bool ret = false;
823
824         /* Reads of all CPUs always wait for any data */
825         if (cpu == RING_BUFFER_ALL_CPUS)
826                 return !ring_buffer_empty(buffer);
827
828         cpu_buffer = buffer->buffers[cpu];
829
830         if (!ring_buffer_empty_cpu(buffer, cpu)) {
831                 unsigned long flags;
832                 bool pagebusy;
833
834                 if (!full)
835                         return true;
836
837                 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
838                 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
839                 ret = !pagebusy && full_hit(buffer, cpu, full);
840
841                 if (!ret && (!cpu_buffer->shortest_full ||
842                              cpu_buffer->shortest_full > full)) {
843                         cpu_buffer->shortest_full = full;
844                 }
845                 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
846         }
847         return ret;
848 }
849
850 static inline bool
851 rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
852              int cpu, int full, ring_buffer_cond_fn cond, void *data)
853 {
854         if (rb_watermark_hit(buffer, cpu, full))
855                 return true;
856
857         if (cond(data))
858                 return true;
859
860         /*
861          * The events can happen in critical sections where
862          * checking a work queue can cause deadlocks.
863          * After adding a task to the queue, this flag is set
864          * only to notify events to try to wake up the queue
865          * using irq_work.
866          *
867          * We don't clear it even if the buffer is no longer
868          * empty. The flag only causes the next event to run
869          * irq_work to do the work queue wake up. The worst
870          * that can happen if we race with !trace_empty() is that
871          * an event will cause an irq_work to try to wake up
872          * an empty queue.
873          *
874          * There's no reason to protect this flag either, as
875          * the work queue and irq_work logic will do the necessary
876          * synchronization for the wake ups. The only thing
877          * that is necessary is that the wake up happens after
878          * a task has been queued. Spurious wake ups are OK.
879          */
880         if (full)
881                 rbwork->full_waiters_pending = true;
882         else
883                 rbwork->waiters_pending = true;
884
885         return false;
886 }
887
888 struct rb_wait_data {
889         struct rb_irq_work              *irq_work;
890         int                             seq;
891 };
892
893 /*
894  * The default wait condition for ring_buffer_wait() is to just exit the
895  * wait loop the first time it is woken up.
896  */
897 static bool rb_wait_once(void *data)
898 {
899         struct rb_wait_data *rdata = data;
900         struct rb_irq_work *rbwork = rdata->irq_work;
901
902         return atomic_read_acquire(&rbwork->seq) != rdata->seq;
903 }
904
905 /**
906  * ring_buffer_wait - wait for input to the ring buffer
907  * @buffer: buffer to wait on
908  * @cpu: the cpu buffer to wait on
909  * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
910  * @cond: condition function to break out of wait (NULL to run once)
911  * @data: the data to pass to @cond.
912  *
913  * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
914  * as data is added to any of the @buffer's cpu buffers. Otherwise
915  * it will wait for data to be added to a specific cpu buffer.
916  */
917 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
918                      ring_buffer_cond_fn cond, void *data)
919 {
920         struct ring_buffer_per_cpu *cpu_buffer;
921         struct wait_queue_head *waitq;
922         struct rb_irq_work *rbwork;
923         struct rb_wait_data rdata;
924         int ret = 0;
925
926         /*
927          * Depending on what the caller is waiting for, either any
928          * data in any cpu buffer, or a specific buffer, put the
929          * caller on the appropriate wait queue.
930          */
931         if (cpu == RING_BUFFER_ALL_CPUS) {
932                 rbwork = &buffer->irq_work;
933                 /* Full only makes sense on per cpu reads */
934                 full = 0;
935         } else {
936                 if (!cpumask_test_cpu(cpu, buffer->cpumask))
937                         return -ENODEV;
938                 cpu_buffer = buffer->buffers[cpu];
939                 rbwork = &cpu_buffer->irq_work;
940         }
941
942         if (full)
943                 waitq = &rbwork->full_waiters;
944         else
945                 waitq = &rbwork->waiters;
946
947         /* Set up to exit loop as soon as it is woken */
948         if (!cond) {
949                 cond = rb_wait_once;
950                 rdata.irq_work = rbwork;
951                 rdata.seq = atomic_read_acquire(&rbwork->seq);
952                 data = &rdata;
953         }
954
955         ret = wait_event_interruptible((*waitq),
956                                 rb_wait_cond(rbwork, buffer, cpu, full, cond, data));
957
958         return ret;
959 }
960
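/*
 * Editor's illustrative sketch (not part of the original file): typical
 * callers pass a NULL condition so the wait returns after the first wake
 * up (rb_wait_once).  The helper name is hypothetical.
 */
#if 0
static int example_wait_for_data(struct trace_buffer *buffer)
{
	/* Block until any CPU buffer has data (or a signal arrives) */
	return ring_buffer_wait(buffer, RING_BUFFER_ALL_CPUS, 0, NULL, NULL);
}
#endif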
961 /**
962  * ring_buffer_poll_wait - poll on buffer input
963  * @buffer: buffer to wait on
964  * @cpu: the cpu buffer to wait on
965  * @filp: the file descriptor
966  * @poll_table: The poll descriptor
967  * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
968  *
969  * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
970  * as data is added to any of the @buffer's cpu buffers. Otherwise
971  * it will wait for data to be added to a specific cpu buffer.
972  *
973  * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
974  * zero otherwise.
975  */
976 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
977                           struct file *filp, poll_table *poll_table, int full)
978 {
979         struct ring_buffer_per_cpu *cpu_buffer;
980         struct rb_irq_work *rbwork;
981
982         if (cpu == RING_BUFFER_ALL_CPUS) {
983                 rbwork = &buffer->irq_work;
984                 full = 0;
985         } else {
986                 if (!cpumask_test_cpu(cpu, buffer->cpumask))
987                         return EPOLLERR;
988
989                 cpu_buffer = buffer->buffers[cpu];
990                 rbwork = &cpu_buffer->irq_work;
991         }
992
993         if (full) {
994                 unsigned long flags;
995
996                 poll_wait(filp, &rbwork->full_waiters, poll_table);
997
998                 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
999                 if (!cpu_buffer->shortest_full ||
1000                     cpu_buffer->shortest_full > full)
1001                         cpu_buffer->shortest_full = full;
1002                 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1003                 if (full_hit(buffer, cpu, full))
1004                         return EPOLLIN | EPOLLRDNORM;
1005                 /*
1006                  * Only allow full_waiters_pending update to be seen after
1007                  * the shortest_full is set. If the writer sees the
1008                  * full_waiters_pending flag set, it will compare the
1009                  * amount in the ring buffer to shortest_full. If the amount
1010                  * in the ring buffer is greater than the shortest_full
1011                  * percent, it will call the irq_work handler to wake up
1012                  * this list. The irq_handler will reset shortest_full
1013                  * back to zero. That's done under the reader_lock, but
1014                  * the below smp_mb() makes sure that the update to
1015                  * full_waiters_pending doesn't leak up into the above.
1016                  */
1017                 smp_mb();
1018                 rbwork->full_waiters_pending = true;
1019                 return 0;
1020         }
1021
1022         poll_wait(filp, &rbwork->waiters, poll_table);
1023         rbwork->waiters_pending = true;
1024
1025         /*
1026          * There's a tight race between setting the waiters_pending and
1027          * checking if the ring buffer is empty.  Once the waiters_pending bit
1028          * is set, the next event will wake the task up, but we can get stuck
1029          * if there's only a single event already in the buffer.
1030          *
1031          * FIXME: Ideally, we need a memory barrier on the writer side as well,
1032          * but adding a memory barrier to all events will cause too much of a
1033          * performance hit in the fast path.  We only need a memory barrier when
1034          * the buffer goes from empty to having content.  But as this race is
1035          * extremely small, and it's not a problem if another event comes in, we
1036          * will fix it later.
1037          */
1038         smp_mb();
1039
1040         if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
1041             (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
1042                 return EPOLLIN | EPOLLRDNORM;
1043         return 0;
1044 }
1045
1046 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
1047 #define RB_WARN_ON(b, cond)                                             \
1048         ({                                                              \
1049                 int _____ret = unlikely(cond);                          \
1050                 if (_____ret) {                                         \
1051                         if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
1052                                 struct ring_buffer_per_cpu *__b =       \
1053                                         (void *)b;                      \
1054                                 atomic_inc(&__b->buffer->record_disabled); \
1055                         } else                                          \
1056                                 atomic_inc(&b->record_disabled);        \
1057                         WARN_ON(1);                                     \
1058                 }                                                       \
1059                 _____ret;                                               \
1060         })
1061
1062 /* Up this if you want to test the TIME_EXTENTS and normalization */
1063 #define DEBUG_SHIFT 0
1064
1065 static inline u64 rb_time_stamp(struct trace_buffer *buffer)
1066 {
1067         u64 ts;
1068
1069         /* Skip retpolines :-( */
1070         if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local))
1071                 ts = trace_clock_local();
1072         else
1073                 ts = buffer->clock();
1074
1075         /* shift to debug/test normalization and TIME_EXTENTS */
1076         return ts << DEBUG_SHIFT;
1077 }
1078
1079 u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
1080 {
1081         u64 time;
1082
1083         preempt_disable_notrace();
1084         time = rb_time_stamp(buffer);
1085         preempt_enable_notrace();
1086
1087         return time;
1088 }
1089 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
1090
1091 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
1092                                       int cpu, u64 *ts)
1093 {
1094         /* Just stupid testing the normalize function and deltas */
1095         *ts >>= DEBUG_SHIFT;
1096 }
1097 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
1098
1099 /*
1100  * Making the ring buffer lockless makes things tricky.
1101  * Writes only happen on the CPU that they are on, and they
1102  * only need to worry about interrupts. Reads can
1103  * happen on any CPU.
1104  *
1105  * The reader page is always off the ring buffer, but when the
1106  * reader finishes with a page, it needs to swap its page with
1107  * a new one from the buffer. The reader needs to take from
1108  * the head (writes go to the tail). But if a writer is in overwrite
1109  * mode and wraps, it must push the head page forward.
1110  *
1111  * Here lies the problem.
1112  *
1113  * The reader must be careful to replace only the head page, and
1114  * not another one. As described at the top of the file in the
1115  * ASCII art, the reader sets its old page to point to the next
1116  * page after head. It then sets the page after head to point to
1117  * the old reader page. But if the writer moves the head page
1118  * during this operation, the reader could end up with the tail.
1119  *
1120  * We use cmpxchg to help prevent this race. We also do something
1121  * special with the page before head. We set the LSB to 1.
1122  *
1123  * When the writer must push the page forward, it will clear the
1124  * bit that points to the head page, move the head, and then set
1125  * the bit that points to the new head page.
1126  *
1127  * We also don't want an interrupt coming in and moving the head
1128  * page on another writer. Thus we use the second LSB to catch
1129  * that too. Thus:
1130  *
1131  * head->list->prev->next        bit 1          bit 0
1132  *                              -------        -------
1133  * Normal page                     0              0
1134  * Points to head page             0              1
1135  * New head page                   1              0
1136  *
1137  * Note we can not trust the prev pointer of the head page, because:
1138  *
1139  * +----+       +-----+        +-----+
1140  * |    |------>|  T  |---X--->|  N  |
1141  * |    |<------|     |        |     |
1142  * +----+       +-----+        +-----+
1143  *   ^                           ^ |
1144  *   |          +-----+          | |
1145  *   +----------|  R  |----------+ |
1146  *              |     |<-----------+
1147  *              +-----+
1148  *
1149  * Key:  ---X-->  HEAD flag set in pointer
1150  *         T      Tail page
1151  *         R      Reader page
1152  *         N      Next page
1153  *
1154  * (see __rb_reserve_next() to see where this happens)
1155  *
1156  *  What the above shows is that the reader just swapped out
1157  *  the reader page with a page in the buffer, but before it
1158  *  could make the new head page point back to the page it added,
1159  *  it was preempted by a writer. The writer moved forward onto
1160  *  the new page added by the reader and is about to move forward
1161  *  again.
1162  *
1163  *  You can see, it is legitimate for the previous pointer of
1164  *  the head (or any page) not to point back to itself. But only
1165  *  temporarily.
1166  */
1167
1168 #define RB_PAGE_NORMAL          0UL
1169 #define RB_PAGE_HEAD            1UL
1170 #define RB_PAGE_UPDATE          2UL
1171
1172
1173 #define RB_FLAG_MASK            3UL
1174
1175 /* PAGE_MOVED is not part of the mask */
1176 #define RB_PAGE_MOVED           4UL
1177
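/*
 * Editor's worked example (illustrative, hypothetical address): a
 * list->next value of 0xffff888012345001 carries flag bits 01, meaning it
 * points at the head page; rb_list_head() masks off RB_FLAG_MASK and
 * yields the real pointer 0xffff888012345000.  Flag bits 10 would mark a
 * new head page being installed (RB_PAGE_UPDATE).
 */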
1178 /*
1179  * rb_list_head - remove any bit
1180  */
1181 static struct list_head *rb_list_head(struct list_head *list)
1182 {
1183         unsigned long val = (unsigned long)list;
1184
1185         return (struct list_head *)(val & ~RB_FLAG_MASK);
1186 }
1187
1188 /*
1189  * rb_is_head_page - test if the given page is the head page
1190  *
1191  * Because the reader may move the head_page pointer, we can
1192  * not trust what the head page is (it may be pointing to
1193  * the reader page). But if the next page is a header page,
1194  * its flags will be non zero.
1195  */
1196 static inline int
1197 rb_is_head_page(struct buffer_page *page, struct list_head *list)
1198 {
1199         unsigned long val;
1200
1201         val = (unsigned long)list->next;
1202
1203         if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
1204                 return RB_PAGE_MOVED;
1205
1206         return val & RB_FLAG_MASK;
1207 }
1208
1209 /*
1210  * rb_is_reader_page
1211  *
1212  * The unique thing about the reader page is that, if the
1213  * writer is ever on it, the previous pointer never points
1214  * back to the reader page.
1215  */
1216 static bool rb_is_reader_page(struct buffer_page *page)
1217 {
1218         struct list_head *list = page->list.prev;
1219
1220         return rb_list_head(list->next) != &page->list;
1221 }
1222
1223 /*
1224  * rb_set_list_to_head - set a list_head to be pointing to head.
1225  */
1226 static void rb_set_list_to_head(struct list_head *list)
1227 {
1228         unsigned long *ptr;
1229
1230         ptr = (unsigned long *)&list->next;
1231         *ptr |= RB_PAGE_HEAD;
1232         *ptr &= ~RB_PAGE_UPDATE;
1233 }
1234
1235 /*
1236  * rb_head_page_activate - sets up head page
1237  */
1238 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
1239 {
1240         struct buffer_page *head;
1241
1242         head = cpu_buffer->head_page;
1243         if (!head)
1244                 return;
1245
1246         /*
1247          * Set the previous list pointer to have the HEAD flag.
1248          */
1249         rb_set_list_to_head(head->list.prev);
1250 }
1251
1252 static void rb_list_head_clear(struct list_head *list)
1253 {
1254         unsigned long *ptr = (unsigned long *)&list->next;
1255
1256         *ptr &= ~RB_FLAG_MASK;
1257 }
1258
1259 /*
1260  * rb_head_page_deactivate - clears head page ptr (for free list)
1261  */
1262 static void
1263 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
1264 {
1265         struct list_head *hd;
1266
1267         /* Go through the whole list and clear any pointers found. */
1268         rb_list_head_clear(cpu_buffer->pages);
1269
1270         list_for_each(hd, cpu_buffer->pages)
1271                 rb_list_head_clear(hd);
1272 }
1273
1274 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
1275                             struct buffer_page *head,
1276                             struct buffer_page *prev,
1277                             int old_flag, int new_flag)
1278 {
1279         struct list_head *list;
1280         unsigned long val = (unsigned long)&head->list;
1281         unsigned long ret;
1282
1283         list = &prev->list;
1284
1285         val &= ~RB_FLAG_MASK;
1286
1287         ret = cmpxchg((unsigned long *)&list->next,
1288                       val | old_flag, val | new_flag);
1289
1290         /* check if the reader took the page */
1291         if ((ret & ~RB_FLAG_MASK) != val)
1292                 return RB_PAGE_MOVED;
1293
1294         return ret & RB_FLAG_MASK;
1295 }
1296
1297 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
1298                                    struct buffer_page *head,
1299                                    struct buffer_page *prev,
1300                                    int old_flag)
1301 {
1302         return rb_head_page_set(cpu_buffer, head, prev,
1303                                 old_flag, RB_PAGE_UPDATE);
1304 }
1305
1306 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
1307                                  struct buffer_page *head,
1308                                  struct buffer_page *prev,
1309                                  int old_flag)
1310 {
1311         return rb_head_page_set(cpu_buffer, head, prev,
1312                                 old_flag, RB_PAGE_HEAD);
1313 }
1314
1315 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
1316                                    struct buffer_page *head,
1317                                    struct buffer_page *prev,
1318                                    int old_flag)
1319 {
1320         return rb_head_page_set(cpu_buffer, head, prev,
1321                                 old_flag, RB_PAGE_NORMAL);
1322 }
1323
1324 static inline void rb_inc_page(struct buffer_page **bpage)
1325 {
1326         struct list_head *p = rb_list_head((*bpage)->list.next);
1327
1328         *bpage = list_entry(p, struct buffer_page, list);
1329 }
1330
1331 static struct buffer_page *
1332 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1333 {
1334         struct buffer_page *head;
1335         struct buffer_page *page;
1336         struct list_head *list;
1337         int i;
1338
1339         if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1340                 return NULL;
1341
1342         /* sanity check */
1343         list = cpu_buffer->pages;
1344         if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1345                 return NULL;
1346
1347         page = head = cpu_buffer->head_page;
1348         /*
1349          * It is possible that the writer moves the header behind
1350          * where we started, and we miss in one loop.
1351          * A second loop should grab the header, but we'll do
1352          * three loops just because I'm paranoid.
1353          */
1354         for (i = 0; i < 3; i++) {
1355                 do {
1356                         if (rb_is_head_page(page, page->list.prev)) {
1357                                 cpu_buffer->head_page = page;
1358                                 return page;
1359                         }
1360                         rb_inc_page(&page);
1361                 } while (page != head);
1362         }
1363
1364         RB_WARN_ON(cpu_buffer, 1);
1365
1366         return NULL;
1367 }
1368
1369 static bool rb_head_page_replace(struct buffer_page *old,
1370                                 struct buffer_page *new)
1371 {
1372         unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1373         unsigned long val;
1374
1375         val = *ptr & ~RB_FLAG_MASK;
1376         val |= RB_PAGE_HEAD;
1377
1378         return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
1379 }
1380
1381 /*
1382  * rb_tail_page_update - move the tail page forward
1383  */
1384 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
1385                                struct buffer_page *tail_page,
1386                                struct buffer_page *next_page)
1387 {
1388         unsigned long old_entries;
1389         unsigned long old_write;
1390
1391         /*
1392          * The tail page now needs to be moved forward.
1393          *
1394          * We need to reset the tail page, but without possibly
1395          * erasing data brought in by interrupts
1396          * that have moved the tail page and are currently on it.
1397          *
1398          * We add a counter to the write field to denote this.
1399          */
1400         old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1401         old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1402
1403         /*
1404          * Just make sure we have seen our old_write and synchronize
1405          * with any interrupts that come in.
1406          */
1407         barrier();
1408
1409         /*
1410          * If the tail page is still the same as what we think
1411          * it is, then it is up to us to update the tail
1412          * pointer.
1413          */
1414         if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
1415                 /* Zero the write counter */
1416                 unsigned long val = old_write & ~RB_WRITE_MASK;
1417                 unsigned long eval = old_entries & ~RB_WRITE_MASK;
1418
1419                 /*
1420                  * This will only succeed if an interrupt did
1421                  * not come in and change it; in that case, we
1422                  * do not want to modify it.
1423                  *
1424                  * We add (void) to let the compiler know that we do not care
1425                  * about the return value of these functions. We use the
1426                  * cmpxchg to only update if an interrupt did not already
1427                  * do it for us. If the cmpxchg fails, we don't care.
1428                  */
1429                 (void)local_cmpxchg(&next_page->write, old_write, val);
1430                 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
1431
1432                 /*
1433                  * No need to worry about races with clearing out the commit.
1434                  * It can only increment when a commit takes place. But that
1435                  * only happens in the outer most nested commit.
1436                  */
1437                 local_set(&next_page->page->commit, 0);
1438
1439                 /* Either we update tail_page or an interrupt does */
1440                 if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
1441                         local_inc(&cpu_buffer->pages_touched);
1442         }
1443 }
1444
1445 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
1446                           struct buffer_page *bpage)
1447 {
1448         unsigned long val = (unsigned long)bpage;
1449
1450         RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
1451 }
1452
1453 /**
1454  * rb_check_pages - integrity check of buffer pages
1455  * @cpu_buffer: CPU buffer with pages to test
1456  *
1457  * As a safety measure we check to make sure the data pages have not
1458  * been corrupted.
1459  */
1460 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1461 {
1462         struct list_head *head = rb_list_head(cpu_buffer->pages);
1463         struct list_head *tmp;
1464
1465         if (RB_WARN_ON(cpu_buffer,
1466                         rb_list_head(rb_list_head(head->next)->prev) != head))
1467                 return;
1468
1469         if (RB_WARN_ON(cpu_buffer,
1470                         rb_list_head(rb_list_head(head->prev)->next) != head))
1471                 return;
1472
1473         for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
1474                 if (RB_WARN_ON(cpu_buffer,
1475                                 rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
1476                         return;
1477
1478                 if (RB_WARN_ON(cpu_buffer,
1479                                 rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
1480                         return;
1481         }
1482 }
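
/*
 * A small sketch of why every pointer above goes through rb_list_head():
 * the low bits of the ->next/->prev links double as flag bits (such as
 * the HEAD marker), so they must be masked off before being followed.
 * The mask value and struct below are illustrative assumptions, not the
 * real RB_FLAG_MASK or buffer_page layout.
 */
#define SKETCH_FLAG_MASK        3UL

struct sketch_node {
        struct sketch_node *next;       /* low bits may carry flags */
        struct sketch_node *prev;
};

static inline struct sketch_node *sketch_list_head(struct sketch_node *p)
{
        return (struct sketch_node *)((unsigned long)p & ~SKETCH_FLAG_MASK);
}

static inline bool sketch_node_ok(struct sketch_node *n)
{
        /* A well-formed ring has n->next->prev == n once flags are masked. */
        return sketch_list_head(sketch_list_head(n->next)->prev) == n;
}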
1483
1484 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1485                 long nr_pages, struct list_head *pages)
1486 {
1487         struct buffer_page *bpage, *tmp;
1488         bool user_thread = current->mm != NULL;
1489         gfp_t mflags;
1490         long i;
1491
1492         /*
1493          * First check that enough memory is available.
1494          * Note, si_mem_available() only gives us a rough estimate of available
1495          * memory. It may not be accurate. But we don't care; we just want
1496          * to prevent doing any allocation when it is obvious that it is
1497          * not going to succeed.
1498          */
1499         i = si_mem_available();
1500         if (i < nr_pages)
1501                 return -ENOMEM;
1502
1503         /*
1504          * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1505          * gracefully without invoking oom-killer and the system is not
1506          * destabilized.
1507          */
1508         mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1509
1510         /*
1511          * If a user thread allocates too much, si_mem_available() may
1512          * report there's enough memory even though there is not.
1513          * Make sure the OOM killer kills this thread. This can happen
1514          * even with RETRY_MAYFAIL because another task may be doing
1515          * an allocation after this task has taken all memory.
1516          * This is the task the OOM killer needs to take out during this
1517          * loop, even if it was triggered by an allocation somewhere else.
1518          */
1519         if (user_thread)
1520                 set_current_oom_origin();
1521         for (i = 0; i < nr_pages; i++) {
1522                 struct page *page;
1523
1524                 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1525                                     mflags, cpu_to_node(cpu_buffer->cpu));
1526                 if (!bpage)
1527                         goto free_pages;
1528
1529                 rb_check_bpage(cpu_buffer, bpage);
1530
1531                 list_add(&bpage->list, pages);
1532
1533                 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags,
1534                                         cpu_buffer->buffer->subbuf_order);
1535                 if (!page)
1536                         goto free_pages;
1537                 bpage->page = page_address(page);
1538                 bpage->order = cpu_buffer->buffer->subbuf_order;
1539                 rb_init_page(bpage->page);
1540
1541                 if (user_thread && fatal_signal_pending(current))
1542                         goto free_pages;
1543         }
1544         if (user_thread)
1545                 clear_current_oom_origin();
1546
1547         return 0;
1548
1549 free_pages:
1550         list_for_each_entry_safe(bpage, tmp, pages, list) {
1551                 list_del_init(&bpage->list);
1552                 free_buffer_page(bpage);
1553         }
1554         if (user_thread)
1555                 clear_current_oom_origin();
1556
1557         return -ENOMEM;
1558 }
1559
1560 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1561                              unsigned long nr_pages)
1562 {
1563         LIST_HEAD(pages);
1564
1565         WARN_ON(!nr_pages);
1566
1567         if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
1568                 return -ENOMEM;
1569
1570         /*
1571          * The ring buffer page list is a circular list that does not
1572          * start and end with a list head. All page list items point to
1573          * other pages.
1574          */
1575         cpu_buffer->pages = pages.next;
1576         list_del(&pages);
1577
1578         cpu_buffer->nr_pages = nr_pages;
1579
1580         rb_check_pages(cpu_buffer);
1581
1582         return 0;
1583 }
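
/*
 * Illustration of the step above that turns a LIST_HEAD-anchored list
 * into a headless circular ring: take the first real entry, then unlink
 * the anchor so only the buffer pages remain, linked to each other.
 * The helper below is a hypothetical restatement of those two lines.
 *
 *      LIST_HEAD(pages);       ring:  pages -> A -> B -> C -> pages
 *      first = pages.next;     first == A
 *      list_del(&pages);       ring:  A -> B -> C -> A   (no list head)
 */
static inline struct list_head *sketch_detach_ring(struct list_head *anchor)
{
        struct list_head *first = anchor->next;

        /* Unlink the anchor; the remaining entries stay circularly linked. */
        list_del(anchor);
        return first;
}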
1584
1585 static struct ring_buffer_per_cpu *
1586 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
1587 {
1588         struct ring_buffer_per_cpu *cpu_buffer;
1589         struct buffer_page *bpage;
1590         struct page *page;
1591         int ret;
1592
1593         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1594                                   GFP_KERNEL, cpu_to_node(cpu));
1595         if (!cpu_buffer)
1596                 return NULL;
1597
1598         cpu_buffer->cpu = cpu;
1599         cpu_buffer->buffer = buffer;
1600         raw_spin_lock_init(&cpu_buffer->reader_lock);
1601         lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1602         cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1603         INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1604         init_completion(&cpu_buffer->update_done);
1605         init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1606         init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1607         init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1608
1609         bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1610                             GFP_KERNEL, cpu_to_node(cpu));
1611         if (!bpage)
1612                 goto fail_free_buffer;
1613
1614         rb_check_bpage(cpu_buffer, bpage);
1615
1616         cpu_buffer->reader_page = bpage;
1617
1618         page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, cpu_buffer->buffer->subbuf_order);
1619         if (!page)
1620                 goto fail_free_reader;
1621         bpage->page = page_address(page);
1622         rb_init_page(bpage->page);
1623
1624         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1625         INIT_LIST_HEAD(&cpu_buffer->new_pages);
1626
1627         ret = rb_allocate_pages(cpu_buffer, nr_pages);
1628         if (ret < 0)
1629                 goto fail_free_reader;
1630
1631         cpu_buffer->head_page
1632                 = list_entry(cpu_buffer->pages, struct buffer_page, list);
1633         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1634
1635         rb_head_page_activate(cpu_buffer);
1636
1637         return cpu_buffer;
1638
1639  fail_free_reader:
1640         free_buffer_page(cpu_buffer->reader_page);
1641
1642  fail_free_buffer:
1643         kfree(cpu_buffer);
1644         return NULL;
1645 }
1646
1647 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1648 {
1649         struct list_head *head = cpu_buffer->pages;
1650         struct buffer_page *bpage, *tmp;
1651
1652         irq_work_sync(&cpu_buffer->irq_work.work);
1653
1654         free_buffer_page(cpu_buffer->reader_page);
1655
1656         if (head) {
1657                 rb_head_page_deactivate(cpu_buffer);
1658
1659                 list_for_each_entry_safe(bpage, tmp, head, list) {
1660                         list_del_init(&bpage->list);
1661                         free_buffer_page(bpage);
1662                 }
1663                 bpage = list_entry(head, struct buffer_page, list);
1664                 free_buffer_page(bpage);
1665         }
1666
1667         free_page((unsigned long)cpu_buffer->free_page);
1668
1669         kfree(cpu_buffer);
1670 }
1671
1672 /**
1673  * __ring_buffer_alloc - allocate a new ring_buffer
1674  * @size: the size in bytes per cpu that is needed.
1675  * @flags: attributes to set for the ring buffer.
1676  * @key: ring buffer reader_lock_key.
1677  *
1678  * Currently the only flag that is available is the RB_FL_OVERWRITE
1679  * flag. This flag means that the buffer will overwrite old data
1680  * when the buffer wraps. If this flag is not set, the buffer will
1681  * drop data when the tail hits the head.
1682  */
1683 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1684                                         struct lock_class_key *key)
1685 {
1686         struct trace_buffer *buffer;
1687         long nr_pages;
1688         int bsize;
1689         int cpu;
1690         int ret;
1691
1692         /* keep it in its own cache line */
1693         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1694                          GFP_KERNEL);
1695         if (!buffer)
1696                 return NULL;
1697
1698         if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1699                 goto fail_free_buffer;
1700
1701         /* Default buffer page size - one system page */
1702         buffer->subbuf_order = 0;
1703         buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;
1704
1705         /* Max payload is buffer page size - header (8 bytes) */
1706         buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2);
1707
1708         nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
1709         buffer->flags = flags;
1710         buffer->clock = trace_clock_local;
1711         buffer->reader_lock_key = key;
1712
1713         init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1714         init_waitqueue_head(&buffer->irq_work.waiters);
1715
1716         /* need at least two pages */
1717         if (nr_pages < 2)
1718                 nr_pages = 2;
1719
1720         buffer->cpus = nr_cpu_ids;
1721
1722         bsize = sizeof(void *) * nr_cpu_ids;
1723         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1724                                   GFP_KERNEL);
1725         if (!buffer->buffers)
1726                 goto fail_free_cpumask;
1727
1728         cpu = raw_smp_processor_id();
1729         cpumask_set_cpu(cpu, buffer->cpumask);
1730         buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1731         if (!buffer->buffers[cpu])
1732                 goto fail_free_buffers;
1733
1734         ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1735         if (ret < 0)
1736                 goto fail_free_buffers;
1737
1738         mutex_init(&buffer->mutex);
1739
1740         return buffer;
1741
1742  fail_free_buffers:
1743         for_each_buffer_cpu(buffer, cpu) {
1744                 if (buffer->buffers[cpu])
1745                         rb_free_cpu_buffer(buffer->buffers[cpu]);
1746         }
1747         kfree(buffer->buffers);
1748
1749  fail_free_cpumask:
1750         free_cpumask_var(buffer->cpumask);
1751
1752  fail_free_buffer:
1753         kfree(buffer);
1754         return NULL;
1755 }
1756 EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
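
/*
 * A small sketch of how the requested size is turned into a page count
 * above: round the byte count up to whole sub-buffers and enforce the
 * two-page minimum.  The function name is illustrative; the real code
 * does this inline with DIV_ROUND_UP().
 */
static inline unsigned long sketch_size_to_pages(unsigned long size,
                                                 unsigned long subbuf_size)
{
        unsigned long nr_pages = (size + subbuf_size - 1) / subbuf_size;

        return nr_pages < 2 ? 2 : nr_pages;
}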
1757
1758 /**
1759  * ring_buffer_free - free a ring buffer.
1760  * @buffer: the buffer to free.
1761  */
1762 void
1763 ring_buffer_free(struct trace_buffer *buffer)
1764 {
1765         int cpu;
1766
1767         cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1768
1769         irq_work_sync(&buffer->irq_work.work);
1770
1771         for_each_buffer_cpu(buffer, cpu)
1772                 rb_free_cpu_buffer(buffer->buffers[cpu]);
1773
1774         kfree(buffer->buffers);
1775         free_cpumask_var(buffer->cpumask);
1776
1777         kfree(buffer);
1778 }
1779 EXPORT_SYMBOL_GPL(ring_buffer_free);
1780
1781 void ring_buffer_set_clock(struct trace_buffer *buffer,
1782                            u64 (*clock)(void))
1783 {
1784         buffer->clock = clock;
1785 }
1786
1787 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
1788 {
1789         buffer->time_stamp_abs = abs;
1790 }
1791
1792 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
1793 {
1794         return buffer->time_stamp_abs;
1795 }
1796
1797 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1798
1799 static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1800 {
1801         return local_read(&bpage->entries) & RB_WRITE_MASK;
1802 }
1803
1804 static inline unsigned long rb_page_write(struct buffer_page *bpage)
1805 {
1806         return local_read(&bpage->write) & RB_WRITE_MASK;
1807 }
1808
1809 static bool
1810 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1811 {
1812         struct list_head *tail_page, *to_remove, *next_page;
1813         struct buffer_page *to_remove_page, *tmp_iter_page;
1814         struct buffer_page *last_page, *first_page;
1815         unsigned long nr_removed;
1816         unsigned long head_bit;
1817         int page_entries;
1818
1819         head_bit = 0;
1820
1821         raw_spin_lock_irq(&cpu_buffer->reader_lock);
1822         atomic_inc(&cpu_buffer->record_disabled);
1823         /*
1824          * We don't race with the readers since we have acquired the reader
1825          * lock. We also don't race with writers after disabling recording.
1826          * This makes it easy to figure out the first and the last page to be
1827          * removed from the list. We unlink all the pages in between including
1828          * the first and last pages. This is done in a busy loop so that we
1829          * lose as few traces as possible.
1830          * The pages are freed after we restart recording and unlock readers.
1831          */
1832         tail_page = &cpu_buffer->tail_page->list;
1833
1834         /*
1835          * The tail page might be on the reader page; if so, start removing
1836          * from the next page in the ring buffer.
1837          */
1838         if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1839                 tail_page = rb_list_head(tail_page->next);
1840         to_remove = tail_page;
1841
1842         /* start of pages to remove */
1843         first_page = list_entry(rb_list_head(to_remove->next),
1844                                 struct buffer_page, list);
1845
1846         for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1847                 to_remove = rb_list_head(to_remove)->next;
1848                 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1849         }
1850         /* Read iterators need to reset themselves when some pages are removed */
1851         cpu_buffer->pages_removed += nr_removed;
1852
1853         next_page = rb_list_head(to_remove)->next;
1854
1855         /*
1856          * Now we remove all pages between tail_page and next_page.
1857          * Make sure that we have head_bit value preserved for the
1858          * next page
1859          */
1860         tail_page->next = (struct list_head *)((unsigned long)next_page |
1861                                                 head_bit);
1862         next_page = rb_list_head(next_page);
1863         next_page->prev = tail_page;
1864
1865         /* make sure pages points to a valid page in the ring buffer */
1866         cpu_buffer->pages = next_page;
1867
1868         /* update head page */
1869         if (head_bit)
1870                 cpu_buffer->head_page = list_entry(next_page,
1871                                                 struct buffer_page, list);
1872
1873         /* pages are removed, resume tracing and then free the pages */
1874         atomic_dec(&cpu_buffer->record_disabled);
1875         raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1876
1877         RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1878
1879         /* last buffer page to remove */
1880         last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1881                                 list);
1882         tmp_iter_page = first_page;
1883
1884         do {
1885                 cond_resched();
1886
1887                 to_remove_page = tmp_iter_page;
1888                 rb_inc_page(&tmp_iter_page);
1889
1890                 /* update the counters */
1891                 page_entries = rb_page_entries(to_remove_page);
1892                 if (page_entries) {
1893                         /*
1894                          * If something was added to this page, it was full
1895                          * since it is not the tail page. So we deduct the
1896                          * bytes consumed in ring buffer from here.
1897                          * Increment overrun to account for the lost events.
1898                          */
1899                         local_add(page_entries, &cpu_buffer->overrun);
1900                         local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
1901                         local_inc(&cpu_buffer->pages_lost);
1902                 }
1903
1904                 /*
1905                  * We have already removed references to this list item, just
1906                  * free up the buffer_page and its page
1907                  */
1908                 free_buffer_page(to_remove_page);
1909                 nr_removed--;
1910
1911         } while (to_remove_page != last_page);
1912
1913         RB_WARN_ON(cpu_buffer, nr_removed);
1914
1915         return nr_removed == 0;
1916 }
1917
1918 static bool
1919 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1920 {
1921         struct list_head *pages = &cpu_buffer->new_pages;
1922         unsigned long flags;
1923         bool success;
1924         int retries;
1925
1926         /* Can be called at early boot up, where interrupts must not be enabled */
1927         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1928         /*
1929          * We are holding the reader lock, so the reader page won't be swapped
1930          * in the ring buffer. Now we are racing with the writer trying to
1931          * move head page and the tail page.
1932          * We are going to adapt the reader page update process where:
1933          * 1. We first splice the start and end of list of new pages between
1934          *    the head page and its previous page.
1935          * 2. We cmpxchg the prev_page->next to point from head page to the
1936          *    start of new pages list.
1937          * 3. Finally, we update the head->prev to the end of new list.
1938          *
1939          * We will try this process 10 times, to make sure that we don't keep
1940          * spinning.
1941          */
1942         retries = 10;
1943         success = false;
1944         while (retries--) {
1945                 struct list_head *head_page, *prev_page;
1946                 struct list_head *last_page, *first_page;
1947                 struct list_head *head_page_with_bit;
1948                 struct buffer_page *hpage = rb_set_head_page(cpu_buffer);
1949
1950                 if (!hpage)
1951                         break;
1952                 head_page = &hpage->list;
1953                 prev_page = head_page->prev;
1954
1955                 first_page = pages->next;
1956                 last_page  = pages->prev;
1957
1958                 head_page_with_bit = (struct list_head *)
1959                                      ((unsigned long)head_page | RB_PAGE_HEAD);
1960
1961                 last_page->next = head_page_with_bit;
1962                 first_page->prev = prev_page;
1963
1964                 /* caution: head_page_with_bit gets updated on cmpxchg failure */
1965                 if (try_cmpxchg(&prev_page->next,
1966                                 &head_page_with_bit, first_page)) {
1967                         /*
1968                          * yay, we replaced the page pointer with our new list;
1969                          * now we just have to update the head page's prev
1970                          * pointer to point to the end of the list
1971                          */
1972                         head_page->prev = last_page;
1973                         success = true;
1974                         break;
1975                 }
1976         }
1977
1978         if (success)
1979                 INIT_LIST_HEAD(pages);
1980         /*
1981          * If we weren't successful in adding the new pages, warn and stop
1982          * tracing
1983          */
1984         RB_WARN_ON(cpu_buffer, !success);
1985         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1986
1987         /* free pages if they weren't inserted */
1988         if (!success) {
1989                 struct buffer_page *bpage, *tmp;
1990                 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1991                                          list) {
1992                         list_del_init(&bpage->list);
1993                         free_buffer_page(bpage);
1994                 }
1995         }
1996         return success;
1997 }
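
/*
 * Sketch of the splice performed above, with the atomicity of
 * try_cmpxchg() elided: the new sublist is wired to the ring on both
 * ends first, and only then is prev->next swung over, so a concurrent
 * writer sees either the old ring or the fully linked new one.  The
 * parameter names are illustrative.
 */
static inline bool sketch_splice_before_head(struct list_head *prev,
                                             struct list_head *head,
                                             struct list_head *head_with_bit,
                                             struct list_head *first,
                                             struct list_head *last)
{
        /* 1. Point the new sublist at the ring on both ends. */
        last->next  = head_with_bit;
        first->prev = prev;

        /* 2. "cmpxchg": publish only if the head link has not changed. */
        if (prev->next != head_with_bit)
                return false;
        prev->next = first;

        /* 3. Fix the back pointer of the (unflagged) head page. */
        head->prev = last;
        return true;
}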
1998
1999 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
2000 {
2001         bool success;
2002
2003         if (cpu_buffer->nr_pages_to_update > 0)
2004                 success = rb_insert_pages(cpu_buffer);
2005         else
2006                 success = rb_remove_pages(cpu_buffer,
2007                                         -cpu_buffer->nr_pages_to_update);
2008
2009         if (success)
2010                 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
2011 }
2012
2013 static void update_pages_handler(struct work_struct *work)
2014 {
2015         struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
2016                         struct ring_buffer_per_cpu, update_pages_work);
2017         rb_update_pages(cpu_buffer);
2018         complete(&cpu_buffer->update_done);
2019 }
2020
2021 /**
2022  * ring_buffer_resize - resize the ring buffer
2023  * @buffer: the buffer to resize.
2024  * @size: the new size.
2025  * @cpu_id: the cpu buffer to resize
2026  *
2027  * Minimum size is 2 * buffer->subbuf_size.
2028  *
2029  * Returns 0 on success and < 0 on failure.
2030  */
2031 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
2032                         int cpu_id)
2033 {
2034         struct ring_buffer_per_cpu *cpu_buffer;
2035         unsigned long nr_pages;
2036         int cpu, err;
2037
2038         /*
2039          * Always succeed at resizing a non-existent buffer:
2040          */
2041         if (!buffer)
2042                 return 0;
2043
2044         /* Make sure the requested buffer exists */
2045         if (cpu_id != RING_BUFFER_ALL_CPUS &&
2046             !cpumask_test_cpu(cpu_id, buffer->cpumask))
2047                 return 0;
2048
2049         nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
2050
2051         /* we need a minimum of two pages */
2052         if (nr_pages < 2)
2053                 nr_pages = 2;
2054
2055         /* prevent another thread from changing buffer sizes */
2056         mutex_lock(&buffer->mutex);
2057         atomic_inc(&buffer->resizing);
2058
2059         if (cpu_id == RING_BUFFER_ALL_CPUS) {
2060                 /*
2061                  * Don't succeed if resizing is disabled, as a reader might be
2062                  * manipulating the ring buffer and expects a sane state while
2063                  * resizing is disabled.
2064                  */
2065                 for_each_buffer_cpu(buffer, cpu) {
2066                         cpu_buffer = buffer->buffers[cpu];
2067                         if (atomic_read(&cpu_buffer->resize_disabled)) {
2068                                 err = -EBUSY;
2069                                 goto out_err_unlock;
2070                         }
2071                 }
2072
2073                 /* calculate the pages to update */
2074                 for_each_buffer_cpu(buffer, cpu) {
2075                         cpu_buffer = buffer->buffers[cpu];
2076
2077                         cpu_buffer->nr_pages_to_update = nr_pages -
2078                                                         cpu_buffer->nr_pages;
2079                         /*
2080                          * nothing more to do for removing pages or no update
2081                          */
2082                         if (cpu_buffer->nr_pages_to_update <= 0)
2083                                 continue;
2084                         /*
2085                          * to add pages, make sure all new pages can be
2086                          * allocated without receiving ENOMEM
2087                          */
2088                         INIT_LIST_HEAD(&cpu_buffer->new_pages);
2089                         if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2090                                                 &cpu_buffer->new_pages)) {
2091                                 /* not enough memory for new pages */
2092                                 err = -ENOMEM;
2093                                 goto out_err;
2094                         }
2095
2096                         cond_resched();
2097                 }
2098
2099                 cpus_read_lock();
2100                 /*
2101                  * Fire off all the required work handlers.
2102                  * We can't schedule on offline CPUs, but it's not necessary
2103                  * since we can change their buffer sizes without any race.
2104                  */
2105                 for_each_buffer_cpu(buffer, cpu) {
2106                         cpu_buffer = buffer->buffers[cpu];
2107                         if (!cpu_buffer->nr_pages_to_update)
2108                                 continue;
2109
2110                         /* Can't run something on an offline CPU. */
2111                         if (!cpu_online(cpu)) {
2112                                 rb_update_pages(cpu_buffer);
2113                                 cpu_buffer->nr_pages_to_update = 0;
2114                         } else {
2115                                 /* Run directly if possible. */
2116                                 migrate_disable();
2117                                 if (cpu != smp_processor_id()) {
2118                                         migrate_enable();
2119                                         schedule_work_on(cpu,
2120                                                          &cpu_buffer->update_pages_work);
2121                                 } else {
2122                                         update_pages_handler(&cpu_buffer->update_pages_work);
2123                                         migrate_enable();
2124                                 }
2125                         }
2126                 }
2127
2128                 /* wait for all the updates to complete */
2129                 for_each_buffer_cpu(buffer, cpu) {
2130                         cpu_buffer = buffer->buffers[cpu];
2131                         if (!cpu_buffer->nr_pages_to_update)
2132                                 continue;
2133
2134                         if (cpu_online(cpu))
2135                                 wait_for_completion(&cpu_buffer->update_done);
2136                         cpu_buffer->nr_pages_to_update = 0;
2137                 }
2138
2139                 cpus_read_unlock();
2140         } else {
2141                 cpu_buffer = buffer->buffers[cpu_id];
2142
2143                 if (nr_pages == cpu_buffer->nr_pages)
2144                         goto out;
2145
2146                 /*
2147                  * Don't succeed if resizing is disabled, as a reader might be
2148                  * manipulating the ring buffer and expects a sane state while
2149                  * resizing is disabled.
2150                  */
2151                 if (atomic_read(&cpu_buffer->resize_disabled)) {
2152                         err = -EBUSY;
2153                         goto out_err_unlock;
2154                 }
2155
2156                 cpu_buffer->nr_pages_to_update = nr_pages -
2157                                                 cpu_buffer->nr_pages;
2158
2159                 INIT_LIST_HEAD(&cpu_buffer->new_pages);
2160                 if (cpu_buffer->nr_pages_to_update > 0 &&
2161                         __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2162                                             &cpu_buffer->new_pages)) {
2163                         err = -ENOMEM;
2164                         goto out_err;
2165                 }
2166
2167                 cpus_read_lock();
2168
2169                 /* Can't run something on an offline CPU. */
2170                 if (!cpu_online(cpu_id))
2171                         rb_update_pages(cpu_buffer);
2172                 else {
2173                         /* Run directly if possible. */
2174                         migrate_disable();
2175                         if (cpu_id == smp_processor_id()) {
2176                                 rb_update_pages(cpu_buffer);
2177                                 migrate_enable();
2178                         } else {
2179                                 migrate_enable();
2180                                 schedule_work_on(cpu_id,
2181                                                  &cpu_buffer->update_pages_work);
2182                                 wait_for_completion(&cpu_buffer->update_done);
2183                         }
2184                 }
2185
2186                 cpu_buffer->nr_pages_to_update = 0;
2187                 cpus_read_unlock();
2188         }
2189
2190  out:
2191         /*
2192          * The ring buffer resize can happen with the ring buffer
2193          * enabled, so that the update disturbs the tracing as little
2194          * as possible. But if the buffer is disabled, we do not need
2195          * to worry about that, and we can take the time to verify
2196          * that the buffer is not corrupt.
2197          */
2198         if (atomic_read(&buffer->record_disabled)) {
2199                 atomic_inc(&buffer->record_disabled);
2200                 /*
2201                  * Even though the buffer was disabled, we must make sure
2202                  * that it is truly disabled before calling rb_check_pages.
2203                  * There could have been a race between checking
2204                  * record_disabled and incrementing it.
2205                  */
2206                 synchronize_rcu();
2207                 for_each_buffer_cpu(buffer, cpu) {
2208                         cpu_buffer = buffer->buffers[cpu];
2209                         rb_check_pages(cpu_buffer);
2210                 }
2211                 atomic_dec(&buffer->record_disabled);
2212         }
2213
2214         atomic_dec(&buffer->resizing);
2215         mutex_unlock(&buffer->mutex);
2216         return 0;
2217
2218  out_err:
2219         for_each_buffer_cpu(buffer, cpu) {
2220                 struct buffer_page *bpage, *tmp;
2221
2222                 cpu_buffer = buffer->buffers[cpu];
2223                 cpu_buffer->nr_pages_to_update = 0;
2224
2225                 if (list_empty(&cpu_buffer->new_pages))
2226                         continue;
2227
2228                 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2229                                         list) {
2230                         list_del_init(&bpage->list);
2231                         free_buffer_page(bpage);
2232                 }
2233         }
2234  out_err_unlock:
2235         atomic_dec(&buffer->resizing);
2236         mutex_unlock(&buffer->mutex);
2237         return err;
2238 }
2239 EXPORT_SYMBOL_GPL(ring_buffer_resize);
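
/*
 * Hypothetical caller sketch for the resize interface above: grow every
 * per-CPU buffer, then shrink a single CPU's buffer.  The sizes and the
 * function name are illustrative; "buffer" is assumed to come from
 * __ring_buffer_alloc().
 */
static int sketch_resize_example(struct trace_buffer *buffer, int cpu)
{
        int ret;

        /* One megabyte per CPU, applied to all CPU buffers. */
        ret = ring_buffer_resize(buffer, 1024 * 1024, RING_BUFFER_ALL_CPUS);
        if (ret < 0)
                return ret;

        /* Per-CPU resize: only this CPU's page list is touched. */
        return ring_buffer_resize(buffer, 64 * 1024, cpu);
}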
2240
2241 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
2242 {
2243         mutex_lock(&buffer->mutex);
2244         if (val)
2245                 buffer->flags |= RB_FL_OVERWRITE;
2246         else
2247                 buffer->flags &= ~RB_FL_OVERWRITE;
2248         mutex_unlock(&buffer->mutex);
2249 }
2250 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
2251
2252 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
2253 {
2254         return bpage->page->data + index;
2255 }
2256
2257 static __always_inline struct ring_buffer_event *
2258 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
2259 {
2260         return __rb_page_index(cpu_buffer->reader_page,
2261                                cpu_buffer->reader_page->read);
2262 }
2263
2264 static struct ring_buffer_event *
2265 rb_iter_head_event(struct ring_buffer_iter *iter)
2266 {
2267         struct ring_buffer_event *event;
2268         struct buffer_page *iter_head_page = iter->head_page;
2269         unsigned long commit;
2270         unsigned length;
2271
2272         if (iter->head != iter->next_event)
2273                 return iter->event;
2274
2275         /*
2276          * When the writer goes across pages, it issues a cmpxchg which
2277          * is a mb(), which will synchronize with the rmb here.
2278          * (see rb_tail_page_update() and __rb_reserve_next())
2279          */
2280         commit = rb_page_commit(iter_head_page);
2281         smp_rmb();
2282
2283         /* An event needs to be at least 8 bytes in size */
2284         if (iter->head > commit - 8)
2285                 goto reset;
2286
2287         event = __rb_page_index(iter_head_page, iter->head);
2288         length = rb_event_length(event);
2289
2290         /*
2291          * READ_ONCE() doesn't work on functions and we don't want the
2292          * compiler doing any crazy optimizations with length.
2293          */
2294         barrier();
2295
2296         if ((iter->head + length) > commit || length > iter->event_size)
2297                 /* Writer corrupted the read? */
2298                 goto reset;
2299
2300         memcpy(iter->event, event, length);
2301         /*
2302          * If the page stamp is still the same after this rmb() then the
2303          * event was safely copied without the writer entering the page.
2304          */
2305         smp_rmb();
2306
2307         /* Make sure the page didn't change since we read this */
2308         if (iter->page_stamp != iter_head_page->page->time_stamp ||
2309             commit > rb_page_commit(iter_head_page))
2310                 goto reset;
2311
2312         iter->next_event = iter->head + length;
2313         return iter->event;
2314  reset:
2315         /* Reset to the beginning */
2316         iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2317         iter->head = 0;
2318         iter->next_event = 0;
2319         iter->missed_events = 1;
2320         return NULL;
2321 }
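
/*
 * The read above follows a copy-then-revalidate pattern, sketched here
 * with illustrative names and the barriers reduced to comments: copy the
 * event out, then re-check the page time stamp and commit that were
 * sampled beforehand; if either moved, the writer may have entered the
 * page and the copy must be discarded.
 */
static inline bool sketch_copy_event(void *dst, const void *src, size_t len,
                                     const u64 *page_stamp, u64 old_stamp,
                                     const unsigned long *commit,
                                     unsigned long old_commit)
{
        memcpy(dst, src, len);
        /* smp_rmb() in the real code orders the copy before the re-check. */
        return *page_stamp == old_stamp && old_commit <= *commit;
}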
2322
2323 /* Size is determined by what has been committed */
2324 static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
2325 {
2326         return rb_page_commit(bpage);
2327 }
2328
2329 static __always_inline unsigned
2330 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
2331 {
2332         return rb_page_commit(cpu_buffer->commit_page);
2333 }
2334
2335 static __always_inline unsigned
2336 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event)
2337 {
2338         unsigned long addr = (unsigned long)event;
2339
2340         addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1;
2341
2342         return addr - BUF_PAGE_HDR_SIZE;
2343 }
2344
2345 static void rb_inc_iter(struct ring_buffer_iter *iter)
2346 {
2347         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2348
2349         /*
2350          * The iterator could be on the reader page (it starts there).
2351          * But the head could have moved, since the reader was
2352          * found. Check for this case and assign the iterator
2353          * to the head page instead of next.
2354          */
2355         if (iter->head_page == cpu_buffer->reader_page)
2356                 iter->head_page = rb_set_head_page(cpu_buffer);
2357         else
2358                 rb_inc_page(&iter->head_page);
2359
2360         iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2361         iter->head = 0;
2362         iter->next_event = 0;
2363 }
2364
2365 /*
2366  * rb_handle_head_page - writer hit the head page
2367  *
2368  * Returns: +1 to retry page
2369  *           0 to continue
2370  *          -1 on error
2371  */
2372 static int
2373 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2374                     struct buffer_page *tail_page,
2375                     struct buffer_page *next_page)
2376 {
2377         struct buffer_page *new_head;
2378         int entries;
2379         int type;
2380         int ret;
2381
2382         entries = rb_page_entries(next_page);
2383
2384         /*
2385          * The hard part is here. We need to move the head
2386          * forward, and protect against both readers on
2387          * other CPUs and writers coming in via interrupts.
2388          */
2389         type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2390                                        RB_PAGE_HEAD);
2391
2392         /*
2393          * type can be one of four:
2394          *  NORMAL - an interrupt already moved it for us
2395          *  HEAD   - we are the first to get here.
2396          *  UPDATE - we are the interrupt interrupting
2397          *           a current move.
2398          *  MOVED  - a reader on another CPU moved the next
2399          *           pointer to its reader page. Give up
2400          *           and try again.
2401          */
2402
2403         switch (type) {
2404         case RB_PAGE_HEAD:
2405                 /*
2406                  * We changed the head to UPDATE, thus
2407                  * it is our responsibility to update
2408                  * the counters.
2409                  */
2410                 local_add(entries, &cpu_buffer->overrun);
2411                 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
2412                 local_inc(&cpu_buffer->pages_lost);
2413
2414                 /*
2415                  * The entries will be zeroed out when we move the
2416                  * tail page.
2417                  */
2418
2419                 /* still more to do */
2420                 break;
2421
2422         case RB_PAGE_UPDATE:
2423                 /*
2424                  * This is an interrupt that interrupted the
2425                  * previous update. Still more to do.
2426                  */
2427                 break;
2428         case RB_PAGE_NORMAL:
2429                 /*
2430                  * An interrupt came in before the update
2431                  * and processed this for us.
2432                  * Nothing left to do.
2433                  */
2434                 return 1;
2435         case RB_PAGE_MOVED:
2436                 /*
2437                  * The reader is on another CPU and just did
2438                  * a swap with our next_page.
2439                  * Try again.
2440                  */
2441                 return 1;
2442         default:
2443                 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2444                 return -1;
2445         }
2446
2447         /*
2448          * Now that we are here, the old head pointer is
2449          * set to UPDATE. This will keep the reader from
2450          * swapping the head page with the reader page.
2451          * The reader (on another CPU) will spin till
2452          * we are finished.
2453          *
2454          * We just need to protect against interrupts
2455          * doing the job. We will set the next pointer
2456          * to HEAD. After that, we set the old pointer
2457          * to NORMAL, but only if it was HEAD before.
2458          * Otherwise we are an interrupt, and only
2459          * want the outermost commit to reset it.
2460          */
2461         new_head = next_page;
2462         rb_inc_page(&new_head);
2463
2464         ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2465                                     RB_PAGE_NORMAL);
2466
2467         /*
2468          * Valid returns are:
2469          *  HEAD   - an interrupt came in and already set it.
2470          *  NORMAL - One of two things:
2471          *            1) We really set it.
2472          *            2) A bunch of interrupts came in and moved
2473          *               the page forward again.
2474          */
2475         switch (ret) {
2476         case RB_PAGE_HEAD:
2477         case RB_PAGE_NORMAL:
2478                 /* OK */
2479                 break;
2480         default:
2481                 RB_WARN_ON(cpu_buffer, 1);
2482                 return -1;
2483         }
2484
2485         /*
2486          * It is possible that an interrupt came in,
2487          * set the head up, then more interrupts came in
2488          * and moved it again. When we get back here,
2489          * the page would have been set to NORMAL but we
2490          * just set it back to HEAD.
2491          *
2492          * How do you detect this? Well, if that happened
2493          * the tail page would have moved.
2494          */
2495         if (ret == RB_PAGE_NORMAL) {
2496                 struct buffer_page *buffer_tail_page;
2497
2498                 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
2499                 /*
2500                  * If the tail had moved past next, then we need
2501                  * to reset the pointer.
2502                  */
2503                 if (buffer_tail_page != tail_page &&
2504                     buffer_tail_page != next_page)
2505                         rb_head_page_set_normal(cpu_buffer, new_head,
2506                                                 next_page,
2507                                                 RB_PAGE_HEAD);
2508         }
2509
2510         /*
2511          * If this was the outer most commit (the one that
2512          * changed the original pointer from HEAD to UPDATE),
2513          * then it is up to us to reset it to NORMAL.
2514          */
2515         if (type == RB_PAGE_HEAD) {
2516                 ret = rb_head_page_set_normal(cpu_buffer, next_page,
2517                                               tail_page,
2518                                               RB_PAGE_UPDATE);
2519                 if (RB_WARN_ON(cpu_buffer,
2520                                ret != RB_PAGE_UPDATE))
2521                         return -1;
2522         }
2523
2524         return 0;
2525 }
2526
2527 static inline void
2528 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2529               unsigned long tail, struct rb_event_info *info)
2530 {
2531         unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
2532         struct buffer_page *tail_page = info->tail_page;
2533         struct ring_buffer_event *event;
2534         unsigned long length = info->length;
2535
2536         /*
2537          * Only the event that crossed the page boundary
2538          * must fill the old tail_page with padding.
2539          */
2540         if (tail >= bsize) {
2541                 /*
2542                  * If the page was filled, then we still need
2543                  * to update the real_end. Reset it to zero
2544                  * and the reader will ignore it.
2545                  */
2546                 if (tail == bsize)
2547                         tail_page->real_end = 0;
2548
2549                 local_sub(length, &tail_page->write);
2550                 return;
2551         }
2552
2553         event = __rb_page_index(tail_page, tail);
2554
2555         /*
2556          * Save the original length to the metadata.
2557          * This will be used by the reader to update the lost
2558          * event counter.
2559          */
2560         tail_page->real_end = tail;
2561
2562         /*
2563          * If this event is bigger than the minimum size, then
2564          * we need to be careful that we don't subtract the
2565          * write counter enough to allow another writer to slip
2566          * in on this page.
2567          * We put in a discarded commit instead, to make sure
2568          * that this space is not used again, and this space will
2569          * not be accounted into 'entries_bytes'.
2570          *
2571          * If we are less than the minimum size, we don't need to
2572          * worry about it.
2573          */
2574         if (tail > (bsize - RB_EVNT_MIN_SIZE)) {
2575                 /* No room for any events */
2576
2577                 /* Mark the rest of the page with padding */
2578                 rb_event_set_padding(event);
2579
2580                 /* Make sure the padding is visible before the write update */
2581                 smp_wmb();
2582
2583                 /* Set the write back to the previous setting */
2584                 local_sub(length, &tail_page->write);
2585                 return;
2586         }
2587
2588         /* Put in a discarded event */
2589         event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE;
2590         event->type_len = RINGBUF_TYPE_PADDING;
2591         /* time delta must be non zero */
2592         event->time_delta = 1;
2593
2594         /* account for padding bytes */
2595         local_add(bsize - tail, &cpu_buffer->entries_bytes);
2596
2597         /* Make sure the padding is visible before the tail_page->write update */
2598         smp_wmb();
2599
2600         /* Set write to end of buffer */
2601         length = (tail + length) - bsize;
2602         local_sub(length, &tail_page->write);
2603 }
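
/*
 * Sketch of the padding event written above when an event does not fit
 * at the end of a sub-buffer: the space left after the tail (minus the
 * event header) becomes the padding payload.  The helper name is
 * illustrative; it mirrors the statements in rb_reset_tail().
 */
static inline void sketch_fill_with_padding(struct ring_buffer_event *event,
                                            unsigned long tail,
                                            unsigned long subbuf_size)
{
        event->array[0]   = (subbuf_size - tail) - RB_EVNT_HDR_SIZE;
        event->type_len   = RINGBUF_TYPE_PADDING;
        event->time_delta = 1; /* must be non-zero, as noted above */
}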
2604
2605 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2606
2607 /*
2608  * This is the slow path, force gcc not to inline it.
2609  */
2610 static noinline struct ring_buffer_event *
2611 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2612              unsigned long tail, struct rb_event_info *info)
2613 {
2614         struct buffer_page *tail_page = info->tail_page;
2615         struct buffer_page *commit_page = cpu_buffer->commit_page;
2616         struct trace_buffer *buffer = cpu_buffer->buffer;
2617         struct buffer_page *next_page;
2618         int ret;
2619
2620         next_page = tail_page;
2621
2622         rb_inc_page(&next_page);
2623
2624         /*
2625          * If for some reason we had an interrupt storm that made
2626          * it all the way around the buffer, bail, and warn
2627          * about it.
2628          */
2629         if (unlikely(next_page == commit_page)) {
2630                 local_inc(&cpu_buffer->commit_overrun);
2631                 goto out_reset;
2632         }
2633
2634         /*
2635          * This is where the fun begins!
2636          *
2637          * We are fighting against races between a reader that
2638          * could be on another CPU trying to swap its reader
2639          * page with the buffer head.
2640          *
2641          * We are also fighting against interrupts coming in and
2642          * moving the head or tail on us as well.
2643          *
2644          * If the next page is the head page then we have filled
2645          * the buffer, unless the commit page is still on the
2646          * reader page.
2647          */
2648         if (rb_is_head_page(next_page, &tail_page->list)) {
2649
2650                 /*
2651                  * If the commit is not on the reader page, then
2652                  * move the head page.
2653                  */
2654                 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2655                         /*
2656                          * If we are not in overwrite mode,
2657                          * this is easy, just stop here.
2658                          */
2659                         if (!(buffer->flags & RB_FL_OVERWRITE)) {
2660                                 local_inc(&cpu_buffer->dropped_events);
2661                                 goto out_reset;
2662                         }
2663
2664                         ret = rb_handle_head_page(cpu_buffer,
2665                                                   tail_page,
2666                                                   next_page);
2667                         if (ret < 0)
2668                                 goto out_reset;
2669                         if (ret)
2670                                 goto out_again;
2671                 } else {
2672                         /*
2673                          * We need to be careful here too. The
2674                          * commit page could still be on the reader
2675                          * page. We could have a small buffer, and
2676                          * have filled up the buffer with events
2677                          * from interrupts and such, and wrapped.
2678                          *
2679                          * Note, if the tail page is also on the
2680                          * reader_page, we let it move out.
2681                          */
2682                         if (unlikely((cpu_buffer->commit_page !=
2683                                       cpu_buffer->tail_page) &&
2684                                      (cpu_buffer->commit_page ==
2685                                       cpu_buffer->reader_page))) {
2686                                 local_inc(&cpu_buffer->commit_overrun);
2687                                 goto out_reset;
2688                         }
2689                 }
2690         }
2691
2692         rb_tail_page_update(cpu_buffer, tail_page, next_page);
2693
2694  out_again:
2695
2696         rb_reset_tail(cpu_buffer, tail, info);
2697
2698         /* Commit what we have for now. */
2699         rb_end_commit(cpu_buffer);
2700         /* rb_end_commit() decs committing */
2701         local_inc(&cpu_buffer->committing);
2702
2703         /* fail and let the caller try again */
2704         return ERR_PTR(-EAGAIN);
2705
2706  out_reset:
2707         /* reset write */
2708         rb_reset_tail(cpu_buffer, tail, info);
2709
2710         return NULL;
2711 }
2712
2713 /* Slow path */
2714 static struct ring_buffer_event *
2715 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2716                   struct ring_buffer_event *event, u64 delta, bool abs)
2717 {
2718         if (abs)
2719                 event->type_len = RINGBUF_TYPE_TIME_STAMP;
2720         else
2721                 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2722
2723         /* Not the first event on the page, or not delta? */
2724         if (abs || rb_event_index(cpu_buffer, event)) {
2725                 event->time_delta = delta & TS_MASK;
2726                 event->array[0] = delta >> TS_SHIFT;
2727         } else {
2728                 /* nope, just zero it */
2729                 event->time_delta = 0;
2730                 event->array[0] = 0;
2731         }
2732
2733         return skip_time_extend(event);
2734 }
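
/*
 * Sketch of the split performed above for deltas that do not fit in the
 * event's time_delta field: the low TS_SHIFT bits stay in time_delta and
 * the remaining high bits go into array[0], so a reader can reassemble
 * the full value.  The helper name is illustrative.
 */
static inline u64 sketch_reassemble_delta(u32 time_delta, u32 high_bits)
{
        /* Inverse of: time_delta = delta & TS_MASK; high = delta >> TS_SHIFT */
        return ((u64)high_bits << TS_SHIFT) | time_delta;
}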
2735
2736 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2737 static inline bool sched_clock_stable(void)
2738 {
2739         return true;
2740 }
2741 #endif
2742
2743 static void
2744 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2745                    struct rb_event_info *info)
2746 {
2747         u64 write_stamp;
2748
2749         WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
2750                   (unsigned long long)info->delta,
2751                   (unsigned long long)info->ts,
2752                   (unsigned long long)info->before,
2753                   (unsigned long long)info->after,
2754                   (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}),
2755                   sched_clock_stable() ? "" :
2756                   "If you just came from a suspend/resume,\n"
2757                   "please switch to the trace global clock:\n"
2758                   "  echo global > /sys/kernel/tracing/trace_clock\n"
2759                   "or add trace_clock=global to the kernel command line\n");
2760 }
2761
2762 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2763                                       struct ring_buffer_event **event,
2764                                       struct rb_event_info *info,
2765                                       u64 *delta,
2766                                       unsigned int *length)
2767 {
2768         bool abs = info->add_timestamp &
2769                 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
2770
2771         if (unlikely(info->delta > (1ULL << 59))) {
2772                 /*
2773                  * Some timers can use more than 59 bits, and when a timestamp
2774                  * is added to the buffer, it will lose those bits.
2775                  */
2776                 if (abs && (info->ts & TS_MSB)) {
2777                         info->delta &= ABS_TS_MASK;
2778
2779                 /* did the clock go backwards */
2780                 } else if (info->before == info->after && info->before > info->ts) {
2781                         /* not interrupted */
2782                         static int once;
2783
2784                         /*
2785                          * This is possible with a recalibration of the TSC.
2786                          * Do not produce a call stack, but just report it.
2787                          */
2788                         if (!once) {
2789                                 once++;
2790                                 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
2791                                         info->before, info->ts);
2792                         }
2793                 } else
2794                         rb_check_timestamp(cpu_buffer, info);
2795                 if (!abs)
2796                         info->delta = 0;
2797         }
2798         *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs);
2799         *length -= RB_LEN_TIME_EXTEND;
2800         *delta = 0;
2801 }
2802
2803 /**
2804  * rb_update_event - update event type and data
2805  * @cpu_buffer: The per cpu buffer of the @event
2806  * @event: the event to update
2807  * @info: The info to update the @event with (contains length and delta)
2808  *
2809  * Update the type and data fields of the @event. The length
2810  * is the actual size that is written to the ring buffer,
2811  * and with this, we can determine what to place into the
2812  * data field.
2813  */
2814 static void
2815 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2816                 struct ring_buffer_event *event,
2817                 struct rb_event_info *info)
2818 {
2819         unsigned length = info->length;
2820         u64 delta = info->delta;
2821         unsigned int nest = local_read(&cpu_buffer->committing) - 1;
2822
2823         if (!WARN_ON_ONCE(nest >= MAX_NEST))
2824                 cpu_buffer->event_stamp[nest] = info->ts;
2825
2826         /*
2827          * If we need to add a timestamp, then we
2828          * add it to the start of the reserved space.
2829          */
2830         if (unlikely(info->add_timestamp))
2831                 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);
2832
2833         event->time_delta = delta;
2834         length -= RB_EVNT_HDR_SIZE;
2835         if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2836                 event->type_len = 0;
2837                 event->array[0] = length;
2838         } else
2839                 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2840 }
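
/*
 * Sketch of how a reader can undo the length encoding chosen above: a
 * non-zero type_len gives the payload size in RB_ALIGNMENT units, while
 * type_len == 0 means the byte count was stored in array[0].  The helper
 * name is illustrative.
 */
static inline unsigned sketch_decode_data_length(struct ring_buffer_event *event)
{
        unsigned length;

        if (event->type_len)
                length = event->type_len * RB_ALIGNMENT;
        else
                length = event->array[0];

        return length + RB_EVNT_HDR_SIZE;
}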
2841
2842 static unsigned rb_calculate_event_length(unsigned length)
2843 {
2844         struct ring_buffer_event event; /* Used only for sizeof array */
2845
2846         /* zero length can cause confusion */
2847         if (!length)
2848                 length++;
2849
2850         if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2851                 length += sizeof(event.array[0]);
2852
2853         length += RB_EVNT_HDR_SIZE;
2854         length = ALIGN(length, RB_ARCH_ALIGNMENT);
2855
2856         /*
2857          * In case the time delta is larger than the 27 bits for it
2858          * in the header, we need to add a timestamp. If another
2859          * event comes in when trying to discard this one to increase
2860          * the length, then the timestamp will be added in the allocated
2861          * space of this event. If length is bigger than the size needed
2862          * for the TIME_EXTEND, then padding has to be used. The events
2863          * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2864          * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2865          * As length is a multiple of 4, we only need to worry if it
2866          * is 12 (RB_LEN_TIME_EXTEND + 4).
2867          */
2868         if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2869                 length += RB_ALIGNMENT;
2870
2871         return length;
2872 }
2873
2874 static inline bool
2875 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2876                   struct ring_buffer_event *event)
2877 {
2878         unsigned long new_index, old_index;
2879         struct buffer_page *bpage;
2880         unsigned long addr;
2881
2882         new_index = rb_event_index(cpu_buffer, event);
2883         old_index = new_index + rb_event_ts_length(event);
2884         addr = (unsigned long)event;
2885         addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
2886
2887         bpage = READ_ONCE(cpu_buffer->tail_page);
2888
2889         /*
2890          * Make sure the tail_page is still the same and
2891          * the next write location is the end of this event
2892          */
2893         if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2894                 unsigned long write_mask =
2895                         local_read(&bpage->write) & ~RB_WRITE_MASK;
2896                 unsigned long event_length = rb_event_length(event);
2897
2898                 /*
2899                  * The before_stamp must be different than the write_stamp
2900                  * so that the next event adds an absolute
2901                  * value and does not rely on the saved write stamp, which
2902                  * is now going to be bogus.
2903                  *
2904                  * By setting the before_stamp to zero, the next event
2905                  * is not going to use the write_stamp and will instead
2906                  * create an absolute timestamp. This means there's no
2907                  * reason to update the write_stamp!
2908                  */
2909                 rb_time_set(&cpu_buffer->before_stamp, 0);
2910
2911                 /*
2912                  * If an event were to come in now, it would see that the
2913                  * write_stamp and the before_stamp are different, and assume
2914                  * that this event just added itself before updating
2915                  * the write stamp. The interrupting event will fix the
2916                  * write stamp for us, and use an absolute timestamp.
2917                  */
2918
2919                 /*
2920                  * This is on the tail page. It is possible that
2921                  * a write could come in and move the tail page
2922                  * and write to the next page. That is fine
2923                  * because we just shorten what is on this page.
2924                  */
2925                 old_index += write_mask;
2926                 new_index += write_mask;
2927
2928                 /* caution: old_index gets updated on cmpxchg failure */
2929                 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
2930                         /* update counters */
2931                         local_sub(event_length, &cpu_buffer->entries_bytes);
2932                         return true;
2933                 }
2934         }
2935
2936         /* could not discard */
2937         return false;
2938 }
2939
2940 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2941 {
2942         local_inc(&cpu_buffer->committing);
2943         local_inc(&cpu_buffer->commits);
2944 }
2945
2946 static __always_inline void
2947 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2948 {
2949         unsigned long max_count;
2950
2951         /*
2952          * We only race with interrupts and NMIs on this CPU.
2953          * If we own the commit event, then we can commit
2954          * all others that interrupted us, since the interruptions
2955          * are in stack format (they finish before they come
2956          * back to us). This allows us to do a simple loop to
2957          * assign the commit to the tail.
2958          */
2959  again:
2960         max_count = cpu_buffer->nr_pages * 100;
2961
2962         while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2963                 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2964                         return;
2965                 if (RB_WARN_ON(cpu_buffer,
2966                                rb_is_reader_page(cpu_buffer->tail_page)))
2967                         return;
2968                 /*
2969                  * No need for a memory barrier here, as the update
2970                  * of the tail_page did it for this page.
2971                  */
2972                 local_set(&cpu_buffer->commit_page->page->commit,
2973                           rb_page_write(cpu_buffer->commit_page));
2974                 rb_inc_page(&cpu_buffer->commit_page);
2975                 /* add barrier to keep gcc from optimizing too much */
2976                 barrier();
2977         }
2978         while (rb_commit_index(cpu_buffer) !=
2979                rb_page_write(cpu_buffer->commit_page)) {
2980
2981                 /* Make sure the readers see the content of what is committed. */
2982                 smp_wmb();
2983                 local_set(&cpu_buffer->commit_page->page->commit,
2984                           rb_page_write(cpu_buffer->commit_page));
2985                 RB_WARN_ON(cpu_buffer,
2986                            local_read(&cpu_buffer->commit_page->page->commit) &
2987                            ~RB_WRITE_MASK);
2988                 barrier();
2989         }
2990
2991         /* again, keep gcc from optimizing */
2992         barrier();
2993
2994         /*
2995          * If an interrupt came in just after the first while loop
2996          * and pushed the tail page forward, we will be left with
2997          * a dangling commit that will never go forward.
2998          */
2999         if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
3000                 goto again;
3001 }
3002
3003 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
3004 {
3005         unsigned long commits;
3006
3007         if (RB_WARN_ON(cpu_buffer,
3008                        !local_read(&cpu_buffer->committing)))
3009                 return;
3010
3011  again:
3012         commits = local_read(&cpu_buffer->commits);
3013         /* synchronize with interrupts */
3014         barrier();
3015         if (local_read(&cpu_buffer->committing) == 1)
3016                 rb_set_commit_to_write(cpu_buffer);
3017
3018         local_dec(&cpu_buffer->committing);
3019
3020         /* synchronize with interrupts */
3021         barrier();
3022
3023         /*
3024          * Need to account for interrupts coming in between the
3025          * updating of the commit page and the clearing of the
3026          * committing counter.
3027          */
3028         if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3029             !local_read(&cpu_buffer->committing)) {
3030                 local_inc(&cpu_buffer->committing);
3031                 goto again;
3032         }
3033 }
3034
3035 static inline void rb_event_discard(struct ring_buffer_event *event)
3036 {
3037         if (extended_time(event))
3038                 event = skip_time_extend(event);
3039
3040         /* array[0] holds the actual length for the discarded event */
3041         event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
3042         event->type_len = RINGBUF_TYPE_PADDING;
3043         /* time delta must be non zero */
3044         if (!event->time_delta)
3045                 event->time_delta = 1;
3046 }
3047
3048 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
3049 {
3050         local_inc(&cpu_buffer->entries);
3051         rb_end_commit(cpu_buffer);
3052 }
3053
3054 static __always_inline void
3055 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
3056 {
3057         if (buffer->irq_work.waiters_pending) {
3058                 buffer->irq_work.waiters_pending = false;
3059                 /* irq_work_queue() supplies its own memory barriers */
3060                 irq_work_queue(&buffer->irq_work.work);
3061         }
3062
3063         if (cpu_buffer->irq_work.waiters_pending) {
3064                 cpu_buffer->irq_work.waiters_pending = false;
3065                 /* irq_work_queue() supplies its own memory barriers */
3066                 irq_work_queue(&cpu_buffer->irq_work.work);
3067         }
3068
3069         if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
3070                 return;
3071
3072         if (cpu_buffer->reader_page == cpu_buffer->commit_page)
3073                 return;
3074
3075         if (!cpu_buffer->irq_work.full_waiters_pending)
3076                 return;
3077
3078         cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
3079
3080         if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
3081                 return;
3082
3083         cpu_buffer->irq_work.wakeup_full = true;
3084         cpu_buffer->irq_work.full_waiters_pending = false;
3085                 /* irq_work_queue() supplies its own memory barriers */
3086         irq_work_queue(&cpu_buffer->irq_work.work);
3087 }
3088
3089 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
3090 # define do_ring_buffer_record_recursion()      \
3091         do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
3092 #else
3093 # define do_ring_buffer_record_recursion() do { } while (0)
3094 #endif
3095
3096 /*
3097  * The lock and unlock are done within a preempt disable section.
3098  * The current_context per_cpu variable can only be modified
3099  * by the current task between lock and unlock. But it can
3100  * be modified more than once via an interrupt. To pass this
3101  * information from the lock to the unlock without having to
3102  * access the 'in_interrupt()' functions again (which do show
3103  * a bit of overhead in something as critical as function tracing),
3104  * we use a bitmask trick.
3105  *
3106  *  bit 1 =  NMI context
3107  *  bit 2 =  IRQ context
3108  *  bit 3 =  SoftIRQ context
3109  *  bit 4 =  normal context.
3110  *
3111  * This works because this is the order of contexts that can
3112  * preempt other contexts. A SoftIRQ never preempts an IRQ
3113  * context.
3114  *
3115  * When the context is determined, the corresponding bit is
3116  * checked and set (if it was set, then a recursion of that context
3117  * happened).
3118  *
3119  * On unlock, we need to clear this bit. To do so, just subtract
3120  * 1 from the current_context and AND it to itself.
3121  *
3122  * (binary)
3123  *  101 - 1 = 100
3124  *  101 & 100 = 100 (clearing bit zero)
3125  *
3126  *  1010 - 1 = 1001
3127  *  1010 & 1001 = 1000 (clearing bit 1)
3128  *
3129  * The least significant bit can be cleared this way, and it
3130  * just so happens that it is the same bit corresponding to
3131  * the current context.
3132  *
3133  * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
3134  * is set when a recursion is detected at the current context, and if
3135  * the TRANSITION bit is already set, it will fail the recursion.
3136  * This is needed because there's a lag between the changing of
3137  * interrupt context and updating the preempt count. In this case,
3138  * a false positive will be found. To handle this, one extra recursion
3139  * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
3140  * bit is already set, then it is considered a recursion and the function
3141  * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
3142  *
3143  * On the trace_recursive_unlock(), the TRANSITION bit will be the first
3144  * to be cleared, even if it wasn't the context that set it. That is,
3145  * if an interrupt comes in while NORMAL bit is set and the ring buffer
3146  * is called before preempt_count() is updated, since the check will
3147  * be on the NORMAL bit, the TRANSITION bit will then be set. If an
3148  * NMI then comes in, it will set the NMI bit, but when the NMI code
3149  * does the trace_recursive_unlock() it will clear the TRANSITION bit
3150  * and leave the NMI bit set. But this is fine, because the interrupt
3151  * code that set the TRANSITION bit will then clear the NMI bit when it
3152  * calls trace_recursive_unlock(). If another NMI comes in, it will
3153  * set the TRANSITION bit and continue.
3154  *
3155  * Note: The TRANSITION bit only handles a single transition between contexts.
3156  */
3157
3158 static __always_inline bool
3159 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
3160 {
3161         unsigned int val = cpu_buffer->current_context;
3162         int bit = interrupt_context_level();
3163
3164         bit = RB_CTX_NORMAL - bit;
3165
3166         if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
3167                 /*
3168                  * It is possible that this was called by transitioning
3169                  * between interrupt context, and preempt_count() has not
3170                  * been updated yet. In this case, use the TRANSITION bit.
3171                  */
3172                 bit = RB_CTX_TRANSITION;
3173                 if (val & (1 << (bit + cpu_buffer->nest))) {
3174                         do_ring_buffer_record_recursion();
3175                         return true;
3176                 }
3177         }
3178
3179         val |= (1 << (bit + cpu_buffer->nest));
3180         cpu_buffer->current_context = val;
3181
3182         return false;
3183 }
3184
3185 static __always_inline void
3186 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
3187 {
3188         cpu_buffer->current_context &=
3189                 cpu_buffer->current_context - (1 << cpu_buffer->nest);
3190 }
3191
3192 /* The recursive locking above uses 5 bits */
3193 #define NESTED_BITS 5
3194
3195 /**
3196  * ring_buffer_nest_start - Allow tracing while nested
3197  * @buffer: The ring buffer to modify
3198  *
3199  * The ring buffer has a safety mechanism to prevent recursion.
3200  * But there may be a case where a trace needs to be done while
3201  * tracing something else. In this case, calling this function
3202  * will allow the next ring_buffer_lock_reserve() to nest within a
3203  * currently active one.
3204  *
3205  * Call this function before calling another ring_buffer_lock_reserve() and
3206  * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
3207  */
3208 void ring_buffer_nest_start(struct trace_buffer *buffer)
3209 {
3210         struct ring_buffer_per_cpu *cpu_buffer;
3211         int cpu;
3212
3213         /* Enabled by ring_buffer_nest_end() */
3214         preempt_disable_notrace();
3215         cpu = raw_smp_processor_id();
3216         cpu_buffer = buffer->buffers[cpu];
3217         /* This is the shift value for the above recursive locking */
3218         cpu_buffer->nest += NESTED_BITS;
3219 }
3220
3221 /**
3222  * ring_buffer_nest_end - Allow tracing while nested
3223  * @buffer: The ring buffer to modify
3224  *
3225  * Must be called after ring_buffer_nest_start() and after the
3226  * ring_buffer_unlock_commit().
3227  */
3228 void ring_buffer_nest_end(struct trace_buffer *buffer)
3229 {
3230         struct ring_buffer_per_cpu *cpu_buffer;
3231         int cpu;
3232
3233         /* disabled by ring_buffer_nest_start() */
3234         cpu = raw_smp_processor_id();
3235         cpu_buffer = buffer->buffers[cpu];
3236         /* This is the shift value for the above recursive locking */
3237         cpu_buffer->nest -= NESTED_BITS;
3238         preempt_enable_notrace();
3239 }
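/*
 * Editorial sketch (not part of the original source): a hypothetical caller
 * that must record a second event while already inside an active
 * ring_buffer_lock_reserve() section could nest like this ("nested_data"
 * is an illustrative variable only):
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, sizeof(nested_data));
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), &nested_data,
 *		       sizeof(nested_data));
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *	ring_buffer_nest_end(buffer);
 */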
3240
3241 /**
3242  * ring_buffer_unlock_commit - commit a reserved event
3243  * @buffer: The buffer to commit to
3244  *
3245  * This commits the data to the ring buffer, and releases any locks held.
3246  *
3247  * Must be paired with ring_buffer_lock_reserve.
3248  */
3249 int ring_buffer_unlock_commit(struct trace_buffer *buffer)
3250 {
3251         struct ring_buffer_per_cpu *cpu_buffer;
3252         int cpu = raw_smp_processor_id();
3253
3254         cpu_buffer = buffer->buffers[cpu];
3255
3256         rb_commit(cpu_buffer);
3257
3258         rb_wakeups(buffer, cpu_buffer);
3259
3260         trace_recursive_unlock(cpu_buffer);
3261
3262         preempt_enable_notrace();
3263
3264         return 0;
3265 }
3266 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
3267
3268 /* Special value to validate all deltas on a page. */
3269 #define CHECK_FULL_PAGE         1L
3270
3271 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS
3272
3273 static const char *show_irq_str(int bits)
3274 {
3275         const char *type[] = {
3276                 ".",    // 0
3277                 "s",    // 1
3278                 "h",    // 2
3279                 "Hs",   // 3
3280                 "n",    // 4
3281                 "Ns",   // 5
3282                 "Nh",   // 6
3283                 "NHs",  // 7
3284         };
3285
3286         return type[bits];
3287 }
3288
3289 /* Assume this is a trace event */
3290 static const char *show_flags(struct ring_buffer_event *event)
3291 {
3292         struct trace_entry *entry;
3293         int bits = 0;
3294
3295         if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry))
3296                 return "X";
3297
3298         entry = ring_buffer_event_data(event);
3299
3300         if (entry->flags & TRACE_FLAG_SOFTIRQ)
3301                 bits |= 1;
3302
3303         if (entry->flags & TRACE_FLAG_HARDIRQ)
3304                 bits |= 2;
3305
3306         if (entry->flags & TRACE_FLAG_NMI)
3307                 bits |= 4;
3308
3309         return show_irq_str(bits);
3310 }
3311
3312 static const char *show_irq(struct ring_buffer_event *event)
3313 {
3314         struct trace_entry *entry;
3315
3316         if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry))
3317                 return "";
3318
3319         entry = ring_buffer_event_data(event);
3320         if (entry->flags & TRACE_FLAG_IRQS_OFF)
3321                 return "d";
3322         return "";
3323 }
3324
3325 static const char *show_interrupt_level(void)
3326 {
3327         unsigned long pc = preempt_count();
3328         unsigned char level = 0;
3329
3330         if (pc & SOFTIRQ_OFFSET)
3331                 level |= 1;
3332
3333         if (pc & HARDIRQ_MASK)
3334                 level |= 2;
3335
3336         if (pc & NMI_MASK)
3337                 level |= 4;
3338
3339         return show_irq_str(level);
3340 }
3341
3342 static void dump_buffer_page(struct buffer_data_page *bpage,
3343                              struct rb_event_info *info,
3344                              unsigned long tail)
3345 {
3346         struct ring_buffer_event *event;
3347         u64 ts, delta;
3348         int e;
3349
3350         ts = bpage->time_stamp;
3351         pr_warn("  [%lld] PAGE TIME STAMP\n", ts);
3352
3353         for (e = 0; e < tail; e += rb_event_length(event)) {
3354
3355                 event = (struct ring_buffer_event *)(bpage->data + e);
3356
3357                 switch (event->type_len) {
3358
3359                 case RINGBUF_TYPE_TIME_EXTEND:
3360                         delta = rb_event_time_stamp(event);
3361                         ts += delta;
3362                         pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n",
3363                                 e, ts, delta);
3364                         break;
3365
3366                 case RINGBUF_TYPE_TIME_STAMP:
3367                         delta = rb_event_time_stamp(event);
3368                         ts = rb_fix_abs_ts(delta, ts);
3369                         pr_warn(" 0x%x:  [%lld] absolute:%lld TIME STAMP\n",
3370                                 e, ts, delta);
3371                         break;
3372
3373                 case RINGBUF_TYPE_PADDING:
3374                         ts += event->time_delta;
3375                         pr_warn(" 0x%x:  [%lld] delta:%d PADDING\n",
3376                                 e, ts, event->time_delta);
3377                         break;
3378
3379                 case RINGBUF_TYPE_DATA:
3380                         ts += event->time_delta;
3381                         pr_warn(" 0x%x:  [%lld] delta:%d %s%s\n",
3382                                 e, ts, event->time_delta,
3383                                 show_flags(event), show_irq(event));
3384                         break;
3385
3386                 default:
3387                         break;
3388                 }
3389         }
3390         pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e);
3391 }
3392
3393 static DEFINE_PER_CPU(atomic_t, checking);
3394 static atomic_t ts_dump;
3395
3396 #define buffer_warn_return(fmt, ...)                                    \
3397         do {                                                            \
3398                 /* If another report is happening, ignore this one */   \
3399                 if (atomic_inc_return(&ts_dump) != 1) {                 \
3400                         atomic_dec(&ts_dump);                           \
3401                         goto out;                                       \
3402                 }                                                       \
3403                 atomic_inc(&cpu_buffer->record_disabled);               \
3404                 pr_warn(fmt, ##__VA_ARGS__);                            \
3405                 dump_buffer_page(bpage, info, tail);                    \
3406                 atomic_dec(&ts_dump);                                   \
3407                 /* There are some cases during boot up where this can happen */ \
3408                 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING))       \
3409                         /* Do not re-enable checking */                 \
3410                         return;                                         \
3411         } while (0)
3412
3413 /*
3414  * Check if the current event time stamp matches the deltas on
3415  * the buffer page.
3416  */
3417 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3418                          struct rb_event_info *info,
3419                          unsigned long tail)
3420 {
3421         struct ring_buffer_event *event;
3422         struct buffer_data_page *bpage;
3423         u64 ts, delta;
3424         bool full = false;
3425         int e;
3426
3427         bpage = info->tail_page->page;
3428
3429         if (tail == CHECK_FULL_PAGE) {
3430                 full = true;
3431                 tail = local_read(&bpage->commit);
3432         } else if (info->add_timestamp &
3433                    (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
3434                 /* Ignore events with absolute time stamps */
3435                 return;
3436         }
3437
3438         /*
3439          * Do not check the first event (skip possible extends too).
3440          * Also do not check if previous events have not been committed.
3441          */
3442         if (tail <= 8 || tail > local_read(&bpage->commit))
3443                 return;
3444
3445         /*
3446          * If this interrupted another event being checked, skip the check.
3447          */
3448         if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
3449                 goto out;
3450
3451         ts = bpage->time_stamp;
3452
3453         for (e = 0; e < tail; e += rb_event_length(event)) {
3454
3455                 event = (struct ring_buffer_event *)(bpage->data + e);
3456
3457                 switch (event->type_len) {
3458
3459                 case RINGBUF_TYPE_TIME_EXTEND:
3460                         delta = rb_event_time_stamp(event);
3461                         ts += delta;
3462                         break;
3463
3464                 case RINGBUF_TYPE_TIME_STAMP:
3465                         delta = rb_event_time_stamp(event);
3466                         delta = rb_fix_abs_ts(delta, ts);
3467                         if (delta < ts) {
3468                                 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n",
3469                                                    cpu_buffer->cpu, ts, delta);
3470                         }
3471                         ts = delta;
3472                         break;
3473
3474                 case RINGBUF_TYPE_PADDING:
3475                         if (event->time_delta == 1)
3476                                 break;
3477                         fallthrough;
3478                 case RINGBUF_TYPE_DATA:
3479                         ts += event->time_delta;
3480                         break;
3481
3482                 default:
3483                         RB_WARN_ON(cpu_buffer, 1);
3484                 }
3485         }
3486         if ((full && ts > info->ts) ||
3487             (!full && ts + info->delta != info->ts)) {
3488                 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n",
3489                                    cpu_buffer->cpu,
3490                                    ts + info->delta, info->ts, info->delta,
3491                                    info->before, info->after,
3492                                    full ? " (full)" : "", show_interrupt_level());
3493         }
3494 out:
3495         atomic_dec(this_cpu_ptr(&checking));
3496 }
3497 #else
3498 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3499                          struct rb_event_info *info,
3500                          unsigned long tail)
3501 {
3502 }
3503 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */
3504
3505 static struct ring_buffer_event *
3506 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
3507                   struct rb_event_info *info)
3508 {
3509         struct ring_buffer_event *event;
3510         struct buffer_page *tail_page;
3511         unsigned long tail, write, w;
3512
3513         /* Don't let the compiler play games with cpu_buffer->tail_page */
3514         tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
3515
3516  /*A*/  w = local_read(&tail_page->write) & RB_WRITE_MASK;
3517         barrier();
3518         rb_time_read(&cpu_buffer->before_stamp, &info->before);
3519         rb_time_read(&cpu_buffer->write_stamp, &info->after);
3520         barrier();
3521         info->ts = rb_time_stamp(cpu_buffer->buffer);
3522
3523         if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
3524                 info->delta = info->ts;
3525         } else {
3526                 /*
3527                  * If interrupting an event time update, we may need an
3528                  * absolute timestamp.
3529                  * Don't bother if this is the start of a new page (w == 0).
3530                  */
3531                 if (!w) {
3532                         /* Use the sub-buffer timestamp */
3533                         info->delta = 0;
3534                 } else if (unlikely(info->before != info->after)) {
3535                         info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
3536                         info->length += RB_LEN_TIME_EXTEND;
3537                 } else {
3538                         info->delta = info->ts - info->after;
3539                         if (unlikely(test_time_stamp(info->delta))) {
3540                                 info->add_timestamp |= RB_ADD_STAMP_EXTEND;
3541                                 info->length += RB_LEN_TIME_EXTEND;
3542                         }
3543                 }
3544         }
3545
3546  /*B*/  rb_time_set(&cpu_buffer->before_stamp, info->ts);
3547
3548  /*C*/  write = local_add_return(info->length, &tail_page->write);
3549
3550         /* set write to only the index of the write */
3551         write &= RB_WRITE_MASK;
3552
3553         tail = write - info->length;
3554
3555         /* See if we shot past the end of this buffer page */
3556         if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
3557                 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
3558                 return rb_move_tail(cpu_buffer, tail, info);
3559         }
3560
3561         if (likely(tail == w)) {
3562                 /* Nothing interrupted us between A and C */
3563  /*D*/          rb_time_set(&cpu_buffer->write_stamp, info->ts);
3564                 /*
3565                  * If something came in between C and D, the write stamp
3566                  * may now not be in sync. But that's fine as the before_stamp
3567                  * will be different and then next event will just be forced
3568                  * to use an absolute timestamp.
3569                  */
3570                 if (likely(!(info->add_timestamp &
3571                              (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3572                         /* This did not interrupt any time update */
3573                         info->delta = info->ts - info->after;
3574                 else
3575                         /* Just use full timestamp for interrupting event */
3576                         info->delta = info->ts;
3577                 check_buffer(cpu_buffer, info, tail);
3578         } else {
3579                 u64 ts;
3580                 /* SLOW PATH - Interrupted between A and C */
3581
3582                 /* Save the old before_stamp */
3583                 rb_time_read(&cpu_buffer->before_stamp, &info->before);
3584
3585                 /*
3586                  * Read a new timestamp and update the before_stamp to make
3587                  * the next event after this one force using an absolute
3588                  * timestamp. This is in case an interrupt were to come in
3589                  * between E and F.
3590                  */
3591                 ts = rb_time_stamp(cpu_buffer->buffer);
3592                 rb_time_set(&cpu_buffer->before_stamp, ts);
3593
3594                 barrier();
3595  /*E*/          rb_time_read(&cpu_buffer->write_stamp, &info->after);
3596                 barrier();
3597  /*F*/          if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
3598                     info->after == info->before && info->after < ts) {
3599                         /*
3600                          * Nothing came after this event between C and F, it is
3601                          * safe to use info->after for the delta as it
3602                          * matched info->before and is still valid.
3603                          */
3604                         info->delta = ts - info->after;
3605                 } else {
3606                         /*
3607                          * Interrupted between C and F:
3608                          * Lost the previous events time stamp. Just set the
3609                          * delta to zero, and this will be the same time as
3610                          * the event this event interrupted. And the events that
3611                          * came after this will still be correct (as they would
3612                          * have built their delta on the previous event.
3613                          */
3614                         info->delta = 0;
3615                 }
3616                 info->ts = ts;
3617                 info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
3618         }
3619
3620         /*
3621          * If this is the first commit on the page, then it has the same
3622          * timestamp as the page itself.
3623          */
3624         if (unlikely(!tail && !(info->add_timestamp &
3625                                 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3626                 info->delta = 0;
3627
3628         /* We reserved something on the buffer */
3629
3630         event = __rb_page_index(tail_page, tail);
3631         rb_update_event(cpu_buffer, event, info);
3632
3633         local_inc(&tail_page->entries);
3634
3635         /*
3636          * If this is the first commit on the page, then update
3637          * its timestamp.
3638          */
3639         if (unlikely(!tail))
3640                 tail_page->page->time_stamp = info->ts;
3641
3642         /* account for these added bytes */
3643         local_add(info->length, &cpu_buffer->entries_bytes);
3644
3645         return event;
3646 }
3647
3648 static __always_inline struct ring_buffer_event *
3649 rb_reserve_next_event(struct trace_buffer *buffer,
3650                       struct ring_buffer_per_cpu *cpu_buffer,
3651                       unsigned long length)
3652 {
3653         struct ring_buffer_event *event;
3654         struct rb_event_info info;
3655         int nr_loops = 0;
3656         int add_ts_default;
3657
3658         /* ring buffer does cmpxchg, make sure it is safe in NMI context */
3659         if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) &&
3660             (unlikely(in_nmi()))) {
3661                 return NULL;
3662         }
3663
3664         rb_start_commit(cpu_buffer);
3665         /* The commit page can not change after this */
3666
3667 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
3668         /*
3669          * Due to the ability to swap a cpu buffer from a buffer
3670          * it is possible it was swapped before we committed.
3671          * (committing stops a swap). We check for it here and
3672          * if it happened, we have to fail the write.
3673          */
3674         barrier();
3675         if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
3676                 local_dec(&cpu_buffer->committing);
3677                 local_dec(&cpu_buffer->commits);
3678                 return NULL;
3679         }
3680 #endif
3681
3682         info.length = rb_calculate_event_length(length);
3683
3684         if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
3685                 add_ts_default = RB_ADD_STAMP_ABSOLUTE;
3686                 info.length += RB_LEN_TIME_EXTEND;
3687                 if (info.length > cpu_buffer->buffer->max_data_size)
3688                         goto out_fail;
3689         } else {
3690                 add_ts_default = RB_ADD_STAMP_NONE;
3691         }
3692
3693  again:
3694         info.add_timestamp = add_ts_default;
3695         info.delta = 0;
3696
3697         /*
3698          * We allow for interrupts to reenter here and do a trace.
3699          * If one does, it will cause this original code to loop
3700          * back here. Even with heavy interrupts happening, this
3701          * should only happen a few times in a row. If this happens
3702          * 1000 times in a row, there must be either an interrupt
3703          * storm or we have something buggy.
3704          * Bail!
3705          */
3706         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
3707                 goto out_fail;
3708
3709         event = __rb_reserve_next(cpu_buffer, &info);
3710
3711         if (unlikely(PTR_ERR(event) == -EAGAIN)) {
3712                 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
3713                         info.length -= RB_LEN_TIME_EXTEND;
3714                 goto again;
3715         }
3716
3717         if (likely(event))
3718                 return event;
3719  out_fail:
3720         rb_end_commit(cpu_buffer);
3721         return NULL;
3722 }
3723
3724 /**
3725  * ring_buffer_lock_reserve - reserve a part of the buffer
3726  * @buffer: the ring buffer to reserve from
3727  * @length: the length of the data to reserve (excluding event header)
3728  *
3729  * Returns a reserved event on the ring buffer to copy directly to.
3730  * The user of this interface will need to get the body to write into
3731  * and can use the ring_buffer_event_data() interface.
3732  *
3733  * The length is the length of the data needed, not the event length
3734  * which also includes the event header.
3735  *
3736  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3737  * If NULL is returned, then nothing has been allocated or locked.
3738  */
3739 struct ring_buffer_event *
3740 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length)
3741 {
3742         struct ring_buffer_per_cpu *cpu_buffer;
3743         struct ring_buffer_event *event;
3744         int cpu;
3745
3746         /* If we are tracing schedule, we don't want to recurse */
3747         preempt_disable_notrace();
3748
3749         if (unlikely(atomic_read(&buffer->record_disabled)))
3750                 goto out;
3751
3752         cpu = raw_smp_processor_id();
3753
3754         if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
3755                 goto out;
3756
3757         cpu_buffer = buffer->buffers[cpu];
3758
3759         if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
3760                 goto out;
3761
3762         if (unlikely(length > buffer->max_data_size))
3763                 goto out;
3764
3765         if (unlikely(trace_recursive_lock(cpu_buffer)))
3766                 goto out;
3767
3768         event = rb_reserve_next_event(buffer, cpu_buffer, length);
3769         if (!event)
3770                 goto out_unlock;
3771
3772         return event;
3773
3774  out_unlock:
3775         trace_recursive_unlock(cpu_buffer);
3776  out:
3777         preempt_enable_notrace();
3778         return NULL;
3779 }
3780 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
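/*
 * Editorial sketch (not part of the original source): the reserve/commit
 * pairing described above, writing into the reserved body directly via
 * ring_buffer_event_data(). "struct my_entry" and its "value" field are
 * purely illustrative:
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;			(nothing reserved, nothing to commit)
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;
 *	ring_buffer_unlock_commit(buffer);
 */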
3781
3782 /*
3783  * Decrement the entries to the page that an event is on.
3784  * The event does not even need to exist, only the pointer
3785  * to the page it is on. This may only be called before the commit
3786  * takes place.
3787  */
3788 static inline void
3789 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3790                    struct ring_buffer_event *event)
3791 {
3792         unsigned long addr = (unsigned long)event;
3793         struct buffer_page *bpage = cpu_buffer->commit_page;
3794         struct buffer_page *start;
3795
3796         addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
3797
3798         /* Do the likely case first */
3799         if (likely(bpage->page == (void *)addr)) {
3800                 local_dec(&bpage->entries);
3801                 return;
3802         }
3803
3804         /*
3805          * Because the commit page may be on the reader page we
3806          * start with the next page and check the end loop there.
3807          */
3808         rb_inc_page(&bpage);
3809         start = bpage;
3810         do {
3811                 if (bpage->page == (void *)addr) {
3812                         local_dec(&bpage->entries);
3813                         return;
3814                 }
3815                 rb_inc_page(&bpage);
3816         } while (bpage != start);
3817
3818         /* commit not part of this buffer?? */
3819         RB_WARN_ON(cpu_buffer, 1);
3820 }
3821
3822 /**
3823  * ring_buffer_discard_commit - discard an event that has not been committed
3824  * @buffer: the ring buffer
3825  * @event: non committed event to discard
3826  *
3827  * Sometimes an event that is in the ring buffer needs to be ignored.
3828  * This function lets the user discard an event in the ring buffer
3829  * and then that event will not be read later.
3830  *
3831  * This function only works if it is called before the item has been
3832  * committed. It will try to free the event from the ring buffer
3833  * if another event has not been added behind it.
3834  *
3835  * If another event has been added behind it, it will set the event
3836  * up as discarded, and perform the commit.
3837  *
3838  * If this function is called, do not call ring_buffer_unlock_commit on
3839  * the event.
3840  */
3841 void ring_buffer_discard_commit(struct trace_buffer *buffer,
3842                                 struct ring_buffer_event *event)
3843 {
3844         struct ring_buffer_per_cpu *cpu_buffer;
3845         int cpu;
3846
3847         /* The event is discarded regardless */
3848         rb_event_discard(event);
3849
3850         cpu = smp_processor_id();
3851         cpu_buffer = buffer->buffers[cpu];
3852
3853         /*
3854          * This must only be called if the event has not been
3855          * committed yet. Thus we can assume that preemption
3856          * is still disabled.
3857          */
3858         RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3859
3860         rb_decrement_entry(cpu_buffer, event);
3861         if (rb_try_to_discard(cpu_buffer, event))
3862                 goto out;
3863
3864  out:
3865         rb_end_commit(cpu_buffer);
3866
3867         trace_recursive_unlock(cpu_buffer);
3868
3869         preempt_enable_notrace();
3870
3871 }
3872 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
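/*
 * Editorial sketch (not part of the original source): a caller that reserves
 * an event but then decides to drop it calls this function instead of
 * ring_buffer_unlock_commit(). "fill_entry()" and "event_should_be_kept()"
 * are hypothetical helpers used only for illustration:
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	fill_entry(entry);				(hypothetical)
 *	if (event_should_be_kept(entry))		(hypothetical)
 *		ring_buffer_unlock_commit(buffer);
 *	else
 *		ring_buffer_discard_commit(buffer, event);
 */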
3873
3874 /**
3875  * ring_buffer_write - write data to the buffer without reserving
3876  * @buffer: The ring buffer to write to.
3877  * @length: The length of the data being written (excluding the event header)
3878  * @data: The data to write to the buffer.
3879  *
3880  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3881  * one function. If you already have the data to write to the buffer, it
3882  * may be easier to simply call this function.
3883  *
3884  * Note, like ring_buffer_lock_reserve, the length is the length of the data
3885  * and not the length of the event which would hold the header.
3886  */
3887 int ring_buffer_write(struct trace_buffer *buffer,
3888                       unsigned long length,
3889                       void *data)
3890 {
3891         struct ring_buffer_per_cpu *cpu_buffer;
3892         struct ring_buffer_event *event;
3893         void *body;
3894         int ret = -EBUSY;
3895         int cpu;
3896
3897         preempt_disable_notrace();
3898
3899         if (atomic_read(&buffer->record_disabled))
3900                 goto out;
3901
3902         cpu = raw_smp_processor_id();
3903
3904         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3905                 goto out;
3906
3907         cpu_buffer = buffer->buffers[cpu];
3908
3909         if (atomic_read(&cpu_buffer->record_disabled))
3910                 goto out;
3911
3912         if (length > buffer->max_data_size)
3913                 goto out;
3914
3915         if (unlikely(trace_recursive_lock(cpu_buffer)))
3916                 goto out;
3917
3918         event = rb_reserve_next_event(buffer, cpu_buffer, length);
3919         if (!event)
3920                 goto out_unlock;
3921
3922         body = rb_event_data(event);
3923
3924         memcpy(body, data, length);
3925
3926         rb_commit(cpu_buffer);
3927
3928         rb_wakeups(buffer, cpu_buffer);
3929
3930         ret = 0;
3931
3932  out_unlock:
3933         trace_recursive_unlock(cpu_buffer);
3934
3935  out:
3936         preempt_enable_notrace();
3937
3938         return ret;
3939 }
3940 EXPORT_SYMBOL_GPL(ring_buffer_write);
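/*
 * Editorial sketch (not part of the original source): when the payload is
 * already assembled in a local variable, ring_buffer_write() replaces the
 * reserve/fill/commit sequence. "struct my_entry entry" is illustrative:
 *
 *	int ret;
 *
 *	ret = ring_buffer_write(buffer, sizeof(entry), &entry);
 *	if (ret)
 *		pr_debug("event dropped\n");	(ret is -EBUSY on failure)
 */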
3941
3942 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3943 {
3944         struct buffer_page *reader = cpu_buffer->reader_page;
3945         struct buffer_page *head = rb_set_head_page(cpu_buffer);
3946         struct buffer_page *commit = cpu_buffer->commit_page;
3947
3948         /* In case of error, head will be NULL */
3949         if (unlikely(!head))
3950                 return true;
3951
3952         /* Reader should exhaust content in reader page */
3953         if (reader->read != rb_page_commit(reader))
3954                 return false;
3955
3956         /*
3957          * If writers are committing on the reader page, knowing all
3958          * committed content has been read, the ring buffer is empty.
3959          */
3960         if (commit == reader)
3961                 return true;
3962
3963         /*
3964          * If writers are committing on a page other than reader page
3965          * and head page, there should always be content to read.
3966          */
3967         if (commit != head)
3968                 return false;
3969
3970         /*
3971          * Writers are committing on the head page; we only need to
3972          * check whether there is committed data, and the reader will
3973          * swap the reader page with the head page when it is time to read data.
3974          */
3975         return rb_page_commit(commit) == 0;
3976 }
3977
3978 /**
3979  * ring_buffer_record_disable - stop all writes into the buffer
3980  * @buffer: The ring buffer to stop writes to.
3981  *
3982  * This prevents all writes to the buffer. Any attempt to write
3983  * to the buffer after this will fail and return NULL.
3984  *
3985  * The caller should call synchronize_rcu() after this.
3986  */
3987 void ring_buffer_record_disable(struct trace_buffer *buffer)
3988 {
3989         atomic_inc(&buffer->record_disabled);
3990 }
3991 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3992
3993 /**
3994  * ring_buffer_record_enable - enable writes to the buffer
3995  * @buffer: The ring buffer to enable writes
3996  *
3997  * Note, multiple disables will need the same number of enables
3998  * to truly enable the writing (much like preempt_disable).
3999  */
4000 void ring_buffer_record_enable(struct trace_buffer *buffer)
4001 {
4002         atomic_dec(&buffer->record_disabled);
4003 }
4004 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
4005
4006 /**
4007  * ring_buffer_record_off - stop all writes into the buffer
4008  * @buffer: The ring buffer to stop writes to.
4009  *
4010  * This prevents all writes to the buffer. Any attempt to write
4011  * to the buffer after this will fail and return NULL.
4012  *
4013  * This is different than ring_buffer_record_disable() as
4014  * it works like an on/off switch, whereas the disable() version
4015  * must be paired with an enable().
4016  */
4017 void ring_buffer_record_off(struct trace_buffer *buffer)
4018 {
4019         unsigned int rd;
4020         unsigned int new_rd;
4021
4022         rd = atomic_read(&buffer->record_disabled);
4023         do {
4024                 new_rd = rd | RB_BUFFER_OFF;
4025         } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4026 }
4027 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
4028
4029 /**
4030  * ring_buffer_record_on - restart writes into the buffer
4031  * @buffer: The ring buffer to start writes to.
4032  *
4033  * This enables all writes to the buffer that was disabled by
4034  * ring_buffer_record_off().
4035  *
4036  * This is different than ring_buffer_record_enable() as
4037  * it works like an on/off switch, whereas the enable() version
4038  * must be paired with a disable().
4039  */
4040 void ring_buffer_record_on(struct trace_buffer *buffer)
4041 {
4042         unsigned int rd;
4043         unsigned int new_rd;
4044
4045         rd = atomic_read(&buffer->record_disabled);
4046         do {
4047                 new_rd = rd & ~RB_BUFFER_OFF;
4048         } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4049 }
4050 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
4051
4052 /**
4053  * ring_buffer_record_is_on - return true if the ring buffer can write
4054  * @buffer: The ring buffer to see if write is enabled
4055  *
4056  * Returns true if the ring buffer is in a state that it accepts writes.
4057  */
4058 bool ring_buffer_record_is_on(struct trace_buffer *buffer)
4059 {
4060         return !atomic_read(&buffer->record_disabled);
4061 }
4062
4063 /**
4064  * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4065  * @buffer: The ring buffer to see if write is set enabled
4066  *
4067  * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4068  * Note that this does NOT mean it is in a writable state.
4069  *
4070  * It may return true when the ring buffer has been disabled by
4071  * ring_buffer_record_disable(), as that is a temporary disabling of
4072  * the ring buffer.
4073  */
4074 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
4075 {
4076         return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
4077 }
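/*
 * Editorial note (not part of the original source): the two predicates above
 * answer different questions.  For example:
 *
 *	ring_buffer_record_disable(buffer);
 *		ring_buffer_record_is_on(buffer)      returns false
 *		ring_buffer_record_is_set_on(buffer)  still returns true
 *	ring_buffer_record_enable(buffer);
 *
 *	ring_buffer_record_off(buffer);
 *		ring_buffer_record_is_on(buffer)      returns false
 *		ring_buffer_record_is_set_on(buffer)  returns false
 *	ring_buffer_record_on(buffer);
 */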
4078
4079 /**
4080  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
4081  * @buffer: The ring buffer to stop writes to.
4082  * @cpu: The CPU buffer to stop
4083  *
4084  * This prevents all writes to the buffer. Any attempt to write
4085  * to the buffer after this will fail and return NULL.
4086  *
4087  * The caller should call synchronize_rcu() after this.
4088  */
4089 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
4090 {
4091         struct ring_buffer_per_cpu *cpu_buffer;
4092
4093         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4094                 return;
4095
4096         cpu_buffer = buffer->buffers[cpu];
4097         atomic_inc(&cpu_buffer->record_disabled);
4098 }
4099 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
4100
4101 /**
4102  * ring_buffer_record_enable_cpu - enable writes to the buffer
4103  * @buffer: The ring buffer to enable writes
4104  * @cpu: The CPU to enable.
4105  *
4106  * Note, multiple disables will need the same number of enables
4107  * to truly enable the writing (much like preempt_disable).
4108  */
4109 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu)
4110 {
4111         struct ring_buffer_per_cpu *cpu_buffer;
4112
4113         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4114                 return;
4115
4116         cpu_buffer = buffer->buffers[cpu];
4117         atomic_dec(&cpu_buffer->record_disabled);
4118 }
4119 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
4120
4121 /*
4122  * The total entries in the ring buffer is the running counter
4123  * of entries entered into the ring buffer, minus the sum of
4124  * the entries read from the ring buffer and the number of
4125  * entries that were overwritten.
4126  */
4127 static inline unsigned long
4128 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
4129 {
4130         return local_read(&cpu_buffer->entries) -
4131                 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
4132 }
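/*
 * Editorial example (not part of the original source), with illustrative
 * numbers: if 1000 entries were written, 200 were overwritten and 300 have
 * been read, rb_num_of_entries() reports 1000 - (200 + 300) = 500 entries
 * still unread in this CPU buffer.
 */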
4133
4134 /**
4135  * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
4136  * @buffer: The ring buffer
4137  * @cpu: The per CPU buffer to read from.
4138  */
4139 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu)
4140 {
4141         unsigned long flags;
4142         struct ring_buffer_per_cpu *cpu_buffer;
4143         struct buffer_page *bpage;
4144         u64 ret = 0;
4145
4146         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4147                 return 0;
4148
4149         cpu_buffer = buffer->buffers[cpu];
4150         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4151         /*
4152          * if the tail is on reader_page, oldest time stamp is on the reader
4153          * page
4154          */
4155         if (cpu_buffer->tail_page == cpu_buffer->reader_page)
4156                 bpage = cpu_buffer->reader_page;
4157         else
4158                 bpage = rb_set_head_page(cpu_buffer);
4159         if (bpage)
4160                 ret = bpage->page->time_stamp;
4161         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4162
4163         return ret;
4164 }
4165 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
4166
4167 /**
4168  * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer
4169  * @buffer: The ring buffer
4170  * @cpu: The per CPU buffer to read from.
4171  */
4172 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu)
4173 {
4174         struct ring_buffer_per_cpu *cpu_buffer;
4175         unsigned long ret;
4176
4177         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4178                 return 0;
4179
4180         cpu_buffer = buffer->buffers[cpu];
4181         ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
4182
4183         return ret;
4184 }
4185 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
4186
4187 /**
4188  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
4189  * @buffer: The ring buffer
4190  * @cpu: The per CPU buffer to get the entries from.
4191  */
4192 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu)
4193 {
4194         struct ring_buffer_per_cpu *cpu_buffer;
4195
4196         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4197                 return 0;
4198
4199         cpu_buffer = buffer->buffers[cpu];
4200
4201         return rb_num_of_entries(cpu_buffer);
4202 }
4203 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
4204
4205 /**
4206  * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
4207  * buffer wrapping around (only if RB_FL_OVERWRITE is on).
4208  * @buffer: The ring buffer
4209  * @cpu: The per CPU buffer to get the number of overruns from
4210  */
4211 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu)
4212 {
4213         struct ring_buffer_per_cpu *cpu_buffer;
4214         unsigned long ret;
4215
4216         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4217                 return 0;
4218
4219         cpu_buffer = buffer->buffers[cpu];
4220         ret = local_read(&cpu_buffer->overrun);
4221
4222         return ret;
4223 }
4224 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
4225
4226 /**
4227  * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
4228  * commits failing due to the buffer wrapping around while there are uncommitted
4229  * events, such as during an interrupt storm.
4230  * @buffer: The ring buffer
4231  * @cpu: The per CPU buffer to get the number of overruns from
4232  */
4233 unsigned long
4234 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu)
4235 {
4236         struct ring_buffer_per_cpu *cpu_buffer;
4237         unsigned long ret;
4238
4239         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4240                 return 0;
4241
4242         cpu_buffer = buffer->buffers[cpu];
4243         ret = local_read(&cpu_buffer->commit_overrun);
4244
4245         return ret;
4246 }
4247 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
4248
4249 /**
4250  * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
4251  * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
4252  * @buffer: The ring buffer
4253  * @cpu: The per CPU buffer to get the number of overruns from
4254  */
4255 unsigned long
4256 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu)
4257 {
4258         struct ring_buffer_per_cpu *cpu_buffer;
4259         unsigned long ret;
4260
4261         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4262                 return 0;
4263
4264         cpu_buffer = buffer->buffers[cpu];
4265         ret = local_read(&cpu_buffer->dropped_events);
4266
4267         return ret;
4268 }
4269 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
4270
4271 /**
4272  * ring_buffer_read_events_cpu - get the number of events successfully read
4273  * @buffer: The ring buffer
4274  * @cpu: The per CPU buffer to get the number of events read
4275  */
4276 unsigned long
4277 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu)
4278 {
4279         struct ring_buffer_per_cpu *cpu_buffer;
4280
4281         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4282                 return 0;
4283
4284         cpu_buffer = buffer->buffers[cpu];
4285         return cpu_buffer->read;
4286 }
4287 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
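/*
 * Editorial sketch (not part of the original source): the per-CPU counters
 * above can be combined into a simple status report; each helper returns 0
 * for a CPU that is not part of the buffer's cpumask. "buffer" and "cpu"
 * are assumed to be provided by the caller:
 *
 *	unsigned long unread = ring_buffer_entries_cpu(buffer, cpu);
 *	unsigned long lost   = ring_buffer_overrun_cpu(buffer, cpu);
 *	unsigned long read   = ring_buffer_read_events_cpu(buffer, cpu);
 *	unsigned long bytes  = ring_buffer_bytes_cpu(buffer, cpu);
 *
 *	pr_info("cpu%d: %lu unread, %lu read, %lu lost, %lu bytes unconsumed\n",
 *		cpu, unread, read, lost, bytes);
 */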
4288
4289 /**
4290  * ring_buffer_entries - get the number of entries in a buffer
4291  * @buffer: The ring buffer
4292  *
4293  * Returns the total number of entries in the ring buffer
4294  * (all CPU entries)
4295  */
4296 unsigned long ring_buffer_entries(struct trace_buffer *buffer)
4297 {
4298         struct ring_buffer_per_cpu *cpu_buffer;
4299         unsigned long entries = 0;
4300         int cpu;
4301
4302         /* if you care about this being correct, lock the buffer */
4303         for_each_buffer_cpu(buffer, cpu) {
4304                 cpu_buffer = buffer->buffers[cpu];
4305                 entries += rb_num_of_entries(cpu_buffer);
4306         }
4307
4308         return entries;
4309 }
4310 EXPORT_SYMBOL_GPL(ring_buffer_entries);
4311
4312 /**
4313  * ring_buffer_overruns - get the number of overruns in buffer
4314  * @buffer: The ring buffer
4315  *
4316  * Returns the total number of overruns in the ring buffer
4317  * (all CPU entries)
4318  */
4319 unsigned long ring_buffer_overruns(struct trace_buffer *buffer)
4320 {
4321         struct ring_buffer_per_cpu *cpu_buffer;
4322         unsigned long overruns = 0;
4323         int cpu;
4324
4325         /* if you care about this being correct, lock the buffer */
4326         for_each_buffer_cpu(buffer, cpu) {
4327                 cpu_buffer = buffer->buffers[cpu];
4328                 overruns += local_read(&cpu_buffer->overrun);
4329         }
4330
4331         return overruns;
4332 }
4333 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
4334
4335 static void rb_iter_reset(struct ring_buffer_iter *iter)
4336 {
4337         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4338
4339         /* Iterator usage is expected to have record disabled */
4340         iter->head_page = cpu_buffer->reader_page;
4341         iter->head = cpu_buffer->reader_page->read;
4342         iter->next_event = iter->head;
4343
4344         iter->cache_reader_page = iter->head_page;
4345         iter->cache_read = cpu_buffer->read;
4346         iter->cache_pages_removed = cpu_buffer->pages_removed;
4347
4348         if (iter->head) {
4349                 iter->read_stamp = cpu_buffer->read_stamp;
4350                 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp;
4351         } else {
4352                 iter->read_stamp = iter->head_page->page->time_stamp;
4353                 iter->page_stamp = iter->read_stamp;
4354         }
4355 }
4356
4357 /**
4358  * ring_buffer_iter_reset - reset an iterator
4359  * @iter: The iterator to reset
4360  *
4361  * Resets the iterator, so that it will start from the beginning
4362  * again.
4363  */
4364 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
4365 {
4366         struct ring_buffer_per_cpu *cpu_buffer;
4367         unsigned long flags;
4368
4369         if (!iter)
4370                 return;
4371
4372         cpu_buffer = iter->cpu_buffer;
4373
4374         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4375         rb_iter_reset(iter);
4376         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4377 }
4378 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
4379
4380 /**
4381  * ring_buffer_iter_empty - check if an iterator has no more to read
4382  * @iter: The iterator to check
4383  */
4384 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
4385 {
4386         struct ring_buffer_per_cpu *cpu_buffer;
4387         struct buffer_page *reader;
4388         struct buffer_page *head_page;
4389         struct buffer_page *commit_page;
4390         struct buffer_page *curr_commit_page;
4391         unsigned commit;
4392         u64 curr_commit_ts;
4393         u64 commit_ts;
4394
4395         cpu_buffer = iter->cpu_buffer;
4396         reader = cpu_buffer->reader_page;
4397         head_page = cpu_buffer->head_page;
4398         commit_page = READ_ONCE(cpu_buffer->commit_page);
4399         commit_ts = commit_page->page->time_stamp;
4400
4401         /*
4402          * When the writer goes across pages, it issues a cmpxchg which
4403          * is a mb(), which will synchronize with the rmb here.
4404          * (see rb_tail_page_update())
4405          */
4406         smp_rmb();
4407         commit = rb_page_commit(commit_page);
4408         /* We want to make sure that the commit page doesn't change */
4409         smp_rmb();
4410
4411         /* Make sure commit page didn't change */
4412         curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
4413         curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);
4414
4415         /* If the commit page changed, then there's more data */
4416         if (curr_commit_page != commit_page ||
4417             curr_commit_ts != commit_ts)
4418                 return 0;
4419
4420         /* Still racy, as it may return a false positive, but that's OK */
4421         return ((iter->head_page == commit_page && iter->head >= commit) ||
4422                 (iter->head_page == reader && commit_page == head_page &&
4423                  head_page->read == commit &&
4424                  iter->head == rb_page_commit(cpu_buffer->reader_page)));
4425 }
4426 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
4427
4428 static void
4429 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
4430                      struct ring_buffer_event *event)
4431 {
4432         u64 delta;
4433
4434         switch (event->type_len) {
4435         case RINGBUF_TYPE_PADDING:
4436                 return;
4437
4438         case RINGBUF_TYPE_TIME_EXTEND:
4439                 delta = rb_event_time_stamp(event);
4440                 cpu_buffer->read_stamp += delta;
4441                 return;
4442
4443         case RINGBUF_TYPE_TIME_STAMP:
4444                 delta = rb_event_time_stamp(event);
4445                 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp);
4446                 cpu_buffer->read_stamp = delta;
4447                 return;
4448
4449         case RINGBUF_TYPE_DATA:
4450                 cpu_buffer->read_stamp += event->time_delta;
4451                 return;
4452
4453         default:
4454                 RB_WARN_ON(cpu_buffer, 1);
4455         }
4456 }
4457
4458 static void
4459 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
4460                           struct ring_buffer_event *event)
4461 {
4462         u64 delta;
4463
4464         switch (event->type_len) {
4465         case RINGBUF_TYPE_PADDING:
4466                 return;
4467
4468         case RINGBUF_TYPE_TIME_EXTEND:
4469                 delta = rb_event_time_stamp(event);
4470                 iter->read_stamp += delta;
4471                 return;
4472
4473         case RINGBUF_TYPE_TIME_STAMP:
4474                 delta = rb_event_time_stamp(event);
4475                 delta = rb_fix_abs_ts(delta, iter->read_stamp);
4476                 iter->read_stamp = delta;
4477                 return;
4478
4479         case RINGBUF_TYPE_DATA:
4480                 iter->read_stamp += event->time_delta;
4481                 return;
4482
4483         default:
4484                 RB_WARN_ON(iter->cpu_buffer, 1);
4485         }
4486 }
4487
4488 static struct buffer_page *
4489 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
4490 {
4491         struct buffer_page *reader = NULL;
4492         unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
4493         unsigned long overwrite;
4494         unsigned long flags;
4495         int nr_loops = 0;
4496         bool ret;
4497
4498         local_irq_save(flags);
4499         arch_spin_lock(&cpu_buffer->lock);
4500
4501  again:
4502         /*
4503          * This should normally only loop twice. But because the
4504          * start of the reader inserts an empty page, it causes
4505          * a case where we will loop three times. There should be no
4506          * reason to loop four times (that I know of).
4507          */
4508         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
4509                 reader = NULL;
4510                 goto out;
4511         }
4512
4513         reader = cpu_buffer->reader_page;
4514
4515         /* If there's more to read, return this page */
4516         if (cpu_buffer->reader_page->read < rb_page_size(reader))
4517                 goto out;
4518
4519         /* Never should we have an index greater than the size */
4520         if (RB_WARN_ON(cpu_buffer,
4521                        cpu_buffer->reader_page->read > rb_page_size(reader)))
4522                 goto out;
4523
4524         /* check if we caught up to the tail */
4525         reader = NULL;
4526         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
4527                 goto out;
4528
4529         /* Don't bother swapping if the ring buffer is empty */
4530         if (rb_num_of_entries(cpu_buffer) == 0)
4531                 goto out;
4532
4533         /*
4534          * Reset the reader page to size zero.
4535          */
4536         local_set(&cpu_buffer->reader_page->write, 0);
4537         local_set(&cpu_buffer->reader_page->entries, 0);
4538         local_set(&cpu_buffer->reader_page->page->commit, 0);
4539         cpu_buffer->reader_page->real_end = 0;
4540
4541  spin:
4542         /*
4543          * Splice the empty reader page into the list around the head.
4544          */
4545         reader = rb_set_head_page(cpu_buffer);
4546         if (!reader)
4547                 goto out;
4548         cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
4549         cpu_buffer->reader_page->list.prev = reader->list.prev;
4550
4551         /*
4552          * cpu_buffer->pages just needs to point to the buffer, it
4553          *  has no specific buffer page to point to. Lets move it out
4554          *  of our way so we don't accidentally swap it.
4555          */
4556         cpu_buffer->pages = reader->list.prev;
4557
4558         /* The reader page will be pointing to the new head */
4559         rb_set_list_to_head(&cpu_buffer->reader_page->list);
4560
4561         /*
4562          * We want to make sure we read the overruns after we set up our
4563          * pointers to the next object. The writer side does a
4564          * cmpxchg to cross pages which acts as the mb on the writer
4565          * side. Note, the reader will constantly fail the swap
4566          * while the writer is updating the pointers, so this
4567          * guarantees that the overwrite recorded here is the one we
4568          * want to compare with the last_overrun.
4569          */
4570         smp_mb();
4571         overwrite = local_read(&(cpu_buffer->overrun));
4572
4573         /*
4574          * Here's the tricky part.
4575          *
4576          * We need to move the pointer past the header page.
4577          * But we can only do that if a writer is not currently
4578          * moving it. The page before the header page has the
4579          * flag bit '1' set if it is pointing to the page we want.
4580          * but if the writer is in the process of moving it
4581          * than it will be '2' or already moved '0'.
4582          */
4583
4584         ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
4585
4586         /*
4587          * If we did not convert it, then we must try again.
4588          */
4589         if (!ret)
4590                 goto spin;
4591
4592         /*
4593          * Yay! We succeeded in replacing the page.
4594          *
4595          * Now make the new head point back to the reader page.
4596          */
4597         rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
4598         rb_inc_page(&cpu_buffer->head_page);
4599
4600         local_inc(&cpu_buffer->pages_read);
4601
4602         /* Finally update the reader page to the new head */
4603         cpu_buffer->reader_page = reader;
4604         cpu_buffer->reader_page->read = 0;
4605
4606         if (overwrite != cpu_buffer->last_overrun) {
4607                 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
4608                 cpu_buffer->last_overrun = overwrite;
4609         }
4610
4611         goto again;
4612
4613  out:
4614         /* Update the read_stamp on the first event */
4615         if (reader && reader->read == 0)
4616                 cpu_buffer->read_stamp = reader->page->time_stamp;
4617
4618         arch_spin_unlock(&cpu_buffer->lock);
4619         local_irq_restore(flags);
4620
4621         /*
4622          * The writer has preemption disabled, so wait for it. But not forever:
4623          * although, 1 second is pretty much "forever".
4624          */
4625 #define USECS_WAIT      1000000
4626         for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
4627                 /* If the write is past the end of page, a writer is still updating it */
4628                 if (likely(!reader || rb_page_write(reader) <= bsize))
4629                         break;
4630
4631                 udelay(1);
4632
4633                 /* Get the latest version of the reader write value */
4634                 smp_rmb();
4635         }
4636
4637         /* The writer is not moving forward? Something is wrong */
4638         if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
4639                 reader = NULL;
4640
4641         /*
4642          * Make sure we see any padding after the write update
4643          * (see rb_reset_tail()).
4644          *
4645          * In addition, a writer may be writing on the reader page
4646          * if the page has not been fully filled, so the read barrier
4647          * is also needed to make sure we see the content of what is
4648          * committed by the writer (see rb_set_commit_to_write()).
4649          */
4650         smp_rmb();
4651
4652
4653         return reader;
4654 }
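
/*
 * Editorial sketch (illustrative only; the "EX_"/"ex_" names are hypothetical):
 * the head-claiming idea used by rb_get_reader_page() above, reduced to its
 * core.  A state flag lives in the low bits of the "next" pointer of the page
 * before the head: '1' means the next page is the head, '2' means the writer
 * is moving it, '0' means it has already moved.  The reader can only claim
 * the head while the flag is still '1'; the real rb_head_page_replace() does
 * this with a cmpxchg that also swaps the reader page into the list.
 */
#define EX_FLAG_MASK    3UL
#define EX_FLAG_HEAD    1UL     /* next page is the head page */

static bool __maybe_unused ex_claim_head(unsigned long *prev_next)
{
        unsigned long old = READ_ONCE(*prev_next);

        /* Lose the race if the writer already set '2' or moved the head */
        if ((old & EX_FLAG_MASK) != EX_FLAG_HEAD)
                return false;

        /* One atomic step: succeed only if the tagged pointer is unchanged */
        return cmpxchg(prev_next, old, old & ~EX_FLAG_MASK) == old;
}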
4655
4656 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
4657 {
4658         struct ring_buffer_event *event;
4659         struct buffer_page *reader;
4660         unsigned length;
4661
4662         reader = rb_get_reader_page(cpu_buffer);
4663
4664         /* This function should not be called when buffer is empty */
4665         if (RB_WARN_ON(cpu_buffer, !reader))
4666                 return;
4667
4668         event = rb_reader_event(cpu_buffer);
4669
4670         if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
4671                 cpu_buffer->read++;
4672
4673         rb_update_read_stamp(cpu_buffer, event);
4674
4675         length = rb_event_length(event);
4676         cpu_buffer->reader_page->read += length;
4677         cpu_buffer->read_bytes += length;
4678 }
4679
4680 static void rb_advance_iter(struct ring_buffer_iter *iter)
4681 {
4682         struct ring_buffer_per_cpu *cpu_buffer;
4683
4684         cpu_buffer = iter->cpu_buffer;
4685
4686         /* If head == next_event then we need to jump to the next event */
4687         if (iter->head == iter->next_event) {
4688                 /* If the event gets overwritten again, there's nothing to do */
4689                 if (rb_iter_head_event(iter) == NULL)
4690                         return;
4691         }
4692
4693         iter->head = iter->next_event;
4694
4695         /*
4696          * Check if we are at the end of the buffer.
4697          */
4698         if (iter->next_event >= rb_page_size(iter->head_page)) {
4699                 /* discarded commits can make the page empty */
4700                 if (iter->head_page == cpu_buffer->commit_page)
4701                         return;
4702                 rb_inc_iter(iter);
4703                 return;
4704         }
4705
4706         rb_update_iter_read_stamp(iter, iter->event);
4707 }
4708
4709 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4710 {
4711         return cpu_buffer->lost_events;
4712 }
4713
4714 static struct ring_buffer_event *
4715 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4716                unsigned long *lost_events)
4717 {
4718         struct ring_buffer_event *event;
4719         struct buffer_page *reader;
4720         int nr_loops = 0;
4721
4722         if (ts)
4723                 *ts = 0;
4724  again:
4725         /*
4726          * We repeat when a time extend is encountered.
4727          * Since the time extend is always attached to a data event,
4728          * we should never loop more than once.
4729          * (We never hit the following condition more than twice).
4730          */
4731         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
4732                 return NULL;
4733
4734         reader = rb_get_reader_page(cpu_buffer);
4735         if (!reader)
4736                 return NULL;
4737
4738         event = rb_reader_event(cpu_buffer);
4739
4740         switch (event->type_len) {
4741         case RINGBUF_TYPE_PADDING:
4742                 if (rb_null_event(event))
4743                         RB_WARN_ON(cpu_buffer, 1);
4744                 /*
4745                  * Because the writer could be discarding every
4746                  * event it creates (which would probably be bad),
4747                  * if we were to go back to "again" then we may never
4748                  * catch up, and would trigger the warn on, or lock
4749                  * up the box. Return the padding, and we will release
4750                  * the current locks, and try again.
4751                  */
4752                 return event;
4753
4754         case RINGBUF_TYPE_TIME_EXTEND:
4755                 /* Internal data, OK to advance */
4756                 rb_advance_reader(cpu_buffer);
4757                 goto again;
4758
4759         case RINGBUF_TYPE_TIME_STAMP:
4760                 if (ts) {
4761                         *ts = rb_event_time_stamp(event);
4762                         *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp);
4763                         ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4764                                                          cpu_buffer->cpu, ts);
4765                 }
4766                 /* Internal data, OK to advance */
4767                 rb_advance_reader(cpu_buffer);
4768                 goto again;
4769
4770         case RINGBUF_TYPE_DATA:
4771                 if (ts && !(*ts)) {
4772                         *ts = cpu_buffer->read_stamp + event->time_delta;
4773                         ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4774                                                          cpu_buffer->cpu, ts);
4775                 }
4776                 if (lost_events)
4777                         *lost_events = rb_lost_events(cpu_buffer);
4778                 return event;
4779
4780         default:
4781                 RB_WARN_ON(cpu_buffer, 1);
4782         }
4783
4784         return NULL;
4785 }
4786 EXPORT_SYMBOL_GPL(ring_buffer_peek);
4787
4788 static struct ring_buffer_event *
4789 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4790 {
4791         struct trace_buffer *buffer;
4792         struct ring_buffer_per_cpu *cpu_buffer;
4793         struct ring_buffer_event *event;
4794         int nr_loops = 0;
4795
4796         if (ts)
4797                 *ts = 0;
4798
4799         cpu_buffer = iter->cpu_buffer;
4800         buffer = cpu_buffer->buffer;
4801
4802         /*
4803          * Check if someone performed a consuming read to the buffer
4804          * or removed some pages from the buffer. In these cases,
4805          * iterator was invalidated and we need to reset it.
4806          */
4807         if (unlikely(iter->cache_read != cpu_buffer->read ||
4808                      iter->cache_reader_page != cpu_buffer->reader_page ||
4809                      iter->cache_pages_removed != cpu_buffer->pages_removed))
4810                 rb_iter_reset(iter);
4811
4812  again:
4813         if (ring_buffer_iter_empty(iter))
4814                 return NULL;
4815
4816         /*
4817          * As the writer can mess with what the iterator is trying
4818          * to read, just give up if we fail to get an event after
4819          * three tries. The iterator is not as reliable when reading
4820          * the ring buffer with an active write as the consumer is.
4821          * Do not warn if the three failures is reached.
4822          */
4823         if (++nr_loops > 3)
4824                 return NULL;
4825
4826         if (rb_per_cpu_empty(cpu_buffer))
4827                 return NULL;
4828
4829         if (iter->head >= rb_page_size(iter->head_page)) {
4830                 rb_inc_iter(iter);
4831                 goto again;
4832         }
4833
4834         event = rb_iter_head_event(iter);
4835         if (!event)
4836                 goto again;
4837
4838         switch (event->type_len) {
4839         case RINGBUF_TYPE_PADDING:
4840                 if (rb_null_event(event)) {
4841                         rb_inc_iter(iter);
4842                         goto again;
4843                 }
4844                 rb_advance_iter(iter);
4845                 return event;
4846
4847         case RINGBUF_TYPE_TIME_EXTEND:
4848                 /* Internal data, OK to advance */
4849                 rb_advance_iter(iter);
4850                 goto again;
4851
4852         case RINGBUF_TYPE_TIME_STAMP:
4853                 if (ts) {
4854                         *ts = rb_event_time_stamp(event);
4855                         *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp);
4856                         ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4857                                                          cpu_buffer->cpu, ts);
4858                 }
4859                 /* Internal data, OK to advance */
4860                 rb_advance_iter(iter);
4861                 goto again;
4862
4863         case RINGBUF_TYPE_DATA:
4864                 if (ts && !(*ts)) {
4865                         *ts = iter->read_stamp + event->time_delta;
4866                         ring_buffer_normalize_time_stamp(buffer,
4867                                                          cpu_buffer->cpu, ts);
4868                 }
4869                 return event;
4870
4871         default:
4872                 RB_WARN_ON(cpu_buffer, 1);
4873         }
4874
4875         return NULL;
4876 }
4877 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4878
4879 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4880 {
4881         if (likely(!in_nmi())) {
4882                 raw_spin_lock(&cpu_buffer->reader_lock);
4883                 return true;
4884         }
4885
4886         /*
4887          * If an NMI die dumps out the content of the ring buffer,
4888          * a trylock must be used to prevent a deadlock if the NMI
4889          * preempted a task that holds the ring buffer locks. If
4890          * we get the lock then all is fine; if not, then continue
4891          * with the read, but this can corrupt the ring buffer,
4892          * so it must be permanently disabled from future writes.
4893          * Reading from an NMI is a one-shot deal.
4894          */
4895         if (raw_spin_trylock(&cpu_buffer->reader_lock))
4896                 return true;
4897
4898         /* Continue without locking, but disable the ring buffer */
4899         atomic_inc(&cpu_buffer->record_disabled);
4900         return false;
4901 }
4902
4903 static inline void
4904 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4905 {
4906         if (likely(locked))
4907                 raw_spin_unlock(&cpu_buffer->reader_lock);
4908 }
4909
4910 /**
4911  * ring_buffer_peek - peek at the next event to be read
4912  * @buffer: The ring buffer to read
4913  * @cpu: The cpu to peek at
4914  * @ts: The timestamp counter of this event.
4915  * @lost_events: a variable to store if events were lost (may be NULL)
4916  *
4917  * This will return the event that will be read next, but does
4918  * not consume the data.
4919  */
4920 struct ring_buffer_event *
4921 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
4922                  unsigned long *lost_events)
4923 {
4924         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4925         struct ring_buffer_event *event;
4926         unsigned long flags;
4927         bool dolock;
4928
4929         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4930                 return NULL;
4931
4932  again:
4933         local_irq_save(flags);
4934         dolock = rb_reader_lock(cpu_buffer);
4935         event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4936         if (event && event->type_len == RINGBUF_TYPE_PADDING)
4937                 rb_advance_reader(cpu_buffer);
4938         rb_reader_unlock(cpu_buffer, dolock);
4939         local_irq_restore(flags);
4940
4941         if (event && event->type_len == RINGBUF_TYPE_PADDING)
4942                 goto again;
4943
4944         return event;
4945 }
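
/*
 * Editorial example (illustrative sketch): using ring_buffer_peek() to look
 * at the next event of each CPU without consuming anything, e.g. to find the
 * CPU whose next event carries the oldest timestamp.  The "ex_" name is
 * hypothetical; "buffer" is assumed to come from ring_buffer_alloc().
 */
static int __maybe_unused ex_oldest_event_cpu(struct trace_buffer *buffer)
{
        u64 ts, oldest_ts = (u64)-1;
        int cpu, oldest_cpu = -1;

        for_each_online_cpu(cpu) {
                /* Peeking does not advance the reader */
                if (!ring_buffer_peek(buffer, cpu, &ts, NULL))
                        continue;
                if (ts < oldest_ts) {
                        oldest_ts = ts;
                        oldest_cpu = cpu;
                }
        }
        return oldest_cpu;
}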
4946
4947 /**
 * ring_buffer_iter_dropped - report if there are dropped events
4948  * @iter: The ring buffer iterator
4949  *
4950  * Returns true if there were dropped events since the last peek.
4951  */
4952 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
4953 {
4954         bool ret = iter->missed_events != 0;
4955
4956         iter->missed_events = 0;
4957         return ret;
4958 }
4959 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
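
/*
 * Editorial example (illustrative sketch): reporting drops while walking the
 * buffer with an iterator.  "iter" is assumed to have been set up with
 * ring_buffer_read_prepare()/ring_buffer_read_start(); a fuller iterator
 * sketch follows ring_buffer_read_finish() below.
 */
static void __maybe_unused ex_note_drops(struct ring_buffer_iter *iter)
{
        /* The flag is cleared by this call, so it reports at most once */
        if (ring_buffer_iter_dropped(iter))
                pr_info("ring buffer: events were dropped before this point\n");
}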
4960
4961 /**
4962  * ring_buffer_iter_peek - peek at the next event to be read
4963  * @iter: The ring buffer iterator
4964  * @ts: The timestamp counter of this event.
4965  *
4966  * This will return the event that will be read next, but does
4967  * not increment the iterator.
4968  */
4969 struct ring_buffer_event *
4970 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4971 {
4972         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4973         struct ring_buffer_event *event;
4974         unsigned long flags;
4975
4976  again:
4977         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4978         event = rb_iter_peek(iter, ts);
4979         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4980
4981         if (event && event->type_len == RINGBUF_TYPE_PADDING)
4982                 goto again;
4983
4984         return event;
4985 }
4986
4987 /**
4988  * ring_buffer_consume - return an event and consume it
4989  * @buffer: The ring buffer to get the next event from
4990  * @cpu: the cpu to read the buffer from
4991  * @ts: a variable to store the timestamp (may be NULL)
4992  * @lost_events: a variable to store if events were lost (may be NULL)
4993  *
4994  * Returns the next event in the ring buffer, and that event is consumed.
4995  * Meaning, that sequential reads will keep returning a different event,
4996  * and eventually empty the ring buffer if the producer is slower.
4997  */
4998 struct ring_buffer_event *
4999 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
5000                     unsigned long *lost_events)
5001 {
5002         struct ring_buffer_per_cpu *cpu_buffer;
5003         struct ring_buffer_event *event = NULL;
5004         unsigned long flags;
5005         bool dolock;
5006
5007  again:
5008         /* might be called in atomic context */
5009         preempt_disable();
5010
5011         if (!cpumask_test_cpu(cpu, buffer->cpumask))
5012                 goto out;
5013
5014         cpu_buffer = buffer->buffers[cpu];
5015         local_irq_save(flags);
5016         dolock = rb_reader_lock(cpu_buffer);
5017
5018         event = rb_buffer_peek(cpu_buffer, ts, lost_events);
5019         if (event) {
5020                 cpu_buffer->lost_events = 0;
5021                 rb_advance_reader(cpu_buffer);
5022         }
5023
5024         rb_reader_unlock(cpu_buffer, dolock);
5025         local_irq_restore(flags);
5026
5027  out:
5028         preempt_enable();
5029
5030         if (event && event->type_len == RINGBUF_TYPE_PADDING)
5031                 goto again;
5032
5033         return event;
5034 }
5035 EXPORT_SYMBOL_GPL(ring_buffer_consume);
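
/*
 * Editorial example (illustrative sketch): draining one CPU with the
 * consuming read API.  The "ex_" name is hypothetical; "buffer" is assumed
 * to come from ring_buffer_alloc() elsewhere.  Each iteration returns a
 * different event until the per-CPU buffer is empty.
 */
static void __maybe_unused ex_drain_cpu(struct trace_buffer *buffer, int cpu)
{
        struct ring_buffer_event *event;
        unsigned long lost;
        u64 ts;

        while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
                if (lost)
                        pr_info("cpu%d: %lu events lost before this one\n",
                                cpu, lost);
                /* The payload is whatever the producer wrote at reserve time */
                pr_info("cpu%d: ts=%llu len=%u\n", cpu, ts,
                        ring_buffer_event_length(event));
        }
}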
5036
5037 /**
5038  * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
5039  * @buffer: The ring buffer to read from
5040  * @cpu: The cpu buffer to iterate over
5041  * @flags: gfp flags to use for memory allocation
5042  *
5043  * This performs the initial preparations necessary to iterate
5044  * through the buffer.  Memory is allocated, buffer recording
5045  * is disabled, and the iterator pointer is returned to the caller.
5046  *
5047  * Disabling buffer recording prevents the reading from being
5048  * corrupted. This is not a consuming read, so a producer is not
5049  * expected.
5050  *
5051  * After a sequence of ring_buffer_read_prepare calls, the user is
5052  * expected to make at least one call to ring_buffer_read_prepare_sync.
5053  * Afterwards, ring_buffer_read_start is invoked to get things going
5054  * for real.
5055  *
5056  * This overall must be paired with ring_buffer_read_finish.
5057  */
5058 struct ring_buffer_iter *
5059 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
5060 {
5061         struct ring_buffer_per_cpu *cpu_buffer;
5062         struct ring_buffer_iter *iter;
5063
5064         if (!cpumask_test_cpu(cpu, buffer->cpumask))
5065                 return NULL;
5066
5067         iter = kzalloc(sizeof(*iter), flags);
5068         if (!iter)
5069                 return NULL;
5070
5071         /* Holds the entire event: data and meta data */
5072         iter->event_size = buffer->subbuf_size;
5073         iter->event = kmalloc(iter->event_size, flags);
5074         if (!iter->event) {
5075                 kfree(iter);
5076                 return NULL;
5077         }
5078
5079         cpu_buffer = buffer->buffers[cpu];
5080
5081         iter->cpu_buffer = cpu_buffer;
5082
5083         atomic_inc(&cpu_buffer->resize_disabled);
5084
5085         return iter;
5086 }
5087 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
5088
5089 /**
5090  * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
5091  *
5092  * All previously invoked ring_buffer_read_prepare calls to prepare
5093  * iterators will be synchronized.  Afterwards, ring_buffer_read_start
5094  * calls on those iterators are allowed.
5095  */
5096 void
5097 ring_buffer_read_prepare_sync(void)
5098 {
5099         synchronize_rcu();
5100 }
5101 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
5102
5103 /**
5104  * ring_buffer_read_start - start a non consuming read of the buffer
5105  * @iter: The iterator returned by ring_buffer_read_prepare
5106  *
5107  * This finalizes the startup of an iteration through the buffer.
5108  * The iterator comes from a call to ring_buffer_read_prepare and
5109  * an intervening ring_buffer_read_prepare_sync must have been
5110  * performed.
5111  *
5112  * Must be paired with ring_buffer_read_finish.
5113  */
5114 void
5115 ring_buffer_read_start(struct ring_buffer_iter *iter)
5116 {
5117         struct ring_buffer_per_cpu *cpu_buffer;
5118         unsigned long flags;
5119
5120         if (!iter)
5121                 return;
5122
5123         cpu_buffer = iter->cpu_buffer;
5124
5125         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5126         arch_spin_lock(&cpu_buffer->lock);
5127         rb_iter_reset(iter);
5128         arch_spin_unlock(&cpu_buffer->lock);
5129         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5130 }
5131 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
5132
5133 /**
5134  * ring_buffer_read_finish - finish reading the iterator of the buffer
5135  * @iter: The iterator retrieved by ring_buffer_read_prepare
5136  *
5137  * This re-enables the recording to the buffer, and frees the
5138  * iterator.
5139  */
5140 void
5141 ring_buffer_read_finish(struct ring_buffer_iter *iter)
5142 {
5143         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5144         unsigned long flags;
5145
5146         /*
5147          * Ring buffer is disabled from recording, here's a good place
5148          * to check the integrity of the ring buffer.
5149          * Must prevent readers from trying to read, as the check
5150          * clears the HEAD page and readers require it.
5151          */
5152         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5153         rb_check_pages(cpu_buffer);
5154         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5155
5156         atomic_dec(&cpu_buffer->resize_disabled);
5157         kfree(iter->event);
5158         kfree(iter);
5159 }
5160 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
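
/*
 * Editorial example (illustrative sketch): the full non-consuming read
 * sequence for one CPU, pairing ring_buffer_read_prepare() with
 * ring_buffer_read_finish() as described above.  The "ex_" name is
 * hypothetical and error handling is kept minimal.
 */
static void __maybe_unused ex_walk_cpu(struct trace_buffer *buffer, int cpu)
{
        struct ring_buffer_iter *iter;
        struct ring_buffer_event *event;
        u64 ts;

        iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
        if (!iter)
                return;

        /* One sync covers all iterators prepared so far */
        ring_buffer_read_prepare_sync();
        ring_buffer_read_start(iter);

        while (!ring_buffer_iter_empty(iter)) {
                event = ring_buffer_iter_peek(iter, &ts);
                if (!event)
                        break;
                pr_info("cpu%d: ts=%llu len=%u\n", cpu, ts,
                        ring_buffer_event_length(event));
                /* Move on; nothing is consumed from the buffer itself */
                ring_buffer_iter_advance(iter);
        }

        ring_buffer_read_finish(iter);
}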
5161
5162 /**
5163  * ring_buffer_iter_advance - advance the iterator to the next location
5164  * @iter: The ring buffer iterator
5165  *
5166  * Move the location of the iterator such that the next read will
5167  * be the next location of the iterator.
5168  */
5169 void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
5170 {
5171         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5172         unsigned long flags;
5173
5174         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5175
5176         rb_advance_iter(iter);
5177
5178         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5179 }
5180 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
5181
5182 /**
5183  * ring_buffer_size - return the size of the ring buffer (in bytes)
5184  * @buffer: The ring buffer.
5185  * @cpu: The CPU to get ring buffer size from.
5186  */
5187 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
5188 {
5189         if (!cpumask_test_cpu(cpu, buffer->cpumask))
5190                 return 0;
5191
5192         return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages;
5193 }
5194 EXPORT_SYMBOL_GPL(ring_buffer_size);
5195
5196 /**
5197  * ring_buffer_max_event_size - return the max data size of an event
5198  * @buffer: The ring buffer.
5199  *
5200  * Returns the maximum size an event can be.
5201  */
5202 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer)
5203 {
5204         /* If abs timestamp is requested, events have a timestamp too */
5205         if (ring_buffer_time_stamp_abs(buffer))
5206                 return buffer->max_data_size - RB_LEN_TIME_EXTEND;
5207         return buffer->max_data_size;
5208 }
5209 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size);
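
/*
 * Editorial example (illustrative sketch): relating the two size helpers
 * above.  A payload can only ever be recorded if it fits in a single event,
 * bounded by ring_buffer_max_event_size(); ring_buffer_size() reports the
 * total capacity of one CPU.  The "ex_" name is hypothetical.
 */
static bool __maybe_unused ex_payload_fits(struct trace_buffer *buffer,
                                           int cpu, unsigned long len)
{
        if (len > ring_buffer_max_event_size(buffer))
                return false;   /* can never fit in one event */

        /* Trivially true in practice, shown only to relate the two helpers */
        return len <= ring_buffer_size(buffer, cpu);
}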
5210
5211 static void rb_clear_buffer_page(struct buffer_page *page)
5212 {
5213         local_set(&page->write, 0);
5214         local_set(&page->entries, 0);
5215         rb_init_page(page->page);
5216         page->read = 0;
5217 }
5218
5219 static void
5220 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
5221 {
5222         struct buffer_page *page;
5223
5224         rb_head_page_deactivate(cpu_buffer);
5225
5226         cpu_buffer->head_page
5227                 = list_entry(cpu_buffer->pages, struct buffer_page, list);
5228         rb_clear_buffer_page(cpu_buffer->head_page);
5229         list_for_each_entry(page, cpu_buffer->pages, list) {
5230                 rb_clear_buffer_page(page);
5231         }
5232
5233         cpu_buffer->tail_page = cpu_buffer->head_page;
5234         cpu_buffer->commit_page = cpu_buffer->head_page;
5235
5236         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
5237         INIT_LIST_HEAD(&cpu_buffer->new_pages);
5238         rb_clear_buffer_page(cpu_buffer->reader_page);
5239
5240         local_set(&cpu_buffer->entries_bytes, 0);
5241         local_set(&cpu_buffer->overrun, 0);
5242         local_set(&cpu_buffer->commit_overrun, 0);
5243         local_set(&cpu_buffer->dropped_events, 0);
5244         local_set(&cpu_buffer->entries, 0);
5245         local_set(&cpu_buffer->committing, 0);
5246         local_set(&cpu_buffer->commits, 0);
5247         local_set(&cpu_buffer->pages_touched, 0);
5248         local_set(&cpu_buffer->pages_lost, 0);
5249         local_set(&cpu_buffer->pages_read, 0);
5250         cpu_buffer->last_pages_touch = 0;
5251         cpu_buffer->shortest_full = 0;
5252         cpu_buffer->read = 0;
5253         cpu_buffer->read_bytes = 0;
5254
5255         rb_time_set(&cpu_buffer->write_stamp, 0);
5256         rb_time_set(&cpu_buffer->before_stamp, 0);
5257
5258         memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp));
5259
5260         cpu_buffer->lost_events = 0;
5261         cpu_buffer->last_overrun = 0;
5262
5263         rb_head_page_activate(cpu_buffer);
5264         cpu_buffer->pages_removed = 0;
5265 }
5266
5267 /* Must have disabled the cpu buffer then done a synchronize_rcu */
5268 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
5269 {
5270         unsigned long flags;
5271
5272         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5273
5274         if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
5275                 goto out;
5276
5277         arch_spin_lock(&cpu_buffer->lock);
5278
5279         rb_reset_cpu(cpu_buffer);
5280
5281         arch_spin_unlock(&cpu_buffer->lock);
5282
5283  out:
5284         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5285 }
5286
5287 /**
5288  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
5289  * @buffer: The ring buffer to reset a per cpu buffer of
5290  * @cpu: The CPU buffer to be reset
5291  */
5292 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
5293 {
5294         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5295
5296         if (!cpumask_test_cpu(cpu, buffer->cpumask))
5297                 return;
5298
5299         /* prevent another thread from changing buffer sizes */
5300         mutex_lock(&buffer->mutex);
5301
5302         atomic_inc(&cpu_buffer->resize_disabled);
5303         atomic_inc(&cpu_buffer->record_disabled);
5304
5305         /* Make sure all commits have finished */
5306         synchronize_rcu();
5307
5308         reset_disabled_cpu_buffer(cpu_buffer);
5309
5310         atomic_dec(&cpu_buffer->record_disabled);
5311         atomic_dec(&cpu_buffer->resize_disabled);
5312
5313         mutex_unlock(&buffer->mutex);
5314 }
5315 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
5316
5317 /* Flag to ensure proper resetting of atomic variables */
5318 #define RESET_BIT       (1 << 30)
5319
5320 /**
5321  * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer
5322  * @buffer: The ring buffer to reset a per cpu buffer of
5323  */
5324 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
5325 {
5326         struct ring_buffer_per_cpu *cpu_buffer;
5327         int cpu;
5328
5329         /* prevent another thread from changing buffer sizes */
5330         mutex_lock(&buffer->mutex);
5331
5332         for_each_online_buffer_cpu(buffer, cpu) {
5333                 cpu_buffer = buffer->buffers[cpu];
5334
5335                 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled);
5336                 atomic_inc(&cpu_buffer->record_disabled);
5337         }
5338
5339         /* Make sure all commits have finished */
5340         synchronize_rcu();
5341
5342         for_each_buffer_cpu(buffer, cpu) {
5343                 cpu_buffer = buffer->buffers[cpu];
5344
5345                 /*
5346                  * If a CPU came online during the synchronize_rcu(), then
5347                  * ignore it.
5348                  */
5349                 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT))
5350                         continue;
5351
5352                 reset_disabled_cpu_buffer(cpu_buffer);
5353
5354                 atomic_dec(&cpu_buffer->record_disabled);
5355                 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled);
5356         }
5357
5358         mutex_unlock(&buffer->mutex);
5359 }
5360
5361 /**
5362  * ring_buffer_reset - reset a ring buffer
5363  * @buffer: The ring buffer to reset all cpu buffers
5364  */
5365 void ring_buffer_reset(struct trace_buffer *buffer)
5366 {
5367         struct ring_buffer_per_cpu *cpu_buffer;
5368         int cpu;
5369
5370         /* prevent another thread from changing buffer sizes */
5371         mutex_lock(&buffer->mutex);
5372
5373         for_each_buffer_cpu(buffer, cpu) {
5374                 cpu_buffer = buffer->buffers[cpu];
5375
5376                 atomic_inc(&cpu_buffer->resize_disabled);
5377                 atomic_inc(&cpu_buffer->record_disabled);
5378         }
5379
5380         /* Make sure all commits have finished */
5381         synchronize_rcu();
5382
5383         for_each_buffer_cpu(buffer, cpu) {
5384                 cpu_buffer = buffer->buffers[cpu];
5385
5386                 reset_disabled_cpu_buffer(cpu_buffer);
5387
5388                 atomic_dec(&cpu_buffer->record_disabled);
5389                 atomic_dec(&cpu_buffer->resize_disabled);
5390         }
5391
5392         mutex_unlock(&buffer->mutex);
5393 }
5394 EXPORT_SYMBOL_GPL(ring_buffer_reset);
5395
5396 /**
5397  * ring_buffer_empty - is the ring buffer empty?
5398  * @buffer: The ring buffer to test
5399  */
5400 bool ring_buffer_empty(struct trace_buffer *buffer)
5401 {
5402         struct ring_buffer_per_cpu *cpu_buffer;
5403         unsigned long flags;
5404         bool dolock;
5405         bool ret;
5406         int cpu;
5407
5408         /* yes this is racy, but if you don't like the race, lock the buffer */
5409         for_each_buffer_cpu(buffer, cpu) {
5410                 cpu_buffer = buffer->buffers[cpu];
5411                 local_irq_save(flags);
5412                 dolock = rb_reader_lock(cpu_buffer);
5413                 ret = rb_per_cpu_empty(cpu_buffer);
5414                 rb_reader_unlock(cpu_buffer, dolock);
5415                 local_irq_restore(flags);
5416
5417                 if (!ret)
5418                         return false;
5419         }
5420
5421         return true;
5422 }
5423 EXPORT_SYMBOL_GPL(ring_buffer_empty);
5424
5425 /**
5426  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
5427  * @buffer: The ring buffer
5428  * @cpu: The CPU buffer to test
5429  */
5430 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu)
5431 {
5432         struct ring_buffer_per_cpu *cpu_buffer;
5433         unsigned long flags;
5434         bool dolock;
5435         bool ret;
5436
5437         if (!cpumask_test_cpu(cpu, buffer->cpumask))
5438                 return true;
5439
5440         cpu_buffer = buffer->buffers[cpu];
5441         local_irq_save(flags);
5442         dolock = rb_reader_lock(cpu_buffer);
5443         ret = rb_per_cpu_empty(cpu_buffer);
5444         rb_reader_unlock(cpu_buffer, dolock);
5445         local_irq_restore(flags);
5446
5447         return ret;
5448 }
5449 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
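
/*
 * Editorial example (illustrative sketch): quiescing writers, clearing the
 * whole buffer and verifying that it reads back as empty.  The "ex_" name is
 * hypothetical; ring_buffer_record_disable()/enable() are the buffer-wide
 * on/off switches defined earlier in this file.
 */
static void __maybe_unused ex_clear_buffer(struct trace_buffer *buffer)
{
        ring_buffer_record_disable(buffer);
        /* Wait for writers that were mid-commit when recording was disabled */
        synchronize_rcu();

        ring_buffer_reset(buffer);

        /* Every per-CPU buffer should now report empty */
        WARN_ON_ONCE(!ring_buffer_empty(buffer));

        ring_buffer_record_enable(buffer);
}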
5450
5451 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
5452 /**
5453  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
5454  * @buffer_a: One buffer to swap with
5455  * @buffer_b: The other buffer to swap with
5456  * @cpu: the CPU of the buffers to swap
5457  *
5458  * This function is useful for tracers that want to take a "snapshot"
5459  * of a CPU buffer and have another backup buffer lying around.
5460  * It is expected that the tracer handles the CPU buffer not being
5461  * used at the moment.
5462  */
5463 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
5464                          struct trace_buffer *buffer_b, int cpu)
5465 {
5466         struct ring_buffer_per_cpu *cpu_buffer_a;
5467         struct ring_buffer_per_cpu *cpu_buffer_b;
5468         int ret = -EINVAL;
5469
5470         if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
5471             !cpumask_test_cpu(cpu, buffer_b->cpumask))
5472                 goto out;
5473
5474         cpu_buffer_a = buffer_a->buffers[cpu];
5475         cpu_buffer_b = buffer_b->buffers[cpu];
5476
5477         /* At least make sure the two buffers are somewhat the same */
5478         if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
5479                 goto out;
5480
5481         if (buffer_a->subbuf_order != buffer_b->subbuf_order)
5482                 goto out;
5483
5484         ret = -EAGAIN;
5485
5486         if (atomic_read(&buffer_a->record_disabled))
5487                 goto out;
5488
5489         if (atomic_read(&buffer_b->record_disabled))
5490                 goto out;
5491
5492         if (atomic_read(&cpu_buffer_a->record_disabled))
5493                 goto out;
5494
5495         if (atomic_read(&cpu_buffer_b->record_disabled))
5496                 goto out;
5497
5498         /*
5499          * We can't do a synchronize_rcu here because this
5500          * function can be called in atomic context.
5501          * Normally this will be called from the same CPU as cpu.
5502          * If not it's up to the caller to protect this.
5503          */
5504         atomic_inc(&cpu_buffer_a->record_disabled);
5505         atomic_inc(&cpu_buffer_b->record_disabled);
5506
5507         ret = -EBUSY;
5508         if (local_read(&cpu_buffer_a->committing))
5509                 goto out_dec;
5510         if (local_read(&cpu_buffer_b->committing))
5511                 goto out_dec;
5512
5513         /*
5514          * When resize is in progress, we cannot swap it because
5515          * it will mess the state of the cpu buffer.
5516          */
5517         if (atomic_read(&buffer_a->resizing))
5518                 goto out_dec;
5519         if (atomic_read(&buffer_b->resizing))
5520                 goto out_dec;
5521
5522         buffer_a->buffers[cpu] = cpu_buffer_b;
5523         buffer_b->buffers[cpu] = cpu_buffer_a;
5524
5525         cpu_buffer_b->buffer = buffer_a;
5526         cpu_buffer_a->buffer = buffer_b;
5527
5528         ret = 0;
5529
5530 out_dec:
5531         atomic_dec(&cpu_buffer_a->record_disabled);
5532         atomic_dec(&cpu_buffer_b->record_disabled);
5533 out:
5534         return ret;
5535 }
5536 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
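
/*
 * Editorial example (illustrative sketch): taking a "snapshot" of one CPU in
 * the spirit described above.  "live" and "snap" are two buffers of the same
 * geometry; after a successful swap, "snap" holds the old contents of that
 * CPU while "live" keeps recording into the previously idle spare buffer.
 * The "ex_" name is hypothetical.
 */
static int __maybe_unused ex_snapshot_cpu(struct trace_buffer *live,
                                          struct trace_buffer *snap, int cpu)
{
        int ret = ring_buffer_swap_cpu(live, snap, cpu);

        if (ret)        /* -EINVAL, -EAGAIN or -EBUSY, see above */
                return ret;

        /* "snap" can now be read at leisure, e.g. with ring_buffer_consume() */
        return 0;
}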
5537 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
5538
5539 /**
5540  * ring_buffer_alloc_read_page - allocate a page to read from buffer
5541  * @buffer: the buffer to allocate for.
5542  * @cpu: the cpu buffer to allocate.
5543  *
5544  * This function is used in conjunction with ring_buffer_read_page.
5545  * When reading a full page from the ring buffer, these functions
5546  * can be used to speed up the process. The calling function should
5547  * allocate a few pages first with this function. Then when it
5548  * needs to get pages from the ring buffer, it passes the result
5549  * of this function into ring_buffer_read_page, which will swap
5550  * the page that was allocated with the read page of the buffer.
5551  *
5552  * Returns:
5553  *  The page allocated, or ERR_PTR
5554  */
5555 struct buffer_data_read_page *
5556 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
5557 {
5558         struct ring_buffer_per_cpu *cpu_buffer;
5559         struct buffer_data_read_page *bpage = NULL;
5560         unsigned long flags;
5561         struct page *page;
5562
5563         if (!cpumask_test_cpu(cpu, buffer->cpumask))
5564                 return ERR_PTR(-ENODEV);
5565
5566         bpage = kzalloc(sizeof(*bpage), GFP_KERNEL);
5567         if (!bpage)
5568                 return ERR_PTR(-ENOMEM);
5569
5570         bpage->order = buffer->subbuf_order;
5571         cpu_buffer = buffer->buffers[cpu];
5572         local_irq_save(flags);
5573         arch_spin_lock(&cpu_buffer->lock);
5574
5575         if (cpu_buffer->free_page) {
5576                 bpage->data = cpu_buffer->free_page;
5577                 cpu_buffer->free_page = NULL;
5578         }
5579
5580         arch_spin_unlock(&cpu_buffer->lock);
5581         local_irq_restore(flags);
5582
5583         if (bpage->data)
5584                 goto out;
5585
5586         page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_NORETRY,
5587                                 cpu_buffer->buffer->subbuf_order);
5588         if (!page) {
5589                 kfree(bpage);
5590                 return ERR_PTR(-ENOMEM);
5591         }
5592
5593         bpage->data = page_address(page);
5594
5595  out:
5596         rb_init_page(bpage->data);
5597
5598         return bpage;
5599 }
5600 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
5601
5602 /**
5603  * ring_buffer_free_read_page - free an allocated read page
5604  * @buffer: the buffer the page was allocated for
5605  * @cpu: the cpu buffer the page came from
5606  * @data_page: the page to free
5607  *
5608  * Free a page allocated from ring_buffer_alloc_read_page.
5609  */
5610 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu,
5611                                 struct buffer_data_read_page *data_page)
5612 {
5613         struct ring_buffer_per_cpu *cpu_buffer;
5614         struct buffer_data_page *bpage = data_page->data;
5615         struct page *page = virt_to_page(bpage);
5616         unsigned long flags;
5617
5618         if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
5619                 return;
5620
5621         cpu_buffer = buffer->buffers[cpu];
5622
5623         /*
5624          * If the page is still in use someplace else, or the order of the
5625          * page is different from the sub-buffer order of the buffer,
5626          * we can't reuse it.
5627          */
5628         if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order)
5629                 goto out;
5630
5631         local_irq_save(flags);
5632         arch_spin_lock(&cpu_buffer->lock);
5633
5634         if (!cpu_buffer->free_page) {
5635                 cpu_buffer->free_page = bpage;
5636                 bpage = NULL;
5637         }
5638
5639         arch_spin_unlock(&cpu_buffer->lock);
5640         local_irq_restore(flags);
5641
5642  out:
5643         free_pages((unsigned long)bpage, data_page->order);
5644         kfree(data_page);
5645 }
5646 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
5647
5648 /**
5649  * ring_buffer_read_page - extract a page from the ring buffer
5650  * @buffer: buffer to extract from
5651  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
5652  * @len: amount to extract
5653  * @cpu: the cpu of the buffer to extract
5654  * @full: should the extraction only happen when the page is full.
5655  *
5656  * This function will pull out a page from the ring buffer and consume it.
5657  * @data_page must be the address of the variable that was returned
5658  * from ring_buffer_alloc_read_page. This is because the page might be used
5659  * to swap with a page in the ring buffer.
5660  *
5661  * for example:
5662  *      rpage = ring_buffer_alloc_read_page(buffer, cpu);
5663  *      if (IS_ERR(rpage))
5664  *              return PTR_ERR(rpage);
5665  *      ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0);
5666  *      if (ret >= 0)
5667  *              process_page(ring_buffer_read_page_data(rpage), ret);
5668  *      ring_buffer_free_read_page(buffer, cpu, rpage);
5669  *
5670  * When @full is set, the read will not succeed unless the writer is off
5671  * the reader page.
5672  *
5673  * Note: it is up to the calling functions to handle sleeps and wakeups.
5674  *  The ring buffer can be used anywhere in the kernel and can not
5675  *  blindly call wake_up. The layer that uses the ring buffer must be
5676  *  responsible for that.
5677  *
5678  * Returns:
5679  *  >=0 if data has been transferred, returns the offset of consumed data.
5680  *  <0 if no data has been transferred.
5681  */
5682 int ring_buffer_read_page(struct trace_buffer *buffer,
5683                           struct buffer_data_read_page *data_page,
5684                           size_t len, int cpu, int full)
5685 {
5686         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5687         struct ring_buffer_event *event;
5688         struct buffer_data_page *bpage;
5689         struct buffer_page *reader;
5690         unsigned long missed_events;
5691         unsigned long flags;
5692         unsigned int commit;
5693         unsigned int read;
5694         u64 save_timestamp;
5695         int ret = -1;
5696
5697         if (!cpumask_test_cpu(cpu, buffer->cpumask))
5698                 goto out;
5699
5700         /*
5701          * If len is not big enough to hold the page header, then
5702          * we can not copy anything.
5703          */
5704         if (len <= BUF_PAGE_HDR_SIZE)
5705                 goto out;
5706
5707         len -= BUF_PAGE_HDR_SIZE;
5708
5709         if (!data_page || !data_page->data)
5710                 goto out;
5711         if (data_page->order != buffer->subbuf_order)
5712                 goto out;
5713
5714         bpage = data_page->data;
5715         if (!bpage)
5716                 goto out;
5717
5718         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5719
5720         reader = rb_get_reader_page(cpu_buffer);
5721         if (!reader)
5722                 goto out_unlock;
5723
5724         event = rb_reader_event(cpu_buffer);
5725
5726         read = reader->read;
5727         commit = rb_page_commit(reader);
5728
5729         /* Check if any events were dropped */
5730         missed_events = cpu_buffer->lost_events;
5731
5732         /*
5733          * If this page has been partially read or
5734          * if len is not big enough to read the rest of the page or
5735          * a writer is still on the page, then
5736          * we must copy the data from the page to the buffer.
5737          * Otherwise, we can simply swap the page with the one passed in.
5738          */
5739         if (read || (len < (commit - read)) ||
5740             cpu_buffer->reader_page == cpu_buffer->commit_page) {
5741                 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
5742                 unsigned int rpos = read;
5743                 unsigned int pos = 0;
5744                 unsigned int size;
5745
5746                 /*
5747                  * If a full page is expected, this can still be returned
5748                  * if there has been a previous partial read, the rest of
5749                  * the page can be read, and the commit page is off
5750                  * the reader page.
5751                  */
5752                 if (full &&
5753                     (!read || (len < (commit - read)) ||
5754                      cpu_buffer->reader_page == cpu_buffer->commit_page))
5755                         goto out_unlock;
5756
5757                 if (len > (commit - read))
5758                         len = (commit - read);
5759
5760                 /* Always keep the time extend and data together */
5761                 size = rb_event_ts_length(event);
5762
5763                 if (len < size)
5764                         goto out_unlock;
5765
5766                 /* save the current timestamp, since the user will need it */
5767                 save_timestamp = cpu_buffer->read_stamp;
5768
5769                 /* Need to copy one event at a time */
5770                 do {
5771                         /* We need the size of one event, because
5772                          * rb_advance_reader only advances by one event,
5773                          * whereas rb_event_ts_length may include the size of
5774                          * one or two events.
5775                          * We have already ensured there's enough space if this
5776                          * is a time extend. */
5777                         size = rb_event_length(event);
5778                         memcpy(bpage->data + pos, rpage->data + rpos, size);
5779
5780                         len -= size;
5781
5782                         rb_advance_reader(cpu_buffer);
5783                         rpos = reader->read;
5784                         pos += size;
5785
5786                         if (rpos >= commit)
5787                                 break;
5788
5789                         event = rb_reader_event(cpu_buffer);
5790                         /* Always keep the time extend and data together */
5791                         size = rb_event_ts_length(event);
5792                 } while (len >= size);
5793
5794                 /* update bpage */
5795                 local_set(&bpage->commit, pos);
5796                 bpage->time_stamp = save_timestamp;
5797
5798                 /* we copied everything to the beginning */
5799                 read = 0;
5800         } else {
5801                 /* update the entry counter */
5802                 cpu_buffer->read += rb_page_entries(reader);
5803                 cpu_buffer->read_bytes += rb_page_commit(reader);
5804
5805                 /* swap the pages */
5806                 rb_init_page(bpage);
5807                 bpage = reader->page;
5808                 reader->page = data_page->data;
5809                 local_set(&reader->write, 0);
5810                 local_set(&reader->entries, 0);
5811                 reader->read = 0;
5812                 data_page->data = bpage;
5813
5814                 /*
5815                  * Use the real_end for the data size,
5816                  * This gives us a chance to store the lost events
5817                  * on the page.
5818                  */
5819                 if (reader->real_end)
5820                         local_set(&bpage->commit, reader->real_end);
5821         }
5822         ret = read;
5823
5824         cpu_buffer->lost_events = 0;
5825
5826         commit = local_read(&bpage->commit);
5827         /*
5828          * Set a flag in the commit field if we lost events
5829          */
5830         if (missed_events) {
5831                 /* If there is room at the end of the page to save the
5832                  * missed events, then record it there.
5833                  */
5834                 if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
5835                         memcpy(&bpage->data[commit], &missed_events,
5836                                sizeof(missed_events));
5837                         local_add(RB_MISSED_STORED, &bpage->commit);
5838                         commit += sizeof(missed_events);
5839                 }
5840                 local_add(RB_MISSED_EVENTS, &bpage->commit);
5841         }
5842
5843         /*
5844          * This page may be off to user land. Zero it out here.
5845          */
5846         if (commit < buffer->subbuf_size)
5847                 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit);
5848
5849  out_unlock:
5850         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5851
5852  out:
5853         return ret;
5854 }
5855 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
5856
5857 /**
5858  * ring_buffer_read_page_data - get pointer to the data in the page.
5859  * @page:  the page to get the data from
5860  *
5861  * Returns pointer to the actual data in this page.
5862  */
5863 void *ring_buffer_read_page_data(struct buffer_data_read_page *page)
5864 {
5865         return page->data;
5866 }
5867 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data);
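
/*
 * Editorial example (illustrative sketch): pulling whole sub-buffers out of
 * one CPU with the page-read API above, reusing a single read page.  The
 * "ex_" name is hypothetical; passing full == 1 asks for nothing unless a
 * complete sub-buffer (or the unread rest of a partially read one) is
 * available.
 */
static void __maybe_unused ex_read_full_pages(struct trace_buffer *buffer, int cpu)
{
        struct buffer_data_read_page *rpage;
        size_t len = ring_buffer_subbuf_size_get(buffer);
        int ret;

        rpage = ring_buffer_alloc_read_page(buffer, cpu);
        if (IS_ERR(rpage))
                return;

        for (;;) {
                ret = ring_buffer_read_page(buffer, rpage, len, cpu, 1);
                if (ret < 0)
                        break;  /* no full sub-buffer ready */

                /* ret is the offset in the page where the returned data starts */
                pr_info("cpu%d: got a sub-buffer, data starts at offset %d\n",
                        cpu, ret);
                /* ring_buffer_read_page_data(rpage) is the raw sub-buffer */
        }

        ring_buffer_free_read_page(buffer, cpu, rpage);
}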
5868
5869 /**
5870  * ring_buffer_subbuf_size_get - get size of the sub buffer.
5871  * @buffer: the buffer to get the sub buffer size from
5872  *
5873  * Returns size of the sub buffer, in bytes.
5874  */
5875 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
5876 {
5877         return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
5878 }
5879 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);
5880
5881 /**
5882  * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
5883  * @buffer: The ring_buffer to get the system sub page order from
5884  *
5885  * By default, one ring buffer sub page equals one system page. This parameter
5886  * is configurable, per ring buffer. The size of the ring buffer sub page can be
5887  * extended, but must be a power-of-two number of system pages.
5888  *
5889  * Returns the order of the sub buffer size, in system pages:
5890  * 0 means the sub buffer size is 1 system page, 1 means 2 system pages, and so forth.
5891  * In case of an error < 0 is returned.
5892  */
5893 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
5894 {
5895         if (!buffer)
5896                 return -EINVAL;
5897
5898         return buffer->subbuf_order;
5899 }
5900 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);
5901
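/*
 * Editor's illustrative sketch, not part of the original source: the two
 * getters above are tied together by the system page size.  With an order of
 * N, one sub buffer spans (1 << N) system pages, header included.  The
 * function name is made up for illustration.
 */
static void __maybe_unused example_check_subbuf_geometry(struct trace_buffer *buffer)
{
	int order = ring_buffer_subbuf_order_get(buffer);
	int size = ring_buffer_subbuf_size_get(buffer);

	/* size is the data area plus BUF_PAGE_HDR_SIZE, i.e. (1 << order) * PAGE_SIZE */
	if (order >= 0)
		WARN_ON_ONCE(size != (1 << order) * PAGE_SIZE);
}
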
5902 /**
5903  * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
5904  * @buffer: The ring_buffer to set the new page size.
5905  * @order: Order of the system pages in one sub buffer page
5906  *
5907  * By default, one ring buffer sub page equals one system page. This API can be
5908  * used to set a new size of the ring buffer sub page. The size must be a
5909  * power-of-two multiple of the system page size; that's why the input parameter
5910  * @order is the order of system pages that are allocated for one ring buffer sub page:
5911  *  0 - 1 system page
5912  *  1 - 2 system pages
5913  *  2 - 4 system pages
5914  *  ...
5915  *
5916  * Returns 0 on success or < 0 in case of an error.
5917  */
5918 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
5919 {
5920         struct ring_buffer_per_cpu *cpu_buffer;
5921         struct buffer_page *bpage, *tmp;
5922         int old_order, old_size;
5923         int nr_pages;
5924         int psize;
5925         int err;
5926         int cpu;
5927
5928         if (!buffer || order < 0)
5929                 return -EINVAL;
5930
5931         if (buffer->subbuf_order == order)
5932                 return 0;
5933
5934         psize = (1 << order) * PAGE_SIZE;
5935         if (psize <= BUF_PAGE_HDR_SIZE)
5936                 return -EINVAL;
5937
5938         /* Size of a subbuf cannot be greater than the write counter */
5939         if (psize > RB_WRITE_MASK + 1)
5940                 return -EINVAL;
5941
5942         old_order = buffer->subbuf_order;
5943         old_size = buffer->subbuf_size;
5944
5945         /* prevent another thread from changing buffer sizes */
5946         mutex_lock(&buffer->mutex);
5947         atomic_inc(&buffer->record_disabled);
5948
5949         /* Make sure all commits have finished */
5950         synchronize_rcu();
5951
5952         buffer->subbuf_order = order;
5953         buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE;
5954
5955         /* Make sure all new buffers are allocated before deleting the old ones */
5956         for_each_buffer_cpu(buffer, cpu) {
5957
5958                 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5959                         continue;
5960
5961                 cpu_buffer = buffer->buffers[cpu];
5962
5963                 /* Update the number of pages to match the new size */
5964                 nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
5965                 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
5966
5967                 /* we need a minimum of two pages */
5968                 if (nr_pages < 2)
5969                         nr_pages = 2;
5970
5971                 cpu_buffer->nr_pages_to_update = nr_pages;
5972
5973                 /* Include the reader page */
5974                 nr_pages++;
5975
5976                 /* Allocate the new size buffer */
5977                 INIT_LIST_HEAD(&cpu_buffer->new_pages);
5978                 if (__rb_allocate_pages(cpu_buffer, nr_pages,
5979                                         &cpu_buffer->new_pages)) {
5980                         /* not enough memory for new pages */
5981                         err = -ENOMEM;
5982                         goto error;
5983                 }
5984         }
5985
5986         for_each_buffer_cpu(buffer, cpu) {
5987
5988                 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5989                         continue;
5990
5991                 cpu_buffer = buffer->buffers[cpu];
5992
5993                 /* Clear the head bit to make the linked list normal to read */
5994                 rb_head_page_deactivate(cpu_buffer);
5995
5996                 /* Now walk the list and free all the old sub buffers */
5997                 list_for_each_entry_safe(bpage, tmp, cpu_buffer->pages, list) {
5998                         list_del_init(&bpage->list);
5999                         free_buffer_page(bpage);
6000                 }
6001                 /* The above loop stopped on the last page needing to be freed */
6002                 bpage = list_entry(cpu_buffer->pages, struct buffer_page, list);
6003                 free_buffer_page(bpage);
6004
6005                 /* Free the current reader page */
6006                 free_buffer_page(cpu_buffer->reader_page);
6007
6008                 /* One page was allocated for the reader page */
6009                 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next,
6010                                                      struct buffer_page, list);
6011                 list_del_init(&cpu_buffer->reader_page->list);
6012
6013                 /* The cpu_buffer pages are a linked list with no head */
6014                 cpu_buffer->pages = cpu_buffer->new_pages.next;
6015                 cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev;
6016                 cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next;
6017
6018                 /* Clear the new_pages list */
6019                 INIT_LIST_HEAD(&cpu_buffer->new_pages);
6020
6021                 cpu_buffer->head_page
6022                         = list_entry(cpu_buffer->pages, struct buffer_page, list);
6023                 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
6024
6025                 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update;
6026                 cpu_buffer->nr_pages_to_update = 0;
6027
6028                 free_pages((unsigned long)cpu_buffer->free_page, old_order);
6029                 cpu_buffer->free_page = NULL;
6030
6031                 rb_head_page_activate(cpu_buffer);
6032
6033                 rb_check_pages(cpu_buffer);
6034         }
6035
6036         atomic_dec(&buffer->record_disabled);
6037         mutex_unlock(&buffer->mutex);
6038
6039         return 0;
6040
6041 error:
6042         buffer->subbuf_order = old_order;
6043         buffer->subbuf_size = old_size;
6044
6045         atomic_dec(&buffer->record_disabled);
6046         mutex_unlock(&buffer->mutex);
6047
6048         for_each_buffer_cpu(buffer, cpu) {
6049                 cpu_buffer = buffer->buffers[cpu];
6050
6051                 if (!cpu_buffer->nr_pages_to_update)
6052                         continue;
6053
6054                 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) {
6055                         list_del_init(&bpage->list);
6056                         free_buffer_page(bpage);
6057                 }
6058         }
6059
6060         return err;
6061 }
6062 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
6063
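/*
 * Editor's illustrative sketch, not part of the original source: switching a
 * buffer to order-2 sub buffers, i.e. (1 << 2) = 4 system pages per sub
 * buffer, following the mapping documented above.  The function name is made
 * up for illustration.
 */
static int __maybe_unused example_grow_subbufs(struct trace_buffer *buffer)
{
	int err;

	/* Returns 0 immediately if the buffer already uses order-2 sub buffers */
	err = ring_buffer_subbuf_order_set(buffer, 2);
	if (err)
		pr_warn("ring buffer: could not resize sub buffers: %d\n", err);

	return err;
}
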
6064 /*
6065  * We only allocate new buffers, never free them if the CPU goes down.
6066  * If we were to free the buffer, then the user would lose any trace that was in
6067  * the buffer.
6068  */
6069 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
6070 {
6071         struct trace_buffer *buffer;
6072         long nr_pages_same;
6073         int cpu_i;
6074         unsigned long nr_pages;
6075
6076         buffer = container_of(node, struct trace_buffer, node);
6077         if (cpumask_test_cpu(cpu, buffer->cpumask))
6078                 return 0;
6079
6080         nr_pages = 0;
6081         nr_pages_same = 1;
6082         /* check if all cpu buffer sizes are the same */
6083         for_each_buffer_cpu(buffer, cpu_i) {
6084                 /* fill in the size from the first enabled cpu */
6085                 if (nr_pages == 0)
6086                         nr_pages = buffer->buffers[cpu_i]->nr_pages;
6087                 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
6088                         nr_pages_same = 0;
6089                         break;
6090                 }
6091         }
6092         /* allocate the minimum number of pages; the user can expand it later */
6093         if (!nr_pages_same)
6094                 nr_pages = 2;
6095         buffer->buffers[cpu] =
6096                 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
6097         if (!buffer->buffers[cpu]) {
6098                 WARN(1, "failed to allocate ring buffer on CPU %u\n",
6099                      cpu);
6100                 return -ENOMEM;
6101         }
6102         smp_wmb(); /* publish the new buffer before setting its bit in the cpumask */
6103         cpumask_set_cpu(cpu, buffer->cpumask);
6104         return 0;
6105 }
6106
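/*
 * Editor's illustrative sketch, not part of the original source: how a
 * prepare callback like trace_rb_cpu_prepare() is typically wired into the
 * CPU hotplug state machine.  The function name is made up, and the actual
 * registration site and hotplug state used by the tracing code live outside
 * this file, so treat the details below as an assumption for illustration.
 */
static int __maybe_unused example_register_rb_hotplug(struct trace_buffer *buffer)
{
	int ret;

	/* Register the multi-instance prepare callback once (no teardown needed) */
	ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE, "trace/RB:prepare",
				      trace_rb_cpu_prepare, NULL);
	if (ret < 0)
		return ret;

	/* Attach this buffer so newly onlined CPUs get a per-cpu buffer */
	return cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
}
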
6107 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST
6108 /*
6109  * This is a basic integrity check of the ring buffer.
6110  * Late in the boot cycle this test will run when configured in.
6111  * It will kick off a thread per CPU that will go into a loop
6112  * writing to the per cpu ring buffer various sizes of data.
6113  * Some of the data will be large items, some small.
6114  *
6115  * Another thread is created that goes into a spin, sending out
6116  * IPIs to the other CPUs to also write into the ring buffer.
6117  * This is to test the nesting ability of the buffer.
6118  *
6119  * Basic stats are recorded and reported. If something in the
6120  * ring buffer should happen that's not expected, a big warning
6121  * is displayed and all ring buffers are disabled.
6122  */
6123 static struct task_struct *rb_threads[NR_CPUS] __initdata;
6124
6125 struct rb_test_data {
6126         struct trace_buffer *buffer;
6127         unsigned long           events;
6128         unsigned long           bytes_written;
6129         unsigned long           bytes_alloc;
6130         unsigned long           bytes_dropped;
6131         unsigned long           events_nested;
6132         unsigned long           bytes_written_nested;
6133         unsigned long           bytes_alloc_nested;
6134         unsigned long           bytes_dropped_nested;
6135         int                     min_size_nested;
6136         int                     max_size_nested;
6137         int                     max_size;
6138         int                     min_size;
6139         int                     cpu;
6140         int                     cnt;
6141 };
6142
6143 static struct rb_test_data rb_data[NR_CPUS] __initdata;
6144
6145 /* 1 meg per cpu */
6146 #define RB_TEST_BUFFER_SIZE     1048576
6147
6148 static char rb_string[] __initdata =
6149         "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
6150         "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
6151         "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
6152
6153 static bool rb_test_started __initdata;
6154
6155 struct rb_item {
6156         int size;
6157         char str[];
6158 };
6159
6160 static __init int rb_write_something(struct rb_test_data *data, bool nested)
6161 {
6162         struct ring_buffer_event *event;
6163         struct rb_item *item;
6164         bool started;
6165         int event_len;
6166         int size;
6167         int len;
6168         int cnt;
6169
6170         /* Have nested writes different than what is written */
6171         cnt = data->cnt + (nested ? 27 : 0);
6172
6173         /* Multiply cnt by ~e, to make some unique increment */
6174         size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
6175
6176         len = size + sizeof(struct rb_item);
6177
6178         started = rb_test_started;
6179         /* read rb_test_started before checking buffer enabled */
6180         smp_rmb();
6181
6182         event = ring_buffer_lock_reserve(data->buffer, len);
6183         if (!event) {
6184                 /* Ignore dropped events before test starts. */
6185                 if (started) {
6186                         if (nested)
6187                                 data->bytes_dropped_nested += len;
6188                         else
6189                                 data->bytes_dropped += len;
6190                 }
6191                 return len;
6192         }
6193
6194         event_len = ring_buffer_event_length(event);
6195
6196         if (RB_WARN_ON(data->buffer, event_len < len))
6197                 goto out;
6198
6199         item = ring_buffer_event_data(event);
6200         item->size = size;
6201         memcpy(item->str, rb_string, size);
6202
6203         if (nested) {
6204                 data->bytes_alloc_nested += event_len;
6205                 data->bytes_written_nested += len;
6206                 data->events_nested++;
6207                 if (!data->min_size_nested || len < data->min_size_nested)
6208                         data->min_size_nested = len;
6209                 if (len > data->max_size_nested)
6210                         data->max_size_nested = len;
6211         } else {
6212                 data->bytes_alloc += event_len;
6213                 data->bytes_written += len;
6214                 data->events++;
6215                 if (!data->min_size || len < data->min_size)
6216                         data->min_size = len;
6217                 if (len > data->max_size)
6218                         data->max_size = len;
6219         }
6220
6221  out:
6222         ring_buffer_unlock_commit(data->buffer);
6223
6224         return 0;
6225 }
6226
6227 static __init int rb_test(void *arg)
6228 {
6229         struct rb_test_data *data = arg;
6230
6231         while (!kthread_should_stop()) {
6232                 rb_write_something(data, false);
6233                 data->cnt++;
6234
6235                 set_current_state(TASK_INTERRUPTIBLE);
6236                 /* Now sleep between a min of 100-300us and a max of 1ms */
6237                 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
6238         }
6239
6240         return 0;
6241 }
6242
6243 static __init void rb_ipi(void *ignore)
6244 {
6245         struct rb_test_data *data;
6246         int cpu = smp_processor_id();
6247
6248         data = &rb_data[cpu];
6249         rb_write_something(data, true);
6250 }
6251
6252 static __init int rb_hammer_test(void *arg)
6253 {
6254         while (!kthread_should_stop()) {
6255
6256                 /* Send an IPI to all cpus to write data! */
6257                 smp_call_function(rb_ipi, NULL, 1);
6258                 /* No sleep, but for non-preempt kernels, let others run */
6259                 schedule();
6260         }
6261
6262         return 0;
6263 }
6264
6265 static __init int test_ringbuffer(void)
6266 {
6267         struct task_struct *rb_hammer;
6268         struct trace_buffer *buffer;
6269         int cpu;
6270         int ret = 0;
6271
6272         if (security_locked_down(LOCKDOWN_TRACEFS)) {
6273                 pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
6274                 return 0;
6275         }
6276
6277         pr_info("Running ring buffer tests...\n");
6278
6279         buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
6280         if (WARN_ON(!buffer))
6281                 return 0;
6282
6283         /* Disable buffer so that threads can't write to it yet */
6284         ring_buffer_record_off(buffer);
6285
6286         for_each_online_cpu(cpu) {
6287                 rb_data[cpu].buffer = buffer;
6288                 rb_data[cpu].cpu = cpu;
6289                 rb_data[cpu].cnt = cpu;
6290                 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
6291                                                      cpu, "rbtester/%u");
6292                 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
6293                         pr_cont("FAILED\n");
6294                         ret = PTR_ERR(rb_threads[cpu]);
6295                         goto out_free;
6296                 }
6297         }
6298
6299         /* Now create the rb hammer! */
6300         rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
6301         if (WARN_ON(IS_ERR(rb_hammer))) {
6302                 pr_cont("FAILED\n");
6303                 ret = PTR_ERR(rb_hammer);
6304                 goto out_free;
6305         }
6306
6307         ring_buffer_record_on(buffer);
6308         /*
6309          * Show buffer is enabled before setting rb_test_started.
6310          * Yes there's a small race window where events could be
6311          * dropped and the thread won't catch it. But when a ring
6312          * buffer gets enabled, there will always be some kind of
6313          * delay before other CPUs see it. Thus, we don't care about
6314          * those dropped events. We care about events dropped after
6315          * the threads see that the buffer is active.
6316          */
6317         smp_wmb();
6318         rb_test_started = true;
6319
6320         set_current_state(TASK_INTERRUPTIBLE);
6321         /* Just run for 10 seconds */
6322         schedule_timeout(10 * HZ);
6323
6324         kthread_stop(rb_hammer);
6325
6326  out_free:
6327         for_each_online_cpu(cpu) {
6328                 if (!rb_threads[cpu])
6329                         break;
6330                 kthread_stop(rb_threads[cpu]);
6331         }
6332         if (ret) {
6333                 ring_buffer_free(buffer);
6334                 return ret;
6335         }
6336
6337         /* Report! */
6338         pr_info("finished\n");
6339         for_each_online_cpu(cpu) {
6340                 struct ring_buffer_event *event;
6341                 struct rb_test_data *data = &rb_data[cpu];
6342                 struct rb_item *item;
6343                 unsigned long total_events;
6344                 unsigned long total_dropped;
6345                 unsigned long total_written;
6346                 unsigned long total_alloc;
6347                 unsigned long total_read = 0;
6348                 unsigned long total_size = 0;
6349                 unsigned long total_len = 0;
6350                 unsigned long total_lost = 0;
6351                 unsigned long lost;
6352                 int big_event_size;
6353                 int small_event_size;
6354
6355                 ret = -1;
6356
6357                 total_events = data->events + data->events_nested;
6358                 total_written = data->bytes_written + data->bytes_written_nested;
6359                 total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
6360                 total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
6361
6362                 big_event_size = data->max_size + data->max_size_nested;
6363                 small_event_size = data->min_size + data->min_size_nested;
6364
6365                 pr_info("CPU %d:\n", cpu);
6366                 pr_info("              events:    %ld\n", total_events);
6367                 pr_info("       dropped bytes:    %ld\n", total_dropped);
6368                 pr_info("       alloced bytes:    %ld\n", total_alloc);
6369                 pr_info("       written bytes:    %ld\n", total_written);
6370                 pr_info("       biggest event:    %d\n", big_event_size);
6371                 pr_info("      smallest event:    %d\n", small_event_size);
6372
6373                 if (RB_WARN_ON(buffer, total_dropped))
6374                         break;
6375
6376                 ret = 0;
6377
6378                 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
6379                         total_lost += lost;
6380                         item = ring_buffer_event_data(event);
6381                         total_len += ring_buffer_event_length(event);
6382                         total_size += item->size + sizeof(struct rb_item);
6383                         if (memcmp(&item->str[0], rb_string, item->size) != 0) {
6384                                 pr_info("FAILED!\n");
6385                                 pr_info("buffer had: %.*s\n", item->size, item->str);
6386                                 pr_info("expected:   %.*s\n", item->size, rb_string);
6387                                 RB_WARN_ON(buffer, 1);
6388                                 ret = -1;
6389                                 break;
6390                         }
6391                         total_read++;
6392                 }
6393                 if (ret)
6394                         break;
6395
6396                 ret = -1;
6397
6398                 pr_info("         read events:   %ld\n", total_read);
6399                 pr_info("         lost events:   %ld\n", total_lost);
6400                 pr_info("        total events:   %ld\n", total_lost + total_read);
6401                 pr_info("  recorded len bytes:   %ld\n", total_len);
6402                 pr_info(" recorded size bytes:   %ld\n", total_size);
6403                 if (total_lost) {
6404                         pr_info(" With dropped events, record len and size may not match\n"
6405                                 " alloced and written from above\n");
6406                 } else {
6407                         if (RB_WARN_ON(buffer, total_len != total_alloc ||
6408                                        total_size != total_written))
6409                                 break;
6410                 }
6411                 if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
6412                         break;
6413
6414                 ret = 0;
6415         }
6416         if (!ret)
6417                 pr_info("Ring buffer PASSED!\n");
6418
6419         ring_buffer_free(buffer);
6420         return 0;
6421 }
6422
6423 late_initcall(test_ringbuffer);
6424 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */