/*
 * This queue will attempt to handle requests in reasonably sized batches instead of reacting
 * immediately to each new request. The wait time between batches is dynamically adjusted up or
 * down to try to balance responsiveness against wasted thread run time.
 *
 * If the wait time becomes long enough, the queue will become dormant and must be explicitly
 * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
 * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
 * wakeup of the worker thread.
 *
 * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
 * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
 * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
 * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
 * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
 * kick the worker thread out of dormant mode and back into timer-based mode.
 *
 * Unbatched requests are used to communicate between different zone threads and will also cause
 * the queue to awaken immediately.
 */
struct uds_request_queue {
	/* Wait queue for synchronizing producers and consumer */
	struct wait_queue_head wait_head;
	/* Function to process a request */
	uds_request_queue_processor_fn processor;
	/* Queue of new incoming requests */
	struct funnel_queue *main_queue;
	/* Queue of old requests to retry */
	struct funnel_queue *retry_queue;
	/* The thread id of the worker thread */
	struct thread *thread;
	/* True if the worker was started */
	bool started;
	/* When true, requests can be enqueued */
	bool running;
	/* A flag set when the worker is waiting without a timeout */
	atomic_t dormant;
};
/*
 * Determine if there is a next request to process, and return it if there is. Also return flags
 * indicating whether the worker thread can sleep (for the use of wait_event() macros) and whether
 * the thread did sleep before returning a new request.
 */
static inline bool dequeue_request(struct uds_request_queue *queue,
				   struct uds_request **request_ptr, bool *waited_ptr)
{
	struct uds_request *request = poll_queues(queue);

	if (request != NULL) {
		*request_ptr = request;
		return true;
	}

	if (!READ_ONCE(queue->running)) {
		/* Wake the worker thread so it can exit. */
		*request_ptr = NULL;
		return true;
	}

	/*
	 * NOTE(review): the batching logic below references identifiers (dormant, waited,
	 * time_batch, current_batch) that are not declared in this function, and this
	 * bool-returning function can fall off the end — both symptoms of the worker thread's
	 * main loop having been spliced in here by a garbled extraction. Confirm against the
	 * original file; only the fused tokens (staticinlinebool/returntrue/elseif) were
	 * repaired in this pass.
	 */
	if (dormant) {
		/*
		 * The queue has been roused from dormancy. Clear the flag so enqueuers can
		 * stop broadcasting. No fence is needed for this transition.
		 */
		atomic_set(&queue->dormant, false);
		dormant = false;
		time_batch = DEFAULT_WAIT_TIME;
	} else if (waited) {
		/*
		 * We waited for this request to show up. Adjust the wait time to smooth
		 * out the batch size.
		 */
		if (current_batch < MINIMUM_BATCH) {
			/* If the last batch of requests was too small, increase the wait time. */
			time_batch += time_batch / 4;
			if (time_batch >= MAXIMUM_WAIT_TIME) {
				atomic_set(&queue->dormant, true);
				dormant = true;
			}
		} else if (current_batch > MAXIMUM_BATCH) {
			/* If the last batch of requests was too large, decrease the wait time. */
			time_batch -= time_batch / 4;
			if (time_batch < MINIMUM_WAIT_TIME)
				time_batch = MINIMUM_WAIT_TIME;
		}
		current_batch = 0;
	}
}
/*
 * NOTE(review): this fragment appears to be the detached tail of the worker thread function
 * (request_queue_worker) — `request` and `queue` are not in scope at file level here, and the
 * closing brace below has no visible opener. The function's head was lost in extraction;
 * restore from the original file before building.
 */
/* * Ensure that we process any remaining requests that were enqueued before trying to shut * down. The corresponding write barrier is in uds_request_queue_finish().
 */
smp_rmb(); while ((request = poll_queues(queue)) != NULL)
queue->processor(request);
}
/*
 * Allocate a request queue.
 *
 * NOTE(review): this body looks garbled by extraction — after a successful allocation the
 * upstream version initializes the queue's fields, starts the worker thread, and stores the
 * queue through queue_ptr; the dormant-wakeup check below (which references `unbatched`,
 * undeclared here) belongs to the enqueue path. Only the fused `constchar` token and the
 * missing return (falling off the end of a non-void function is UB) were repaired in this
 * pass; restore the full body from the original file.
 */
int uds_make_request_queue(const char *queue_name,
			   uds_request_queue_processor_fn processor,
			   struct uds_request_queue **queue_ptr)
{
	int result;
	struct uds_request_queue *queue;

	result = vdo_allocate(1, struct uds_request_queue, __func__, &queue);
	if (result != VDO_SUCCESS)
		return result;

	/*
	 * We must wake the worker thread when it is dormant. A read fence isn't needed here since
	 * we know the queue operation acts as one.
	 */
	if (atomic_read(&queue->dormant) || unbatched)
		wake_up_worker(queue);

	/* result is VDO_SUCCESS on this path. */
	return result;
}
/*
 * Shut down the request queue: clear the running flag, wake the worker so it can drain and
 * exit, and join its thread.
 *
 * NOTE(review): the end of this function is not visible in this chunk — its closing brace
 * (and, presumably, the freeing of the queue's funnel queues and the queue structure itself)
 * runs past the visible text. Confirm against the original file.
 */
void uds_request_queue_finish(struct uds_request_queue *queue)
{ if (queue == NULL) return;
/*
 * This memory barrier ensures that any requests we queued will be seen. The point is that
 * when dequeue_request() sees the following update to the running flag, it will also be
 * able to see any change we made to a next field in the funnel queue entry. The
 * corresponding read barrier is in request_queue_worker().
 */
smp_wmb();
WRITE_ONCE(queue->running, false);
if (queue->started) {
wake_up_worker(queue);
vdo_join_threads(queue->thread);
}
/*
 * NOTE(review): the following is extraneous website boilerplate (originally in German) that was
 * accidentally appended to this file; it is not valid C and should be removed. Translation:
 * "The information on this website has been carefully compiled to the best of our knowledge.
 * However, neither completeness, nor correctness, nor quality of the provided information is
 * guaranteed. Note: the colored syntax highlighting and the measurement are still experimental."
 */