Linux Kernel 3.7.1
ring_buffer.c
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <[email protected]>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/trace_clock.h>
8 #include <linux/spinlock.h>
9 #include <linux/debugfs.h>
10 #include <linux/uaccess.h>
11 #include <linux/hardirq.h>
12 #include <linux/kmemcheck.h>
13 #include <linux/module.h>
14 #include <linux/percpu.h>
15 #include <linux/mutex.h>
16 #include <linux/slab.h>
17 #include <linux/init.h>
18 #include <linux/hash.h>
19 #include <linux/list.h>
20 #include <linux/cpu.h>
21 #include <linux/fs.h>
22 
23 #include <asm/local.h>
24 #include "trace.h"
25 
26 static void update_pages_handler(struct work_struct *work);
27 
28 /*
29  * The ring buffer header is special. We have to keep it up to date manually.
30  */
31 int ring_buffer_print_entry_header(struct trace_seq *s)
32 {
33  int ret;
34 
35  ret = trace_seq_printf(s, "# compressed entry header\n");
36  ret = trace_seq_printf(s, "\ttype_len : 5 bits\n");
37  ret = trace_seq_printf(s, "\ttime_delta : 27 bits\n");
38  ret = trace_seq_printf(s, "\tarray : 32 bits\n");
39  ret = trace_seq_printf(s, "\n");
40  ret = trace_seq_printf(s, "\tpadding : type == %d\n",
41  RINGBUF_TYPE_PADDING);
42  ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
43  RINGBUF_TYPE_TIME_EXTEND);
44  ret = trace_seq_printf(s, "\tdata max type_len == %d\n",
45  RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
46 
47  return ret;
48 }
49 
50 /*
51  * The ring buffer is made up of a list of pages. A separate list of pages is
52  * allocated for each CPU. A writer may only write to a buffer that is
53  * associated with the CPU it is currently executing on. A reader may read
54  * from any per cpu buffer.
55  *
56  * The reader is special. For each per cpu buffer, the reader has its own
57  * reader page. When a reader has read the entire reader page, this reader
58  * page is swapped with another page in the ring buffer.
59  *
60  * Now, as long as the writer is off the reader page, the reader can do
61  * whatever it wants with that page. The writer will never write to that page
62  * again (as long as it is out of the ring buffer).
63  *
64  * Here's some silly ASCII art.
65  *
66  *   +------+
67  *   |reader|          RING BUFFER
68  *   |page  |
69  *   +------+        +---+   +---+   +---+
70  *                   |   |-->|   |-->|   |
71  *                   +---+   +---+   +---+
72  *                     ^               |
73  *                     |               |
74  *                     +---------------+
75  *
76  *
77  *   +------+
78  *   |reader|          RING BUFFER
79  *   |page  |------------------v
80  *   +------+        +---+   +---+   +---+
81  *                   |   |-->|   |-->|   |
82  *                   +---+   +---+   +---+
83  *                     ^               |
84  *                     |               |
85  *                     +---------------+
86  *
87  *
88  *   +------+
89  *   |reader|          RING BUFFER
90  *   |page  |------------------v
91  *   +------+        +---+   +---+   +---+
92  *      ^            |   |-->|   |-->|   |
93  *      |            +---+   +---+   +---+
94  *      |                              |
95  *      |                              |
96  *      +------------------------------+
97  *
98  *
99  *   +------+
100  *   |buffer|          RING BUFFER
101  *   |page  |------------------v
102  *   +------+        +---+   +---+   +---+
103  *      ^            |   |   |   |-->|   |
104  *      |   New      +---+   +---+   +---+
105  *      |  Reader------^               |
106  *      |   page                       |
107  *      +------------------------------+
108  *
109  *
110  * After we make this swap, the reader can hand this page off to the splice
111  * code and be done with it. It can even allocate a new page if it needs to
112  * and swap that into the ring buffer.
113  *
114  * We will be using cmpxchg soon to make all this lockless.
115  *
116  */
117 
118 /*
119  * A fast way to enable or disable all ring buffers is to
120  * call tracing_on or tracing_off. Turning off the ring buffers
121  * prevents all ring buffers from being recorded to.
122  * Turning this switch on makes it OK to write to the
123  * ring buffer, if the ring buffer is enabled itself.
124  *
125  * There are three layers that must be on in order to write
126  * to the ring buffer.
127  *
128  * 1) This global flag must be set.
129  * 2) The ring buffer must be enabled for recording.
130  * 3) The per cpu buffer must be enabled for recording.
131  *
132  * In case of an anomaly, this global flag has a bit set that
133  * will permanently disable all ring buffers.
134  */
135 
136 /*
137  * Global flag to disable all recording to ring buffers
138  * This has two bits: ON, DISABLED
139  *
140  *   ON   DISABLED
141  *  ---- ----------
142  *    0      0      : ring buffers are off
143  *    1      0      : ring buffers are on
144  *    X      1      : ring buffers are permanently disabled
145  */
146 
147 enum {
148  RB_BUFFERS_ON_BIT = 0,
149  RB_BUFFERS_DISABLED_BIT = 1,
150 };
151 
152 enum {
153  RB_BUFFERS_ON = 1 << RB_BUFFERS_ON_BIT,
154  RB_BUFFERS_DISABLED = 1 << RB_BUFFERS_DISABLED_BIT,
155 };
156 
157 static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
158 
159 /* Used for individual buffers (after the counter) */
160 #define RB_BUFFER_OFF (1 << 20)
161 
162 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
163 
170 void tracing_off_permanent(void)
171 {
172  set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
173 }
174 
175 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
176 #define RB_ALIGNMENT 4U
177 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
178 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */
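/*
 * Worked example of the length encoding: a payload that fits in
 * RB_MAX_SMALL_DATA (RINGBUF_TYPE_DATA_TYPE_LEN_MAX is 28, so
 * 28 * 4 = 112 bytes) is described by type_len alone, e.g.
 * type_len == 3 means 3 * RB_ALIGNMENT = 12 bytes of data starting
 * at array[0]. A larger payload sets type_len to 0, stores the byte
 * count in array[0], and the data starts at array[1] (see
 * rb_event_data_length() and rb_event_data() below).
 */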
179 
180 #if !defined(CONFIG_64BIT) || defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS)
181 # define RB_FORCE_8BYTE_ALIGNMENT 0
182 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT
183 #else
184 # define RB_FORCE_8BYTE_ALIGNMENT 1
185 # define RB_ARCH_ALIGNMENT 8U
186 #endif
187 
188 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
189 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
190 
191 enum {
192  RB_LEN_TIME_EXTEND = 8,
193  RB_LEN_TIME_STAMP = 16,
194 };
195 
196 #define skip_time_extend(event) \
197  ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
198 
199 static inline int rb_null_event(struct ring_buffer_event *event)
200 {
201  return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
202 }
203 
204 static void rb_event_set_padding(struct ring_buffer_event *event)
205 {
206  /* padding has a NULL time_delta */
207  event->type_len = RINGBUF_TYPE_PADDING;
208  event->time_delta = 0;
209 }
210 
211 static unsigned
212 rb_event_data_length(struct ring_buffer_event *event)
213 {
214  unsigned length;
215 
216  if (event->type_len)
217  length = event->type_len * RB_ALIGNMENT;
218  else
219  length = event->array[0];
220  return length + RB_EVNT_HDR_SIZE;
221 }
222 
223 /*
224  * Return the length of the given event. Will return
225  * the length of the time extend if the event is a
226  * time extend.
227  */
228 static inline unsigned
229 rb_event_length(struct ring_buffer_event *event)
230 {
231  switch (event->type_len) {
232  case RINGBUF_TYPE_PADDING:
233  if (rb_null_event(event))
234  /* undefined */
235  return -1;
236  return event->array[0] + RB_EVNT_HDR_SIZE;
237 
238  case RINGBUF_TYPE_TIME_EXTEND:
239  return RB_LEN_TIME_EXTEND;
240 
241  case RINGBUF_TYPE_TIME_STAMP:
242  return RB_LEN_TIME_STAMP;
243 
244  case RINGBUF_TYPE_DATA:
245  return rb_event_data_length(event);
246  default:
247  BUG();
248  }
249  /* not hit */
250  return 0;
251 }
252 
253 /*
254  * Return total length of time extend and data,
255  * or just the event length for all other events.
256  */
257 static inline unsigned
258 rb_event_ts_length(struct ring_buffer_event *event)
259 {
260  unsigned len = 0;
261 
262  if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
263  /* time extends include the data event after it */
264  len = RB_LEN_TIME_EXTEND;
265  event = skip_time_extend(event);
266  }
267  return len + rb_event_length(event);
268 }
269 
280 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
281 {
282  unsigned length;
283 
284  if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
285  event = skip_time_extend(event);
286 
287  length = rb_event_length(event);
288  if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
289  return length;
290  length -= RB_EVNT_HDR_SIZE;
291  if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
292  length -= sizeof(event->array[0]);
293  return length;
294 }
296 
297 /* inline for ring buffer fast paths */
298 static void *
299 rb_event_data(struct ring_buffer_event *event)
300 {
301  if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
302  event = skip_time_extend(event);
303  BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
304  /* If length is in len field, then array[0] has the data */
305  if (event->type_len)
306  return (void *)&event->array[0];
307  /* Otherwise length is in array[0] and array[1] has the data */
308  return (void *)&event->array[1];
309 }
310 
315 void *ring_buffer_event_data(struct ring_buffer_event *event)
316 {
317  return rb_event_data(event);
318 }
320 
321 #define for_each_buffer_cpu(buffer, cpu) \
322  for_each_cpu(cpu, buffer->cpumask)
323 
324 #define TS_SHIFT 27
325 #define TS_MASK ((1ULL << TS_SHIFT) - 1)
326 #define TS_DELTA_TEST (~TS_MASK)
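/*
 * Example: with TS_SHIFT == 27 an event's time_delta can hold deltas
 * up to 2^27 - 1 counts (about 134 ms at one count per nanosecond).
 * A larger delta fails test_time_stamp() and is written as a
 * time-extend event instead: the low 27 bits go into time_delta and
 * the remaining high bits into array[0] (see rb_add_time_stamp()).
 */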
327 
328 /* Flag when events were overwritten */
329 #define RB_MISSED_EVENTS (1 << 31)
330 /* Missed count stored at end */
331 #define RB_MISSED_STORED (1 << 30)
332 
333 struct buffer_data_page {
334  u64 time_stamp; /* page time stamp */
335  local_t commit; /* write committed index */
336  unsigned char data[]; /* data of buffer page */
337 };
338 
339 /*
340  * Note, the buffer_page list must be first. The buffer pages
341  * are allocated in cache lines, which means that each buffer
342  * page will be at the beginning of a cache line, and thus
343  * the least significant bits will be zero. We use this to
344  * add flags in the list struct pointers, to make the ring buffer
345  * lockless.
346  */
347 struct buffer_page {
348  struct list_head list; /* list of buffer pages */
349  local_t write; /* index for next write */
350  unsigned read; /* index for next read */
351  local_t entries; /* entries on this page */
352  unsigned long real_end; /* real end of data */
353  struct buffer_data_page *page; /* Actual data page */
354 };
355 
356 /*
357  * The buffer page counters, write and entries, must be reset
358  * atomically when crossing page boundaries. To synchronize this
359  * update, two counters are inserted into the number. One is
360  * the actual counter for the write position or count on the page.
361  *
362  * The other is a counter of updaters. Before an update happens
363  * the update partition of the counter is incremented. This will
364  * allow the updater to update the counter atomically.
365  *
366  * The counter is 20 bits, and the state data is 12.
367  */
368 #define RB_WRITE_MASK 0xfffff
369 #define RB_WRITE_INTCNT (1 << 20)
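/*
 * Example of the split described above: a write field of 0x00300014
 * decodes to a write index of 0x14 (20 bytes into the page, low 20
 * bits) and an updater count of 3 (high 12 bits). A nested writer
 * bumps the updater count with
 * local_add_return(RB_WRITE_INTCNT, &page->write), so a cmpxchg
 * against a stale value fails (see rb_tail_page_update()).
 */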
370 
371 static void rb_init_page(struct buffer_data_page *bpage)
372 {
373  local_set(&bpage->commit, 0);
374 }
375 
382 size_t ring_buffer_page_len(void *page)
383 {
384  return local_read(&((struct buffer_data_page *)page)->commit)
385  + BUF_PAGE_HDR_SIZE;
386 }
387 
388 /*
389  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
390  * this issue out.
391  */
392 static void free_buffer_page(struct buffer_page *bpage)
393 {
394  free_page((unsigned long)bpage->page);
395  kfree(bpage);
396 }
397 
398 /*
399  * We need to fit the time_stamp delta into 27 bits.
400  */
401 static inline int test_time_stamp(u64 delta)
402 {
403  if (delta & TS_DELTA_TEST)
404  return 1;
405  return 0;
406 }
407 
408 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
409 
410 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */
411 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
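/*
 * For example, on a 64-bit kernel with 4K pages BUF_PAGE_HDR_SIZE is
 * 16 bytes (time_stamp plus commit), so BUF_PAGE_SIZE is 4080 bytes
 * and BUF_MAX_DATA_SIZE is 4072 bytes.
 */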
412 
413 int ring_buffer_print_page_header(struct trace_seq *s)
414 {
415  struct buffer_data_page field;
416  int ret;
417 
418  ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
419  "offset:0;\tsize:%u;\tsigned:%u;\n",
420  (unsigned int)sizeof(field.time_stamp),
421  (unsigned int)is_signed_type(u64));
422 
423  ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
424  "offset:%u;\tsize:%u;\tsigned:%u;\n",
425  (unsigned int)offsetof(typeof(field), commit),
426  (unsigned int)sizeof(field.commit),
427  (unsigned int)is_signed_type(long));
428 
429  ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
430  "offset:%u;\tsize:%u;\tsigned:%u;\n",
431  (unsigned int)offsetof(typeof(field), commit),
432  1,
433  (unsigned int)is_signed_type(long));
434 
435  ret = trace_seq_printf(s, "\tfield: char data;\t"
436  "offset:%u;\tsize:%u;\tsigned:%u;\n",
437  (unsigned int)offsetof(typeof(field), data),
438  (unsigned int)BUF_PAGE_SIZE,
439  (unsigned int)is_signed_type(char));
440 
441  return ret;
442 }
443 
444 /*
445  * head_page == tail_page && head == tail then buffer is empty.
446  */
447 struct ring_buffer_per_cpu {
448  int cpu;
449  atomic_t record_disabled;
450  struct ring_buffer *buffer;
451  raw_spinlock_t reader_lock; /* serialize readers */
452  arch_spinlock_t lock;
453  struct lock_class_key lock_key;
454  unsigned int nr_pages;
455  struct list_head *pages;
456  struct buffer_page *head_page; /* read from head */
457  struct buffer_page *tail_page; /* write to tail */
458  struct buffer_page *commit_page; /* committed pages */
459  struct buffer_page *reader_page;
460  unsigned long lost_events;
461  unsigned long last_overrun;
462  local_t entries_bytes;
463  local_t entries;
464  local_t overrun;
465  local_t commit_overrun;
466  local_t committing;
467  local_t commits;
468  unsigned long read;
469  unsigned long read_bytes;
470  u64 write_stamp;
471  u64 read_stamp;
472  /* ring buffer pages to update, > 0 to add, < 0 to remove */
473  int nr_pages_to_update;
474  struct list_head new_pages; /* new pages to add */
475  struct work_struct update_pages_work;
476  struct completion update_done;
477 };
478 
479 struct ring_buffer {
480  unsigned flags;
481  int cpus;
482  atomic_t record_disabled;
483  atomic_t resize_disabled;
484  cpumask_var_t cpumask;
485 
486  struct lock_class_key *reader_lock_key;
487 
488  struct mutex mutex;
489 
490  struct ring_buffer_per_cpu **buffers;
491 
492 #ifdef CONFIG_HOTPLUG_CPU
493  struct notifier_block cpu_notify;
494 #endif
495  u64 (*clock)(void);
496 };
497 
498 struct ring_buffer_iter {
499  struct ring_buffer_per_cpu *cpu_buffer;
500  unsigned long head;
501  struct buffer_page *head_page;
502  struct buffer_page *cache_reader_page;
503  unsigned long cache_read;
504  u64 read_stamp;
505 };
506 
507 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
508 #define RB_WARN_ON(b, cond) \
509  ({ \
510  int _____ret = unlikely(cond); \
511  if (_____ret) { \
512  if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
513  struct ring_buffer_per_cpu *__b = \
514  (void *)b; \
515  atomic_inc(&__b->buffer->record_disabled); \
516  } else \
517  atomic_inc(&b->record_disabled); \
518  WARN_ON(1); \
519  } \
520  _____ret; \
521  })
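/*
 * Typical use, as in rb_check_bpage() further down:
 *
 *	if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
 *		return 1;
 *
 * Recording on the affected buffer is disabled before the WARN_ON
 * fires, so a corrupted buffer is frozen instead of written to again.
 */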
522 
523 /* Up this if you want to test the TIME_EXTENTS and normalization */
524 #define DEBUG_SHIFT 0
525 
526 static inline u64 rb_time_stamp(struct ring_buffer *buffer)
527 {
528  /* shift to debug/test normalization and TIME_EXTENTS */
529  return buffer->clock() << DEBUG_SHIFT;
530 }
531 
532 u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
533 {
534  u64 time;
535 
536  preempt_disable_notrace();
537  time = rb_time_stamp(buffer);
538  preempt_enable_no_resched_notrace();
539 
540  return time;
541 }
543 
544 void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
545  int cpu, u64 *ts)
546 {
547  /* Just stupid testing the normalize function and deltas */
548  *ts >>= DEBUG_SHIFT;
549 }
551 
552 /*
553  * Making the ring buffer lockless makes things tricky.
554  * Writes only happen on the CPU that they are on, and
555  * they only need to worry about interrupts. Reads can
556  * happen on any CPU.
557  *
558  * The reader page is always off the ring buffer, but when the
559  * reader finishes with a page, it needs to swap its page with
560  * a new one from the buffer. The reader needs to take from
561  * the head (writes go to the tail). But if a writer is in overwrite
562  * mode and wraps, it must push the head page forward.
563  *
564  * Here lies the problem.
565  *
566  * The reader must be careful to replace only the head page, and
567  * not another one. As described at the top of the file in the
568  * ASCII art, the reader sets its old page to point to the next
569  * page after head. It then sets the page after head to point to
570  * the old reader page. But if the writer moves the head page
571  * during this operation, the reader could end up with the tail.
572  *
573  * We use cmpxchg to help prevent this race. We also do something
574  * special with the page before head. We set the LSB to 1.
575  *
576  * When the writer must push the page forward, it will clear the
577  * bit that points to the head page, move the head, and then set
578  * the bit that points to the new head page.
579  *
580  * We also don't want an interrupt coming in and moving the head
581  * page on another writer. Thus we use the second LSB to catch
582  * that too. Thus:
583  *
584  * head->list->prev->next bit 1 bit 0
585  * ------- -------
586  * Normal page 0 0
587  * Points to head page 0 1
588  * New head page 1 0
589  *
590  * Note we can not trust the prev pointer of the head page, because:
591  *
592  * +----+       +-----+        +-----+
593  * |    |------>|  T  |---X--->|  N  |
594  * |    |<------|     |        |     |
595  * +----+       +-----+        +-----+
596  *   ^                           ^ |
597  *   |          +-----+          | |
598  *   +----------|  R  |----------+ |
599  *              |     |<-----------+
600  *              +-----+
601  *
602  * Key:  ---X-->  HEAD flag set in pointer
603  *         T      Tail page
604  *         R      Reader page
605  *         N      Next page
606  *
607  * (see __rb_reserve_next() to see where this happens)
608  *
609  * What the above shows is that the reader just swapped out
610  * the reader page with a page in the buffer, but before it
611  * could make the new header point back to the new page added
612  * it was preempted by a writer. The writer moved forward onto
613  * the new page added by the reader and is about to move forward
614  * again.
615  *
616  * You can see, it is legitimate for the previous pointer of
617  * the head (or any page) not to point back to itself. But only
618  * temporarily.
619  */
620 
621 #define RB_PAGE_NORMAL 0UL
622 #define RB_PAGE_HEAD 1UL
623 #define RB_PAGE_UPDATE 2UL
624 
625 
626 #define RB_FLAG_MASK 3UL
627 
628 /* PAGE_MOVED is not part of the mask */
629 #define RB_PAGE_MOVED 4UL
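/*
 * Example of the tagging: buffer pages are cache-line aligned, so the
 * two low bits of a list pointer are always zero and can carry state.
 * If the head page's list_head sits at 0xffff880012345680, the ->next
 * pointer of the page before it normally reads 0xffff880012345681
 * (RB_PAGE_HEAD), and 0xffff880012345682 (RB_PAGE_UPDATE) while a
 * writer is in the middle of pushing the head forward.
 * rb_list_head() masks the flag bits off before dereferencing.
 */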
630 
631 /*
632  * rb_list_head - remove any bit
633  */
634 static struct list_head *rb_list_head(struct list_head *list)
635 {
636  unsigned long val = (unsigned long)list;
637 
638  return (struct list_head *)(val & ~RB_FLAG_MASK);
639 }
640 
641 /*
642  * rb_is_head_page - test if the given page is the head page
643  *
644  * Because the reader may move the head_page pointer, we can
645  * not trust what the head page is (it may be pointing to
646  * the reader page). But if the next page is a header page,
647  * its flags will be non zero.
648  */
649 static inline int
650 rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
651  struct buffer_page *page, struct list_head *list)
652 {
653  unsigned long val;
654 
655  val = (unsigned long)list->next;
656 
657  if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
658  return RB_PAGE_MOVED;
659 
660  return val & RB_FLAG_MASK;
661 }
662 
663 /*
664  * rb_is_reader_page
665  *
666  * The unique thing about the reader page is that, if the
667  * writer is ever on it, the previous pointer never points
668  * back to the reader page.
669  */
670 static int rb_is_reader_page(struct buffer_page *page)
671 {
672  struct list_head *list = page->list.prev;
673 
674  return rb_list_head(list->next) != &page->list;
675 }
676 
677 /*
678  * rb_set_list_to_head - set a list_head to be pointing to head.
679  */
680 static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
681  struct list_head *list)
682 {
683  unsigned long *ptr;
684 
685  ptr = (unsigned long *)&list->next;
686  *ptr |= RB_PAGE_HEAD;
687  *ptr &= ~RB_PAGE_UPDATE;
688 }
689 
690 /*
691  * rb_head_page_activate - sets up head page
692  */
693 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
694 {
695  struct buffer_page *head;
696 
697  head = cpu_buffer->head_page;
698  if (!head)
699  return;
700 
701  /*
702  * Set the previous list pointer to have the HEAD flag.
703  */
704  rb_set_list_to_head(cpu_buffer, head->list.prev);
705 }
706 
707 static void rb_list_head_clear(struct list_head *list)
708 {
709  unsigned long *ptr = (unsigned long *)&list->next;
710 
711  *ptr &= ~RB_FLAG_MASK;
712 }
713 
714 /*
715  * rb_head_page_deactivate - clears head page ptr (for free list)
716  */
717 static void
718 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
719 {
720  struct list_head *hd;
721 
722  /* Go through the whole list and clear any pointers found. */
723  rb_list_head_clear(cpu_buffer->pages);
724 
725  list_for_each(hd, cpu_buffer->pages)
726  rb_list_head_clear(hd);
727 }
728 
729 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
730  struct buffer_page *head,
731  struct buffer_page *prev,
732  int old_flag, int new_flag)
733 {
734  struct list_head *list;
735  unsigned long val = (unsigned long)&head->list;
736  unsigned long ret;
737 
738  list = &prev->list;
739 
740  val &= ~RB_FLAG_MASK;
741 
742  ret = cmpxchg((unsigned long *)&list->next,
743  val | old_flag, val | new_flag);
744 
745  /* check if the reader took the page */
746  if ((ret & ~RB_FLAG_MASK) != val)
747  return RB_PAGE_MOVED;
748 
749  return ret & RB_FLAG_MASK;
750 }
751 
752 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
753  struct buffer_page *head,
754  struct buffer_page *prev,
755  int old_flag)
756 {
757  return rb_head_page_set(cpu_buffer, head, prev,
758  old_flag, RB_PAGE_UPDATE);
759 }
760 
761 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
762  struct buffer_page *head,
763  struct buffer_page *prev,
764  int old_flag)
765 {
766  return rb_head_page_set(cpu_buffer, head, prev,
767  old_flag, RB_PAGE_HEAD);
768 }
769 
770 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
771  struct buffer_page *head,
772  struct buffer_page *prev,
773  int old_flag)
774 {
775  return rb_head_page_set(cpu_buffer, head, prev,
776  old_flag, RB_PAGE_NORMAL);
777 }
778 
779 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
780  struct buffer_page **bpage)
781 {
782  struct list_head *p = rb_list_head((*bpage)->list.next);
783 
784  *bpage = list_entry(p, struct buffer_page, list);
785 }
786 
787 static struct buffer_page *
788 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
789 {
790  struct buffer_page *head;
791  struct buffer_page *page;
792  struct list_head *list;
793  int i;
794 
795  if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
796  return NULL;
797 
798  /* sanity check */
799  list = cpu_buffer->pages;
800  if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
801  return NULL;
802 
803  page = head = cpu_buffer->head_page;
804  /*
805  * It is possible that the writer moves the header behind
806  * where we started, and we miss in one loop.
807  * A second loop should grab the header, but we'll do
808  * three loops just because I'm paranoid.
809  */
810  for (i = 0; i < 3; i++) {
811  do {
812  if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
813  cpu_buffer->head_page = page;
814  return page;
815  }
816  rb_inc_page(cpu_buffer, &page);
817  } while (page != head);
818  }
819 
820  RB_WARN_ON(cpu_buffer, 1);
821 
822  return NULL;
823 }
824 
825 static int rb_head_page_replace(struct buffer_page *old,
826  struct buffer_page *new)
827 {
828  unsigned long *ptr = (unsigned long *)&old->list.prev->next;
829  unsigned long val;
830  unsigned long ret;
831 
832  val = *ptr & ~RB_FLAG_MASK;
833  val |= RB_PAGE_HEAD;
834 
835  ret = cmpxchg(ptr, val, (unsigned long)&new->list);
836 
837  return ret == val;
838 }
839 
840 /*
841  * rb_tail_page_update - move the tail page forward
842  *
843  * Returns 1 if moved tail page, 0 if someone else did.
844  */
845 static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
846  struct buffer_page *tail_page,
847  struct buffer_page *next_page)
848 {
849  struct buffer_page *old_tail;
850  unsigned long old_entries;
851  unsigned long old_write;
852  int ret = 0;
853 
854  /*
855  * The tail page now needs to be moved forward.
856  *
857  * We need to reset the tail page, but without messing
858  * with possible erasing of data brought in by interrupts
859  * that have moved the tail page and are currently on it.
860  *
861  * We add a counter to the write field to denote this.
862  */
863  old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
864  old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
865 
866  /*
867  * Just make sure we have seen our old_write and synchronize
868  * with any interrupts that come in.
869  */
870  barrier();
871 
872  /*
873  * If the tail page is still the same as what we think
874  * it is, then it is up to us to update the tail
875  * pointer.
876  */
877  if (tail_page == cpu_buffer->tail_page) {
878  /* Zero the write counter */
879  unsigned long val = old_write & ~RB_WRITE_MASK;
880  unsigned long eval = old_entries & ~RB_WRITE_MASK;
881 
882  /*
883  * This will only succeed if an interrupt did
884  * not come in and change it. In which case, we
885  * do not want to modify it.
886  *
887  * We add (void) to let the compiler know that we do not care
888  * about the return value of these functions. We use the
889  * cmpxchg to only update if an interrupt did not already
890  * do it for us. If the cmpxchg fails, we don't care.
891  */
892  (void)local_cmpxchg(&next_page->write, old_write, val);
893  (void)local_cmpxchg(&next_page->entries, old_entries, eval);
894 
895  /*
896  * No need to worry about races with clearing out the commit.
897  * It can only increment when a commit takes place. But that
898  * only happens in the outermost nested commit.
899  */
900  local_set(&next_page->page->commit, 0);
901 
902  old_tail = cmpxchg(&cpu_buffer->tail_page,
903  tail_page, next_page);
904 
905  if (old_tail == tail_page)
906  ret = 1;
907  }
908 
909  return ret;
910 }
911 
912 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
913  struct buffer_page *bpage)
914 {
915  unsigned long val = (unsigned long)bpage;
916 
917  if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
918  return 1;
919 
920  return 0;
921 }
922 
926 static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
927  struct list_head *list)
928 {
929  if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
930  return 1;
931  if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
932  return 1;
933  return 0;
934 }
935 
943 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
944 {
945  struct list_head *head = cpu_buffer->pages;
946  struct buffer_page *bpage, *tmp;
947 
948  /* Reset the head page if it exists */
949  if (cpu_buffer->head_page)
950  rb_set_head_page(cpu_buffer);
951 
952  rb_head_page_deactivate(cpu_buffer);
953 
954  if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
955  return -1;
956  if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
957  return -1;
958 
959  if (rb_check_list(cpu_buffer, head))
960  return -1;
961 
962  list_for_each_entry_safe(bpage, tmp, head, list) {
963  if (RB_WARN_ON(cpu_buffer,
964  bpage->list.next->prev != &bpage->list))
965  return -1;
966  if (RB_WARN_ON(cpu_buffer,
967  bpage->list.prev->next != &bpage->list))
968  return -1;
969  if (rb_check_list(cpu_buffer, &bpage->list))
970  return -1;
971  }
972 
973  rb_head_page_activate(cpu_buffer);
974 
975  return 0;
976 }
977 
978 static int __rb_allocate_pages(int nr_pages, struct list_head *pages, int cpu)
979 {
980  int i;
981  struct buffer_page *bpage, *tmp;
982 
983  for (i = 0; i < nr_pages; i++) {
984  struct page *page;
985  /*
986  * __GFP_NORETRY flag makes sure that the allocation fails
987  * gracefully without invoking oom-killer and the system is
988  * not destabilized.
989  */
990  bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
991  GFP_KERNEL | __GFP_NORETRY,
992  cpu_to_node(cpu));
993  if (!bpage)
994  goto free_pages;
995 
996  list_add(&bpage->list, pages);
997 
998  page = alloc_pages_node(cpu_to_node(cpu),
999  GFP_KERNEL | __GFP_NORETRY, 0);
1000  if (!page)
1001  goto free_pages;
1002  bpage->page = page_address(page);
1003  rb_init_page(bpage->page);
1004  }
1005 
1006  return 0;
1007 
1008 free_pages:
1009  list_for_each_entry_safe(bpage, tmp, pages, list) {
1010  list_del_init(&bpage->list);
1011  free_buffer_page(bpage);
1012  }
1013 
1014  return -ENOMEM;
1015 }
1016 
1017 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1018  unsigned nr_pages)
1019 {
1020  LIST_HEAD(pages);
1021 
1022  WARN_ON(!nr_pages);
1023 
1024  if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
1025  return -ENOMEM;
1026 
1027  /*
1028  * The ring buffer page list is a circular list that does not
1029  * start and end with a list head. All page list items point to
1030  * other pages.
1031  */
1032  cpu_buffer->pages = pages.next;
1033  list_del(&pages);
1034 
1035  cpu_buffer->nr_pages = nr_pages;
1036 
1037  rb_check_pages(cpu_buffer);
1038 
1039  return 0;
1040 }
1041 
1042 static struct ring_buffer_per_cpu *
1043 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
1044 {
1045  struct ring_buffer_per_cpu *cpu_buffer;
1046  struct buffer_page *bpage;
1047  struct page *page;
1048  int ret;
1049 
1050  cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1051  GFP_KERNEL, cpu_to_node(cpu));
1052  if (!cpu_buffer)
1053  return NULL;
1054 
1055  cpu_buffer->cpu = cpu;
1056  cpu_buffer->buffer = buffer;
1057  raw_spin_lock_init(&cpu_buffer->reader_lock);
1058  lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1059  cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1060  INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1061  init_completion(&cpu_buffer->update_done);
1062 
1063  bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1064  GFP_KERNEL, cpu_to_node(cpu));
1065  if (!bpage)
1066  goto fail_free_buffer;
1067 
1068  rb_check_bpage(cpu_buffer, bpage);
1069 
1070  cpu_buffer->reader_page = bpage;
1071  page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1072  if (!page)
1073  goto fail_free_reader;
1074  bpage->page = page_address(page);
1075  rb_init_page(bpage->page);
1076 
1077  INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1078  INIT_LIST_HEAD(&cpu_buffer->new_pages);
1079 
1080  ret = rb_allocate_pages(cpu_buffer, nr_pages);
1081  if (ret < 0)
1082  goto fail_free_reader;
1083 
1084  cpu_buffer->head_page
1085  = list_entry(cpu_buffer->pages, struct buffer_page, list);
1086  cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1087 
1088  rb_head_page_activate(cpu_buffer);
1089 
1090  return cpu_buffer;
1091 
1092  fail_free_reader:
1093  free_buffer_page(cpu_buffer->reader_page);
1094 
1095  fail_free_buffer:
1096  kfree(cpu_buffer);
1097  return NULL;
1098 }
1099 
1100 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1101 {
1102  struct list_head *head = cpu_buffer->pages;
1103  struct buffer_page *bpage, *tmp;
1104 
1105  free_buffer_page(cpu_buffer->reader_page);
1106 
1107  rb_head_page_deactivate(cpu_buffer);
1108 
1109  if (head) {
1110  list_for_each_entry_safe(bpage, tmp, head, list) {
1111  list_del_init(&bpage->list);
1112  free_buffer_page(bpage);
1113  }
1114  bpage = list_entry(head, struct buffer_page, list);
1115  free_buffer_page(bpage);
1116  }
1117 
1118  kfree(cpu_buffer);
1119 }
1120 
1121 #ifdef CONFIG_HOTPLUG_CPU
1122 static int rb_cpu_notify(struct notifier_block *self,
1123  unsigned long action, void *hcpu);
1124 #endif
1125 
1136 struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1137  struct lock_class_key *key)
1138 {
1139  struct ring_buffer *buffer;
1140  int bsize;
1141  int cpu, nr_pages;
1142 
1143  /* keep it in its own cache line */
1144  buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1145  GFP_KERNEL);
1146  if (!buffer)
1147  return NULL;
1148 
1149  if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1150  goto fail_free_buffer;
1151 
1152  nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1153  buffer->flags = flags;
1154  buffer->clock = trace_clock_local;
1155  buffer->reader_lock_key = key;
1156 
1157  /* need at least two pages */
1158  if (nr_pages < 2)
1159  nr_pages = 2;
1160 
1161  /*
1162  * In case of non-hotplug cpu, if the ring-buffer is allocated
1163  * in early initcall, it will not be notified of secondary cpus.
1164  * In that case, we need to allocate for all possible cpus.
1165  */
1166 #ifdef CONFIG_HOTPLUG_CPU
1167  get_online_cpus();
1168  cpumask_copy(buffer->cpumask, cpu_online_mask);
1169 #else
1170  cpumask_copy(buffer->cpumask, cpu_possible_mask);
1171 #endif
1172  buffer->cpus = nr_cpu_ids;
1173 
1174  bsize = sizeof(void *) * nr_cpu_ids;
1175  buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1176  GFP_KERNEL);
1177  if (!buffer->buffers)
1178  goto fail_free_cpumask;
1179 
1180  for_each_buffer_cpu(buffer, cpu) {
1181  buffer->buffers[cpu] =
1182  rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1183  if (!buffer->buffers[cpu])
1184  goto fail_free_buffers;
1185  }
1186 
1187 #ifdef CONFIG_HOTPLUG_CPU
1188  buffer->cpu_notify.notifier_call = rb_cpu_notify;
1189  buffer->cpu_notify.priority = 0;
1190  register_cpu_notifier(&buffer->cpu_notify);
1191 #endif
1192 
1193  put_online_cpus();
1194  mutex_init(&buffer->mutex);
1195 
1196  return buffer;
1197 
1198  fail_free_buffers:
1199  for_each_buffer_cpu(buffer, cpu) {
1200  if (buffer->buffers[cpu])
1201  rb_free_cpu_buffer(buffer->buffers[cpu]);
1202  }
1203  kfree(buffer->buffers);
1204 
1205  fail_free_cpumask:
1206  free_cpumask_var(buffer->cpumask);
1207  put_online_cpus();
1208 
1209  fail_free_buffer:
1210  kfree(buffer);
1211  return NULL;
1212 }
1214 
1219 void
1220 ring_buffer_free(struct ring_buffer *buffer)
1221 {
1222  int cpu;
1223 
1224  get_online_cpus();
1225 
1226 #ifdef CONFIG_HOTPLUG_CPU
1227  unregister_cpu_notifier(&buffer->cpu_notify);
1228 #endif
1229 
1230  for_each_buffer_cpu(buffer, cpu)
1231  rb_free_cpu_buffer(buffer->buffers[cpu]);
1232 
1233  put_online_cpus();
1234 
1235  kfree(buffer->buffers);
1236  free_cpumask_var(buffer->cpumask);
1237 
1238  kfree(buffer);
1239 }
1241 
1242 void ring_buffer_set_clock(struct ring_buffer *buffer,
1243  u64 (*clock)(void))
1244 {
1245  buffer->clock = clock;
1246 }
1247 
1248 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1249 
1250 static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1251 {
1252  return local_read(&bpage->entries) & RB_WRITE_MASK;
1253 }
1254 
1255 static inline unsigned long rb_page_write(struct buffer_page *bpage)
1256 {
1257  return local_read(&bpage->write) & RB_WRITE_MASK;
1258 }
1259 
1260 static int
1261 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
1262 {
1263  struct list_head *tail_page, *to_remove, *next_page;
1264  struct buffer_page *to_remove_page, *tmp_iter_page;
1265  struct buffer_page *last_page, *first_page;
1266  unsigned int nr_removed;
1267  unsigned long head_bit;
1268  int page_entries;
1269 
1270  head_bit = 0;
1271 
1272  raw_spin_lock_irq(&cpu_buffer->reader_lock);
1273  atomic_inc(&cpu_buffer->record_disabled);
1274  /*
1275  * We don't race with the readers since we have acquired the reader
1276  * lock. We also don't race with writers after disabling recording.
1277  * This makes it easy to figure out the first and the last page to be
1278  * removed from the list. We unlink all the pages in between including
1279  * the first and last pages. This is done in a busy loop so that we
1280  * lose the least number of traces.
1281  * The pages are freed after we restart recording and unlock readers.
1282  */
1283  tail_page = &cpu_buffer->tail_page->list;
1284 
1285  /*
1286  * tail page might be on reader page, we remove the next page
1287  * from the ring buffer
1288  */
1289  if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1290  tail_page = rb_list_head(tail_page->next);
1291  to_remove = tail_page;
1292 
1293  /* start of pages to remove */
1294  first_page = list_entry(rb_list_head(to_remove->next),
1295  struct buffer_page, list);
1296 
1297  for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1298  to_remove = rb_list_head(to_remove)->next;
1299  head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1300  }
1301 
1302  next_page = rb_list_head(to_remove)->next;
1303 
1304  /*
1305  * Now we remove all pages between tail_page and next_page.
1306  * Make sure that we have head_bit value preserved for the
1307  * next page
1308  */
1309  tail_page->next = (struct list_head *)((unsigned long)next_page |
1310  head_bit);
1311  next_page = rb_list_head(next_page);
1312  next_page->prev = tail_page;
1313 
1314  /* make sure pages points to a valid page in the ring buffer */
1315  cpu_buffer->pages = next_page;
1316 
1317  /* update head page */
1318  if (head_bit)
1319  cpu_buffer->head_page = list_entry(next_page,
1320  struct buffer_page, list);
1321 
1322  /*
1323  * change read pointer to make sure any read iterators reset
1324  * themselves
1325  */
1326  cpu_buffer->read = 0;
1327 
1328  /* pages are removed, resume tracing and then free the pages */
1329  atomic_dec(&cpu_buffer->record_disabled);
1330  raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1331 
1332  RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1333 
1334  /* last buffer page to remove */
1335  last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1336  list);
1337  tmp_iter_page = first_page;
1338 
1339  do {
1340  to_remove_page = tmp_iter_page;
1341  rb_inc_page(cpu_buffer, &tmp_iter_page);
1342 
1343  /* update the counters */
1344  page_entries = rb_page_entries(to_remove_page);
1345  if (page_entries) {
1346  /*
1347  * If something was added to this page, it was full
1348  * since it is not the tail page. So we deduct the
1349  * bytes consumed in ring buffer from here.
1350  * Increment overrun to account for the lost events.
1351  */
1352  local_add(page_entries, &cpu_buffer->overrun);
1353  local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
1354  }
1355 
1356  /*
1357  * We have already removed references to this list item, just
1358  * free up the buffer_page and its page
1359  */
1360  free_buffer_page(to_remove_page);
1361  nr_removed--;
1362 
1363  } while (to_remove_page != last_page);
1364 
1365  RB_WARN_ON(cpu_buffer, nr_removed);
1366 
1367  return nr_removed == 0;
1368 }
1369 
1370 static int
1371 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1372 {
1373  struct list_head *pages = &cpu_buffer->new_pages;
1374  int retries, success;
1375 
1376  raw_spin_lock_irq(&cpu_buffer->reader_lock);
1377  /*
1378  * We are holding the reader lock, so the reader page won't be swapped
1379  * in the ring buffer. Now we are racing with the writer trying to
1380  * move head page and the tail page.
1381  * We are going to adapt the reader page update process where:
1382  * 1. We first splice the start and end of list of new pages between
1383  * the head page and its previous page.
1384  * 2. We cmpxchg the prev_page->next to point from head page to the
1385  * start of new pages list.
1386  * 3. Finally, we update the head->prev to the end of new list.
1387  *
1388  * We will try this process 10 times, to make sure that we don't keep
1389  * spinning.
1390  */
1391  retries = 10;
1392  success = 0;
1393  while (retries--) {
1394  struct list_head *head_page, *prev_page, *r;
1395  struct list_head *last_page, *first_page;
1396  struct list_head *head_page_with_bit;
1397 
1398  head_page = &rb_set_head_page(cpu_buffer)->list;
1399  if (!head_page)
1400  break;
1401  prev_page = head_page->prev;
1402 
1403  first_page = pages->next;
1404  last_page = pages->prev;
1405 
1406  head_page_with_bit = (struct list_head *)
1407  ((unsigned long)head_page | RB_PAGE_HEAD);
1408 
1409  last_page->next = head_page_with_bit;
1410  first_page->prev = prev_page;
1411 
1412  r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
1413 
1414  if (r == head_page_with_bit) {
1415  /*
1416  * yay, we replaced the page pointer to our new list,
1417  * now, we just have to update the head page's prev
1418  * pointer to point to end of list
1419  */
1420  head_page->prev = last_page;
1421  success = 1;
1422  break;
1423  }
1424  }
1425 
1426  if (success)
1427  INIT_LIST_HEAD(pages);
1428  /*
1429  * If we weren't successful in adding in new pages, warn and stop
1430  * tracing
1431  */
1432  RB_WARN_ON(cpu_buffer, !success);
1433  raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1434 
1435  /* free pages if they weren't inserted */
1436  if (!success) {
1437  struct buffer_page *bpage, *tmp;
1438  list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1439  list) {
1440  list_del_init(&bpage->list);
1441  free_buffer_page(bpage);
1442  }
1443  }
1444  return success;
1445 }
1446 
1447 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
1448 {
1449  int success;
1450 
1451  if (cpu_buffer->nr_pages_to_update > 0)
1452  success = rb_insert_pages(cpu_buffer);
1453  else
1454  success = rb_remove_pages(cpu_buffer,
1455  -cpu_buffer->nr_pages_to_update);
1456 
1457  if (success)
1458  cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
1459 }
1460 
1461 static void update_pages_handler(struct work_struct *work)
1462 {
1463  struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
1464  struct ring_buffer_per_cpu, update_pages_work);
1465  rb_update_pages(cpu_buffer);
1466  complete(&cpu_buffer->update_done);
1467 }
1468 
1478 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
1479  int cpu_id)
1480 {
1481  struct ring_buffer_per_cpu *cpu_buffer;
1482  unsigned nr_pages;
1483  int cpu, err = 0;
1484 
1485  /*
1486  * Always succeed at resizing a non-existent buffer:
1487  */
1488  if (!buffer)
1489  return size;
1490 
1491  /* Make sure the requested buffer exists */
1492  if (cpu_id != RING_BUFFER_ALL_CPUS &&
1493  !cpumask_test_cpu(cpu_id, buffer->cpumask))
1494  return size;
1495 
1496  size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1497  size *= BUF_PAGE_SIZE;
1498 
1499  /* we need a minimum of two pages */
1500  if (size < BUF_PAGE_SIZE * 2)
1501  size = BUF_PAGE_SIZE * 2;
1502 
1503  nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1504 
1505  /*
1506  * Don't succeed if resizing is disabled, as a reader might be
1507  * manipulating the ring buffer and is expecting a sane state while
1508  * this is true.
1509  */
1510  if (atomic_read(&buffer->resize_disabled))
1511  return -EBUSY;
1512 
1513  /* prevent another thread from changing buffer sizes */
1514  mutex_lock(&buffer->mutex);
1515 
1516  if (cpu_id == RING_BUFFER_ALL_CPUS) {
1517  /* calculate the pages to update */
1518  for_each_buffer_cpu(buffer, cpu) {
1519  cpu_buffer = buffer->buffers[cpu];
1520 
1521  cpu_buffer->nr_pages_to_update = nr_pages -
1522  cpu_buffer->nr_pages;
1523  /*
1524  * nothing more to do for removing pages or no update
1525  */
1526  if (cpu_buffer->nr_pages_to_update <= 0)
1527  continue;
1528  /*
1529  * to add pages, make sure all new pages can be
1530  * allocated without receiving ENOMEM
1531  */
1532  INIT_LIST_HEAD(&cpu_buffer->new_pages);
1533  if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1534  &cpu_buffer->new_pages, cpu)) {
1535  /* not enough memory for new pages */
1536  err = -ENOMEM;
1537  goto out_err;
1538  }
1539  }
1540 
1541  get_online_cpus();
1542  /*
1543  * Fire off all the required work handlers
1544  * We can't schedule on offline CPUs, but it's not necessary
1545  * since we can change their buffer sizes without any race.
1546  */
1547  for_each_buffer_cpu(buffer, cpu) {
1548  cpu_buffer = buffer->buffers[cpu];
1549  if (!cpu_buffer->nr_pages_to_update)
1550  continue;
1551 
1552  if (cpu_online(cpu))
1553  schedule_work_on(cpu,
1554  &cpu_buffer->update_pages_work);
1555  else
1556  rb_update_pages(cpu_buffer);
1557  }
1558 
1559  /* wait for all the updates to complete */
1560  for_each_buffer_cpu(buffer, cpu) {
1561  cpu_buffer = buffer->buffers[cpu];
1562  if (!cpu_buffer->nr_pages_to_update)
1563  continue;
1564 
1565  if (cpu_online(cpu))
1566  wait_for_completion(&cpu_buffer->update_done);
1567  cpu_buffer->nr_pages_to_update = 0;
1568  }
1569 
1570  put_online_cpus();
1571  } else {
1572  /* Make sure this CPU has been initialized */
1573  if (!cpumask_test_cpu(cpu_id, buffer->cpumask))
1574  goto out;
1575 
1576  cpu_buffer = buffer->buffers[cpu_id];
1577 
1578  if (nr_pages == cpu_buffer->nr_pages)
1579  goto out;
1580 
1581  cpu_buffer->nr_pages_to_update = nr_pages -
1582  cpu_buffer->nr_pages;
1583 
1584  INIT_LIST_HEAD(&cpu_buffer->new_pages);
1585  if (cpu_buffer->nr_pages_to_update > 0 &&
1586  __rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1587  &cpu_buffer->new_pages, cpu_id)) {
1588  err = -ENOMEM;
1589  goto out_err;
1590  }
1591 
1592  get_online_cpus();
1593 
1594  if (cpu_online(cpu_id)) {
1595  schedule_work_on(cpu_id,
1596  &cpu_buffer->update_pages_work);
1597  wait_for_completion(&cpu_buffer->update_done);
1598  } else
1599  rb_update_pages(cpu_buffer);
1600 
1601  cpu_buffer->nr_pages_to_update = 0;
1602  put_online_cpus();
1603  }
1604 
1605  out:
1606  /*
1607  * The ring buffer resize can happen with the ring buffer
1608  * enabled, so that the update disturbs the tracing as little
1609  * as possible. But if the buffer is disabled, we do not need
1610  * to worry about that, and we can take the time to verify
1611  * that the buffer is not corrupt.
1612  */
1613  if (atomic_read(&buffer->record_disabled)) {
1614  atomic_inc(&buffer->record_disabled);
1615  /*
1616  * Even though the buffer was disabled, we must make sure
1617  * that it is truly disabled before calling rb_check_pages.
1618  * There could have been a race between checking
1619  * record_disable and incrementing it.
1620  */
1621  synchronize_sched();
1622  for_each_buffer_cpu(buffer, cpu) {
1623  cpu_buffer = buffer->buffers[cpu];
1624  rb_check_pages(cpu_buffer);
1625  }
1626  atomic_dec(&buffer->record_disabled);
1627  }
1628 
1629  mutex_unlock(&buffer->mutex);
1630  return size;
1631 
1632  out_err:
1633  for_each_buffer_cpu(buffer, cpu) {
1634  struct buffer_page *bpage, *tmp;
1635 
1636  cpu_buffer = buffer->buffers[cpu];
1637  cpu_buffer->nr_pages_to_update = 0;
1638 
1639  if (list_empty(&cpu_buffer->new_pages))
1640  continue;
1641 
1642  list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1643  list) {
1644  list_del_init(&bpage->list);
1645  free_buffer_page(bpage);
1646  }
1647  }
1648  mutex_unlock(&buffer->mutex);
1649  return err;
1650 }
1652 
1653 void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
1654 {
1655  mutex_lock(&buffer->mutex);
1656  if (val)
1657  buffer->flags |= RB_FL_OVERWRITE;
1658  else
1659  buffer->flags &= ~RB_FL_OVERWRITE;
1660  mutex_unlock(&buffer->mutex);
1661 }
1663 
1664 static inline void *
1665 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
1666 {
1667  return bpage->data + index;
1668 }
1669 
1670 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
1671 {
1672  return bpage->page->data + index;
1673 }
1674 
1675 static inline struct ring_buffer_event *
1676 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
1677 {
1678  return __rb_page_index(cpu_buffer->reader_page,
1679  cpu_buffer->reader_page->read);
1680 }
1681 
1682 static inline struct ring_buffer_event *
1683 rb_iter_head_event(struct ring_buffer_iter *iter)
1684 {
1685  return __rb_page_index(iter->head_page, iter->head);
1686 }
1687 
1688 static inline unsigned rb_page_commit(struct buffer_page *bpage)
1689 {
1690  return local_read(&bpage->page->commit);
1691 }
1692 
1693 /* Size is determined by what has been committed */
1694 static inline unsigned rb_page_size(struct buffer_page *bpage)
1695 {
1696  return rb_page_commit(bpage);
1697 }
1698 
1699 static inline unsigned
1700 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
1701 {
1702  return rb_page_commit(cpu_buffer->commit_page);
1703 }
1704 
1705 static inline unsigned
1706 rb_event_index(struct ring_buffer_event *event)
1707 {
1708  unsigned long addr = (unsigned long)event;
1709 
1710  return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
1711 }
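/*
 * Example: on a 64-bit kernel an event written 80 bytes into its page
 * sits at page_address + 0x50, so rb_event_index() returns
 * 0x50 - BUF_PAGE_HDR_SIZE = 0x40, the offset into the page's data[]
 * array. An index of 0 means "first event on the page", which
 * rb_add_time_stamp() uses to decide whether a time extend may carry
 * a delta at all.
 */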
1712 
1713 static inline int
1714 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
1715  struct ring_buffer_event *event)
1716 {
1717  unsigned long addr = (unsigned long)event;
1718  unsigned long index;
1719 
1720  index = rb_event_index(event);
1721  addr &= PAGE_MASK;
1722 
1723  return cpu_buffer->commit_page->page == (void *)addr &&
1724  rb_commit_index(cpu_buffer) == index;
1725 }
1726 
1727 static void
1728 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1729 {
1730  unsigned long max_count;
1731 
1732  /*
1733  * We only race with interrupts and NMIs on this CPU.
1734  * If we own the commit event, then we can commit
1735  * all others that interrupted us, since the interruptions
1736  * are in stack format (they finish before they come
1737  * back to us). This allows us to do a simple loop to
1738  * assign the commit to the tail.
1739  */
1740  again:
1741  max_count = cpu_buffer->nr_pages * 100;
1742 
1743  while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
1744  if (RB_WARN_ON(cpu_buffer, !(--max_count)))
1745  return;
1746  if (RB_WARN_ON(cpu_buffer,
1747  rb_is_reader_page(cpu_buffer->tail_page)))
1748  return;
1749  local_set(&cpu_buffer->commit_page->page->commit,
1750  rb_page_write(cpu_buffer->commit_page));
1751  rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
1752  cpu_buffer->write_stamp =
1753  cpu_buffer->commit_page->page->time_stamp;
1754  /* add barrier to keep gcc from optimizing too much */
1755  barrier();
1756  }
1757  while (rb_commit_index(cpu_buffer) !=
1758  rb_page_write(cpu_buffer->commit_page)) {
1759 
1760  local_set(&cpu_buffer->commit_page->page->commit,
1761  rb_page_write(cpu_buffer->commit_page));
1762  RB_WARN_ON(cpu_buffer,
1763  local_read(&cpu_buffer->commit_page->page->commit) &
1764  ~RB_WRITE_MASK);
1765  barrier();
1766  }
1767 
1768  /* again, keep gcc from optimizing */
1769  barrier();
1770 
1771  /*
1772  * If an interrupt came in just after the first while loop
1773  * and pushed the tail page forward, we will be left with
1774  * a dangling commit that will never go forward.
1775  */
1776  if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
1777  goto again;
1778 }
1779 
1780 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1781 {
1782  cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
1783  cpu_buffer->reader_page->read = 0;
1784 }
1785 
1786 static void rb_inc_iter(struct ring_buffer_iter *iter)
1787 {
1788  struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1789 
1790  /*
1791  * The iterator could be on the reader page (it starts there).
1792  * But the head could have moved, since the reader was
1793  * found. Check for this case and assign the iterator
1794  * to the head page instead of next.
1795  */
1796  if (iter->head_page == cpu_buffer->reader_page)
1797  iter->head_page = rb_set_head_page(cpu_buffer);
1798  else
1799  rb_inc_page(cpu_buffer, &iter->head_page);
1800 
1801  iter->read_stamp = iter->head_page->page->time_stamp;
1802  iter->head = 0;
1803 }
1804 
1805 /* Slow path, do not inline */
1806 static noinline struct ring_buffer_event *
1807 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
1808 {
1809  event->type_len = RINGBUF_TYPE_TIME_EXTEND;
1810 
1811  /* Not the first event on the page? */
1812  if (rb_event_index(event)) {
1813  event->time_delta = delta & TS_MASK;
1814  event->array[0] = delta >> TS_SHIFT;
1815  } else {
1816  /* nope, just zero it */
1817  event->time_delta = 0;
1818  event->array[0] = 0;
1819  }
1820 
1821  return skip_time_extend(event);
1822 }
1823 
1835 static void
1836 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
1837  struct ring_buffer_event *event, unsigned length,
1838  int add_timestamp, u64 delta)
1839 {
1840  /* Only a commit updates the timestamp */
1841  if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
1842  delta = 0;
1843 
1844  /*
1845  * If we need to add a timestamp, then we
1846  * add it to the start of the reserved space.
1847  */
1848  if (unlikely(add_timestamp)) {
1849  event = rb_add_time_stamp(event, delta);
1850  length -= RB_LEN_TIME_EXTEND;
1851  delta = 0;
1852  }
1853 
1854  event->time_delta = delta;
1855  length -= RB_EVNT_HDR_SIZE;
1856  if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
1857  event->type_len = 0;
1858  event->array[0] = length;
1859  } else
1860  event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
1861 }
1862 
1863 /*
1864  * rb_handle_head_page - writer hit the head page
1865  *
1866  * Returns: +1 to retry page
1867  * 0 to continue
1868  * -1 on error
1869  */
1870 static int
1871 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
1872  struct buffer_page *tail_page,
1873  struct buffer_page *next_page)
1874 {
1875  struct buffer_page *new_head;
1876  int entries;
1877  int type;
1878  int ret;
1879 
1880  entries = rb_page_entries(next_page);
1881 
1882  /*
1883  * The hard part is here. We need to move the head
1884  * forward, and protect against both readers on
1885  * other CPUs and writers coming in via interrupts.
1886  */
1887  type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
1888  RB_PAGE_HEAD);
1889 
1890  /*
1891  * type can be one of four:
1892  * NORMAL - an interrupt already moved it for us
1893  * HEAD - we are the first to get here.
1894  * UPDATE - we are the interrupt interrupting
1895  * a current move.
1896  * MOVED - a reader on another CPU moved the next
1897  * pointer to its reader page. Give up
1898  * and try again.
1899  */
1900 
1901  switch (type) {
1902  case RB_PAGE_HEAD:
1903  /*
1904  * We changed the head to UPDATE, thus
1905  * it is our responsibility to update
1906  * the counters.
1907  */
1908  local_add(entries, &cpu_buffer->overrun);
1909  local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
1910 
1911  /*
1912  * The entries will be zeroed out when we move the
1913  * tail page.
1914  */
1915 
1916  /* still more to do */
1917  break;
1918 
1919  case RB_PAGE_UPDATE:
1920  /*
1921  * This is an interrupt that interrupted the
1922  * previous update. Still more to do.
1923  */
1924  break;
1925  case RB_PAGE_NORMAL:
1926  /*
1927  * An interrupt came in before the update
1928  * and processed this for us.
1929  * Nothing left to do.
1930  */
1931  return 1;
1932  case RB_PAGE_MOVED:
1933  /*
1934  * The reader is on another CPU and just did
1935  * a swap with our next_page.
1936  * Try again.
1937  */
1938  return 1;
1939  default:
1940  RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
1941  return -1;
1942  }
1943 
1944  /*
1945  * Now that we are here, the old head pointer is
1946  * set to UPDATE. This will keep the reader from
1947  * swapping the head page with the reader page.
1948  * The reader (on another CPU) will spin till
1949  * we are finished.
1950  *
1951  * We just need to protect against interrupts
1952  * doing the job. We will set the next pointer
1953  * to HEAD. After that, we set the old pointer
1954  * to NORMAL, but only if it was HEAD before.
1955  * otherwise we are an interrupt, and only
1956  * want the outer most commit to reset it.
1957  */
1958  new_head = next_page;
1959  rb_inc_page(cpu_buffer, &new_head);
1960 
1961  ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
1962  RB_PAGE_NORMAL);
1963 
1964  /*
1965  * Valid returns are:
1966  * HEAD - an interrupt came in and already set it.
1967  * NORMAL - One of two things:
1968  * 1) We really set it.
1969  * 2) A bunch of interrupts came in and moved
1970  * the page forward again.
1971  */
1972  switch (ret) {
1973  case RB_PAGE_HEAD:
1974  case RB_PAGE_NORMAL:
1975  /* OK */
1976  break;
1977  default:
1978  RB_WARN_ON(cpu_buffer, 1);
1979  return -1;
1980  }
1981 
1982  /*
1983  * It is possible that an interrupt came in,
1984  * set the head up, then more interrupts came in
1985  * and moved it again. When we get back here,
1986  * the page would have been set to NORMAL but we
1987  * just set it back to HEAD.
1988  *
1989  * How do you detect this? Well, if that happened
1990  * the tail page would have moved.
1991  */
1992  if (ret == RB_PAGE_NORMAL) {
1993  /*
1994  * If the tail had moved past next, then we need
1995  * to reset the pointer.
1996  */
1997  if (cpu_buffer->tail_page != tail_page &&
1998  cpu_buffer->tail_page != next_page)
1999  rb_head_page_set_normal(cpu_buffer, new_head,
2000  next_page,
2001  RB_PAGE_HEAD);
2002  }
2003 
2004  /*
2005  * If this was the outer most commit (the one that
2006  * changed the original pointer from HEAD to UPDATE),
2007  * then it is up to us to reset it to NORMAL.
2008  */
2009  if (type == RB_PAGE_HEAD) {
2010  ret = rb_head_page_set_normal(cpu_buffer, next_page,
2011  tail_page,
2012  RB_PAGE_UPDATE);
2013  if (RB_WARN_ON(cpu_buffer,
2014  ret != RB_PAGE_UPDATE))
2015  return -1;
2016  }
2017 
2018  return 0;
2019 }
2020 
2021 static unsigned rb_calculate_event_length(unsigned length)
2022 {
2023  struct ring_buffer_event event; /* Used only for sizeof array */
2024 
2025  /* zero length can cause confusions */
2026  if (!length)
2027  length = 1;
2028 
2029  if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2030  length += sizeof(event.array[0]);
2031 
2032  length += RB_EVNT_HDR_SIZE;
2033  length = ALIGN(length, RB_ARCH_ALIGNMENT);
2034 
2035  return length;
2036 }
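/*
 * Two worked examples, assuming RB_FORCE_8BYTE_ALIGNMENT is 0 (32-bit,
 * or 64-bit with efficient unaligned access): a 100 byte payload is
 * <= RB_MAX_SMALL_DATA, so the reserved size is 100 + RB_EVNT_HDR_SIZE
 * = 104 bytes, already 4-byte aligned. A 200 byte payload exceeds
 * RB_MAX_SMALL_DATA, so an extra array[0] word is added to hold the
 * length: 200 + 4 + 4 = 208 bytes.
 */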
2037 
2038 static inline void
2039 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2040  struct buffer_page *tail_page,
2041  unsigned long tail, unsigned long length)
2042 {
2043  struct ring_buffer_event *event;
2044 
2045  /*
2046  * Only the event that crossed the page boundary
2047  * must fill the old tail_page with padding.
2048  */
2049  if (tail >= BUF_PAGE_SIZE) {
2050  /*
2051  * If the page was filled, then we still need
2052  * to update the real_end. Reset it to zero
2053  * and the reader will ignore it.
2054  */
2055  if (tail == BUF_PAGE_SIZE)
2056  tail_page->real_end = 0;
2057 
2058  local_sub(length, &tail_page->write);
2059  return;
2060  }
2061 
2062  event = __rb_page_index(tail_page, tail);
2063  kmemcheck_annotate_bitfield(event, bitfield);
2064 
2065  /* account for padding bytes */
2066  local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
2067 
2068  /*
2069  * Save the original length to the meta data.
2070  * This will be used by the reader to add lost event
2071  * counter.
2072  */
2073  tail_page->real_end = tail;
2074 
2075  /*
2076  * If this event is bigger than the minimum size, then
2077  * we need to be careful that we don't subtract the
2078  * write counter enough to allow another writer to slip
2079  * in on this page.
2080  * We put in a discarded commit instead, to make sure
2081  * that this space is not used again.
2082  *
2083  * If we are less than the minimum size, we don't need to
2084  * worry about it.
2085  */
2086  if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
2087  /* No room for any events */
2088 
2089  /* Mark the rest of the page with padding */
2090  rb_event_set_padding(event);
2091 
2092  /* Set the write back to the previous setting */
2093  local_sub(length, &tail_page->write);
2094  return;
2095  }
2096 
2097  /* Put in a discarded event */
2098  event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
2099  event->type_len = RINGBUF_TYPE_PADDING;
2100  /* time delta must be non zero */
2101  event->time_delta = 1;
2102 
2103  /* Set write to end of buffer */
2104  length = (tail + length) - BUF_PAGE_SIZE;
2105  local_sub(length, &tail_page->write);
2106 }
2107 
2108 /*
2109  * This is the slow path, force gcc not to inline it.
2110  */
2111 static noinline struct ring_buffer_event *
2112 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2113  unsigned long length, unsigned long tail,
2114  struct buffer_page *tail_page, u64 ts)
2115 {
2116  struct buffer_page *commit_page = cpu_buffer->commit_page;
2117  struct ring_buffer *buffer = cpu_buffer->buffer;
2118  struct buffer_page *next_page;
2119  int ret;
2120 
2121  next_page = tail_page;
2122 
2123  rb_inc_page(cpu_buffer, &next_page);
2124 
2125  /*
2126  * If for some reason, we had an interrupt storm that made
2127  * it all the way around the buffer, bail, and warn
2128  * about it.
2129  */
2130  if (unlikely(next_page == commit_page)) {
2131  local_inc(&cpu_buffer->commit_overrun);
2132  goto out_reset;
2133  }
2134 
2135  /*
2136  * This is where the fun begins!
2137  *
2138  * We are fighting against races between a reader that
2139  * could be on another CPU trying to swap its reader
2140  * page with the buffer head.
2141  *
2142  * We are also fighting against interrupts coming in and
2143  * moving the head or tail on us as well.
2144  *
2145  * If the next page is the head page then we have filled
2146  * the buffer, unless the commit page is still on the
2147  * reader page.
2148  */
2149  if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
2150 
2151  /*
2152  * If the commit is not on the reader page, then
2153  * move the header page.
2154  */
2155  if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2156  /*
2157  * If we are not in overwrite mode,
2158  * this is easy, just stop here.
2159  */
2160  if (!(buffer->flags & RB_FL_OVERWRITE))
2161  goto out_reset;
2162 
2163  ret = rb_handle_head_page(cpu_buffer,
2164  tail_page,
2165  next_page);
2166  if (ret < 0)
2167  goto out_reset;
2168  if (ret)
2169  goto out_again;
2170  } else {
2171  /*
2172  * We need to be careful here too. The
2173  * commit page could still be on the reader
2174  * page. We could have a small buffer, and
2175  * have filled up the buffer with events
2176  * from interrupts and such, and wrapped.
2177  *
2178  * Note, if the tail page is also on the
2179  * reader_page, we let it move out.
2180  */
2181  if (unlikely((cpu_buffer->commit_page !=
2182  cpu_buffer->tail_page) &&
2183  (cpu_buffer->commit_page ==
2184  cpu_buffer->reader_page))) {
2185  local_inc(&cpu_buffer->commit_overrun);
2186  goto out_reset;
2187  }
2188  }
2189  }
2190 
2191  ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
2192  if (ret) {
2193  /*
2194  * Nested commits always have zero deltas, so
2195  * just reread the time stamp
2196  */
2197  ts = rb_time_stamp(buffer);
2198  next_page->page->time_stamp = ts;
2199  }
2200 
2201  out_again:
2202 
2203  rb_reset_tail(cpu_buffer, tail_page, tail, length);
2204 
2205  /* fail and let the caller try again */
2206  return ERR_PTR(-EAGAIN);
2207 
2208  out_reset:
2209  /* reset write */
2210  rb_reset_tail(cpu_buffer, tail_page, tail, length);
2211 
2212  return NULL;
2213 }
2214 
2215 static struct ring_buffer_event *
2216 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
2217  unsigned long length, u64 ts,
2218  u64 delta, int add_timestamp)
2219 {
2220  struct buffer_page *tail_page;
2221  struct ring_buffer_event *event;
2222  unsigned long tail, write;
2223 
2224  /*
2225  * If the time delta since the last event is too big to
2226  * hold in the time field of the event, then we append a
2227  * TIME EXTEND event ahead of the data event.
2228  */
2229  if (unlikely(add_timestamp))
2230  length += RB_LEN_TIME_EXTEND;
2231 
2232  tail_page = cpu_buffer->tail_page;
2233  write = local_add_return(length, &tail_page->write);
2234 
2235  /* set write to only the index of the write */
2236  write &= RB_WRITE_MASK;
2237  tail = write - length;
2238 
2239  /* See if we shot past the end of this buffer page */
2240  if (unlikely(write > BUF_PAGE_SIZE))
2241  return rb_move_tail(cpu_buffer, length, tail,
2242  tail_page, ts);
2243 
2244  /* We reserved something on the buffer */
2245 
2246  event = __rb_page_index(tail_page, tail);
2248  rb_update_event(cpu_buffer, event, length, add_timestamp, delta);
2249 
2250  local_inc(&tail_page->entries);
2251 
2252  /*
2253  * If this is the first commit on the page, then update
2254  * its timestamp.
2255  */
2256  if (!tail)
2257  tail_page->page->time_stamp = ts;
2258 
2259  /* account for these added bytes */
2260  local_add(length, &cpu_buffer->entries_bytes);
2261 
2262  return event;
2263 }
2264 
2265 static inline int
2266 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2267  struct ring_buffer_event *event)
2268 {
2269  unsigned long new_index, old_index;
2270  struct buffer_page *bpage;
2271  unsigned long index;
2272  unsigned long addr;
2273 
2274  new_index = rb_event_index(event);
2275  old_index = new_index + rb_event_ts_length(event);
2276  addr = (unsigned long)event;
2277  addr &= PAGE_MASK;
2278 
2279  bpage = cpu_buffer->tail_page;
2280 
2281  if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2282  unsigned long write_mask =
2283  local_read(&bpage->write) & ~RB_WRITE_MASK;
2284  unsigned long event_length = rb_event_length(event);
2285  /*
2286  * This is on the tail page. It is possible that
2287  * a write could come in and move the tail page
2288  * and write to the next page. That is fine
2289  * because we just shorten what is on this page.
2290  */
2291  old_index += write_mask;
2292  new_index += write_mask;
2293  index = local_cmpxchg(&bpage->write, old_index, new_index);
2294  if (index == old_index) {
2295  /* update counters */
2296  local_sub(event_length, &cpu_buffer->entries_bytes);
2297  return 1;
2298  }
2299  }
2300 
2301  /* could not discard */
2302  return 0;
2303 }
2304 
2305 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2306 {
2307  local_inc(&cpu_buffer->committing);
2308  local_inc(&cpu_buffer->commits);
2309 }
2310 
2311 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
2312 {
2313  unsigned long commits;
2314 
2315  if (RB_WARN_ON(cpu_buffer,
2316  !local_read(&cpu_buffer->committing)))
2317  return;
2318 
2319  again:
2320  commits = local_read(&cpu_buffer->commits);
2321  /* synchronize with interrupts */
2322  barrier();
2323  if (local_read(&cpu_buffer->committing) == 1)
2324  rb_set_commit_to_write(cpu_buffer);
2325 
2326  local_dec(&cpu_buffer->committing);
2327 
2328  /* synchronize with interrupts */
2329  barrier();
2330 
2331  /*
2332  * Need to account for interrupts coming in between the
2333  * updating of the commit page and the clearing of the
2334  * committing counter.
2335  */
2336  if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
2337  !local_read(&cpu_buffer->committing)) {
2338  local_inc(&cpu_buffer->committing);
2339  goto again;
2340  }
2341 }
2342 
2343 static struct ring_buffer_event *
2344 rb_reserve_next_event(struct ring_buffer *buffer,
2345  struct ring_buffer_per_cpu *cpu_buffer,
2346  unsigned long length)
2347 {
2348  struct ring_buffer_event *event;
2349  u64 ts, delta;
2350  int nr_loops = 0;
2351  int add_timestamp;
2352  u64 diff;
2353 
2354  rb_start_commit(cpu_buffer);
2355 
2356 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
2357  /*
2358  * Due to the ability to swap a cpu buffer from a buffer
2359  * it is possible it was swapped before we committed.
2360  * (committing stops a swap). We check for it here and
2361  * if it happened, we have to fail the write.
2362  */
2363  barrier();
2364  if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
2365  local_dec(&cpu_buffer->committing);
2366  local_dec(&cpu_buffer->commits);
2367  return NULL;
2368  }
2369 #endif
2370 
2371  length = rb_calculate_event_length(length);
2372  again:
2373  add_timestamp = 0;
2374  delta = 0;
2375 
2376  /*
2377  * We allow for interrupts to reenter here and do a trace.
2378  * If one does, it will cause this original code to loop
2379  * back here. Even with heavy interrupts happening, this
2380  * should only happen a few times in a row. If this happens
2381  * 1000 times in a row, there must be either an interrupt
2382  * storm or we have something buggy.
2383  * Bail!
2384  */
2385  if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
2386  goto out_fail;
2387 
2388  ts = rb_time_stamp(cpu_buffer->buffer);
2389  diff = ts - cpu_buffer->write_stamp;
2390 
2391  /* make sure this diff is calculated here */
2392  barrier();
2393 
2394  /* Did the write stamp get updated already? */
2395  if (likely(ts >= cpu_buffer->write_stamp)) {
2396  delta = diff;
2397  if (unlikely(test_time_stamp(delta))) {
2398  int local_clock_stable = 1;
2399 #ifdef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2400  local_clock_stable = sched_clock_stable;
2401 #endif
2402  WARN_ONCE(delta > (1ULL << 59),
2403  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
2404  (unsigned long long)delta,
2405  (unsigned long long)ts,
2406  (unsigned long long)cpu_buffer->write_stamp,
2407  local_clock_stable ? "" :
2408  "If you just came from a suspend/resume,\n"
2409  "please switch to the trace global clock:\n"
2410  " echo global > /sys/kernel/debug/tracing/trace_clock\n");
2411  add_timestamp = 1;
2412  }
2413  }
2414 
2415  event = __rb_reserve_next(cpu_buffer, length, ts,
2416  delta, add_timestamp);
2417  if (unlikely(PTR_ERR(event) == -EAGAIN))
2418  goto again;
2419 
2420  if (!event)
2421  goto out_fail;
2422 
2423  return event;
2424 
2425  out_fail:
2426  rb_end_commit(cpu_buffer);
2427  return NULL;
2428 }
2429 
2430 #ifdef CONFIG_TRACING
2431 
2432 #define TRACE_RECURSIVE_DEPTH 16
2433 
2434 /* Keep this code out of the fast path cache */
2435 static noinline void trace_recursive_fail(void)
2436 {
2437  /* Disable all tracing before we do anything else */
2438  tracing_off_permanent();
2439 
2440  printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:"
2441  "HC[%lu]:SC[%lu]:NMI[%lu]\n",
2442  trace_recursion_buffer(),
2443  hardirq_count() >> HARDIRQ_SHIFT,
2444  softirq_count() >> SOFTIRQ_SHIFT,
2445  in_nmi());
2446 
2447  WARN_ON_ONCE(1);
2448 }
2449 
2450 static inline int trace_recursive_lock(void)
2451 {
2452  trace_recursion_inc();
2453 
2454  if (likely(trace_recursion_buffer() < TRACE_RECURSIVE_DEPTH))
2455  return 0;
2456 
2457  trace_recursive_fail();
2458 
2459  return -1;
2460 }
2461 
2462 static inline void trace_recursive_unlock(void)
2463 {
2464  WARN_ON_ONCE(!trace_recursion_buffer());
2465 
2466  trace_recursion_dec();
2467 }
2468 
2469 #else
2470 
2471 #define trace_recursive_lock() (0)
2472 #define trace_recursive_unlock() do { } while (0)
2473 
2474 #endif
2475 
2491 struct ring_buffer_event *
2492 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
2493 {
2494  struct ring_buffer_per_cpu *cpu_buffer;
2495  struct ring_buffer_event *event;
2496  int cpu;
2497 
2498  if (ring_buffer_flags != RB_BUFFERS_ON)
2499  return NULL;
2500 
2501  /* If we are tracing schedule, we don't want to recurse */
2502  preempt_disable_notrace();
2503 
2504  if (atomic_read(&buffer->record_disabled))
2505  goto out_nocheck;
2506 
2507  if (trace_recursive_lock())
2508  goto out_nocheck;
2509 
2510  cpu = raw_smp_processor_id();
2511 
2512  if (!cpumask_test_cpu(cpu, buffer->cpumask))
2513  goto out;
2514 
2515  cpu_buffer = buffer->buffers[cpu];
2516 
2517  if (atomic_read(&cpu_buffer->record_disabled))
2518  goto out;
2519 
2520  if (length > BUF_MAX_DATA_SIZE)
2521  goto out;
2522 
2523  event = rb_reserve_next_event(buffer, cpu_buffer, length);
2524  if (!event)
2525  goto out;
2526 
2527  return event;
2528 
2529  out:
2530  trace_recursive_unlock();
2531 
2532  out_nocheck:
2533  preempt_enable_notrace();
2534  return NULL;
2535 }
2537 
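A minimal sketch of the reserve/commit pattern this function is designed for; the payload struct and the helper below are hypothetical and error handling is kept to the bare minimum:

        struct my_payload {                     /* hypothetical event layout */
                int cpu;
                u64 value;
        };

        static int my_trace_value(struct ring_buffer *buffer, u64 value)
        {
                struct ring_buffer_event *event;
                struct my_payload *entry;

                event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
                if (!event)
                        return -EBUSY;  /* disabled, recursing, or too large */

                entry = ring_buffer_event_data(event);
                entry->cpu = raw_smp_processor_id();
                entry->value = value;

                /* Publish the event and release the implicit preemption disable. */
                return ring_buffer_unlock_commit(buffer, event);
        }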
2538 static void
2539 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2540  struct ring_buffer_event *event)
2541 {
2542  u64 delta;
2543 
2544  /*
2545  * The event first in the commit queue updates the
2546  * time stamp.
2547  */
2548  if (rb_event_is_commit(cpu_buffer, event)) {
2549  /*
2550  * A commit event that is first on a page
2551  * updates the write timestamp with the page stamp
2552  */
2553  if (!rb_event_index(event))
2554  cpu_buffer->write_stamp =
2555  cpu_buffer->commit_page->page->time_stamp;
2556  else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
2557  delta = event->array[0];
2558  delta <<= TS_SHIFT;
2559  delta += event->time_delta;
2560  cpu_buffer->write_stamp += delta;
2561  } else
2562  cpu_buffer->write_stamp += event->time_delta;
2563  }
2564 }
2565 
2566 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
2567  struct ring_buffer_event *event)
2568 {
2569  local_inc(&cpu_buffer->entries);
2570  rb_update_write_stamp(cpu_buffer, event);
2571  rb_end_commit(cpu_buffer);
2572 }
2573 
2583 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
2584  struct ring_buffer_event *event)
2585 {
2586  struct ring_buffer_per_cpu *cpu_buffer;
2587  int cpu = raw_smp_processor_id();
2588 
2589  cpu_buffer = buffer->buffers[cpu];
2590 
2591  rb_commit(cpu_buffer, event);
2592 
2593  trace_recursive_unlock();
2594 
2595  preempt_enable_notrace();
2596 
2597  return 0;
2598 }
2600 
2601 static inline void rb_event_discard(struct ring_buffer_event *event)
2602 {
2603  if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
2604  event = skip_time_extend(event);
2605 
2606  /* array[0] holds the actual length for the discarded event */
2607  event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
2608  event->type_len = RINGBUF_TYPE_PADDING;
2609  /* time delta must be non zero */
2610  if (!event->time_delta)
2611  event->time_delta = 1;
2612 }
2613 
2614 /*
2615  * Decrement the entries to the page that an event is on.
2616  * The event does not even need to exist, only the pointer
2617  * to the page it is on. This may only be called before the commit
2618  * takes place.
2619  */
2620 static inline void
2621 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
2622  struct ring_buffer_event *event)
2623 {
2624  unsigned long addr = (unsigned long)event;
2625  struct buffer_page *bpage = cpu_buffer->commit_page;
2626  struct buffer_page *start;
2627 
2628  addr &= PAGE_MASK;
2629 
2630  /* Do the likely case first */
2631  if (likely(bpage->page == (void *)addr)) {
2632  local_dec(&bpage->entries);
2633  return;
2634  }
2635 
2636  /*
2637  * Because the commit page may be on the reader page we
2638  * start with the next page and check the end loop there.
2639  */
2640  rb_inc_page(cpu_buffer, &bpage);
2641  start = bpage;
2642  do {
2643  if (bpage->page == (void *)addr) {
2644  local_dec(&bpage->entries);
2645  return;
2646  }
2647  rb_inc_page(cpu_buffer, &bpage);
2648  } while (bpage != start);
2649 
2650  /* commit not part of this buffer?? */
2651  RB_WARN_ON(cpu_buffer, 1);
2652 }
2653 
2673 void ring_buffer_discard_commit(struct ring_buffer *buffer,
2674  struct ring_buffer_event *event)
2675 {
2676  struct ring_buffer_per_cpu *cpu_buffer;
2677  int cpu;
2678 
2679  /* The event is discarded regardless */
2680  rb_event_discard(event);
2681 
2682  cpu = smp_processor_id();
2683  cpu_buffer = buffer->buffers[cpu];
2684 
2685  /*
2686  * This must only be called if the event has not been
2687  * committed yet. Thus we can assume that preemption
2688  * is still disabled.
2689  */
2690  RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
2691 
2692  rb_decrement_entry(cpu_buffer, event);
2693  if (rb_try_to_discard(cpu_buffer, event))
2694  goto out;
2695 
2696  /*
2697  * The commit is still visible by the reader, so we
2698  * must still update the timestamp.
2699  */
2700  rb_update_write_stamp(cpu_buffer, event);
2701  out:
2702  rb_end_commit(cpu_buffer);
2703 
2704  trace_recursive_unlock();
2705 
2706  preempt_enable_notrace();
2707 
2708 }
2710 
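The intended use of ring_buffer_discard_commit() is to reserve an event, decide before committing that it is unwanted, and then discard it instead of committing; a rough sketch with a hypothetical filter:

        static void my_maybe_trace(struct ring_buffer *buffer, u64 value)
        {
                struct ring_buffer_event *event;
                u64 *entry;

                event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
                if (!event)
                        return;

                entry = ring_buffer_event_data(event);
                *entry = value;

                if (value == 0)         /* hypothetical "not interesting" check */
                        ring_buffer_discard_commit(buffer, event);
                else
                        ring_buffer_unlock_commit(buffer, event);
        }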
2724 int ring_buffer_write(struct ring_buffer *buffer,
2725  unsigned long length,
2726  void *data)
2727 {
2728  struct ring_buffer_per_cpu *cpu_buffer;
2729  struct ring_buffer_event *event;
2730  void *body;
2731  int ret = -EBUSY;
2732  int cpu;
2733 
2734  if (ring_buffer_flags != RB_BUFFERS_ON)
2735  return -EBUSY;
2736 
2737  preempt_disable_notrace();
2738 
2739  if (atomic_read(&buffer->record_disabled))
2740  goto out;
2741 
2742  cpu = raw_smp_processor_id();
2743 
2744  if (!cpumask_test_cpu(cpu, buffer->cpumask))
2745  goto out;
2746 
2747  cpu_buffer = buffer->buffers[cpu];
2748 
2749  if (atomic_read(&cpu_buffer->record_disabled))
2750  goto out;
2751 
2752  if (length > BUF_MAX_DATA_SIZE)
2753  goto out;
2754 
2755  event = rb_reserve_next_event(buffer, cpu_buffer, length);
2756  if (!event)
2757  goto out;
2758 
2759  body = rb_event_data(event);
2760 
2761  memcpy(body, data, length);
2762 
2763  rb_commit(cpu_buffer, event);
2764 
2765  ret = 0;
2766  out:
2767  preempt_enable_notrace();
2768 
2769  return ret;
2770 }
2772 
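Unlike the reserve/commit pair, a single call can copy an already-built blob into the buffer; a minimal sketch:

        static int my_log_message(struct ring_buffer *buffer)
        {
                char msg[] = "hello ring buffer";       /* arbitrary payload */

                /* Copies the bytes and commits them in one step; returns 0 on
                 * success or -EBUSY if the write could not be performed.      */
                return ring_buffer_write(buffer, sizeof(msg), msg);
        }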
2773 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
2774 {
2775  struct buffer_page *reader = cpu_buffer->reader_page;
2776  struct buffer_page *head = rb_set_head_page(cpu_buffer);
2777  struct buffer_page *commit = cpu_buffer->commit_page;
2778 
2779  /* In case of error, head will be NULL */
2780  if (unlikely(!head))
2781  return 1;
2782 
2783  return reader->read == rb_page_commit(reader) &&
2784  (commit == reader ||
2785  (commit == head &&
2786  head->read == rb_page_commit(commit)));
2787 }
2788 
2798 void ring_buffer_record_disable(struct ring_buffer *buffer)
2799 {
2800  atomic_inc(&buffer->record_disabled);
2801 }
2803 
2811 void ring_buffer_record_enable(struct ring_buffer *buffer)
2812 {
2813  atomic_dec(&buffer->record_disabled);
2814 }
2816 
2828 void ring_buffer_record_off(struct ring_buffer *buffer)
2829 {
2830  unsigned int rd;
2831  unsigned int new_rd;
2832 
2833  do {
2834  rd = atomic_read(&buffer->record_disabled);
2835  new_rd = rd | RB_BUFFER_OFF;
2836  } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
2837 }
2839 
2851 void ring_buffer_record_on(struct ring_buffer *buffer)
2852 {
2853  unsigned int rd;
2854  unsigned int new_rd;
2855 
2856  do {
2857  rd = atomic_read(&buffer->record_disabled);
2858  new_rd = rd & ~RB_BUFFER_OFF;
2859  } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
2860 }
2862 
2869 int ring_buffer_record_is_on(struct ring_buffer *buffer)
2870 {
2871  return !atomic_read(&buffer->record_disabled);
2872 }
2873 
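Unlike ring_buffer_record_disable()/enable(), which nest through a counter, the off/on calls above toggle a single RB_BUFFER_OFF bit; bracketing a region with them could look roughly like this (the work function is hypothetical):

        static void my_quiet_section(struct ring_buffer *buffer)
        {
                ring_buffer_record_off(buffer);         /* atomically set RB_BUFFER_OFF  */
                do_untraced_work();                     /* hypothetical                  */
                ring_buffer_record_on(buffer);          /* clear the bit, resume writes  */

                if (!ring_buffer_record_is_on(buffer))
                        pr_warn("ring buffer still disabled by another user\n");
        }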
2884 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
2885 {
2886  struct ring_buffer_per_cpu *cpu_buffer;
2887 
2888  if (!cpumask_test_cpu(cpu, buffer->cpumask))
2889  return;
2890 
2891  cpu_buffer = buffer->buffers[cpu];
2892  atomic_inc(&cpu_buffer->record_disabled);
2893 }
2895 
2904 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
2905 {
2906  struct ring_buffer_per_cpu *cpu_buffer;
2907 
2908  if (!cpumask_test_cpu(cpu, buffer->cpumask))
2909  return;
2910 
2911  cpu_buffer = buffer->buffers[cpu];
2912  atomic_dec(&cpu_buffer->record_disabled);
2913 }
2915 
2916 /*
2917  * The total entries in the ring buffer is the running counter
2918  * of entries entered into the ring buffer, minus the sum of
2919  * the entries read from the ring buffer and the number of
2920  * entries that were overwritten.
2921  */
2922 static inline unsigned long
2923 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
2924 {
2925  return local_read(&cpu_buffer->entries) -
2926  (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
2927 }
2928 
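With purely illustrative numbers, the bookkeeping above works out as follows:

        /* Illustrative only: these counters live in struct ring_buffer_per_cpu. */
        unsigned long written = 1000;   /* local_read(&cpu_buffer->entries)       */
        unsigned long overrun = 150;    /* local_read(&cpu_buffer->overrun)       */
        unsigned long read    = 200;    /* cpu_buffer->read                       */
        unsigned long remaining = written - (overrun + read);  /* == 650 entries  */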
2934 unsigned long ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu)
2935 {
2936  unsigned long flags;
2937  struct ring_buffer_per_cpu *cpu_buffer;
2938  struct buffer_page *bpage;
2939  unsigned long ret = 0;
2940 
2941  if (!cpumask_test_cpu(cpu, buffer->cpumask))
2942  return 0;
2943 
2944  cpu_buffer = buffer->buffers[cpu];
2945  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
2946  /*
2947  * if the tail is on reader_page, oldest time stamp is on the reader
2948  * page
2949  */
2950  if (cpu_buffer->tail_page == cpu_buffer->reader_page)
2951  bpage = cpu_buffer->reader_page;
2952  else
2953  bpage = rb_set_head_page(cpu_buffer);
2954  if (bpage)
2955  ret = bpage->page->time_stamp;
2956  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
2957 
2958  return ret;
2959 }
2961 
2967 unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu)
2968 {
2969  struct ring_buffer_per_cpu *cpu_buffer;
2970  unsigned long ret;
2971 
2972  if (!cpumask_test_cpu(cpu, buffer->cpumask))
2973  return 0;
2974 
2975  cpu_buffer = buffer->buffers[cpu];
2976  ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
2977 
2978  return ret;
2979 }
2981 
2987 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
2988 {
2989  struct ring_buffer_per_cpu *cpu_buffer;
2990 
2991  if (!cpumask_test_cpu(cpu, buffer->cpumask))
2992  return 0;
2993 
2994  cpu_buffer = buffer->buffers[cpu];
2995 
2996  return rb_num_of_entries(cpu_buffer);
2997 }
2999 
3005 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
3006 {
3007  struct ring_buffer_per_cpu *cpu_buffer;
3008  unsigned long ret;
3009 
3010  if (!cpumask_test_cpu(cpu, buffer->cpumask))
3011  return 0;
3012 
3013  cpu_buffer = buffer->buffers[cpu];
3014  ret = local_read(&cpu_buffer->overrun);
3015 
3016  return ret;
3017 }
3019 
3025 unsigned long
3026 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
3027 {
3028  struct ring_buffer_per_cpu *cpu_buffer;
3029  unsigned long ret;
3030 
3031  if (!cpumask_test_cpu(cpu, buffer->cpumask))
3032  return 0;
3033 
3034  cpu_buffer = buffer->buffers[cpu];
3035  ret = local_read(&cpu_buffer->commit_overrun);
3036 
3037  return ret;
3038 }
3040 
3048 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
3049 {
3050  struct ring_buffer_per_cpu *cpu_buffer;
3051  unsigned long entries = 0;
3052  int cpu;
3053 
3054  /* if you care about this being correct, lock the buffer */
3055  for_each_buffer_cpu(buffer, cpu) {
3056  cpu_buffer = buffer->buffers[cpu];
3057  entries += rb_num_of_entries(cpu_buffer);
3058  }
3059 
3060  return entries;
3061 }
3063 
3071 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
3072 {
3073  struct ring_buffer_per_cpu *cpu_buffer;
3074  unsigned long overruns = 0;
3075  int cpu;
3076 
3077  /* if you care about this being correct, lock the buffer */
3078  for_each_buffer_cpu(buffer, cpu) {
3079  cpu_buffer = buffer->buffers[cpu];
3080  overruns += local_read(&cpu_buffer->overrun);
3081  }
3082 
3083  return overruns;
3084 }
3086 
3087 static void rb_iter_reset(struct ring_buffer_iter *iter)
3088 {
3089  struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3090 
3091  /* Iterator usage is expected to have record disabled */
3092  if (list_empty(&cpu_buffer->reader_page->list)) {
3093  iter->head_page = rb_set_head_page(cpu_buffer);
3094  if (unlikely(!iter->head_page))
3095  return;
3096  iter->head = iter->head_page->read;
3097  } else {
3098  iter->head_page = cpu_buffer->reader_page;
3099  iter->head = cpu_buffer->reader_page->read;
3100  }
3101  if (iter->head)
3102  iter->read_stamp = cpu_buffer->read_stamp;
3103  else
3104  iter->read_stamp = iter->head_page->page->time_stamp;
3105  iter->cache_reader_page = cpu_buffer->reader_page;
3106  iter->cache_read = cpu_buffer->read;
3107 }
3108 
3116 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
3117 {
3118  struct ring_buffer_per_cpu *cpu_buffer;
3119  unsigned long flags;
3120 
3121  if (!iter)
3122  return;
3123 
3124  cpu_buffer = iter->cpu_buffer;
3125 
3126  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3127  rb_iter_reset(iter);
3128  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3129 }
3131 
3136 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
3137 {
3138  struct ring_buffer_per_cpu *cpu_buffer;
3139 
3140  cpu_buffer = iter->cpu_buffer;
3141 
3142  return iter->head_page == cpu_buffer->commit_page &&
3143  iter->head == rb_commit_index(cpu_buffer);
3144 }
3146 
3147 static void
3148 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
3149  struct ring_buffer_event *event)
3150 {
3151  u64 delta;
3152 
3153  switch (event->type_len) {
3154  case RINGBUF_TYPE_PADDING:
3155  return;
3156 
3157  case RINGBUF_TYPE_TIME_EXTEND:
3158  delta = event->array[0];
3159  delta <<= TS_SHIFT;
3160  delta += event->time_delta;
3161  cpu_buffer->read_stamp += delta;
3162  return;
3163 
3164  case RINGBUF_TYPE_TIME_STAMP:
3165  /* FIXME: not implemented */
3166  return;
3167 
3168  case RINGBUF_TYPE_DATA:
3169  cpu_buffer->read_stamp += event->time_delta;
3170  return;
3171 
3172  default:
3173  BUG();
3174  }
3175  return;
3176 }
3177 
3178 static void
3179 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
3180  struct ring_buffer_event *event)
3181 {
3182  u64 delta;
3183 
3184  switch (event->type_len) {
3185  case RINGBUF_TYPE_PADDING:
3186  return;
3187 
3188  case RINGBUF_TYPE_TIME_EXTEND:
3189  delta = event->array[0];
3190  delta <<= TS_SHIFT;
3191  delta += event->time_delta;
3192  iter->read_stamp += delta;
3193  return;
3194 
3195  case RINGBUF_TYPE_TIME_STAMP:
3196  /* FIXME: not implemented */
3197  return;
3198 
3199  case RINGBUF_TYPE_DATA:
3200  iter->read_stamp += event->time_delta;
3201  return;
3202 
3203  default:
3204  BUG();
3205  }
3206  return;
3207 }
3208 
3209 static struct buffer_page *
3210 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
3211 {
3212  struct buffer_page *reader = NULL;
3213  unsigned long overwrite;
3214  unsigned long flags;
3215  int nr_loops = 0;
3216  int ret;
3217 
3218  local_irq_save(flags);
3219  arch_spin_lock(&cpu_buffer->lock);
3220 
3221  again:
3222  /*
3223  * This should normally only loop twice. But because the
3224  * start of the reader inserts an empty page, it causes
3225  * a case where we will loop three times. There should be no
3226  * reason to loop four times (that I know of).
3227  */
3228  if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
3229  reader = NULL;
3230  goto out;
3231  }
3232 
3233  reader = cpu_buffer->reader_page;
3234 
3235  /* If there's more to read, return this page */
3236  if (cpu_buffer->reader_page->read < rb_page_size(reader))
3237  goto out;
3238 
3239  /* Never should we have an index greater than the size */
3240  if (RB_WARN_ON(cpu_buffer,
3241  cpu_buffer->reader_page->read > rb_page_size(reader)))
3242  goto out;
3243 
3244  /* check if we caught up to the tail */
3245  reader = NULL;
3246  if (cpu_buffer->commit_page == cpu_buffer->reader_page)
3247  goto out;
3248 
3249  /* Don't bother swapping if the ring buffer is empty */
3250  if (rb_num_of_entries(cpu_buffer) == 0)
3251  goto out;
3252 
3253  /*
3254  * Reset the reader page to size zero.
3255  */
3256  local_set(&cpu_buffer->reader_page->write, 0);
3257  local_set(&cpu_buffer->reader_page->entries, 0);
3258  local_set(&cpu_buffer->reader_page->page->commit, 0);
3259  cpu_buffer->reader_page->real_end = 0;
3260 
3261  spin:
3262  /*
3263  * Splice the empty reader page into the list around the head.
3264  */
3265  reader = rb_set_head_page(cpu_buffer);
3266  if (!reader)
3267  goto out;
3268  cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
3269  cpu_buffer->reader_page->list.prev = reader->list.prev;
3270 
3271  /*
3272  * cpu_buffer->pages just needs to point to the buffer, it
3273  * has no specific buffer page to point to. Let's move it out
3274  * of our way so we don't accidentally swap it.
3275  */
3276  cpu_buffer->pages = reader->list.prev;
3277 
3278  /* The reader page will be pointing to the new head */
3279  rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
3280 
3281  /*
3282  * We want to make sure we read the overruns after we set up our
3283  * pointers to the next object. The writer side does a
3284  * cmpxchg to cross pages which acts as the mb on the writer
3285  * side. Note, the reader will constantly fail the swap
3286  * while the writer is updating the pointers, so this
3287  * guarantees that the overwrite recorded here is the one we
3288  * want to compare with the last_overrun.
3289  */
3290  smp_mb();
3291  overwrite = local_read(&(cpu_buffer->overrun));
3292 
3293  /*
3294  * Here's the tricky part.
3295  *
3296  * We need to move the pointer past the header page.
3297  * But we can only do that if a writer is not currently
3298  * moving it. The page before the header page has the
3299  * flag bit '1' set if it is pointing to the page we want.
3300  * But if the writer is in the process of moving it,
3301  * then it will be '2', or '0' if it has already moved.
3302  */
3303 
3304  ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
3305 
3306  /*
3307  * If we did not convert it, then we must try again.
3308  */
3309  if (!ret)
3310  goto spin;
3311 
3312  /*
3313  * Yeah! We succeeded in replacing the page.
3314  *
3315  * Now make the new head point back to the reader page.
3316  */
3317  rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
3318  rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
3319 
3320  /* Finally update the reader page to the new head */
3321  cpu_buffer->reader_page = reader;
3322  rb_reset_reader_page(cpu_buffer);
3323 
3324  if (overwrite != cpu_buffer->last_overrun) {
3325  cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
3326  cpu_buffer->last_overrun = overwrite;
3327  }
3328 
3329  goto again;
3330 
3331  out:
3332  arch_spin_unlock(&cpu_buffer->lock);
3333  local_irq_restore(flags);
3334 
3335  return reader;
3336 }
3337 
3338 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
3339 {
3340  struct ring_buffer_event *event;
3341  struct buffer_page *reader;
3342  unsigned length;
3343 
3344  reader = rb_get_reader_page(cpu_buffer);
3345 
3346  /* This function should not be called when buffer is empty */
3347  if (RB_WARN_ON(cpu_buffer, !reader))
3348  return;
3349 
3350  event = rb_reader_event(cpu_buffer);
3351 
3352  if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
3353  cpu_buffer->read++;
3354 
3355  rb_update_read_stamp(cpu_buffer, event);
3356 
3357  length = rb_event_length(event);
3358  cpu_buffer->reader_page->read += length;
3359 }
3360 
3361 static void rb_advance_iter(struct ring_buffer_iter *iter)
3362 {
3363  struct ring_buffer_per_cpu *cpu_buffer;
3364  struct ring_buffer_event *event;
3365  unsigned length;
3366 
3367  cpu_buffer = iter->cpu_buffer;
3368 
3369  /*
3370  * Check if we are at the end of the buffer.
3371  */
3372  if (iter->head >= rb_page_size(iter->head_page)) {
3373  /* discarded commits can make the page empty */
3374  if (iter->head_page == cpu_buffer->commit_page)
3375  return;
3376  rb_inc_iter(iter);
3377  return;
3378  }
3379 
3380  event = rb_iter_head_event(iter);
3381 
3382  length = rb_event_length(event);
3383 
3384  /*
3385  * This should not be called to advance the header if we are
3386  * at the tail of the buffer.
3387  */
3388  if (RB_WARN_ON(cpu_buffer,
3389  (iter->head_page == cpu_buffer->commit_page) &&
3390  (iter->head + length > rb_commit_index(cpu_buffer))))
3391  return;
3392 
3393  rb_update_iter_read_stamp(iter, event);
3394 
3395  iter->head += length;
3396 
3397  /* check for end of page padding */
3398  if ((iter->head >= rb_page_size(iter->head_page)) &&
3399  (iter->head_page != cpu_buffer->commit_page))
3400  rb_advance_iter(iter);
3401 }
3402 
3403 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
3404 {
3405  return cpu_buffer->lost_events;
3406 }
3407 
3408 static struct ring_buffer_event *
3409 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
3410  unsigned long *lost_events)
3411 {
3412  struct ring_buffer_event *event;
3413  struct buffer_page *reader;
3414  int nr_loops = 0;
3415 
3416  again:
3417  /*
3418  * We repeat when a time extend is encountered.
3419  * Since the time extend is always attached to a data event,
3420  * we should never loop more than once.
3421  * (We never hit the following condition more than twice).
3422  */
3423  if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
3424  return NULL;
3425 
3426  reader = rb_get_reader_page(cpu_buffer);
3427  if (!reader)
3428  return NULL;
3429 
3430  event = rb_reader_event(cpu_buffer);
3431 
3432  switch (event->type_len) {
3433  case RINGBUF_TYPE_PADDING:
3434  if (rb_null_event(event))
3435  RB_WARN_ON(cpu_buffer, 1);
3436  /*
3437  * Because the writer could be discarding every
3438  * event it creates (which would probably be bad)
3439  * if we were to go back to "again" then we may never
3440  * catch up, and will trigger the warn on, or lock
3441  * the box. Return the padding, and we will release
3442  * the current locks, and try again.
3443  */
3444  return event;
3445 
3446  case RINGBUF_TYPE_TIME_EXTEND:
3447  /* Internal data, OK to advance */
3448  rb_advance_reader(cpu_buffer);
3449  goto again;
3450 
3451  case RINGBUF_TYPE_TIME_STAMP:
3452  /* FIXME: not implemented */
3453  rb_advance_reader(cpu_buffer);
3454  goto again;
3455 
3456  case RINGBUF_TYPE_DATA:
3457  if (ts) {
3458  *ts = cpu_buffer->read_stamp + event->time_delta;
3459  ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
3460  cpu_buffer->cpu, ts);
3461  }
3462  if (lost_events)
3463  *lost_events = rb_lost_events(cpu_buffer);
3464  return event;
3465 
3466  default:
3467  BUG();
3468  }
3469 
3470  return NULL;
3471 }
3473 
3474 static struct ring_buffer_event *
3475 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
3476 {
3477  struct ring_buffer *buffer;
3478  struct ring_buffer_per_cpu *cpu_buffer;
3479  struct ring_buffer_event *event;
3480  int nr_loops = 0;
3481 
3482  cpu_buffer = iter->cpu_buffer;
3483  buffer = cpu_buffer->buffer;
3484 
3485  /*
3486  * Check if someone performed a consuming read to
3487  * the buffer. A consuming read invalidates the iterator
3488  * and we need to reset the iterator in this case.
3489  */
3490  if (unlikely(iter->cache_read != cpu_buffer->read ||
3491  iter->cache_reader_page != cpu_buffer->reader_page))
3492  rb_iter_reset(iter);
3493 
3494  again:
3495  if (ring_buffer_iter_empty(iter))
3496  return NULL;
3497 
3498  /*
3499  * We repeat when a time extend is encountered.
3500  * Since the time extend is always attached to a data event,
3501  * we should never loop more than once.
3502  * (We never hit the following condition more than twice).
3503  */
3504  if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
3505  return NULL;
3506 
3507  if (rb_per_cpu_empty(cpu_buffer))
3508  return NULL;
3509 
3510  if (iter->head >= local_read(&iter->head_page->page->commit)) {
3511  rb_inc_iter(iter);
3512  goto again;
3513  }
3514 
3515  event = rb_iter_head_event(iter);
3516 
3517  switch (event->type_len) {
3518  case RINGBUF_TYPE_PADDING:
3519  if (rb_null_event(event)) {
3520  rb_inc_iter(iter);
3521  goto again;
3522  }
3523  rb_advance_iter(iter);
3524  return event;
3525 
3526  case RINGBUF_TYPE_TIME_EXTEND:
3527  /* Internal data, OK to advance */
3528  rb_advance_iter(iter);
3529  goto again;
3530 
3531  case RINGBUF_TYPE_TIME_STAMP:
3532  /* FIXME: not implemented */
3533  rb_advance_iter(iter);
3534  goto again;
3535 
3536  case RINGBUF_TYPE_DATA:
3537  if (ts) {
3538  *ts = iter->read_stamp + event->time_delta;
3539  ring_buffer_normalize_time_stamp(buffer,
3540  cpu_buffer->cpu, ts);
3541  }
3542  return event;
3543 
3544  default:
3545  BUG();
3546  }
3547 
3548  return NULL;
3549 }
3551 
3552 static inline int rb_ok_to_lock(void)
3553 {
3554  /*
3555  * If an NMI die dumps out the content of the ring buffer
3556  * do not grab locks. We also permanently disable the ring
3557  * buffer too. A one time deal is all you get from reading
3558  * the ring buffer from an NMI.
3559  */
3560  if (likely(!in_nmi()))
3561  return 1;
3562 
3563  tracing_off_permanent();
3564  return 0;
3565 }
3566 
3577 struct ring_buffer_event *
3578 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
3579  unsigned long *lost_events)
3580 {
3581  struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
3582  struct ring_buffer_event *event;
3583  unsigned long flags;
3584  int dolock;
3585 
3586  if (!cpumask_test_cpu(cpu, buffer->cpumask))
3587  return NULL;
3588 
3589  dolock = rb_ok_to_lock();
3590  again:
3591  local_irq_save(flags);
3592  if (dolock)
3593  raw_spin_lock(&cpu_buffer->reader_lock);
3594  event = rb_buffer_peek(cpu_buffer, ts, lost_events);
3595  if (event && event->type_len == RINGBUF_TYPE_PADDING)
3596  rb_advance_reader(cpu_buffer);
3597  if (dolock)
3598  raw_spin_unlock(&cpu_buffer->reader_lock);
3599  local_irq_restore(flags);
3600 
3601  if (event && event->type_len == RINGBUF_TYPE_PADDING)
3602  goto again;
3603 
3604  return event;
3605 }
3606 
3615 struct ring_buffer_event *
3616 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
3617 {
3618  struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3619  struct ring_buffer_event *event;
3620  unsigned long flags;
3621 
3622  again:
3623  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3624  event = rb_iter_peek(iter, ts);
3625  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3626 
3627  if (event && event->type_len == RINGBUF_TYPE_PADDING)
3628  goto again;
3629 
3630  return event;
3631 }
3632 
3644 struct ring_buffer_event *
3645 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
3646  unsigned long *lost_events)
3647 {
3648  struct ring_buffer_per_cpu *cpu_buffer;
3649  struct ring_buffer_event *event = NULL;
3650  unsigned long flags;
3651  int dolock;
3652 
3653  dolock = rb_ok_to_lock();
3654 
3655  again:
3656  /* might be called in atomic */
3657  preempt_disable();
3658 
3659  if (!cpumask_test_cpu(cpu, buffer->cpumask))
3660  goto out;
3661 
3662  cpu_buffer = buffer->buffers[cpu];
3663  local_irq_save(flags);
3664  if (dolock)
3665  raw_spin_lock(&cpu_buffer->reader_lock);
3666 
3667  event = rb_buffer_peek(cpu_buffer, ts, lost_events);
3668  if (event) {
3669  cpu_buffer->lost_events = 0;
3670  rb_advance_reader(cpu_buffer);
3671  }
3672 
3673  if (dolock)
3674  raw_spin_unlock(&cpu_buffer->reader_lock);
3675  local_irq_restore(flags);
3676 
3677  out:
3678  preempt_enable();
3679 
3680  if (event && event->type_len == RINGBUF_TYPE_PADDING)
3681  goto again;
3682 
3683  return event;
3684 }
3686 
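A typical consuming read built on this function loops until the per-cpu buffer is drained; a rough sketch with a hypothetical sink:

        static void my_drain_cpu(struct ring_buffer *buffer, int cpu)
        {
                struct ring_buffer_event *event;
                unsigned long lost;
                u64 ts;

                /* Each successful call removes one event from the per-cpu buffer. */
                while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)) != NULL) {
                        void *data = ring_buffer_event_data(event);
                        unsigned len = ring_buffer_event_length(event);

                        handle_record(data, len, ts, lost);     /* hypothetical sink */
                }
        }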
3707 struct ring_buffer_iter *
3708 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)
3709 {
3710  struct ring_buffer_per_cpu *cpu_buffer;
3711  struct ring_buffer_iter *iter;
3712 
3713  if (!cpumask_test_cpu(cpu, buffer->cpumask))
3714  return NULL;
3715 
3716  iter = kmalloc(sizeof(*iter), GFP_KERNEL);
3717  if (!iter)
3718  return NULL;
3719 
3720  cpu_buffer = buffer->buffers[cpu];
3721 
3722  iter->cpu_buffer = cpu_buffer;
3723 
3724  atomic_inc(&buffer->resize_disabled);
3725  atomic_inc(&cpu_buffer->record_disabled);
3726 
3727  return iter;
3728 }
3730 
3738 void
3739 ring_buffer_read_prepare_sync(void)
3740 {
3741  synchronize_sched();
3742 }
3744 
3756 void
3757 ring_buffer_read_start(struct ring_buffer_iter *iter)
3758 {
3759  struct ring_buffer_per_cpu *cpu_buffer;
3760  unsigned long flags;
3761 
3762  if (!iter)
3763  return;
3764 
3765  cpu_buffer = iter->cpu_buffer;
3766 
3767  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3768  arch_spin_lock(&cpu_buffer->lock);
3769  rb_iter_reset(iter);
3770  arch_spin_unlock(&cpu_buffer->lock);
3771  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3772 }
3774 
3782 void
3783 ring_buffer_read_finish(struct ring_buffer_iter *iter)
3784 {
3785  struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3786  unsigned long flags;
3787 
3788  /*
3789  * Ring buffer is disabled from recording, here's a good place
3790  * to check the integrity of the ring buffer.
3791  * Must prevent readers from trying to read, as the check
3792  * clears the HEAD page and readers require it.
3793  */
3794  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3795  rb_check_pages(cpu_buffer);
3796  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3797 
3798  atomic_dec(&cpu_buffer->record_disabled);
3799  atomic_dec(&cpu_buffer->buffer->resize_disabled);
3800  kfree(iter);
3801 }
3803 
3811 struct ring_buffer_event *
3812 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
3813 {
3814  struct ring_buffer_event *event;
3815  struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3816  unsigned long flags;
3817 
3818  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3819  again:
3820  event = rb_iter_peek(iter, ts);
3821  if (!event)
3822  goto out;
3823 
3824  if (event->type_len == RINGBUF_TYPE_PADDING)
3825  goto again;
3826 
3827  rb_advance_iter(iter);
3828  out:
3829  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3830 
3831  return event;
3832 }
3834 
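Putting the iterator entry points together, a non-consuming walk over one CPU's buffer follows the prepare/sync/start ... finish sequence; a minimal sketch (inspect_event() is hypothetical):

        static void my_walk_cpu(struct ring_buffer *buffer, int cpu)
        {
                struct ring_buffer_iter *iter;
                struct ring_buffer_event *event;
                u64 ts;

                iter = ring_buffer_read_prepare(buffer, cpu);   /* disables writers      */
                if (!iter)
                        return;
                ring_buffer_read_prepare_sync();                /* wait for them to stop */
                ring_buffer_read_start(iter);                   /* position the iterator */

                while ((event = ring_buffer_read(iter, &ts)) != NULL)
                        inspect_event(event, ts);               /* hypothetical          */

                ring_buffer_read_finish(iter);                  /* re-enable writers     */
        }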
3839 unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu)
3840 {
3841  /*
3842  * Earlier, this method returned
3843  * BUF_PAGE_SIZE * buffer->nr_pages
3844  * Since the nr_pages field is now removed, we have converted this to
3845  * return the per cpu buffer value.
3846  */
3847  if (!cpumask_test_cpu(cpu, buffer->cpumask))
3848  return 0;
3849 
3850  return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
3851 }
3852 EXPORT_SYMBOL_GPL(ring_buffer_size);
3853 
3854 static void
3855 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
3856 {
3857  rb_head_page_deactivate(cpu_buffer);
3858 
3859  cpu_buffer->head_page
3860  = list_entry(cpu_buffer->pages, struct buffer_page, list);
3861  local_set(&cpu_buffer->head_page->write, 0);
3862  local_set(&cpu_buffer->head_page->entries, 0);
3863  local_set(&cpu_buffer->head_page->page->commit, 0);
3864 
3865  cpu_buffer->head_page->read = 0;
3866 
3867  cpu_buffer->tail_page = cpu_buffer->head_page;
3868  cpu_buffer->commit_page = cpu_buffer->head_page;
3869 
3870  INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
3871  INIT_LIST_HEAD(&cpu_buffer->new_pages);
3872  local_set(&cpu_buffer->reader_page->write, 0);
3873  local_set(&cpu_buffer->reader_page->entries, 0);
3874  local_set(&cpu_buffer->reader_page->page->commit, 0);
3875  cpu_buffer->reader_page->read = 0;
3876 
3877  local_set(&cpu_buffer->commit_overrun, 0);
3878  local_set(&cpu_buffer->entries_bytes, 0);
3879  local_set(&cpu_buffer->overrun, 0);
3880  local_set(&cpu_buffer->entries, 0);
3881  local_set(&cpu_buffer->committing, 0);
3882  local_set(&cpu_buffer->commits, 0);
3883  cpu_buffer->read = 0;
3884  cpu_buffer->read_bytes = 0;
3885 
3886  cpu_buffer->write_stamp = 0;
3887  cpu_buffer->read_stamp = 0;
3888 
3889  cpu_buffer->lost_events = 0;
3890  cpu_buffer->last_overrun = 0;
3891 
3892  rb_head_page_activate(cpu_buffer);
3893 }
3894 
3900 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
3901 {
3902  struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
3903  unsigned long flags;
3904 
3905  if (!cpumask_test_cpu(cpu, buffer->cpumask))
3906  return;
3907 
3908  atomic_inc(&buffer->resize_disabled);
3909  atomic_inc(&cpu_buffer->record_disabled);
3910 
3911  /* Make sure all commits have finished */
3912  synchronize_sched();
3913 
3914  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3915 
3916  if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
3917  goto out;
3918 
3919  arch_spin_lock(&cpu_buffer->lock);
3920 
3921  rb_reset_cpu(cpu_buffer);
3922 
3923  arch_spin_unlock(&cpu_buffer->lock);
3924 
3925  out:
3926  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3927 
3928  atomic_dec(&cpu_buffer->record_disabled);
3929  atomic_dec(&buffer->resize_disabled);
3930 }
3932 
3937 void ring_buffer_reset(struct ring_buffer *buffer)
3938 {
3939  int cpu;
3940 
3941  for_each_buffer_cpu(buffer, cpu)
3942  ring_buffer_reset_cpu(buffer, cpu);
3943 }
3945 
3950 int ring_buffer_empty(struct ring_buffer *buffer)
3951 {
3952  struct ring_buffer_per_cpu *cpu_buffer;
3953  unsigned long flags;
3954  int dolock;
3955  int cpu;
3956  int ret;
3957 
3958  dolock = rb_ok_to_lock();
3959 
3960  /* yes this is racy, but if you don't like the race, lock the buffer */
3961  for_each_buffer_cpu(buffer, cpu) {
3962  cpu_buffer = buffer->buffers[cpu];
3963  local_irq_save(flags);
3964  if (dolock)
3965  raw_spin_lock(&cpu_buffer->reader_lock);
3966  ret = rb_per_cpu_empty(cpu_buffer);
3967  if (dolock)
3968  raw_spin_unlock(&cpu_buffer->reader_lock);
3969  local_irq_restore(flags);
3970 
3971  if (!ret)
3972  return 0;
3973  }
3974 
3975  return 1;
3976 }
3978 
3984 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
3985 {
3986  struct ring_buffer_per_cpu *cpu_buffer;
3987  unsigned long flags;
3988  int dolock;
3989  int ret;
3990 
3991  if (!cpumask_test_cpu(cpu, buffer->cpumask))
3992  return 1;
3993 
3994  dolock = rb_ok_to_lock();
3995 
3996  cpu_buffer = buffer->buffers[cpu];
3997  local_irq_save(flags);
3998  if (dolock)
3999  raw_spin_lock(&cpu_buffer->reader_lock);
4000  ret = rb_per_cpu_empty(cpu_buffer);
4001  if (dolock)
4002  raw_spin_unlock(&cpu_buffer->reader_lock);
4003  local_irq_restore(flags);
4004 
4005  return ret;
4006 }
4008 
4009 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
4010 
4020 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
4021  struct ring_buffer *buffer_b, int cpu)
4022 {
4023  struct ring_buffer_per_cpu *cpu_buffer_a;
4024  struct ring_buffer_per_cpu *cpu_buffer_b;
4025  int ret = -EINVAL;
4026 
4027  if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
4028  !cpumask_test_cpu(cpu, buffer_b->cpumask))
4029  goto out;
4030 
4031  cpu_buffer_a = buffer_a->buffers[cpu];
4032  cpu_buffer_b = buffer_b->buffers[cpu];
4033 
4034  /* At least make sure the two buffers are somewhat the same */
4035  if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
4036  goto out;
4037 
4038  ret = -EAGAIN;
4039 
4040  if (ring_buffer_flags != RB_BUFFERS_ON)
4041  goto out;
4042 
4043  if (atomic_read(&buffer_a->record_disabled))
4044  goto out;
4045 
4046  if (atomic_read(&buffer_b->record_disabled))
4047  goto out;
4048 
4049  if (atomic_read(&cpu_buffer_a->record_disabled))
4050  goto out;
4051 
4052  if (atomic_read(&cpu_buffer_b->record_disabled))
4053  goto out;
4054 
4055  /*
4056  * We can't do a synchronize_sched here because this
4057  * function can be called in atomic context.
4058  * Normally this will be called from the same CPU as cpu.
4059  * If not it's up to the caller to protect this.
4060  */
4061  atomic_inc(&cpu_buffer_a->record_disabled);
4062  atomic_inc(&cpu_buffer_b->record_disabled);
4063 
4064  ret = -EBUSY;
4065  if (local_read(&cpu_buffer_a->committing))
4066  goto out_dec;
4067  if (local_read(&cpu_buffer_b->committing))
4068  goto out_dec;
4069 
4070  buffer_a->buffers[cpu] = cpu_buffer_b;
4071  buffer_b->buffers[cpu] = cpu_buffer_a;
4072 
4073  cpu_buffer_b->buffer = buffer_a;
4074  cpu_buffer_a->buffer = buffer_b;
4075 
4076  ret = 0;
4077 
4078 out_dec:
4079  atomic_dec(&cpu_buffer_a->record_disabled);
4080  atomic_dec(&cpu_buffer_b->record_disabled);
4081 out:
4082  return ret;
4083 }
4084 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
4085 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
4086 
4102 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
4103 {
4104  struct buffer_data_page *bpage;
4105  struct page *page;
4106 
4107  page = alloc_pages_node(cpu_to_node(cpu),
4108  GFP_KERNEL | __GFP_NORETRY, 0);
4109  if (!page)
4110  return NULL;
4111 
4112  bpage = page_address(page);
4113 
4114  rb_init_page(bpage);
4115 
4116  return bpage;
4117 }
4119 
4127 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
4128 {
4129  free_page((unsigned long)data);
4130 }
4132 
4166 int ring_buffer_read_page(struct ring_buffer *buffer,
4167  void **data_page, size_t len, int cpu, int full)
4168 {
4169  struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4170  struct ring_buffer_event *event;
4171  struct buffer_data_page *bpage;
4172  struct buffer_page *reader;
4173  unsigned long missed_events;
4174  unsigned long flags;
4175  unsigned int commit;
4176  unsigned int read;
4177  u64 save_timestamp;
4178  int ret = -1;
4179 
4180  if (!cpumask_test_cpu(cpu, buffer->cpumask))
4181  goto out;
4182 
4183  /*
4184  * If len is not big enough to hold the page header, then
4185  * we can not copy anything.
4186  */
4187  if (len <= BUF_PAGE_HDR_SIZE)
4188  goto out;
4189 
4190  len -= BUF_PAGE_HDR_SIZE;
4191 
4192  if (!data_page)
4193  goto out;
4194 
4195  bpage = *data_page;
4196  if (!bpage)
4197  goto out;
4198 
4199  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4200 
4201  reader = rb_get_reader_page(cpu_buffer);
4202  if (!reader)
4203  goto out_unlock;
4204 
4205  event = rb_reader_event(cpu_buffer);
4206 
4207  read = reader->read;
4208  commit = rb_page_commit(reader);
4209 
4210  /* Check if any events were dropped */
4211  missed_events = cpu_buffer->lost_events;
4212 
4213  /*
4214  * If this page has been partially read or
4215  * if len is not big enough to read the rest of the page or
4216  * a writer is still on the page, then
4217  * we must copy the data from the page to the buffer.
4218  * Otherwise, we can simply swap the page with the one passed in.
4219  */
4220  if (read || (len < (commit - read)) ||
4221  cpu_buffer->reader_page == cpu_buffer->commit_page) {
4222  struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
4223  unsigned int rpos = read;
4224  unsigned int pos = 0;
4225  unsigned int size;
4226 
4227  if (full)
4228  goto out_unlock;
4229 
4230  if (len > (commit - read))
4231  len = (commit - read);
4232 
4233  /* Always keep the time extend and data together */
4234  size = rb_event_ts_length(event);
4235 
4236  if (len < size)
4237  goto out_unlock;
4238 
4239  /* save the current timestamp, since the user will need it */
4240  save_timestamp = cpu_buffer->read_stamp;
4241 
4242  /* Need to copy one event at a time */
4243  do {
4244  /* We need the size of one event, because
4245  * rb_advance_reader only advances by one event,
4246  * whereas rb_event_ts_length may include the size of
4247  * one or two events.
4248  * We have already ensured there's enough space if this
4249  * is a time extend. */
4250  size = rb_event_length(event);
4251  memcpy(bpage->data + pos, rpage->data + rpos, size);
4252 
4253  len -= size;
4254 
4255  rb_advance_reader(cpu_buffer);
4256  rpos = reader->read;
4257  pos += size;
4258 
4259  if (rpos >= commit)
4260  break;
4261 
4262  event = rb_reader_event(cpu_buffer);
4263  /* Always keep the time extend and data together */
4264  size = rb_event_ts_length(event);
4265  } while (len >= size);
4266 
4267  /* update bpage */
4268  local_set(&bpage->commit, pos);
4269  bpage->time_stamp = save_timestamp;
4270 
4271  /* we copied everything to the beginning */
4272  read = 0;
4273  } else {
4274  /* update the entry counter */
4275  cpu_buffer->read += rb_page_entries(reader);
4276  cpu_buffer->read_bytes += BUF_PAGE_SIZE;
4277 
4278  /* swap the pages */
4279  rb_init_page(bpage);
4280  bpage = reader->page;
4281  reader->page = *data_page;
4282  local_set(&reader->write, 0);
4283  local_set(&reader->entries, 0);
4284  reader->read = 0;
4285  *data_page = bpage;
4286 
4287  /*
4288  * Use the real_end for the data size,
4289  * This gives us a chance to store the lost events
4290  * on the page.
4291  */
4292  if (reader->real_end)
4293  local_set(&bpage->commit, reader->real_end);
4294  }
4295  ret = read;
4296 
4297  cpu_buffer->lost_events = 0;
4298 
4299  commit = local_read(&bpage->commit);
4300  /*
4301  * Set a flag in the commit field if we lost events
4302  */
4303  if (missed_events) {
4304  /* If there is room at the end of the page to save the
4305  * missed events, then record it there.
4306  */
4307  if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
4308  memcpy(&bpage->data[commit], &missed_events,
4309  sizeof(missed_events));
4310  local_add(RB_MISSED_STORED, &bpage->commit);
4311  commit += sizeof(missed_events);
4312  }
4313  local_add(RB_MISSED_EVENTS, &bpage->commit);
4314  }
4315 
4316  /*
4317  * This page may be off to user land. Zero it out here.
4318  */
4319  if (commit < BUF_PAGE_SIZE)
4320  memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
4321 
4322  out_unlock:
4323  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4324 
4325  out:
4326  return ret;
4327 }
4329 
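The page-level interface above is meant to be driven with a page obtained from ring_buffer_alloc_read_page(); a rough sketch of a full-page hand-off, where the consumer function is hypothetical:

        static void my_read_one_page(struct ring_buffer *buffer, int cpu)
        {
                void *page;
                int ret;

                page = ring_buffer_alloc_read_page(buffer, cpu);
                if (!page)
                        return;

                /* full == 1: only succeed when a whole page can be swapped out. */
                ret = ring_buffer_read_page(buffer, &page, PAGE_SIZE, cpu, 1);
                if (ret >= 0)
                        consume_page(page, ret);        /* hypothetical consumer */

                ring_buffer_free_read_page(buffer, page);
        }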
4330 #ifdef CONFIG_HOTPLUG_CPU
4331 static int rb_cpu_notify(struct notifier_block *self,
4332  unsigned long action, void *hcpu)
4333 {
4334  struct ring_buffer *buffer =
4335  container_of(self, struct ring_buffer, cpu_notify);
4336  long cpu = (long)hcpu;
4337  int cpu_i, nr_pages_same;
4338  unsigned int nr_pages;
4339 
4340  switch (action) {
4341  case CPU_UP_PREPARE:
4342  case CPU_UP_PREPARE_FROZEN:
4343  if (cpumask_test_cpu(cpu, buffer->cpumask))
4344  return NOTIFY_OK;
4345 
4346  nr_pages = 0;
4347  nr_pages_same = 1;
4348  /* check if all cpu sizes are same */
4349  for_each_buffer_cpu(buffer, cpu_i) {
4350  /* fill in the size from first enabled cpu */
4351  if (nr_pages == 0)
4352  nr_pages = buffer->buffers[cpu_i]->nr_pages;
4353  if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
4354  nr_pages_same = 0;
4355  break;
4356  }
4357  }
4358  /* allocate minimum pages, user can later expand it */
4359  if (!nr_pages_same)
4360  nr_pages = 2;
4361  buffer->buffers[cpu] =
4362  rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
4363  if (!buffer->buffers[cpu]) {
4364  WARN(1, "failed to allocate ring buffer on CPU %ld\n",
4365  cpu);
4366  return NOTIFY_OK;
4367  }
4368  smp_wmb();
4369  cpumask_set_cpu(cpu, buffer->cpumask);
4370  break;
4371  case CPU_DOWN_PREPARE:
4372  case CPU_DOWN_PREPARE_FROZEN:
4373  /*
4374  * Do nothing.
4375  * If we were to free the buffer, then the user would
4376  * lose any trace that was in the buffer.
4377  */
4378  break;
4379  default:
4380  break;
4381  }
4382  return NOTIFY_OK;
4383 }
4384 #endif