if (timeout != 0) { unsignedlong now = jiffies; if (time_before(now, timeout)) return timeout - now;
} return 0;
}
EXPORT_SYMBOL_GPL(rpc_task_timeout);
/*
 * Stop the pending timer for @task.
 *
 * Callers must hold queue->lock with bottom halves disabled so that
 * this cannot race with rpc_run_timer().
 */
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!list_empty(&task->u.tk_wait.timer_list)) {
		task->tk_timeout = 0;
		list_del(&task->u.tk_wait.timer_list);
		/* Last timed task gone: the delayed work has nothing left to do */
		if (list_empty(&queue->timer_list.list))
			cancel_delayed_work(&queue->timer_list.dwork);
	}
}
/* * Set up a timer for the current task.
*/ staticvoid
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task, unsignedlong timeout)
{
task->tk_timeout = timeout; if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
rpc_set_queue_timer(queue, timeout);
list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}
/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 *
 * Returns non-zero when the last reference to the task was dropped here.
 */
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	trace_rpc_task_complete(task, NULL);

	/*
	 * Take wq->lock so that clearing RPC_TASK_ACTIVE, dropping the
	 * refcount and waking waiters appear atomic to
	 * rpc_wait_for_completion_task(), which waits under the same lock
	 * (see the out_of_line_wait_on_bit() note there).
	 */
	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	/* true iff this was the final reference */
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}
/*
 * Allow callers to wait for completion of an RPC call
 *
 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
 * to enforce taking of the wq->lock and hence avoid races with
 * rpc_complete_task().
 */
int rpc_wait_for_completion_task(struct rpc_task *task)
{
	int ret;

	ret = out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
				      rpc_wait_bit_killable,
				      TASK_KILLABLE|TASK_FREEZABLE_UNSAFE);
	return ret;
}
EXPORT_SYMBOL_GPL(rpc_wait_for_completion_task);
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, and is being made runnable after sitting on an
 * rpc_wait_queue, this must be called with the queue spinlock held to protect
 * the wait queue operation.
 * Note the ordering of rpc_test_and_set_running() and rpc_clear_queued(),
 * which is needed to ensure that __rpc_execute() doesn't loop (due to the
 * lockless RPC_IS_QUEUED() test) before we've had a chance to test
 * the RPC_TASK_RUNNING flag.
 */
static void rpc_make_runnable(struct workqueue_struct *wq,
			      struct rpc_task *task)
{
	/* Only the caller that actually set RPC_TASK_RUNNING issues the wakeup */
	bool need_wakeup = !rpc_test_and_set_running(task);

	rpc_clear_queued(task);
	if (!need_wakeup)
		return;
	if (RPC_IS_ASYNC(task)) {
		/* Async tasks are run by a workqueue worker */
		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		queue_work(wq, &task->u.tk_work);
	} else {
		/* Sync tasks sleep on RPC_TASK_QUEUED in __rpc_execute();
		 * order rpc_clear_queued() before the bit wakeup. */
		smp_mb__after_atomic();
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
	}
}
/* * Prepare for sleeping on a wait queue. * By always appending tasks to the list we ensure FIFO behavior. * NB: An RPC task will only receive interrupt-driven events as long * as it's on a wait queue.
*/ staticvoid __rpc_do_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task, unsignedchar queue_priority)
{
trace_rpc_task_sleep(task, q);
/** * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task * @wq: workqueue on which to run task * @queue: wait queue * @task: task to be woken up * * Caller must hold queue->lock, and have cleared the task queued flag.
*/ staticvoid __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq, struct rpc_wait_queue *queue, struct rpc_task *task)
{ /* Has the task been executed yet? If not, we cannot wake it up! */ if (!RPC_IS_ACTIVATED(task)) {
printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task); return;
}
trace_rpc_task_wakeup(task, queue);
__rpc_remove_wait_queue(queue, task);
rpc_make_runnable(wq, task);
}
/* * Wake up a queued task while the queue lock is being held
*/ staticstruct rpc_task *
rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq, struct rpc_wait_queue *queue, struct rpc_task *task, bool (*action)(struct rpc_task *, void *), void *data)
{ if (RPC_IS_QUEUED(task)) {
smp_rmb(); if (task->tk_waitqueue == queue) { if (action == NULL || action(task, data)) {
__rpc_do_wake_up_task_on_wq(wq, queue, task); return task;
}
}
} return NULL;
}
/*
 * Wake up a single queued task on the rpciod workqueue;
 * caller holds queue->lock.
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue,
					  struct rpc_task *task)
{
	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue, task,
						   NULL, NULL);
}
/*
 * Wake up a task sleeping on a specific wait queue.
 */
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	/* Lockless fast path: nothing to do for a task that is not queued */
	if (RPC_IS_QUEUED(task)) {
		spin_lock(&queue->lock);
		rpc_wake_up_task_queue_locked(queue, task);
		spin_unlock(&queue->lock);
	}
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
/**
 * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
 * @queue: pointer to rpc_wait_queue
 * @task: pointer to rpc_task
 * @status: integer error value
 *
 * If @task is queued on @queue, then it is woken up, and @task->tk_status is
 * set to the value of @status.
 */
void
rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
				   struct rpc_task *task, int status)
{
	/* Lockless fast path: skip tasks that are not queued at all */
	if (RPC_IS_QUEUED(task)) {
		spin_lock(&queue->lock);
		rpc_wake_up_task_queue_set_status_locked(queue, task, status);
		spin_unlock(&queue->lock);
	}
}
/* * Wake up the next task on a priority queue.
*/ staticstruct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
{ struct list_head *q; struct rpc_task *task;
/* * Service the privileged queue.
*/
q = &queue->tasks[RPC_NR_PRIORITY - 1]; if (queue->maxpriority > RPC_PRIORITY_PRIVILEGED && !list_empty(q)) {
task = list_first_entry(q, struct rpc_task, u.tk_wait.list); goto out;
}
/* * Service a batch of tasks from a single owner.
*/
q = &queue->tasks[queue->priority]; if (!list_empty(q) && queue->nr) {
queue->nr--;
task = list_first_entry(q, struct rpc_task, u.tk_wait.list); goto out;
}
/* * Service the next queue.
*/ do { if (q == &queue->tasks[0])
q = &queue->tasks[queue->maxpriority]; else
q = q - 1; if (!list_empty(q)) {
task = list_first_entry(q, struct rpc_task, u.tk_wait.list); goto new_queue;
}
} while (q != &queue->tasks[queue->priority]);
/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task *next;

	next = rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
	return next;
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);
/**
 * rpc_wake_up_locked - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Caller must hold queue->lock.
 */
static void rpc_wake_up_locked(struct rpc_wait_queue *queue)
{
	struct rpc_task *next;

	/* Drain the queue one task at a time until it is empty */
	while ((next = __rpc_find_next_queued(queue)) != NULL)
		rpc_wake_up_task_queue_locked(queue, next);
}
/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	spin_lock(&queue->lock);
	rpc_wake_up_locked(queue);
	spin_unlock(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);
/**
 * rpc_wake_up_status_locked - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Caller must hold queue->lock.
 */
static void rpc_wake_up_status_locked(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *next;

	/* Drain the queue, tagging each task with @status as it is woken */
	while ((next = __rpc_find_next_queued(queue)) != NULL)
		rpc_wake_up_task_queue_set_status_locked(queue, next, status);
}
/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	spin_lock(&queue->lock);
	rpc_wake_up_status_locked(queue, status);
	spin_unlock(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);
/* * Run a task at a later time
*/ void rpc_delay(struct rpc_task *task, unsignedlong delay)
{
rpc_sleep_on_timeout(&delay_queue, task, __rpc_atrun, jiffies + delay);
}
EXPORT_SYMBOL_GPL(rpc_delay);
/* * This is the RPC `scheduler' (or rather, the finite state machine).
*/ staticvoid __rpc_execute(struct rpc_task *task)
{ struct rpc_wait_queue *queue; int task_is_async = RPC_IS_ASYNC(task); int status = 0; unsignedlong pflags = current->flags;
WARN_ON_ONCE(RPC_IS_QUEUED(task)); if (RPC_IS_QUEUED(task)) return;
for (;;) { void (*do_action)(struct rpc_task *);
/* * Perform the next FSM step or a pending callback. * * tk_action may be NULL if the task has been killed.
*/
do_action = task->tk_action; /* Tasks with an RPC error status should exit */ if (do_action && do_action != rpc_exit_task &&
(status = READ_ONCE(task->tk_rpc_status)) != 0) {
task->tk_status = status;
do_action = rpc_exit_task;
} /* Callbacks override all actions */ if (task->tk_callback) {
do_action = task->tk_callback;
task->tk_callback = NULL;
} if (!do_action) break; if (RPC_IS_SWAPPER(task) ||
xprt_needs_memalloc(task->tk_xprt, task))
current->flags |= PF_MEMALLOC;
/* * Lockless check for whether task is sleeping or not.
*/ if (!RPC_IS_QUEUED(task)) {
cond_resched(); continue;
}
/* * The queue->lock protects against races with * rpc_make_runnable(). * * Note that once we clear RPC_TASK_RUNNING on an asynchronous * rpc_task, rpc_make_runnable() can assign it to a * different workqueue. We therefore cannot assume that the * rpc_task pointer may still be dereferenced.
*/
queue = task->tk_waitqueue;
spin_lock(&queue->lock); if (!RPC_IS_QUEUED(task)) {
spin_unlock(&queue->lock); continue;
} /* Wake up any task that has an exit status */ if (READ_ONCE(task->tk_rpc_status) != 0) {
rpc_wake_up_task_queue_locked(queue, task);
spin_unlock(&queue->lock); continue;
}
rpc_clear_running(task);
spin_unlock(&queue->lock); if (task_is_async) goto out;
/* sync task: sleep here */
trace_rpc_task_sync_sleep(task, task->tk_action);
status = out_of_line_wait_on_bit(&task->tk_runstate,
RPC_TASK_QUEUED, rpc_wait_bit_killable,
TASK_KILLABLE|TASK_FREEZABLE); if (status < 0) { /* * When a sync task receives a signal, it exits with * -ERESTARTSYS. In order to catch any callbacks that * clean up after sleeping on some queue, we don't * break the loop here, but go around once more.
*/
rpc_signal_task(task);
}
trace_rpc_task_sync_wake(task, task->tk_action);
}
/* Release all resources associated with the task */
rpc_release_task(task);
out:
current_restore_flags(pflags, PF_MEMALLOC);
}
/* * User-visible entry point to the scheduler. * * This may be called recursively if e.g. an async NFS task updates * the attributes and finds that dirty pages must be flushed. * NOTE: Upon exit of this function the task is guaranteed to be * released. In particular note that tk_release() will have * been called, so your task memory may have been freed.
*/ void rpc_execute(struct rpc_task *task)
{ bool is_async = RPC_IS_ASYNC(task);
/** * rpc_malloc - allocate RPC buffer resources * @task: RPC task * * A single memory region is allocated, which is split between the * RPC call and RPC reply that this task is being used for. When * this RPC is retired, the memory is released by calling rpc_free. * * To prevent rpciod from hanging, this allocator never sleeps, * returning -ENOMEM and suppressing warning if the request cannot * be serviced immediately. The caller can arrange to sleep in a * way that is safe for rpciod. * * Most requests are 'small' (under 2KiB) and can be serviced from a * mempool, ensuring that NFS reads and writes can always proceed, * and that there is good locality of reference for these buffers.
*/ int rpc_malloc(struct rpc_task *task)
{ struct rpc_rqst *rqst = task->tk_rqstp;
size_t size = rqst->rq_callsize + rqst->rq_rcvsize; struct rpc_buffer *buf;
gfp_t gfp = rpc_task_gfp_mask();
size += sizeof(struct rpc_buffer); if (size <= RPC_BUFFER_MAXSIZE) {
buf = kmem_cache_alloc(rpc_buffer_slabp, gfp); /* Reach for the mempool if dynamic allocation fails */ if (!buf && RPC_IS_ASYNC(task))
buf = mempool_alloc(rpc_buffer_mempool, GFP_NOWAIT);
} else
buf = kmalloc(size, gfp);
/*
 * rpc_free_task - release rpc task and perform cleanups
 *
 * Note that we free up the rpc_task _after_ rpc_release_calldata()
 * in order to work around a workqueue dependency issue.
 *
 * Tejun Heo states:
 * "Workqueue currently considers two work items to be the same if they're
 * on the same address and won't execute them concurrently - ie. it
 * makes a work item which is queued again while being executed wait
 * for the previous execution to complete.
 *
 * If a work function frees the work item, and then waits for an event
 * which should be performed by another work item and *that* work item
 * recycles the freed work item, it can create a false dependency loop.
 * There really is no reliable way to detect this short of verifying
 * every memory free."
 *
 */
static void rpc_free_task(struct rpc_task *task)
{
	/* NOTE(review): tk_flags is captured but never read in the visible
	 * body — possibly a remnant of a larger function; confirm against
	 * the original source. */
	unsigned short tk_flags = task->tk_flags;

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so it should be safe to use task->tk_count as a test for whether
	 * or not any other processes still hold references to our rpc_task.
	 */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		/* Sole holder: drop our reference and free when it hits zero */
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}
/*
 * Pin this module while rpciod is in use.
 */
int rpciod_up(void)
{
	if (!try_module_get(THIS_MODULE))
		return -EINVAL;
	return 0;
}
int
rpc_init_mempool(void)
{ /* * The following is not strictly a mempool initialisation, * but there is no harm in doing it here
*/
rpc_init_wait_queue(&delay_queue, "delayq"); if (!rpciod_start()) goto err_nomem;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.