GNU Linux-libre 5.4.257-gnu1
[releases.git] / kernel / trace / ring_buffer.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Generic ring buffer
4  *
5  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6  */
7 #include <linux/trace_events.h>
8 #include <linux/ring_buffer.h>
9 #include <linux/trace_clock.h>
10 #include <linux/sched/clock.h>
11 #include <linux/trace_seq.h>
12 #include <linux/spinlock.h>
13 #include <linux/irq_work.h>
14 #include <linux/security.h>
15 #include <linux/uaccess.h>
16 #include <linux/hardirq.h>
17 #include <linux/kthread.h>      /* for self test */
18 #include <linux/module.h>
19 #include <linux/percpu.h>
20 #include <linux/mutex.h>
21 #include <linux/delay.h>
22 #include <linux/slab.h>
23 #include <linux/init.h>
24 #include <linux/hash.h>
25 #include <linux/list.h>
26 #include <linux/cpu.h>
27 #include <linux/oom.h>
28
29 #include <asm/local.h>
30
31 static void update_pages_handler(struct work_struct *work);
32
33 /*
34  * The ring buffer header is special. We must keep it up to date manually.
35  */
36 int ring_buffer_print_entry_header(struct trace_seq *s)
37 {
38         trace_seq_puts(s, "# compressed entry header\n");
39         trace_seq_puts(s, "\ttype_len    :    5 bits\n");
40         trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
41         trace_seq_puts(s, "\tarray       :   32 bits\n");
42         trace_seq_putc(s, '\n');
43         trace_seq_printf(s, "\tpadding     : type == %d\n",
44                          RINGBUF_TYPE_PADDING);
45         trace_seq_printf(s, "\ttime_extend : type == %d\n",
46                          RINGBUF_TYPE_TIME_EXTEND);
47         trace_seq_printf(s, "\ttime_stamp : type == %d\n",
48                          RINGBUF_TYPE_TIME_STAMP);
49         trace_seq_printf(s, "\tdata max type_len  == %d\n",
50                          RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
51
52         return !trace_seq_has_overflowed(s);
53 }
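
/*
 * Illustrative sketch (not from the upstream file): a minimal model of
 * the compressed entry header printed above.  The 5-bit type_len and the
 * 27-bit time_delta share a single 32-bit word, mirroring the bitfield
 * layout of struct ring_buffer_event; the names below are hypothetical
 * and the block is never compiled.
 */
#if 0
struct example_event_header {
	u32 type_len:5, time_delta:27;		/* 5 + 27 = 32 bits */
};

static void example_entry_header(void)
{
	struct example_event_header h = { .type_len = 2, .time_delta = 100 };

	/* type_len 1..28 encodes the data length in 4-byte (RB_ALIGNMENT) units */
	pr_info("len=%u bytes, delta=%u\n", h.type_len * 4, h.time_delta);
}
#endif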
54
55 /*
56  * The ring buffer is made up of a list of pages. A separate list of pages is
57  * allocated for each CPU. A writer may only write to a buffer that is
58  * associated with the CPU it is currently executing on.  A reader may read
59  * from any per cpu buffer.
60  *
61  * The reader is special. For each per cpu buffer, the reader has its own
62  * reader page. When a reader has read the entire reader page, this reader
63  * page is swapped with another page in the ring buffer.
64  *
65  * Now, as long as the writer is off the reader page, the reader can do
66  * whatever it wants with that page. The writer will never write to that page
67  * again (as long as it is out of the ring buffer).
68  *
69  * Here's some silly ASCII art.
70  *
71  *   +------+
72  *   |reader|          RING BUFFER
73  *   |page  |
74  *   +------+        +---+   +---+   +---+
75  *                   |   |-->|   |-->|   |
76  *                   +---+   +---+   +---+
77  *                     ^               |
78  *                     |               |
79  *                     +---------------+
80  *
81  *
82  *   +------+
83  *   |reader|          RING BUFFER
84  *   |page  |------------------v
85  *   +------+        +---+   +---+   +---+
86  *                   |   |-->|   |-->|   |
87  *                   +---+   +---+   +---+
88  *                     ^               |
89  *                     |               |
90  *                     +---------------+
91  *
92  *
93  *   +------+
94  *   |reader|          RING BUFFER
95  *   |page  |------------------v
96  *   +------+        +---+   +---+   +---+
97  *      ^            |   |-->|   |-->|   |
98  *      |            +---+   +---+   +---+
99  *      |                              |
100  *      |                              |
101  *      +------------------------------+
102  *
103  *
104  *   +------+
105  *   |buffer|          RING BUFFER
106  *   |page  |------------------v
107  *   +------+        +---+   +---+   +---+
108  *      ^            |   |   |   |-->|   |
109  *      |   New      +---+   +---+   +---+
110  *      |  Reader------^               |
111  *      |   page                       |
112  *      +------------------------------+
113  *
114  *
115  * After we make this swap, the reader can hand this page off to the splice
116  * code and be done with it. It can even allocate a new page if it needs to
117  * and swap that into the ring buffer.
118  *
119  * cmpxchg is used to make this lockless (see the design notes further down).
120  *
121  */
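
/*
 * Illustrative sketch (not from the upstream file): a simplified,
 * single-threaded model of the reader-page swap pictured above.  It
 * ignores the HEAD/UPDATE flag bits and the cmpxchg protocol described
 * later in this file; all names are hypothetical and the block is never
 * compiled.
 */
#if 0
struct example_page {
	struct example_page *next, *prev;
};

/* Splice @reader in where @head sits and return the old head page. */
static struct example_page *
example_swap_reader(struct example_page *reader, struct example_page *head)
{
	reader->next = head->next;
	reader->prev = head->prev;

	head->prev->next = reader;	/* page before head now points to reader */
	head->next->prev = reader;	/* page after head points back to reader */

	/* The old head page is now outside the ring: it is the new reader page */
	return head;
}
#endif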
122
123 /* Used for individual buffers (after the counter) */
124 #define RB_BUFFER_OFF           (1 << 20)
125
126 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
127
128 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
129 #define RB_ALIGNMENT            4U
130 #define RB_MAX_SMALL_DATA       (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
131 #define RB_EVNT_MIN_SIZE        8U      /* two 32bit words */
132
133 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
134 # define RB_FORCE_8BYTE_ALIGNMENT       0
135 # define RB_ARCH_ALIGNMENT              RB_ALIGNMENT
136 #else
137 # define RB_FORCE_8BYTE_ALIGNMENT       1
138 # define RB_ARCH_ALIGNMENT              8U
139 #endif
140
141 #define RB_ALIGN_DATA           __aligned(RB_ARCH_ALIGNMENT)
142
143 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
144 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
145
146 enum {
147         RB_LEN_TIME_EXTEND = 8,
148         RB_LEN_TIME_STAMP =  8,
149 };
150
151 #define skip_time_extend(event) \
152         ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
153
154 #define extended_time(event) \
155         (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
156
157 static inline int rb_null_event(struct ring_buffer_event *event)
158 {
159         return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
160 }
161
162 static void rb_event_set_padding(struct ring_buffer_event *event)
163 {
164         /* padding has a NULL time_delta */
165         event->type_len = RINGBUF_TYPE_PADDING;
166         event->time_delta = 0;
167 }
168
169 static unsigned
170 rb_event_data_length(struct ring_buffer_event *event)
171 {
172         unsigned length;
173
174         if (event->type_len)
175                 length = event->type_len * RB_ALIGNMENT;
176         else
177                 length = event->array[0];
178         return length + RB_EVNT_HDR_SIZE;
179 }
180
181 /*
182  * Return the length of the given event. Will return
183  * the length of the time extend if the event is a
184  * time extend.
185  */
186 static inline unsigned
187 rb_event_length(struct ring_buffer_event *event)
188 {
189         switch (event->type_len) {
190         case RINGBUF_TYPE_PADDING:
191                 if (rb_null_event(event))
192                         /* undefined */
193                         return -1;
194                 return  event->array[0] + RB_EVNT_HDR_SIZE;
195
196         case RINGBUF_TYPE_TIME_EXTEND:
197                 return RB_LEN_TIME_EXTEND;
198
199         case RINGBUF_TYPE_TIME_STAMP:
200                 return RB_LEN_TIME_STAMP;
201
202         case RINGBUF_TYPE_DATA:
203                 return rb_event_data_length(event);
204         default:
205                 BUG();
206         }
207         /* not hit */
208         return 0;
209 }
210
211 /*
212  * Return total length of time extend and data,
213  *   or just the event length for all other events.
214  */
215 static inline unsigned
216 rb_event_ts_length(struct ring_buffer_event *event)
217 {
218         unsigned len = 0;
219
220         if (extended_time(event)) {
221                 /* time extends include the data event after it */
222                 len = RB_LEN_TIME_EXTEND;
223                 event = skip_time_extend(event);
224         }
225         return len + rb_event_length(event);
226 }
227
228 /**
229  * ring_buffer_event_length - return the length of the event
230  * @event: the event to get the length of
231  *
232  * Returns the size of the data payload of a data event.
233  * If the event is something other than a data event, it
234  * returns the size of the event itself, with the exception
235  * of a TIME_EXTEND, where it still returns the size of the
236  * data payload of the data event after it.
237  */
238 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
239 {
240         unsigned length;
241
242         if (extended_time(event))
243                 event = skip_time_extend(event);
244
245         length = rb_event_length(event);
246         if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
247                 return length;
248         length -= RB_EVNT_HDR_SIZE;
249         if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
250                 length -= sizeof(event->array[0]);
251         return length;
252 }
253 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
254
255 /* inline for ring buffer fast paths */
256 static __always_inline void *
257 rb_event_data(struct ring_buffer_event *event)
258 {
259         if (extended_time(event))
260                 event = skip_time_extend(event);
261         BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
262         /* If length is in len field, then array[0] has the data */
263         if (event->type_len)
264                 return (void *)&event->array[0];
265         /* Otherwise length is in array[0] and array[1] has the data */
266         return (void *)&event->array[1];
267 }
268
269 /**
270  * ring_buffer_event_data - return the data of the event
271  * @event: the event to get the data from
272  */
273 void *ring_buffer_event_data(struct ring_buffer_event *event)
274 {
275         return rb_event_data(event);
276 }
277 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
278
279 #define for_each_buffer_cpu(buffer, cpu)                \
280         for_each_cpu(cpu, buffer->cpumask)
281
282 #define TS_SHIFT        27
283 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
284 #define TS_DELTA_TEST   (~TS_MASK)
285
286 /**
287  * ring_buffer_event_time_stamp - return the event's extended timestamp
288  * @event: the event to get the timestamp of
289  *
290  * Returns the extended timestamp associated with a data event.
291  * An extended time_stamp is a 64-bit timestamp represented
292  * internally in a special way that makes the best use of space
293  * contained within a ring buffer event.  This function decodes
294  * it and maps it to a straight u64 value.
295  */
296 u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event)
297 {
298         u64 ts;
299
300         ts = event->array[0];
301         ts <<= TS_SHIFT;
302         ts += event->time_delta;
303
304         return ts;
305 }
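
/*
 * Illustrative sketch (not from the upstream file): the inverse of the
 * decode above, showing how a u64 timestamp is split into the 27-bit
 * time_delta and the upper bits kept in array[0].  The helper name is
 * hypothetical and the block is never compiled.
 */
#if 0
static void example_encode_time_stamp(u64 ts, u32 *delta, u32 *top)
{
	*delta = ts & TS_MASK;		/* low 27 bits -> event->time_delta */
	*top = ts >> TS_SHIFT;		/* remaining bits -> event->array[0] */
	/* decode: ((u64)*top << TS_SHIFT) + *delta == ts (for ts < 2^59) */
}
#endif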
306
307 /* Flag when events were overwritten */
308 #define RB_MISSED_EVENTS        (1 << 31)
309 /* Missed count stored at end */
310 #define RB_MISSED_STORED        (1 << 30)
311
312 #define RB_MISSED_FLAGS         (RB_MISSED_EVENTS|RB_MISSED_STORED)
313
314 struct buffer_data_page {
315         u64              time_stamp;    /* page time stamp */
316         local_t          commit;        /* write committed index */
317         unsigned char    data[] RB_ALIGN_DATA;  /* data of buffer page */
318 };
319
320 /*
321  * Note, the buffer_page list must be first. The buffer pages
322  * are allocated in cache lines, which means that each buffer
323  * page will be at the beginning of a cache line, and thus
324  * the least significant bits will be zero. We use this to
325  * add flags in the list struct pointers, to make the ring buffer
326  * lockless.
327  */
328 struct buffer_page {
329         struct list_head list;          /* list of buffer pages */
330         local_t          write;         /* index for next write */
331         unsigned         read;          /* index for next read */
332         local_t          entries;       /* entries on this page */
333         unsigned long    real_end;      /* real end of data */
334         struct buffer_data_page *page;  /* Actual data page */
335 };
336
337 /*
338  * The buffer page counters, write and entries, must be reset
339  * atomically when crossing page boundaries. To synchronize this
340  * update, two counters are packed into the same word. One is
341  * the actual counter for the write position or entry count on the page.
342  *
343  * The other is a counter of updaters. Before an update happens
344  * the updater portion of the word is incremented. This allows
345  * the updater to update the counter atomically.
346  *
347  * The counter takes the low 20 bits, and the updater state the upper 12.
348  */
349 #define RB_WRITE_MASK           0xfffff
350 #define RB_WRITE_INTCNT         (1 << 20)
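
/*
 * Illustrative sketch (not from the upstream file): how the two counters
 * described above share one word.  The low 20 bits are the write index
 * (or entry count) and the upper bits count in-flight updaters;
 * rb_tail_page_update() below adds RB_WRITE_INTCNT so that a nested
 * interrupt can be detected.  The helper name is hypothetical and the
 * block is never compiled.
 */
#if 0
static void example_split_write_counter(unsigned long write)
{
	unsigned long index = write & RB_WRITE_MASK;	/* write position */
	unsigned long updaters = write >> 20;		/* nested updater count */

	(void)index;
	(void)updaters;
}
#endif
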
351
352 static void rb_init_page(struct buffer_data_page *bpage)
353 {
354         local_set(&bpage->commit, 0);
355 }
356
357 /*
358  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
359  * this issue out.
360  */
361 static void free_buffer_page(struct buffer_page *bpage)
362 {
363         free_page((unsigned long)bpage->page);
364         kfree(bpage);
365 }
366
367 /*
368  * We need to fit the time_stamp delta into 27 bits.
369  */
370 static inline int test_time_stamp(u64 delta)
371 {
372         if (delta & TS_DELTA_TEST)
373                 return 1;
374         return 0;
375 }
376
377 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
378
379 /* Max payload is BUF_PAGE_SIZE - header (8 bytes) */
380 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
381
382 int ring_buffer_print_page_header(struct trace_seq *s)
383 {
384         struct buffer_data_page field;
385
386         trace_seq_printf(s, "\tfield: u64 timestamp;\t"
387                          "offset:0;\tsize:%u;\tsigned:%u;\n",
388                          (unsigned int)sizeof(field.time_stamp),
389                          (unsigned int)is_signed_type(u64));
390
391         trace_seq_printf(s, "\tfield: local_t commit;\t"
392                          "offset:%u;\tsize:%u;\tsigned:%u;\n",
393                          (unsigned int)offsetof(typeof(field), commit),
394                          (unsigned int)sizeof(field.commit),
395                          (unsigned int)is_signed_type(long));
396
397         trace_seq_printf(s, "\tfield: int overwrite;\t"
398                          "offset:%u;\tsize:%u;\tsigned:%u;\n",
399                          (unsigned int)offsetof(typeof(field), commit),
400                          1,
401                          (unsigned int)is_signed_type(long));
402
403         trace_seq_printf(s, "\tfield: char data;\t"
404                          "offset:%u;\tsize:%u;\tsigned:%u;\n",
405                          (unsigned int)offsetof(typeof(field), data),
406                          (unsigned int)BUF_PAGE_SIZE,
407                          (unsigned int)is_signed_type(char));
408
409         return !trace_seq_has_overflowed(s);
410 }
411
412 struct rb_irq_work {
413         struct irq_work                 work;
414         wait_queue_head_t               waiters;
415         wait_queue_head_t               full_waiters;
416         bool                            waiters_pending;
417         bool                            full_waiters_pending;
418         bool                            wakeup_full;
419 };
420
421 /*
422  * Structure to hold event state and handle nested events.
423  */
424 struct rb_event_info {
425         u64                     ts;
426         u64                     delta;
427         unsigned long           length;
428         struct buffer_page      *tail_page;
429         int                     add_timestamp;
430 };
431
432 /*
433  * Used for which event context the event is in.
434  *  TRANSITION = 0
435  *  NMI     = 1
436  *  IRQ     = 2
437  *  SOFTIRQ = 3
438  *  NORMAL  = 4
439  *
440  * See trace_recursive_lock() comment below for more details.
441  */
442 enum {
443         RB_CTX_TRANSITION,
444         RB_CTX_NMI,
445         RB_CTX_IRQ,
446         RB_CTX_SOFTIRQ,
447         RB_CTX_NORMAL,
448         RB_CTX_MAX
449 };
450
451 /*
452  * head_page == tail_page && head == tail then buffer is empty.
453  */
454 struct ring_buffer_per_cpu {
455         int                             cpu;
456         atomic_t                        record_disabled;
457         struct ring_buffer              *buffer;
458         raw_spinlock_t                  reader_lock;    /* serialize readers */
459         arch_spinlock_t                 lock;
460         struct lock_class_key           lock_key;
461         struct buffer_data_page         *free_page;
462         unsigned long                   nr_pages;
463         unsigned int                    current_context;
464         struct list_head                *pages;
465         struct buffer_page              *head_page;     /* read from head */
466         struct buffer_page              *tail_page;     /* write to tail */
467         struct buffer_page              *commit_page;   /* committed pages */
468         struct buffer_page              *reader_page;
469         unsigned long                   lost_events;
470         unsigned long                   last_overrun;
471         unsigned long                   nest;
472         local_t                         entries_bytes;
473         local_t                         entries;
474         local_t                         overrun;
475         local_t                         commit_overrun;
476         local_t                         dropped_events;
477         local_t                         committing;
478         local_t                         commits;
479         local_t                         pages_touched;
480         local_t                         pages_lost;
481         local_t                         pages_read;
482         long                            last_pages_touch;
483         size_t                          shortest_full;
484         unsigned long                   read;
485         unsigned long                   read_bytes;
486         u64                             write_stamp;
487         u64                             read_stamp;
488         /* pages removed since last reset */
489         unsigned long                   pages_removed;
490         /* ring buffer pages to update, > 0 to add, < 0 to remove */
491         long                            nr_pages_to_update;
492         struct list_head                new_pages; /* new pages to add */
493         struct work_struct              update_pages_work;
494         struct completion               update_done;
495
496         struct rb_irq_work              irq_work;
497 };
498
499 struct ring_buffer {
500         unsigned                        flags;
501         int                             cpus;
502         atomic_t                        record_disabled;
503         atomic_t                        resize_disabled;
504         cpumask_var_t                   cpumask;
505
506         struct lock_class_key           *reader_lock_key;
507
508         struct mutex                    mutex;
509
510         struct ring_buffer_per_cpu      **buffers;
511
512         struct hlist_node               node;
513         u64                             (*clock)(void);
514
515         struct rb_irq_work              irq_work;
516         bool                            time_stamp_abs;
517 };
518
519 struct ring_buffer_iter {
520         struct ring_buffer_per_cpu      *cpu_buffer;
521         unsigned long                   head;
522         struct buffer_page              *head_page;
523         struct buffer_page              *cache_reader_page;
524         unsigned long                   cache_read;
525         unsigned long                   cache_pages_removed;
526         u64                             read_stamp;
527 };
528
529 /**
530  * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
531  * @buffer: The ring_buffer to get the number of pages from
532  * @cpu: The cpu of the ring_buffer to get the number of pages from
533  *
534  * Returns the number of pages used by a per_cpu buffer of the ring buffer.
535  */
536 size_t ring_buffer_nr_pages(struct ring_buffer *buffer, int cpu)
537 {
538         return buffer->buffers[cpu]->nr_pages;
539 }
540
541 /**
542  * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
543  * @buffer: The ring_buffer to get the number of pages from
544  * @cpu: The cpu of the ring_buffer to get the number of pages from
545  *
546  * Returns the number of pages that have content in the ring buffer.
547  */
548 size_t ring_buffer_nr_dirty_pages(struct ring_buffer *buffer, int cpu)
549 {
550         size_t read;
551         size_t lost;
552         size_t cnt;
553
554         read = local_read(&buffer->buffers[cpu]->pages_read);
555         lost = local_read(&buffer->buffers[cpu]->pages_lost);
556         cnt = local_read(&buffer->buffers[cpu]->pages_touched);
557
558         if (WARN_ON_ONCE(cnt < lost))
559                 return 0;
560
561         cnt -= lost;
562
563         /* The reader can read an empty page, but not more than that */
564         if (cnt < read) {
565                 WARN_ON_ONCE(read > cnt + 1);
566                 return 0;
567         }
568
569         return cnt - read;
570 }
571
572 static __always_inline bool full_hit(struct ring_buffer *buffer, int cpu, int full)
573 {
574         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
575         size_t nr_pages;
576         size_t dirty;
577
578         nr_pages = cpu_buffer->nr_pages;
579         if (!nr_pages || !full)
580                 return true;
581
582         dirty = ring_buffer_nr_dirty_pages(buffer, cpu);
583
584         return (dirty * 100) > (full * nr_pages);
585 }
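
/*
 * Worked example (not from the upstream file) of the percentage test
 * above: with nr_pages = 8 and a watermark of full = 25 (percent),
 * full_hit() becomes true once dirty * 100 > 25 * 8, i.e. from the
 * third dirty page onwards.  The helper name is hypothetical and the
 * block is never compiled.
 */
#if 0
static void example_full_hit_math(void)
{
	size_t nr_pages = 8, full = 25, dirty;

	for (dirty = 0; dirty <= nr_pages; dirty++)
		if (dirty * 100 > full * nr_pages)
			break;
	/* dirty == 3 here: 300 > 200, while 200 > 200 is false */
}
#endif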
586
587 /*
588  * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
589  *
590  * irq_work callback that wakes up any task that is blocked on the
591  * ring buffer waiters queue.
592  */
593 static void rb_wake_up_waiters(struct irq_work *work)
594 {
595         struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
596
597         wake_up_all(&rbwork->waiters);
598         if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
599                 rbwork->wakeup_full = false;
600                 rbwork->full_waiters_pending = false;
601                 wake_up_all(&rbwork->full_waiters);
602         }
603 }
604
605 /**
606  * ring_buffer_wait - wait for input to the ring buffer
607  * @buffer: buffer to wait on
608  * @cpu: the cpu buffer to wait on
609  * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
610  *
611  * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
612  * as data is added to any of the @buffer's cpu buffers. Otherwise
613  * it will wait for data to be added to a specific cpu buffer.
614  */
615 int ring_buffer_wait(struct ring_buffer *buffer, int cpu, int full)
616 {
617         struct ring_buffer_per_cpu *cpu_buffer;
618         DEFINE_WAIT(wait);
619         struct rb_irq_work *work;
620         int ret = 0;
621
622         /*
623          * Depending on what the caller is waiting for, either any
624          * data in any cpu buffer, or a specific buffer, put the
625          * caller on the appropriate wait queue.
626          */
627         if (cpu == RING_BUFFER_ALL_CPUS) {
628                 work = &buffer->irq_work;
629                 /* Full only makes sense on per cpu reads */
630                 full = 0;
631         } else {
632                 if (!cpumask_test_cpu(cpu, buffer->cpumask))
633                         return -ENODEV;
634                 cpu_buffer = buffer->buffers[cpu];
635                 work = &cpu_buffer->irq_work;
636         }
637
638
639         while (true) {
640                 if (full)
641                         prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
642                 else
643                         prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
644
645                 /*
646                  * The events can happen in critical sections where
647                  * checking a work queue can cause deadlocks.
648                  * After adding a task to the queue, this flag is set
649                  * only to notify events to try to wake up the queue
650                  * using irq_work.
651                  *
652                  * We don't clear it even if the buffer is no longer
653                  * empty. The flag only causes the next event to run
654  * irq_work to do the work queue wake up. The worst
655                  * that can happen if we race with !trace_empty() is that
656                  * an event will cause an irq_work to try to wake up
657                  * an empty queue.
658                  *
659                  * There's no reason to protect this flag either, as
660                  * the work queue and irq_work logic will do the necessary
661                  * synchronization for the wake ups. The only thing
662                  * that is necessary is that the wake up happens after
663  * a task has been queued. It's OK to have spurious wake ups.
664                  */
665                 if (full)
666                         work->full_waiters_pending = true;
667                 else
668                         work->waiters_pending = true;
669
670                 if (signal_pending(current)) {
671                         ret = -EINTR;
672                         break;
673                 }
674
675                 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
676                         break;
677
678                 if (cpu != RING_BUFFER_ALL_CPUS &&
679                     !ring_buffer_empty_cpu(buffer, cpu)) {
680                         unsigned long flags;
681                         bool pagebusy;
682                         bool done;
683
684                         if (!full)
685                                 break;
686
687                         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
688                         pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
689                         done = !pagebusy && full_hit(buffer, cpu, full);
690
691                         if (!cpu_buffer->shortest_full ||
692                             cpu_buffer->shortest_full > full)
693                                 cpu_buffer->shortest_full = full;
694                         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
695                         if (done)
696                                 break;
697                 }
698
699                 schedule();
700         }
701
702         if (full)
703                 finish_wait(&work->full_waiters, &wait);
704         else
705                 finish_wait(&work->waiters, &wait);
706
707         return ret;
708 }
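
/*
 * Illustrative sketch (not from the upstream file): how a consuming
 * reader might block on a specific cpu buffer with ring_buffer_wait()
 * and then drain it with ring_buffer_consume() (declared in
 * <linux/ring_buffer.h>).  Error handling is minimal, the function name
 * is hypothetical and the block is never compiled.
 */
#if 0
static int example_consume_cpu(struct ring_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	u64 ts;
	int ret;

	/* Sleep until at least one event is available (no page watermark) */
	ret = ring_buffer_wait(buffer, cpu, 0);
	if (ret)
		return ret;

	while ((event = ring_buffer_consume(buffer, cpu, &ts, NULL)))
		pr_info("event of %u bytes at %llu\n",
			ring_buffer_event_length(event),
			(unsigned long long)ts);

	return 0;
}
#endif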
709
710 /**
711  * ring_buffer_poll_wait - poll on buffer input
712  * @buffer: buffer to wait on
713  * @cpu: the cpu buffer to wait on
714  * @filp: the file descriptor
715  * @poll_table: The poll descriptor
716  * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
717  *
718  * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
719  * as data is added to any of the @buffer's cpu buffers. Otherwise
720  * it will wait for data to be added to a specific cpu buffer.
721  *
722  * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
723  * zero otherwise.
724  */
725 __poll_t ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
726                           struct file *filp, poll_table *poll_table, int full)
727 {
728         struct ring_buffer_per_cpu *cpu_buffer;
729         struct rb_irq_work *work;
730
731         if (cpu == RING_BUFFER_ALL_CPUS) {
732                 work = &buffer->irq_work;
733                 full = 0;
734         } else {
735                 if (!cpumask_test_cpu(cpu, buffer->cpumask))
736                         return -EINVAL;
737
738                 cpu_buffer = buffer->buffers[cpu];
739                 work = &cpu_buffer->irq_work;
740         }
741
742         if (full) {
743                 poll_wait(filp, &work->full_waiters, poll_table);
744                 work->full_waiters_pending = true;
745         } else {
746                 poll_wait(filp, &work->waiters, poll_table);
747                 work->waiters_pending = true;
748         }
749
750         /*
751          * There's a tight race between setting the waiters_pending and
752          * checking if the ring buffer is empty.  Once the waiters_pending bit
753          * is set, the next event will wake the task up, but we can get stuck
754          * if there's only a single event in.
755          *
756          * FIXME: Ideally, we need a memory barrier on the writer side as well,
757          * but adding a memory barrier to all events will cause too much of a
758          * performance hit in the fast path.  We only need a memory barrier when
759          * the buffer goes from empty to having content.  But as this race is
760          * extremely small, and it's not a problem if another event comes in, we
761          * will fix it later.
762          */
763         smp_mb();
764
765         if (full)
766                 return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;
767
768         if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
769             (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
770                 return EPOLLIN | EPOLLRDNORM;
771         return 0;
772 }
773
774 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
775 #define RB_WARN_ON(b, cond)                                             \
776         ({                                                              \
777                 int _____ret = unlikely(cond);                          \
778                 if (_____ret) {                                         \
779                         if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
780                                 struct ring_buffer_per_cpu *__b =       \
781                                         (void *)b;                      \
782                                 atomic_inc(&__b->buffer->record_disabled); \
783                         } else                                          \
784                                 atomic_inc(&b->record_disabled);        \
785                         WARN_ON(1);                                     \
786                 }                                                       \
787                 _____ret;                                               \
788         })
789
790 /* Up this if you want to test the TIME_EXTENTS and normalization */
791 #define DEBUG_SHIFT 0
792
793 static inline u64 rb_time_stamp(struct ring_buffer *buffer)
794 {
795         /* shift to debug/test normalization and TIME_EXTENTS */
796         return buffer->clock() << DEBUG_SHIFT;
797 }
798
799 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
800 {
801         u64 time;
802
803         preempt_disable_notrace();
804         time = rb_time_stamp(buffer);
805         preempt_enable_notrace();
806
807         return time;
808 }
809 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
810
811 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
812                                       int cpu, u64 *ts)
813 {
814         /* Just stupid testing the normalize function and deltas */
815         *ts >>= DEBUG_SHIFT;
816 }
817 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
818
819 /*
820  * Making the ring buffer lockless makes things tricky.
821  * Writes only happen on the CPU that they are on, and they
822  * only need to worry about interrupts. Reads, however, can
823  * happen on any CPU.
824  *
825  * The reader page is always off the ring buffer, but when the
826  * reader finishes with a page, it needs to swap its page with
827  * a new one from the buffer. The reader needs to take from
828  * the head (writes go to the tail). But if a writer is in overwrite
829  * mode and wraps, it must push the head page forward.
830  *
831  * Here lies the problem.
832  *
833  * The reader must be careful to replace only the head page, and
834  * not another one. As described at the top of the file in the
835  * ASCII art, the reader sets its old page to point to the next
836  * page after head. It then sets the page after head to point to
837  * the old reader page. But if the writer moves the head page
838  * during this operation, the reader could end up with the tail.
839  *
840  * We use cmpxchg to help prevent this race. We also do something
841  * special with the page before head. We set the LSB to 1.
842  *
843  * When the writer must push the page forward, it will clear the
844  * bit that points to the head page, move the head, and then set
845  * the bit that points to the new head page.
846  *
847  * We also don't want an interrupt coming in and moving the head
848  * page on another writer, so we use the second LSB to catch
849  * that too. Thus:
850  *
851  * head->list->prev->next        bit 1          bit 0
852  *                              -------        -------
853  * Normal page                     0              0
854  * Points to head page             0              1
855  * New head page                   1              0
856  *
857  * Note we can not trust the prev pointer of the head page, because:
858  *
859  * +----+       +-----+        +-----+
860  * |    |------>|  T  |---X--->|  N  |
861  * |    |<------|     |        |     |
862  * +----+       +-----+        +-----+
863  *   ^                           ^ |
864  *   |          +-----+          | |
865  *   +----------|  R  |----------+ |
866  *              |     |<-----------+
867  *              +-----+
868  *
869  * Key:  ---X-->  HEAD flag set in pointer
870  *         T      Tail page
871  *         R      Reader page
872  *         N      Next page
873  *
874  * (see __rb_reserve_next() to see where this happens)
875  *
876  *  What the above shows is that the reader just swapped out
877  *  the reader page with a page in the buffer, but before it
878  *  could make the new header point back to the new page added
879  *  it was preempted by a writer. The writer moved forward onto
880  *  the new page added by the reader and is about to move forward
881  *  again.
882  *
883  *  You can see, it is legitimate for the previous pointer of
884  *  the head (or any page) not to point back to itself. But only
885  *  temporarily.
886  */
887
888 #define RB_PAGE_NORMAL          0UL
889 #define RB_PAGE_HEAD            1UL
890 #define RB_PAGE_UPDATE          2UL
891
892
893 #define RB_FLAG_MASK            3UL
894
895 /* PAGE_MOVED is not part of the mask */
896 #define RB_PAGE_MOVED           4UL
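
/*
 * Illustrative sketch (not from the upstream file): carrying a flag in
 * the low bits of an aligned list pointer and stripping it again, which
 * is what rb_list_head() and rb_set_list_to_head() below do.  The helper
 * names are hypothetical and the block is never compiled.
 */
#if 0
static struct list_head *example_tag_head(struct list_head *next)
{
	/* aligned pointers have their low two bits clear, so borrow bit 0 */
	return (struct list_head *)((unsigned long)next | RB_PAGE_HEAD);
}

static struct list_head *example_untag(struct list_head *tagged)
{
	return (struct list_head *)((unsigned long)tagged & ~RB_FLAG_MASK);
}
#endif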
897
898 /*
899  * rb_list_head - strip the flag bits from a list pointer
900  */
901 static struct list_head *rb_list_head(struct list_head *list)
902 {
903         unsigned long val = (unsigned long)list;
904
905         return (struct list_head *)(val & ~RB_FLAG_MASK);
906 }
907
908 /*
909  * rb_is_head_page - test if the given page is the head page
910  *
911  * Because the reader may move the head_page pointer, we can
912  * not trust what the head page is (it may be pointing to
913  * the reader page). But if the next page is a header page,
914  * its flags will be non zero.
915  */
916 static inline int
917 rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
918                 struct buffer_page *page, struct list_head *list)
919 {
920         unsigned long val;
921
922         val = (unsigned long)list->next;
923
924         if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
925                 return RB_PAGE_MOVED;
926
927         return val & RB_FLAG_MASK;
928 }
929
930 /*
931  * rb_is_reader_page
932  *
933  * The unique thing about the reader page is that, if the
934  * writer is ever on it, the previous pointer never points
935  * back to the reader page.
936  */
937 static bool rb_is_reader_page(struct buffer_page *page)
938 {
939         struct list_head *list = page->list.prev;
940
941         return rb_list_head(list->next) != &page->list;
942 }
943
944 /*
945  * rb_set_list_to_head - set a list_head to be pointing to head.
946  */
947 static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
948                                 struct list_head *list)
949 {
950         unsigned long *ptr;
951
952         ptr = (unsigned long *)&list->next;
953         *ptr |= RB_PAGE_HEAD;
954         *ptr &= ~RB_PAGE_UPDATE;
955 }
956
957 /*
958  * rb_head_page_activate - sets up head page
959  */
960 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
961 {
962         struct buffer_page *head;
963
964         head = cpu_buffer->head_page;
965         if (!head)
966                 return;
967
968         /*
969          * Set the previous list pointer to have the HEAD flag.
970          */
971         rb_set_list_to_head(cpu_buffer, head->list.prev);
972 }
973
974 static void rb_list_head_clear(struct list_head *list)
975 {
976         unsigned long *ptr = (unsigned long *)&list->next;
977
978         *ptr &= ~RB_FLAG_MASK;
979 }
980
981 /*
982  * rb_head_page_deactivate - clears head page ptr (for free list)
983  */
984 static void
985 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
986 {
987         struct list_head *hd;
988
989         /* Go through the whole list and clear any pointers found. */
990         rb_list_head_clear(cpu_buffer->pages);
991
992         list_for_each(hd, cpu_buffer->pages)
993                 rb_list_head_clear(hd);
994 }
995
996 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
997                             struct buffer_page *head,
998                             struct buffer_page *prev,
999                             int old_flag, int new_flag)
1000 {
1001         struct list_head *list;
1002         unsigned long val = (unsigned long)&head->list;
1003         unsigned long ret;
1004
1005         list = &prev->list;
1006
1007         val &= ~RB_FLAG_MASK;
1008
1009         ret = cmpxchg((unsigned long *)&list->next,
1010                       val | old_flag, val | new_flag);
1011
1012         /* check if the reader took the page */
1013         if ((ret & ~RB_FLAG_MASK) != val)
1014                 return RB_PAGE_MOVED;
1015
1016         return ret & RB_FLAG_MASK;
1017 }
1018
1019 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
1020                                    struct buffer_page *head,
1021                                    struct buffer_page *prev,
1022                                    int old_flag)
1023 {
1024         return rb_head_page_set(cpu_buffer, head, prev,
1025                                 old_flag, RB_PAGE_UPDATE);
1026 }
1027
1028 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
1029                                  struct buffer_page *head,
1030                                  struct buffer_page *prev,
1031                                  int old_flag)
1032 {
1033         return rb_head_page_set(cpu_buffer, head, prev,
1034                                 old_flag, RB_PAGE_HEAD);
1035 }
1036
1037 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
1038                                    struct buffer_page *head,
1039                                    struct buffer_page *prev,
1040                                    int old_flag)
1041 {
1042         return rb_head_page_set(cpu_buffer, head, prev,
1043                                 old_flag, RB_PAGE_NORMAL);
1044 }
1045
1046 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
1047                                struct buffer_page **bpage)
1048 {
1049         struct list_head *p = rb_list_head((*bpage)->list.next);
1050
1051         *bpage = list_entry(p, struct buffer_page, list);
1052 }
1053
1054 static struct buffer_page *
1055 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1056 {
1057         struct buffer_page *head;
1058         struct buffer_page *page;
1059         struct list_head *list;
1060         int i;
1061
1062         if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1063                 return NULL;
1064
1065         /* sanity check */
1066         list = cpu_buffer->pages;
1067         if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1068                 return NULL;
1069
1070         page = head = cpu_buffer->head_page;
1071         /*
1072          * It is possible that the writer moves the head page behind
1073          * where we started, and we miss it in one loop.
1074          * A second loop should grab the header, but we'll do
1075          * three loops just because I'm paranoid.
1076          */
1077         for (i = 0; i < 3; i++) {
1078                 do {
1079                         if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
1080                                 cpu_buffer->head_page = page;
1081                                 return page;
1082                         }
1083                         rb_inc_page(cpu_buffer, &page);
1084                 } while (page != head);
1085         }
1086
1087         RB_WARN_ON(cpu_buffer, 1);
1088
1089         return NULL;
1090 }
1091
1092 static int rb_head_page_replace(struct buffer_page *old,
1093                                 struct buffer_page *new)
1094 {
1095         unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1096         unsigned long val;
1097         unsigned long ret;
1098
1099         val = *ptr & ~RB_FLAG_MASK;
1100         val |= RB_PAGE_HEAD;
1101
1102         ret = cmpxchg(ptr, val, (unsigned long)&new->list);
1103
1104         return ret == val;
1105 }
1106
1107 /*
1108  * rb_tail_page_update - move the tail page forward
1109  */
1110 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
1111                                struct buffer_page *tail_page,
1112                                struct buffer_page *next_page)
1113 {
1114         unsigned long old_entries;
1115         unsigned long old_write;
1116
1117         /*
1118          * The tail page now needs to be moved forward.
1119          *
1120          * We need to reset the tail page, but without messing
1121          * with possible erasing of data brought in by interrupts
1122          * that have moved the tail page and are currently on it.
1123          *
1124          * We add a counter to the write field to denote this.
1125          */
1126         old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1127         old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1128
1129         local_inc(&cpu_buffer->pages_touched);
1130         /*
1131          * Just make sure we have seen our old_write and synchronize
1132          * with any interrupts that come in.
1133          */
1134         barrier();
1135
1136         /*
1137          * If the tail page is still the same as what we think
1138          * it is, then it is up to us to update the tail
1139          * pointer.
1140          */
1141         if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
1142                 /* Zero the write counter */
1143                 unsigned long val = old_write & ~RB_WRITE_MASK;
1144                 unsigned long eval = old_entries & ~RB_WRITE_MASK;
1145
1146                 /*
1147                  * This will only succeed if an interrupt did
1148                  * not come in and change it, in which case we
1149                  * do not want to modify it.
1150                  *
1151                  * We add (void) to let the compiler know that we do not care
1152                  * about the return value of these functions. We use the
1153                  * cmpxchg to only update if an interrupt did not already
1154                  * do it for us. If the cmpxchg fails, we don't care.
1155                  */
1156                 (void)local_cmpxchg(&next_page->write, old_write, val);
1157                 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
1158
1159                 /*
1160                  * No need to worry about races with clearing out the commit.
1161                  * It can only increment when a commit takes place. But that
1162                  * only happens in the outermost nested commit.
1163                  */
1164                 local_set(&next_page->page->commit, 0);
1165
1166                 /* Again, either we update tail_page or an interrupt does */
1167                 (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
1168         }
1169 }
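
/*
 * Illustrative sketch (not from the upstream file): why adding
 * RB_WRITE_INTCNT before the cmpxchg above detects nested writers.  If
 * an interrupt bumps the counter between the add and the cmpxchg, the
 * stored value no longer matches old_write, the cmpxchg fails and the
 * interrupt's update is left alone.  This is a plain, non-atomic model;
 * the helper name is hypothetical and the block is never compiled.
 */
#if 0
static void example_nested_update(unsigned long *write)
{
	unsigned long old_write = (*write += RB_WRITE_INTCNT);
	unsigned long val = old_write & ~RB_WRITE_MASK;	/* zero the index */

	/* only reset the index if nobody touched the word in between */
	if (*write == old_write)
		*write = val;
}
#endif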
1170
1171 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
1172                           struct buffer_page *bpage)
1173 {
1174         unsigned long val = (unsigned long)bpage;
1175
1176         if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
1177                 return 1;
1178
1179         return 0;
1180 }
1181
1182 /**
1183  * rb_check_list - make sure a pointer to a list has the last bits zero
1184  */
1185 static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
1186                          struct list_head *list)
1187 {
1188         if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
1189                 return 1;
1190         if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
1191                 return 1;
1192         return 0;
1193 }
1194
1195 /**
1196  * rb_check_pages - integrity check of buffer pages
1197  * @cpu_buffer: CPU buffer with pages to test
1198  *
1199  * As a safety measure we check to make sure the data pages have not
1200  * been corrupted.
1201  */
1202 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1203 {
1204         struct list_head *head = cpu_buffer->pages;
1205         struct buffer_page *bpage, *tmp;
1206
1207         /* Reset the head page if it exists */
1208         if (cpu_buffer->head_page)
1209                 rb_set_head_page(cpu_buffer);
1210
1211         rb_head_page_deactivate(cpu_buffer);
1212
1213         if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
1214                 return -1;
1215         if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
1216                 return -1;
1217
1218         if (rb_check_list(cpu_buffer, head))
1219                 return -1;
1220
1221         list_for_each_entry_safe(bpage, tmp, head, list) {
1222                 if (RB_WARN_ON(cpu_buffer,
1223                                bpage->list.next->prev != &bpage->list))
1224                         return -1;
1225                 if (RB_WARN_ON(cpu_buffer,
1226                                bpage->list.prev->next != &bpage->list))
1227                         return -1;
1228                 if (rb_check_list(cpu_buffer, &bpage->list))
1229                         return -1;
1230         }
1231
1232         rb_head_page_activate(cpu_buffer);
1233
1234         return 0;
1235 }
1236
1237 static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu)
1238 {
1239         struct buffer_page *bpage, *tmp;
1240         bool user_thread = current->mm != NULL;
1241         gfp_t mflags;
1242         long i;
1243
1244         /*
1245          * Check if the available memory is there first.
1246          * Note, si_mem_available() only gives us a rough estimate of available
1247          * memory. It may not be accurate. But we don't care, we just want
1248          * to prevent doing any allocation when it is obvious that it is
1249          * not going to succeed.
1250          */
1251         i = si_mem_available();
1252         if (i < nr_pages)
1253                 return -ENOMEM;
1254
1255         /*
1256          * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1257          * gracefully without invoking oom-killer and the system is not
1258          * destabilized.
1259          */
1260         mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1261
1262         /*
1263          * If a user thread allocates too much, and si_mem_available()
1264          * reports there's enough memory, even though there is not.
1265          * Make sure the OOM killer kills this thread. This can happen
1266          * even with RETRY_MAYFAIL because another task may be doing
1267          * an allocation after this task has taken all memory.
1268          * This is the task the OOM killer needs to take out during this
1269          * loop, even if it was triggered by an allocation somewhere else.
1270          */
1271         if (user_thread)
1272                 set_current_oom_origin();
1273         for (i = 0; i < nr_pages; i++) {
1274                 struct page *page;
1275
1276                 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1277                                     mflags, cpu_to_node(cpu));
1278                 if (!bpage)
1279                         goto free_pages;
1280
1281                 list_add(&bpage->list, pages);
1282
1283                 page = alloc_pages_node(cpu_to_node(cpu), mflags, 0);
1284                 if (!page)
1285                         goto free_pages;
1286                 bpage->page = page_address(page);
1287                 rb_init_page(bpage->page);
1288
1289                 if (user_thread && fatal_signal_pending(current))
1290                         goto free_pages;
1291         }
1292         if (user_thread)
1293                 clear_current_oom_origin();
1294
1295         return 0;
1296
1297 free_pages:
1298         list_for_each_entry_safe(bpage, tmp, pages, list) {
1299                 list_del_init(&bpage->list);
1300                 free_buffer_page(bpage);
1301         }
1302         if (user_thread)
1303                 clear_current_oom_origin();
1304
1305         return -ENOMEM;
1306 }
1307
1308 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1309                              unsigned long nr_pages)
1310 {
1311         LIST_HEAD(pages);
1312
1313         WARN_ON(!nr_pages);
1314
1315         if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
1316                 return -ENOMEM;
1317
1318         /*
1319          * The ring buffer page list is a circular list that does not
1320          * start and end with a list head. All page list items point to
1321          * other pages.
1322          */
1323         cpu_buffer->pages = pages.next;
1324         list_del(&pages);
1325
1326         cpu_buffer->nr_pages = nr_pages;
1327
1328         rb_check_pages(cpu_buffer);
1329
1330         return 0;
1331 }
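
/*
 * Illustrative sketch (not from the upstream file): what the
 * "cpu_buffer->pages = pages.next; list_del(&pages)" idiom above leaves
 * behind.  Once the on-stack list head is unlinked, the buffer pages
 * form a circle among themselves with no struct list_head in it, so any
 * page can serve as the entry point.  The helper name is hypothetical
 * and the block is never compiled.
 */
#if 0
static struct list_head *example_headless_ring(void)
{
	static struct buffer_page a, b, c;
	LIST_HEAD(tmp);

	list_add_tail(&a.list, &tmp);
	list_add_tail(&b.list, &tmp);
	list_add_tail(&c.list, &tmp);

	/* unlink the head: a -> b -> c -> a is now a headless circle */
	list_del(&tmp);

	return &a.list;
}
#endif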
1332
1333 static struct ring_buffer_per_cpu *
1334 rb_allocate_cpu_buffer(struct ring_buffer *buffer, long nr_pages, int cpu)
1335 {
1336         struct ring_buffer_per_cpu *cpu_buffer;
1337         struct buffer_page *bpage;
1338         struct page *page;
1339         int ret;
1340
1341         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1342                                   GFP_KERNEL, cpu_to_node(cpu));
1343         if (!cpu_buffer)
1344                 return NULL;
1345
1346         cpu_buffer->cpu = cpu;
1347         cpu_buffer->buffer = buffer;
1348         raw_spin_lock_init(&cpu_buffer->reader_lock);
1349         lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1350         cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1351         INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1352         init_completion(&cpu_buffer->update_done);
1353         init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1354         init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1355         init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1356
1357         bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1358                             GFP_KERNEL, cpu_to_node(cpu));
1359         if (!bpage)
1360                 goto fail_free_buffer;
1361
1362         rb_check_bpage(cpu_buffer, bpage);
1363
1364         cpu_buffer->reader_page = bpage;
1365         page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1366         if (!page)
1367                 goto fail_free_reader;
1368         bpage->page = page_address(page);
1369         rb_init_page(bpage->page);
1370
1371         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1372         INIT_LIST_HEAD(&cpu_buffer->new_pages);
1373
1374         ret = rb_allocate_pages(cpu_buffer, nr_pages);
1375         if (ret < 0)
1376                 goto fail_free_reader;
1377
1378         cpu_buffer->head_page
1379                 = list_entry(cpu_buffer->pages, struct buffer_page, list);
1380         cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1381
1382         rb_head_page_activate(cpu_buffer);
1383
1384         return cpu_buffer;
1385
1386  fail_free_reader:
1387         free_buffer_page(cpu_buffer->reader_page);
1388
1389  fail_free_buffer:
1390         kfree(cpu_buffer);
1391         return NULL;
1392 }
1393
1394 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1395 {
1396         struct list_head *head = cpu_buffer->pages;
1397         struct buffer_page *bpage, *tmp;
1398
1399         irq_work_sync(&cpu_buffer->irq_work.work);
1400
1401         free_buffer_page(cpu_buffer->reader_page);
1402
1403         if (head) {
1404                 rb_head_page_deactivate(cpu_buffer);
1405
1406                 list_for_each_entry_safe(bpage, tmp, head, list) {
1407                         list_del_init(&bpage->list);
1408                         free_buffer_page(bpage);
1409                 }
1410                 bpage = list_entry(head, struct buffer_page, list);
1411                 free_buffer_page(bpage);
1412         }
1413
1414         kfree(cpu_buffer);
1415 }
1416
1417 /**
1418  * __ring_buffer_alloc - allocate a new ring_buffer
1419  * @size: the size in bytes per cpu that is needed.
1420  * @flags: attributes to set for the ring buffer.
1421  *
1422  * Currently the only flag that is available is the RB_FL_OVERWRITE
1423  * flag. This flag means that the buffer will overwrite old data
1424  * when the buffer wraps. If this flag is not set, the buffer will
1425  * drop data when the tail hits the head.
1426  */
1427 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1428                                         struct lock_class_key *key)
1429 {
1430         struct ring_buffer *buffer;
1431         long nr_pages;
1432         int bsize;
1433         int cpu;
1434         int ret;
1435
1436         /* keep it in its own cache line */
1437         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1438                          GFP_KERNEL);
1439         if (!buffer)
1440                 return NULL;
1441
1442         if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1443                 goto fail_free_buffer;
1444
1445         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1446         buffer->flags = flags;
1447         buffer->clock = trace_clock_local;
1448         buffer->reader_lock_key = key;
1449
1450         init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1451         init_waitqueue_head(&buffer->irq_work.waiters);
1452
1453         /* need at least two pages */
1454         if (nr_pages < 2)
1455                 nr_pages = 2;
1456
1457         buffer->cpus = nr_cpu_ids;
1458
1459         bsize = sizeof(void *) * nr_cpu_ids;
1460         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1461                                   GFP_KERNEL);
1462         if (!buffer->buffers)
1463                 goto fail_free_cpumask;
1464
1465         cpu = raw_smp_processor_id();
1466         cpumask_set_cpu(cpu, buffer->cpumask);
1467         buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1468         if (!buffer->buffers[cpu])
1469                 goto fail_free_buffers;
1470
1471         ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1472         if (ret < 0)
1473                 goto fail_free_buffers;
1474
1475         mutex_init(&buffer->mutex);
1476
1477         return buffer;
1478
1479  fail_free_buffers:
1480         for_each_buffer_cpu(buffer, cpu) {
1481                 if (buffer->buffers[cpu])
1482                         rb_free_cpu_buffer(buffer->buffers[cpu]);
1483         }
1484         kfree(buffer->buffers);
1485
1486  fail_free_cpumask:
1487         free_cpumask_var(buffer->cpumask);
1488
1489  fail_free_buffer:
1490         kfree(buffer);
1491         return NULL;
1492 }
1493 EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
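
/*
 * Illustrative sketch (not from the upstream file): allocating and
 * freeing a buffer through the ring_buffer_alloc() wrapper from
 * <linux/ring_buffer.h>, which supplies the lock class key expected by
 * __ring_buffer_alloc() above.  The function name is hypothetical and
 * the block is never compiled.
 */
#if 0
static int example_alloc_free(void)
{
	/* 1 MiB per cpu; overwrite old events once a cpu buffer fills up */
	struct ring_buffer *buffer = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);

	if (!buffer)
		return -ENOMEM;

	/* ... ring_buffer_lock_reserve() / ring_buffer_unlock_commit() ... */

	ring_buffer_free(buffer);
	return 0;
}
#endif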
1494
1495 /**
1496  * ring_buffer_free - free a ring buffer.
1497  * @buffer: the buffer to free.
1498  */
1499 void
1500 ring_buffer_free(struct ring_buffer *buffer)
1501 {
1502         int cpu;
1503
1504         cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1505
1506         irq_work_sync(&buffer->irq_work.work);
1507
1508         for_each_buffer_cpu(buffer, cpu)
1509                 rb_free_cpu_buffer(buffer->buffers[cpu]);
1510
1511         kfree(buffer->buffers);
1512         free_cpumask_var(buffer->cpumask);
1513
1514         kfree(buffer);
1515 }
1516 EXPORT_SYMBOL_GPL(ring_buffer_free);
1517
1518 void ring_buffer_set_clock(struct ring_buffer *buffer,
1519                            u64 (*clock)(void))
1520 {
1521         buffer->clock = clock;
1522 }
1523
1524 void ring_buffer_set_time_stamp_abs(struct ring_buffer *buffer, bool abs)
1525 {
1526         buffer->time_stamp_abs = abs;
1527 }
1528
1529 bool ring_buffer_time_stamp_abs(struct ring_buffer *buffer)
1530 {
1531         return buffer->time_stamp_abs;
1532 }
1533
1534 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1535
1536 static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1537 {
1538         return local_read(&bpage->entries) & RB_WRITE_MASK;
1539 }
1540
1541 static inline unsigned long rb_page_write(struct buffer_page *bpage)
1542 {
1543         return local_read(&bpage->write) & RB_WRITE_MASK;
1544 }
1545
1546 static int
1547 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1548 {
1549         struct list_head *tail_page, *to_remove, *next_page;
1550         struct buffer_page *to_remove_page, *tmp_iter_page;
1551         struct buffer_page *last_page, *first_page;
1552         unsigned long nr_removed;
1553         unsigned long head_bit;
1554         int page_entries;
1555
1556         head_bit = 0;
1557
1558         raw_spin_lock_irq(&cpu_buffer->reader_lock);
1559         atomic_inc(&cpu_buffer->record_disabled);
1560         /*
1561          * We don't race with the readers since we have acquired the reader
1562          * lock. We also don't race with writers after disabling recording.
1563          * This makes it easy to figure out the first and the last page to be
1564          * removed from the list. We unlink all the pages in between including
1565          * the first and last pages. This is done in a busy loop so that we
1566          * lose the least number of traces.
1567          * The pages are freed after we restart recording and unlock readers.
1568          */
1569         tail_page = &cpu_buffer->tail_page->list;
1570
1571         /*
1572          * The tail page might be on the reader page; if so, remove the
1573          * next page from the ring buffer instead.
1574          */
1575         if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1576                 tail_page = rb_list_head(tail_page->next);
1577         to_remove = tail_page;
1578
1579         /* start of pages to remove */
1580         first_page = list_entry(rb_list_head(to_remove->next),
1581                                 struct buffer_page, list);
1582
1583         for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1584                 to_remove = rb_list_head(to_remove)->next;
1585                 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1586         }
1587         /* Read iterators need to reset themselves when some pages are removed */
1588         cpu_buffer->pages_removed += nr_removed;
1589
1590         next_page = rb_list_head(to_remove)->next;
1591
1592         /*
1593          * Now we remove all pages between tail_page and next_page.
1594          * Make sure that we have head_bit value preserved for the
1595          * next page
1596          */
1597         tail_page->next = (struct list_head *)((unsigned long)next_page |
1598                                                 head_bit);
1599         next_page = rb_list_head(next_page);
1600         next_page->prev = tail_page;
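        /*
         * A rough sketch of the unlink above (illustration only):
         *
         *   before:  tail_page -> first_page -> ... -> last_page -> next_page
         *   after:   tail_page -> next_page
         *
         * first_page .. last_page are now off the ring buffer list and are
         * freed below, once recording is re-enabled and the reader lock is
         * released.
         */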
1601
1602         /* make sure pages points to a valid page in the ring buffer */
1603         cpu_buffer->pages = next_page;
1604
1605         /* update head page */
1606         if (head_bit)
1607                 cpu_buffer->head_page = list_entry(next_page,
1608                                                 struct buffer_page, list);
1609
1610         /* pages are removed, resume tracing and then free the pages */
1611         atomic_dec(&cpu_buffer->record_disabled);
1612         raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1613
1614         RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1615
1616         /* last buffer page to remove */
1617         last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1618                                 list);
1619         tmp_iter_page = first_page;
1620
1621         do {
1622                 cond_resched();
1623
1624                 to_remove_page = tmp_iter_page;
1625                 rb_inc_page(cpu_buffer, &tmp_iter_page);
1626
1627                 /* update the counters */
1628                 page_entries = rb_page_entries(to_remove_page);
1629                 if (page_entries) {
1630                         /*
1631                          * If something was added to this page, it was full
1632                          * since it is not the tail page. So we deduct the
1633                          * bytes consumed in the ring buffer from here.
1634                          * Increment overrun to account for the lost events.
1635                          */
1636                         local_add(page_entries, &cpu_buffer->overrun);
1637                         local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
1638                         local_inc(&cpu_buffer->pages_lost);
1639                 }
1640
1641                 /*
1642                  * We have already removed references to this list item, just
1643                  * free up the buffer_page and its page
1644                  */
1645                 free_buffer_page(to_remove_page);
1646                 nr_removed--;
1647
1648         } while (to_remove_page != last_page);
1649
1650         RB_WARN_ON(cpu_buffer, nr_removed);
1651
1652         return nr_removed == 0;
1653 }
1654
1655 static int
1656 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1657 {
1658         struct list_head *pages = &cpu_buffer->new_pages;
1659         int retries, success;
1660
1661         raw_spin_lock_irq(&cpu_buffer->reader_lock);
1662         /*
1663          * We are holding the reader lock, so the reader page won't be swapped
1664          * in the ring buffer. Now we are racing with the writer trying to
1665          * move the head page and the tail page.
1666          * We adapt the reader page update process as follows:
1667          * 1. We first splice the start and end of list of new pages between
1668          *    the head page and its previous page.
1669          * 2. We cmpxchg the prev_page->next to point from head page to the
1670          *    start of new pages list.
1671          * 3. Finally, we update the head->prev to the end of new list.
1672          *
1673          * We will try this process 10 times, to make sure that we don't keep
1674          * spinning.
1675          */
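        /*
         * Roughly, when the cmpxchg() below succeeds (illustration only):
         *
         *   before:  prev_page -> head_page -> ...
         *   after:   prev_page -> first_page -> ... -> last_page -> head_page
         *
         * A writer moving the head page at the same time makes the cmpxchg()
         * fail, which simply causes another retry against the new head.
         */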
1676         retries = 10;
1677         success = 0;
1678         while (retries--) {
1679                 struct list_head *head_page, *prev_page, *r;
1680                 struct list_head *last_page, *first_page;
1681                 struct list_head *head_page_with_bit;
1682
1683                 head_page = &rb_set_head_page(cpu_buffer)->list;
1684                 if (!head_page)
1685                         break;
1686                 prev_page = head_page->prev;
1687
1688                 first_page = pages->next;
1689                 last_page  = pages->prev;
1690
1691                 head_page_with_bit = (struct list_head *)
1692                                      ((unsigned long)head_page | RB_PAGE_HEAD);
1693
1694                 last_page->next = head_page_with_bit;
1695                 first_page->prev = prev_page;
1696
1697                 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
1698
1699                 if (r == head_page_with_bit) {
1700                         /*
1701                          * yay, we replaced the page pointer with our new list;
1702                          * now we just have to update the head page's prev
1703                          * pointer to point to the end of the list
1704                          */
1705                         head_page->prev = last_page;
1706                         success = 1;
1707                         break;
1708                 }
1709         }
1710
1711         if (success)
1712                 INIT_LIST_HEAD(pages);
1713         /*
1714          * If we weren't successful in adding the new pages, warn and stop
1715          * tracing
1716          */
1717         RB_WARN_ON(cpu_buffer, !success);
1718         raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1719
1720         /* free pages if they weren't inserted */
1721         if (!success) {
1722                 struct buffer_page *bpage, *tmp;
1723                 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1724                                          list) {
1725                         list_del_init(&bpage->list);
1726                         free_buffer_page(bpage);
1727                 }
1728         }
1729         return success;
1730 }
1731
1732 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
1733 {
1734         int success;
1735
1736         if (cpu_buffer->nr_pages_to_update > 0)
1737                 success = rb_insert_pages(cpu_buffer);
1738         else
1739                 success = rb_remove_pages(cpu_buffer,
1740                                         -cpu_buffer->nr_pages_to_update);
1741
1742         if (success)
1743                 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
1744 }
1745
1746 static void update_pages_handler(struct work_struct *work)
1747 {
1748         struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
1749                         struct ring_buffer_per_cpu, update_pages_work);
1750         rb_update_pages(cpu_buffer);
1751         complete(&cpu_buffer->update_done);
1752 }
1753
1754 /**
1755  * ring_buffer_resize - resize the ring buffer
1756  * @buffer: the buffer to resize.
1757  * @size: the new size.
1758  * @cpu_id: the cpu buffer to resize
1759  *
1760  * Minimum size is 2 * BUF_PAGE_SIZE.
1761  *
1762  * Returns 0 on success and < 0 on failure.
1763  */
1764 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
1765                         int cpu_id)
1766 {
1767         struct ring_buffer_per_cpu *cpu_buffer;
1768         unsigned long nr_pages;
1769         int cpu, err;
1770
1771         /*
1772          * Always succeed at resizing a non-existent buffer:
1773          */
1774         if (!buffer)
1775                 return 0;
1776
1777         /* Make sure the requested buffer exists */
1778         if (cpu_id != RING_BUFFER_ALL_CPUS &&
1779             !cpumask_test_cpu(cpu_id, buffer->cpumask))
1780                 return 0;
1781
1782         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1783
1784         /* we need a minimum of two pages */
1785         if (nr_pages < 2)
1786                 nr_pages = 2;
1787
1788         size = nr_pages * BUF_PAGE_SIZE;
1789
1790         /*
1791          * Don't succeed if resizing is disabled, as a reader might be
1792          * manipulating the ring buffer and is expecting a sane state while
1793          * this is true.
1794          */
1795         if (atomic_read(&buffer->resize_disabled))
1796                 return -EBUSY;
1797
1798         /* prevent another thread from changing buffer sizes */
1799         mutex_lock(&buffer->mutex);
1800
1801         if (cpu_id == RING_BUFFER_ALL_CPUS) {
1802                 /* calculate the pages to update */
1803                 for_each_buffer_cpu(buffer, cpu) {
1804                         cpu_buffer = buffer->buffers[cpu];
1805
1806                         cpu_buffer->nr_pages_to_update = nr_pages -
1807                                                         cpu_buffer->nr_pages;
1808                         /*
1809                          * nothing more to do when removing pages or when there is no update
1810                          */
1811                         if (cpu_buffer->nr_pages_to_update <= 0)
1812                                 continue;
1813                         /*
1814                          * to add pages, make sure all new pages can be
1815                          * allocated without receiving ENOMEM
1816                          */
1817                         INIT_LIST_HEAD(&cpu_buffer->new_pages);
1818                         if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1819                                                 &cpu_buffer->new_pages, cpu)) {
1820                                 /* not enough memory for new pages */
1821                                 err = -ENOMEM;
1822                                 goto out_err;
1823                         }
1824                 }
1825
1826                 get_online_cpus();
1827                 /*
1828                  * Fire off all the required work handlers
1829                  * We can't schedule on offline CPUs, but it's not necessary
1830                  * since we can change their buffer sizes without any race.
1831                  */
1832                 for_each_buffer_cpu(buffer, cpu) {
1833                         cpu_buffer = buffer->buffers[cpu];
1834                         if (!cpu_buffer->nr_pages_to_update)
1835                                 continue;
1836
1837                         /* Can't run something on an offline CPU. */
1838                         if (!cpu_online(cpu)) {
1839                                 rb_update_pages(cpu_buffer);
1840                                 cpu_buffer->nr_pages_to_update = 0;
1841                         } else {
1842                                 schedule_work_on(cpu,
1843                                                 &cpu_buffer->update_pages_work);
1844                         }
1845                 }
1846
1847                 /* wait for all the updates to complete */
1848                 for_each_buffer_cpu(buffer, cpu) {
1849                         cpu_buffer = buffer->buffers[cpu];
1850                         if (!cpu_buffer->nr_pages_to_update)
1851                                 continue;
1852
1853                         if (cpu_online(cpu))
1854                                 wait_for_completion(&cpu_buffer->update_done);
1855                         cpu_buffer->nr_pages_to_update = 0;
1856                 }
1857
1858                 put_online_cpus();
1859         } else {
1860                 /* Make sure this CPU has been initialized */
1861                 if (!cpumask_test_cpu(cpu_id, buffer->cpumask))
1862                         goto out;
1863
1864                 cpu_buffer = buffer->buffers[cpu_id];
1865
1866                 if (nr_pages == cpu_buffer->nr_pages)
1867                         goto out;
1868
1869                 cpu_buffer->nr_pages_to_update = nr_pages -
1870                                                 cpu_buffer->nr_pages;
1871
1872                 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1873                 if (cpu_buffer->nr_pages_to_update > 0 &&
1874                         __rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1875                                             &cpu_buffer->new_pages, cpu_id)) {
1876                         err = -ENOMEM;
1877                         goto out_err;
1878                 }
1879
1880                 get_online_cpus();
1881
1882                 /* Can't run something on an offline CPU. */
1883                 if (!cpu_online(cpu_id))
1884                         rb_update_pages(cpu_buffer);
1885                 else {
1886                         schedule_work_on(cpu_id,
1887                                          &cpu_buffer->update_pages_work);
1888                         wait_for_completion(&cpu_buffer->update_done);
1889                 }
1890
1891                 cpu_buffer->nr_pages_to_update = 0;
1892                 put_online_cpus();
1893         }
1894
1895  out:
1896         /*
1897          * The ring buffer resize can happen with the ring buffer
1898          * enabled, so that the update disturbs the tracing as little
1899          * as possible. But if the buffer is disabled, we do not need
1900          * to worry about that, and we can take the time to verify
1901          * that the buffer is not corrupt.
1902          */
1903         if (atomic_read(&buffer->record_disabled)) {
1904                 atomic_inc(&buffer->record_disabled);
1905                 /*
1906                  * Even though the buffer was disabled, we must make sure
1907                  * that it is truly disabled before calling rb_check_pages.
1908                  * There could have been a race between checking
1909                  * record_disable and incrementing it.
1910                  * record_disabled and incrementing it.
1911                 synchronize_rcu();
1912                 for_each_buffer_cpu(buffer, cpu) {
1913                         cpu_buffer = buffer->buffers[cpu];
1914                         rb_check_pages(cpu_buffer);
1915                 }
1916                 atomic_dec(&buffer->record_disabled);
1917         }
1918
1919         mutex_unlock(&buffer->mutex);
1920         return 0;
1921
1922  out_err:
1923         for_each_buffer_cpu(buffer, cpu) {
1924                 struct buffer_page *bpage, *tmp;
1925
1926                 cpu_buffer = buffer->buffers[cpu];
1927                 cpu_buffer->nr_pages_to_update = 0;
1928
1929                 if (list_empty(&cpu_buffer->new_pages))
1930                         continue;
1931
1932                 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1933                                         list) {
1934                         list_del_init(&bpage->list);
1935                         free_buffer_page(bpage);
1936                 }
1937         }
1938         mutex_unlock(&buffer->mutex);
1939         return err;
1940 }
1941 EXPORT_SYMBOL_GPL(ring_buffer_resize);
1942
1943 void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
1944 {
1945         mutex_lock(&buffer->mutex);
1946         if (val)
1947                 buffer->flags |= RB_FL_OVERWRITE;
1948         else
1949                 buffer->flags &= ~RB_FL_OVERWRITE;
1950         mutex_unlock(&buffer->mutex);
1951 }
1952 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
1953
1954 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
1955 {
1956         return bpage->page->data + index;
1957 }
1958
1959 static __always_inline struct ring_buffer_event *
1960 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
1961 {
1962         return __rb_page_index(cpu_buffer->reader_page,
1963                                cpu_buffer->reader_page->read);
1964 }
1965
1966 static __always_inline struct ring_buffer_event *
1967 rb_iter_head_event(struct ring_buffer_iter *iter)
1968 {
1969         return __rb_page_index(iter->head_page, iter->head);
1970 }
1971
1972 static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
1973 {
1974         return local_read(&bpage->page->commit);
1975 }
1976
1977 /* Size is determined by what has been committed */
1978 static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
1979 {
1980         return rb_page_commit(bpage);
1981 }
1982
1983 static __always_inline unsigned
1984 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
1985 {
1986         return rb_page_commit(cpu_buffer->commit_page);
1987 }
1988
1989 static __always_inline unsigned
1990 rb_event_index(struct ring_buffer_event *event)
1991 {
1992         unsigned long addr = (unsigned long)event;
1993
1994         return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
1995 }
1996
1997 static void rb_inc_iter(struct ring_buffer_iter *iter)
1998 {
1999         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2000
2001         /*
2002          * The iterator could be on the reader page (it starts there).
2003          * But the head could have moved, since the reader was
2004          * found. Check for this case and assign the iterator
2005          * to the head page instead of next.
2006          */
2007         if (iter->head_page == cpu_buffer->reader_page)
2008                 iter->head_page = rb_set_head_page(cpu_buffer);
2009         else
2010                 rb_inc_page(cpu_buffer, &iter->head_page);
2011
2012         iter->read_stamp = iter->head_page->page->time_stamp;
2013         iter->head = 0;
2014 }
2015
2016 /*
2017  * rb_handle_head_page - writer hit the head page
2018  *
2019  * Returns: +1 to retry page
2020  *           0 to continue
2021  *          -1 on error
2022  */
2023 static int
2024 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2025                     struct buffer_page *tail_page,
2026                     struct buffer_page *next_page)
2027 {
2028         struct buffer_page *new_head;
2029         int entries;
2030         int type;
2031         int ret;
2032
2033         entries = rb_page_entries(next_page);
2034
2035         /*
2036          * The hard part is here. We need to move the head
2037          * forward, and protect against both readers on
2038          * other CPUs and writers coming in via interrupts.
2039          */
2040         type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2041                                        RB_PAGE_HEAD);
2042
2043         /*
2044          * type can be one of four:
2045          *  NORMAL - an interrupt already moved it for us
2046          *  HEAD   - we are the first to get here.
2047          *  UPDATE - we are the interrupt interrupting
2048          *           a current move.
2049          *  MOVED  - a reader on another CPU moved the next
2050          *           pointer to its reader page. Give up
2051          *           and try again.
2052          */
2053
2054         switch (type) {
2055         case RB_PAGE_HEAD:
2056                 /*
2057                  * We changed the head to UPDATE, thus
2058                  * it is our responsibility to update
2059                  * the counters.
2060                  */
2061                 local_add(entries, &cpu_buffer->overrun);
2062                 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
2063                 local_inc(&cpu_buffer->pages_lost);
2064
2065                 /*
2066                  * The entries will be zeroed out when we move the
2067                  * tail page.
2068                  */
2069
2070                 /* still more to do */
2071                 break;
2072
2073         case RB_PAGE_UPDATE:
2074                 /*
2075                  * This is an interrupt that interrupted the
2076                  * previous update. Still more to do.
2077                  */
2078                 break;
2079         case RB_PAGE_NORMAL:
2080                 /*
2081                  * An interrupt came in before the update
2082                  * and processed this for us.
2083                  * Nothing left to do.
2084                  */
2085                 return 1;
2086         case RB_PAGE_MOVED:
2087                 /*
2088                  * The reader is on another CPU and just did
2089                  * a swap with our next_page.
2090                  * Try again.
2091                  */
2092                 return 1;
2093         default:
2094                 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2095                 return -1;
2096         }
2097
2098         /*
2099          * Now that we are here, the old head pointer is
2100          * set to UPDATE. This will keep the reader from
2101          * swapping the head page with the reader page.
2102          * The reader (on another CPU) will spin till
2103          * we are finished.
2104          *
2105          * We just need to protect against interrupts
2106          * doing the job. We will set the next pointer
2107          * to HEAD. After that, we set the old pointer
2108          * to NORMAL, but only if it was HEAD before;
2109          * otherwise we are an interrupt, and only
2110          * want the outermost commit to reset it.
2111          */
2112         new_head = next_page;
2113         rb_inc_page(cpu_buffer, &new_head);
2114
2115         ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2116                                     RB_PAGE_NORMAL);
2117
2118         /*
2119          * Valid returns are:
2120          *  HEAD   - an interrupt came in and already set it.
2121          *  NORMAL - One of two things:
2122          *            1) We really set it.
2123          *            2) A bunch of interrupts came in and moved
2124          *               the page forward again.
2125          */
2126         switch (ret) {
2127         case RB_PAGE_HEAD:
2128         case RB_PAGE_NORMAL:
2129                 /* OK */
2130                 break;
2131         default:
2132                 RB_WARN_ON(cpu_buffer, 1);
2133                 return -1;
2134         }
2135
2136         /*
2137          * It is possible that an interrupt came in,
2138          * set the head up, then more interrupts came in
2139          * and moved it again. When we get back here,
2140          * the page would have been set to NORMAL but we
2141          * just set it back to HEAD.
2142          *
2143          * How do you detect this? Well, if that happened
2144          * the tail page would have moved.
2145          */
2146         if (ret == RB_PAGE_NORMAL) {
2147                 struct buffer_page *buffer_tail_page;
2148
2149                 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
2150                 /*
2151                  * If the tail had moved past next, then we need
2152                  * to reset the pointer.
2153                  */
2154                 if (buffer_tail_page != tail_page &&
2155                     buffer_tail_page != next_page)
2156                         rb_head_page_set_normal(cpu_buffer, new_head,
2157                                                 next_page,
2158                                                 RB_PAGE_HEAD);
2159         }
2160
2161         /*
2162          * If this was the outermost commit (the one that
2163          * changed the original pointer from HEAD to UPDATE),
2164          * then it is up to us to reset it to NORMAL.
2165          */
2166         if (type == RB_PAGE_HEAD) {
2167                 ret = rb_head_page_set_normal(cpu_buffer, next_page,
2168                                               tail_page,
2169                                               RB_PAGE_UPDATE);
2170                 if (RB_WARN_ON(cpu_buffer,
2171                                ret != RB_PAGE_UPDATE))
2172                         return -1;
2173         }
2174
2175         return 0;
2176 }
2177
2178 static inline void
2179 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2180               unsigned long tail, struct rb_event_info *info)
2181 {
2182         struct buffer_page *tail_page = info->tail_page;
2183         struct ring_buffer_event *event;
2184         unsigned long length = info->length;
2185
2186         /*
2187          * Only the event that crossed the page boundary
2188          * must fill the old tail_page with padding.
2189          */
2190         if (tail >= BUF_PAGE_SIZE) {
2191                 /*
2192                  * If the page was filled, then we still need
2193                  * to update the real_end. Reset it to zero
2194                  * and the reader will ignore it.
2195                  */
2196                 if (tail == BUF_PAGE_SIZE)
2197                         tail_page->real_end = 0;
2198
2199                 local_sub(length, &tail_page->write);
2200                 return;
2201         }
2202
2203         event = __rb_page_index(tail_page, tail);
2204
2205         /* account for padding bytes */
2206         local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
2207
2208         /*
2209          * Save the original length to the meta data.
2210          * This will be used by the reader to add lost event
2211          * counter.
2212          */
2213         tail_page->real_end = tail;
2214
2215         /*
2216          * If this event is bigger than the minimum size, then
2217          * we need to be careful that we don't subtract the
2218          * write counter enough to allow another writer to slip
2219          * in on this page.
2220          * We put in a discarded commit instead, to make sure
2221          * that this space is not used again.
2222          *
2223          * If we are less than the minimum size, we don't need to
2224          * worry about it.
2225          */
2226         if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
2227                 /* No room for any events */
2228
2229                 /* Mark the rest of the page with padding */
2230                 rb_event_set_padding(event);
2231
2232                 /* Make sure the padding is visible before the write update */
2233                 smp_wmb();
2234
2235                 /* Set the write back to the previous setting */
2236                 local_sub(length, &tail_page->write);
2237                 return;
2238         }
2239
2240         /* Put in a discarded event */
2241         event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
2242         event->type_len = RINGBUF_TYPE_PADDING;
2243         /* time delta must be non zero */
2244         event->time_delta = 1;
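        /*
         * The page now looks roughly like this (illustration only):
         *
         *   | committed events ... | padding event: tail .. BUF_PAGE_SIZE |
         *
         * so a reader walking the page sees one discarded event covering
         * the unused space at the end.
         */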
2245
2246         /* Make sure the padding is visible before the tail_page->write update */
2247         smp_wmb();
2248
2249         /* Set write to end of buffer */
2250         length = (tail + length) - BUF_PAGE_SIZE;
2251         local_sub(length, &tail_page->write);
2252 }
2253
2254 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2255
2256 /*
2257  * This is the slow path, force gcc not to inline it.
2258  */
2259 static noinline struct ring_buffer_event *
2260 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2261              unsigned long tail, struct rb_event_info *info)
2262 {
2263         struct buffer_page *tail_page = info->tail_page;
2264         struct buffer_page *commit_page = cpu_buffer->commit_page;
2265         struct ring_buffer *buffer = cpu_buffer->buffer;
2266         struct buffer_page *next_page;
2267         int ret;
2268
2269         next_page = tail_page;
2270
2271         rb_inc_page(cpu_buffer, &next_page);
2272
2273         /*
2274          * If for some reason, we had an interrupt storm that made
2275          * it all the way around the buffer, bail, and warn
2276          * about it.
2277          */
2278         if (unlikely(next_page == commit_page)) {
2279                 local_inc(&cpu_buffer->commit_overrun);
2280                 goto out_reset;
2281         }
2282
2283         /*
2284          * This is where the fun begins!
2285          *
2286          * We are fighting against races between a reader that
2287          * could be on another CPU trying to swap its reader
2288          * page with the buffer head.
2289          *
2290          * We are also fighting against interrupts coming in and
2291          * moving the head or tail on us as well.
2292          *
2293          * If the next page is the head page then we have filled
2294          * the buffer, unless the commit page is still on the
2295          * reader page.
2296          */
2297         if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
2298
2299                 /*
2300                  * If the commit is not on the reader page, then
2301                  * move the head page.
2302                  */
2303                 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2304                         /*
2305                          * If we are not in overwrite mode,
2306                          * this is easy, just stop here.
2307                          */
2308                         if (!(buffer->flags & RB_FL_OVERWRITE)) {
2309                                 local_inc(&cpu_buffer->dropped_events);
2310                                 goto out_reset;
2311                         }
2312
2313                         ret = rb_handle_head_page(cpu_buffer,
2314                                                   tail_page,
2315                                                   next_page);
2316                         if (ret < 0)
2317                                 goto out_reset;
2318                         if (ret)
2319                                 goto out_again;
2320                 } else {
2321                         /*
2322                          * We need to be careful here too. The
2323                          * commit page could still be on the reader
2324                          * page. We could have a small buffer, and
2325                          * have filled up the buffer with events
2326                          * from interrupts and such, and wrapped.
2327                          *
2328                          * Note, if the tail page is also on the
2329                          * reader_page, we let it move out.
2330                          */
2331                         if (unlikely((cpu_buffer->commit_page !=
2332                                       cpu_buffer->tail_page) &&
2333                                      (cpu_buffer->commit_page ==
2334                                       cpu_buffer->reader_page))) {
2335                                 local_inc(&cpu_buffer->commit_overrun);
2336                                 goto out_reset;
2337                         }
2338                 }
2339         }
2340
2341         rb_tail_page_update(cpu_buffer, tail_page, next_page);
2342
2343  out_again:
2344
2345         rb_reset_tail(cpu_buffer, tail, info);
2346
2347         /* Commit what we have for now. */
2348         rb_end_commit(cpu_buffer);
2349         /* rb_end_commit() decs committing */
2350         local_inc(&cpu_buffer->committing);
2351
2352         /* fail and let the caller try again */
2353         return ERR_PTR(-EAGAIN);
2354
2355  out_reset:
2356         /* reset write */
2357         rb_reset_tail(cpu_buffer, tail, info);
2358
2359         return NULL;
2360 }
2361
2362 /* Slow path, do not inline */
2363 static noinline struct ring_buffer_event *
2364 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
2365 {
2366         if (abs)
2367                 event->type_len = RINGBUF_TYPE_TIME_STAMP;
2368         else
2369                 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2370
2371         /* Not the first event on the page, or not delta? */
2372         if (abs || rb_event_index(event)) {
2373                 event->time_delta = delta & TS_MASK;
2374                 event->array[0] = delta >> TS_SHIFT;
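                /*
                 * For illustration: the read side later reassembles this as
                 * roughly (array[0] << TS_SHIFT) + time_delta, i.e. the low
                 * TS_SHIFT bits travel in the 27 bit time_delta field and
                 * the upper bits in array[0].
                 */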
2375         } else {
2376                 /* nope, just zero it */
2377                 event->time_delta = 0;
2378                 event->array[0] = 0;
2379         }
2380
2381         return skip_time_extend(event);
2382 }
2383
2384 static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2385                                      struct ring_buffer_event *event);
2386
2387 /**
2388  * rb_update_event - update event type and data
2389  * @cpu_buffer: the per CPU buffer the event is written to
2390  * @event: the event to update
2391  * @info: the event info, holding the length and time delta
2392  *
2393  * Update the type and data fields of the event. The length
2394  * is the actual size that is written to the ring buffer,
2395  * and with this, we can determine what to place into the
2396  * data field.
2397  */
2398 static void
2399 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2400                 struct ring_buffer_event *event,
2401                 struct rb_event_info *info)
2402 {
2403         unsigned length = info->length;
2404         u64 delta = info->delta;
2405
2406         /* Only a commit updates the timestamp */
2407         if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
2408                 delta = 0;
2409
2410         /*
2411          * If we need to add a timestamp, then we
2412          * add it to the start of the reserved space.
2413          */
2414         if (unlikely(info->add_timestamp)) {
2415                 bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer);
2416
2417                 event = rb_add_time_stamp(event, abs ? info->delta : delta, abs);
2418                 length -= RB_LEN_TIME_EXTEND;
2419                 delta = 0;
2420         }
2421
2422         event->time_delta = delta;
2423         length -= RB_EVNT_HDR_SIZE;
2424         if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2425                 event->type_len = 0;
2426                 event->array[0] = length;
2427         } else
2428                 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2429 }
2430
2431 static unsigned rb_calculate_event_length(unsigned length)
2432 {
2433         struct ring_buffer_event event; /* Used only for sizeof array */
2434
2435         /* zero length can cause confusion */
2436         if (!length)
2437                 length++;
2438
2439         if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2440                 length += sizeof(event.array[0]);
2441
2442         length += RB_EVNT_HDR_SIZE;
2443         length = ALIGN(length, RB_ARCH_ALIGNMENT);
2444
2445         /*
2446          * In case the time delta is larger than the 27 bits for it
2447          * in the header, we need to add a timestamp. If another
2448          * event comes in when trying to discard this one to increase
2449          * the length, then the timestamp will be added in the allocated
2450          * space of this event. If length is bigger than the size needed
2451          * for the TIME_EXTEND, then padding has to be used. The events
2452          * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2453          * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2454          * As length is a multiple of 4, we only need to worry if it
2455          * is 12 (RB_LEN_TIME_EXTEND + 4).
2456          */
2457         if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2458                 length += RB_ALIGNMENT;
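        /*
         * A worked example (assuming RB_FORCE_8BYTE_ALIGNMENT is not set,
         * so RB_ARCH_ALIGNMENT == RB_ALIGNMENT == 4): an 8 byte payload
         * gives 8 + RB_EVNT_HDR_SIZE == 12 == RB_LEN_TIME_EXTEND + 4,
         * so the check above pads it out to 16 bytes instead.
         */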
2459
2460         return length;
2461 }
2462
2463 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2464 static inline bool sched_clock_stable(void)
2465 {
2466         return true;
2467 }
2468 #endif
2469
2470 static inline int
2471 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2472                   struct ring_buffer_event *event)
2473 {
2474         unsigned long new_index, old_index;
2475         struct buffer_page *bpage;
2476         unsigned long index;
2477         unsigned long addr;
2478
2479         new_index = rb_event_index(event);
2480         old_index = new_index + rb_event_ts_length(event);
2481         addr = (unsigned long)event;
2482         addr &= PAGE_MASK;
2483
2484         bpage = READ_ONCE(cpu_buffer->tail_page);
2485
2486         if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2487                 unsigned long write_mask =
2488                         local_read(&bpage->write) & ~RB_WRITE_MASK;
2489                 unsigned long event_length = rb_event_length(event);
2490                 /*
2491                  * This is on the tail page. It is possible that
2492                  * a write could come in and move the tail page
2493                  * and write to the next page. That is fine
2494                  * because we just shorten what is on this page.
2495                  */
2496                 old_index += write_mask;
2497                 new_index += write_mask;
2498                 index = local_cmpxchg(&bpage->write, old_index, new_index);
2499                 if (index == old_index) {
2500                         /* update counters */
2501                         local_sub(event_length, &cpu_buffer->entries_bytes);
2502                         return 1;
2503                 }
2504         }
2505
2506         /* could not discard */
2507         return 0;
2508 }
2509
2510 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2511 {
2512         local_inc(&cpu_buffer->committing);
2513         local_inc(&cpu_buffer->commits);
2514 }
2515
2516 static __always_inline void
2517 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2518 {
2519         unsigned long max_count;
2520
2521         /*
2522          * We only race with interrupts and NMIs on this CPU.
2523          * If we own the commit event, then we can commit
2524          * all others that interrupted us, since the interruptions
2525          * are in stack format (they finish before they come
2526          * back to us). This allows us to do a simple loop to
2527          * assign the commit to the tail.
2528          */
2529  again:
2530         max_count = cpu_buffer->nr_pages * 100;
2531
2532         while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2533                 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2534                         return;
2535                 if (RB_WARN_ON(cpu_buffer,
2536                                rb_is_reader_page(cpu_buffer->tail_page)))
2537                         return;
2538                 /*
2539                  * No need for a memory barrier here, as the update
2540                  * of the tail_page did it for this page.
2541                  */
2542                 local_set(&cpu_buffer->commit_page->page->commit,
2543                           rb_page_write(cpu_buffer->commit_page));
2544                 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
2545                 /* Only update the write stamp if the page has an event */
2546                 if (rb_page_write(cpu_buffer->commit_page))
2547                         cpu_buffer->write_stamp =
2548                                 cpu_buffer->commit_page->page->time_stamp;
2549                 /* add barrier to keep gcc from optimizing too much */
2550                 barrier();
2551         }
2552         while (rb_commit_index(cpu_buffer) !=
2553                rb_page_write(cpu_buffer->commit_page)) {
2554
2555                 /* Make sure the readers see the content of what is committed. */
2556                 smp_wmb();
2557                 local_set(&cpu_buffer->commit_page->page->commit,
2558                           rb_page_write(cpu_buffer->commit_page));
2559                 RB_WARN_ON(cpu_buffer,
2560                            local_read(&cpu_buffer->commit_page->page->commit) &
2561                            ~RB_WRITE_MASK);
2562                 barrier();
2563         }
2564
2565         /* again, keep gcc from optimizing */
2566         barrier();
2567
2568         /*
2569          * If an interrupt came in just after the first while loop
2570          * and pushed the tail page forward, we will be left with
2571          * a dangling commit that will never go forward.
2572          */
2573         if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
2574                 goto again;
2575 }
2576
2577 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
2578 {
2579         unsigned long commits;
2580
2581         if (RB_WARN_ON(cpu_buffer,
2582                        !local_read(&cpu_buffer->committing)))
2583                 return;
2584
2585  again:
2586         commits = local_read(&cpu_buffer->commits);
2587         /* synchronize with interrupts */
2588         barrier();
2589         if (local_read(&cpu_buffer->committing) == 1)
2590                 rb_set_commit_to_write(cpu_buffer);
2591
2592         local_dec(&cpu_buffer->committing);
2593
2594         /* synchronize with interrupts */
2595         barrier();
2596
2597         /*
2598          * Need to account for interrupts coming in between the
2599          * updating of the commit page and the clearing of the
2600          * committing counter.
2601          */
2602         if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
2603             !local_read(&cpu_buffer->committing)) {
2604                 local_inc(&cpu_buffer->committing);
2605                 goto again;
2606         }
2607 }
2608
2609 static inline void rb_event_discard(struct ring_buffer_event *event)
2610 {
2611         if (extended_time(event))
2612                 event = skip_time_extend(event);
2613
2614         /* array[0] holds the actual length for the discarded event */
2615         event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
2616         event->type_len = RINGBUF_TYPE_PADDING;
2617         /* time delta must be non zero */
2618         if (!event->time_delta)
2619                 event->time_delta = 1;
2620 }
2621
2622 static __always_inline bool
2623 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2624                    struct ring_buffer_event *event)
2625 {
2626         unsigned long addr = (unsigned long)event;
2627         unsigned long index;
2628
2629         index = rb_event_index(event);
2630         addr &= PAGE_MASK;
2631
2632         return cpu_buffer->commit_page->page == (void *)addr &&
2633                 rb_commit_index(cpu_buffer) == index;
2634 }
2635
2636 static __always_inline void
2637 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2638                       struct ring_buffer_event *event)
2639 {
2640         u64 delta;
2641
2642         /*
2643          * The first event in the commit queue updates the
2644          * time stamp.
2645          */
2646         if (rb_event_is_commit(cpu_buffer, event)) {
2647                 /*
2648                  * A commit event that is first on a page
2649                  * updates the write timestamp with the page stamp
2650                  */
2651                 if (!rb_event_index(event))
2652                         cpu_buffer->write_stamp =
2653                                 cpu_buffer->commit_page->page->time_stamp;
2654                 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
2655                         delta = ring_buffer_event_time_stamp(event);
2656                         cpu_buffer->write_stamp += delta;
2657                 } else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
2658                         delta = ring_buffer_event_time_stamp(event);
2659                         cpu_buffer->write_stamp = delta;
2660                 } else
2661                         cpu_buffer->write_stamp += event->time_delta;
2662         }
2663 }
2664
2665 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
2666                       struct ring_buffer_event *event)
2667 {
2668         local_inc(&cpu_buffer->entries);
2669         rb_update_write_stamp(cpu_buffer, event);
2670         rb_end_commit(cpu_buffer);
2671 }
2672
2673 static __always_inline void
2674 rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
2675 {
2676         if (buffer->irq_work.waiters_pending) {
2677                 buffer->irq_work.waiters_pending = false;
2678                 /* irq_work_queue() supplies its own memory barriers */
2679                 irq_work_queue(&buffer->irq_work.work);
2680         }
2681
2682         if (cpu_buffer->irq_work.waiters_pending) {
2683                 cpu_buffer->irq_work.waiters_pending = false;
2684                 /* irq_work_queue() supplies its own memory barriers */
2685                 irq_work_queue(&cpu_buffer->irq_work.work);
2686         }
2687
2688         if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
2689                 return;
2690
2691         if (cpu_buffer->reader_page == cpu_buffer->commit_page)
2692                 return;
2693
2694         if (!cpu_buffer->irq_work.full_waiters_pending)
2695                 return;
2696
2697         cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
2698
2699         if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
2700                 return;
2701
2702         cpu_buffer->irq_work.wakeup_full = true;
2703         cpu_buffer->irq_work.full_waiters_pending = false;
2704         /* irq_work_queue() supplies its own memory barriers */
2705         irq_work_queue(&cpu_buffer->irq_work.work);
2706 }
2707
2708 /*
2709  * The lock and unlock are done within a preempt disable section.
2710  * The current_context per_cpu variable can only be modified
2711  * by the current task between lock and unlock. But it can
2712  * be modified more than once via an interrupt. To pass this
2713  * information from the lock to the unlock without having to
2714  * access the 'in_interrupt()' functions again (which do show
2715  * a bit of overhead in something as critical as function tracing),
2716  * we use a bitmask trick.
2717  *
2718  *  bit 1 =  NMI context
2719  *  bit 2 =  IRQ context
2720  *  bit 3 =  SoftIRQ context
2721  *  bit 4 =  normal context.
2722  *
2723  * This works because this is the order of contexts that can
2724  * preempt other contexts. A SoftIRQ never preempts an IRQ
2725  * context.
2726  *
2727  * When the context is determined, the corresponding bit is
2728  * checked and set (if it was set, then a recursion of that context
2729  * happened).
2730  *
2731  * On unlock, we need to clear this bit. To do so, just subtract
2732  * 1 from the current_context and AND the result with the original value.
2733  *
2734  * (binary)
2735  *  101 - 1 = 100
2736  *  101 & 100 = 100 (clearing bit zero)
2737  *
2738  *  1010 - 1 = 1001
2739  *  1010 & 1001 = 1000 (clearing bit 1)
2740  *
2741  * The least significant set bit can be cleared this way, and it
2742  * just so happens that it is the same bit corresponding to
2743  * the current context.
2744  *
2745  * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
2746  * is set when a recursion is detected at the current context, and if
2747  * the TRANSITION bit is already set, it will fail the recursion.
2748  * This is needed because there's a lag between the changing of
2749  * interrupt context and updating the preempt count. In this case,
2750  * a false positive will be found. To handle this, one extra recursion
2751  * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
2752  * bit is already set, then it is considered a recursion and the function
2753  * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
2754  *
2755  * On the trace_recursive_unlock(), the TRANSITION bit will be the first
2756  * to be cleared, even if it wasn't the context that set it. That is,
2757  * if an interrupt comes in while the NORMAL bit is set and the ring buffer
2758  * is called before preempt_count() is updated, since the check will
2759  * be on the NORMAL bit, the TRANSITION bit will then be set. If an
2760  * NMI then comes in, it will set the NMI bit, but when the NMI code
2761  * does the trace_recursive_unlock(), it will clear the TRANSITION bit
2762  * and leave the NMI bit set. But this is fine, because the interrupt
2763  * code that set the TRANSITION bit will then clear the NMI bit when it
2764  * calls trace_recursive_unlock(). If another NMI comes in, it will
2765  * set the TRANSITION bit and continue.
2766  *
2767  * Note: The TRANSITION bit only handles a single transition between contexts.
2768  */
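/*
 * A minimal sketch of the unlock arithmetic above (illustration only,
 * ignoring the nest shift): with the NORMAL and IRQ bits both set,
 *
 *	val = (1 << RB_CTX_NORMAL) | (1 << RB_CTX_IRQ);
 *	val &= val - 1;
 *
 * only the IRQ bit is cleared, because the innermost (most recently
 * entered) context always owns the lowest set bit.
 */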
2769
2770 static __always_inline int
2771 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
2772 {
2773         unsigned int val = cpu_buffer->current_context;
2774         unsigned long pc = preempt_count();
2775         int bit;
2776
2777         if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET)))
2778                 bit = RB_CTX_NORMAL;
2779         else
2780                 bit = pc & NMI_MASK ? RB_CTX_NMI :
2781                         pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ;
2782
2783         if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
2784                 /*
2785                  * It is possible that this was called by transitioning
2786                  * between interrupt context, and preempt_count() has not
2787                  * been updated yet. In this case, use the TRANSITION bit.
2788                  */
2789                 bit = RB_CTX_TRANSITION;
2790                 if (val & (1 << (bit + cpu_buffer->nest)))
2791                         return 1;
2792         }
2793
2794         val |= (1 << (bit + cpu_buffer->nest));
2795         cpu_buffer->current_context = val;
2796
2797         return 0;
2798 }
2799
2800 static __always_inline void
2801 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
2802 {
2803         cpu_buffer->current_context &=
2804                 cpu_buffer->current_context - (1 << cpu_buffer->nest);
2805 }
2806
2807 /* The recursive locking above uses 5 bits */
2808 #define NESTED_BITS 5
2809
2810 /**
2811  * ring_buffer_nest_start - Allow tracing while nested
2812  * @buffer: The ring buffer to modify
2813  *
2814  * The ring buffer has a safety mechanism to prevent recursion.
2815  * But there may be a case where a trace needs to be done while
2816  * tracing something else. In this case, calling this function
2817  * will allow the nested call to be made within a currently active
2818  * ring_buffer_lock_reserve().
2819  *
2820  * Call this function before calling another ring_buffer_lock_reserve() and
2821  * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
2822  */
2823 void ring_buffer_nest_start(struct ring_buffer *buffer)
2824 {
2825         struct ring_buffer_per_cpu *cpu_buffer;
2826         int cpu;
2827
2828         /* Enabled by ring_buffer_nest_end() */
2829         preempt_disable_notrace();
2830         cpu = raw_smp_processor_id();
2831         cpu_buffer = buffer->buffers[cpu];
2832         /* This is the shift value for the above recursive locking */
2833         cpu_buffer->nest += NESTED_BITS;
2834 }
2835
2836 /**
2837  * ring_buffer_nest_end - Allow tracing while nested
2838  * @buffer: The ring buffer to modify
2839  *
2840  * Must be called after ring_buffer_nest_start() and after the
2841  * ring_buffer_unlock_commit().
2842  */
2843 void ring_buffer_nest_end(struct ring_buffer *buffer)
2844 {
2845         struct ring_buffer_per_cpu *cpu_buffer;
2846         int cpu;
2847
2848         /* disabled by ring_buffer_nest_start() */
2849         cpu = raw_smp_processor_id();
2850         cpu_buffer = buffer->buffers[cpu];
2851         /* This is the shift value for the above recursive locking */
2852         cpu_buffer->nest -= NESTED_BITS;
2853         preempt_enable_notrace();
2854 }
2855
2856 /**
2857  * ring_buffer_unlock_commit - commit a reserved event
2858  * @buffer: The buffer to commit to
2859  * @event: The event pointer to commit.
2860  *
2861  * This commits the data to the ring buffer, and releases any locks held.
2862  *
2863  * Must be paired with ring_buffer_lock_reserve.
2864  */
2865 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
2866                               struct ring_buffer_event *event)
2867 {
2868         struct ring_buffer_per_cpu *cpu_buffer;
2869         int cpu = raw_smp_processor_id();
2870
2871         cpu_buffer = buffer->buffers[cpu];
2872
2873         rb_commit(cpu_buffer, event);
2874
2875         rb_wakeups(buffer, cpu_buffer);
2876
2877         trace_recursive_unlock(cpu_buffer);
2878
2879         preempt_enable_notrace();
2880
2881         return 0;
2882 }
2883 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
2884
2885 static noinline void
2886 rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2887                     struct rb_event_info *info)
2888 {
2889         WARN_ONCE(info->delta > (1ULL << 59),
2890                   KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
2891                   (unsigned long long)info->delta,
2892                   (unsigned long long)info->ts,
2893                   (unsigned long long)cpu_buffer->write_stamp,
2894                   sched_clock_stable() ? "" :
2895                   "If you just came from a suspend/resume,\n"
2896                   "please switch to the trace global clock:\n"
2897                   "  echo global > /sys/kernel/debug/tracing/trace_clock\n"
2898                   "or add trace_clock=global to the kernel command line\n");
2899         info->add_timestamp = 1;
2900 }
2901
2902 static struct ring_buffer_event *
2903 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
2904                   struct rb_event_info *info)
2905 {
2906         struct ring_buffer_event *event;
2907         struct buffer_page *tail_page;
2908         unsigned long tail, write;
2909
2910         /*
2911          * If the time delta since the last event is too big to
2912          * hold in the time field of the event, then we append a
2913          * TIME EXTEND event ahead of the data event.
2914          */
2915         if (unlikely(info->add_timestamp))
2916                 info->length += RB_LEN_TIME_EXTEND;
2917
2918         /* Don't let the compiler play games with cpu_buffer->tail_page */
2919         tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
2920         write = local_add_return(info->length, &tail_page->write);
2921
2922         /* set write to only the index of the write */
2923         write &= RB_WRITE_MASK;
2924         tail = write - info->length;
2925
2926         /*
2927          * If this is the first commit on the page, then it has the same
2928          * timestamp as the page itself.
2929          */
2930         if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer))
2931                 info->delta = 0;
2932
2933         /* See if we shot past the end of this buffer page */
2934         if (unlikely(write > BUF_PAGE_SIZE))
2935                 return rb_move_tail(cpu_buffer, tail, info);
2936
2937         /* We reserved something on the buffer */
2938
2939         event = __rb_page_index(tail_page, tail);
2940         rb_update_event(cpu_buffer, event, info);
2941
2942         local_inc(&tail_page->entries);
2943
2944         /*
2945          * If this is the first commit on the page, then update
2946          * its timestamp.
2947          */
2948         if (!tail)
2949                 tail_page->page->time_stamp = info->ts;
2950
2951         /* account for these added bytes */
2952         local_add(info->length, &cpu_buffer->entries_bytes);
2953
2954         return event;
2955 }
2956
2957 static __always_inline struct ring_buffer_event *
2958 rb_reserve_next_event(struct ring_buffer *buffer,
2959                       struct ring_buffer_per_cpu *cpu_buffer,
2960                       unsigned long length)
2961 {
2962         struct ring_buffer_event *event;
2963         struct rb_event_info info;
2964         int nr_loops = 0;
2965         u64 diff;
2966
2967         rb_start_commit(cpu_buffer);
2968
2969 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
2970         /*
2971          * Due to the ability to swap a cpu buffer from a buffer
2972          * it is possible it was swapped before we committed.
2973          * (committing stops a swap). We check for it here and
2974          * if it happened, we have to fail the write.
2975          */
2976         barrier();
2977         if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
2978                 local_dec(&cpu_buffer->committing);
2979                 local_dec(&cpu_buffer->commits);
2980                 return NULL;
2981         }
2982 #endif
2983
2984         info.length = rb_calculate_event_length(length);
2985  again:
2986         info.add_timestamp = 0;
2987         info.delta = 0;
2988
2989         /*
2990          * We allow for interrupts to reenter here and do a trace.
2991          * If one does, it will cause this original code to loop
2992          * back here. Even with heavy interrupts happening, this
2993          * should only happen a few times in a row. If this happens
2994          * 1000 times in a row, there must be either an interrupt
2995          * storm or we have something buggy.
2996          * Bail!
2997          */
2998         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
2999                 goto out_fail;
3000
3001         info.ts = rb_time_stamp(cpu_buffer->buffer);
3002         diff = info.ts - cpu_buffer->write_stamp;
3003
3004         /* make sure this diff is calculated here */
3005         barrier();
3006
3007         if (ring_buffer_time_stamp_abs(buffer)) {
3008                 info.delta = info.ts;
3009                 rb_handle_timestamp(cpu_buffer, &info);
3010         } else if (likely(info.ts >= cpu_buffer->write_stamp)) {
3011                 /* Did the write stamp get updated already? */
3012                 info.delta = diff;
3013                 if (unlikely(test_time_stamp(info.delta)))
3014                         rb_handle_timestamp(cpu_buffer, &info);
3015         }
3016
3017         event = __rb_reserve_next(cpu_buffer, &info);
3018
3019         if (unlikely(PTR_ERR(event) == -EAGAIN)) {
3020                 if (info.add_timestamp)
3021                         info.length -= RB_LEN_TIME_EXTEND;
3022                 goto again;
3023         }
3024
3025         if (!event)
3026                 goto out_fail;
3027
3028         return event;
3029
3030  out_fail:
3031         rb_end_commit(cpu_buffer);
3032         return NULL;
3033 }
3034
3035 /**
3036  * ring_buffer_lock_reserve - reserve a part of the buffer
3037  * @buffer: the ring buffer to reserve from
3038  * @length: the length of the data to reserve (excluding event header)
3039  *
3040  * Returns a reserved event on the ring buffer to copy directly to.
3041  * The user of this interface will need to get the body to write into
3042  * and can use the ring_buffer_event_data() interface.
3043  *
3044  * The length is the length of the data needed, not the event length
3045  * which also includes the event header.
3046  *
3047  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3048  * If NULL is returned, then nothing has been allocated or locked.
3049  */
3050 struct ring_buffer_event *
3051 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
3052 {
3053         struct ring_buffer_per_cpu *cpu_buffer;
3054         struct ring_buffer_event *event;
3055         int cpu;
3056
3057         /* If we are tracing schedule, we don't want to recurse */
3058         preempt_disable_notrace();
3059
3060         if (unlikely(atomic_read(&buffer->record_disabled)))
3061                 goto out;
3062
3063         cpu = raw_smp_processor_id();
3064
3065         if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
3066                 goto out;
3067
3068         cpu_buffer = buffer->buffers[cpu];
3069
3070         if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
3071                 goto out;
3072
3073         if (unlikely(length > BUF_MAX_DATA_SIZE))
3074                 goto out;
3075
3076         if (unlikely(trace_recursive_lock(cpu_buffer)))
3077                 goto out;
3078
3079         event = rb_reserve_next_event(buffer, cpu_buffer, length);
3080         if (!event)
3081                 goto out_unlock;
3082
3083         return event;
3084
3085  out_unlock:
3086         trace_recursive_unlock(cpu_buffer);
3087  out:
3088         preempt_enable_notrace();
3089         return NULL;
3090 }
3091 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
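
/*
 * A usage sketch for the reserve/commit pair (illustrative only, not
 * part of the original source; "buffer" and "struct my_entry" are
 * hypothetical):
 *
 *        struct ring_buffer_event *event;
 *        struct my_entry *entry;
 *
 *        event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *        if (!event)
 *                return;
 *        entry = ring_buffer_event_data(event);
 *        entry->value = 42;
 *        ring_buffer_unlock_commit(buffer, event);
 *
 * If the reserve returns NULL, nothing was allocated or locked, so the
 * commit must be skipped.
 */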
3092
3093 /*
3094  * Decrement the entries to the page that an event is on.
3095  * The event does not even need to exist, only the pointer
3096  * to the page it is on. This may only be called before the commit
3097  * takes place.
3098  */
3099 static inline void
3100 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3101                    struct ring_buffer_event *event)
3102 {
3103         unsigned long addr = (unsigned long)event;
3104         struct buffer_page *bpage = cpu_buffer->commit_page;
3105         struct buffer_page *start;
3106
3107         addr &= PAGE_MASK;
3108
3109         /* Do the likely case first */
3110         if (likely(bpage->page == (void *)addr)) {
3111                 local_dec(&bpage->entries);
3112                 return;
3113         }
3114
3115         /*
3116          * Because the commit page may be on the reader page we
3117          * start with the next page and check the end loop there.
3118          */
3119         rb_inc_page(cpu_buffer, &bpage);
3120         start = bpage;
3121         do {
3122                 if (bpage->page == (void *)addr) {
3123                         local_dec(&bpage->entries);
3124                         return;
3125                 }
3126                 rb_inc_page(cpu_buffer, &bpage);
3127         } while (bpage != start);
3128
3129         /* commit not part of this buffer?? */
3130         RB_WARN_ON(cpu_buffer, 1);
3131 }
3132
3133 /**
3134  * ring_buffer_commit_discard - discard an event that has not been committed
3135  * @buffer: the ring buffer
3136  * @event: non committed event to discard
3137  *
3138  * Sometimes an event that is in the ring buffer needs to be ignored.
3139  * This function lets the user discard an event in the ring buffer
3140  * and then that event will not be read later.
3141  *
3142  * This function only works if it is called before the item has been
3143  * committed. It will try to free the event from the ring buffer
3144  * if another event has not been added behind it.
3145  *
3146  * If another event has been added behind it, it will set the event
3147  * up as discarded, and perform the commit.
3148  *
3149  * If this function is called, do not call ring_buffer_unlock_commit on
3150  * the event.
3151  */
3152 void ring_buffer_discard_commit(struct ring_buffer *buffer,
3153                                 struct ring_buffer_event *event)
3154 {
3155         struct ring_buffer_per_cpu *cpu_buffer;
3156         int cpu;
3157
3158         /* The event is discarded regardless */
3159         rb_event_discard(event);
3160
3161         cpu = smp_processor_id();
3162         cpu_buffer = buffer->buffers[cpu];
3163
3164         /*
3165          * This must only be called if the event has not been
3166          * committed yet. Thus we can assume that preemption
3167          * is still disabled.
3168          */
3169         RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3170
3171         rb_decrement_entry(cpu_buffer, event);
3172         if (rb_try_to_discard(cpu_buffer, event))
3173                 goto out;
3174
3175         /*
3176          * The commit is still visible by the reader, so we
3177          * must still update the timestamp.
3178          */
3179         rb_update_write_stamp(cpu_buffer, event);
3180  out:
3181         rb_end_commit(cpu_buffer);
3182
3183         trace_recursive_unlock(cpu_buffer);
3184
3185         preempt_enable_notrace();
3186
3187 }
3188 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
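
/*
 * A usage sketch for discarding a reserved event (illustrative only,
 * not from the original source; "buffer", "entry" and the fill_entry()
 * filter check are hypothetical):
 *
 *        event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *        if (!event)
 *                return;
 *        entry = ring_buffer_event_data(event);
 *        if (!fill_entry(entry)) {
 *                ring_buffer_discard_commit(buffer, event);
 *                return;
 *        }
 *        ring_buffer_unlock_commit(buffer, event);
 *
 * Note that either ring_buffer_discard_commit() or
 * ring_buffer_unlock_commit() is called on the event, never both.
 */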
3189
3190 /**
3191  * ring_buffer_write - write data to the buffer without reserving
3192  * @buffer: The ring buffer to write to.
3193  * @length: The length of the data being written (excluding the event header)
3194  * @data: The data to write to the buffer.
3195  *
3196  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3197  * one function. If you already have the data to write to the buffer, it
3198  * may be easier to simply call this function.
3199  *
3200  * Note, like ring_buffer_lock_reserve, the length is the length of the data
3201  * and not the length of the event which would hold the header.
3202  */
3203 int ring_buffer_write(struct ring_buffer *buffer,
3204                       unsigned long length,
3205                       void *data)
3206 {
3207         struct ring_buffer_per_cpu *cpu_buffer;
3208         struct ring_buffer_event *event;
3209         void *body;
3210         int ret = -EBUSY;
3211         int cpu;
3212
3213         preempt_disable_notrace();
3214
3215         if (atomic_read(&buffer->record_disabled))
3216                 goto out;
3217
3218         cpu = raw_smp_processor_id();
3219
3220         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3221                 goto out;
3222
3223         cpu_buffer = buffer->buffers[cpu];
3224
3225         if (atomic_read(&cpu_buffer->record_disabled))
3226                 goto out;
3227
3228         if (length > BUF_MAX_DATA_SIZE)
3229                 goto out;
3230
3231         if (unlikely(trace_recursive_lock(cpu_buffer)))
3232                 goto out;
3233
3234         event = rb_reserve_next_event(buffer, cpu_buffer, length);
3235         if (!event)
3236                 goto out_unlock;
3237
3238         body = rb_event_data(event);
3239
3240         memcpy(body, data, length);
3241
3242         rb_commit(cpu_buffer, event);
3243
3244         rb_wakeups(buffer, cpu_buffer);
3245
3246         ret = 0;
3247
3248  out_unlock:
3249         trace_recursive_unlock(cpu_buffer);
3250
3251  out:
3252         preempt_enable_notrace();
3253
3254         return ret;
3255 }
3256 EXPORT_SYMBOL_GPL(ring_buffer_write);
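
/*
 * A usage sketch for ring_buffer_write() (illustrative only, not from
 * the original source; "buffer" and "struct my_entry" are hypothetical):
 *
 *        struct my_entry entry = { .value = 42 };
 *
 *        if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *                return;
 *
 * A nonzero return means the event was not recorded. When the data
 * already exists in a local buffer, this replaces the separate
 * reserve/copy/commit sequence.
 */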
3257
3258 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3259 {
3260         struct buffer_page *reader = cpu_buffer->reader_page;
3261         struct buffer_page *head = rb_set_head_page(cpu_buffer);
3262         struct buffer_page *commit = cpu_buffer->commit_page;
3263
3264         /* In case of error, head will be NULL */
3265         if (unlikely(!head))
3266                 return true;
3267
3268         /* Reader should exhaust content in reader page */
3269         if (reader->read != rb_page_commit(reader))
3270                 return false;
3271
3272         /*
3273          * If writers are committing on the reader page, then since all
3274          * committed content has been read, the ring buffer is empty.
3275          */
3276         if (commit == reader)
3277                 return true;
3278
3279         /*
3280          * If writers are committing on a page other than the reader page
3281          * and the head page, there should always be content to read.
3282          */
3283         if (commit != head)
3284                 return false;
3285
3286         /*
3287          * Writers are committing on the head page; we just need to
3288          * check whether there is committed data, and the reader will
3289          * swap the reader page with the head page when it needs to read.
3290          */
3291         return rb_page_commit(commit) == 0;
3292 }
3293
3294 /**
3295  * ring_buffer_record_disable - stop all writes into the buffer
3296  * @buffer: The ring buffer to stop writes to.
3297  *
3298  * This prevents all writes to the buffer. Any attempt to write
3299  * to the buffer after this will fail and return NULL.
3300  *
3301  * The caller should call synchronize_rcu() after this.
3302  */
3303 void ring_buffer_record_disable(struct ring_buffer *buffer)
3304 {
3305         atomic_inc(&buffer->record_disabled);
3306 }
3307 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3308
3309 /**
3310  * ring_buffer_record_enable - enable writes to the buffer
3311  * @buffer: The ring buffer to enable writes
3312  *
3313  * Note, multiple disables will need the same number of enables
3314  * to truly enable the writing (much like preempt_disable).
3315  */
3316 void ring_buffer_record_enable(struct ring_buffer *buffer)
3317 {
3318         atomic_dec(&buffer->record_disabled);
3319 }
3320 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
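
/*
 * A usage sketch for the counting disable/enable pair (illustrative
 * only, not from the original source): the disables nest, and the
 * comment above asks for synchronize_rcu() so that writers already in
 * flight have finished before the buffer is inspected.
 *
 *        ring_buffer_record_disable(buffer);
 *        synchronize_rcu();
 *        ... read or inspect the buffer ...
 *        ring_buffer_record_enable(buffer);
 */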
3321
3322 /**
3323  * ring_buffer_record_off - stop all writes into the buffer
3324  * @buffer: The ring buffer to stop writes to.
3325  *
3326  * This prevents all writes to the buffer. Any attempt to write
3327  * to the buffer after this will fail and return NULL.
3328  *
3329  * This is different from ring_buffer_record_disable() as
3330  * it works like an on/off switch, whereas the disable() version
3331  * must be paired with an enable().
3332  */
3333 void ring_buffer_record_off(struct ring_buffer *buffer)
3334 {
3335         unsigned int rd;
3336         unsigned int new_rd;
3337
3338         do {
3339                 rd = atomic_read(&buffer->record_disabled);
3340                 new_rd = rd | RB_BUFFER_OFF;
3341         } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3342 }
3343 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
3344
3345 /**
3346  * ring_buffer_record_on - restart writes into the buffer
3347  * @buffer: The ring buffer to start writes to.
3348  *
3349  * This enables all writes to the buffer that was disabled by
3350  * ring_buffer_record_off().
3351  *
3352  * This is different from ring_buffer_record_enable() as
3353  * it works like an on/off switch, whereas the enable() version
3354  * must be paired with a disable().
3355  */
3356 void ring_buffer_record_on(struct ring_buffer *buffer)
3357 {
3358         unsigned int rd;
3359         unsigned int new_rd;
3360
3361         do {
3362                 rd = atomic_read(&buffer->record_disabled);
3363                 new_rd = rd & ~RB_BUFFER_OFF;
3364         } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3365 }
3366 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
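
/*
 * A usage sketch contrasting the on/off switch with the counting
 * disable/enable pair above (illustrative only, not from the original
 * source):
 *
 *        ring_buffer_record_off(buffer);
 *        ring_buffer_record_off(buffer);
 *        ring_buffer_record_on(buffer);
 *
 * Writes stay off no matter how many times record_off() is called, and
 * a single record_on() turns them back on (assuming no counting
 * disables are pending). ring_buffer_record_is_on() reports whether the
 * buffer currently accepts writes.
 */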
3367
3368 /**
3369  * ring_buffer_record_is_on - return true if the ring buffer can write
3370  * @buffer: The ring buffer to check if writes are enabled
3371  *
3372  * Returns true if the ring buffer is in a state that it accepts writes.
3373  */
3374 bool ring_buffer_record_is_on(struct ring_buffer *buffer)
3375 {
3376         return !atomic_read(&buffer->record_disabled);
3377 }
3378
3379 /**
3380  * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
3381  * @buffer: The ring buffer to check if writes are set enabled
3382  *
3383  * Returns true if the ring buffer is set writable by ring_buffer_record_on().
3384  * Note that this does NOT mean it is in a writable state.
3385  *
3386  * It may return true when the ring buffer has been disabled by
3387  * ring_buffer_record_disable(), as that is a temporary disabling of
3388  * the ring buffer.
3389  */
3390 bool ring_buffer_record_is_set_on(struct ring_buffer *buffer)
3391 {
3392         return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
3393 }
3394
3395 /**
3396  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
3397  * @buffer: The ring buffer to stop writes to.
3398  * @cpu: The CPU buffer to stop
3399  *
3400  * This prevents all writes to the buffer. Any attempt to write
3401  * to the buffer after this will fail and return NULL.
3402  *
3403  * The caller should call synchronize_rcu() after this.
3404  */
3405 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
3406 {
3407         struct ring_buffer_per_cpu *cpu_buffer;
3408
3409         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3410                 return;
3411
3412         cpu_buffer = buffer->buffers[cpu];
3413         atomic_inc(&cpu_buffer->record_disabled);
3414 }
3415 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
3416
3417 /**
3418  * ring_buffer_record_enable_cpu - enable writes to the buffer
3419  * @buffer: The ring buffer to enable writes
3420  * @cpu: The CPU to enable.
3421  *
3422  * Note, multiple disables will need the same number of enables
3423  * to truly enable the writing (much like preempt_disable).
3424  */
3425 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
3426 {
3427         struct ring_buffer_per_cpu *cpu_buffer;
3428
3429         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3430                 return;
3431
3432         cpu_buffer = buffer->buffers[cpu];
3433         atomic_dec(&cpu_buffer->record_disabled);
3434 }
3435 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
3436
3437 /*
3438  * The total entries in the ring buffer is the running counter
3439  * of entries entered into the ring buffer, minus the sum of
3440  * the entries read from the ring buffer and the number of
3441  * entries that were overwritten.
3442  */
3443 static inline unsigned long
3444 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
3445 {
3446         return local_read(&cpu_buffer->entries) -
3447                 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
3448 }
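
/*
 * Worked example of the accounting above (hypothetical numbers): if
 * 1000 events were written, 100 were overwritten by the writer wrapping
 * around, and 250 have already been read, then
 * 1000 - (100 + 250) = 650 entries remain in the ring buffer.
 */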
3449
3450 /**
3451  * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
3452  * @buffer: The ring buffer
3453  * @cpu: The per CPU buffer to read from.
3454  */
3455 u64 ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu)
3456 {
3457         unsigned long flags;
3458         struct ring_buffer_per_cpu *cpu_buffer;
3459         struct buffer_page *bpage;
3460         u64 ret = 0;
3461
3462         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3463                 return 0;
3464
3465         cpu_buffer = buffer->buffers[cpu];
3466         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3467         /*
3468          * if the tail is on reader_page, oldest time stamp is on the reader
3469          * page
3470          */
3471         if (cpu_buffer->tail_page == cpu_buffer->reader_page)
3472                 bpage = cpu_buffer->reader_page;
3473         else
3474                 bpage = rb_set_head_page(cpu_buffer);
3475         if (bpage)
3476                 ret = bpage->page->time_stamp;
3477         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3478
3479         return ret;
3480 }
3481 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
3482
3483 /**
3484  * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer
3485  * @buffer: The ring buffer
3486  * @cpu: The per CPU buffer to read from.
3487  */
3488 unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu)
3489 {
3490         struct ring_buffer_per_cpu *cpu_buffer;
3491         unsigned long ret;
3492
3493         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3494                 return 0;
3495
3496         cpu_buffer = buffer->buffers[cpu];
3497         ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
3498
3499         return ret;
3500 }
3501 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
3502
3503 /**
3504  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
3505  * @buffer: The ring buffer
3506  * @cpu: The per CPU buffer to get the entries from.
3507  */
3508 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
3509 {
3510         struct ring_buffer_per_cpu *cpu_buffer;
3511
3512         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3513                 return 0;
3514
3515         cpu_buffer = buffer->buffers[cpu];
3516
3517         return rb_num_of_entries(cpu_buffer);
3518 }
3519 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
3520
3521 /**
3522  * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
3523  * buffer wrapping around (only if RB_FL_OVERWRITE is on).
3524  * @buffer: The ring buffer
3525  * @cpu: The per CPU buffer to get the number of overruns from
3526  */
3527 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
3528 {
3529         struct ring_buffer_per_cpu *cpu_buffer;
3530         unsigned long ret;
3531
3532         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3533                 return 0;
3534
3535         cpu_buffer = buffer->buffers[cpu];
3536         ret = local_read(&cpu_buffer->overrun);
3537
3538         return ret;
3539 }
3540 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
3541
3542 /**
3543  * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
3544  * commits failing due to the buffer wrapping around while there are uncommitted
3545  * events, such as during an interrupt storm.
3546  * @buffer: The ring buffer
3547  * @cpu: The per CPU buffer to get the number of overruns from
3548  */
3549 unsigned long
3550 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
3551 {
3552         struct ring_buffer_per_cpu *cpu_buffer;
3553         unsigned long ret;
3554
3555         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3556                 return 0;
3557
3558         cpu_buffer = buffer->buffers[cpu];
3559         ret = local_read(&cpu_buffer->commit_overrun);
3560
3561         return ret;
3562 }
3563 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
3564
3565 /**
3566  * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
3567  * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
3568  * @buffer: The ring buffer
3569  * @cpu: The per CPU buffer to get the number of overruns from
3570  */
3571 unsigned long
3572 ring_buffer_dropped_events_cpu(struct ring_buffer *buffer, int cpu)
3573 {
3574         struct ring_buffer_per_cpu *cpu_buffer;
3575         unsigned long ret;
3576
3577         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3578                 return 0;
3579
3580         cpu_buffer = buffer->buffers[cpu];
3581         ret = local_read(&cpu_buffer->dropped_events);
3582
3583         return ret;
3584 }
3585 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
3586
3587 /**
3588  * ring_buffer_read_events_cpu - get the number of events successfully read
3589  * @buffer: The ring buffer
3590  * @cpu: The per CPU buffer to get the number of events read
3591  */
3592 unsigned long
3593 ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu)
3594 {
3595         struct ring_buffer_per_cpu *cpu_buffer;
3596
3597         if (!cpumask_test_cpu(cpu, buffer->cpumask))
3598                 return 0;
3599
3600         cpu_buffer = buffer->buffers[cpu];
3601         return cpu_buffer->read;
3602 }
3603 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
3604
3605 /**
3606  * ring_buffer_entries - get the number of entries in a buffer
3607  * @buffer: The ring buffer
3608  *
3609  * Returns the total number of entries in the ring buffer
3610  * (all CPU entries)
3611  */
3612 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
3613 {
3614         struct ring_buffer_per_cpu *cpu_buffer;
3615         unsigned long entries = 0;
3616         int cpu;
3617
3618         /* if you care about this being correct, lock the buffer */
3619         for_each_buffer_cpu(buffer, cpu) {
3620                 cpu_buffer = buffer->buffers[cpu];
3621                 entries += rb_num_of_entries(cpu_buffer);
3622         }
3623
3624         return entries;
3625 }
3626 EXPORT_SYMBOL_GPL(ring_buffer_entries);
3627
3628 /**
3629  * ring_buffer_overruns - get the number of overruns in buffer
3630  * @buffer: The ring buffer
3631  *
3632  * Returns the total number of overruns in the ring buffer
3633  * (all CPU entries)
3634  */
3635 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
3636 {
3637         struct ring_buffer_per_cpu *cpu_buffer;
3638         unsigned long overruns = 0;
3639         int cpu;
3640
3641         /* if you care about this being correct, lock the buffer */
3642         for_each_buffer_cpu(buffer, cpu) {
3643                 cpu_buffer = buffer->buffers[cpu];
3644                 overruns += local_read(&cpu_buffer->overrun);
3645         }
3646
3647         return overruns;
3648 }
3649 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
3650
3651 static void rb_iter_reset(struct ring_buffer_iter *iter)
3652 {
3653         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3654
3655         /* Iterator usage is expected to have record disabled */
3656         iter->head_page = cpu_buffer->reader_page;
3657         iter->head = cpu_buffer->reader_page->read;
3658
3659         iter->cache_reader_page = iter->head_page;
3660         iter->cache_read = cpu_buffer->read;
3661         iter->cache_pages_removed = cpu_buffer->pages_removed;
3662
3663         if (iter->head)
3664                 iter->read_stamp = cpu_buffer->read_stamp;
3665         else
3666                 iter->read_stamp = iter->head_page->page->time_stamp;
3667 }
3668
3669 /**
3670  * ring_buffer_iter_reset - reset an iterator
3671  * @iter: The iterator to reset
3672  *
3673  * Resets the iterator, so that it will start from the beginning
3674  * again.
3675  */
3676 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
3677 {
3678         struct ring_buffer_per_cpu *cpu_buffer;
3679         unsigned long flags;
3680
3681         if (!iter)
3682                 return;
3683
3684         cpu_buffer = iter->cpu_buffer;
3685
3686         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3687         rb_iter_reset(iter);
3688         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3689 }
3690 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
3691
3692 /**
3693  * ring_buffer_iter_empty - check if an iterator has no more to read
3694  * @iter: The iterator to check
3695  */
3696 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
3697 {
3698         struct ring_buffer_per_cpu *cpu_buffer;
3699         struct buffer_page *reader;
3700         struct buffer_page *head_page;
3701         struct buffer_page *commit_page;
3702         unsigned commit;
3703
3704         cpu_buffer = iter->cpu_buffer;
3705
3706         /* Remember, trace recording is off when iterator is in use */
3707         reader = cpu_buffer->reader_page;
3708         head_page = cpu_buffer->head_page;
3709         commit_page = cpu_buffer->commit_page;
3710         commit = rb_page_commit(commit_page);
3711
3712         return ((iter->head_page == commit_page && iter->head == commit) ||
3713                 (iter->head_page == reader && commit_page == head_page &&
3714                  head_page->read == commit &&
3715                  iter->head == rb_page_commit(cpu_buffer->reader_page)));
3716 }
3717 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
3718
3719 static void
3720 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
3721                      struct ring_buffer_event *event)
3722 {
3723         u64 delta;
3724
3725         switch (event->type_len) {
3726         case RINGBUF_TYPE_PADDING:
3727                 return;
3728
3729         case RINGBUF_TYPE_TIME_EXTEND:
3730                 delta = ring_buffer_event_time_stamp(event);
3731                 cpu_buffer->read_stamp += delta;
3732                 return;
3733
3734         case RINGBUF_TYPE_TIME_STAMP:
3735                 delta = ring_buffer_event_time_stamp(event);
3736                 cpu_buffer->read_stamp = delta;
3737                 return;
3738
3739         case RINGBUF_TYPE_DATA:
3740                 cpu_buffer->read_stamp += event->time_delta;
3741                 return;
3742
3743         default:
3744                 BUG();
3745         }
3746         return;
3747 }
3748
3749 static void
3750 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
3751                           struct ring_buffer_event *event)
3752 {
3753         u64 delta;
3754
3755         switch (event->type_len) {
3756         case RINGBUF_TYPE_PADDING:
3757                 return;
3758
3759         case RINGBUF_TYPE_TIME_EXTEND:
3760                 delta = ring_buffer_event_time_stamp(event);
3761                 iter->read_stamp += delta;
3762                 return;
3763
3764         case RINGBUF_TYPE_TIME_STAMP:
3765                 delta = ring_buffer_event_time_stamp(event);
3766                 iter->read_stamp = delta;
3767                 return;
3768
3769         case RINGBUF_TYPE_DATA:
3770                 iter->read_stamp += event->time_delta;
3771                 return;
3772
3773         default:
3774                 BUG();
3775         }
3776         return;
3777 }
3778
3779 static struct buffer_page *
3780 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
3781 {
3782         struct buffer_page *reader = NULL;
3783         unsigned long overwrite;
3784         unsigned long flags;
3785         int nr_loops = 0;
3786         int ret;
3787
3788         local_irq_save(flags);
3789         arch_spin_lock(&cpu_buffer->lock);
3790
3791  again:
3792         /*
3793          * This should normally only loop twice. But because the
3794          * start of the reader inserts an empty page, it causes
3795          * a case where we will loop three times. There should be no
3796          * reason to loop four times (that I know of).
3797          */
3798         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
3799                 reader = NULL;
3800                 goto out;
3801         }
3802
3803         reader = cpu_buffer->reader_page;
3804
3805         /* If there's more to read, return this page */
3806         if (cpu_buffer->reader_page->read < rb_page_size(reader))
3807                 goto out;
3808
3809         /* Never should we have an index greater than the size */
3810         if (RB_WARN_ON(cpu_buffer,
3811                        cpu_buffer->reader_page->read > rb_page_size(reader)))
3812                 goto out;
3813
3814         /* check if we caught up to the tail */
3815         reader = NULL;
3816         if (cpu_buffer->commit_page == cpu_buffer->reader_page)
3817                 goto out;
3818
3819         /* Don't bother swapping if the ring buffer is empty */
3820         if (rb_num_of_entries(cpu_buffer) == 0)
3821                 goto out;
3822
3823         /*
3824          * Reset the reader page to size zero.
3825          */
3826         local_set(&cpu_buffer->reader_page->write, 0);
3827         local_set(&cpu_buffer->reader_page->entries, 0);
3828         local_set(&cpu_buffer->reader_page->page->commit, 0);
3829         cpu_buffer->reader_page->real_end = 0;
3830
3831  spin:
3832         /*
3833          * Splice the empty reader page into the list around the head.
3834          */
3835         reader = rb_set_head_page(cpu_buffer);
3836         if (!reader)
3837                 goto out;
3838         cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
3839         cpu_buffer->reader_page->list.prev = reader->list.prev;
3840
3841         /*
3842          * cpu_buffer->pages just needs to point to the buffer, it
3843          *  has no specific buffer page to point to. Let's move it out
3844          *  of our way so we don't accidentally swap it.
3845          */
3846         cpu_buffer->pages = reader->list.prev;
3847
3848         /* The reader page will be pointing to the new head */
3849         rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
3850
3851         /*
3852          * We want to make sure we read the overruns after we set up our
3853          * pointers to the next object. The writer side does a
3854          * cmpxchg to cross pages which acts as the mb on the writer
3855          * side. Note, the reader will constantly fail the swap
3856          * while the writer is updating the pointers, so this
3857          * guarantees that the overwrite recorded here is the one we
3858          * want to compare with the last_overrun.
3859          */
3860         smp_mb();
3861         overwrite = local_read(&(cpu_buffer->overrun));
3862
3863         /*
3864          * Here's the tricky part.
3865          *
3866          * We need to move the pointer past the header page.
3867          * But we can only do that if a writer is not currently
3868          * moving it. The page before the header page has the
3869          * flag bit '1' set if it is pointing to the page we want,
3870          * but if the writer is in the process of moving it
3871          * then it will be '2', or '0' if it has already been moved.
3872          */
3873
3874         ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
3875
3876         /*
3877          * If we did not convert it, then we must try again.
3878          */
3879         if (!ret)
3880                 goto spin;
3881
3882         /*
3883          * Yay! We succeeded in replacing the page.
3884          *
3885          * Now make the new head point back to the reader page.
3886          */
3887         rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
3888         rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
3889
3890         local_inc(&cpu_buffer->pages_read);
3891
3892         /* Finally update the reader page to the new head */
3893         cpu_buffer->reader_page = reader;
3894         cpu_buffer->reader_page->read = 0;
3895
3896         if (overwrite != cpu_buffer->last_overrun) {
3897                 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
3898                 cpu_buffer->last_overrun = overwrite;
3899         }
3900
3901         goto again;
3902
3903  out:
3904         /* Update the read_stamp on the first event */
3905         if (reader && reader->read == 0)
3906                 cpu_buffer->read_stamp = reader->page->time_stamp;
3907
3908         arch_spin_unlock(&cpu_buffer->lock);
3909         local_irq_restore(flags);
3910
3911         /*
3912          * The writer has preemption disabled; wait for it. But not forever,
3913          * although 1 second is pretty much "forever".
3914          */
3915 #define USECS_WAIT      1000000
3916         for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
3917                 /* If the write is past the end of page, a writer is still updating it */
3918                 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE))
3919                         break;
3920
3921                 udelay(1);
3922
3923                 /* Get the latest version of the reader write value */
3924                 smp_rmb();
3925         }
3926
3927         /* The writer is not moving forward? Something is wrong */
3928         if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
3929                 reader = NULL;
3930
3931         /*
3932          * Make sure we see any padding after the write update
3933          * (see rb_reset_tail()).
3934          *
3935          * In addition, a writer may be writing on the reader page
3936          * if the page has not been fully filled, so the read barrier
3937          * is also needed to make sure we see the content of what is
3938          * committed by the writer (see rb_set_commit_to_write()).
3939          */
3940         smp_rmb();
3941
3942
3943         return reader;
3944 }
3945
3946 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
3947 {
3948         struct ring_buffer_event *event;
3949         struct buffer_page *reader;
3950         unsigned length;
3951
3952         reader = rb_get_reader_page(cpu_buffer);
3953
3954         /* This function should not be called when buffer is empty */
3955         if (RB_WARN_ON(cpu_buffer, !reader))
3956                 return;
3957
3958         event = rb_reader_event(cpu_buffer);
3959
3960         if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
3961                 cpu_buffer->read++;
3962
3963         rb_update_read_stamp(cpu_buffer, event);
3964
3965         length = rb_event_length(event);
3966         cpu_buffer->reader_page->read += length;
3967 }
3968
3969 static void rb_advance_iter(struct ring_buffer_iter *iter)
3970 {
3971         struct ring_buffer_per_cpu *cpu_buffer;
3972         struct ring_buffer_event *event;
3973         unsigned length;
3974
3975         cpu_buffer = iter->cpu_buffer;
3976
3977         /*
3978          * Check if we are at the end of the buffer.
3979          */
3980         if (iter->head >= rb_page_size(iter->head_page)) {
3981                 /* discarded commits can make the page empty */
3982                 if (iter->head_page == cpu_buffer->commit_page)
3983                         return;
3984                 rb_inc_iter(iter);
3985                 return;
3986         }
3987
3988         event = rb_iter_head_event(iter);
3989
3990         length = rb_event_length(event);
3991
3992         /*
3993          * This should not be called to advance the header if we are
3994          * at the tail of the buffer.
3995          */
3996         if (RB_WARN_ON(cpu_buffer,
3997                        (iter->head_page == cpu_buffer->commit_page) &&
3998                        (iter->head + length > rb_commit_index(cpu_buffer))))
3999                 return;
4000
4001         rb_update_iter_read_stamp(iter, event);
4002
4003         iter->head += length;
4004
4005         /* check for end of page padding */
4006         if ((iter->head >= rb_page_size(iter->head_page)) &&
4007             (iter->head_page != cpu_buffer->commit_page))
4008                 rb_inc_iter(iter);
4009 }
4010
4011 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4012 {
4013         return cpu_buffer->lost_events;
4014 }
4015
4016 static struct ring_buffer_event *
4017 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4018                unsigned long *lost_events)
4019 {
4020         struct ring_buffer_event *event;
4021         struct buffer_page *reader;
4022         int nr_loops = 0;
4023
4024         if (ts)
4025                 *ts = 0;
4026  again:
4027         /*
4028          * We repeat when a time extend is encountered.
4029          * Since the time extend is always attached to a data event,
4030          * we should never loop more than once.
4031          * (We never hit the following condition more than twice).
4032          */
4033         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
4034                 return NULL;
4035
4036         reader = rb_get_reader_page(cpu_buffer);
4037         if (!reader)
4038                 return NULL;
4039
4040         event = rb_reader_event(cpu_buffer);
4041
4042         switch (event->type_len) {
4043         case RINGBUF_TYPE_PADDING:
4044                 if (rb_null_event(event))
4045                         RB_WARN_ON(cpu_buffer, 1);
4046                 /*
4047                  * Because the writer could be discarding every
4048                  * event it creates (which would probably be bad)
4049                  * if we were to go back to "again" then we may never
4050                  * catch up, and will trigger the warn on, or lock
4051                  * the box. Return the padding, and we will release
4052                  * the current locks, and try again.
4053                  */
4054                 return event;
4055
4056         case RINGBUF_TYPE_TIME_EXTEND:
4057                 /* Internal data, OK to advance */
4058                 rb_advance_reader(cpu_buffer);
4059                 goto again;
4060
4061         case RINGBUF_TYPE_TIME_STAMP:
4062                 if (ts) {
4063                         *ts = ring_buffer_event_time_stamp(event);
4064                         ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4065                                                          cpu_buffer->cpu, ts);
4066                 }
4067                 /* Internal data, OK to advance */
4068                 rb_advance_reader(cpu_buffer);
4069                 goto again;
4070
4071         case RINGBUF_TYPE_DATA:
4072                 if (ts && !(*ts)) {
4073                         *ts = cpu_buffer->read_stamp + event->time_delta;
4074                         ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4075                                                          cpu_buffer->cpu, ts);
4076                 }
4077                 if (lost_events)
4078                         *lost_events = rb_lost_events(cpu_buffer);
4079                 return event;
4080
4081         default:
4082                 BUG();
4083         }
4084
4085         return NULL;
4086 }
4087 EXPORT_SYMBOL_GPL(ring_buffer_peek);
4088
4089 static struct ring_buffer_event *
4090 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4091 {
4092         struct ring_buffer *buffer;
4093         struct ring_buffer_per_cpu *cpu_buffer;
4094         struct ring_buffer_event *event;
4095         int nr_loops = 0;
4096
4097         if (ts)
4098                 *ts = 0;
4099
4100         cpu_buffer = iter->cpu_buffer;
4101         buffer = cpu_buffer->buffer;
4102
4103         /*
4104          * Check if someone performed a consuming read to the buffer
4105          * or removed some pages from the buffer. In these cases,
4106          * iterator was invalidated and we need to reset it.
4107          */
4108         if (unlikely(iter->cache_read != cpu_buffer->read ||
4109                      iter->cache_reader_page != cpu_buffer->reader_page ||
4110                      iter->cache_pages_removed != cpu_buffer->pages_removed))
4111                 rb_iter_reset(iter);
4112
4113  again:
4114         if (ring_buffer_iter_empty(iter))
4115                 return NULL;
4116
4117         /*
4118          * We repeat when a time extend is encountered or we hit
4119          * the end of the page. Since the time extend is always attached
4120          * to a data event, we should never loop more than three times.
4121          * Once for going to next page, once on time extend, and
4122          * finally once to get the event.
4123          * (We never hit the following condition more than thrice).
4124          */
4125         if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3))
4126                 return NULL;
4127
4128         if (rb_per_cpu_empty(cpu_buffer))
4129                 return NULL;
4130
4131         if (iter->head >= rb_page_size(iter->head_page)) {
4132                 rb_inc_iter(iter);
4133                 goto again;
4134         }
4135
4136         event = rb_iter_head_event(iter);
4137
4138         switch (event->type_len) {
4139         case RINGBUF_TYPE_PADDING:
4140                 if (rb_null_event(event)) {
4141                         rb_inc_iter(iter);
4142                         goto again;
4143                 }
4144                 rb_advance_iter(iter);
4145                 return event;
4146
4147         case RINGBUF_TYPE_TIME_EXTEND:
4148                 /* Internal data, OK to advance */
4149                 rb_advance_iter(iter);
4150                 goto again;
4151
4152         case RINGBUF_TYPE_TIME_STAMP:
4153                 if (ts) {
4154                         *ts = ring_buffer_event_time_stamp(event);
4155                         ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4156                                                          cpu_buffer->cpu, ts);
4157                 }
4158                 /* Internal data, OK to advance */
4159                 rb_advance_iter(iter);
4160                 goto again;
4161
4162         case RINGBUF_TYPE_DATA:
4163                 if (ts && !(*ts)) {
4164                         *ts = iter->read_stamp + event->time_delta;
4165                         ring_buffer_normalize_time_stamp(buffer,
4166                                                          cpu_buffer->cpu, ts);
4167                 }
4168                 return event;
4169
4170         default:
4171                 BUG();
4172         }
4173
4174         return NULL;
4175 }
4176 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4177
4178 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4179 {
4180         if (likely(!in_nmi())) {
4181                 raw_spin_lock(&cpu_buffer->reader_lock);
4182                 return true;
4183         }
4184
4185         /*
4186          * If an NMI die dumps out the content of the ring buffer, a
4187          * trylock must be used to prevent a deadlock if the NMI
4188          * preempted a task that holds the ring buffer locks. If
4189          * we get the lock then all is fine, if not, then continue
4190          * to do the read, but this can corrupt the ring buffer,
4191          * so it must be permanently disabled from future writes.
4192          * Reading from NMI is a one-shot deal.
4193          */
4194         if (raw_spin_trylock(&cpu_buffer->reader_lock))
4195                 return true;
4196
4197         /* Continue without locking, but disable the ring buffer */
4198         atomic_inc(&cpu_buffer->record_disabled);
4199         return false;
4200 }
4201
4202 static inline void
4203 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4204 {
4205         if (likely(locked))
4206                 raw_spin_unlock(&cpu_buffer->reader_lock);
4207         return;
4208 }
4209
4210 /**
4211  * ring_buffer_peek - peek at the next event to be read
4212  * @buffer: The ring buffer to read
4213  * @cpu: The cpu to peek at
4214  * @ts: The timestamp counter of this event.
4215  * @lost_events: a variable to store if events were lost (may be NULL)
4216  *
4217  * This will return the event that will be read next, but does
4218  * not consume the data.
4219  */
4220 struct ring_buffer_event *
4221 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
4222                  unsigned long *lost_events)
4223 {
4224         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4225         struct ring_buffer_event *event;
4226         unsigned long flags;
4227         bool dolock;
4228
4229         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4230                 return NULL;
4231
4232  again:
4233         local_irq_save(flags);
4234         dolock = rb_reader_lock(cpu_buffer);
4235         event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4236         if (event && event->type_len == RINGBUF_TYPE_PADDING)
4237                 rb_advance_reader(cpu_buffer);
4238         rb_reader_unlock(cpu_buffer, dolock);
4239         local_irq_restore(flags);
4240
4241         if (event && event->type_len == RINGBUF_TYPE_PADDING)
4242                 goto again;
4243
4244         return event;
4245 }
4246
4247 /**
4248  * ring_buffer_iter_peek - peek at the next event to be read
4249  * @iter: The ring buffer iterator
4250  * @ts: The timestamp counter of this event.
4251  *
4252  * This will return the event that will be read next, but does
4253  * not increment the iterator.
4254  */
4255 struct ring_buffer_event *
4256 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4257 {
4258         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4259         struct ring_buffer_event *event;
4260         unsigned long flags;
4261
4262  again:
4263         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4264         event = rb_iter_peek(iter, ts);
4265         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4266
4267         if (event && event->type_len == RINGBUF_TYPE_PADDING)
4268                 goto again;
4269
4270         return event;
4271 }
4272
4273 /**
4274  * ring_buffer_consume - return an event and consume it
4275  * @buffer: The ring buffer to get the next event from
4276  * @cpu: the cpu to read the buffer from
4277  * @ts: a variable to store the timestamp (may be NULL)
4278  * @lost_events: a variable to store if events were lost (may be NULL)
4279  *
4280  * Returns the next event in the ring buffer, and that event is consumed.
4281  * Meaning that sequential reads will keep returning a different event,
4282  * and eventually empty the ring buffer if the producer is slower.
4283  */
4284 struct ring_buffer_event *
4285 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
4286                     unsigned long *lost_events)
4287 {
4288         struct ring_buffer_per_cpu *cpu_buffer;
4289         struct ring_buffer_event *event = NULL;
4290         unsigned long flags;
4291         bool dolock;
4292
4293  again:
4294         /* might be called in atomic */
4295         preempt_disable();
4296
4297         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4298                 goto out;
4299
4300         cpu_buffer = buffer->buffers[cpu];
4301         local_irq_save(flags);
4302         dolock = rb_reader_lock(cpu_buffer);
4303
4304         event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4305         if (event) {
4306                 cpu_buffer->lost_events = 0;
4307                 rb_advance_reader(cpu_buffer);
4308         }
4309
4310         rb_reader_unlock(cpu_buffer, dolock);
4311         local_irq_restore(flags);
4312
4313  out:
4314         preempt_enable();
4315
4316         if (event && event->type_len == RINGBUF_TYPE_PADDING)
4317                 goto again;
4318
4319         return event;
4320 }
4321 EXPORT_SYMBOL_GPL(ring_buffer_consume);
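
/*
 * A usage sketch for a consuming read loop (illustrative only, not from
 * the original source; "buffer" and "cpu" are hypothetical caller-side
 * variables):
 *
 *        struct ring_buffer_event *event;
 *        unsigned long lost;
 *        u64 ts;
 *
 *        while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
 *                void *data = ring_buffer_event_data(event);
 *                ... process data; ts holds the normalized timestamp and
 *                    lost the number of events dropped before this one ...
 *        }
 */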
4322
4323 /**
4324  * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
4325  * @buffer: The ring buffer to read from
4326  * @cpu: The cpu buffer to iterate over
4327  * @flags: gfp flags to use for memory allocation
4328  *
4329  * This performs the initial preparations necessary to iterate
4330  * through the buffer.  Memory is allocated, buffer recording
4331  * is disabled, and the iterator pointer is returned to the caller.
4332  *
4333  * Disabling buffer recording prevents the reading from being
4334  * corrupted. This is not a consuming read, so a producer is not
4335  * expected.
4336  *
4337  * After a sequence of ring_buffer_read_prepare calls, the user is
4338  * expected to make at least one call to ring_buffer_read_prepare_sync.
4339  * Afterwards, ring_buffer_read_start is invoked to get things going
4340  * for real.
4341  *
4342  * This overall must be paired with ring_buffer_read_finish.
4343  */
4344 struct ring_buffer_iter *
4345 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu, gfp_t flags)
4346 {
4347         struct ring_buffer_per_cpu *cpu_buffer;
4348         struct ring_buffer_iter *iter;
4349
4350         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4351                 return NULL;
4352
4353         iter = kmalloc(sizeof(*iter), flags);
4354         if (!iter)
4355                 return NULL;
4356
4357         cpu_buffer = buffer->buffers[cpu];
4358
4359         iter->cpu_buffer = cpu_buffer;
4360
4361         atomic_inc(&buffer->resize_disabled);
4362         atomic_inc(&cpu_buffer->record_disabled);
4363
4364         return iter;
4365 }
4366 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
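
/*
 * A usage sketch of the full non-consuming (iterator) read sequence
 * described above (illustrative only, not from the original source;
 * error handling trimmed):
 *
 *        struct ring_buffer_iter *iter;
 *        struct ring_buffer_event *event;
 *        u64 ts;
 *
 *        iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *        ring_buffer_read_prepare_sync();
 *        ring_buffer_read_start(iter);
 *        while ((event = ring_buffer_read(iter, &ts)))
 *                ... inspect the event; the buffer is not consumed ...
 *        ring_buffer_read_finish(iter);
 */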
4367
4368 /**
4369  * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
4370  *
4371  * All previously invoked ring_buffer_read_prepare calls to prepare
4372  * iterators will be synchronized.  Afterwards, ring_buffer_read_start
4373  * calls on those iterators are allowed.
4374  */
4375 void
4376 ring_buffer_read_prepare_sync(void)
4377 {
4378         synchronize_rcu();
4379 }
4380 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
4381
4382 /**
4383  * ring_buffer_read_start - start a non consuming read of the buffer
4384  * @iter: The iterator returned by ring_buffer_read_prepare
4385  *
4386  * This finalizes the startup of an iteration through the buffer.
4387  * The iterator comes from a call to ring_buffer_read_prepare and
4388  * an intervening ring_buffer_read_prepare_sync must have been
4389  * performed.
4390  *
4391  * Must be paired with ring_buffer_read_finish.
4392  */
4393 void
4394 ring_buffer_read_start(struct ring_buffer_iter *iter)
4395 {
4396         struct ring_buffer_per_cpu *cpu_buffer;
4397         unsigned long flags;
4398
4399         if (!iter)
4400                 return;
4401
4402         cpu_buffer = iter->cpu_buffer;
4403
4404         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4405         arch_spin_lock(&cpu_buffer->lock);
4406         rb_iter_reset(iter);
4407         arch_spin_unlock(&cpu_buffer->lock);
4408         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4409 }
4410 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
4411
4412 /**
4413  * ring_buffer_read_finish - finish reading the iterator of the buffer
4414  * @iter: The iterator retrieved by ring_buffer_read_prepare
4415  *
4416  * This re-enables the recording to the buffer, and frees the
4417  * iterator.
4418  */
4419 void
4420 ring_buffer_read_finish(struct ring_buffer_iter *iter)
4421 {
4422         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4423         unsigned long flags;
4424
4425         /*
4426          * The ring buffer is disabled from recording, so here is a good
4427          * place to check the integrity of the ring buffer.
4428          * We must prevent readers from trying to read, as the check
4429          * clears the HEAD page and readers require it.
4430          */
4431         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4432         rb_check_pages(cpu_buffer);
4433         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4434
4435         atomic_dec(&cpu_buffer->record_disabled);
4436         atomic_dec(&cpu_buffer->buffer->resize_disabled);
4437         kfree(iter);
4438 }
4439 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
4440
4441 /**
4442  * ring_buffer_read - read the next item in the ring buffer by the iterator
4443  * @iter: The ring buffer iterator
4444  * @ts: The time stamp of the event read.
4445  *
4446  * This reads the next event in the ring buffer and increments the iterator.
4447  */
4448 struct ring_buffer_event *
4449 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
4450 {
4451         struct ring_buffer_event *event;
4452         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4453         unsigned long flags;
4454
4455         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4456  again:
4457         event = rb_iter_peek(iter, ts);
4458         if (!event)
4459                 goto out;
4460
4461         if (event->type_len == RINGBUF_TYPE_PADDING)
4462                 goto again;
4463
4464         rb_advance_iter(iter);
4465  out:
4466         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4467
4468         return event;
4469 }
4470 EXPORT_SYMBOL_GPL(ring_buffer_read);
4471
4472 /**
4473  * ring_buffer_size - return the size of the ring buffer (in bytes)
4474  * @buffer: The ring buffer.
 * @cpu: The CPU to get ring buffer size from.
4475  */
4476 unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu)
4477 {
4478         /*
4479          * Earlier, this method returned
4480          *      BUF_PAGE_SIZE * buffer->nr_pages
4481          * Since the nr_pages field is now removed, we have converted this to
4482          * return the per cpu buffer value.
4483          */
4484         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4485                 return 0;
4486
4487         return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
4488 }
4489 EXPORT_SYMBOL_GPL(ring_buffer_size);
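
/*
 * Since the size is now reported per CPU, a caller that wants the old
 * whole-buffer total has to sum the per-CPU values itself. A minimal
 * sketch (comment only, assumes @buffer is valid); CPUs without a
 * buffer simply contribute 0 thanks to the cpumask check above:
 *
 *      unsigned long total = 0;
 *      int cpu;
 *
 *      for_each_possible_cpu(cpu)
 *              total += ring_buffer_size(buffer, cpu);
 */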
4490
4491 static void rb_clear_buffer_page(struct buffer_page *page)
4492 {
4493         local_set(&page->write, 0);
4494         local_set(&page->entries, 0);
4495         rb_init_page(page->page);
4496         page->read = 0;
4497 }
4498
4499 static void
4500 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
4501 {
4502         struct buffer_page *page;
4503
4504         rb_head_page_deactivate(cpu_buffer);
4505
4506         cpu_buffer->head_page
4507                 = list_entry(cpu_buffer->pages, struct buffer_page, list);
4508         rb_clear_buffer_page(cpu_buffer->head_page);
4509         list_for_each_entry(page, cpu_buffer->pages, list) {
4510                 rb_clear_buffer_page(page);
4511         }
4512
4513         cpu_buffer->tail_page = cpu_buffer->head_page;
4514         cpu_buffer->commit_page = cpu_buffer->head_page;
4515
4516         INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
4517         INIT_LIST_HEAD(&cpu_buffer->new_pages);
4518         rb_clear_buffer_page(cpu_buffer->reader_page);
4519
4520         local_set(&cpu_buffer->entries_bytes, 0);
4521         local_set(&cpu_buffer->overrun, 0);
4522         local_set(&cpu_buffer->commit_overrun, 0);
4523         local_set(&cpu_buffer->dropped_events, 0);
4524         local_set(&cpu_buffer->entries, 0);
4525         local_set(&cpu_buffer->committing, 0);
4526         local_set(&cpu_buffer->commits, 0);
4527         local_set(&cpu_buffer->pages_touched, 0);
4528         local_set(&cpu_buffer->pages_lost, 0);
4529         local_set(&cpu_buffer->pages_read, 0);
4530         cpu_buffer->last_pages_touch = 0;
4531         cpu_buffer->shortest_full = 0;
4532         cpu_buffer->read = 0;
4533         cpu_buffer->read_bytes = 0;
4534
4535         cpu_buffer->write_stamp = 0;
4536         cpu_buffer->read_stamp = 0;
4537
4538         cpu_buffer->lost_events = 0;
4539         cpu_buffer->last_overrun = 0;
4540
4541         rb_head_page_activate(cpu_buffer);
4542         cpu_buffer->pages_removed = 0;
4543 }
4544
4545 /**
4546  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
4547  * @buffer: The ring buffer to reset a per cpu buffer of
4548  * @cpu: The CPU buffer to be reset
4549  */
4550 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
4551 {
4552         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4553         unsigned long flags;
4554
4555         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4556                 return;
4557         /* prevent another thread from changing buffer sizes */
4558         mutex_lock(&buffer->mutex);
4559
4560         atomic_inc(&buffer->resize_disabled);
4561         atomic_inc(&cpu_buffer->record_disabled);
4562
4563         /* Make sure all commits have finished */
4564         synchronize_rcu();
4565
4566         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4567
4568         if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
4569                 goto out;
4570
4571         arch_spin_lock(&cpu_buffer->lock);
4572
4573         rb_reset_cpu(cpu_buffer);
4574
4575         arch_spin_unlock(&cpu_buffer->lock);
4576
4577  out:
4578         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4579
4580         atomic_dec(&cpu_buffer->record_disabled);
4581         atomic_dec(&buffer->resize_disabled);
4582
4583         mutex_unlock(&buffer->mutex);
4584 }
4585 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
4586
4587 /**
4588  * ring_buffer_reset - reset a ring buffer
4589  * @buffer: The ring buffer to reset all cpu buffers
4590  */
4591 void ring_buffer_reset(struct ring_buffer *buffer)
4592 {
4593         int cpu;
4594
4595         for_each_buffer_cpu(buffer, cpu)
4596                 ring_buffer_reset_cpu(buffer, cpu);
4597 }
4598 EXPORT_SYMBOL_GPL(ring_buffer_reset);
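
/*
 * Illustrative note: ring_buffer_reset() is simply the per-CPU reset
 * applied to every CPU of the buffer. A caller that only cares about a
 * subset of CPUs (tracked in a hypothetical cpumask "my_mask") can do
 * the same thing selectively:
 *
 *      int cpu;
 *
 *      for_each_cpu(cpu, my_mask)
 *              ring_buffer_reset_cpu(buffer, cpu);
 */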
4599
4600 /**
4601  * ring_buffer_empty - is the ring buffer empty?
4602  * @buffer: The ring buffer to test
4603  */
4604 bool ring_buffer_empty(struct ring_buffer *buffer)
4605 {
4606         struct ring_buffer_per_cpu *cpu_buffer;
4607         unsigned long flags;
4608         bool dolock;
4609         int cpu;
4610         int ret;
4611
4612         /* yes this is racy, but if you don't like the race, lock the buffer */
4613         for_each_buffer_cpu(buffer, cpu) {
4614                 cpu_buffer = buffer->buffers[cpu];
4615                 local_irq_save(flags);
4616                 dolock = rb_reader_lock(cpu_buffer);
4617                 ret = rb_per_cpu_empty(cpu_buffer);
4618                 rb_reader_unlock(cpu_buffer, dolock);
4619                 local_irq_restore(flags);
4620
4621                 if (!ret)
4622                         return false;
4623         }
4624
4625         return true;
4626 }
4627 EXPORT_SYMBOL_GPL(ring_buffer_empty);
4628
4629 /**
4630  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
4631  * @buffer: The ring buffer
4632  * @cpu: The CPU buffer to test
4633  */
4634 bool ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
4635 {
4636         struct ring_buffer_per_cpu *cpu_buffer;
4637         unsigned long flags;
4638         bool dolock;
4639         int ret;
4640
4641         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4642                 return true;
4643
4644         cpu_buffer = buffer->buffers[cpu];
4645         local_irq_save(flags);
4646         dolock = rb_reader_lock(cpu_buffer);
4647         ret = rb_per_cpu_empty(cpu_buffer);
4648         rb_reader_unlock(cpu_buffer, dolock);
4649         local_irq_restore(flags);
4650
4651         return ret;
4652 }
4653 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
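
/*
 * Illustrative sketch (comment only) of draining one CPU buffer with the
 * consuming API, using the empty check above as the loop condition.
 * handle() is a placeholder; since the empty check is racy against
 * writers, the NULL check on ring_buffer_consume() is still required:
 *
 *      struct ring_buffer_event *event;
 *      u64 ts;
 *
 *      while (!ring_buffer_empty_cpu(buffer, cpu)) {
 *              event = ring_buffer_consume(buffer, cpu, &ts, NULL);
 *              if (!event)
 *                      break;
 *              handle(ring_buffer_event_data(event), ts);
 *      }
 */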
4654
4655 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
4656 /**
4657  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
4658  * @buffer_a: One buffer to swap with
4659  * @buffer_b: The other buffer to swap with
 * @cpu: the CPU of the buffers to swap
4660  *
4661  * This function is useful for tracers that want to take a "snapshot"
4662  * of a CPU buffer and have another backup buffer lying around.
4663  * It is expected that the tracer handles the cpu buffer not being
4664  * used at the moment.
4665  */
4666 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
4667                          struct ring_buffer *buffer_b, int cpu)
4668 {
4669         struct ring_buffer_per_cpu *cpu_buffer_a;
4670         struct ring_buffer_per_cpu *cpu_buffer_b;
4671         int ret = -EINVAL;
4672
4673         if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
4674             !cpumask_test_cpu(cpu, buffer_b->cpumask))
4675                 goto out;
4676
4677         cpu_buffer_a = buffer_a->buffers[cpu];
4678         cpu_buffer_b = buffer_b->buffers[cpu];
4679
4680         /* At least make sure the two buffers are somewhat the same */
4681         if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
4682                 goto out;
4683
4684         ret = -EAGAIN;
4685
4686         if (atomic_read(&buffer_a->record_disabled))
4687                 goto out;
4688
4689         if (atomic_read(&buffer_b->record_disabled))
4690                 goto out;
4691
4692         if (atomic_read(&cpu_buffer_a->record_disabled))
4693                 goto out;
4694
4695         if (atomic_read(&cpu_buffer_b->record_disabled))
4696                 goto out;
4697
4698         /*
4699          * We can't do a synchronize_rcu here because this
4700          * function can be called in atomic context.
4701          * Normally this will be called from the same CPU as cpu.
4702          * If not it's up to the caller to protect this.
4703          */
4704         atomic_inc(&cpu_buffer_a->record_disabled);
4705         atomic_inc(&cpu_buffer_b->record_disabled);
4706
4707         ret = -EBUSY;
4708         if (local_read(&cpu_buffer_a->committing))
4709                 goto out_dec;
4710         if (local_read(&cpu_buffer_b->committing))
4711                 goto out_dec;
4712
4713         buffer_a->buffers[cpu] = cpu_buffer_b;
4714         buffer_b->buffers[cpu] = cpu_buffer_a;
4715
4716         cpu_buffer_b->buffer = buffer_a;
4717         cpu_buffer_a->buffer = buffer_b;
4718
4719         ret = 0;
4720
4721 out_dec:
4722         atomic_dec(&cpu_buffer_a->record_disabled);
4723         atomic_dec(&cpu_buffer_b->record_disabled);
4724 out:
4725         return ret;
4726 }
4727 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
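
/*
 * Illustrative sketch (comment only) of the "snapshot" use mentioned
 * above: a tracer keeps a spare buffer of the same size and swaps one
 * CPU's buffer into it, retrying or giving up on -EAGAIN/-EBUSY as its
 * policy dictates. max_buffer, trace_buffer and read_snapshot() are
 * hypothetical names:
 *
 *      int ret;
 *
 *      ret = ring_buffer_swap_cpu(max_buffer, trace_buffer, cpu);
 *      if (!ret)
 *              read_snapshot(max_buffer, cpu);
 */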
4728 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
4729
4730 /**
4731  * ring_buffer_alloc_read_page - allocate a page to read from buffer
4732  * @buffer: the buffer to allocate for.
4733  * @cpu: the cpu buffer to allocate.
4734  *
4735  * This function is used in conjunction with ring_buffer_read_page.
4736  * When reading a full page from the ring buffer, these functions
4737  * can be used to speed up the process. The calling function should
4738  * allocate a few pages first with this function. Then when it
4739  * needs to get pages from the ring buffer, it passes the result
4740  * of this function into ring_buffer_read_page, which will swap
4741  * the page that was allocated, with the read page of the buffer.
4742  *
4743  * Returns:
4744  *  The page allocated, or an ERR_PTR() on failure
4745  */
4746 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
4747 {
4748         struct ring_buffer_per_cpu *cpu_buffer;
4749         struct buffer_data_page *bpage = NULL;
4750         unsigned long flags;
4751         struct page *page;
4752
4753         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4754                 return ERR_PTR(-ENODEV);
4755
4756         cpu_buffer = buffer->buffers[cpu];
4757         local_irq_save(flags);
4758         arch_spin_lock(&cpu_buffer->lock);
4759
4760         if (cpu_buffer->free_page) {
4761                 bpage = cpu_buffer->free_page;
4762                 cpu_buffer->free_page = NULL;
4763         }
4764
4765         arch_spin_unlock(&cpu_buffer->lock);
4766         local_irq_restore(flags);
4767
4768         if (bpage)
4769                 goto out;
4770
4771         page = alloc_pages_node(cpu_to_node(cpu),
4772                                 GFP_KERNEL | __GFP_NORETRY, 0);
4773         if (!page)
4774                 return ERR_PTR(-ENOMEM);
4775
4776         bpage = page_address(page);
4777
4778  out:
4779         rb_init_page(bpage);
4780
4781         return bpage;
4782 }
4783 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
4784
4785 /**
4786  * ring_buffer_free_read_page - free an allocated read page
4787  * @buffer: the buffer the page was allocated for
4788  * @cpu: the cpu buffer the page came from
4789  * @data: the page to free
4790  *
4791  * Free a page allocated from ring_buffer_alloc_read_page.
4792  */
4793 void ring_buffer_free_read_page(struct ring_buffer *buffer, int cpu, void *data)
4794 {
4795         struct ring_buffer_per_cpu *cpu_buffer;
4796         struct buffer_data_page *bpage = data;
4797         struct page *page = virt_to_page(bpage);
4798         unsigned long flags;
4799
4800         if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
4801                 return;
4802
4803         cpu_buffer = buffer->buffers[cpu];
4804
4805         /* If the page is still in use someplace else, we can't reuse it */
4806         if (page_ref_count(page) > 1)
4807                 goto out;
4808
4809         local_irq_save(flags);
4810         arch_spin_lock(&cpu_buffer->lock);
4811
4812         if (!cpu_buffer->free_page) {
4813                 cpu_buffer->free_page = bpage;
4814                 bpage = NULL;
4815         }
4816
4817         arch_spin_unlock(&cpu_buffer->lock);
4818         local_irq_restore(flags);
4819
4820  out:
4821         free_page((unsigned long)bpage);
4822 }
4823 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
4824
4825 /**
4826  * ring_buffer_read_page - extract a page from the ring buffer
4827  * @buffer: buffer to extract from
4828  * @data_page: the page to use allocated from ring_buffer_alloc_read_page
4829  * @len: amount to extract
4830  * @cpu: the cpu of the buffer to extract
4831  * @full: should the extraction only happen when the page is full.
4832  *
4833  * This function will pull out a page from the ring buffer and consume it.
4834  * @data_page must be the address of the variable holding the page
4835  * returned from ring_buffer_alloc_read_page. This is because the page
4836  * might be used to swap with a page in the ring buffer.
4837  *
4838  * for example:
4839  *      rpage = ring_buffer_alloc_read_page(buffer, cpu);
4840  *      if (IS_ERR(rpage))
4841  *              return PTR_ERR(rpage);
4842  *      ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
4843  *      if (ret >= 0)
4844  *              process_page(rpage, ret);
4845  *
4846  * When @full is set, the function will not return any data unless
4847  * the writer is off the reader page.
4848  *
4849  * Note: it is up to the calling functions to handle sleeps and wakeups.
4850  *  The ring buffer can be used anywhere in the kernel and can not
4851  *  blindly call wake_up. The layer that uses the ring buffer must be
4852  *  responsible for that.
4853  *
4854  * Returns:
4855  *  >=0 if data has been transferred, returns the offset of consumed data.
4856  *  <0 if no data has been transferred.
4857  */
4858 int ring_buffer_read_page(struct ring_buffer *buffer,
4859                           void **data_page, size_t len, int cpu, int full)
4860 {
4861         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4862         struct ring_buffer_event *event;
4863         struct buffer_data_page *bpage;
4864         struct buffer_page *reader;
4865         unsigned long missed_events;
4866         unsigned long flags;
4867         unsigned int commit;
4868         unsigned int read;
4869         u64 save_timestamp;
4870         int ret = -1;
4871
4872         if (!cpumask_test_cpu(cpu, buffer->cpumask))
4873                 goto out;
4874
4875         /*
4876          * If len is not big enough to hold the page header, then
4877          * we can not copy anything.
4878          */
4879         if (len <= BUF_PAGE_HDR_SIZE)
4880                 goto out;
4881
4882         len -= BUF_PAGE_HDR_SIZE;
4883
4884         if (!data_page)
4885                 goto out;
4886
4887         bpage = *data_page;
4888         if (!bpage)
4889                 goto out;
4890
4891         raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4892
4893         reader = rb_get_reader_page(cpu_buffer);
4894         if (!reader)
4895                 goto out_unlock;
4896
4897         event = rb_reader_event(cpu_buffer);
4898
4899         read = reader->read;
4900         commit = rb_page_commit(reader);
4901
4902         /* Check if any events were dropped */
4903         missed_events = cpu_buffer->lost_events;
4904
4905         /*
4906          * If this page has been partially read or
4907          * if len is not big enough to read the rest of the page or
4908          * a writer is still on the page, then
4909          * we must copy the data from the page to the buffer.
4910          * Otherwise, we can simply swap the page with the one passed in.
4911          */
4912         if (read || (len < (commit - read)) ||
4913             cpu_buffer->reader_page == cpu_buffer->commit_page) {
4914                 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
4915                 unsigned int rpos = read;
4916                 unsigned int pos = 0;
4917                 unsigned int size;
4918
4919                 /*
4920                  * If a full page is expected, this can still be returned
4921                  * if there's been a previous partial read and the
4922                  * rest of the page can be read and the commit page is off
4923                  * the reader page.
4924                  */
4925                 if (full &&
4926                     (!read || (len < (commit - read)) ||
4927                      cpu_buffer->reader_page == cpu_buffer->commit_page))
4928                         goto out_unlock;
4929
4930                 if (len > (commit - read))
4931                         len = (commit - read);
4932
4933                 /* Always keep the time extend and data together */
4934                 size = rb_event_ts_length(event);
4935
4936                 if (len < size)
4937                         goto out_unlock;
4938
4939                 /* save the current timestamp, since the user will need it */
4940                 save_timestamp = cpu_buffer->read_stamp;
4941
4942                 /* Need to copy one event at a time */
4943                 do {
4944                         /*
4945                          * We need the size of one event, because rb_advance_reader
4946                          * only advances by one event, whereas rb_event_ts_length
4947                          * may include the size of one or two events. We have
4948                          * already ensured there's enough space if this is a
4949                          * time extend.
                              */
4950                         size = rb_event_length(event);
4951                         memcpy(bpage->data + pos, rpage->data + rpos, size);
4952
4953                         len -= size;
4954
4955                         rb_advance_reader(cpu_buffer);
4956                         rpos = reader->read;
4957                         pos += size;
4958
4959                         if (rpos >= commit)
4960                                 break;
4961
4962                         event = rb_reader_event(cpu_buffer);
4963                         /* Always keep the time extend and data together */
4964                         size = rb_event_ts_length(event);
4965                 } while (len >= size);
4966
4967                 /* update bpage */
4968                 local_set(&bpage->commit, pos);
4969                 bpage->time_stamp = save_timestamp;
4970
4971                 /* we copied everything to the beginning */
4972                 read = 0;
4973         } else {
4974                 /* update the entry counter */
4975                 cpu_buffer->read += rb_page_entries(reader);
4976                 cpu_buffer->read_bytes += BUF_PAGE_SIZE;
4977
4978                 /* swap the pages */
4979                 rb_init_page(bpage);
4980                 bpage = reader->page;
4981                 reader->page = *data_page;
4982                 local_set(&reader->write, 0);
4983                 local_set(&reader->entries, 0);
4984                 reader->read = 0;
4985                 *data_page = bpage;
4986
4987                 /*
4988                  * Use the real_end for the data size,
4989                  * This gives us a chance to store the lost events
4990                  * on the page.
4991                  */
4992                 if (reader->real_end)
4993                         local_set(&bpage->commit, reader->real_end);
4994         }
4995         ret = read;
4996
4997         cpu_buffer->lost_events = 0;
4998
4999         commit = local_read(&bpage->commit);
5000         /*
5001          * Set a flag in the commit field if we lost events
5002          */
5003         if (missed_events) {
5004                 /* If there is room at the end of the page to save the
5005                  * missed events, then record it there.
5006                  */
5007                 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
5008                         memcpy(&bpage->data[commit], &missed_events,
5009                                sizeof(missed_events));
5010                         local_add(RB_MISSED_STORED, &bpage->commit);
5011                         commit += sizeof(missed_events);
5012                 }
5013                 local_add(RB_MISSED_EVENTS, &bpage->commit);
5014         }
5015
5016         /*
5017          * This page may be off to user land. Zero it out here.
5018          */
5019         if (commit < BUF_PAGE_SIZE)
5020                 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
5021
5022  out_unlock:
5023         raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5024
5025  out:
5026         return ret;
5027 }
5028 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
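
/*
 * Illustrative sketch (comment only) tying the three page-read helpers
 * together; consume_raw_page() is a placeholder for the caller's
 * consumer. Note that the page may have been swapped with one from the
 * ring buffer, which is why its address is passed by reference and the
 * (possibly new) page is what gets freed or recycled afterwards:
 *
 *      void *page;
 *      int ret;
 *
 *      page = ring_buffer_alloc_read_page(buffer, cpu);
 *      if (IS_ERR(page))
 *              return PTR_ERR(page);
 *
 *      ret = ring_buffer_read_page(buffer, &page, PAGE_SIZE, cpu, 1);
 *      if (ret >= 0)
 *              consume_raw_page(page, ret);
 *
 *      ring_buffer_free_read_page(buffer, cpu, page);
 */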
5029
5030 /*
5031  * We only allocate new buffers, never free them if the CPU goes down.
5032  * If we were to free the buffer, then the user would lose any trace that was in
5033  * the buffer.
5034  */
5035 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
5036 {
5037         struct ring_buffer *buffer;
5038         long nr_pages_same;
5039         int cpu_i;
5040         unsigned long nr_pages;
5041
5042         buffer = container_of(node, struct ring_buffer, node);
5043         if (cpumask_test_cpu(cpu, buffer->cpumask))
5044                 return 0;
5045
5046         nr_pages = 0;
5047         nr_pages_same = 1;
5048         /* check if all cpu sizes are same */
5049         for_each_buffer_cpu(buffer, cpu_i) {
5050                 /* fill in the size from first enabled cpu */
5051                 if (nr_pages == 0)
5052                         nr_pages = buffer->buffers[cpu_i]->nr_pages;
5053                 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
5054                         nr_pages_same = 0;
5055                         break;
5056                 }
5057         }
5058         /* allocate minimum pages, user can later expand it */
5059         if (!nr_pages_same)
5060                 nr_pages = 2;
5061         buffer->buffers[cpu] =
5062                 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
5063         if (!buffer->buffers[cpu]) {
5064                 WARN(1, "failed to allocate ring buffer on CPU %u\n",
5065                      cpu);
5066                 return -ENOMEM;
5067         }
5068         smp_wmb();
5069         cpumask_set_cpu(cpu, buffer->cpumask);
5070         return 0;
5071 }
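
/*
 * This callback is meant to be wired up as a multi-instance CPU hotplug
 * state when a ring buffer is created. A minimal sketch of that wiring
 * (the state name string is illustrative, not necessarily the exact one
 * the tracing core registers):
 *
 *      ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
 *                                    "trace/RB:prepare",
 *                                    trace_rb_cpu_prepare, NULL);
 *      ...
 *      cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
 *
 * Each ring buffer adds its own hlist_node instance, so this function
 * runs once per buffer whenever a new CPU comes online.
 */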
5072
5073 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST
5074 /*
5075  * This is a basic integrity check of the ring buffer.
5076  * Late in the boot cycle this test will run when configured in.
5077  * It will kick off a thread per CPU that will go into a loop
5078  * writing to the per cpu ring buffer various sizes of data.
5079  * Some of the data will be large items, some small.
5080  *
5081  * Another thread is created that goes into a spin, sending out
5082  * IPIs to the other CPUs to also write into the ring buffer.
5083  * This is to test the nesting ability of the buffer.
5084  *
5085  * Basic stats are recorded and reported. If something in the
5086  * ring buffer should happen that's not expected, a big warning
5087  * is displayed and all ring buffers are disabled.
5088  */
5089 static struct task_struct *rb_threads[NR_CPUS] __initdata;
5090
5091 struct rb_test_data {
5092         struct ring_buffer      *buffer;
5093         unsigned long           events;
5094         unsigned long           bytes_written;
5095         unsigned long           bytes_alloc;
5096         unsigned long           bytes_dropped;
5097         unsigned long           events_nested;
5098         unsigned long           bytes_written_nested;
5099         unsigned long           bytes_alloc_nested;
5100         unsigned long           bytes_dropped_nested;
5101         int                     min_size_nested;
5102         int                     max_size_nested;
5103         int                     max_size;
5104         int                     min_size;
5105         int                     cpu;
5106         int                     cnt;
5107 };
5108
5109 static struct rb_test_data rb_data[NR_CPUS] __initdata;
5110
5111 /* 1 meg per cpu */
5112 #define RB_TEST_BUFFER_SIZE     1048576
5113
5114 static char rb_string[] __initdata =
5115         "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
5116         "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
5117         "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
5118
5119 static bool rb_test_started __initdata;
5120
5121 struct rb_item {
5122         int size;
5123         char str[];
5124 };
5125
5126 static __init int rb_write_something(struct rb_test_data *data, bool nested)
5127 {
5128         struct ring_buffer_event *event;
5129         struct rb_item *item;
5130         bool started;
5131         int event_len;
5132         int size;
5133         int len;
5134         int cnt;
5135
5136         /* Have nested writes different than what is written */
5137         cnt = data->cnt + (nested ? 27 : 0);
5138
5139         /* Multiply cnt by ~e, to make some unique increment */
5140         size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
5141
5142         len = size + sizeof(struct rb_item);
5143
5144         started = rb_test_started;
5145         /* read rb_test_started before checking buffer enabled */
5146         smp_rmb();
5147
5148         event = ring_buffer_lock_reserve(data->buffer, len);
5149         if (!event) {
5150                 /* Ignore dropped events before test starts. */
5151                 if (started) {
5152                         if (nested)
5153                                 data->bytes_dropped_nested += len;
5154                         else
5155                                 data->bytes_dropped += len;
5156                 }
5157                 return len;
5158         }
5159
5160         event_len = ring_buffer_event_length(event);
5161
5162         if (RB_WARN_ON(data->buffer, event_len < len))
5163                 goto out;
5164
5165         item = ring_buffer_event_data(event);
5166         item->size = size;
5167         memcpy(item->str, rb_string, size);
5168
5169         if (nested) {
5170                 data->bytes_alloc_nested += event_len;
5171                 data->bytes_written_nested += len;
5172                 data->events_nested++;
5173                 if (!data->min_size_nested || len < data->min_size_nested)
5174                         data->min_size_nested = len;
5175                 if (len > data->max_size_nested)
5176                         data->max_size_nested = len;
5177         } else {
5178                 data->bytes_alloc += event_len;
5179                 data->bytes_written += len;
5180                 data->events++;
5181                 if (!data->min_size || len < data->min_size)
5182                         data->min_size = len;
5183                 if (len > data->max_size)
5184                         data->max_size = len;
5185         }
5186
5187  out:
5188         ring_buffer_unlock_commit(data->buffer, event);
5189
5190         return 0;
5191 }
5192
5193 static __init int rb_test(void *arg)
5194 {
5195         struct rb_test_data *data = arg;
5196
5197         while (!kthread_should_stop()) {
5198                 rb_write_something(data, false);
5199                 data->cnt++;
5200
5201                 set_current_state(TASK_INTERRUPTIBLE);
5202                 /* Now sleep between a min of 100-300us and a max of 1ms */
5203                 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
5204         }
5205
5206         return 0;
5207 }
5208
5209 static __init void rb_ipi(void *ignore)
5210 {
5211         struct rb_test_data *data;
5212         int cpu = smp_processor_id();
5213
5214         data = &rb_data[cpu];
5215         rb_write_something(data, true);
5216 }
5217
5218 static __init int rb_hammer_test(void *arg)
5219 {
5220         while (!kthread_should_stop()) {
5221
5222                 /* Send an IPI to all cpus to write data! */
5223                 smp_call_function(rb_ipi, NULL, 1);
5224                 /* No sleep, but for non-preempt, let others run */
5225                 schedule();
5226         }
5227
5228         return 0;
5229 }
5230
5231 static __init int test_ringbuffer(void)
5232 {
5233         struct task_struct *rb_hammer;
5234         struct ring_buffer *buffer;
5235         int cpu;
5236         int ret = 0;
5237
5238         if (security_locked_down(LOCKDOWN_TRACEFS)) {
5239                 pr_warning("Lockdown is enabled, skipping ring buffer tests\n");
5240                 return 0;
5241         }
5242
5243         pr_info("Running ring buffer tests...\n");
5244
5245         buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
5246         if (WARN_ON(!buffer))
5247                 return 0;
5248
5249         /* Disable buffer so that threads can't write to it yet */
5250         ring_buffer_record_off(buffer);
5251
5252         for_each_online_cpu(cpu) {
5253                 rb_data[cpu].buffer = buffer;
5254                 rb_data[cpu].cpu = cpu;
5255                 rb_data[cpu].cnt = cpu;
5256                 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
5257                                                  "rbtester/%d", cpu);
5258                 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
5259                         pr_cont("FAILED\n");
5260                         ret = PTR_ERR(rb_threads[cpu]);
5261                         goto out_free;
5262                 }
5263
5264                 kthread_bind(rb_threads[cpu], cpu);
5265                 wake_up_process(rb_threads[cpu]);
5266         }
5267
5268         /* Now create the rb hammer! */
5269         rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
5270         if (WARN_ON(IS_ERR(rb_hammer))) {
5271                 pr_cont("FAILED\n");
5272                 ret = PTR_ERR(rb_hammer);
5273                 goto out_free;
5274         }
5275
5276         ring_buffer_record_on(buffer);
5277         /*
5278          * Show buffer is enabled before setting rb_test_started.
5279          * Yes there's a small race window where events could be
5280  * dropped and the thread won't catch it. But when a ring
5281          * buffer gets enabled, there will always be some kind of
5282          * delay before other CPUs see it. Thus, we don't care about
5283          * those dropped events. We care about events dropped after
5284          * the threads see that the buffer is active.
5285          */
5286         smp_wmb();
5287         rb_test_started = true;
5288
5289         set_current_state(TASK_INTERRUPTIBLE);
5290         /* Just run for 10 seconds */
5291         schedule_timeout(10 * HZ);
5292
5293         kthread_stop(rb_hammer);
5294
5295  out_free:
5296         for_each_online_cpu(cpu) {
5297                 if (!rb_threads[cpu])
5298                         break;
5299                 kthread_stop(rb_threads[cpu]);
5300         }
5301         if (ret) {
5302                 ring_buffer_free(buffer);
5303                 return ret;
5304         }
5305
5306         /* Report! */
5307         pr_info("finished\n");
5308         for_each_online_cpu(cpu) {
5309                 struct ring_buffer_event *event;
5310                 struct rb_test_data *data = &rb_data[cpu];
5311                 struct rb_item *item;
5312                 unsigned long total_events;
5313                 unsigned long total_dropped;
5314                 unsigned long total_written;
5315                 unsigned long total_alloc;
5316                 unsigned long total_read = 0;
5317                 unsigned long total_size = 0;
5318                 unsigned long total_len = 0;
5319                 unsigned long total_lost = 0;
5320                 unsigned long lost;
5321                 int big_event_size;
5322                 int small_event_size;
5323
5324                 ret = -1;
5325
5326                 total_events = data->events + data->events_nested;
5327                 total_written = data->bytes_written + data->bytes_written_nested;
5328                 total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
5329                 total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
5330
5331                 big_event_size = data->max_size + data->max_size_nested;
5332                 small_event_size = data->min_size + data->min_size_nested;
5333
5334                 pr_info("CPU %d:\n", cpu);
5335                 pr_info("              events:    %ld\n", total_events);
5336                 pr_info("       dropped bytes:    %ld\n", total_dropped);
5337                 pr_info("       alloced bytes:    %ld\n", total_alloc);
5338                 pr_info("       written bytes:    %ld\n", total_written);
5339                 pr_info("       biggest event:    %d\n", big_event_size);
5340                 pr_info("      smallest event:    %d\n", small_event_size);
5341
5342                 if (RB_WARN_ON(buffer, total_dropped))
5343                         break;
5344
5345                 ret = 0;
5346
5347                 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
5348                         total_lost += lost;
5349                         item = ring_buffer_event_data(event);
5350                         total_len += ring_buffer_event_length(event);
5351                         total_size += item->size + sizeof(struct rb_item);
5352                         if (memcmp(&item->str[0], rb_string, item->size) != 0) {
5353                                 pr_info("FAILED!\n");
5354                                 pr_info("buffer had: %.*s\n", item->size, item->str);
5355                                 pr_info("expected:   %.*s\n", item->size, rb_string);
5356                                 RB_WARN_ON(buffer, 1);
5357                                 ret = -1;
5358                                 break;
5359                         }
5360                         total_read++;
5361                 }
5362                 if (ret)
5363                         break;
5364
5365                 ret = -1;
5366
5367                 pr_info("         read events:   %ld\n", total_read);
5368                 pr_info("         lost events:   %ld\n", total_lost);
5369                 pr_info("        total events:   %ld\n", total_lost + total_read);
5370                 pr_info("  recorded len bytes:   %ld\n", total_len);
5371                 pr_info(" recorded size bytes:   %ld\n", total_size);
5372                 if (total_lost)
5373                         pr_info(" With dropped events, record len and size may not match\n"
5374                                 " alloced and written from above\n");
5375                 if (!total_lost) {
5376                         if (RB_WARN_ON(buffer, total_len != total_alloc ||
5377                                        total_size != total_written))
5378                                 break;
5379                 }
5380                 if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
5381                         break;
5382
5383                 ret = 0;
5384         }
5385         if (!ret)
5386                 pr_info("Ring buffer PASSED!\n");
5387
5388         ring_buffer_free(buffer);
5389         return 0;
5390 }
5391
5392 late_initcall(test_ringbuffer);
5393 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */