/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)
/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	/* Fixed part of the compressed event header layout */
	trace_seq_puts(s, "# compressed entry header\n"
			  "\ttype_len : 5 bits\n"
			  "\ttime_delta : 27 bits\n"
			  "\tarray : 32 bits\n"
			  "\n");
	/* The special type_len values, and the max for plain data events */
	trace_seq_printf(s, "\tpadding : type == %d\n", RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n", RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n", RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n", RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	/* Returns zero (failure) if the trace_seq overflowed */
	return !trace_seq_has_overflowed(s);
}
/* * The ring buffer is made up of a list of pages. A separate list of pages is * allocated for each CPU. A writer may only write to a buffer that is * associated with the CPU it is currently executing on. A reader may read * from any per cpu buffer. * * The reader is special. For each per cpu buffer, the reader has its own * reader page. When a reader has read the entire reader page, this reader * page is swapped with another page in the ring buffer. * * Now, as long as the writer is off the reader page, the reader can do what * ever it wants with that page. The writer will never write to that page * again (as long as it is out of the ring buffer). * * Here's some silly ASCII art. * * +------+ * |reader| RING BUFFER * |page | * +------+ +---+ +---+ +---+ * | |-->| |-->| | * +---+ +---+ +---+ * ^ | * | | * +---------------+ * * * +------+ * |reader| RING BUFFER * |page |------------------v * +------+ +---+ +---+ +---+ * | |-->| |-->| | * +---+ +---+ +---+ * ^ | * | | * +---------------+ * * * +------+ * |reader| RING BUFFER * |page |------------------v * +------+ +---+ +---+ +---+ * ^ | |-->| |-->| | * | +---+ +---+ +---+ * | | * | | * +------------------------------+ * * * +------+ * |buffer| RING BUFFER * |page |------------------v * +------+ +---+ +---+ +---+ * ^ | | | |-->| | * | New +---+ +---+ +---+ * | Reader------^ | * | page | * +------------------------------+ * * * After we make this swap, the reader can hand this page off to the splice * code and be done with it. It can even allocate a new page if it needs to * and swap that into the ring buffer. * * We will be using cmpxchg soon to make all this lockless. *
*/
/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)
/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		/* A NULL padding event has no defined length */
		if (rb_null_event(event))
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);

	default:
		WARN_ON_ONCE(1);
	}

	return 0; /* not reached */
}
/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned extend_len = 0;

	if (extended_time(event)) {
		/* A time extend includes the data event that follows it */
		extend_len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return extend_len + rb_event_length(event);
}
/** * ring_buffer_event_length - return the length of the event * @event: the event to get the length of * * Returns the size of the data load of a data event. * If the event is something other than a data event, it * returns the size of the event itself. With the exception * of a TIME EXTEND, where it still returns the size of the * data load of the data event after it.
*/ unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{ unsigned length;
if (extended_time(event))
event = skip_time_extend(event);
/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);

	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	/*
	 * A non-zero type_len encodes the length itself, so the payload
	 * starts at array[0]. Otherwise array[0] holds the length and
	 * the payload starts at array[1].
	 */
	return event->type_len ? (void *)&event->array[0]
			       : (void *)&event->array[1];
}
/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)
#define RB_MISSED_MASK		(3 << 30)

struct buffer_data_page {
	u64		time_stamp;	/* page time stamp */
	local_t		commit;		/* write committed index */
	unsigned char	data[] RB_ALIGN_DATA;	/* data of buffer page */
};

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};
/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id:30;		/* ID for external mapping */
	u32		 range:1;	/* Mapped via a range */
	struct buffer_data_page *page;	/* Actual data page */
};
/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

/* Free a buffer page and, unless it is part of a mapped range, its data page */
static void free_buffer_page(struct buffer_page *bpage)
{
	/* Range pages are not to be freed */
	if (!bpage->range)
		free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}
/* * We need to fit the time_stamp delta into 27 bits.
*/ staticinlinebool test_time_stamp(u64 delta)
{ return !!(delta & TS_DELTA_TEST);
}
/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;		/* time stamp of the event */
	u64			delta;		/* time delta for the event */
	u64			before;		/* NOTE(review): presumably write stamp before reserve — confirm */
	u64			after;		/* NOTE(review): presumably write stamp after reserve — confirm */
	unsigned long		length;		/* length of the event */
	struct buffer_page	*tail_page;	/* tail page the event goes on */
	int			add_timestamp;	/* RB_ADD_STAMP_* flags */
};
/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE	= 0,
	RB_ADD_STAMP_EXTEND	= BIT(1),
	RB_ADD_STAMP_ABSOLUTE	= BIT(2),
	RB_ADD_STAMP_FORCE	= BIT(3)
};

/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI        = 1
 *  IRQ        = 2
 *  SOFTIRQ    = 3
 *  NORMAL     = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};
unsignedint mapped; unsignedint user_mapped; /* user space mapping */ struct mutex mapping_lock; unsignedlong *subbuf_ids; /* ID to subbuf VA */ struct trace_buffer_meta *meta_page; struct ring_buffer_cpu_meta *ring_meta;
/* ring buffer pages to update, > 0 to add, < 0 to remove */ long nr_pages_to_update; struct list_head new_pages; /* new pages to add */ struct work_struct update_pages_work; struct completion update_done;
/* * Enable this to make sure that the event passed to * ring_buffer_event_time_stamp() is not committed and also * is on the buffer that it passed in.
*/ //#define RB_VERIFY_EVENT #ifdef RB_VERIFY_EVENT staticstruct list_head *rb_list_head(struct list_head *list); staticvoid verify_event(struct ring_buffer_per_cpu *cpu_buffer, void *event)
{ struct buffer_page *page = cpu_buffer->commit_page; struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); struct list_head *next; long commit, write; unsignedlong addr = (unsignedlong)event; bool done = false; int stop = 0;
/* Make sure the event exists and is not committed yet */ do { if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
done = true;
commit = local_read(&page->page->commit);
write = local_read(&page->write); if (addr >= (unsignedlong)&page->page->data[commit] &&
addr < (unsignedlong)&page->page->data[write]) return;
/* * The absolute time stamp drops the 5 MSBs and some clocks may * require them. The rb_fix_abs_ts() will take a previous full * time stamp, and add the 5 MSB of that time stamp on to the * saved absolute time stamp. Then they are compared in case of * the unlikely event that the latest time stamp incremented * the 5 MSB.
*/ staticinline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{ if (save_ts & TS_MSB) {
abs |= save_ts & TS_MSB; /* Check for overflow */ if (unlikely(abs < save_ts))
abs += 1ULL << 59;
} return abs;
}
/** * ring_buffer_event_time_stamp - return the event's current time stamp * @buffer: The buffer that the event is on * @event: the event to get the time stamp of * * Note, this must be called after @event is reserved, and before it is * committed to the ring buffer. And must be called from the same * context where the event was reserved (normal, softirq, irq, etc). * * Returns the time stamp associated with the current event. * If the event has an extended time stamp, then that is used as * the time stamp to return. * In the highly unlikely case that the event was nested more than * the max nesting, then the write_stamp of the buffer is returned, * otherwise current time is returned, but that really neither of * the last two cases should ever happen.
*/
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, struct ring_buffer_event *event)
{ struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; unsignedint nest;
u64 ts;
/* If the event includes an absolute time, then just use that */ if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
ts = rb_event_time_stamp(event); return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
}
nest = local_read(&cpu_buffer->committing);
verify_event(cpu_buffer, event); if (WARN_ON_ONCE(!nest)) goto fail;
/* Read the current saved nesting level time stamp */ if (likely(--nest < MAX_NEST)) return cpu_buffer->event_stamp[nest];
/* Shouldn't happen, warn if it does */
WARN_ONCE(1, "nest (%d) greater than max", nest);
/** * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer * @buffer: The ring_buffer to get the number of pages from * @cpu: The cpu of the ring_buffer to get the number of pages from * * Returns the number of pages that have content in the ring buffer.
*/
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
size_t read;
size_t lost;
size_t cnt;
read = local_read(&buffer->buffers[cpu]->pages_read);
lost = local_read(&buffer->buffers[cpu]->pages_lost);
cnt = local_read(&buffer->buffers[cpu]->pages_touched);
if (WARN_ON_ONCE(cnt < lost)) return 0;
cnt -= lost;
/* The reader can read an empty page, but not more than that */ if (cnt < read) {
WARN_ON_ONCE(read > cnt + 1); return 0;
}
return cnt - read;
}
/* Has the amount of dirty data reached the @full watermark percentage? */
static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages = cpu_buffer->nr_pages;
	size_t dirty;

	/* No pages or no watermark requested always counts as hit */
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one as dirty will never equal nr_pages, as the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}
/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * In the case of a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on this.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}
staticbool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{ struct ring_buffer_per_cpu *cpu_buffer; bool ret = false;
/* Reads of all CPUs always waits for any data */ if (cpu == RING_BUFFER_ALL_CPUS) return !ring_buffer_empty(buffer);
cpu_buffer = buffer->buffers[cpu];
if (!ring_buffer_empty_cpu(buffer, cpu)) { unsignedlong flags; bool pagebusy;
/* Wait-loop condition for ring_buffer_wait(): true when the wait should end */
static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worse
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}
/* Per-wait state handed to rb_wait_once() via the @data cookie */
struct rb_wait_data {
	struct rb_irq_work	*irq_work;	/* the irq_work being waited on */
	int			seq;		/* snapshot of irq_work->seq taken before waiting */
};
/* * The default wait condition for ring_buffer_wait() is to just to exit the * wait loop the first time it is woken up.
*/ staticbool rb_wait_once(void *data)
{ struct rb_wait_data *rdata = data; struct rb_irq_work *rbwork = rdata->irq_work;
/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}
/** * ring_buffer_poll_wait - poll on buffer input * @buffer: buffer to wait on * @cpu: the cpu buffer to wait on * @filp: the file descriptor * @poll_table: The poll descriptor * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS * * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon * as data is added to any of the @buffer's cpu buffers. Otherwise * it will wait for data to be added to a specific cpu buffer. * * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, * zero otherwise.
*/
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, struct file *filp, poll_table *poll_table, int full)
{ struct ring_buffer_per_cpu *cpu_buffer; struct rb_irq_work *rbwork;
if (cpu == RING_BUFFER_ALL_CPUS) {
rbwork = &buffer->irq_work;
full = 0;
} else { if (!cpumask_test_cpu(cpu, buffer->cpumask)) return EPOLLERR;
if (full) {
poll_wait(filp, &rbwork->full_waiters, poll_table);
if (rb_watermark_hit(buffer, cpu, full)) return EPOLLIN | EPOLLRDNORM; /* * Only allow full_waiters_pending update to be seen after * the shortest_full is set (in rb_watermark_hit). If the * writer sees the full_waiters_pending flag set, it will * compare the amount in the ring buffer to shortest_full. * If the amount in the ring buffer is greater than the * shortest_full percent, it will call the irq_work handler * to wake up this list. The irq_handler will reset shortest_full * back to zero. That's done under the reader_lock, but * the below smp_mb() makes sure that the update to * full_waiters_pending doesn't leak up into the above.
*/
smp_mb();
rbwork->full_waiters_pending = true; return 0;
}
/* * There's a tight race between setting the waiters_pending and * checking if the ring buffer is empty. Once the waiters_pending bit * is set, the next event will wake the task up, but we can get stuck * if there's only a single event in. * * FIXME: Ideally, we need a memory barrier on the writer side as well, * but adding a memory barrier to all events will cause too much of a * performance hit in the fast path. We only need a memory barrier when * the buffer goes from empty to having content. But as this race is * extremely small, and it's not a problem if another event comes in, we * will fix it later.
*/
smp_mb();
/* Undo the DEBUG_SHIFT scaling applied by the debug clock path */
void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
/* * Making the ring buffer lockless makes things tricky. * Although writes only happen on the CPU that they are on, * and they only need to worry about interrupts. Reads can * happen on any CPU. * * The reader page is always off the ring buffer, but when the * reader finishes with a page, it needs to swap its page with * a new one from the buffer. The reader needs to take from * the head (writes go to the tail). But if a writer is in overwrite * mode and wraps, it must push the head page forward. * * Here lies the problem. * * The reader must be careful to replace only the head page, and * not another one. As described at the top of the file in the * ASCII art, the reader sets its old page to point to the next * page after head. It then sets the page after head to point to * the old reader page. But if the writer moves the head page * during this operation, the reader could end up with the tail. * * We use cmpxchg to help prevent this race. We also do something * special with the page before head. We set the LSB to 1. * * When the writer must push the page forward, it will clear the * bit that points to the head page, move the head, and then set * the bit that points to the new head page. * * We also don't want an interrupt coming in and moving the head * page on another writer. Thus we use the second LSB to catch * that too. 
Thus: * * head->list->prev->next bit 1 bit 0 * ------- ------- * Normal page 0 0 * Points to head page 0 1 * New head page 1 0 * * Note we can not trust the prev pointer of the head page, because: * * +----+ +-----+ +-----+ * | |------>| T |---X--->| N | * | |<------| | | | * +----+ +-----+ +-----+ * ^ ^ | * | +-----+ | | * +----------| R |----------+ | * | |<-----------+ * +-----+ * * Key: ---X--> HEAD flag set in pointer * T Tail page * R Reader page * N Next page * * (see __rb_reserve_next() to see where this happens) * * What the above shows is that the reader just swapped out * the reader page with a page in the buffer, but before it * could make the new header point back to the new page added * it was preempted by a writer. The writer moved forward onto * the new page added by the reader and is about to move forward * again. * * You can see, it is legitimate for the previous pointer of * the head (or any page) not to point back to itself. But only * temporarily.
*/
/* * rb_is_head_page - test if the given page is the head page * * Because the reader may move the head_page pointer, we can * not trust what the head page is (it may be pointing to * the reader page). But if the next page is a header page, * its flags will be non zero.
*/ staticinlineint
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{ unsignedlong val;
val = (unsignedlong)list->next;
if ((val & ~RB_FLAG_MASK) != (unsignedlong)&page->list) return RB_PAGE_MOVED;
return val & RB_FLAG_MASK;
}
/* * rb_is_reader_page * * The unique thing about the reader page, is that, if the * writer is ever on it, the previous pointer never points * back to the reader page.
*/ staticbool rb_is_reader_page(struct buffer_page *page)
{ struct list_head *list = page->list.prev;
return rb_list_head(list->next) != &page->list;
}
/* * rb_set_list_to_head - set a list_head to be pointing to head.
*/ staticvoid rb_set_list_to_head(struct list_head *list)
{ unsignedlong *ptr;
if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) return NULL;
/* sanity check */
list = cpu_buffer->pages; if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) return NULL;
page = head = cpu_buffer->head_page; /* * It is possible that the writer moves the header behind * where we started, and we miss in one loop. * A second loop should grab the header, but we'll do * three loops just because I'm paranoid.
*/ for (i = 0; i < 3; i++) { do { if (rb_is_head_page(page, page->list.prev)) {
cpu_buffer->head_page = page; return page;
}
rb_inc_page(&page);
} while (page != head);
}
/* * The tail page now needs to be moved forward. * * We need to reset the tail page, but without messing * with possible erasing of data brought in by interrupts * that have moved the tail page and are currently on it. * * We add a counter to the write field to denote this.
*/
old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
/* * Just make sure we have seen our old_write and synchronize * with any interrupts that come in.
*/
barrier();
/* * If the tail page is still the same as what we think * it is, then it is up to us to update the tail * pointer.
*/ if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { /* Zero the write counter */ unsignedlong val = old_write & ~RB_WRITE_MASK; unsignedlong eval = old_entries & ~RB_WRITE_MASK;
/* * This will only succeed if an interrupt did * not come in and change it. In which case, we * do not want to modify it. * * We add (void) to let the compiler know that we do not care * about the return value of these functions. We use the * cmpxchg to only update if an interrupt did not already * do it for us. If the cmpxchg fails, we don't care.
*/
(void)local_cmpxchg(&next_page->write, old_write, val);
(void)local_cmpxchg(&next_page->entries, old_entries, eval);
/* * No need to worry about races with clearing out the commit. * it only can increment when a commit takes place. But that * only happens in the outer most nested commit.
*/
local_set(&next_page->page->commit, 0);
/* Either we update tail_page or an interrupt does */ if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
local_inc(&cpu_buffer->pages_touched);
}
}
if (RB_WARN_ON(cpu_buffer,
rb_list_head(rb_list_head(list->prev)->next) != list)) returnfalse;
returntrue;
}
/** * rb_check_pages - integrity check of buffer pages * @cpu_buffer: CPU buffer with pages to test * * As a safety measure we check to make sure the data pages have not * been corrupted.
*/ staticvoid rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{ struct list_head *head, *tmp; unsignedlong buffer_cnt; unsignedlong flags; int nr_loops = 0;
/* * Walk the linked list underpinning the ring buffer and validate all * its next and prev links. * * The check acquires the reader_lock to avoid concurrent processing * with code that could be modifying the list. However, the lock cannot * be held for the entire duration of the walk, as this would make the * time when interrupts are disabled non-deterministic, dependent on the * ring buffer size. Therefore, the code releases and re-acquires the * lock after checking each page. The ring_buffer_per_cpu.cnt variable * is then used to detect if the list was modified while the lock was * not held, in which case the check needs to be restarted. * * The code attempts to perform the check at most three times before * giving up. This is acceptable because this is only a self-validation * to detect problems early on. In practice, the list modification * operations are fairly spaced, and so this check typically succeeds at * most on the second try.
*/
again: if (++nr_loops > 3) return;
raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
head = rb_list_head(cpu_buffer->pages); if (!rb_check_links(cpu_buffer, head)) goto out_locked;
buffer_cnt = cpu_buffer->cnt;
tmp = head;
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
while (true) {
raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
if (buffer_cnt != cpu_buffer->cnt) { /* The list was updated, try again. */
raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); goto again;
}
tmp = rb_list_head(tmp->next); if (tmp == head) /* The iteration circled back, all is done. */ goto out_locked;
if (!rb_check_links(cpu_buffer, tmp)) goto out_locked;
/* * Take an address, add the meta data size as well as the array of * array subbuffer indexes, then align it to a subbuffer size. * * This is used to help find the next per cpu subbuffer within a mapped range.
*/ staticunsignedlong
rb_range_align_subbuf(unsignedlong addr, int subbuf_size, int nr_subbufs)
{
addr += sizeof(struct ring_buffer_cpu_meta) + sizeof(int) * nr_subbufs; return ALIGN(addr, subbuf_size);
}
/*
 * Return the ring_buffer_meta for a given @cpu.
 */
static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu)
{
	int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
	struct ring_buffer_cpu_meta *meta;
	struct ring_buffer_meta *bmeta;
	unsigned long ptr;
	int nr_subbufs;

	bmeta = buffer->meta;
	if (!bmeta)
		return NULL;

	/* The per-CPU metas start at buffers_offset past the buffer meta */
	ptr = (unsigned long)bmeta + bmeta->buffers_offset;
	meta = (struct ring_buffer_cpu_meta *)ptr;

	/* When nr_pages passed in is zero, the first meta has already been initialized */
	if (!nr_pages) {
		nr_subbufs = meta->nr_subbufs;
	} else {
		/* Include the reader page */
		nr_subbufs = nr_pages + 1;
	}

	/*
	 * The first chunk may not be subbuffer aligned, where as
	 * the rest of the chunks are.
	 */
	if (cpu) {
		/* Step over CPU 0's (possibly unaligned) chunk */
		ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
		ptr += subbuf_size * nr_subbufs;

		/* We can use multiplication to find chunks greater than 1 */
		if (cpu > 1) {
			unsigned long size;
			unsigned long p;

			/* Save the beginning of this CPU chunk */
			p = ptr;
			ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
			ptr += subbuf_size * nr_subbufs;

			/* Now all chunks after this are the same size */
			size = ptr - p;
			ptr += size * (cpu - 2);
		}
	}
	return (void *)ptr;
}
/* Return the start of subbufs given the meta pointer */ staticvoid *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta)
{ int subbuf_size = meta->subbuf_size; unsignedlong ptr;
/* * Return a specific sub-buffer for a given @cpu defined by @idx.
*/ staticvoid *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx)
{ struct ring_buffer_cpu_meta *meta; unsignedlong ptr; int subbuf_size;
meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu); if (!meta) return NULL;
if (WARN_ON_ONCE(idx >= meta->nr_subbufs)) return NULL;
subbuf_size = meta->subbuf_size;
/* Map this buffer to the order that's in meta->buffers[] */
idx = meta->buffers[idx];
/* * See if the existing memory contains a valid meta section. * if so, use that, otherwise initialize it.
*/ staticbool rb_meta_init(struct trace_buffer *buffer, int scratch_size)
{ unsignedlong ptr = buffer->range_addr_start; struct ring_buffer_meta *bmeta; unsignedlong total_size; int struct_sizes;
/* Zero out the scatch pad */
memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta));
returnfalse;
}
/* * See if the existing memory contains valid ring buffer data. * As the previous kernel must be the same as this kernel, all * the calculations (size of buffers and number of buffers) * must be the same.
*/ staticbool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu, struct trace_buffer *buffer, int nr_pages, unsignedlong *subbuf_mask)
{ int subbuf_size = PAGE_SIZE; struct buffer_data_page *subbuf; unsignedlong buffers_start; unsignedlong buffers_end; int i;
/* Is the head and commit buffers within the range of buffers? */ if (meta->head_buffer < buffers_start ||
meta->head_buffer >= buffers_end) {
pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu); returnfalse;
}
if (meta->commit_buffer < buffers_start ||
meta->commit_buffer >= buffers_end) {
pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu); returnfalse;
}
subbuf = rb_subbufs_from_meta(meta);
bitmap_clear(subbuf_mask, 0, meta->nr_subbufs);
/* Is the meta buffers and the subbufs themselves have correct data? */ for (i = 0; i < meta->nr_subbufs; i++) { if (meta->buffers[i] < 0 ||
meta->buffers[i] >= meta->nr_subbufs) {
pr_info("Ring buffer boot meta [%d] array out of range\n", cpu); returnfalse;
}
if ((unsigned)local_read(&subbuf->commit) > subbuf_size) {
pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu); returnfalse;
}
if (test_bit(meta->buffers[i], subbuf_mask)) {
pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu); returnfalse;
}
/* If the meta data has been validated, now validate the events */
static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer)
{
        struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
        struct buffer_page *head_page, *orig_head;
        unsigned long entry_bytes = 0;
        unsigned long entries = 0;
        int ret;
        u64 ts;
        int i;

        if (!meta || !meta->head_buffer)
                return;

        /* Do the reader page first */
        ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu);
        if (ret < 0) {
                pr_info("Ring buffer reader page is invalid\n");
                goto invalid;
        }
        entries += ret;
        entry_bytes += local_read(&cpu_buffer->reader_page->page->commit);
        local_set(&cpu_buffer->reader_page->entries, ret);

        /*
         * Start the rewind walk from the current head page.
         * (Fixes use of 'head_page', 'orig_head' and 'ts' before
         * initialization in the previous version of this chunk.)
         */
        orig_head = head_page = cpu_buffer->head_page;
        ts = head_page->page->time_stamp;

        /*
         * Try to rewind the head so that we can read the pages which already
         * read in the previous boot.
         */
        if (head_page == cpu_buffer->tail_page)
                goto skip_rewind;

        rb_dec_page(&head_page);
        for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) {

                /* Rewind until tail (writer) page. */
                if (head_page == cpu_buffer->tail_page)
                        break;

                /* Ensure the page has older data than head. */
                if (ts < head_page->page->time_stamp)
                        break;

                ts = head_page->page->time_stamp;
                /* Ensure the page has correct timestamp and some data. */
                if (!ts || rb_page_commit(head_page) == 0)
                        break;

                /* Stop rewind if the page is invalid. */
                ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu);
                if (ret < 0)
                        break;

                /* Recover the number of entries and update stats. */
                local_set(&head_page->entries, ret);
                if (ret)
                        local_inc(&cpu_buffer->pages_touched);
                entries += ret;
                entry_bytes += rb_page_commit(head_page);
        }
        if (i)
                pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i);

        /* The last rewound page must be skipped. */
        if (head_page != orig_head)
                rb_inc_page(&head_page);

        /*
         * If the ring buffer was rewound, then inject the reader page
         * into the location just before the original head page.
         */
        if (head_page != orig_head) {
                struct buffer_page *bpage = orig_head;

                rb_dec_page(&bpage);
                /*
                 * Insert the reader_page before the original head page.
                 * Since the list encode RB_PAGE flags, general list
                 * operations should be avoided.
                 */
                cpu_buffer->reader_page->list.next = &orig_head->list;
                cpu_buffer->reader_page->list.prev = orig_head->list.prev;
                orig_head->list.prev = &cpu_buffer->reader_page->list;
                bpage->list.next = &cpu_buffer->reader_page->list;

                /* Make the head_page the reader page */
                cpu_buffer->reader_page = head_page;
                bpage = head_page;
                rb_inc_page(&head_page);
                head_page->list.prev = bpage->list.prev;
                rb_dec_page(&bpage);
                bpage->list.next = &head_page->list;
                rb_set_list_to_head(&bpage->list);
                cpu_buffer->pages = &head_page->list;

                /*
                 * NOTE(review): the rewound page becomes the new head;
                 * confirm against upstream that head_page is updated here.
                 */
                cpu_buffer->head_page = head_page;
        }

 skip_rewind:
        /* If the commit_buffer is the reader page, update the commit page */
        if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) {
                cpu_buffer->commit_page = cpu_buffer->reader_page;
                /* Nothing more to do, the only page is the reader page */
                goto done;
        }

        /* Iterate until finding the commit page */
        for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) {

                /* Reader page has already been done */
                if (head_page == cpu_buffer->reader_page)
                        continue;

                ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu);
                if (ret < 0) {
                        pr_info("Ring buffer meta [%d] invalid buffer page\n",
                                cpu_buffer->cpu);
                        goto invalid;
                }

                /* If the buffer has content, update pages_touched */
                if (ret)
                        local_inc(&cpu_buffer->pages_touched);

                /*
                 * NOTE(review): entries/entry_bytes are not accumulated for
                 * these pages here -- confirm against upstream whether the
                 * per-page stats updates were dropped from this chunk.
                 */

                if (head_page == cpu_buffer->commit_page)
                        break;
        }

        if (head_page != cpu_buffer->commit_page) {
                pr_info("Ring buffer meta [%d] commit page not found\n",
                        cpu_buffer->cpu);
                goto invalid;
        }
 done:
        local_set(&cpu_buffer->entries, entries);
        local_set(&cpu_buffer->entries_bytes, entry_bytes);

        pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu);
        return;

 invalid:
        /* The content of the buffers are invalid, reset the meta data */
        meta->head_buffer = 0;
        meta->commit_buffer = 0;

        /* Reset the reader page */
        local_set(&cpu_buffer->reader_page->entries, 0);
        local_set(&cpu_buffer->reader_page->page->commit, 0);

        /*
         * Reset all the subbuffers, walking from the current head page.
         * (Fixes 'head_page' being uninitialized when the reader-page
         * validation above fails and jumps straight here.)
         */
        head_page = cpu_buffer->head_page;
        for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) {
                local_set(&head_page->entries, 0);
                local_set(&head_page->page->commit, 0);
        }
}
staticvoid rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size)
{ struct ring_buffer_cpu_meta *meta; unsignedlong *subbuf_mask; unsignedlong delta; void *subbuf; bool valid = false; int cpu; int i;
/* Create a mask to test the subbuf array */
subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */
if (rb_meta_init(buffer, scratch_size))
valid = true;
for (cpu = 0; cpu < nr_cpu_ids; cpu++) { void *next_meta;
meta = rb_range_meta(buffer, nr_pages, cpu);
if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { /* Make the mappings match the current address */
subbuf = rb_subbufs_from_meta(meta);
delta = (unsignedlong)subbuf - meta->first_buffer;
meta->first_buffer += delta;
meta->head_buffer += delta;
meta->commit_buffer += delta; continue;
}
if (cpu < nr_cpu_ids - 1)
next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); else
next_meta = (void *)buffer->range_addr_end;
/* * The buffers[] array holds the order of the sub-buffers * that are after the meta data. The sub-buffers may * be swapped out when read and inserted into a different * location of the ring buffer. Although their addresses * remain the same, the buffers[] array contains the * index into the sub-buffers holding their actual order.
*/ for (i = 0; i < meta->nr_subbufs; i++) {
meta->buffers[i] = i;
rb_init_page(subbuf);
subbuf += meta->subbuf_size;
}
}
bitmap_free(subbuf_mask);
}
/*
 * Open a seq_file iterator over the meta data of the given CPU's
 * ring buffer. Returns 0 on success or the error from seq_open().
 */
int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu)
{
        struct seq_file *m;
        int err = seq_open(file, &rb_meta_seq_ops);

        if (err)
                return err;

        /* Hand the per-CPU buffer to the seq_file iterator callbacks */
        m = file->private_data;
        m->private = buffer->buffers[cpu];
        return 0;
}
/* Map the buffer_pages to the previous head and commit pages */ staticvoid rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, struct buffer_page *bpage)
{ struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
if (meta->head_buffer == (unsignedlong)bpage->page)
cpu_buffer->head_page = bpage;
/* * Check if the available memory is there first. * Note, si_mem_available() only gives us a rough estimate of available * memory. It may not be accurate. But we don't care, we just want * to prevent doing any allocation when it is obvious that it is * not going to succeed.
*/
i = si_mem_available(); if (i < nr_pages) return -ENOMEM;
/* * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails * gracefully without invoking oom-killer and the system is not * destabilized.
*/
mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
/* * If a user thread allocates too much, and si_mem_available() * reports there's enough memory, even though there is not. * Make sure the OOM killer kills this thread. This can happen * even with RETRY_MAYFAIL because another task may be doing * an allocation after this task has taken all memory. * This is the task the OOM killer needs to take out during this * loop, even if it was triggered by an allocation somewhere else.
*/ if (user_thread)
set_current_oom_origin();
if (buffer->range_addr_start)
meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu);
for (i = 0; i < nr_pages; i++) { struct page *page;
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
mflags, cpu_to_node(cpu_buffer->cpu)); if (!bpage) goto free_pages;
rb_check_bpage(cpu_buffer, bpage);
/* * Append the pages as for mapped buffers we want to keep * the order
*/
list_add_tail(&bpage->list, pages);
if (meta) { /* A range was given. Use that for the buffer page */
bpage->page = rb_range_buffer(cpu_buffer, i + 1); if (!bpage->page) goto free_pages; /* If this is valid from a previous boot */ if (meta->head_buffer)
rb_meta_buffer_update(cpu_buffer, bpage);
bpage->range = 1;
bpage->id = i + 1;
} else {
page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
mflags | __GFP_COMP | __GFP_ZERO,
cpu_buffer->buffer->subbuf_order); if (!page) goto free_pages;
bpage->page = page_address(page);
rb_init_page(bpage->page);
}
bpage->order = cpu_buffer->buffer->subbuf_order;
if (user_thread && fatal_signal_pending(current)) goto free_pages;
} if (user_thread)
clear_current_oom_origin();
if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) return -ENOMEM;
/* * The ring buffer page list is a circular list that does not * start and end with a list head. All page list items point to * other pages.
*/
cpu_buffer->pages = pages.next;
list_del(&pages);
cpu_buffer->nr_pages = nr_pages;
rb_check_pages(cpu_buffer);
return 0;
}
staticstruct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
{ struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = NULL; struct ring_buffer_cpu_meta *meta; struct buffer_page *bpage; struct page *page; int ret;
cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu)); if (!cpu_buffer) return NULL;
/*
 * NOTE(review): extraneous non-source text (a German website disclaimer)
 * was pasted here by the extraction tool and should be removed. It read,
 * in translation: "The information on this web page was carefully compiled
 * to the best of our knowledge. However, neither completeness, nor
 * correctness, nor quality of the provided information is guaranteed.
 * Note: the colored syntax display and the measurement are still
 * experimental." The rb_allocate_cpu_buffer() definition beginning above
 * is truncated at this point.
 */