/**
 * DOC: Work queue definition.
 *
 * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
 * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
 * Externally, both are represented via the same common sub-structure, though there's actually not
 * a great deal of overlap between the two types internally.
 */
struct vdo_work_queue {
	/* Name of just the work queue (e.g., "cpuQ12") */
	char *name;
	/* True for the round-robin variant, false for a simple single-threaded queue */
	bool round_robin_mode;
	/* The vdo_thread this queue belongs to */
	struct vdo_thread *owner;
	/* Life cycle functions, etc */
	const struct vdo_work_queue_type *type;
};
/*
 * NOTE(review): this span is the tail of a struct definition (presumably the simple,
 * single-threaded work queue variant) whose opening brace is outside this chunk.
 *
 * The fields above are unchanged after setup but often read, and are good candidates for
 * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
 * The fields below are often modified as we sleep and wake, so we want a separate cache
 * line for performance.
 */
/* Any (0 or 1) worker threads waiting for new work to do */
wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
/* Hack to reduce wakeup calls if the worker thread is running */
atomic_t idle;

/* These are infrequently used so in terms of performance we don't care where they land. */
struct task_struct *thread;
/* Notify creator once worker has initialized */
struct completion *started;
};
/* * Dequeue and return the next waiting completion, if any. * * We scan the funnel queues from highest priority to lowest, once; there is therefore a race * condition where a high-priority completion can be enqueued followed by a lower-priority one, and * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict * enforcement of priorities becomes necessary, this function will need fixing.
*/ staticstruct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
{ int i;
for (i = queue->common.type->max_priority; i >= 0; i--) { struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]);
if (link != NULL) return container_of(link, struct vdo_completion, work_queue_entry_link);
}
return NULL;
}
/*
 * Add a completion to the appropriate priority list of the queue, then wake the worker thread if
 * it might be idle. The completion must not already be on a queue.
 */
static void enqueue_work_queue_completion(struct simple_work_queue *queue,
					  struct vdo_completion *completion)
{
	VDO_ASSERT_LOG_ONLY(completion->my_queue == NULL,
			    "completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
			    completion, completion->callback, queue, completion->my_queue);

	/* Resolve the sentinel default priority to this queue's configured default. */
	if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
		completion->priority = queue->common.type->default_priority;

	/* Clamp out-of-range priorities to 0 rather than indexing past the list array. */
	if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority,
		       "priority is in range for queue") != VDO_SUCCESS)
		completion->priority = 0;

	completion->my_queue = &queue->common;

	/* Funnel queue handles the synchronization for the put. */
	vdo_funnel_queue_put(queue->priority_lists[completion->priority],
			     &completion->work_queue_entry_link);

	/*
	 * Due to how funnel queue synchronization is handled (just atomic operations), the
	 * simplest safe implementation here would be to wake-up any waiting threads after
	 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
	 * item to the queue, the consumer thread may not see this since it is not guaranteed to
	 * have the same view of the queue as a producer thread.
	 *
	 * However, the above is wasteful so instead we attempt to minimize the number of thread
	 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
	 * able to determine when the worker thread might be asleep or going to sleep. We use
	 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
	 * waking the worker thread, so multiple wakeups aren't tried at once.
	 *
	 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
	 * first is any better or worse for other platforms, even other x86 configurations.
	 */
	smp_mb();
	if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
		return;

	/* There's a maximum of one thread in this list. */
	wake_up(&queue->waiting_worker_threads);
}
/*
 * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
 * for us to shut down.
 *
 * If kthread_should_stop says it's time to stop but we have pending completions return a
 * completion.
 *
 * Also update statistics relating to scheduler interactions.
 *
 * NOTE(review): in this chunk the function ends at the loop's closing brace; the finish-wait
 * and return tail is outside the visible span.
 */
staticstruct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
{
	struct vdo_completion *completion;
	DEFINE_WAIT(wait);

	while (true) {
		prepare_to_wait(&queue->waiting_worker_threads, &wait,
				TASK_INTERRUPTIBLE);
		/*
		 * Don't set the idle flag until a wakeup will not be lost.
		 *
		 * Force synchronization between setting the idle flag and checking the funnel
		 * queue; the producer side will do them in the reverse order. (There's still a
		 * race condition we've chosen to allow, because we've got a timeout below that
		 * unwedges us if we hit it, but this may narrow the window a little.)
		 */
		atomic_set(&queue->idle, 1);
		smp_mb(); /* store-load barrier between "idle" and funnel queue */

		completion = poll_for_completion(queue);
		if (completion != NULL)
			break;

		/*
		 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
		 * above. Otherwise, schedule() will put the thread to sleep and might miss a
		 * wakeup from kthread_stop() call in vdo_finish_work_queue().
		 */
		if (kthread_should_stop())
			break;

		schedule();

		/*
		 * Most of the time when we wake, it should be because there's work to do. If it
		 * was a spurious wakeup, continue looping.
		 */
		completion = poll_for_completion(queue);
		if (completion != NULL)
			break;
	}
/*
 * NOTE(review): this loop appears to be the interior of the worker thread's main routine; the
 * enclosing function's signature and tail are outside the visible span.
 */
while (true) {
	/* Take pending work first; only wait when the priority lists are empty. */
	struct vdo_completion *completion = poll_for_completion(queue);

	if (completion == NULL)
		completion = wait_for_next_completion(queue);

	if (completion == NULL) {
		/* No completions but kthread_should_stop() was triggered. */
		break;
	}

	process_completion(queue, completion);

	/*
	 * Be friendly to a CPU that has other work to do, if the kernel has told us to.
	 * This speeds up some performance tests; that "other work" might include other VDO
	 * threads.
	 */
	cond_resched();
}
/*
 * NOTE(review): this span is the tail of a queue-construction function; the signature,
 * allocation, and worker-thread creation are outside the visible span.
 */
/* Copy the queue name; on failure release the queue and report out-of-memory. */
result = vdo_duplicate_string(name, "queue name", &queue->common.name);
if (result != VDO_SUCCESS) {
	vdo_free(queue);
	return -ENOMEM;
}

/* One funnel queue per priority level, 0 through max_priority inclusive. */
for (i = 0; i <= type->max_priority; i++) {
	result = vdo_make_funnel_queue(&queue->priority_lists[i]);
	if (result != VDO_SUCCESS) {
		free_simple_work_queue(queue);
		return result;
	}
}

/*
 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
 * cleanup code.
 *
 * Eventually we should just make that path safe too, and then we won't need this
 * synchronization.
 */
wait_for_completion(&started);

*queue_ptr = queue;
return VDO_SUCCESS;
}
/**
 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
 * be distributed to them in round-robin fashion.
 *
 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
 * of the actual number of queues and threads allocated here, code outside of the queue
 * implementation will treat this as a single zone.
 *
 * NOTE(review): only the signature and local declarations are visible in this chunk; the body
 * continues outside the visible span.
 */
int vdo_make_work_queue(constchar *thread_name_prefix, constchar *name,
			struct vdo_thread *owner, conststruct vdo_work_queue_type *type,
			unsignedint thread_count, void *thread_privates[],
			struct vdo_work_queue **queue_ptr)
{
	struct round_robin_work_queue *queue;
	int result;
	char thread_name[TASK_COMM_LEN];
	unsignedint i;
/*
 * NOTE(review): this loop is the tail of a round-robin teardown routine; its signature and the
 * declarations of 'count' and 'queue_table' are outside the visible span.
 */
/* Finish each underlying simple queue in turn. */
for (i = 0; i < count; i++)
	finish_simple_work_queue(queue_table[i]);
}
/*
 * Shut down a work queue of either type, dispatching on round_robin_mode. Callers must not
 * enqueue any further completions once this has been called. A NULL queue is silently ignored.
 */
void vdo_finish_work_queue(struct vdo_work_queue *queue)
{
	if (queue == NULL)
		return;

	if (!queue->round_robin_mode) {
		finish_simple_work_queue(as_simple_work_queue(queue));
		return;
	}

	finish_round_robin_work_queue(as_round_robin_work_queue(queue));
}
/*
 * Dump diagnostic information about a work queue of either type for logging; a round-robin
 * queue is dumped by dumping each of its underlying simple service queues in turn.
 * (The previous comment here described completion-to-buffer formatting, which belongs to a
 * different helper.)
 */
void vdo_dump_work_queue(struct vdo_work_queue *queue)
{
	if (queue->round_robin_mode) {
		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
		unsigned int i;

		for (i = 0; i < round_robin->num_service_queues; i++)
			dump_simple_work_queue(round_robin->service_queues[i]);
	} else {
		dump_simple_work_queue(as_simple_work_queue(queue));
	}
}
/*
 * Render a human-readable name for a function pointer into 'buffer', writing "-" for NULL.
 *
 * NOTE(review): the body is truncated in this chunk after the 'else' branch's local
 * declaration; the "%ps" formatting logic continues outside the visible span.
 */
staticvoid get_function_name(void *pointer, char *buffer, size_t buffer_length)
{
	if (pointer == NULL) {
		/*
		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
		 * sometimes use this when logging lots of data; don't be so verbose.
		 */
		strscpy(buffer, "-", buffer_length);
	} else {
		/*
		 * Use a pragma to defeat gcc's format checking, which doesn't understand that
		 * "%ps" actually does support a precision spec in Linux kernel code.
		 */
		char *space;
/* Completion submission */
/*
 * NOTE(review): a comment here originally claimed a timeout handler may be invoked by this
 * function; nothing in the visible code supports that — it presumably leaked in from another
 * file. Verify before relying on it.
 *
 * Only the head of this function is visible in this chunk; the dispatch to the underlying
 * simple queue continues outside the visible span.
 */
void vdo_enqueue_work_queue(struct vdo_work_queue *queue, struct vdo_completion *completion)
{
	/*
	 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
	 * on.
	 */
	struct simple_work_queue *simple_queue = NULL;
/*
 * NOTE(review): this fragment belongs to the round-robin enqueue path; the enclosing function
 * and the definition of 'round_robin' are outside the visible span.
 *
 * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
 * Any patterns that might develop are likely to be disrupted by random ordering of
 * multiple completions and migration between cores, unless the load is so light as
 * to be regular in ordering of tasks and the threads are confined to individual
 * cores; with a load that light we won't care.
 */
unsignedint rotor = this_cpu_inc_return(service_queue_rotor);
unsignedint index = rotor % round_robin->num_service_queues;
/*
 * Return the work queue pointer recorded at initialization time in the work-queue stack handle
 * initialized on the stack of the current thread, if any.
 *
 * NOTE(review): the function is truncated in this chunk after the kthread check; the actual
 * retrieval of the queue pointer is outside the visible span.
 */
staticstruct simple_work_queue *get_current_thread_work_queue(void)
{
	/*
	 * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
	 * the queue for the thread which was interrupted. However, the interrupted thread may have
	 * been processing a completion, in which case starting to process another would violate
	 * our concurrency assumptions.
	 */
	if (in_interrupt())
		return NULL;

	if (kthread_func(current) != work_queue_runner)
		/* Not a VDO work queue thread. */
		return NULL;
/**
 * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
 *                                     queue, or NULL if none or if the current thread is not a
 *                                     work queue thread.
 *
 * NOTE(review): only the head of this function is visible; the return statement is outside the
 * visible span.
 */
void *vdo_get_work_queue_private_data(void)
{
	struct simple_work_queue *queue = get_current_thread_work_queue();
/*
 * NOTE(review): the following text is extraction residue — a German website disclaimer that is
 * not part of this source file. English translation, preserved for reference:
 * "The information on this website has been carefully compiled to the best of our knowledge.
 * However, neither completeness, nor correctness, nor quality of the information provided is
 * guaranteed. Note: the colored syntax highlighting and the measurement are still experimental."
 */