Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Linux/kernel/trace/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 207 kB image not shown  

Quelle  ring_buffer.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */

#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h> /* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>
#include <asm/setup.h>

#include "trace.h"

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */

#define TS_MSB  (0xf8ULL << 56)
#define ABS_TS_MASK (~TS_MSB)

static void update_pages_handler(struct work_struct *work);

#define RING_BUFFER_META_MAGIC 0xBADFEED

struct ring_buffer_meta {
 int  magic;
 int  struct_sizes;
 unsigned long total_size;
 unsigned long buffers_offset;
};

struct ring_buffer_cpu_meta {
 unsigned long first_buffer;
 unsigned long head_buffer;
 unsigned long commit_buffer;
 __u32  subbuf_size;
 __u32  nr_subbufs;
 int  buffers[];
};

/*
 * The ring buffer header is special. We must manually up keep it.
 */

int ring_buffer_print_entry_header(struct trace_seq *s)
{
 trace_seq_puts(s, "# compressed entry header\n");
 trace_seq_puts(s, "\ttype_len : 5 bits\n");
 trace_seq_puts(s, "\ttime_delta : 27 bits\n");
 trace_seq_puts(s, "\tarray : 32 bits\n");
 trace_seq_putc(s, '\n');
 trace_seq_printf(s, "\tpadding : type == %d\n",
    RINGBUF_TYPE_PADDING);
 trace_seq_printf(s, "\ttime_extend : type == %d\n",
    RINGBUF_TYPE_TIME_EXTEND);
 trace_seq_printf(s, "\ttime_stamp : type == %d\n",
    RINGBUF_TYPE_TIME_STAMP);
 trace_seq_printf(s, "\tdata max type_len == %d\n",
    RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

 return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on.  A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */


/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF  (1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT  4U
#define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE 8U /* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
define RB_FORCE_8BYTE_ALIGNMENT 0
define RB_ARCH_ALIGNMENT  RB_ALIGNMENT
#else
define RB_FORCE_8BYTE_ALIGNMENT 1
define RB_ARCH_ALIGNMENT  8U
#endif

#define RB_ALIGN_DATA  __aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
 RB_LEN_TIME_EXTEND = 8,
 RB_LEN_TIME_STAMP =  8,
};

#define skip_time_extend(event) \
 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
 /* padding has a NULL time_delta */
 event->type_len = RINGBUF_TYPE_PADDING;
 event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
 unsigned length;

 if (event->type_len)
  length = event->type_len * RB_ALIGNMENT;
 else
  length = event->array[0];
 return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */

static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
 switch (event->type_len) {
 case RINGBUF_TYPE_PADDING:
  if (rb_null_event(event))
   /* undefined */
   return -1;
  return  event->array[0] + RB_EVNT_HDR_SIZE;

 case RINGBUF_TYPE_TIME_EXTEND:
  return RB_LEN_TIME_EXTEND;

 case RINGBUF_TYPE_TIME_STAMP:
  return RB_LEN_TIME_STAMP;

 case RINGBUF_TYPE_DATA:
  return rb_event_data_length(event);
 default:
  WARN_ON_ONCE(1);
 }
 /* not hit */
 return 0;
}

/*
 * Return total length of time extend and data,
 *   or just the event length for all other events.
 */

static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
 unsigned len = 0;

 if (extended_time(event)) {
  /* time extends include the data event after it */
  len = RB_LEN_TIME_EXTEND;
  event = skip_time_extend(event);
 }
 return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */

unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
 unsigned length;

 if (extended_time(event))
  event = skip_time_extend(event);

 length = rb_event_length(event);
 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
  return length;
 length -= RB_EVNT_HDR_SIZE;
 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
                length -= sizeof(event->array[0]);
 return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
 if (extended_time(event))
  event = skip_time_extend(event);
 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
 /* If length is in len field, then array[0] has the data */
 if (event->type_len)
  return (void *)&event->array[0];
 /* Otherwise length is in array[0] and array[1] has the data */
 return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */

void *ring_buffer_event_data(struct ring_buffer_event *event)
{
 return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)  \
 for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)  \
 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT 27
#define TS_MASK  ((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST (~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
 u64 ts;

 ts = event->array[0];
 ts <<= TS_SHIFT;
 ts += event->time_delta;

 return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS (1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED (1 << 30)

#define RB_MISSED_MASK  (3 << 30)

struct buffer_data_page {
 u64   time_stamp; /* page time stamp */
 local_t   commit; /* write committed index */
 unsigned char  data[] RB_ALIGN_DATA; /* data of buffer page */
};

struct buffer_data_read_page {
 unsigned  order; /* order of the page */
 struct buffer_data_page *data; /* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */

struct buffer_page {
 struct list_head list;  /* list of buffer pages */
 local_t   write;  /* index for next write */
 unsigned  read;  /* index for next read */
 local_t   entries; /* entries on this page */
 unsigned long  real_end; /* real end of data */
 unsigned  order;  /* order of the page */
 u32   id:30;  /* ID for external mapping */
 u32   range:1; /* Mapped via a range */
 struct buffer_data_page *page; /* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */

#define RB_WRITE_MASK  0xfffff
#define RB_WRITE_INTCNT  (1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
 local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
 return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
 /* Range pages are not to be freed */
 if (!bpage->range)
  free_pages((unsigned long)bpage->page, bpage->order);
 kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */

static inline bool test_time_stamp(u64 delta)
{
 return !!(delta & TS_DELTA_TEST);
}

struct rb_irq_work {
 struct irq_work   work;
 wait_queue_head_t  waiters;
 wait_queue_head_t  full_waiters;
 atomic_t   seq;
 bool    waiters_pending;
 bool    full_waiters_pending;
 bool    wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */

struct rb_event_info {
 u64   ts;
 u64   delta;
 u64   before;
 u64   after;
 unsigned long  length;
 struct buffer_page *tail_page;
 int   add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */

enum {
 RB_ADD_STAMP_NONE  = 0,
 RB_ADD_STAMP_EXTEND  = BIT(1),
 RB_ADD_STAMP_ABSOLUTE  = BIT(2),
 RB_ADD_STAMP_FORCE  = BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */

enum {
 RB_CTX_TRANSITION,
 RB_CTX_NMI,
 RB_CTX_IRQ,
 RB_CTX_SOFTIRQ,
 RB_CTX_NORMAL,
 RB_CTX_MAX
};

struct rb_time_struct {
 local64_t time;
};
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST 5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */

struct ring_buffer_per_cpu {
 int    cpu;
 atomic_t   record_disabled;
 atomic_t   resize_disabled;
 struct trace_buffer *buffer;
 raw_spinlock_t   reader_lock; /* serialize readers */
 arch_spinlock_t   lock;
 struct lock_class_key  lock_key;
 struct buffer_data_page  *free_page;
 unsigned long   nr_pages;
 unsigned int   current_context;
 struct list_head  *pages;
 /* pages generation counter, incremented when the list changes */
 unsigned long   cnt;
 struct buffer_page  *head_page; /* read from head */
 struct buffer_page  *tail_page; /* write to tail */
 struct buffer_page  *commit_page; /* committed pages */
 struct buffer_page  *reader_page;
 unsigned long   lost_events;
 unsigned long   last_overrun;
 unsigned long   nest;
 local_t    entries_bytes;
 local_t    entries;
 local_t    overrun;
 local_t    commit_overrun;
 local_t    dropped_events;
 local_t    committing;
 local_t    commits;
 local_t    pages_touched;
 local_t    pages_lost;
 local_t    pages_read;
 long    last_pages_touch;
 size_t    shortest_full;
 unsigned long   read;
 unsigned long   read_bytes;
 rb_time_t   write_stamp;
 rb_time_t   before_stamp;
 u64    event_stamp[MAX_NEST];
 u64    read_stamp;
 /* pages removed since last reset */
 unsigned long   pages_removed;

 unsigned int   mapped;
 unsigned int   user_mapped; /* user space mapping */
 struct mutex   mapping_lock;
 unsigned long   *subbuf_ids; /* ID to subbuf VA */
 struct trace_buffer_meta *meta_page;
 struct ring_buffer_cpu_meta *ring_meta;

 /* ring buffer pages to update, > 0 to add, < 0 to remove */
 long    nr_pages_to_update;
 struct list_head  new_pages; /* new pages to add */
 struct work_struct  update_pages_work;
 struct completion  update_done;

 struct rb_irq_work  irq_work;
};

struct trace_buffer {
 unsigned   flags;
 int    cpus;
 atomic_t   record_disabled;
 atomic_t   resizing;
 cpumask_var_t   cpumask;

 struct lock_class_key  *reader_lock_key;

 struct mutex   mutex;

 struct ring_buffer_per_cpu **buffers;

 struct hlist_node  node;
 u64    (*clock)(void);

 struct rb_irq_work  irq_work;
 bool    time_stamp_abs;

 unsigned long   range_addr_start;
 unsigned long   range_addr_end;

 struct ring_buffer_meta  *meta;

 unsigned int   subbuf_size;
 unsigned int   subbuf_order;
 unsigned int   max_data_size;
};

struct ring_buffer_iter {
 struct ring_buffer_per_cpu *cpu_buffer;
 unsigned long   head;
 unsigned long   next_event;
 struct buffer_page  *head_page;
 struct buffer_page  *cache_reader_page;
 unsigned long   cache_read;
 unsigned long   cache_pages_removed;
 u64    read_stamp;
 u64    page_stamp;
 struct ring_buffer_event *event;
 size_t    event_size;
 int    missed_events;
};

int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
 struct buffer_data_page field;

 trace_seq_printf(s, "\tfield: u64 timestamp;\t"
    "offset:0;\tsize:%u;\tsigned:%u;\n",
    (unsigned int)sizeof(field.time_stamp),
    (unsigned int)is_signed_type(u64));

 trace_seq_printf(s, "\tfield: local_t commit;\t"
    "offset:%u;\tsize:%u;\tsigned:%u;\n",
    (unsigned int)offsetof(typeof(field), commit),
    (unsigned int)sizeof(field.commit),
    (unsigned int)is_signed_type(long));

 trace_seq_printf(s, "\tfield: int overwrite;\t"
    "offset:%u;\tsize:%u;\tsigned:%u;\n",
    (unsigned int)offsetof(typeof(field), commit),
    1,
    (unsigned int)is_signed_type(long));

 trace_seq_printf(s, "\tfield: char data;\t"
    "offset:%u;\tsize:%u;\tsigned:%u;\n",
    (unsigned int)offsetof(typeof(field), data),
    (unsigned int)buffer->subbuf_size,
    (unsigned int)is_signed_type(char));

 return !trace_seq_has_overflowed(s);
}

static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
 *ret = local64_read(&t->time);
}
static void rb_time_set(rb_time_t *t, u64 val)
{
 local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */

//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
    void *event)
{
 struct buffer_page *page = cpu_buffer->commit_page;
 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
 struct list_head *next;
 long commit, write;
 unsigned long addr = (unsigned long)event;
 bool done = false;
 int stop = 0;

 /* Make sure the event exists and is not committed yet */
 do {
  if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
   done = true;
  commit = local_read(&page->page->commit);
  write = local_read(&page->write);
  if (addr >= (unsigned long)&page->page->data[commit] &&
      addr < (unsigned long)&page->page->data[write])
   return;

  next = rb_list_head(page->list.next);
  page = list_entry(next, struct buffer_page, list);
 } while (!done);
 WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
    void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */

static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
 if (save_ts & TS_MSB) {
  abs |= save_ts & TS_MSB;
  /* Check for overflow */
  if (unlikely(abs < save_ts))
   abs += 1ULL << 59;
 }
 return abs;
}

static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned,
 * otherwise  current time is returned, but that really neither of
 * the last two cases should ever happen.
 */

u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
     struct ring_buffer_event *event)
{
 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
 unsigned int nest;
 u64 ts;

 /* If the event includes an absolute time, then just use that */
 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
  ts = rb_event_time_stamp(event);
  return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
 }

 nest = local_read(&cpu_buffer->committing);
 verify_event(cpu_buffer, event);
 if (WARN_ON_ONCE(!nest))
  goto fail;

 /* Read the current saved nesting level time stamp */
 if (likely(--nest < MAX_NEST))
  return cpu_buffer->event_stamp[nest];

 /* Shouldn't happen, warn if it does */
 WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
 rb_time_read(&cpu_buffer->write_stamp, &ts);

 return ts;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */

size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
 size_t read;
 size_t lost;
 size_t cnt;

 read = local_read(&buffer->buffers[cpu]->pages_read);
 lost = local_read(&buffer->buffers[cpu]->pages_lost);
 cnt = local_read(&buffer->buffers[cpu]->pages_touched);

 if (WARN_ON_ONCE(cnt < lost))
  return 0;

 cnt -= lost;

 /* The reader can read an empty page, but not more than that */
 if (cnt < read) {
  WARN_ON_ONCE(read > cnt + 1);
  return 0;
 }

 return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
 size_t nr_pages;
 size_t dirty;

 nr_pages = cpu_buffer->nr_pages;
 if (!nr_pages || !full)
  return true;

 /*
 * Add one as dirty will never equal nr_pages, as the sub-buffer
 * that the writer is on is not counted as dirty.
 * This is needed if "buffer_percent" is set to 100.
 */

 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

 return (dirty * 100) >= (full * nr_pages);
}

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */

static void rb_wake_up_waiters(struct irq_work *work)
{
 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

 /* For waiters waiting for the first wake up */
 (void)atomic_fetch_inc_release(&rbwork->seq);

 wake_up_all(&rbwork->waiters);
 if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
  /* Only cpu_buffer sets the above flags */
  struct ring_buffer_per_cpu *cpu_buffer =
   container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

  /* Called from interrupt context */
  raw_spin_lock(&cpu_buffer->reader_lock);
  rbwork->wakeup_full = false;
  rbwork->full_waiters_pending = false;

  /* Waking up all waiters, they will reset the shortest full */
  cpu_buffer->shortest_full = 0;
  raw_spin_unlock(&cpu_buffer->reader_lock);

  wake_up_all(&rbwork->full_waiters);
 }
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * In the case of a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on this.
 */

void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
 struct ring_buffer_per_cpu *cpu_buffer;
 struct rb_irq_work *rbwork;

 if (!buffer)
  return;

 if (cpu == RING_BUFFER_ALL_CPUS) {

  /* Wake up individual ones too. One level recursion */
  for_each_buffer_cpu(buffer, cpu)
   ring_buffer_wake_waiters(buffer, cpu);

  rbwork = &buffer->irq_work;
 } else {
  if (WARN_ON_ONCE(!buffer->buffers))
   return;
  if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
   return;

  cpu_buffer = buffer->buffers[cpu];
  /* The CPU buffer may not have been initialized yet */
  if (!cpu_buffer)
   return;
  rbwork = &cpu_buffer->irq_work;
 }

 /* This can be called in any context */
 irq_work_queue(&rbwork->work);
}

static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
 struct ring_buffer_per_cpu *cpu_buffer;
 bool ret = false;

 /* Reads of all CPUs always waits for any data */
 if (cpu == RING_BUFFER_ALL_CPUS)
  return !ring_buffer_empty(buffer);

 cpu_buffer = buffer->buffers[cpu];

 if (!ring_buffer_empty_cpu(buffer, cpu)) {
  unsigned long flags;
  bool pagebusy;

  if (!full)
   return true;

  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
  pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
  ret = !pagebusy && full_hit(buffer, cpu, full);

  if (!ret && (!cpu_buffer->shortest_full ||
        cpu_buffer->shortest_full > full)) {
      cpu_buffer->shortest_full = full;
  }
  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 }
 return ret;
}

static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
      int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
 if (rb_watermark_hit(buffer, cpu, full))
  return true;

 if (cond(data))
  return true;

 /*
 * The events can happen in critical sections where
 * checking a work queue can cause deadlocks.
 * After adding a task to the queue, this flag is set
 * only to notify events to try to wake up the queue
 * using irq_work.
 *
 * We don't clear it even if the buffer is no longer
 * empty. The flag only causes the next event to run
 * irq_work to do the work queue wake up. The worse
 * that can happen if we race with !trace_empty() is that
 * an event will cause an irq_work to try to wake up
 * an empty queue.
 *
 * There's no reason to protect this flag either, as
 * the work queue and irq_work logic will do the necessary
 * synchronization for the wake ups. The only thing
 * that is necessary is that the wake up happens after
 * a task has been queued. It's OK for spurious wake ups.
 */

 if (full)
  rbwork->full_waiters_pending = true;
 else
  rbwork->waiters_pending = true;

 return false;
}

struct rb_wait_data {
 struct rb_irq_work  *irq_work;
 int    seq;
};

/*
 * The default wait condition for ring_buffer_wait() is to just to exit the
 * wait loop the first time it is woken up.
 */

static bool rb_wait_once(void *data)
{
 struct rb_wait_data *rdata = data;
 struct rb_irq_work *rbwork = rdata->irq_work;

 return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */

int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
       ring_buffer_cond_fn cond, void *data)
{
 struct ring_buffer_per_cpu *cpu_buffer;
 struct wait_queue_head *waitq;
 struct rb_irq_work *rbwork;
 struct rb_wait_data rdata;
 int ret = 0;

 /*
 * Depending on what the caller is waiting for, either any
 * data in any cpu buffer, or a specific buffer, put the
 * caller on the appropriate wait queue.
 */

 if (cpu == RING_BUFFER_ALL_CPUS) {
  rbwork = &buffer->irq_work;
  /* Full only makes sense on per cpu reads */
  full = 0;
 } else {
  if (!cpumask_test_cpu(cpu, buffer->cpumask))
   return -ENODEV;
  cpu_buffer = buffer->buffers[cpu];
  rbwork = &cpu_buffer->irq_work;
 }

 if (full)
  waitq = &rbwork->full_waiters;
 else
  waitq = &rbwork->waiters;

 /* Set up to exit loop as soon as it is woken */
 if (!cond) {
  cond = rb_wait_once;
  rdata.irq_work = rbwork;
  rdata.seq = atomic_read_acquire(&rbwork->seq);
  data = &rdata;
 }

 ret = wait_event_interruptible((*waitq),
    rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

 return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */

__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
     struct file *filp, poll_table *poll_table, int full)
{
 struct ring_buffer_per_cpu *cpu_buffer;
 struct rb_irq_work *rbwork;

 if (cpu == RING_BUFFER_ALL_CPUS) {
  rbwork = &buffer->irq_work;
  full = 0;
 } else {
  if (!cpumask_test_cpu(cpu, buffer->cpumask))
   return EPOLLERR;

  cpu_buffer = buffer->buffers[cpu];
  rbwork = &cpu_buffer->irq_work;
 }

 if (full) {
  poll_wait(filp, &rbwork->full_waiters, poll_table);

  if (rb_watermark_hit(buffer, cpu, full))
   return EPOLLIN | EPOLLRDNORM;
  /*
 * Only allow full_waiters_pending update to be seen after
 * the shortest_full is set (in rb_watermark_hit). If the
 * writer sees the full_waiters_pending flag set, it will
 * compare the amount in the ring buffer to shortest_full.
 * If the amount in the ring buffer is greater than the
 * shortest_full percent, it will call the irq_work handler
 * to wake up this list. The irq_handler will reset shortest_full
 * back to zero. That's done under the reader_lock, but
 * the below smp_mb() makes sure that the update to
 * full_waiters_pending doesn't leak up into the above.
 */

  smp_mb();
  rbwork->full_waiters_pending = true;
  return 0;
 }

 poll_wait(filp, &rbwork->waiters, poll_table);
 rbwork->waiters_pending = true;

 /*
 * There's a tight race between setting the waiters_pending and
 * checking if the ring buffer is empty.  Once the waiters_pending bit
 * is set, the next event will wake the task up, but we can get stuck
 * if there's only a single event in.
 *
 * FIXME: Ideally, we need a memory barrier on the writer side as well,
 * but adding a memory barrier to all events will cause too much of a
 * performance hit in the fast path.  We only need a memory barrier when
 * the buffer goes from empty to having content.  But as this race is
 * extremely small, and it's not a problem if another event comes in, we
 * will fix it later.
 */

 smp_mb();

 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
     (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
  return EPOLLIN | EPOLLRDNORM;
 return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)      \
 ({        \
  int _____ret = unlikely(cond);    \
  if (_____ret) {      \
   if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
    struct ring_buffer_per_cpu *__b = \
     (void *)b;   \
    atomic_inc(&__b->buffer->record_disabled); \
   } else      \
    atomic_inc(&b->record_disabled); \
   WARN_ON(1);     \
  }       \
  _____ret;      \
 })

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
 u64 ts;

 /* Skip retpolines :-( */
 if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
  ts = trace_clock_local();
 else
  ts = buffer->clock();

 /* shift to debug/test normalization and TIME_EXTENTS */
 return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
 u64 time;

 preempt_disable_notrace();
 time = rb_time_stamp(buffer);
 preempt_enable_notrace();

 return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
          int cpu, u64 *ts)
{
 /* Just stupid testing the normalize function and deltas */
 *ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 *                              -------        -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 *  What the above shows is that the reader just swapped out
 *  the reader page with a page in the buffer, but before it
 *  could make the new header point back to the new page added
 *  it was preempted by a writer. The writer moved forward onto
 *  the new page added by the reader and is about to move forward
 *  again.
 *
 *  You can see, it is legitimate for the previous pointer of
 *  the head (or any page) not to point back to itself. But only
 *  temporarily.
 */


#define RB_PAGE_NORMAL  0UL
#define RB_PAGE_HEAD  1UL
#define RB_PAGE_UPDATE  2UL


#define RB_FLAG_MASK  3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED  4UL

/*
 * rb_list_head - remove any bit
 */

static struct list_head *rb_list_head(struct list_head *list)
{
 unsigned long val = (unsigned long)list;

 return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */

static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
 unsigned long val;

 val = (unsigned long)list->next;

 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
  return RB_PAGE_MOVED;

 return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */

static bool rb_is_reader_page(struct buffer_page *page)
{
 struct list_head *list = page->list.prev;

 return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */

static void rb_set_list_to_head(struct list_head *list)
{
 unsigned long *ptr;

 ptr = (unsigned long *)&list->next;
 *ptr |= RB_PAGE_HEAD;
 *ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */

static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
 struct buffer_page *head;

 head = cpu_buffer->head_page;
 if (!head)
  return;

 /*
 * Set the previous list pointer to have the HEAD flag.
 */

 rb_set_list_to_head(head->list.prev);

 if (cpu_buffer->ring_meta) {
  struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
  meta->head_buffer = (unsigned long)head->page;
 }
}

static void rb_list_head_clear(struct list_head *list)
{
 unsigned long *ptr = (unsigned long *)&list->next;

 *ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */

static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
 struct list_head *hd;

 /* Go through the whole list and clear any pointers found. */
 rb_list_head_clear(cpu_buffer->pages);

 list_for_each(hd, cpu_buffer->pages)
  rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
       struct buffer_page *head,
       struct buffer_page *prev,
       int old_flag, int new_flag)
{
 struct list_head *list;
 unsigned long val = (unsigned long)&head->list;
 unsigned long ret;

 list = &prev->list;

 val &= ~RB_FLAG_MASK;

 ret = cmpxchg((unsigned long *)&list->next,
        val | old_flag, val | new_flag);

 /* check if the reader took the page */
 if ((ret & ~RB_FLAG_MASK) != val)
  return RB_PAGE_MOVED;

 return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
       struct buffer_page *head,
       struct buffer_page *prev,
       int old_flag)
{
 return rb_head_page_set(cpu_buffer, head, prev,
    old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
     struct buffer_page *head,
     struct buffer_page *prev,
     int old_flag)
{
 return rb_head_page_set(cpu_buffer, head, prev,
    old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
       struct buffer_page *head,
       struct buffer_page *prev,
       int old_flag)
{
 return rb_head_page_set(cpu_buffer, head, prev,
    old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
 struct list_head *p = rb_list_head((*bpage)->list.next);

 *bpage = list_entry(p, struct buffer_page, list);
}

static inline void rb_dec_page(struct buffer_page **bpage)
{
 struct list_head *p = rb_list_head((*bpage)->list.prev);

 *bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
 struct buffer_page *head;
 struct buffer_page *page;
 struct list_head *list;
 int i;

 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
  return NULL;

 /* sanity check */
 list = cpu_buffer->pages;
 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
  return NULL;

 page = head = cpu_buffer->head_page;
 /*
 * It is possible that the writer moves the header behind
 * where we started, and we miss in one loop.
 * A second loop should grab the header, but we'll do
 * three loops just because I'm paranoid.
 */

 for (i = 0; i < 3; i++) {
  do {
   if (rb_is_head_page(page, page->list.prev)) {
    cpu_buffer->head_page = page;
    return page;
   }
   rb_inc_page(&page);
  } while (page != head);
 }

 RB_WARN_ON(cpu_buffer, 1);

 return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
    struct buffer_page *new)
{
 unsigned long *ptr = (unsigned long *)&old->list.prev->next;
 unsigned long val;

 val = *ptr & ~RB_FLAG_MASK;
 val |= RB_PAGE_HEAD;

 return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */

static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
          struct buffer_page *tail_page,
          struct buffer_page *next_page)
{
 unsigned long old_entries;
 unsigned long old_write;

 /*
 * The tail page now needs to be moved forward.
 *
 * We need to reset the tail page, but without messing
 * with possible erasing of data brought in by interrupts
 * that have moved the tail page and are currently on it.
 *
 * We add a counter to the write field to denote this.
 */

 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

 /*
 * Just make sure we have seen our old_write and synchronize
 * with any interrupts that come in.
 */

 barrier();

 /*
 * If the tail page is still the same as what we think
 * it is, then it is up to us to update the tail
 * pointer.
 */

 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
  /* Zero the write counter */
  unsigned long val = old_write & ~RB_WRITE_MASK;
  unsigned long eval = old_entries & ~RB_WRITE_MASK;

  /*
 * This will only succeed if an interrupt did
 * not come in and change it. In which case, we
 * do not want to modify it.
 *
 * We add (void) to let the compiler know that we do not care
 * about the return value of these functions. We use the
 * cmpxchg to only update if an interrupt did not already
 * do it for us. If the cmpxchg fails, we don't care.
 */

  (void)local_cmpxchg(&next_page->write, old_write, val);
  (void)local_cmpxchg(&next_page->entries, old_entries, eval);

  /*
 * No need to worry about races with clearing out the commit.
 * it only can increment when a commit takes place. But that
 * only happens in the outer most nested commit.
 */

  local_set(&next_page->page->commit, 0);

  /* Either we update tail_page or an interrupt does */
  if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
   local_inc(&cpu_buffer->pages_touched);
 }
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
     struct buffer_page *bpage)
{
 unsigned long val = (unsigned long)bpage;

 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
      struct list_head *list)
{
 if (RB_WARN_ON(cpu_buffer,
         rb_list_head(rb_list_head(list->next)->prev) != list))
  return false;

 if (RB_WARN_ON(cpu_buffer,
         rb_list_head(rb_list_head(list->prev)->next) != list))
  return false;

 return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */

static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
 struct list_head *head, *tmp;
 unsigned long buffer_cnt;
 unsigned long flags;
 int nr_loops = 0;

 /*
 * Walk the linked list underpinning the ring buffer and validate all
 * its next and prev links.
 *
 * The check acquires the reader_lock to avoid concurrent processing
 * with code that could be modifying the list. However, the lock cannot
 * be held for the entire duration of the walk, as this would make the
 * time when interrupts are disabled non-deterministic, dependent on the
 * ring buffer size. Therefore, the code releases and re-acquires the
 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
 * is then used to detect if the list was modified while the lock was
 * not held, in which case the check needs to be restarted.
 *
 * The code attempts to perform the check at most three times before
 * giving up. This is acceptable because this is only a self-validation
 * to detect problems early on. In practice, the list modification
 * operations are fairly spaced, and so this check typically succeeds at
 * most on the second try.
 */

again:
 if (++nr_loops > 3)
  return;

 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 head = rb_list_head(cpu_buffer->pages);
 if (!rb_check_links(cpu_buffer, head))
  goto out_locked;
 buffer_cnt = cpu_buffer->cnt;
 tmp = head;
 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 while (true) {
  raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

  if (buffer_cnt != cpu_buffer->cnt) {
   /* The list was updated, try again. */
   raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
   goto again;
  }

  tmp = rb_list_head(tmp->next);
  if (tmp == head)
   /* The iteration circled back, all is done. */
   goto out_locked;

  if (!rb_check_links(cpu_buffer, tmp))
   goto out_locked;

  raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 }

out_locked:
 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
 * Take an address, add the meta data size as well as the array of
 * array subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */

static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
 addr += sizeof(struct ring_buffer_cpu_meta) +
  sizeof(int) * nr_subbufs;
 return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_meta for a given @cpu.
 */

static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu)
{
 int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
 struct ring_buffer_cpu_meta *meta;
 struct ring_buffer_meta *bmeta;
 unsigned long ptr;
 int nr_subbufs;

 bmeta = buffer->meta;
 if (!bmeta)
  return NULL;

 ptr = (unsigned long)bmeta + bmeta->buffers_offset;
 meta = (struct ring_buffer_cpu_meta *)ptr;

 /* When nr_pages passed in is zero, the first meta has already been initialized */
 if (!nr_pages) {
  nr_subbufs = meta->nr_subbufs;
 } else {
  /* Include the reader page */
  nr_subbufs = nr_pages + 1;
 }

 /*
 * The first chunk may not be subbuffer aligned, where as
 * the rest of the chunks are.
 */

 if (cpu) {
  ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
  ptr += subbuf_size * nr_subbufs;

  /* We can use multiplication to find chunks greater than 1 */
  if (cpu > 1) {
   unsigned long size;
   unsigned long p;

   /* Save the beginning of this CPU chunk */
   p = ptr;
   ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
   ptr += subbuf_size * nr_subbufs;

   /* Now all chunks after this are the same size */
   size = ptr - p;
   ptr += size * (cpu - 2);
  }
 }
 return (void *)ptr;
}

/* Return the start of subbufs given the meta pointer */
static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta)
{
 int subbuf_size = meta->subbuf_size;
 unsigned long ptr;

 ptr = (unsigned long)meta;
 ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs);

 return (void *)ptr;
}

/*
 * Return a specific sub-buffer for a given @cpu defined by @idx.
 */

static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx)
{
 struct ring_buffer_cpu_meta *meta;
 unsigned long ptr;
 int subbuf_size;

 meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu);
 if (!meta)
  return NULL;

 if (WARN_ON_ONCE(idx >= meta->nr_subbufs))
  return NULL;

 subbuf_size = meta->subbuf_size;

 /* Map this buffer to the order that's in meta->buffers[] */
 idx = meta->buffers[idx];

 ptr = (unsigned long)rb_subbufs_from_meta(meta);

 ptr += subbuf_size * idx;
 if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end)
  return NULL;

 return (void *)ptr;
}

/*
 * See if the existing memory contains a valid meta section.
 * if so, use that, otherwise initialize it.
 */

static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size)
{
 unsigned long ptr = buffer->range_addr_start;
 struct ring_buffer_meta *bmeta;
 unsigned long total_size;
 int struct_sizes;

 bmeta = (struct ring_buffer_meta *)ptr;
 buffer->meta = bmeta;

 total_size = buffer->range_addr_end - buffer->range_addr_start;

 struct_sizes = sizeof(struct ring_buffer_cpu_meta);
 struct_sizes |= sizeof(*bmeta) << 16;

 /* The first buffer will start word size after the meta page */
 ptr += sizeof(*bmeta);
 ptr = ALIGN(ptr, sizeof(long));
 ptr += scratch_size;

 if (bmeta->magic != RING_BUFFER_META_MAGIC) {
  pr_info("Ring buffer boot meta mismatch of magic\n");
  goto init;
 }

 if (bmeta->struct_sizes != struct_sizes) {
  pr_info("Ring buffer boot meta mismatch of struct size\n");
  goto init;
 }

 if (bmeta->total_size != total_size) {
  pr_info("Ring buffer boot meta mismatch of total size\n");
  goto init;
 }

 if (bmeta->buffers_offset > bmeta->total_size) {
  pr_info("Ring buffer boot meta mismatch of offset outside of total size\n");
  goto init;
 }

 if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) {
  pr_info("Ring buffer boot meta mismatch of first buffer offset\n");
  goto init;
 }

 return true;

 init:
 bmeta->magic = RING_BUFFER_META_MAGIC;
 bmeta->struct_sizes = struct_sizes;
 bmeta->total_size = total_size;
 bmeta->buffers_offset = (void *)ptr - (void *)bmeta;

 /* Zero out the scatch pad */
 memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta));

 return false;
}

/*
 * See if the existing memory contains valid ring buffer data.
 * As the previous kernel must be the same as this kernel, all
 * the calculations (size of buffers and number of buffers)
 * must be the same.
 */

static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu,
         struct trace_buffer *buffer, int nr_pages,
         unsigned long *subbuf_mask)
{
 int subbuf_size = PAGE_SIZE;
 struct buffer_data_page *subbuf;
 unsigned long buffers_start;
 unsigned long buffers_end;
 int i;

 if (!subbuf_mask)
  return false;

 buffers_start = meta->first_buffer;
 buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs);

 /* Is the head and commit buffers within the range of buffers? */
 if (meta->head_buffer < buffers_start ||
     meta->head_buffer >= buffers_end) {
  pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu);
  return false;
 }

 if (meta->commit_buffer < buffers_start ||
     meta->commit_buffer >= buffers_end) {
  pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu);
  return false;
 }

 subbuf = rb_subbufs_from_meta(meta);

 bitmap_clear(subbuf_mask, 0, meta->nr_subbufs);

 /* Is the meta buffers and the subbufs themselves have correct data? */
 for (i = 0; i < meta->nr_subbufs; i++) {
  if (meta->buffers[i] < 0 ||
      meta->buffers[i] >= meta->nr_subbufs) {
   pr_info("Ring buffer boot meta [%d] array out of range\n", cpu);
   return false;
  }

  if ((unsigned)local_read(&subbuf->commit) > subbuf_size) {
   pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu);
   return false;
  }

  if (test_bit(meta->buffers[i], subbuf_mask)) {
   pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu);
   return false;
  }

  set_bit(meta->buffers[i], subbuf_mask);
  subbuf = (void *)subbuf + subbuf_size;
 }

 return true;
}

static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf);

static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu,
          unsigned long long *timestamp, u64 *delta_ptr)
{
 struct ring_buffer_event *event;
 u64 ts, delta;
 int events = 0;
 int e;

 *delta_ptr = 0;
 *timestamp = 0;

 ts = dpage->time_stamp;

 for (e = 0; e < tail; e += rb_event_length(event)) {

  event = (struct ring_buffer_event *)(dpage->data + e);

  switch (event->type_len) {

  case RINGBUF_TYPE_TIME_EXTEND:
   delta = rb_event_time_stamp(event);
   ts += delta;
   break;

  case RINGBUF_TYPE_TIME_STAMP:
   delta = rb_event_time_stamp(event);
   delta = rb_fix_abs_ts(delta, ts);
   if (delta < ts) {
    *delta_ptr = delta;
    *timestamp = ts;
    return -1;
   }
   ts = delta;
   break;

  case RINGBUF_TYPE_PADDING:
   if (event->time_delta == 1)
    break;
   fallthrough;
  case RINGBUF_TYPE_DATA:
   events++;
   ts += event->time_delta;
   break;

  default:
   return -1;
  }
 }
 *timestamp = ts;
 return events;
}

static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu)
{
 unsigned long long ts;
 u64 delta;
 int tail;

 tail = local_read(&dpage->commit);
 return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta);
}

/* If the meta data has been validated, now validate the events */
static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer)
{
 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
 struct buffer_page *head_page, *orig_head;
 unsigned long entry_bytes = 0;
 unsigned long entries = 0;
 int ret;
 u64 ts;
 int i;

 if (!meta || !meta->head_buffer)
  return;

 /* Do the reader page first */
 ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu);
 if (ret < 0) {
  pr_info("Ring buffer reader page is invalid\n");
  goto invalid;
 }
 entries += ret;
 entry_bytes += local_read(&cpu_buffer->reader_page->page->commit);
 local_set(&cpu_buffer->reader_page->entries, ret);

 orig_head = head_page = cpu_buffer->head_page;
 ts = head_page->page->time_stamp;

 /*
 * Try to rewind the head so that we can read the pages which already
 * read in the previous boot.
 */

 if (head_page == cpu_buffer->tail_page)
  goto skip_rewind;

 rb_dec_page(&head_page);
 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) {

  /* Rewind until tail (writer) page. */
  if (head_page == cpu_buffer->tail_page)
   break;

  /* Ensure the page has older data than head. */
  if (ts < head_page->page->time_stamp)
   break;

  ts = head_page->page->time_stamp;
  /* Ensure the page has correct timestamp and some data. */
  if (!ts || rb_page_commit(head_page) == 0)
   break;

  /* Stop rewind if the page is invalid. */
  ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu);
  if (ret < 0)
   break;

  /* Recover the number of entries and update stats. */
  local_set(&head_page->entries, ret);
  if (ret)
   local_inc(&cpu_buffer->pages_touched);
  entries += ret;
  entry_bytes += rb_page_commit(head_page);
 }
 if (i)
  pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i);

 /* The last rewound page must be skipped. */
 if (head_page != orig_head)
  rb_inc_page(&head_page);

 /*
 * If the ring buffer was rewound, then inject the reader page
 * into the location just before the original head page.
 */

 if (head_page != orig_head) {
  struct buffer_page *bpage = orig_head;

  rb_dec_page(&bpage);
  /*
 * Insert the reader_page before the original head page.
 * Since the list encode RB_PAGE flags, general list
 * operations should be avoided.
 */

  cpu_buffer->reader_page->list.next = &orig_head->list;
  cpu_buffer->reader_page->list.prev = orig_head->list.prev;
  orig_head->list.prev = &cpu_buffer->reader_page->list;
  bpage->list.next = &cpu_buffer->reader_page->list;

  /* Make the head_page the reader page */
  cpu_buffer->reader_page = head_page;
  bpage = head_page;
  rb_inc_page(&head_page);
  head_page->list.prev = bpage->list.prev;
  rb_dec_page(&bpage);
  bpage->list.next = &head_page->list;
  rb_set_list_to_head(&bpage->list);
  cpu_buffer->pages = &head_page->list;

  cpu_buffer->head_page = head_page;
  meta->head_buffer = (unsigned long)head_page->page;

  /* Reset all the indexes */
  bpage = cpu_buffer->reader_page;
  meta->buffers[0] = rb_meta_subbuf_idx(meta, bpage->page);
  bpage->id = 0;

  for (i = 1, bpage = head_page; i < meta->nr_subbufs;
       i++, rb_inc_page(&bpage)) {
   meta->buffers[i] = rb_meta_subbuf_idx(meta, bpage->page);
   bpage->id = i;
  }

  /* We'll restart verifying from orig_head */
  head_page = orig_head;
 }

 skip_rewind:
 /* If the commit_buffer is the reader page, update the commit page */
 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) {
  cpu_buffer->commit_page = cpu_buffer->reader_page;
  /* Nothing more to do, the only page is the reader page */
  goto done;
 }

 /* Iterate until finding the commit page */
 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) {

  /* Reader page has already been done */
  if (head_page == cpu_buffer->reader_page)
   continue;

  ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu);
  if (ret < 0) {
   pr_info("Ring buffer meta [%d] invalid buffer page\n",
    cpu_buffer->cpu);
   goto invalid;
  }

  /* If the buffer has content, update pages_touched */
  if (ret)
   local_inc(&cpu_buffer->pages_touched);

  entries += ret;
  entry_bytes += local_read(&head_page->page->commit);
  local_set(&cpu_buffer->head_page->entries, ret);

  if (head_page == cpu_buffer->commit_page)
   break;
 }

 if (head_page != cpu_buffer->commit_page) {
  pr_info("Ring buffer meta [%d] commit page not found\n",
   cpu_buffer->cpu);
  goto invalid;
 }
 done:
 local_set(&cpu_buffer->entries, entries);
 local_set(&cpu_buffer->entries_bytes, entry_bytes);

 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu);
 return;

 invalid:
 /* The content of the buffers are invalid, reset the meta data */
 meta->head_buffer = 0;
 meta->commit_buffer = 0;

 /* Reset the reader page */
 local_set(&cpu_buffer->reader_page->entries, 0);
 local_set(&cpu_buffer->reader_page->page->commit, 0);

 /* Reset all the subbuffers */
 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) {
  local_set(&head_page->entries, 0);
  local_set(&head_page->page->commit, 0);
 }
}

static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size)
{
 struct ring_buffer_cpu_meta *meta;
 unsigned long *subbuf_mask;
 unsigned long delta;
 void *subbuf;
 bool valid = false;
 int cpu;
 int i;

 /* Create a mask to test the subbuf array */
 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL);
 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */

 if (rb_meta_init(buffer, scratch_size))
  valid = true;

 for (cpu = 0; cpu < nr_cpu_ids; cpu++) {
  void *next_meta;

  meta = rb_range_meta(buffer, nr_pages, cpu);

  if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) {
   /* Make the mappings match the current address */
   subbuf = rb_subbufs_from_meta(meta);
   delta = (unsigned long)subbuf - meta->first_buffer;
   meta->first_buffer += delta;
   meta->head_buffer += delta;
   meta->commit_buffer += delta;
   continue;
  }

  if (cpu < nr_cpu_ids - 1)
   next_meta = rb_range_meta(buffer, nr_pages, cpu + 1);
  else
   next_meta = (void *)buffer->range_addr_end;

  memset(meta, 0, next_meta - (void *)meta);

  meta->nr_subbufs = nr_pages + 1;
  meta->subbuf_size = PAGE_SIZE;

  subbuf = rb_subbufs_from_meta(meta);

  meta->first_buffer = (unsigned long)subbuf;

  /*
 * The buffers[] array holds the order of the sub-buffers
 * that are after the meta data. The sub-buffers may
 * be swapped out when read and inserted into a different
 * location of the ring buffer. Although their addresses
 * remain the same, the buffers[] array contains the
 * index into the sub-buffers holding their actual order.
 */

  for (i = 0; i < meta->nr_subbufs; i++) {
   meta->buffers[i] = i;
   rb_init_page(subbuf);
   subbuf += meta->subbuf_size;
  }
 }
 bitmap_free(subbuf_mask);
}

static void *rbm_start(struct seq_file *m, loff_t *pos)
{
 struct ring_buffer_per_cpu *cpu_buffer = m->private;
 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
 unsigned long val;

 if (!meta)
  return NULL;

 if (*pos > meta->nr_subbufs)
  return NULL;

 val = *pos;
 val++;

 return (void *)val;
}

static void *rbm_next(struct seq_file *m, void *v, loff_t *pos)
{
 (*pos)++;

 return rbm_start(m, pos);
}

static int rbm_show(struct seq_file *m, void *v)
{
 struct ring_buffer_per_cpu *cpu_buffer = m->private;
 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
 unsigned long val = (unsigned long)v;

 if (val == 1) {
  seq_printf(m, "head_buffer: %d\n",
      rb_meta_subbuf_idx(meta, (void *)meta->head_buffer));
  seq_printf(m, "commit_buffer: %d\n",
      rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer));
  seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size);
  seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs);
  return 0;
 }

 val -= 2;
 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]);

 return 0;
}

static void rbm_stop(struct seq_file *m, void *p)
{
}

static const struct seq_operations rb_meta_seq_ops = {
 .start  = rbm_start,
 .next  = rbm_next,
 .show  = rbm_show,
 .stop  = rbm_stop,
};

int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu)
{
 struct seq_file *m;
 int ret;

 ret = seq_open(file, &rb_meta_seq_ops);
 if (ret)
  return ret;

 m = file->private_data;
 m->private = buffer->buffers[cpu];

 return 0;
}

/* Map the buffer_pages to the previous head and commit pages */
static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer,
      struct buffer_page *bpage)
{
 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;

 if (meta->head_buffer == (unsigned long)bpage->page)
  cpu_buffer->head_page = bpage;

 if (meta->commit_buffer == (unsigned long)bpage->page) {
  cpu_buffer->commit_page = bpage;
  cpu_buffer->tail_page = bpage;
 }
}

static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
  long nr_pages, struct list_head *pages)
{
 struct trace_buffer *buffer = cpu_buffer->buffer;
 struct ring_buffer_cpu_meta *meta = NULL;
 struct buffer_page *bpage, *tmp;
 bool user_thread = current->mm != NULL;
 gfp_t mflags;
 long i;

 /*
 * Check if the available memory is there first.
 * Note, si_mem_available() only gives us a rough estimate of available
 * memory. It may not be accurate. But we don't care, we just want
 * to prevent doing any allocation when it is obvious that it is
 * not going to succeed.
 */

 i = si_mem_available();
 if (i < nr_pages)
  return -ENOMEM;

 /*
 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
 * gracefully without invoking oom-killer and the system is not
 * destabilized.
 */

 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;

 /*
 * If a user thread allocates too much, and si_mem_available()
 * reports there's enough memory, even though there is not.
 * Make sure the OOM killer kills this thread. This can happen
 * even with RETRY_MAYFAIL because another task may be doing
 * an allocation after this task has taken all memory.
 * This is the task the OOM killer needs to take out during this
 * loop, even if it was triggered by an allocation somewhere else.
 */

 if (user_thread)
  set_current_oom_origin();

 if (buffer->range_addr_start)
  meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu);

 for (i = 0; i < nr_pages; i++) {
  struct page *page;

  bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
        mflags, cpu_to_node(cpu_buffer->cpu));
  if (!bpage)
   goto free_pages;

  rb_check_bpage(cpu_buffer, bpage);

  /*
 * Append the pages as for mapped buffers we want to keep
 * the order
 */

  list_add_tail(&bpage->list, pages);

  if (meta) {
   /* A range was given. Use that for the buffer page */
   bpage->page = rb_range_buffer(cpu_buffer, i + 1);
   if (!bpage->page)
    goto free_pages;
   /* If this is valid from a previous boot */
   if (meta->head_buffer)
    rb_meta_buffer_update(cpu_buffer, bpage);
   bpage->range = 1;
   bpage->id = i + 1;
  } else {
   page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
      mflags | __GFP_COMP | __GFP_ZERO,
      cpu_buffer->buffer->subbuf_order);
   if (!page)
    goto free_pages;
   bpage->page = page_address(page);
   rb_init_page(bpage->page);
  }
  bpage->order = cpu_buffer->buffer->subbuf_order;

  if (user_thread && fatal_signal_pending(current))
   goto free_pages;
 }
 if (user_thread)
  clear_current_oom_origin();

 return 0;

free_pages:
 list_for_each_entry_safe(bpage, tmp, pages, list) {
  list_del_init(&bpage->list);
  free_buffer_page(bpage);
 }
 if (user_thread)
  clear_current_oom_origin();

 return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
        unsigned long nr_pages)
{
 LIST_HEAD(pages);

 WARN_ON(!nr_pages);

 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
  return -ENOMEM;

 /*
 * The ring buffer page list is a circular list that does not
 * start and end with a list head. All page list items point to
 * other pages.
 */

 cpu_buffer->pages = pages.next;
 list_del(&pages);

 cpu_buffer->nr_pages = nr_pages;

 rb_check_pages(cpu_buffer);

 return 0;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
{
 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = NULL;
 struct ring_buffer_cpu_meta *meta;
 struct buffer_page *bpage;
 struct page *page;
 int ret;

 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
      GFP_KERNEL, cpu_to_node(cpu));
 if (!cpu_buffer)
  return NULL;

 cpu_buffer->cpu = cpu;
 cpu_buffer->buffer = buffer;
 raw_spin_lock_init(&cpu_buffer->reader_lock);
 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
 init_completion(&cpu_buffer->update_done);
 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 init_waitqueue_head(&cpu_buffer->irq_work.waiters);
 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
 mutex_init(&cpu_buffer->mapping_lock);

 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
       GFP_KERNEL, cpu_to_node(cpu));
 if (!bpage)
  return NULL;

 rb_check_bpage(cpu_buffer, bpage);

 cpu_buffer->reader_page = bpage;

 if (buffer->range_addr_start) {
  /*
 * Range mapped buffers have the same restrictions as memory
 * mapped ones do.
 */

  cpu_buffer->mapped = 1;
  cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu);
  bpage->page = rb_range_buffer(cpu_buffer, 0);
  if (!bpage->page)
   goto fail_free_reader;
  if (cpu_buffer->ring_meta->head_buffer)
   rb_meta_buffer_update(cpu_buffer, bpage);
  bpage->range = 1;
 } else {
  page = alloc_pages_node(cpu_to_node(cpu),
     GFP_KERNEL | __GFP_COMP | __GFP_ZERO,
     cpu_buffer->buffer->subbuf_order);
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=89 H=95 G=91

¤ Dauer der Verarbeitung: 0.42 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.