/* Add extra RINGBUF maps to this ring buffer manager */ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
ring_buffer_sample_fn sample_cb, void *ctx)
{ struct bpf_map_info info;
__u32 len = sizeof(info); struct epoll_event *e; struct ring *r;
__u64 mmap_sz; void *tmp; int err;
memset(&info, 0, sizeof(info));
err = bpf_map_get_info_by_fd(map_fd, &info, &len); if (err) {
err = -errno;
pr_warn("ringbuf: failed to get map info for fd=%d: %s\n",
map_fd, errstr(err)); return libbpf_err(err);
}
if (info.type != BPF_MAP_TYPE_RINGBUF) {
pr_warn("ringbuf: map fd=%d is not BPF_MAP_TYPE_RINGBUF\n",
map_fd); return libbpf_err(-EINVAL);
}
/* Map read-only producer page and data pages. We map twice as big * data size to allow simple reading of samples that wrap around the * end of a ring buffer. See kernel implementation for details.
*/
mmap_sz = rb->page_size + 2 * (__u64)info.max_entries; if (mmap_sz != (__u64)(size_t)mmap_sz) {
err = -E2BIG;
pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries); goto err_out;
}
tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size); if (tmp == MAP_FAILED) {
err = -errno;
pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %s\n",
map_fd, errstr(err)); goto err_out;
}
r->producer_pos = tmp;
r->data = tmp + rb->page_size;
e = &rb->events[rb->ring_cnt];
memset(e, 0, sizeof(*e));
if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
err = r->sample_cb(r->ctx, sample, len); if (err < 0) { /* update consumer pos and bail out */
smp_store_release(r->consumer_pos,
cons_pos); return err;
}
cnt++;
}
smp_store_release(r->consumer_pos, cons_pos);
if (cnt >= n) goto done;
}
} while (got_new_data);
done: return cnt;
}
/*
 * Consume available ring buffer(s) data without event polling, up to n
 * records.
 *
 * Rings are visited in the order they are stored in rb->rings, and the
 * remaining budget is reduced by whatever each ring reports as consumed.
 *
 * Returns number of records consumed across all registered ring buffers (or
 * n, whichever is less, capped at INT_MAX), or negative number if any of
 * the callbacks return error. A budget of zero consumes nothing and
 * returns 0.
 */
int ring_buffer__consume_n(struct ring_buffer *rb, size_t n)
{
	int64_t err, res = 0;
	int i;

	/*
	 * A zero budget must never reach the loop: the per-ring helper appears
	 * to test its limit only after a record has been handled, so it could
	 * still report one record, and "n -= err" on an unsigned size_t would
	 * then wrap around and effectively remove the limit for every
	 * remaining ring.
	 */
	if (n == 0)
		return 0;

	for (i = 0; i < rb->ring_cnt; i++) {
		struct ring *ring = rb->rings[i];

		err = ringbuf_process_ring(ring, n);
		if (err < 0)
			return libbpf_err(err);

		res += err;
		n -= err;

		/* Budget exhausted, leave the remaining rings untouched. */
		if (n == 0)
			break;
	}

	/* The total is accumulated in 64 bits; clamp to the int return type. */
	return res > INT_MAX ? INT_MAX : res;
}
/*
 * Drain every registered ring buffer immediately, without event polling.
 *
 * Returns the total number of records consumed across all rings, saturated
 * at INT_MAX, or a negative number as soon as processing of a ring reports
 * an error.
 */
int ring_buffer__consume(struct ring_buffer *rb)
{
	int64_t total = 0;
	int idx = 0;

	while (idx < rb->ring_cnt) {
		int64_t rc = ringbuf_process_ring(rb->rings[idx], INT_MAX);

		if (rc < 0)
			return libbpf_err(rc);

		total += rc;
		/* Stop early once the count no longer fits the int return type. */
		if (total > INT_MAX) {
			total = INT_MAX;
			break;
		}
		idx++;
	}

	return total;
}
/*
 * Poll for available data and consume records, if any are available.
 *
 * Waits on rb->epoll_fd for up to timeout_ms milliseconds (timeout semantics
 * are those of epoll_wait()), then processes every ring whose event was
 * reported.
 *
 * Returns number of records consumed (or INT_MAX, whichever is less), or
 * negative number, if waiting failed or any of the registered callbacks
 * returned error.
 */
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
	int i, cnt;
	int64_t err, res = 0;

	/*
	 * Fill rb->events with the ready events. Without this wait, cnt
	 * would be read uninitialized and timeout_ms would be ignored.
	 */
	cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
	if (cnt < 0)
		return libbpf_err(-errno);

	for (i = 0; i < cnt; i++) {
		/* The event payload is used as the index of the ring in rb->rings. */
		__u32 ring_id = rb->events[i].data.fd;
		struct ring *ring = rb->rings[ring_id];

		err = ringbuf_process_ring(ring, INT_MAX);
		if (err < 0)
			return libbpf_err(err);
		res += err;
	}

	/* The total is accumulated in 64 bits; clamp to the int return type. */
	if (res > INT_MAX)
		res = INT_MAX;
	return res;
}
/* Get an fd that can be used to sleep until data is available in the ring(s) */ int ring_buffer__epoll_fd(conststruct ring_buffer *rb)
{ return rb->epoll_fd;
}
struct ring *ring_buffer__ring(struct ring_buffer *rb, unsignedint idx)
{ if (idx >= rb->ring_cnt) return errno = ERANGE, NULL;
return rb->rings[idx];
}
unsignedlong ring__consumer_pos(conststruct ring *r)
{ /* Synchronizes with smp_store_release() in ringbuf_process_ring(). */ return smp_load_acquire(r->consumer_pos);
}
unsignedlong ring__producer_pos(conststruct ring *r)
{ /* Synchronizes with smp_store_release() in __bpf_ringbuf_reserve() in * the kernel.
*/ return smp_load_acquire(r->producer_pos);
}
size_t ring__avail_data_size(conststruct ring *r)
{ unsignedlong cons_pos, prod_pos;
/* Map read-write the producer page and data pages. We map the data * region as twice the total size of the ring buffer to allow the * simple reading and writing of samples that wrap around the end of * the buffer. See the kernel implementation for details.
*/
mmap_sz = rb->page_size + 2 * (__u64)info.max_entries; if (mmap_sz != (__u64)(size_t)mmap_sz) {
pr_warn("user ringbuf: ring buf size (%u) is too big\n", info.max_entries); return -E2BIG;
}
tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ | PROT_WRITE, MAP_SHARED,
map_fd, rb->page_size); if (tmp == MAP_FAILED) {
err = -errno;
pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %s\n",
map_fd, errstr(err)); return err;
}
void *user_ring_buffer__reserve(struct user_ring_buffer *rb, __u32 size)
{
__u32 avail_size, total_size, max_size; /* 64-bit to avoid overflow in case of extreme application behavior */
__u64 cons_pos, prod_pos; struct ringbuf_hdr *hdr;
/* The top two bits are used as special flags */ if (size & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT)) return errno = E2BIG, NULL;
/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_peek() in * the kernel.
*/
cons_pos = smp_load_acquire(rb->consumer_pos); /* Synchronizes with smp_store_release() in user_ringbuf_commit() */
prod_pos = smp_load_acquire(rb->producer_pos);
max_size = rb->mask + 1;
avail_size = max_size - (prod_pos - cons_pos); /* Round up total size to a multiple of 8. */
total_size = (size + BPF_RINGBUF_HDR_SZ + 7) / 8 * 8;
if (total_size > max_size) return errno = E2BIG, NULL;
if (avail_size < total_size) return errno = ENOSPC, NULL;
/* The kernel guarantees at least one event notification * delivery whenever at least one sample is drained from the * ring buffer in an invocation to bpf_ringbuf_drain(). Other * additional events may be delivered at any time, but only one * event is guaranteed per bpf_ringbuf_drain() invocation, * provided that a sample is drained, and the BPF program did * not pass BPF_RB_NO_WAKEUP to bpf_ringbuf_drain(). If * BPF_RB_FORCE_WAKEUP is passed to bpf_ringbuf_drain(), a * wakeup event will be delivered even if no samples are * drained.
*/
cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, ms_remaining); if (cnt < 0) return NULL;
if (timeout_ms == -1) continue;
err = clock_gettime(CLOCK_MONOTONIC, &curr); if (err) return NULL;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.