/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another consuming entries from a FIFO.
 *
 *	This implementation tries to minimize cache-contention when there is a
 *	single producer and a single consumer CPU.
 */

#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}
static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
ret = __ptr_ring_full(r);
spin_unlock_bh(&r->producer_lock);
return ret;
}
/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the dependency ordering in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

spin_lock(&r->producer_lock);
ret = __ptr_ring_produce(r, ptr);
spin_unlock(&r->producer_lock);
return ret;
}
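/*
 * Illustrative sketch (not part of the original header): a producer that
 * drops an entry when the ring is full.  The example_* names and the
 * example_free() callback are hypothetical and exist only for illustration.
 */
static inline void example_free(void *obj)
{
	kfree(obj);
}

static inline void example_produce_or_drop(struct ptr_ring *r, void *obj)
{
	/* ptr_ring_produce() returns -ENOSPC when the ring is full;
	 * here the entry is simply dropped instead of retried.
	 */
	if (ptr_ring_produce(r, obj))
		example_free(obj);
}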
static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

spin_lock_irq(&r->producer_lock);
ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}
/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}
static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
ret = __ptr_ring_empty(r);
spin_unlock_bh(&r->consumer_lock);
return ret;
}
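/*
 * Illustrative sketch (not part of the original header): a single consumer
 * waiting for work with the lockless empty test.  Per the comment above
 * __ptr_ring_empty, a loop like this needs a compiler barrier such as
 * cpu_relax(), and it is only safe when this CPU is the one consuming
 * entries.  example_wait_nonempty() is a hypothetical helper; real code
 * would bound the wait or sleep instead of spinning indefinitely.
 */
static inline void example_wait_nonempty(struct ptr_ring *r)
{
	while (__ptr_ring_empty(r))
		cpu_relax();
}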
/* Returns the next entry in the ring without dequeueing it: callers must
 * take consumer_lock if they dereference the returned pointer - see e.g.
 * __PTR_RING_PEEK_CALL.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       consumer = r->consumer;
	 *       r->queue[consumer++] = NULL;
	 *       if (unlikely(consumer >= r->size))
	 *               consumer = 0;
	 *       r->consumer = consumer;
	 * but that is suboptimal when the ring is full as producer is writing
	 * out new entries in the same cache line.  Defer these updates until a
	 * batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
	 * to work correctly.
	 */
	int consumer_head = r->consumer_head;
	int head = consumer_head++;

	/* Once we have processed enough entries invalidate them in
	 * the ring all at once so producer can reuse their space in the ring.
	 * We also do this when we reach end of the ring - not mandatory
	 * but helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size)) {
		/* Zero out entries in the reverse order: this way we touch the
		 * cache line that producer might currently be reading the last;
		 * producer won't make progress and touch other cache lines
		 * besides the first one until we write out all entries.
		 */
		while (likely(head >= r->consumer_tail))
r->queue[head--] = NULL;
r->consumer_tail = consumer_head;
	}
	if (unlikely(consumer_head >= r->size)) {
consumer_head = 0;
r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
WRITE_ONCE(r->consumer_head, consumer_head);
}
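/*
 * Worked example (illustrative, not from the original header), assuming a
 * ring of size 256 and batch 16: the consumer advances consumer_head on
 * every __ptr_ring_discard_one(), but entries are only NULLed out once
 * consumer_head - consumer_tail reaches 16 (or the end of the ring), at
 * which point queue[consumer_tail..consumer_head-1] are cleared in reverse
 * order and consumer_tail catches up.  This keeps the producer's and
 * consumer's writes to the queue[] cache lines from interleaving on every
 * single entry.
 */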
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
__ptr_ring_discard_one(r);
return ptr;
}
static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
array[i] = ptr;
}
return i;
}
/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

spin_lock(&r->consumer_lock);
ret = __ptr_ring_consume_batched(r, array, n);
spin_unlock(&r->consumer_lock);
return ret;
}
static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

spin_lock_irq(&r->consumer_lock);
ret = __ptr_ring_consume_batched(r, array, n);
spin_unlock_irq(&r->consumer_lock);
return ret;
}
static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

spin_lock_irqsave(&r->consumer_lock, flags);
ret = __ptr_ring_consume_batched(r, array, n);
spin_unlock_irqrestore(&r->consumer_lock, flags);
return ret;
}
static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

spin_lock_bh(&r->consumer_lock);
ret = __ptr_ring_consume_batched(r, array, n);
spin_unlock_bh(&r->consumer_lock);
return ret;
}
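/*
 * Illustrative sketch (not part of the original header): draining up to a
 * fixed budget of entries from BH context, e.g. from a NAPI-style poll
 * routine.  The example_* names and the example_process() callback are
 * hypothetical.
 */
static inline int example_drain_bh(struct ptr_ring *r,
				   void (*example_process)(void *))
{
	void *batch[16];
	int n, i;

	/* Consume at most 16 entries, then process them outside the lock. */
	n = ptr_ring_consume_batched_bh(r, batch, 16);
	for (i = 0; i < n; i++)
		example_process(batch[i]);

	return n;
}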
/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))
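/*
 * Illustrative sketch (not part of the original header): peeking at a field
 * of the queued object without dequeueing it.  struct example_pkt and the
 * example_* helpers are hypothetical.
 */
struct example_pkt {
	unsigned int len;
};

static inline unsigned int example_pkt_len(struct example_pkt *pkt)
{
	/* __ptr_ring_peek may return NULL when the ring is empty. */
	return pkt ? pkt->len : 0;
}

static inline unsigned int example_peek_len(struct ptr_ring *r)
{
	unsigned int len;

	spin_lock(&r->consumer_lock);
	len = __PTR_RING_PEEK_CALL(r, example_pkt_len);
	spin_unlock(&r->consumer_lock);

	return len;
}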
/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc_noprof(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array_noprof(size, sizeof(void *), gfp | __GFP_ZERO);
}
static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch at least to 1 to make logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because ring is small) would cause a lot of
	 * burstiness. Needs tuning, for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
r->batch = 1;
}
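/*
 * Worked example (illustrative, not from the original header): with
 * SMP_CACHE_BYTES == 64 and 8-byte pointers, batch = 64 * 2 / 8 = 16, so a
 * ring of size 32 or more consumes in batches of 16, while smaller rings
 * fall back to batch = 1, i.e. batching is disabled for small rings.
 */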
static inline int ptr_ring_init_noprof(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
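/*
 * Illustrative sketch (not part of the original header): a single-producer /
 * single-consumer round trip through the ring.  example_roundtrip() is a
 * hypothetical self-test style helper; real users keep producer and consumer
 * on different CPUs.  obj must be non-NULL, since NULL marks an empty slot.
 */
static inline int example_roundtrip(void *obj)
{
	struct ptr_ring ring;
	void *out;
	int err;

	err = ptr_ring_init_noprof(&ring, 64, GFP_KERNEL);
	if (err)
		return err;

	err = ptr_ring_produce(&ring, obj);	/* -ENOSPC if full */
	out = ptr_ring_consume(&ring);		/* NULL if empty */

	kvfree(ring.queue);			/* undo the init allocation */
	return err ?: (out == obj ? 0 : -EINVAL);
}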
/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;
	int head;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way following code
	 * can test entries for NULL and if not assume they are valid.
	 */
	head = r->consumer_head - 1;
	while (likely(head >= r->consumer_tail))
r->queue[head--] = NULL;
r->consumer_tail = r->consumer_head;
	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
}
r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
WRITE_ONCE(r->consumer_head, head);
}
done:
	/* Destroy all entries left in the batch. */
	while (n)
destroy(batch[--n]);
spin_unlock(&r->producer_lock);
spin_unlock_irqrestore(&r->consumer_lock, flags);
}
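/*
 * Illustrative sketch (not part of the original header): returning a batch of
 * consumed-but-unprocessed entries to the ring, destroying (via the
 * hypothetical example_free() above) whatever no longer fits.
 */
static inline void example_requeue_unprocessed(struct ptr_ring *r,
					       void **batch, int unprocessed)
{
	ptr_ring_unconsume(r, batch, unprocessed, example_free);
}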
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_noprof(struct ptr_ring *r, int size, gfp_t gfp,
					 void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
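/*
 * Illustrative sketch (not part of the original header): growing a ring whose
 * consumers run in BH context.  Per the nesting comments above, consumers
 * must disable BH while a resize can happen.  example_grow_ring() is a
 * hypothetical helper reusing example_free() from above.
 */
static inline int example_grow_ring(struct ptr_ring *r, int new_size)
{
	return ptr_ring_resize_noprof(r, new_size, GFP_KERNEL, example_free);
}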
/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in BH context, you must
 * disable BH when doing so.
 */
static inline int ptr_ring_resize_multiple_bh_noprof(struct ptr_ring **rings,
						     unsigned int nrings,
						     int size, gfp_t gfp,
						     void (*destroy)(void *))
{
	void ***queues;
	int i;

	queues = kmalloc_array_noprof(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc_noprof(size, gfp);
		if (!queues[i])
			goto nomem;
}
for (i = 0; i < nrings; ++i) {
spin_lock_bh(&(rings[i])->consumer_lock);
spin_lock(&(rings[i])->producer_lock);
queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
size, gfp, destroy);
spin_unlock(&(rings[i])->producer_lock);
spin_unlock_bh(&(rings[i])->consumer_lock);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}