// SPDX-License-Identifier: GPL-2.0-only /* * linux/net/sunrpc/xprt.c * * This is a generic RPC call interface supporting congestion avoidance, * and asynchronous calls. * * The interface works like this: * * - When a process places a call, it allocates a request slot if * one is available. Otherwise, it sleeps on the backlog queue * (xprt_reserve). * - Next, the caller puts together the RPC message, stuffs it into * the request struct, and calls xprt_transmit(). * - xprt_transmit sends the message and installs the caller on the * transport's wait list. At the same time, if a reply is expected, * it installs a timer that is run after the packet's timeout has * expired. * - When a packet arrives, the data_ready handler walks the list of * pending requests for that transport. If a matching XID is found, the * caller is woken up, and the timer removed. * - When no reply arrives within the timeout interval, the timer is * fired by the kernel and runs xprt_timer(). It either adjusts the * timeout values (minor timeout) or wakes up the caller with a status * of -ETIMEDOUT. * - When the caller receives a notification from RPC that a reply arrived, * it should release the RPC slot, and process the reply. * If the call timed out, it may choose to retry the operation by * adjusting the initial timeout value, and simply calling rpc_call * again. * * Support for async RPC is done through a set of RPC-specific scheduling * primitives that `transparently' work for processes as well as async * tasks that rely on callbacks. * * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> * * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
*/
if (time_before(timeout, req->rq_majortimeo)) return timeout; return req->rq_majortimeo;
}
/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 */
int xprt_register_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = -EEXIST;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                /* don't register the same transport class twice */
                if (t->ident == transport->ident)
                        goto out;
        }
        list_add_tail(&transport->list, &xprt_list);
        printk(KERN_INFO "RPC: Registered %s transport module.\n",
               transport->name);
        result = 0;

out:
        /* NOTE(review): the original text was truncated after "result = 0;",
         * leaking xprt_list_lock. The unlock/return below restore the
         * obviously intended control flow. */
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);
/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t == transport) {
                        printk(KERN_INFO "RPC: Unregistered %s transport module.\n",
                               transport->name);
                        list_del_init(&transport->list);
                        goto out;
                }
        }
        result = -ENOENT;

out:
        /* NOTE(review): restored unlock/return — the original text ended
         * abruptly after "result = -ENOENT;", leaking xprt_list_lock. */
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);
spin_lock(&xprt_list_lock);
t = xprt_class_find_by_netid_locked(netid); if (!t) {
spin_unlock(&xprt_list_lock);
request_module("rpc%s", netid);
spin_lock(&xprt_list_lock);
t = xprt_class_find_by_netid_locked(netid);
}
spin_unlock(&xprt_list_lock); return t;
}
/** * xprt_find_transport_ident - convert a netid into a transport identifier * @netid: transport to load * * Returns: * > 0: transport identifier * -ENOENT: transport module not available
*/ int xprt_find_transport_ident(constchar *netid)
{ conststruct xprt_class *t; int ret;
t = xprt_class_find_by_netid(netid); if (!t) return -ENOENT;
ret = t->ident;
xprt_class_release(t); return ret;
}
EXPORT_SYMBOL_GPL(xprt_find_transport_ident);
/** * xprt_reserve_xprt - serialize write access to transports * @task: task that is requesting access to the transport * @xprt: pointer to the target transport * * This prevents mixing the payload of separate requests, and prevents * transport connects from colliding with writes. No congestion control * is provided.
*/ int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{ struct rpc_rqst *req = task->tk_rqstp;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) goto out_locked; goto out_sleep;
} if (test_bit(XPRT_WRITE_SPACE, &xprt->state)) goto out_unlock;
xprt->snd_task = task;
/*
 * Flag the transport as congestion-window limited.  The wait bit is only
 * set when the request at the head of the transmit queue does not already
 * hold a congestion credit, i.e. the queue cannot make progress.
 * (Fixed mangled "staticvoid" token.)
 */
static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (!list_empty(&xprt->xmit_queue)) {
                /* Peek at head of queue to see if it can make progress */
                if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
                                        rq_xmit)->rq_cong)
                        return;
        }
        set_bit(XPRT_CWND_WAIT, &xprt->state);
}
/*
 * Clear XPRT_CWND_WAIT once the congestion window is no longer full.
 * (Fixed mangled "staticvoid" token.)
 */
static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (!RPCXPRT_CONGESTED(xprt))
                clear_bit(XPRT_CWND_WAIT, &xprt->state);
}
/* * xprt_reserve_xprt_cong - serialize write access to transports * @task: task that is requesting access to the transport * * Same as xprt_reserve_xprt, but Van Jacobson congestion control is * integrated into the decision of whether a request is allowed to be * woken up and given access to the transport. * Note that the lock is only granted if we know there are free slots.
*/ int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{ struct rpc_rqst *req = task->tk_rqstp;
/*
 * Hand the transport write lock to the next task queued on xprt->sending,
 * or release it if there is no space or no waiter.
 * (Fixed mangled "staticvoid" token.)
 */
static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
                                __xprt_lock_write_func, xprt))
                return;
out_unlock:
        xprt_clear_locked(xprt);
}
/*
 * As __xprt_lock_write_next(), but additionally refuses to hand the lock
 * over while the congestion window is full.
 * (Fixed mangled "staticvoid" token.)
 */
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        if (xprt_need_congestion_window_wait(xprt))
                goto out_unlock;
        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
                                __xprt_lock_write_func, xprt))
                return;
out_unlock:
        xprt_clear_locked(xprt);
}
/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        /* Only the current lock holder may release and hand over. */
        if (task == xprt->snd_task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
        trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);
/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        /* Only the current lock holder may release and hand over. */
        if (task == xprt->snd_task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
        trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 * Returns 1 if @req holds (or just acquired) a congestion credit, 0 if
 * the window is full. (Fixed mangled "staticint" token.)
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (req->rq_cong)
                return 1;
        trace_xprt_get_cong(xprt, req->rq_task);
        if (RPCXPRT_CONGESTED(xprt)) {
                xprt_set_congestion_window_wait(xprt);
                return 0;
        }
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}
/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion.
 * (Fixed mangled "staticvoid" token.)
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        xprt_test_and_clear_congestion_window_wait(xprt);
        trace_xprt_put_cong(xprt, req->rq_task);
        __xprt_lock_write_next_cong(xprt);
}
/** * xprt_request_get_cong - Request congestion control credits * @xprt: pointer to transport * @req: pointer to RPC request * * Useful for transports that require congestion control.
*/ bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{ bool ret = false;
if (req->rq_cong) returntrue;
spin_lock(&xprt->transport_lock);
ret = __xprt_get_cong(xprt, req) != 0;
spin_unlock(&xprt->transport_lock); return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);
/** * xprt_release_rqst_cong - housekeeping when request is complete * @task: RPC request that recently completed * * Useful for transports that require congestion control.
*/ void xprt_release_rqst_cong(struct rpc_task *task)
{ struct rpc_rqst *req = task->tk_rqstp;
/*
 * As xprt_clear_congestion_window_wait(), but without taking the
 * transport lock — for callers that already hold it (see
 * xprt_disconnect_done()). (Fixed mangled "staticvoid" token.)
 */
static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
        if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
                __xprt_lock_write_next_cong(xprt);
}
/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending.
 * (Fixed mangled "staticvoid" token.)
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
                spin_lock(&xprt->transport_lock);
                __xprt_lock_write_next_cong(xprt);
                spin_unlock(&xprt->transport_lock);
        }
}
/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	-	a reply is received and
 *	-	a full number of requests are outstanding and
 *	-	the congestion window hasn't been updated recently.
 *
 * (Fixed mangled "elseif" token.)
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
                xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        /* A negative status is planted into each task before the wakeup. */
        if (status >= 0)
                rpc_wake_up(&xprt->pending);
        else
                rpc_wake_up_status(&xprt->pending, status);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
        /* Senders test this bit (see xprt_reserve_xprt and the lock-next
         * helpers in this file) and back off until xprt_write_space(). */
        set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
/** * xprt_write_space - wake the task waiting for transport output buffer space * @xprt: transport with waiting tasks * * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
*/ bool xprt_write_space(struct rpc_xprt *xprt)
{ bool ret;
if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) returnfalse;
spin_lock(&xprt->transport_lock);
ret = xprt_clear_write_space_locked(xprt);
spin_unlock(&xprt->transport_lock); return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);
/** * xprt_adjust_timeout - adjust timeout values for next retransmit * @req: RPC request containing parameters to use for the adjustment *
*/ int xprt_adjust_timeout(struct rpc_rqst *req)
{ struct rpc_xprt *xprt = req->rq_xprt; conststruct rpc_timeout *to = req->rq_task->tk_client->cl_timeout; int status = 0;
if (time_before(jiffies, req->rq_majortimeo)) { if (time_before(jiffies, req->rq_minortimeo)) return status; if (to->to_exponential)
req->rq_timeout <<= 1; else
req->rq_timeout += to->to_increment; if (to->to_maxval && req->rq_timeout >= to->to_maxval)
req->rq_timeout = to->to_maxval;
req->rq_retries++;
} else {
req->rq_timeout = to->to_initval;
req->rq_retries = 0;
xprt_reset_majortimeo(req, to); /* Reset the RTT counters == "slow start" */
spin_lock(&xprt->transport_lock);
rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
spin_unlock(&xprt->transport_lock);
status = -ETIMEDOUT;
}
xprt_reset_minortimeo(req);
/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
        trace_xprt_disconnect_done(xprt);
        spin_lock(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        /* Drop write-space and congestion-wait state under the lock... */
        xprt_clear_write_space_locked(xprt);
        xprt_clear_congestion_window_wait_locked(xprt);
        /* ...then fail every task still waiting for a reply. */
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);
/** * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call * @xprt: transport to disconnect
*/ staticvoid xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
{ if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state)) return; if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
queue_work(xprtiod_workqueue, &xprt->task_cleanup); elseif (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
rpc_wake_up_queued_task_set_status(&xprt->pending,
xprt->snd_task, -ENOTCONN);
}
/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
        trace_xprt_disconnect_force(xprt);

        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock(&xprt->transport_lock);
        xprt_schedule_autoclose_locked(xprt);
        spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);
/** * xprt_conditional_disconnect - force a transport to disconnect * @xprt: transport to disconnect * @cookie: 'connection cookie' * * This attempts to break the connection if and only if 'cookie' matches * the current transport 'connection cookie'. It ensures that we don't * try to break the connection more than once when we need to retransmit * a batch of RPC requests. *
*/ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsignedint cookie)
{ /* Don't race with the test_bit() in xprt_clear_locked() */
spin_lock(&xprt->transport_lock); if (cookie != xprt->connect_cookie) goto out; if (test_bit(XPRT_CLOSING, &xprt->state)) goto out;
xprt_schedule_autoclose_locked(xprt);
out:
spin_unlock(&xprt->transport_lock);
}
/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *entry;

        /* NOTE(review): everything below the local declaration was
         * reconstructed — the original text broke off. The rb-tree lookup
         * matches the xprt_request_rb_insert()/_rb_remove() calls used
         * elsewhere in this file. */
        entry = xprt_request_rb_find(xprt, xid);
        if (entry != NULL) {
                trace_xprt_lookup_rqst(xprt, xid, 0);
                entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
                return entry;
        }

        dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
                ntohl(xid));
        trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
        xprt->stat.bad_xids++;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
        /* An elevated rq_pin keeps the request usable while queue_lock is
         * dropped; see xprt_transmit() and xprt_unpin_rqst(). */
        atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);
/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
        /* Fast path: nobody is waiting for the pin count to drain. */
        if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
                atomic_dec(&req->rq_pin);
                return;
        }
        /* A waiter set PIN_WAIT: wake it once the last pin drops. */
        if (atomic_dec_and_test(&req->rq_pin))
                wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
/**
 * xprt_request_enqueue_receive - Add an request to the receive queue
 * @task: RPC task
 *
 * Prepares the receive buffer and inserts the request into the
 * transport's receive tracking structure under xprt->queue_lock.
 * Returns 0 on success or the error from xprt_request_prepare().
 */
int
xprt_request_enqueue_receive(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int ret;

        if (!xprt_request_need_enqueue_receive(task, req))
                return 0;

        ret = xprt_request_prepare(task->tk_rqstp, &req->rq_rcv_buf);
        if (ret)
                return ret;

        spin_lock(&xprt->queue_lock);

        /* Update the softirq receive buffer */
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
               sizeof(req->rq_private_buf));

        /* Add request to the receive list */
        xprt_request_rb_insert(xprt, req);
        set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
        spin_unlock(&xprt->queue_lock);

        /* Turn off autodisconnect */
        timer_delete_sync(&xprt->timer);
        return 0;
}
/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 * (Fixed mangled "staticvoid" token.)
 */
static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
                xprt_request_rb_remove(req->rq_xprt, req);
}
if (timer) { if (req->rq_ntrans == 1)
rpc_update_rtt(rtt, timer, m);
rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
}
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);
/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        xprt->stat.recvs++;

        xdr_free_bvec(&req->rq_rcv_buf);
        req->rq_private_buf.bvec = NULL;
        req->rq_private_buf.len = copied;
        /* Ensure all writes are done before we update */
        /* req->rq_reply_bytes_recvd */
        smp_wmb();
        req->rq_reply_bytes_recvd = copied;
        xprt_request_dequeue_receive_locked(task);
        rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);
trace_xprt_timer(xprt, req->rq_xid, task->tk_status); if (!req->rq_reply_bytes_recvd) { if (xprt->ops->timer)
xprt->ops->timer(xprt, task);
} else
task->tk_status = 0;
}
/** * xprt_wait_for_reply_request_def - wait for reply * @task: pointer to rpc_task * * Set a request's retransmit timeout based on the transport's * default timeout parameters. Used by transports that don't adjust * the retransmit timeout based on round-trip time estimation, * and put the task to sleep on the pending queue.
*/ void xprt_wait_for_reply_request_def(struct rpc_task *task)
{ struct rpc_rqst *req = task->tk_rqstp;
/** * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator * @task: pointer to rpc_task * * Set a request's retransmit timeout using the RTT estimator, * and put the task to sleep on the pending queue.
*/ void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{ int timer = task->tk_msg.rpc_proc->p_timer; struct rpc_clnt *clnt = task->tk_client; struct rpc_rtt *rtt = clnt->cl_rtt; struct rpc_rqst *req = task->tk_rqstp; unsignedlong max_timeout = clnt->cl_timeout->to_maxval; unsignedlong timeout;
/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
                return;
        /*
         * Sleep on the pending queue if we're expecting a reply.
         * The spinlock ensures atomicity between the test of
         * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
         */
        spin_lock(&xprt->queue_lock);
        /* Re-test under the lock: the reply may have landed meanwhile. */
        if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
                xprt->ops->wait_for_reply_request(task);
                /*
                 * Send an extra queue wakeup call if the
                 * connection was dropped in case the call to
                 * rpc_sleep_on() raced.
                 */
                if (xprt_request_retransmit_after_disconnect(task))
                        rpc_wake_up_queued_task_set_status(&xprt->pending,
                                        task, -ENOTCONN);
        }
        spin_unlock(&xprt->queue_lock);
}
/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 * (Fixed mangled "elseif" token.)
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
        struct rpc_rqst *pos, *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int ret;

        if (xprt_request_need_enqueue_transmit(task, req)) {
                ret = xprt_request_prepare(task->tk_rqstp, &req->rq_snd_buf);
                if (ret) {
                        task->tk_status = ret;
                        return;
                }
                req->rq_bytes_sent = 0;
                spin_lock(&xprt->queue_lock);
                /*
                 * Requests that carry congestion control credits are added
                 * to the head of the list to avoid starvation issues.
                 */
                if (req->rq_cong) {
                        xprt_clear_congestion_window_wait(xprt);
                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
                                if (pos->rq_cong)
                                        continue;
                                /* Note: req is added _before_ pos */
                                list_add_tail(&req->rq_xmit, &pos->rq_xmit);
                                INIT_LIST_HEAD(&req->rq_xmit2);
                                goto out;
                        }
                } else if (req->rq_seqno_count == 0) {
                        /* Group with other requests from the same owner. */
                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
                                if (pos->rq_task->tk_owner != task->tk_owner)
                                        continue;
                                list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
                                INIT_LIST_HEAD(&req->rq_xmit);
                                goto out;
                        }
                }
                list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
                INIT_LIST_HEAD(&req->rq_xmit2);
out:
                atomic_long_inc(&xprt->xmit_queuelen);
                set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
                spin_unlock(&xprt->queue_lock);
        }
}
/** * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue * @task: pointer to rpc_task * * Remove a task from the transmission queue * Caller must hold xprt->queue_lock
*/ staticvoid
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{ struct rpc_rqst *req = task->tk_rqstp;
if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) return; if (!list_empty(&req->rq_xmit)) { struct rpc_xprt *xprt = req->rq_xprt;
if (list_is_first(&req->rq_xmit, &xprt->xmit_queue) &&
xprt->ops->abort_send_request)
xprt->ops->abort_send_request(req);
/** * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue * @task: pointer to rpc_task * * Remove a task from the transmit and receive queues, and ensure that * it is not pinned by the receive work item.
*/ void
xprt_request_dequeue_xprt(struct rpc_task *task)
{ struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt;
/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 * @buf: pointer to send/rcv xdr_buf
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 * Returns error, or zero.
 * (Fixed mangled "staticint" token.)
 */
static int
xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        /* prepare_request is an optional transport op. */
        if (xprt->ops->prepare_request)
                return xprt->ops->prepare_request(req, buf);
        return 0;
}
/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 */
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
        return xprt_request_retransmit_after_disconnect(task);
}
/** * xprt_prepare_transmit - reserve the transport before sending a request * @task: RPC task about to send a request *
*/ bool xprt_prepare_transmit(struct rpc_task *task)
{ struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt;
if (!xprt_lock_write(xprt, task)) { /* Race breaker: someone may have transmitted us */ if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
rpc_wake_up_queued_task_set_status(&xprt->sending,
task, 0); returnfalse;
} if (atomic_read(&xprt->swapper)) /* This will be clear in __rpc_execute */
current->flags |= PF_MEMALLOC; returntrue;
}
/** * xprt_request_transmit - send an RPC request on a transport * @req: pointer to request to transmit * @snd_task: RPC task that owns the transport lock * * This performs the transmission of a single request. * Note that if the request is not the same as snd_task, then it * does need to be pinned. * Returns '0' on success.
*/ staticint
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{ struct rpc_xprt *xprt = req->rq_xprt; struct rpc_task *task = req->rq_task; unsignedint connect_cookie; int is_retrans = RPC_WAS_SENT(task); int status;
if (test_bit(XPRT_CLOSE_WAIT, &xprt->state)) return -ENOTCONN;
if (!req->rq_bytes_sent) { if (xprt_request_data_received(task)) {
status = 0; goto out_dequeue;
} /* Verify that our message lies in the RPCSEC_GSS window */ if (rpcauth_xmit_need_reencode(task)) {
status = -EBADMSG; goto out_dequeue;
} if (RPC_SIGNALLED(task)) {
status = -ERESTARTSYS; goto out_dequeue;
}
}
/* * Update req->rq_ntrans before transmitting to avoid races with * xprt_update_rtt(), which needs to know that it is recording a * reply to the first transmission.
*/
req->rq_ntrans++;
trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
connect_cookie = xprt->connect_cookie;
status = xprt->ops->send_request(req); if (status != 0) {
req->rq_ntrans--;
trace_xprt_transmit(req, status); return status;
}
if (is_retrans) {
task->tk_client->cl_stats->rpcretrans++;
trace_xprt_retransmit(req);
}
/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *next, *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        spin_lock(&xprt->queue_lock);
        for (;;) {
                next = list_first_entry_or_null(&xprt->xmit_queue,
                                struct rpc_rqst, rq_xmit);
                if (!next)
                        break;
                /* Pin so the entry survives dropping queue_lock below. */
                xprt_pin_rqst(next);
                spin_unlock(&xprt->queue_lock);
                status = xprt_request_transmit(next, task);
                /* An -EBADMSG on another task's request is not our error. */
                if (status == -EBADMSG && next != req)
                        status = 0;
                spin_lock(&xprt->queue_lock);
                xprt_unpin_rqst(next);
                if (status < 0) {
                        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
                                task->tk_status = status;
                        break;
                }
                /* Was @task transmitted, and has it received a reply? */
                if (xprt_request_data_received(task) &&
                    !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
                        break;
                cond_resched_lock(&xprt->queue_lock);
        }
        spin_unlock(&xprt->queue_lock);
}
/*
 * Re-initialize the request if the task still owns a slot.
 * (Fixed mangled "staticvoid" token.)
 */
static void xprt_complete_request_init(struct rpc_task *task)
{
        if (task->tk_rqstp)
                xprt_request_init(task);
}
/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        /* Nothing to do when the task already holds a slot. */
        if (task->tk_rqstp != NULL)
                return;

        /* Assume failure; xprt_do_reserve() overwrites it on success. */
        task->tk_status = -EAGAIN;
        if (xprt_throttle_congested(xprt, task))
                return;
        xprt_do_reserve(xprt, task);
}
/** * xprt_retry_reserve - allocate an RPC request slot * @task: RPC task requesting a slot allocation * * If no more slots are available, place the task on the transport's * backlog queue. * Note that the only difference with xprt_reserve is that we now * ignore the value of the XPRT_CONGESTED flag.
*/ void xprt_retry_reserve(struct rpc_task *task)
{ struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0; if (task->tk_rqstp != NULL) return;
task->tk_rqstp = req;
req->rq_task = task;
xprt_init_connect_cookie(req, req->rq_xprt); /* * Set up the xdr_buf length. * This also indicates that the buffer is XDR encoded already.
*/
xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
xbufp->tail[0].iov_len; /* * Backchannel Replies are sent with !RPC_TASK_SOFT and * RPC_TASK_NO_RETRANS_TIMEOUT. The major timeout setting * affects only how long each Reply waits to be sent when * a transport connection cannot be established.
*/
xprt_init_majortimeo(task, req, to);
} #endif
/*
 * NOTE(review): this block is internally inconsistent — the signature says
 * xprt_init() (with an unused @net parameter) but, apart from kref_init(),
 * the body performs teardown: debugfs unregister, wait-queue destruction,
 * backchannel destruction, and ops->destroy. It looks like the body of the
 * destroy callback (cf. xprt_destroy_cb, referenced from xprt_destroy()
 * below) was merged under the xprt_init() header during extraction. Only
 * the mangled "staticvoid" token is fixed here; the real xprt_init() body
 * must be recovered from the authoritative source before building.
 */
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
        kref_init(&xprt->kref);

        rpc_xprt_debugfs_unregister(xprt);
        rpc_destroy_wait_queue(&xprt->binding);
        rpc_destroy_wait_queue(&xprt->pending);
        rpc_destroy_wait_queue(&xprt->sending);
        rpc_destroy_wait_queue(&xprt->backlog);
        kfree(xprt->servername);
        /*
         * Destroy any existing back channel
         */
        xprt_destroy_backchannel(xprt, UINT_MAX);

        /*
         * Tear down transport state and free the rpc_xprt
         */
        xprt->ops->destroy(xprt);
}
/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 * (Fixed mangled "staticvoid" token.)
 */
static void xprt_destroy(struct rpc_xprt *xprt)
{
        /*
         * Exclude transport connect/disconnect handlers and autoclose
         */
        wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);

        /*
         * xprt_schedule_autodisconnect() can run after XPRT_LOCKED
         * is cleared.  We use ->transport_lock to ensure the mod_timer()
         * can only run *before* del_time_sync(), never after.
         */
        spin_lock(&xprt->transport_lock);
        timer_delete_sync(&xprt->timer);
        spin_unlock(&xprt->transport_lock);

        /*
         * Destroy sockets etc from the system workqueue so they can
         * safely flush receive work running on rpciod.
         */
        INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
        schedule_work(&xprt->task_cleanup);
}
/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 * Returns @xprt with an additional reference, or NULL when @xprt is NULL
 * or its refcount had already dropped to zero.
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
        if (xprt == NULL)
                return NULL;
        if (!kref_get_unless_zero(&xprt->kref))
                return NULL;
        return xprt;
}
EXPORT_SYMBOL_GPL(xprt_get);
/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 */
void xprt_put(struct rpc_xprt *xprt)
{
        if (xprt == NULL)
                return;
        /* Last reference triggers xprt_destroy_kref(). */
        kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);
/*
 * NOTE(review): the trailing text below is German website boilerplate (a
 * source-viewer disclaimer) that was accidentally appended to this file;
 * it is not code. It is preserved here inside a comment — in English
 * translation — so the translation unit still compiles:
 *
 * "The information on this website has been compiled carefully and to the
 *  best of our knowledge. However, neither completeness, nor correctness,
 *  nor quality of the information provided is guaranteed.
 *  Note: the colour syntax highlighting and the measurement are still
 *  experimental."
 */