/*
 * Layout constants for the ring buffer area.
 *
 * Each directive must start its own line: a '#define' that appears after
 * other tokens on a line is not a directive and would silently become part of
 * the preceding macro's replacement text.
 */

/* non-mmap()'able part of bpf_ringbuf (everything up to consumer page) */
#define RINGBUF_PGOFF \
	(offsetof(struct bpf_ringbuf, consumer_pos) >> PAGE_SHIFT)
/* consumer page and producer page */
#define RINGBUF_POS_PAGES 2
/* all pages that precede the data area: non-mmap()'able part + position pages */
#define RINGBUF_NR_META_PAGES (RINGBUF_PGOFF + RINGBUF_POS_PAGES)

/* NOTE(review): presumably the upper bound on a single record's size; the
 * users of this constant are outside this excerpt - confirm.
 */
#define RINGBUF_MAX_RECORD_SZ (UINT_MAX/4)
/*
 * In-kernel representation of one BPF ring buffer.
 *
 * The leading members are kernel-only bookkeeping. consumer_pos and
 * producer_pos are each aligned to PAGE_SIZE so they land in separate pages,
 * and the data area starts on its own page boundary after them.
 */
struct bpf_ringbuf {
	wait_queue_head_t waitq;
	struct irq_work work;
	/* compared against position differences as (ringbuf_size - 1) */
	u64 mask;
	/* backing pages and their count */
	struct page **pages;
	int nr_pages;
	rqspinlock_t spinlock ____cacheline_aligned_in_smp;
	/* For user-space producer ring buffers, an atomic_t busy bit is used
	 * to synchronize access to the ring buffers in the kernel, rather than
	 * the spinlock that is used for kernel-producer ring buffers. This is
	 * done because the ring buffer must hold a lock across a BPF program's
	 * callback:
	 *
	 * __bpf_user_ringbuf_peek() // lock acquired
	 * -> program callback_fn()
	 * -> __bpf_user_ringbuf_sample_release() // lock released
	 *
	 * It is unsafe and incorrect to hold an IRQ spinlock across what could
	 * be a long execution window, so we instead simply disallow concurrent
	 * access to the ring buffer by kernel consumers, and return -EBUSY from
	 * __bpf_user_ringbuf_peek() if the busy bit is held by another task.
	 */
	atomic_t busy ____cacheline_aligned_in_smp;
	/* Consumer and producer counters are put into separate pages to
	 * allow each position to be mapped with different permissions.
	 * This prevents a user-space application from modifying the
	 * position and ruining in-kernel tracking. The permissions of the
	 * pages depend on who is producing samples: user-space or the
	 * kernel. Note that the pending counter is placed in the same
	 * page as the producer, so that it shares the same cache line.
	 *
	 * Kernel-producer
	 * ---------------
	 * The producer position and data pages are mapped as r/o in
	 * userspace. For this approach, bits in the header of samples are
	 * used to signal to user-space, and to other producers, whether a
	 * sample is currently being written.
	 *
	 * User-space producer
	 * -------------------
	 * Only the page containing the consumer position is mapped r/o in
	 * user-space. User-space producers also use bits of the header to
	 * communicate to the kernel, but the kernel must carefully check and
	 * validate each sample to ensure that they're correctly formatted, and
	 * fully contained within the ring buffer.
	 */
	unsigned long consumer_pos __aligned(PAGE_SIZE);
	unsigned long producer_pos __aligned(PAGE_SIZE);
	unsigned long pending_pos;
	char data[] __aligned(PAGE_SIZE);
};
/*
 * Header of a ring buffer record: two 32-bit fields, 8 bytes in total.
 */
struct bpf_ringbuf_hdr {
	/* size prefix of the record */
	u32 len;
	/* distance in pages from the owning struct bpf_ringbuf to this header
	 * (rounded down); lives at offset 4 of the header
	 */
	u32 pg_off;
};
staticstruct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node)
{ const gfp_t flags = GFP_KERNEL_ACCOUNT | __GFP_RETRY_MAYFAIL |
__GFP_NOWARN | __GFP_ZERO; int nr_meta_pages = RINGBUF_NR_META_PAGES; int nr_data_pages = data_sz >> PAGE_SHIFT; int nr_pages = nr_meta_pages + nr_data_pages; struct page **pages, *page; struct bpf_ringbuf *rb;
size_t array_size; int i;
/* Each data page is mapped twice to allow "virtual" * continuous read of samples wrapping around the end of ring * buffer area: * ------------------------------------------------------ * | meta pages | real data pages | same data pages | * ------------------------------------------------------ * | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 | * ------------------------------------------------------ * | | TA DA | TA DA | * ------------------------------------------------------ * ^^^^^^^ * | * Here, no need to worry about special handling of wrapped-around * data due to double-mapped data pages. This works both in kernel and * when mmap()'ed in user-space, simplifying both kernel and * user-space implementations significantly.
*/
array_size = (nr_meta_pages + 2 * nr_data_pages) * sizeof(*pages);
pages = bpf_map_area_alloc(array_size, numa_node); if (!pages) return NULL;
for (i = 0; i < nr_pages; i++) {
page = alloc_pages_node(numa_node, flags, 0); if (!page) {
nr_pages = i; goto err_free_pages;
}
pages[i] = page; if (i >= nr_meta_pages)
pages[nr_data_pages + i] = page;
}
/* Maximum size of ring buffer area is limited by the 32-bit page offset within
 * the record header, counted in pages. Reserving 8 bits for extensibility, and
 * taking into account a few extra pages for the consumer/producer pages and
 * non-mmap()'able parts, the current maximum size would be:
 *
 * (((1ULL << 24) - RINGBUF_POS_PAGES - RINGBUF_PGOFF) * PAGE_SIZE)
 *
 * This gives a 64GB limit, which seems plenty for a single ring buffer. Now
 * considering that the maximum value of data_sz is (4GB - 1), there
 * will be no overflow, so just note the size limit in the comments.
*/ staticstruct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
{ struct bpf_ringbuf *rb;
rb = bpf_ringbuf_area_alloc(data_sz, numa_node); if (!rb) return NULL;
/* copy pages pointer and nr_pages to local variable, as we are going * to unmap rb itself with vunmap() below
*/ struct page **pages = rb->pages; int i, nr_pages = rb->nr_pages;
vunmap(rb); for (i = 0; i < nr_pages; i++)
__free_page(pages[i]);
bpf_map_area_free(pages);
}
if (vma->vm_flags & VM_WRITE) { if (vma->vm_pgoff == 0) /* Disallow writable mappings to the consumer pointer, * and allow writable mappings to both the producer * position, and the ring buffer data itself.
*/ return -EPERM;
} /* remap_vmalloc_range() checks size and offset constraints */ return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
}
/* Compute how many whole pages lie between the start of struct bpf_ringbuf
 * and a record header inside it: the byte distance from @rb to @hdr is shifted
 * right by PAGE_SHIFT, i.e. rounded down to pages. This value is what gets
 * stored in the record header (at offset 4), so that struct bpf_ringbuf * can
 * later be restored from just the record pointer.
 */
static size_t bpf_ringbuf_rec_pg_off(struct bpf_ringbuf *rb,
				     struct bpf_ringbuf_hdr *hdr)
{
	/* signed byte distance, same type the original pointer subtraction yields */
	ptrdiff_t byte_dist = (char *)hdr - (char *)rb;

	return byte_dist >> PAGE_SHIFT;
}
/* Given pointer to ring buffer record header, restore pointer to struct * bpf_ringbuf itself by using page offset stored at offset 4
*/ staticstruct bpf_ringbuf *
bpf_ringbuf_restore_from_rec(struct bpf_ringbuf_hdr *hdr)
{ unsignedlong addr = (unsignedlong)(void *)hdr; unsignedlong off = (unsignedlong)hdr->pg_off << PAGE_SHIFT;
/* check for out of ringbuf space: * - by ensuring producer position doesn't advance more than * (ringbuf_size - 1) ahead * - by ensuring oldest not yet committed record until newest * record does not span more than (ringbuf_size - 1)
*/ if (new_prod_pos - cons_pos > rb->mask ||
new_prod_pos - pend_pos > rb->mask) {
raw_res_spin_unlock_irqrestore(&rb->spinlock, flags); return NULL;
}
/* update record header with correct final size prefix */
xchg(&hdr->len, new_len);
/* if consumer caught up and is waiting for our record, notify about * new data availability
*/
rec_pos = (void *)hdr - (void *)rb->data;
cons_pos = smp_load_acquire(&rb->consumer_pos) & rb->mask;
/* The sample must fit within the region advertised by the producer position. */ if (total_len > prod_pos - cons_pos) return -EINVAL;
/* The sample must fit within the data region of the ring buffer. */ if (total_len > ringbuf_total_data_sz(rb)) return -E2BIG;
/* The sample must fit into a struct bpf_dynptr. */
err = bpf_dynptr_check_size(sample_len); if (err) return -E2BIG;
if (flags & BPF_RINGBUF_DISCARD_BIT) { /* If the discard bit is set, the sample should be skipped. * * Update the consumer pos, and return -EAGAIN so the caller * knows to skip this sample and try to read the next one.
*/
smp_store_release(&rb->consumer_pos, cons_pos + total_len); return -EAGAIN;
}
if (flags & BPF_RINGBUF_BUSY_BIT) return -ENODATA;
/* Using smp_load_acquire() is unnecessary here, as the busy-bit * prevents another task from writing to consumer_pos after it was read * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
*/
consumer_pos = rb->consumer_pos; /* Synchronizes with smp_load_acquire() in user-space producer. */
smp_store_release(&rb->consumer_pos, consumer_pos + rounded_size);
}
schedule_work_return: /* Prevent the clearing of the busy-bit from being reordered before the * storing of any rb consumer or producer positions.
*/
atomic_set_release(&rb->busy, 0);
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.