// SPDX-License-Identifier: GPL-2.0-only /* bpf/cpumap.c * * Copyright (c) 2017 Jesper Dangaard Brouer, Red Hat Inc.
*/
/** * DOC: cpu map * The 'cpumap' is primarily used as a backend map for XDP BPF helper * call bpf_redirect_map() and XDP_REDIRECT action, like 'devmap'. * * Unlike devmap which redirects XDP frames out to another NIC device, * this map type redirects raw XDP frames to another CPU. The remote * CPU will do SKB-allocation and call the normal network stack.
*/ /* * This is a scalability and isolation mechanism, that allow * separating the early driver network XDP layer, from the rest of the * netstack, and assigning dedicated CPUs for this stage. This * basically allows for 10G wirespeed pre-filtering via bpf.
*/ #include <linux/bitops.h> #include <linux/bpf.h> #include <linux/filter.h> #include <linux/ptr_ring.h> #include <net/xdp.h> #include <net/hotdata.h>
/* General idea: XDP packets getting XDP redirected to another CPU, * will maximum be stored/queued for one driver ->poll() call. It is * guaranteed that queueing the frame and the flush operation happen on * same CPU. Thus, cpu_map_flush operation can deduct via this_cpu_ptr() * which queue in bpf_cpu_map_entry contains packets.
*/
#define CPU_MAP_BULK_SIZE 8 /* 8 == one cacheline on 64-bit archs */ struct bpf_cpu_map_entry; struct bpf_cpu_map;
/* Struct for every remote "destination" CPU in map */ struct bpf_cpu_map_entry {
u32 cpu; /* kthread CPU and map index */ int map_id; /* Back reference to map */
/* XDP can run multiple RX-ring queues, need __percpu enqueue store */ struct xdp_bulk_queue __percpu *bulkq;
/* Queue with potential multi-producers, and single-consumer kthread */ struct ptr_ring *queue; struct task_struct *kthread;
/* Pre-limit array size based on NR_CPUS, not final CPU check */ if (attr->max_entries > NR_CPUS) return ERR_PTR(-E2BIG);
cmap = bpf_map_area_alloc(sizeof(*cmap), NUMA_NO_NODE); if (!cmap) return ERR_PTR(-ENOMEM);
bpf_map_init_from_attr(&cmap->map, attr);
/* Alloc array for possible remote "destination" CPUs */
cmap->cpu_map = bpf_map_area_alloc(cmap->map.max_entries * sizeof(struct bpf_cpu_map_entry *),
cmap->map.numa_node); if (!cmap->cpu_map) {
bpf_map_area_free(cmap); return ERR_PTR(-ENOMEM);
}
return &cmap->map;
}
/* Drain and free any packets still left in @ring at teardown time.
 *
 * The tear-down procedure should have made sure that queue is
 * empty. See __cpu_map_entry_replace() and work-queue
 * invoked cpu_map_kthread_stop(). Catch any broken behaviour
 * gracefully and warn once.
 */
static void __cpu_map_ring_cleanup(struct ptr_ring *ring)
{
	void *ptr;

	while ((ptr = ptr_ring_consume(ring))) {
		/* Ring was expected to be empty here -- warn a single time */
		WARN_ON_ONCE(1);
		/* Bit 0 of the pointer tags an skb (vs. an xdp_frame) */
		if (unlikely(__ptr_test_bit(0, &ptr))) {
			__ptr_clear_bit(0, &ptr);
			kfree_skb(ptr);
			continue;
		}
		xdp_return_frame(ptr);
	}
}
/* Flush the per-entry GRO state; @empty tells whether the ptr_ring
 * had no more packets when the caller sampled it.
 */
static void cpu_map_gro_flush(struct bpf_cpu_map_entry *rcpu, bool empty)
{
	/*
	 * If the ring is not empty, there'll be a new iteration soon, and we
	 * only need to do a full flush if a tick is long (> 1 ms).
	 * If the ring is empty, to not hold GRO packets in the stack for too
	 * long, do a full flush.
	 * This is equivalent to how NAPI decides whether to perform a full
	 * flush.
	 */
	gro_flush_normal(&rcpu->gro, !empty && HZ >= 1000);
}
/* When kthread gives stop order, then rcpu have been disconnected * from map, thus no new packets can enter. Remaining in-flight * per CPU stored packets are flushed to this queue. Wait honoring * kthread_stop signal until queue is empty.
*/ while (!kthread_should_stop() || !__ptr_ring_empty(rcpu->queue)) { struct xdp_cpumap_stats stats = {}; /* zero stats */ unsignedint kmem_alloc_drops = 0, sched = 0; struct cpu_map_ret ret = { }; void *frames[CPUMAP_BATCH]; void *skbs[CPUMAP_BATCH];
u32 i, n, m; bool empty;
/* Release CPU reschedule checks */ if (__ptr_ring_empty(rcpu->queue)) {
set_current_state(TASK_INTERRUPTIBLE); /* Recheck to avoid lost wake-up */ if (__ptr_ring_empty(rcpu->queue)) {
schedule();
sched = 1;
last_qs = jiffies;
} else {
__set_current_state(TASK_RUNNING);
}
} else {
rcu_softirq_qs_periodic(last_qs);
sched = cond_resched();
}
/* * The bpf_cpu_map_entry is single consumer, with this * kthread CPU pinned. Lockless access to ptr_ring * consume side valid as no-resize allowed of queue.
*/
n = __ptr_ring_consume_batched(rcpu->queue, frames,
CPUMAP_BATCH); for (i = 0; i < n; i++) { void *f = frames[i]; struct page *page;
if (unlikely(__ptr_test_bit(0, &f))) { struct sk_buff *skb = f;
/* Bring struct page memory area to curr CPU. Read by * build_skb_around via page_is_pfmemalloc(), and when * freed written by page_frag_free call.
*/
prefetchw(page);
}
local_bh_disable();
/* Support running another XDP prog on this CPU */
cpu_map_bpf_prog_run(rcpu, frames, skbs, &ret, &stats); if (!ret.xdp_n) goto stats;
m = napi_skb_cache_get_bulk(skbs, ret.xdp_n); if (unlikely(m < ret.xdp_n)) { for (i = m; i < ret.xdp_n; i++)
xdp_return_frame(frames[i]);
if (ret.skb_n)
memmove(&skbs[m], &skbs[ret.xdp_n],
ret.skb_n * sizeof(*skbs));
kmem_alloc_drops += ret.xdp_n - m;
ret.xdp_n = m;
}
for (i = 0; i < ret.xdp_n; i++) { struct xdp_frame *xdpf = frames[i];
/* Can fail only when !skb -- already handled above */
__xdp_build_skb_from_frame(xdpf, skbs[i], xdpf->dev_rx);
}
stats: /* Feedback loop via tracepoint. * NB: keep before recv to allow measuring enqueue/dequeue latency.
*/
trace_xdp_cpumap_kthread(rcpu->map_id, n, kmem_alloc_drops,
sched, &stats);
for (i = 0; i < ret.xdp_n + ret.skb_n; i++)
gro_receive_skb(&rcpu->gro, skbs[i]);
/* Flush either every 64 packets or in case of empty ring */
packets += n;
empty = __ptr_ring_empty(rcpu->queue); if (packets >= NAPI_POLL_WEIGHT || empty) {
cpu_map_gro_flush(rcpu, empty);
packets = 0;
}
local_bh_enable(); /* resched point, may call do_softirq() */
}
__set_current_state(TASK_RUNNING);
/* Make sure kthread runs on a single CPU */
kthread_bind(rcpu->kthread, cpu);
wake_up_process(rcpu->kthread);
/* Make sure kthread has been running, so kthread_stop() will not * stop the kthread prematurely and all pending frames or skbs * will be handled by the kthread before kthread_stop() returns.
*/
wait_for_completion(&rcpu->kthread_running);
/* This cpu_map_entry have been disconnected from map and one * RCU grace-period have elapsed. Thus, XDP cannot queue any * new packets and cannot change/set flush_needed that can * find this entry.
*/
rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);
/* kthread_stop will wake_up_process and wait for it to complete. * cpu_map_kthread_run() makes sure the pointer ring is empty * before exiting.
*/
kthread_stop(rcpu->kthread);
if (rcpu->prog)
bpf_prog_put(rcpu->prog);
gro_cleanup(&rcpu->gro); /* The queue should be empty at this point */
__cpu_map_ring_cleanup(rcpu->queue);
ptr_ring_cleanup(rcpu->queue, NULL);
kfree(rcpu->queue);
free_percpu(rcpu->bulkq);
kfree(rcpu);
}
/* After the xchg of the bpf_cpu_map_entry pointer, we need to make sure the old * entry is no longer in use before freeing. We use queue_rcu_work() to call * __cpu_map_entry_free() in a separate workqueue after waiting for an RCU grace * period. This means that (a) all pending enqueue and flush operations have * completed (because of the RCU callback), and (b) we are in a workqueue * context where we can stop the kthread and wait for it to exit before freeing * everything.
*/ staticvoid __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
u32 key_cpu, struct bpf_cpu_map_entry *rcpu)
{ struct bpf_cpu_map_entry *old_rcpu;
/* At this point bpf_prog->aux->refcnt == 0 and this map->refcnt == 0, * so the bpf programs (can be more than one that used this map) were * disconnected from events. Wait for outstanding critical sections in * these programs to complete. synchronize_rcu() below not only * guarantees no further "XDP/bpf-side" reads against * bpf_cpu_map->cpu_map, but also ensure pending flush operations * (if any) are completed.
*/
synchronize_rcu();
/* The only possible user of bpf_cpu_map_entry is * cpu_map_kthread_run().
*/ for (i = 0; i < cmap->map.max_entries; i++) { struct bpf_cpu_map_entry *rcpu;
rcpu = rcu_dereference_raw(cmap->cpu_map[i]); if (!rcpu) continue;
/* Currently the dynamically allocated elements are not counted */
usage += (u64)map->max_entries * sizeof(struct bpf_cpu_map_entry *); return usage;
}
/* Queue one xdp_frame on the per-CPU bulk queue towards @rcpu, flushing
 * a full bulk queue to the remote CPU's ptr_ring first if needed.
 *
 * Runs under RCU-read-side, plus in softirq under NAPI protection.
 * Thus, safe percpu variable access.
 */
static void bq_enqueue(struct bpf_cpu_map_entry *rcpu, struct xdp_frame *xdpf)
{
	struct xdp_bulk_queue *bq = this_cpu_ptr(rcpu->bulkq);

	if (unlikely(bq->count == CPU_MAP_BULK_SIZE))
		bq_flush_to_queue(bq);

	/* Notice, xdp_buff/page MUST be queued here, long enough for
	 * driver to code invoking us to finished, due to driver
	 * (e.g. ixgbe) recycle tricks based on page-refcnt.
	 *
	 * Thus, incoming xdp_frame is always queued here (else we race
	 * with another CPU on page-refcnt and remaining driver code).
	 * Queue time is very short, as driver will invoke flush
	 * operation, when completing napi->poll call.
	 */
	bq->q[bq->count++] = xdpf;

	/* Register this bulk queue for the end-of-NAPI flush, unless it
	 * is already on the flush list (flush_node.prev non-NULL).
	 */
	if (!bq->flush_node.prev) {
		struct list_head *flush_list = bpf_net_ctx_get_cpu_map_flush_list();

		list_add(&bq->flush_node, flush_list);
	}
}
/* Hand one XDP frame off towards the remote CPU entry @rcpu.
 * Always succeeds from the caller's perspective (returns 0); the frame
 * is now owned by the bulk-queue/ring machinery.
 */
int cpu_map_enqueue(struct bpf_cpu_map_entry *rcpu, struct xdp_frame *xdpf,
		    struct net_device *dev_rx)
{
	/* Record the RX device; the remote CPU needs it when it later
	 * constructs an SKB from this frame.
	 */
	xdpf->dev_rx = dev_rx;

	bq_enqueue(rcpu, xdpf);
	return 0;
}
int cpu_map_generic_redirect(struct bpf_cpu_map_entry *rcpu, struct sk_buff *skb)
{ int ret;
/* NOTE(review): the remainder of cpu_map_generic_redirect() has been
 * overwritten by stray website-disclaimer text (German), apparently
 * pasted in by mistake. English translation of the stray text:
 * "The information on this website has been compiled carefully and to
 * the best of our knowledge. However, neither completeness, nor
 * correctness, nor quality of the provided information is guaranteed.
 * Note: the colored syntax highlighting and the measurement are still
 * experimental."
 * This is not source code -- the original function body must be
 * restored from upstream kernel/bpf/cpumap.c.
 */